-import io
import unittest
from unittest.case import skip
from tweepy.api import API
from tweepy.auth import OAuthHandler
from tweepy.models import Status
-from tweepy.streaming import ReadBuffer, Stream, StreamListener
+from tweepy.streaming import Stream, StreamListener
class MockStreamListener(StreamListener):
self.assertEqual('Caf\xe9'.encode('utf8'), s.body['follow'])
-class TweepyStreamReadBufferTests(unittest.TestCase):
-
- stream = b"""11\n{id:12345}\n\n24\n{id:23456, test:"blah"}\n"""
-
- def test_read_tweet(self):
- for length in [1, 2, 5, 10, 20, 50]:
- buf = ReadBuffer(io.BytesIO(self.stream), length)
- self.assertEqual('11\n', buf.read_line())
- self.assertEqual('{id:12345}\n', buf.read_len(11))
- self.assertEqual('\n', buf.read_line())
- self.assertEqual('24\n', buf.read_line())
- self.assertEqual('{id:23456, test:"blah"}\n', buf.read_len(24))
-
- def test_read_empty_buffer(self):
- """
- Requests can be closed by twitter.
- The ReadBuffer should not loop infinitely when this happens.
- Instead it should return and let the outer _read_loop handle it.
- """
-
- # If the test fails, we are in danger of an infinite loop
- # so we need to do some work to block that from happening
- class InfiniteLoopException(Exception):
- pass
-
- self.called_count = 0
- call_limit = 5
- def on_read(chunk_size):
- self.called_count += 1
-
- if self.called_count > call_limit:
- # we have failed
- raise InfiniteLoopException("Oops, read() was called a bunch of times")
-
- return ""
-
- # Create a fake stream
- stream = io.BytesIO(b'')
-
- # Mock it's read function so it can't be called too many times
- mock_read = MagicMock(side_effect=on_read)
-
- try:
- stream.close()
- with patch.multiple(stream, create=True, read=mock_read):
- # Now the stream can't call 'read' more than call_limit times
- # and it looks like a requests stream that is closed
- buf = ReadBuffer(stream, 50)
- buf.read_line("\n")
- except InfiniteLoopException:
- self.fail("ReadBuffer.read_line tried to loop infinitely.")
-
- # The mocked function not have been called at all since the stream looks closed
- self.assertEqual(mock_read.call_count, 0)
-
- def test_read_unicode_tweet(self):
- stream = b'11\n{id:12345}\n\n23\n{id:23456, test:"\xe3\x81\x93"}\n\n'
- for length in [1, 2, 5, 10, 20, 50]:
- buf = ReadBuffer(io.BytesIO(stream), length)
- self.assertEqual('11\n', buf.read_line())
- self.assertEqual('{id:12345}\n', buf.read_len(11))
- self.assertEqual('\n', buf.read_line())
- self.assertEqual('23\n', buf.read_line())
- self.assertEqual('{id:23456, test:"\u3053"}\n', buf.read_len(23))
-
-
class TweepyStreamBackoffTests(unittest.TestCase):
def setUp(self):
#bad auth causes twitter to return 401 errors
import json
import logging
-import re
import ssl
from threading import Thread
from time import sleep
return
-class ReadBuffer:
- """Buffer data from the response in a smarter way than httplib/requests can.
-
- Tweets are roughly in the 2-12kb range, averaging around 3kb.
- Requests/urllib3/httplib/socket all use socket.read, which blocks
- until enough data is returned. On some systems (eg google appengine), socket
- reads are quite slow. To combat this latency we can read big chunks,
- but the blocking part means we won't get results until enough tweets
- have arrived. That may not be a big deal for high throughput systems.
- For low throughput systems we don't want to sacrifice latency, so we
- use small chunks so it can read the length and the tweet in 2 read calls.
- """
-
- def __init__(self, stream, chunk_size, encoding='utf-8'):
- self._stream = stream
- self._buffer = b''
- self._chunk_size = chunk_size
- self._encoding = encoding
-
- def read_len(self, length):
- while not self._stream.closed:
- if len(self._buffer) >= length:
- return self._pop(length)
- read_len = max(self._chunk_size, length - len(self._buffer))
- self._buffer += self._stream.read(read_len)
- return b''
-
- def read_line(self, sep=b'\n'):
- """Read the data stream until a given separator is found (default \n)
-
- :param sep: Separator to read until. Must by of the bytes type
- :return: The str of the data read until sep
- """
- start = 0
- while not self._stream.closed:
- loc = self._buffer.find(sep, start)
- if loc >= 0:
- return self._pop(loc + len(sep))
- else:
- start = len(self._buffer)
- self._buffer += self._stream.read(self._chunk_size)
- return b''
-
- def _pop(self, length):
- r = self._buffer[:length]
- self._buffer = self._buffer[length:]
- return r.decode(self._encoding)
-
-
class Stream:
def __init__(self, auth, listener, **options):
self.new_session()
def _read_loop(self, resp):
- charset = resp.headers.get('content-type', default='')
- enc_search = re.search(r'charset=(?P<enc>\S*)', charset)
- if enc_search is not None:
- encoding = enc_search.group('enc')
- else:
- encoding = 'utf-8'
-
- buf = ReadBuffer(resp.raw, self.chunk_size, encoding=encoding)
-
- while self.running and not resp.raw.closed:
- line = buf.read_line()
- stripped_line = line.strip() if line else line # line is sometimes None so we need to check here
- if not stripped_line:
- self.listener.on_keep_alive() # keep-alive new lines are expected
- elif self.running:
- if self.listener.on_data(stripped_line) is False:
- self.running = False
-
- # # Note: keep-alive newlines might be inserted before each length value.
- # # read until we get a digit...
- # c = b'\n'
- # for c in resp.iter_content(decode_unicode=True):
- # if c == b'\n':
- # continue
- # break
- #
- # delimited_string = c
- #
- # # read rest of delimiter length..
- # d = b''
- # for d in resp.iter_content(decode_unicode=True):
- # if d != b'\n':
- # delimited_string += d
- # continue
- # break
- #
- # # read the next twitter status object
- # if delimited_string.decode('utf-8').strip().isdigit():
- # status_id = int(delimited_string)
- # data = resp.raw.read(status_id)
- # if self.running:
- # if self.listener.on_data(data.decode('utf-8')) is False:
- # self.running = False
+ for line in resp.iter_lines(chunk_size=self.chunk_size):
+ if not self.running:
+ break
+ if not line:
+ self.listener.on_keep_alive()
+ elif self.listener.on_data(line) is False:
+ self.running = False
+ break
if resp.raw.closed:
self.on_closed(resp)