Add ReadBuffer unit tests.
author Timo Ewalds <tewalds@google.com>
Wed, 22 Oct 2014 00:10:05 +0000 (20:10 -0400)
committer Timo Ewalds <tewalds@google.com>
Wed, 22 Oct 2014 00:10:05 +0000 (20:10 -0400)
tests/test_streaming.py
tweepy/streaming.py

index d41ae0e322367874074c2a827674c16fb08e5112..68ae420f3831e7537acde596df58a4103e1e63c5 100644 (file)
@@ -1,10 +1,11 @@
+from StringIO import StringIO
 from time import sleep
 import unittest2 as unittest
 
 from tweepy.api import API
 from tweepy.auth import OAuthHandler
 from tweepy.models import Status
-from tweepy.streaming import Stream, StreamListener
+from tweepy.streaming import Stream, StreamListener, ReadBuffer
 
 from config import create_auth
 from test_utils import mock_tweet
@@ -102,6 +103,21 @@ class TweepyStreamTests(unittest.TestCase):
         # Should be UTF-8 encoded
         self.assertEqual(u'Caf\xe9'.encode('utf8'), s.session.params['follow'])
 
+
+class TweepyStreamReadBuffer(unittest.TestCase):
+
+    stream = """11\n{id:12345}\n\n24\n{id:23456, test:"blah"}\n"""
+
+    def test_read_tweet(self):
+        for length in [1, 2, 5, 10, 20, 50]:
+            buf = ReadBuffer(StringIO(self.stream), length)
+            self.assertEqual('11\n', buf.read_line())
+            self.assertEqual('{id:12345}\n', buf.read_len(11))
+            self.assertEqual('\n', buf.read_line())
+            self.assertEqual('24\n', buf.read_line())
+            self.assertEqual('{id:23456, test:"blah"}\n', buf.read_len(24))
+
+
 class TweepyStreamBackoffTests(unittest.TestCase):
     def setUp(self):
         #bad auth causes twitter to return 401 errors
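Note: the new test exercises both ReadBuffer entry points against a sample stream in the length-delimited framing the streaming endpoint produces: a byte count on its own line, then that many bytes of message, with a bare newline acting as a keep-alive between messages. A minimal sketch of the read sequence for that sample stream (illustrative only, not part of the patch; ReadBuffer is introduced in the diff below):

    from StringIO import StringIO
    from tweepy.streaming import ReadBuffer

    buf = ReadBuffer(StringIO('11\n{id:12345}\n\n24\n{id:23456, test:"blah"}\n'), 5)
    length = int(buf.read_line())    # '11\n' -> 11 bytes follow
    payload = buf.read_len(length)   # '{id:12345}\n'
    keepalive = buf.read_line()      # bare '\n' keep-alive between messages
    length = int(buf.read_line())    # '24\n' -> 24 bytes follow
    payload = buf.read_len(length)   # '{id:23456, test:"blah"}\n'
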
index 2dd3770099e4e31488a69872a35ed4cf0c4b61ca..a3fcb6b2241b620bdd809b4b39cecc6b71c32f46 100644 (file)
@@ -120,6 +120,47 @@ class StreamListener(object):
         return
 
 
+class ReadBuffer(object):
+    """Buffer data from the response in a smarter way than httplib/requests can.
+
+    Tweets are roughly in the 2-12kb range, averaging around 3kb.
+    Requests/urllib3/httplib/socket all use socket.read, which blocks
+    until enough data is returned. On some systems (e.g. Google App Engine), socket
+    reads are quite slow. To combat this latency we can read big chunks,
+    but the blocking part means we won't get results until enough tweets
+    have arrived. That may not be a big deal for high-throughput systems.
+    For low-throughput systems we don't want to sacrifice latency, so we
+    use small chunks so it can read the length and the tweet in two read calls.
+    """
+
+    def __init__(self, stream, chunk_size):
+        self._stream = stream
+        self._buffer = ""
+        self._chunk_size = chunk_size
+
+    def read_len(self, length):
+        while True:
+            if len(self._buffer) >= length:
+                return self._pop(length)
+            read_len = max(self._chunk_size, length - len(self._buffer))
+            self._buffer += self._stream.read(read_len)
+
+    def read_line(self, sep='\n'):
+        start = 0
+        while True:
+            loc = self._buffer.find(sep, start)
+            if loc >= 0:
+                return self._pop(loc + len(sep))
+            else:
+                start = len(self._buffer)
+            self._buffer += self._stream.read(self._chunk_size)
+
+    def _pop(self, length):
+        r = self._buffer[:length]
+        self._buffer = self._buffer[length:]
+        return r
+
+
 class Stream(object):
 
     host = 'stream.twitter.com'
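Note: the docstring's latency argument can be checked directly. With a small chunk_size, ReadBuffer pulls a length line and its payload in roughly two underlying read() calls rather than blocking on one large fill. A sketch using a hypothetical counting wrapper (not part of the patch):

    from StringIO import StringIO
    from tweepy.streaming import ReadBuffer

    class CountingStream(object):
        # Hypothetical test double: counts how many raw read() calls occur.
        def __init__(self, data):
            self._data = StringIO(data)
            self.reads = 0
        def read(self, amount):
            self.reads += 1
            return self._data.read(amount)

    src = CountingStream('11\n{id:12345}\n')
    buf = ReadBuffer(src, 4)
    buf.read_line()      # the length line
    buf.read_len(11)     # the 11-byte payload
    print(src.reads)     # 2 raw reads for this message at chunk_size=4
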
@@ -226,47 +267,7 @@ class Stream(object):
             self.running = False
 
     def _read_loop(self, resp):
-        class ReadBuffer(object):
-            """Buffer data from the response in a smarter way than httplib/requests can.
-
-            Tweets are roughly in the 2-12kb range, averaging around 3kb.
-            Requests/urllib3/httplib/socket all use socket.read, which blocks
-            until enough data is returned. On some systems (eg google appengine), socket
-            reads are quite slow. To combat this latency we can read big chunks,
-            but the blocking part means we won't get results until enough tweets
-            have arrived. That may not be a big deal for high throughput systems.
-            For low throughput systems we don't want to sacrafice latency, so we
-            use small chunks so it can read the length and the tweet in 2 read calls.
-            """
-
-            def __init__(self, resp, chunk_size):
-                self._resp = resp
-                self._buffer = ""
-                self._chunk_size = chunk_size
-
-            def read_len(self, length):
-                while True:
-                    if len(self._buffer) >= length:
-                        return self._pop(length)
-                    read_len = max(self._chunk_size, length - len(self._buffer))
-                    self._buffer += self._resp.raw.read(read_len)
-
-            def read_line(self, sep='\n'):
-                start = 0
-                while True:
-                    loc = self._buffer.find(sep, start)
-                    if loc >= 0:
-                        return self._pop(loc + len(sep))
-                    else:
-                        start = len(self._buffer)
-                    self._buffer += self._resp.raw.read(self._chunk_size)
-
-            def _pop(self, length):
-                r = self._buffer[:length]
-                self._buffer = self._buffer[length:]
-                return r
-
-        buf = ReadBuffer(resp, self.chunk_size)
+        buf = ReadBuffer(resp.raw, self.chunk_size)
 
         while self.running:
             length = 0
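
Note: with ReadBuffer hoisted to module level and constructed from resp.raw, the same parsing code can be driven from a StringIO in tests, as above. The remainder of _read_loop is truncated in this view; purely as an illustration of how a consumer drives the buffer, here is a hypothetical helper (not the patch's code, and the listener dispatch is elided):

    from StringIO import StringIO
    from tweepy.streaming import ReadBuffer

    def read_messages(buf, handler, count):
        # Hypothetical helper: read `count` length-delimited messages from a
        # ReadBuffer, skipping keep-alive newlines, and hand each payload to
        # `handler`.
        while count:
            line = buf.read_line().strip()
            if not line.isdigit():   # keep-alive newline between messages
                continue
            handler(buf.read_len(int(line)))
            count -= 1

    messages = []
    buf = ReadBuffer(StringIO('11\n{id:12345}\n\n24\n{id:23456, test:"blah"}\n'), 16)
    read_messages(buf, messages.append, 2)
    # messages == ['{id:12345}\n', '{id:23456, test:"blah"}\n']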