Add a read buffer so that tweepy does fewer socket.read calls, which are expensive...
author Timo Ewalds <tewalds@google.com>
Mon, 20 Oct 2014 22:33:43 +0000 (18:33 -0400)
committer Timo Ewalds <tewalds@google.com>
Mon, 20 Oct 2014 22:33:43 +0000 (18:33 -0400)
tweepy/streaming.py

index c29222002b5f5b5aa3d459c3af7b56f755f5bb6a..2dd3770099e4e31488a69872a35ed4cf0c4b61ca 100644 (file)
@@ -2,6 +2,8 @@
 # Copyright 2009-2010 Joshua Roesslein
 # See LICENSE for details.
 
+# Appengine users: https://developers.google.com/appengine/docs/python/sockets/#making_httplib_use_sockets
+
 import logging
 import requests
 from requests.exceptions import Timeout
@@ -135,7 +137,13 @@ class Stream(object):
         self.retry_time_cap = options.get("retry_time_cap", 320.0)
         self.snooze_time_step = options.get("snooze_time", 0.25)
         self.snooze_time_cap = options.get("snooze_time_cap", 16)
-        self.buffer_size = options.get("buffer_size",  1500)
+
+        # The default socket.read size. Default to less than half the size of
+        # a tweet so that it reads tweets with the minimal latency of 2 reads
+        # per tweet. Values higher than ~1kb will increase latency by waiting
+        # for more data to arrive but may also increase throughput by doing
+        # fewer socket read calls.
+        self.chunk_size = options.get("chunk_size",  512)
 
         self.api = API()
         self.session = requests.Session()
@@ -218,33 +226,63 @@ class Stream(object):
             self.running = False
 
     def _read_loop(self, resp):
+        class ReadBuffer(object):
+            """Buffer data from the response in a smarter way than httplib/requests can.
+
+            Tweets are roughly in the 2-12kb range, averaging around 3kb.
+            Requests/urllib3/httplib/socket all use socket.read, which blocks
+            until enough data is returned. On some systems (e.g. Google App Engine), socket
+            reads are quite slow. To combat this latency we can read big chunks,
+            but the blocking part means we won't get results until enough tweets
+            have arrived. That may not be a big deal for high throughput systems.
+            For low throughput systems we don't want to sacrifice latency, so we
+            use small chunks so it can read the length and the tweet in 2 read calls.
+            """
+
+            def __init__(self, resp, chunk_size):
+                self._resp = resp
+                self._buffer = ""
+                self._chunk_size = chunk_size
+
+            def read_len(self, length):
+                while True:
+                    if len(self._buffer) >= length:
+                        return self._pop(length)
+                    read_len = max(self._chunk_size, length - len(self._buffer))
+                    self._buffer += self._resp.raw.read(read_len)
+
+            def read_line(self, sep='\n'):
+                start = 0
+                while True:
+                    loc = self._buffer.find(sep, start)
+                    if loc >= 0:
+                        return self._pop(loc + len(sep))
+                    else:
+                        start = len(self._buffer)
+                    self._buffer += self._resp.raw.read(self._chunk_size)
+
+            def _pop(self, length):
+                r = self._buffer[:length]
+                self._buffer = self._buffer[length:]
+                return r
+
+        buf = ReadBuffer(resp, self.chunk_size)
 
         while self.running:
+            length = 0
+            while True:
+                line = buf.read_line().strip()
+                if not line:
+                    pass  # keep-alive new lines are expected
+                elif line.isdigit():
+                    length = int(line)
+                    break
+                else:
+                    raise TweepError('Expecting length, unexpected value found')
 
-            # Note: keep-alive newlines might be inserted
-            # before each length value.
-            # read until we get a digit...
-            c = '\n'
-            for c in resp.iter_content():
-                if c == '\n':
-                    continue
-                break
-
-            delimited_string = c
-
-            # read rest of delimiter length..
-            d = ''
-            for d in resp.iter_content():
-                if d != '\n':
-                    delimited_string += d
-                    continue
-                break
-
-            # read the next twitter status object
-            if delimited_string.strip().isdigit():
-                next_status_obj = resp.raw.read(int(delimited_string))
-                if self.running:
-                    self._data(next_status_obj)
+            next_status_obj = buf.read_len(length)
+            if self.running:
+                self._data(next_status_obj)
 
         if resp.raw._fp.isclosed():
             self.on_closed(resp)