+from StringIO import StringIO
from time import sleep
import unittest2 as unittest
from tweepy.api import API
from tweepy.auth import OAuthHandler
from tweepy.models import Status
-from tweepy.streaming import Stream, StreamListener
+from tweepy.streaming import Stream, StreamListener, ReadBuffer
from config import create_auth
from test_utils import mock_tweet
# Should be UTF-8 encoded
self.assertEqual(u'Caf\xe9'.encode('utf8'), s.session.params['follow'])
+
class TweepyStreamReadBuffer(unittest.TestCase):
    """Exercise ReadBuffer against a fixed two-tweet length-delimited payload."""

    # Two length-prefixed pseudo-tweets separated by a keep-alive newline.
    stream = """11\n{id:12345}\n\n24\n{id:23456, test:"blah"}\n"""

    def test_read_tweet(self):
        # A correct buffer must parse identically at every chunk size.
        for chunk_size in (1, 2, 5, 10, 20, 50):
            buf = ReadBuffer(StringIO(self.stream), chunk_size)
            self.assertEqual(buf.read_line(), '11\n')
            self.assertEqual(buf.read_len(11), '{id:12345}\n')
            self.assertEqual(buf.read_line(), '\n')
            self.assertEqual(buf.read_line(), '24\n')
            self.assertEqual(buf.read_len(24), '{id:23456, test:"blah"}\n')
+
class TweepyStreamBackoffTests(unittest.TestCase):
def setUp(self):
#bad auth causes twitter to return 401 errors
return
class ReadBuffer(object):
    """Buffer data from the response in a smarter way than httplib/requests can.

    Tweets are roughly in the 2-12kb range, averaging around 3kb.
    Requests/urllib3/httplib/socket all use socket.read, which blocks
    until enough data is returned. On some systems (eg google appengine), socket
    reads are quite slow. To combat this latency we can read big chunks,
    but the blocking part means we won't get results until enough tweets
    have arrived. That may not be a big deal for high throughput systems.
    For low throughput systems we don't want to sacrifice latency, so we
    use small chunks so it can read the length and the tweet in 2 read calls.
    """

    def __init__(self, stream, chunk_size):
        """stream: any object with a read(size) method (e.g. response.raw)."""
        self._stream = stream
        self._buffer = ""
        self._chunk_size = chunk_size

    def read_len(self, length):
        """Return exactly `length` characters, or whatever remains at EOF.

        Previously this looped forever if the stream was exhausted before
        `length` characters arrived; now an empty read ends the loop and the
        partial buffer is returned.
        """
        while len(self._buffer) < length:
            # Ask for at least chunk_size so we stay latency-friendly, but
            # never less than what is still missing from the buffer.
            read_len = max(self._chunk_size, length - len(self._buffer))
            chunk = self._stream.read(read_len)
            if not chunk:
                # EOF: return what we have rather than spinning forever.
                break
            self._buffer += chunk
        return self._pop(length)

    def read_line(self, sep='\n'):
        """Return characters up to and including `sep`; at EOF return the rest.

        An empty read (stream exhausted) terminates the scan instead of
        looping forever waiting for a separator that will never arrive.
        """
        start = 0
        while True:
            loc = self._buffer.find(sep, start)
            if loc >= 0:
                return self._pop(loc + len(sep))
            # Resume the search where the last scan stopped -- no rescans.
            start = len(self._buffer)
            chunk = self._stream.read(self._chunk_size)
            if not chunk:
                return self._pop(len(self._buffer))
            self._buffer += chunk

    def _pop(self, length):
        """Split the buffer at `length` and return the head portion."""
        r = self._buffer[:length]
        self._buffer = self._buffer[length:]
        return r
+
+
class Stream(object):
host = 'stream.twitter.com'
self.running = False
def _read_loop(self, resp):
- class ReadBuffer(object):
- """Buffer data from the response in a smarter way than httplib/requests can.
-
- Tweets are roughly in the 2-12kb range, averaging around 3kb.
- Requests/urllib3/httplib/socket all use socket.read, which blocks
- until enough data is returned. On some systems (eg google appengine), socket
- reads are quite slow. To combat this latency we can read big chunks,
- but the blocking part means we won't get results until enough tweets
- have arrived. That may not be a big deal for high throughput systems.
- For low throughput systems we don't want to sacrafice latency, so we
- use small chunks so it can read the length and the tweet in 2 read calls.
- """
-
- def __init__(self, resp, chunk_size):
- self._resp = resp
- self._buffer = ""
- self._chunk_size = chunk_size
-
- def read_len(self, length):
- while True:
- if len(self._buffer) >= length:
- return self._pop(length)
- read_len = max(self._chunk_size, length - len(self._buffer))
- self._buffer += self._resp.raw.read(read_len)
-
- def read_line(self, sep='\n'):
- start = 0
- while True:
- loc = self._buffer.find(sep, start)
- if loc >= 0:
- return self._pop(loc + len(sep))
- else:
- start = len(self._buffer)
- self._buffer += self._resp.raw.read(self._chunk_size)
-
- def _pop(self, length):
- r = self._buffer[:length]
- self._buffer = self._buffer[length:]
- return r
-
- buf = ReadBuffer(resp, self.chunk_size)
+ buf = ReadBuffer(resp.raw, self.chunk_size)
while self.running:
length = 0