Use requests Response.iter_lines in Stream
authorHarmon <Harmon758@gmail.com>
Fri, 22 Jan 2021 16:52:57 +0000 (10:52 -0600)
committerHarmon <Harmon758@gmail.com>
Fri, 22 Jan 2021 16:52:57 +0000 (10:52 -0600)
Remove and replace ReadBuffer

The on_data method of the stream listener is now passed the raw data from Stream in bytes form.

The previous regex check for the charset directive/parameter in the Content-Type HTTP header was unnecessary anyway, as it's not included in the response for streams (anymore?).

This reduces the overhead for each line of data if the raw data doesn't need to be decoded to be processed, but passes on the burden of decoding the raw data if the default StreamListener.on_data is not being used.

The default StreamListener.on_data implementation should still be fine, as json.loads can process bytes since Python 3.6 and accepts UTF-8 encoding, which it should still be safe to assume is used as the encoding for the raw data, as before, when the unnecessary regex check would fail.

This Stream._read_loop implementation also allows the stream to disconnect when any line of data is received, including keep-alive signals, which resolves #773 and resolves #897.

tests/test_streaming.py
tweepy/streaming.py

index 257115732ff87462256a5a724839c242c8e941a5..81424507b8f68d463f4265feabb36eae3372af4a 100644 (file)
@@ -1,4 +1,3 @@
-import io
 import unittest
 from unittest.case import skip
 
@@ -9,7 +8,7 @@ from .test_utils import mock_tweet
 from tweepy.api import API
 from tweepy.auth import OAuthHandler
 from tweepy.models import Status
-from tweepy.streaming import ReadBuffer, Stream, StreamListener
+from tweepy.streaming import Stream, StreamListener
 
 
 class MockStreamListener(StreamListener):
@@ -81,72 +80,6 @@ class TweepyStreamTests(unittest.TestCase):
         self.assertEqual('Caf\xe9'.encode('utf8'), s.body['follow'])
 
 
-class TweepyStreamReadBufferTests(unittest.TestCase):
-
-    stream = b"""11\n{id:12345}\n\n24\n{id:23456, test:"blah"}\n"""
-
-    def test_read_tweet(self):
-        for length in [1, 2, 5, 10, 20, 50]:
-            buf = ReadBuffer(io.BytesIO(self.stream), length)
-            self.assertEqual('11\n', buf.read_line())
-            self.assertEqual('{id:12345}\n', buf.read_len(11))
-            self.assertEqual('\n', buf.read_line())
-            self.assertEqual('24\n', buf.read_line())
-            self.assertEqual('{id:23456, test:"blah"}\n', buf.read_len(24))
-
-    def test_read_empty_buffer(self):
-        """
-        Requests can be closed by twitter.
-        The ReadBuffer should not loop infinitely when this happens.
-        Instead it should return and let the outer _read_loop handle it.
-        """
-
-        # If the test fails, we are in danger of an infinite loop
-        # so we need to do some work to block that from happening
-        class InfiniteLoopException(Exception):
-            pass
-
-        self.called_count = 0
-        call_limit = 5
-        def on_read(chunk_size):
-            self.called_count += 1
-
-            if self.called_count > call_limit:
-                # we have failed
-                raise InfiniteLoopException("Oops, read() was called a bunch of times")
-
-            return ""
-
-        # Create a fake stream
-        stream = io.BytesIO(b'')
-
-        # Mock it's read function so it can't be called too many times
-        mock_read = MagicMock(side_effect=on_read)
-
-        try:
-            stream.close()
-            with patch.multiple(stream, create=True, read=mock_read):
-                # Now the stream can't call 'read' more than call_limit times
-                # and it looks like a requests stream that is closed
-                buf = ReadBuffer(stream, 50)
-                buf.read_line("\n")
-        except InfiniteLoopException:
-            self.fail("ReadBuffer.read_line tried to loop infinitely.")
-
-        # The mocked function not have been called at all since the stream looks closed
-        self.assertEqual(mock_read.call_count, 0)
-
-    def test_read_unicode_tweet(self):
-        stream = b'11\n{id:12345}\n\n23\n{id:23456, test:"\xe3\x81\x93"}\n\n'
-        for length in [1, 2, 5, 10, 20, 50]:
-            buf = ReadBuffer(io.BytesIO(stream), length)
-            self.assertEqual('11\n', buf.read_line())
-            self.assertEqual('{id:12345}\n', buf.read_len(11))
-            self.assertEqual('\n', buf.read_line())
-            self.assertEqual('23\n', buf.read_line())
-            self.assertEqual('{id:23456, test:"\u3053"}\n', buf.read_len(23))
-
-
 class TweepyStreamBackoffTests(unittest.TestCase):
     def setUp(self):
         #bad auth causes twitter to return 401 errors
index 23b8715cf4288060a40f30cc9084cedb18f8dcf8..b8e9e757976e37baba70647f7d6ac75ee0266e2f 100644 (file)
@@ -6,7 +6,6 @@
 
 import json
 import logging
-import re
 import ssl
 from threading import Thread
 from time import sleep
@@ -119,55 +118,6 @@ class StreamListener:
         return
 
 
-class ReadBuffer:
-    """Buffer data from the response in a smarter way than httplib/requests can.
-
-    Tweets are roughly in the 2-12kb range, averaging around 3kb.
-    Requests/urllib3/httplib/socket all use socket.read, which blocks
-    until enough data is returned. On some systems (eg google appengine), socket
-    reads are quite slow. To combat this latency we can read big chunks,
-    but the blocking part means we won't get results until enough tweets
-    have arrived. That may not be a big deal for high throughput systems.
-    For low throughput systems we don't want to sacrifice latency, so we
-    use small chunks so it can read the length and the tweet in 2 read calls.
-    """
-
-    def __init__(self, stream, chunk_size, encoding='utf-8'):
-        self._stream = stream
-        self._buffer = b''
-        self._chunk_size = chunk_size
-        self._encoding = encoding
-
-    def read_len(self, length):
-        while not self._stream.closed:
-            if len(self._buffer) >= length:
-                return self._pop(length)
-            read_len = max(self._chunk_size, length - len(self._buffer))
-            self._buffer += self._stream.read(read_len)
-        return b''
-
-    def read_line(self, sep=b'\n'):
-        """Read the data stream until a given separator is found (default \n)
-
-        :param sep: Separator to read until. Must by of the bytes type
-        :return: The str of the data read until sep
-        """
-        start = 0
-        while not self._stream.closed:
-            loc = self._buffer.find(sep, start)
-            if loc >= 0:
-                return self._pop(loc + len(sep))
-            else:
-                start = len(self._buffer)
-            self._buffer += self._stream.read(self._chunk_size)
-        return b''
-
-    def _pop(self, length):
-        r = self._buffer[:length]
-        self._buffer = self._buffer[length:]
-        return r.decode(self._encoding)
-
-
 class Stream:
 
     def __init__(self, auth, listener, **options):
@@ -276,49 +226,14 @@ class Stream:
             self.new_session()
 
     def _read_loop(self, resp):
-        charset = resp.headers.get('content-type', default='')
-        enc_search = re.search(r'charset=(?P<enc>\S*)', charset)
-        if enc_search is not None:
-            encoding = enc_search.group('enc')
-        else:
-            encoding = 'utf-8'
-
-        buf = ReadBuffer(resp.raw, self.chunk_size, encoding=encoding)
-
-        while self.running and not resp.raw.closed:
-            line = buf.read_line()
-            stripped_line = line.strip() if line else line  # line is sometimes None so we need to check here
-            if not stripped_line:
-                self.listener.on_keep_alive()  # keep-alive new lines are expected
-            elif self.running:
-                if self.listener.on_data(stripped_line) is False:
-                    self.running = False
-
-            # # Note: keep-alive newlines might be inserted before each length value.
-            # # read until we get a digit...
-            # c = b'\n'
-            # for c in resp.iter_content(decode_unicode=True):
-            #     if c == b'\n':
-            #         continue
-            #     break
-            #
-            # delimited_string = c
-            #
-            # # read rest of delimiter length..
-            # d = b''
-            # for d in resp.iter_content(decode_unicode=True):
-            #     if d != b'\n':
-            #         delimited_string += d
-            #         continue
-            #     break
-            #
-            # # read the next twitter status object
-            # if delimited_string.decode('utf-8').strip().isdigit():
-            #     status_id = int(delimited_string)
-            #     data = resp.raw.read(status_id)
-            #     if self.running:
-            #         if self.listener.on_data(data.decode('utf-8')) is False:
-            #             self.running = False
+        for line in resp.iter_lines(chunk_size=self.chunk_size):
+            if not self.running:
+                break
+            if not line:
+                self.listener.on_keep_alive()
+            elif self.listener.on_data(line) is False:
+                self.running = False
+                break
 
         if resp.raw.closed:
             self.on_closed(resp)