From: Michael Brooks Date: Wed, 18 Feb 2015 21:49:23 +0000 (-0800) Subject: check for closed fp in stream reading X-Git-Url: https://vcs.fsf.org/?a=commitdiff_plain;h=875bcdd3b230dd990a3b4bdcb51abe92f3a3f6b9;p=tweepy.git check for closed fp in stream reading --- diff --git a/tests/test_streaming.py b/tests/test_streaming.py index a87c2fc..0d54b97 100644 --- a/tests/test_streaming.py +++ b/tests/test_streaming.py @@ -133,6 +133,52 @@ class TweepyStreamReadBuffer(unittest.TestCase): self.assertEqual('24\n', buf.read_line()) self.assertEqual('{id:23456, test:"blah"}\n', buf.read_len(24)) + def test_read_empty_buffer(self): + """ + Requests can be closed by twitter. + The ReadBuffer should not loop infinitely when this happens. + Instead it should return and let the outer _read_loop handle it. + """ + + # If the test fails, we are in danger of an infinite loop + # so we need to do some work to block that from happening + class InfiniteLoopException(Exception): + pass + + self.called_count = 0 + call_limit = 5 + def on_read(chunk_size): + self.called_count += 1 + + if self.called_count > call_limit: + # we have failed + raise InfiniteLoopException("Oops, read() was called a bunch of times") + + return "" + + # Create a fake stream + stream = six.StringIO('') + + # Mock it's read function so it can't be called too many times + mock_read = MagicMock(side_effect=on_read) + + # Add an _fp member to it, like a real requests.raw stream + mock_fp = MagicMock() + mock_fp.isclosed.return_value = True + + try: + with patch.multiple(stream, read=mock_read, _fp=mock_fp, create=True): + # Now the stream can't call 'read' more than call_limit times + # and it looks like a requests stream that is closed + buf = ReadBuffer(stream, 50) + buf.read_line("\n") + except InfiniteLoopException: + self.fail("ReadBuffer.read_line tried to loop infinitely.") + + self.assertEqual(mock_fp.isclosed.call_count, 1) + # The mocked function not have been called at all since the stream looks closed + self.assertEqual(mock_read.call_count, 0) + class TweepyStreamBackoffTests(unittest.TestCase): def setUp(self): diff --git a/tweepy/streaming.py b/tweepy/streaming.py index 23a02cd..a0f7394 100644 --- a/tweepy/streaming.py +++ b/tweepy/streaming.py @@ -154,7 +154,7 @@ class ReadBuffer(object): self._chunk_size = chunk_size def read_len(self, length): - while True: + while not self._stream._fp.isclosed(): if len(self._buffer) >= length: return self._pop(length) read_len = max(self._chunk_size, length - len(self._buffer)) @@ -162,7 +162,7 @@ class ReadBuffer(object): def read_line(self, sep='\n'): start = 0 - while True: + while not self._stream._fp.isclosed(): loc = self._buffer.find(sep, start) if loc >= 0: return self._pop(loc + len(sep)) @@ -292,9 +292,9 @@ class Stream(object): def _read_loop(self, resp): buf = ReadBuffer(resp.raw, self.chunk_size) - while self.running: + while self.running and not resp.raw._fp.isclosed(): length = 0 - while True: + while not resp.raw._fp.isclosed(): line = buf.read_line().strip() if not line: self.listener.keep_alive() # keep-alive new lines are expected