From dcbd075ec8db919bf648b88ec76f08966c9b4006 Mon Sep 17 00:00:00 2001 From: Josh Roesslein Date: Thu, 3 Sep 2009 12:13:26 -0500 Subject: [PATCH] Update stream API to use new API method URLs. Added StreamListener to replace callback function. --- CHANGES | 4 +++ streamwatcher.py | 42 +++++++++++++---------- tweepy/__init__.py | 2 +- tweepy/streaming.py | 83 +++++++++++++++++++++------------------------ 4 files changed, 68 insertions(+), 63 deletions(-) diff --git a/CHANGES b/CHANGES index 9acb582..af32436 100644 --- a/CHANGES +++ b/CHANGES @@ -13,7 +13,11 @@ during upgrade will be listed here. + added new() method. shortcut for setting up new API instances example: API.new(auth='basic', username='testuser', password='testpass') + update_profile_image() and update_profile_background_image() method added. ++ Streaming: + + Update to new streaming API methods + + New StreamListener class replacing callback function + Fixes + User.following is now set to False instead of None when user is not followed. + python 2.5 import syntax error fixed + + python 2.5 timeout support for streaming API diff --git a/streamwatcher.py b/streamwatcher.py index 91826e8..8c8df3d 100755 --- a/streamwatcher.py +++ b/streamwatcher.py @@ -5,32 +5,38 @@ from getpass import getpass import tweepy -def callback(t, stream_object): - if t == 'status': - print stream_object.text - elif t == 'delete': - print 'delete!!! id = %s' % stream_object['id'] - elif t == 'limit': - print 'limit!!! track=%s' % stream_object['track'] +class StreamWatcherListener(tweepy.StreamListener): + + def on_status(self, status): + print status.text + + def on_error(self, status_code): + print 'An error has occured! Status code = %s' % status_code + return True # keep stream alive # Prompt for login credentials and setup stream object username = raw_input('Twitter username: ') password = getpass('Twitter password: ') -stream = tweepy.Stream(username, password, callback) +stream = tweepy.Stream(username, password, StreamWatcherListener()) # Prompt for mode of streaming and connect while True: - mode = raw_input('Mode? [spritzer/follow/track] ') - if mode == 'spritzer': - stream.spritzer() - break - elif mode == 'follow': - follow_list = raw_input('Users to follow (comma separated): ') - stream.follow(follow_list) + mode = raw_input('Mode? [sample/filter] ') + if mode == 'sample': + stream.sample() break - elif mode == 'track': - track_list = raw_input('Keywords to track (comma separated): ') - stream.track(track_list) + elif mode == 'filter': + follow_list = raw_input('Users to follow (comma separated): ').strip() + track_list = raw_input('Keywords to track (comma seperated): ').strip() + if follow_list: + follow_list = [u for u in follow_list.split(',')] + else: + follow_list = None + if track_list: + track_list = [k for k in track_list.split(',')] + else: + track_list = None + stream.filter(follow_list, track_list) break else: print 'Invalid choice! Try again.' diff --git a/tweepy/__init__.py b/tweepy/__init__.py index 3d70bfa..8b5eedb 100644 --- a/tweepy/__init__.py +++ b/tweepy/__init__.py @@ -12,7 +12,7 @@ from . error import TweepError from . api import API from . cache import Cache, MemoryCache, FileCache, MemCache from . auth import BasicAuthHandler, OAuthHandler -from . streaming import Stream +from . streaming import Stream, StreamListener # Global, unauthenticated instance of API api = API() diff --git a/tweepy/streaming.py b/tweepy/streaming.py index d7faf12..f955118 100644 --- a/tweepy/streaming.py +++ b/tweepy/streaming.py @@ -17,12 +17,32 @@ try: except ImportError: import simplejson as json +STREAM_VERSION = 1 + +class StreamListener(object): + + def on_status(self, status): + """Called when a new status arrives""" + return + + def on_delete(self, status_id, user_id): + """Called when a delete notice arrives for a status""" + return + + def on_limit(self, track): + """Called when a limitation notice arrvies""" + return + + def on_error(self, status_code): + """Called when a non-200 status code is returned""" + return False + class Stream(object): host = 'stream.twitter.com' - def __init__(self, username, password, callback, timeout=2.0, retry_count = 3, - retry_time = 3.0, snooze_time = 10.0, buffer_size=1500): + def __init__(self, username, password, listener, timeout=5.0, retry_count = None, + retry_time = 10.0, snooze_time = 5.0, buffer_size=1500): self.auth = BasicAuthHandler(username, password) self.running = False self.timeout = timeout @@ -30,7 +50,7 @@ class Stream(object): self.retry_time = retry_time self.snooze_time = snooze_time self.buffer_size = buffer_size - self.callback = callback + self.listener = listener self.api = API() def _run(self): @@ -42,7 +62,7 @@ class Stream(object): error_counter = 0 conn = None while self.running: - if error_counter > self.retry_count: + if self.retry_count and error_counter > self.retry_count: # quit if error count greater than retry count break try: @@ -52,12 +72,15 @@ class Stream(object): conn.request('POST', self.url, headers=headers) resp = conn.getresponse() if resp.status != 200: + if self.listener.on_error(resp.status) is False: + break error_counter += 1 sleep(self.retry_time) else: error_counter = 0 self._read_loop(resp) except timeout: + print 'timeout!' if self.running is False: break conn.close() @@ -96,68 +119,40 @@ class Stream(object): # turn json data into status object if 'in_reply_to_status_id' in data: status = parse_status(data, self.api) - self.callback('status', status) + self.listener.on_status(status) elif 'delete' in data: - self.callback('delete', json.loads(data)['delete']['status']) + delete = json.loads(data)['delete']['status'] + self.listener.on_delete(delete['id'], delete['user_id']) elif 'limit' in data: - self.callback('limit', json.loads(data)['limit']) + self.listener.on_limit(json.loads(data)['limit']['track']) def firehose(self, count=None, ): if self.running: raise TweepError('Stream object already connected!') - self.url = '/firehose.json?delimited=length' - if count: - self.url += '&count=%s' % count - self.running = True - Thread(target=self._run).start() - - def gardenhose(self): - if self.running: - raise TweepError('Stream object already connected!') - self.url = '/gardenhose.json?delimited=length' - self.running = True - Thread(target=self._run).start() - - def birddog(self, follow, count=None): - if self.running: - raise TweepError('Stream object already connected!') - self.url = '/birddog.json?delimited=length&follow=%s' % str(follow).strip('[]').replace(' ', '') + self.url = '/%i/statuses/firehose.json?delimited=length' % STREAM_VERSION if count: self.url += '&count=%s' % count self.running = True Thread(target=self._run).start() - def shadow(self, follow, count=None): + def sample(self, count=None): if self.running: raise TweepError('Stream object already connected!') - self.url = '/shadow.json?delimited=length&follow=%s' % str(follow).strip('[]').replace(' ', '') + self.url = '/%i/statuses/sample.json?delimited=length' % STREAM_VERSION if count: self.url += '&count=%s' % count self.running = True Thread(target=self._run).start() - def spritzer(self): - if self.running: - raise TweepError('Stream object already connected!') - self.url = '/spritzer.json?delimited=length' - self.running = True - Thread(target=self._run).start() - - def follow(self, follow=None): + def filter(self, follow=None, track=None): if self.running: raise TweepError('Stream object already connected!') - self.url = '/follow.json?delimited=length' + self.url = '/%i/statuses/filter.json?delimited=length' % STREAM_VERSION if follow: - self.url += '&follow=%s' % str(follow).strip('[]').replace(' ', '') - self.running = True - Thread(target=self._run).start() - - def track(self, track=None): - if self.running: - raise TweepError('Stream object already connected!') - self.url = '/track.json?delimited=length' + self.url += '&follow=%s' % ','.join(follow) if track: - self.url += '&track=%s' % str(track).strip('[]').replace(' ', '') + self.url += '&track=%s' % ','.join(track) + print self.url self.running = True Thread(target=self._run).start() -- 2.25.1