From 1bbb2f7b0898c7d9534f35c1d1ccf093895c69c0 Mon Sep 17 00:00:00 2001 From: Timo Ewalds Date: Tue, 21 Oct 2014 20:10:05 -0400 Subject: [PATCH] Add ReadBuffer unit tests. --- tests/test_streaming.py | 18 ++++++++- tweepy/streaming.py | 83 +++++++++++++++++++++-------------------- 2 files changed, 59 insertions(+), 42 deletions(-) diff --git a/tests/test_streaming.py b/tests/test_streaming.py index d41ae0e..68ae420 100644 --- a/tests/test_streaming.py +++ b/tests/test_streaming.py @@ -1,10 +1,11 @@ +from StringIO import StringIO from time import sleep import unittest2 as unittest from tweepy.api import API from tweepy.auth import OAuthHandler from tweepy.models import Status -from tweepy.streaming import Stream, StreamListener +from tweepy.streaming import Stream, StreamListener, ReadBuffer from config import create_auth from test_utils import mock_tweet @@ -102,6 +103,21 @@ class TweepyStreamTests(unittest.TestCase): # Should be UTF-8 encoded self.assertEqual(u'Caf\xe9'.encode('utf8'), s.session.params['follow']) + +class TweepyStreamReadBuffer(unittest.TestCase): + + stream = """11\n{id:12345}\n\n24\n{id:23456, test:"blah"}\n""" + + def test_read_tweet(self): + for length in [1, 2, 5, 10, 20, 50]: + buf = ReadBuffer(StringIO(self.stream), length) + self.assertEqual('11\n', buf.read_line()) + self.assertEqual('{id:12345}\n', buf.read_len(11)) + self.assertEqual('\n', buf.read_line()) + self.assertEqual('24\n', buf.read_line()) + self.assertEqual('{id:23456, test:"blah"}\n', buf.read_len(24)) + + class TweepyStreamBackoffTests(unittest.TestCase): def setUp(self): #bad auth causes twitter to return 401 errors diff --git a/tweepy/streaming.py b/tweepy/streaming.py index 2dd3770..a3fcb6b 100644 --- a/tweepy/streaming.py +++ b/tweepy/streaming.py @@ -120,6 +120,47 @@ class StreamListener(object): return +class ReadBuffer(object): + """Buffer data from the response in a smarter way than httplib/requests can. 
+ + Tweets are roughly in the 2-12kb range, averaging around 3kb. + Requests/urllib3/httplib/socket all use socket.read, which blocks + until enough data is returned. On some systems (eg google appengine), socket + reads are quite slow. To combat this latency we can read big chunks, + but the blocking part means we won't get results until enough tweets + have arrived. That may not be a big deal for high throughput systems. + For low throughput systems we don't want to sacrifice latency, so we + use small chunks so it can read the length and the tweet in 2 read calls. + """ + + def __init__(self, stream, chunk_size): + self._stream = stream + self._buffer = "" + self._chunk_size = chunk_size + + def read_len(self, length): + while True: + if len(self._buffer) >= length: + return self._pop(length) + read_len = max(self._chunk_size, length - len(self._buffer)) + self._buffer += self._stream.read(read_len) + + def read_line(self, sep='\n'): + start = 0 + while True: + loc = self._buffer.find(sep, start) + if loc >= 0: + return self._pop(loc + len(sep)) + else: + start = len(self._buffer) + self._buffer += self._stream.read(self._chunk_size) + + def _pop(self, length): + r = self._buffer[:length] + self._buffer = self._buffer[length:] + return r + + +class Stream(object): host = 'stream.twitter.com' @@ -226,47 +267,7 @@ class Stream(object): self.running = False def _read_loop(self, resp): - class ReadBuffer(object): - """Buffer data from the response in a smarter way than httplib/requests can. - - Tweets are roughly in the 2-12kb range, averaging around 3kb. - Requests/urllib3/httplib/socket all use socket.read, which blocks - until enough data is returned. On some systems (eg google appengine), socket - reads are quite slow. To combat this latency we can read big chunks, - but the blocking part means we won't get results until enough tweets - have arrived. That may not be a big deal for high throughput systems. 
- For low throughput systems we don't want to sacrafice latency, so we - use small chunks so it can read the length and the tweet in 2 read calls. - """ - - def __init__(self, resp, chunk_size): - self._resp = resp - self._buffer = "" - self._chunk_size = chunk_size - - def read_len(self, length): - while True: - if len(self._buffer) >= length: - return self._pop(length) - read_len = max(self._chunk_size, length - len(self._buffer)) - self._buffer += self._resp.raw.read(read_len) - - def read_line(self, sep='\n'): - start = 0 - while True: - loc = self._buffer.find(sep, start) - if loc >= 0: - return self._pop(loc + len(sep)) - else: - start = len(self._buffer) - self._buffer += self._resp.raw.read(self._chunk_size) - - def _pop(self, length): - r = self._buffer[:length] - self._buffer = self._buffer[length:] - return r - - buf = ReadBuffer(resp, self.chunk_size) + buf = ReadBuffer(resp.raw, self.chunk_size) while self.running: length = 0 -- 2.25.1