# The mocked function not have been called at all since the stream looks closed
self.assertEqual(mock_read.call_count, 0)
+ def test_read_unicode_tweet(self):
+ stream = '11\n{id:12345}\n\n23\n{id:23456, test:"\xe3\x81\x93"}\n\n'
+ for length in [1, 2, 5, 10, 20, 50]:
+ buf = ReadBuffer(six.StringIO(stream), length)
+ self.assertEqual('11\n', buf.read_line())
+ self.assertEqual('{id:12345}\n', buf.read_len(11))
+ self.assertEqual('\n', buf.read_line())
+ self.assertEqual('23\n', buf.read_line())
+ self.assertEqual('{id:23456, test:"\xe3\x81\x93"}\n', buf.read_len(23))
+
class TweepyStreamBackoffTests(unittest.TestCase):
def setUp(self):
def __init__(self, stream, chunk_size):
self._stream = stream
- self._buffer = u""
+ self._buffer = ''
self._chunk_size = chunk_size
def read_len(self, length):
if len(self._buffer) >= length:
return self._pop(length)
read_len = max(self._chunk_size, length - len(self._buffer))
- self._buffer += self._stream.read(read_len).decode("ascii")
+ self._buffer += self._stream.read(read_len)
def read_line(self, sep='\n'):
start = 0
return self._pop(loc + len(sep))
else:
start = len(self._buffer)
- self._buffer += self._stream.read(self._chunk_size).decode("ascii")
+ self._buffer += self._stream.read(self._chunk_size)
def _pop(self, length):
r = self._buffer[:length]