check for closed fp in stream reading
authorMichael Brooks <mjbrooks@uw.edu>
Wed, 18 Feb 2015 21:49:23 +0000 (13:49 -0800)
committerMichael Brooks <mjbrooks@uw.edu>
Wed, 18 Feb 2015 21:49:23 +0000 (13:49 -0800)
tests/test_streaming.py
tweepy/streaming.py

index a87c2fc0bb2e229899ef03ec589205641b551526..0d54b97b85a60b55ad34372cc1fddbaf212e4e10 100644 (file)
@@ -133,6 +133,52 @@ class TweepyStreamReadBuffer(unittest.TestCase):
             self.assertEqual('24\n', buf.read_line())
             self.assertEqual('{id:23456, test:"blah"}\n', buf.read_len(24))
 
+    def test_read_empty_buffer(self):
+        """
+        Requests can be closed by twitter.
+        The ReadBuffer should not loop infinitely when this happens.
+        Instead it should return and let the outer _read_loop handle it.
+        """
+
+        # If the test fails, we are in danger of an infinite loop
+        # so we need to do some work to block that from happening
+        class InfiniteLoopException(Exception):
+            pass
+
+        self.called_count = 0
+        call_limit = 5
+        def on_read(chunk_size):
+            self.called_count += 1
+
+            if self.called_count > call_limit:
+                # we have failed
+                raise InfiniteLoopException("Oops, read() was called a bunch of times")
+
+            return ""
+
+        # Create a fake stream
+        stream = six.StringIO('')
+
+        # Mock it's read function so it can't be called too many times
+        mock_read = MagicMock(side_effect=on_read)
+
+        # Add an _fp member to it, like a real requests.raw stream
+        mock_fp = MagicMock()
+        mock_fp.isclosed.return_value = True
+
+        try:
+            with patch.multiple(stream, read=mock_read, _fp=mock_fp, create=True):
+                # Now the stream can't call 'read' more than call_limit times
+                # and it looks like a requests stream that is closed
+                buf = ReadBuffer(stream, 50)
+                buf.read_line("\n")
+        except InfiniteLoopException:
+            self.fail("ReadBuffer.read_line tried to loop infinitely.")
+
+        self.assertEqual(mock_fp.isclosed.call_count, 1)
+        # The mocked function not have been called at all since the stream looks closed
+        self.assertEqual(mock_read.call_count, 0)
+
 
 class TweepyStreamBackoffTests(unittest.TestCase):
     def setUp(self):
index 23a02cd6af0377ffd91ae28305d8e924e53611a2..a0f73949cca42a4f60c2449c467659e6271ff658 100644 (file)
@@ -154,7 +154,7 @@ class ReadBuffer(object):
         self._chunk_size = chunk_size
 
     def read_len(self, length):
-        while True:
+        while not self._stream._fp.isclosed():
             if len(self._buffer) >= length:
                 return self._pop(length)
             read_len = max(self._chunk_size, length - len(self._buffer))
@@ -162,7 +162,7 @@ class ReadBuffer(object):
 
     def read_line(self, sep='\n'):
         start = 0
-        while True:
+        while not self._stream._fp.isclosed():
             loc = self._buffer.find(sep, start)
             if loc >= 0:
                 return self._pop(loc + len(sep))
@@ -292,9 +292,9 @@ class Stream(object):
     def _read_loop(self, resp):
         buf = ReadBuffer(resp.raw, self.chunk_size)
 
-        while self.running:
+        while self.running and not resp.raw._fp.isclosed():
             length = 0
-            while True:
+            while not resp.raw._fp.isclosed():
                 line = buf.read_line().strip()
                 if not line:
                     self.listener.keep_alive()  # keep-alive new lines are expected