Step 3: Starting a Stream
=========================
A number of twitter streams are available through Tweepy. Most cases
-will use filter, the user_stream, or the sitestream.
+will use filter.
For more information on the capabilities and limitations of the different
streams see `Twitter Streaming API Documentation`_.
def on_connect(self):
API(self.auth).update_status(mock_tweet())
- def test_userstream(self):
- # Generate random tweet which should show up in the stream.
-
- self.listener.connect_cb = self.on_connect
- self.listener.status_stop_count = 1
- self.stream.userstream()
- self.assertEqual(self.listener.status_count, 1)
-
- @skip("Sitestream only available to whitelisted accounts.")
- def test_sitestream(self):
- self.listener.connect_cb = self.on_connect
- self.listener.status_stop_count = 1
- self.stream.sitestream(follow=[self.auth.get_username()])
- self.assertEqual(self.listener.status_count, 1)
-
- def test_userstream_with_params(self):
- # Generate random tweet which should show up in the stream.
- def on_connect():
- API(self.auth).update_status(mock_tweet())
-
- self.listener.connect_cb = on_connect
- self.listener.status_stop_count = 1
- self.stream.userstream(_with='user', replies='all', stall_warnings=True)
- self.assertEqual(self.listener.status_count, 1)
-
def test_sample(self):
self.listener.status_stop_count = 10
self.stream.sample()
""" Called when the response has been closed by Twitter """
pass
- def userstream(self,
- stall_warnings=False,
- _with=None,
- replies=None,
- track=None,
- locations=None,
- is_async=False,
- encoding='utf8'):
- self.session.params = {'delimited': 'length'}
- if self.running:
- raise TweepError('Stream object already connected!')
- self.url = '/%s/user.json' % STREAM_VERSION
- self.host = 'userstream.twitter.com'
- if stall_warnings:
- self.session.params['stall_warnings'] = stall_warnings
- if _with:
- self.session.params['with'] = _with
- if replies:
- self.session.params['replies'] = replies
- if locations and len(locations) > 0:
- if len(locations) % 4 != 0:
- raise TweepError("Wrong number of locations points, "
- "it has to be a multiple of 4")
- self.session.params['locations'] = ','.join(['%.2f' % l for l in locations])
- if track:
- self.session.params['track'] = ','.join(track).encode(encoding)
-
- self._start(is_async)
-
def sample(self, is_async=False, languages=None, stall_warnings=False):
self.session.params = {'delimited': 'length'}
if self.running:
self.session.params = {'delimited': 'length'}
self._start(is_async)
- def sitestream(self, follow, stall_warnings=False,
- with_='user', replies=False, is_async=False):
- self.body = {}
- if self.running:
- raise TweepError('Stream object already connected!')
- self.url = '/%s/site.json' % STREAM_VERSION
- self.body['follow'] = ','.join(map(str, follow))
- self.body['delimited'] = 'length'
- if stall_warnings:
- self.body['stall_warnings'] = stall_warnings
- if with_:
- self.body['with'] = with_
- if replies:
- self.body['replies'] = replies
- self._start(is_async)
-
def disconnect(self):
if self.running is False:
return