self.assertEqual('24\n', buf.read_line())
self.assertEqual('{id:23456, test:"blah"}\n', buf.read_len(24))
+ def test_read_empty_buffer(self):
+ """
+ Requests can be closed by twitter.
+ The ReadBuffer should not loop infinitely when this happens.
+ Instead it should return and let the outer _read_loop handle it.
+ """
+
+ # If the test fails, we are in danger of an infinite loop
+ # so we need to do some work to block that from happening
+ class InfiniteLoopException(Exception):
+ pass
+
+ self.called_count = 0
+ call_limit = 5
+ def on_read(chunk_size):
+ self.called_count += 1
+
+ if self.called_count > call_limit:
+ # we have failed
+ raise InfiniteLoopException("Oops, read() was called a bunch of times")
+
+ return ""
+
+ # Create a fake stream
+ stream = six.StringIO('')
+
+ # Mock it's read function so it can't be called too many times
+ mock_read = MagicMock(side_effect=on_read)
+
+ # Add an _fp member to it, like a real requests.raw stream
+ mock_fp = MagicMock()
+ mock_fp.isclosed.return_value = True
+
+ try:
+ with patch.multiple(stream, read=mock_read, _fp=mock_fp, create=True):
+ # Now the stream can't call 'read' more than call_limit times
+ # and it looks like a requests stream that is closed
+ buf = ReadBuffer(stream, 50)
+ buf.read_line("\n")
+ except InfiniteLoopException:
+ self.fail("ReadBuffer.read_line tried to loop infinitely.")
+
+ self.assertEqual(mock_fp.isclosed.call_count, 1)
+ # The mocked function not have been called at all since the stream looks closed
+ self.assertEqual(mock_read.call_count, 0)
+
class TweepyStreamBackoffTests(unittest.TestCase):
def setUp(self):
self._chunk_size = chunk_size
def read_len(self, length):
- while True:
+ while not self._stream._fp.isclosed():
if len(self._buffer) >= length:
return self._pop(length)
read_len = max(self._chunk_size, length - len(self._buffer))
def read_line(self, sep='\n'):
start = 0
- while True:
+ while not self._stream._fp.isclosed():
loc = self._buffer.find(sep, start)
if loc >= 0:
return self._pop(loc + len(sep))
def _read_loop(self, resp):
buf = ReadBuffer(resp.raw, self.chunk_size)
- while self.running:
+ while self.running and not resp.raw._fp.isclosed():
length = 0
- while True:
+ while not resp.raw._fp.isclosed():
line = buf.read_line().strip()
if not line:
self.listener.keep_alive() # keep-alive new lines are expected