# Copyright 2009-2010 Joshua Roesslein
# See LICENSE for details.
+# App Engine users: https://developers.google.com/appengine/docs/python/sockets/#making_httplib_use_sockets
+
import logging
import requests
from requests.exceptions import Timeout
self.retry_time_cap = options.get("retry_time_cap", 320.0)
self.snooze_time_step = options.get("snooze_time", 0.25)
self.snooze_time_cap = options.get("snooze_time_cap", 16)
- self.buffer_size = options.get("buffer_size", 1500)
+
+        # The default socket.read size. It is set below half the size of a
+        # typical tweet so that a tweet is read with minimal latency: two
+        # reads per tweet, one for the length prefix and one for the payload.
+        # Values above ~1 KB increase latency by waiting for more data to
+        # arrive, but may also increase throughput by making fewer socket
+        # read calls.
+ self.chunk_size = options.get("chunk_size", 512)
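+        # Usage is illustrative only (assuming the Stream(auth, listener,
+        # **options) constructor): a throughput-oriented consumer might pass
+        # Stream(auth, listener, chunk_size=4096), while a latency-sensitive
+        # one keeps the 512-byte default.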
self.api = API()
self.session = requests.Session()
self.running = False
def _read_loop(self, resp):
+ class ReadBuffer(object):
+            """Buffer data from the response in a smarter way than httplib/requests can.
+
+            Tweets are roughly in the 2-12 KB range, averaging around 3 KB.
+            Requests/urllib3/httplib/socket all use socket.read, which blocks
+            until enough data has arrived. On some systems (e.g. Google App
+            Engine) socket reads are quite slow. To combat that latency we
+            could read big chunks, but the blocking read means we won't get
+            results until enough tweets have arrived. That may not be a big
+            deal for high-throughput systems, but for low-throughput systems
+            we don't want to sacrifice latency, so we use small chunks and
+            let the length and the tweet be read in just two read calls.
+            """
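+            # Illustrative sketch of the length-delimited stream this buffer
+            # parses (the byte counts are made up); blank keep-alive lines
+            # may appear between messages:
+            #
+            #     1953
+            #     {"created_at": "...", "text": "...", ...}
+            #
+            #     4028
+            #     {"created_at": "...", "text": "...", ...}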
+
+ def __init__(self, resp, chunk_size):
+ self._resp = resp
+ self._buffer = ""
+ self._chunk_size = chunk_size
+
+ def read_len(self, length):
+ while True:
+ if len(self._buffer) >= length:
+ return self._pop(length)
+                    # Read at least one full chunk; read more if more than a
+                    # chunk is still needed to satisfy the request.
+                    to_read = max(self._chunk_size, length - len(self._buffer))
+                    self._buffer += self._resp.raw.read(to_read)
+
+ def read_line(self, sep='\n'):
+ start = 0
+ while True:
+ loc = self._buffer.find(sep, start)
+ if loc >= 0:
+ return self._pop(loc + len(sep))
+ else:
+ start = len(self._buffer)
+ self._buffer += self._resp.raw.read(self._chunk_size)
+
+ def _pop(self, length):
+ r = self._buffer[:length]
+ self._buffer = self._buffer[length:]
+ return r
+
+ buf = ReadBuffer(resp, self.chunk_size)
while self.running:
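+            # Parse one length-delimited message: skip keep-alive newlines,
+            # read the decimal length prefix, then read that many bytes of
+            # status payload.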
+ length = 0
+ while True:
+ line = buf.read_line().strip()
+ if not line:
+                    pass  # keep-alive newlines are expected between messages
+ elif line.isdigit():
+ length = int(line)
+ break
+ else:
+ raise TweepError('Expecting length, unexpected value found')
- # Note: keep-alive newlines might be inserted
- # before each length value.
- # read until we get a digit...
- c = '\n'
- for c in resp.iter_content():
- if c == '\n':
- continue
- break
-
- delimited_string = c
-
- # read rest of delimiter length..
- d = ''
- for d in resp.iter_content():
- if d != '\n':
- delimited_string += d
- continue
- break
-
- # read the next twitter status object
- if delimited_string.strip().isdigit():
- next_status_obj = resp.raw.read(int(delimited_string))
- if self.running:
- self._data(next_status_obj)
+ next_status_obj = buf.read_len(length)
+ if self.running:
+ self._data(next_status_obj)
if resp.raw._fp.isclosed():
self.on_closed(resp)