From c2a04373a4152e3f2eb93101c21a704ed44c509f Mon Sep 17 00:00:00 2001 From: Timo Ewalds Date: Mon, 20 Oct 2014 18:33:43 -0400 Subject: [PATCH] Add a read buffer so that tweepy does fewer socket.read calls, which are expensive on GAE. --- tweepy/streaming.py | 88 ++++++++++++++++++++++++++++++++------------- 1 file changed, 63 insertions(+), 25 deletions(-) diff --git a/tweepy/streaming.py b/tweepy/streaming.py index c292220..2dd3770 100644 --- a/tweepy/streaming.py +++ b/tweepy/streaming.py @@ -2,6 +2,8 @@ # Copyright 2009-2010 Joshua Roesslein # See LICENSE for details. +# Appengine users: https://developers.google.com/appengine/docs/python/sockets/#making_httplib_use_sockets + import logging import requests from requests.exceptions import Timeout @@ -135,7 +137,13 @@ class Stream(object): self.retry_time_cap = options.get("retry_time_cap", 320.0) self.snooze_time_step = options.get("snooze_time", 0.25) self.snooze_time_cap = options.get("snooze_time_cap", 16) - self.buffer_size = options.get("buffer_size", 1500) + + # The default socket.read size. Default to less than half the size of + # a tweet so that it reads tweets with the minimal latency of 2 reads + # per tweet. Values higher than ~1kb will increase latency by waiting + # for more data to arrive but may also increase throughput by doing + # fewer socket read calls. + self.chunk_size = options.get("chunk_size", 512) self.api = API() self.session = requests.Session() @@ -218,33 +226,63 @@ class Stream(object): self.running = False def _read_loop(self, resp): + class ReadBuffer(object): + """Buffer data from the response in a smarter way than httplib/requests can. + + Tweets are roughly in the 2-12kb range, averaging around 3kb. + Requests/urllib3/httplib/socket all use socket.read, which blocks + until enough data is returned. On some systems (eg google appengine), socket + reads are quite slow. To combat this latency we can read big chunks, + but the blocking part means we won't get results until enough tweets + have arrived. That may not be a big deal for high throughput systems. + For low throughput systems we don't want to sacrafice latency, so we + use small chunks so it can read the length and the tweet in 2 read calls. + """ + + def __init__(self, resp, chunk_size): + self._resp = resp + self._buffer = "" + self._chunk_size = chunk_size + + def read_len(self, length): + while True: + if len(self._buffer) >= length: + return self._pop(length) + read_len = max(self._chunk_size, length - len(self._buffer)) + self._buffer += self._resp.raw.read(read_len) + + def read_line(self, sep='\n'): + start = 0 + while True: + loc = self._buffer.find(sep, start) + if loc >= 0: + return self._pop(loc + len(sep)) + else: + start = len(self._buffer) + self._buffer += self._resp.raw.read(self._chunk_size) + + def _pop(self, length): + r = self._buffer[:length] + self._buffer = self._buffer[length:] + return r + + buf = ReadBuffer(resp, self.chunk_size) while self.running: + length = 0 + while True: + line = buf.read_line().strip() + if not line: + pass # keep-alive new lines are expected + elif line.isdigit(): + length = int(line) + break + else: + raise TweepError('Expecting length, unexpected value found') - # Note: keep-alive newlines might be inserted - # before each length value. - # read until we get a digit... - c = '\n' - for c in resp.iter_content(): - if c == '\n': - continue - break - - delimited_string = c - - # read rest of delimiter length.. - d = '' - for d in resp.iter_content(): - if d != '\n': - delimited_string += d - continue - break - - # read the next twitter status object - if delimited_string.strip().isdigit(): - next_status_obj = resp.raw.read(int(delimited_string)) - if self.running: - self._data(next_status_obj) + next_status_obj = buf.read_len(length) + if self.running: + self._data(next_status_obj) if resp.raw._fp.isclosed(): self.on_closed(resp) -- 2.25.1