Add streaming test_sample test.
authorJoshua Roesslein <jroesslein@gmail.com>
Sun, 19 May 2013 06:59:12 +0000 (23:59 -0700)
committerJoshua Roesslein <jroesslein@gmail.com>
Sun, 19 May 2013 06:59:12 +0000 (23:59 -0700)
tests/test_streaming.py

index cac0b077979425eb0edb1f00ce67be203743e23b..0f32e9598533b4171f9f9a143544ee44a43aa7b7 100644 (file)
@@ -2,36 +2,52 @@ from time import sleep
 import unittest
 
 from tweepy.api import API
+from tweepy.models import Status
 from tweepy.streaming import Stream, StreamListener
 
 from config import create_auth
 from mock import mock_tweet
 
 class MockStreamListener(StreamListener):
-    def __init__(self):
+    def __init__(self, test_case):
         super(MockStreamListener, self).__init__()
+        self.test_case = test_case
         self.status_count = 0
+        self.status_stop_count = 0
+        self.connect_cb = None
+
+    def on_connect(self):
+        if self.connect_cb:
+            self.connect_cb()
 
     def on_status(self, status):
         self.status_count += 1
-        return False
+        self.test_case.assertIsInstance(status, Status)
+        if self.status_stop_count == self.status_count:
+            return False
 
 class TweepyStreamTests(unittest.TestCase):
     def setUp(self):
         self.auth = create_auth()
-        self.listener = MockStreamListener()
+        self.listener = MockStreamListener(self)
         self.stream = Stream(self.auth, self.listener)
 
     def tearDown(self):
         self.stream.disconnect()
 
     def test_userstream(self):
-        self.stream.userstream(async=True)
-
         # Generate random tweet which should show up in the stream.
-        # Wait a bit of time for it to arrive before asserting.
-        API(self.auth).update_status(mock_tweet())
-        sleep(1)
+        def on_connect():
+            API(self.auth).update_status(mock_tweet())
 
+        self.listener.connect_cb = on_connect
+        self.listener.status_stop_count = 1
+        self.stream.userstream()
         self.assertEqual(self.listener.status_count, 1)
 
+    def test_sample(self):
+        self.listener.status_stop_count = 10
+        self.stream.sample()
+        self.assertEquals(self.listener.status_count,
+                          self.listener.status_stop_count)
+