From: Harmon Date: Fri, 22 Jan 2021 16:52:57 +0000 (-0600) Subject: Use requests Response.iter_lines in Stream X-Git-Url: https://vcs.fsf.org/?a=commitdiff_plain;h=37e701e2193dcf617e7235d3b004f790c5ff784e;p=tweepy.git Use requests Response.iter_lines in Stream Remove and replace ReadBuffer The on_data method of the stream listener is now passed the raw data from Stream in bytes form. The previous regex check for the charset directive/parameter in the Content-Type HTTP header was unnecessary anyway, as it's not included in the response for streams (anymore?). This reduces the overhead for each line of data if the raw data doesn't need to be decoded to be processed, but passes on the burden of decoding the raw data if the default StreamListener.on_data is not being used. The default StreamListener.on_data implementation should still be fine, as json.loads can process bytes since Python 3.6 and accepts UTF-8 encoding. It should still be safe to assume the raw data is UTF-8-encoded, just as it was assumed before, whenever the unnecessary regex check failed to find a charset. This Stream._read_loop implementation also allows the stream to disconnect when any line of data is received, including keep-alive signals, which resolves #773 and resolves #897. 
--- diff --git a/tests/test_streaming.py b/tests/test_streaming.py index 2571157..8142450 100644 --- a/tests/test_streaming.py +++ b/tests/test_streaming.py @@ -1,4 +1,3 @@ -import io import unittest from unittest.case import skip @@ -9,7 +8,7 @@ from .test_utils import mock_tweet from tweepy.api import API from tweepy.auth import OAuthHandler from tweepy.models import Status -from tweepy.streaming import ReadBuffer, Stream, StreamListener +from tweepy.streaming import Stream, StreamListener class MockStreamListener(StreamListener): @@ -81,72 +80,6 @@ class TweepyStreamTests(unittest.TestCase): self.assertEqual('Caf\xe9'.encode('utf8'), s.body['follow']) -class TweepyStreamReadBufferTests(unittest.TestCase): - - stream = b"""11\n{id:12345}\n\n24\n{id:23456, test:"blah"}\n""" - - def test_read_tweet(self): - for length in [1, 2, 5, 10, 20, 50]: - buf = ReadBuffer(io.BytesIO(self.stream), length) - self.assertEqual('11\n', buf.read_line()) - self.assertEqual('{id:12345}\n', buf.read_len(11)) - self.assertEqual('\n', buf.read_line()) - self.assertEqual('24\n', buf.read_line()) - self.assertEqual('{id:23456, test:"blah"}\n', buf.read_len(24)) - - def test_read_empty_buffer(self): - """ - Requests can be closed by twitter. - The ReadBuffer should not loop infinitely when this happens. - Instead it should return and let the outer _read_loop handle it. 
- """ - - # If the test fails, we are in danger of an infinite loop - # so we need to do some work to block that from happening - class InfiniteLoopException(Exception): - pass - - self.called_count = 0 - call_limit = 5 - def on_read(chunk_size): - self.called_count += 1 - - if self.called_count > call_limit: - # we have failed - raise InfiniteLoopException("Oops, read() was called a bunch of times") - - return "" - - # Create a fake stream - stream = io.BytesIO(b'') - - # Mock it's read function so it can't be called too many times - mock_read = MagicMock(side_effect=on_read) - - try: - stream.close() - with patch.multiple(stream, create=True, read=mock_read): - # Now the stream can't call 'read' more than call_limit times - # and it looks like a requests stream that is closed - buf = ReadBuffer(stream, 50) - buf.read_line("\n") - except InfiniteLoopException: - self.fail("ReadBuffer.read_line tried to loop infinitely.") - - # The mocked function not have been called at all since the stream looks closed - self.assertEqual(mock_read.call_count, 0) - - def test_read_unicode_tweet(self): - stream = b'11\n{id:12345}\n\n23\n{id:23456, test:"\xe3\x81\x93"}\n\n' - for length in [1, 2, 5, 10, 20, 50]: - buf = ReadBuffer(io.BytesIO(stream), length) - self.assertEqual('11\n', buf.read_line()) - self.assertEqual('{id:12345}\n', buf.read_len(11)) - self.assertEqual('\n', buf.read_line()) - self.assertEqual('23\n', buf.read_line()) - self.assertEqual('{id:23456, test:"\u3053"}\n', buf.read_len(23)) - - class TweepyStreamBackoffTests(unittest.TestCase): def setUp(self): #bad auth causes twitter to return 401 errors diff --git a/tweepy/streaming.py b/tweepy/streaming.py index 23b8715..b8e9e75 100644 --- a/tweepy/streaming.py +++ b/tweepy/streaming.py @@ -6,7 +6,6 @@ import json import logging -import re import ssl from threading import Thread from time import sleep @@ -119,55 +118,6 @@ class StreamListener: return -class ReadBuffer: - """Buffer data from the response in a 
smarter way than httplib/requests can. - - Tweets are roughly in the 2-12kb range, averaging around 3kb. - Requests/urllib3/httplib/socket all use socket.read, which blocks - until enough data is returned. On some systems (eg google appengine), socket - reads are quite slow. To combat this latency we can read big chunks, - but the blocking part means we won't get results until enough tweets - have arrived. That may not be a big deal for high throughput systems. - For low throughput systems we don't want to sacrifice latency, so we - use small chunks so it can read the length and the tweet in 2 read calls. - """ - - def __init__(self, stream, chunk_size, encoding='utf-8'): - self._stream = stream - self._buffer = b'' - self._chunk_size = chunk_size - self._encoding = encoding - - def read_len(self, length): - while not self._stream.closed: - if len(self._buffer) >= length: - return self._pop(length) - read_len = max(self._chunk_size, length - len(self._buffer)) - self._buffer += self._stream.read(read_len) - return b'' - - def read_line(self, sep=b'\n'): - """Read the data stream until a given separator is found (default \n) - - :param sep: Separator to read until. 
Must by of the bytes type - :return: The str of the data read until sep - """ - start = 0 - while not self._stream.closed: - loc = self._buffer.find(sep, start) - if loc >= 0: - return self._pop(loc + len(sep)) - else: - start = len(self._buffer) - self._buffer += self._stream.read(self._chunk_size) - return b'' - - def _pop(self, length): - r = self._buffer[:length] - self._buffer = self._buffer[length:] - return r.decode(self._encoding) - - class Stream: def __init__(self, auth, listener, **options): @@ -276,49 +226,14 @@ class Stream: self.new_session() def _read_loop(self, resp): - charset = resp.headers.get('content-type', default='') - enc_search = re.search(r'charset=(?P\S*)', charset) - if enc_search is not None: - encoding = enc_search.group('enc') - else: - encoding = 'utf-8' - - buf = ReadBuffer(resp.raw, self.chunk_size, encoding=encoding) - - while self.running and not resp.raw.closed: - line = buf.read_line() - stripped_line = line.strip() if line else line # line is sometimes None so we need to check here - if not stripped_line: - self.listener.on_keep_alive() # keep-alive new lines are expected - elif self.running: - if self.listener.on_data(stripped_line) is False: - self.running = False - - # # Note: keep-alive newlines might be inserted before each length value. - # # read until we get a digit... - # c = b'\n' - # for c in resp.iter_content(decode_unicode=True): - # if c == b'\n': - # continue - # break - # - # delimited_string = c - # - # # read rest of delimiter length.. 
- # d = b'' - # for d in resp.iter_content(decode_unicode=True): - # if d != b'\n': - # delimited_string += d - # continue - # break - # - # # read the next twitter status object - # if delimited_string.decode('utf-8').strip().isdigit(): - # status_id = int(delimited_string) - # data = resp.raw.read(status_id) - # if self.running: - # if self.listener.on_data(data.decode('utf-8')) is False: - # self.running = False + for line in resp.iter_lines(chunk_size=self.chunk_size): + if not self.running: + break + if not line: + self.listener.on_keep_alive() + elif self.listener.on_data(line) is False: + self.running = False + break if resp.raw.closed: self.on_closed(resp)