self.listener.status_stop_count = 1
self.stream.userstream()
self.assertEqual(self.listener.status_count, 1)
+
+ def test_sitestream(self):
+ self.listener.connect_cb = self.on_connect
+ self.listener.status_stop_count = 1
+ self.sitestream(follow=[self.auth.get_username()])
+ self.assertEqual(self.listener.status_count, 1)
+ def test_userstream_with_params(self):
+ # Generate random tweet which should show up in the stream.
+ def on_connect():
+ API(self.auth).update_status(mock_tweet())
+
+ self.listener.connect_cb = on_connect
+ self.listener.status_stop_count = 1
+ self.stream.userstream(_with='user', replies='all', stall_warnings=True)
+ self.assertEqual(self.listener.status_count, 1)
+
def test_sample(self):
self.listener.status_stop_count = 10
self.stream.sample()
if self.running:
raise TweepError('Stream object already connected!')
self.url = '/%s/statuses/sample.json?delimited=length' % STREAM_VERSION
- if count:
- self.url += '&count=%s' % count
self._start(async)
- def filter(self, follow=None, track=None, async=False, locations=None,
- count = None, stall_warnings=False, languages=None):
- self.parameters = {}
- self.headers['Content-type'] = "application/x-www-form-urlencoded"
+ def filter(self, follow=None, track=None, async=False, locations=None,
+ stall_warnings=False, languages=None, encoding='utf8'):
+ self.session.params = {}
+ self.session.headers['Content-type'] = "application/x-www-form-urlencoded"
if self.running:
raise TweepError('Stream object already connected!')
- self.url = '/%s/statuses/filter.json?delimited=length' % STREAM_VERSION
+ self.url = '/%s/statuses/filter.json' % STREAM_VERSION
if follow:
- self.parameters['follow'] = ','.join(map(str, follow))
+ encoded_follow = [s.encode(encoding) for s in follow]
+ self.session.params['follow'] = ','.join(encoded_follow)
if track:
- self.parameters['track'] = ','.join(map(str, track))
+ encoded_track = [s.encode(encoding) for s in track]
+ self.session.params['track'] = ','.join(encoded_track)
if locations and len(locations) > 0:
- assert len(locations) % 4 == 0
- self.parameters['locations'] = ','.join(['%.2f' % l for l in locations])
- if count:
- self.parameters['count'] = count
+ if len(locations) % 4 != 0:
+ raise TweepError("Wrong number of locations points, "
+ "it has to be a multiple of 4")
+ self.session.params['locations'] = ','.join(['%.4f' % l for l in locations])
if stall_warnings:
- self.parameters['stall_warnings'] = stall_warnings
+ self.session.params['stall_warnings'] = stall_warnings
if languages:
- self.parameters['language'] = ','.join(map(str, languages))
- self.body = urlencode_noplus(self.parameters)
- self.parameters['delimited'] = 'length'
+ self.session.params['language'] = ','.join(map(str, languages))
+ self.body = urlencode_noplus(self.session.params)
+ self.session.params['delimited'] = 'length'
+ self.host = 'stream.twitter.com'
self._start(async)
+ def sitestream(self, follow, stall_warnings=False, with_='user', replies=False, async=False):
+ self.parameters = {}
+ if self.running:
+ raise TweepError('Stream object already connected!')
+ self.url = '/%s/site.json' % STREAM_VERSION
+ self.parameters['follow'] = ','.join(map(str, follow))
+ self.parameters['delimited'] = 'length'
+ if stall_warnings:
+ self.parameters['stall_warnings'] = stall_warnings
+ if with_:
+ self.parameters['with'] = with_
+ if replies:
+ self.parameters['replies'] = replies
+ self.body = urlencode_noplus(self.parameters)
+ self._start(async)
+
def disconnect(self):
if self.running is False:
return