import tweepy
-def callback(t, stream_object):
- if t == 'status':
- print stream_object.text
- elif t == 'delete':
- print 'delete!!! id = %s' % stream_object['id']
- elif t == 'limit':
- print 'limit!!! track=%s' % stream_object['track']
+class StreamWatcherListener(tweepy.StreamListener):
+
+ def on_status(self, status):
+ print status.text
+
+ def on_error(self, status_code):
+ print 'An error has occured! Status code = %s' % status_code
+ return True # keep stream alive
# Prompt for login credentials and setup stream object
username = raw_input('Twitter username: ')
password = getpass('Twitter password: ')
-stream = tweepy.Stream(username, password, callback)
+stream = tweepy.Stream(username, password, StreamWatcherListener())
# Prompt for mode of streaming and connect
while True:
- mode = raw_input('Mode? [spritzer/follow/track] ')
- if mode == 'spritzer':
- stream.spritzer()
- break
- elif mode == 'follow':
- follow_list = raw_input('Users to follow (comma separated): ')
- stream.follow(follow_list)
+ mode = raw_input('Mode? [sample/filter] ')
+ if mode == 'sample':
+ stream.sample()
break
- elif mode == 'track':
- track_list = raw_input('Keywords to track (comma separated): ')
- stream.track(track_list)
+ elif mode == 'filter':
+ follow_list = raw_input('Users to follow (comma separated): ').strip()
+ track_list = raw_input('Keywords to track (comma seperated): ').strip()
+ if follow_list:
+ follow_list = [u for u in follow_list.split(',')]
+ else:
+ follow_list = None
+ if track_list:
+ track_list = [k for k in track_list.split(',')]
+ else:
+ track_list = None
+ stream.filter(follow_list, track_list)
break
else:
print 'Invalid choice! Try again.'
except ImportError:
import simplejson as json
+STREAM_VERSION = 1
+
+class StreamListener(object):
+
+ def on_status(self, status):
+ """Called when a new status arrives"""
+ return
+
+ def on_delete(self, status_id, user_id):
+ """Called when a delete notice arrives for a status"""
+ return
+
+ def on_limit(self, track):
+ """Called when a limitation notice arrvies"""
+ return
+
+ def on_error(self, status_code):
+ """Called when a non-200 status code is returned"""
+ return False
+
class Stream(object):
host = 'stream.twitter.com'
- def __init__(self, username, password, callback, timeout=2.0, retry_count = 3,
- retry_time = 3.0, snooze_time = 10.0, buffer_size=1500):
+ def __init__(self, username, password, listener, timeout=5.0, retry_count = None,
+ retry_time = 10.0, snooze_time = 5.0, buffer_size=1500):
self.auth = BasicAuthHandler(username, password)
self.running = False
self.timeout = timeout
self.retry_time = retry_time
self.snooze_time = snooze_time
self.buffer_size = buffer_size
- self.callback = callback
+ self.listener = listener
self.api = API()
def _run(self):
error_counter = 0
conn = None
while self.running:
- if error_counter > self.retry_count:
+ if self.retry_count and error_counter > self.retry_count:
# quit if error count greater than retry count
break
try:
conn.request('POST', self.url, headers=headers)
resp = conn.getresponse()
if resp.status != 200:
+ if self.listener.on_error(resp.status) is False:
+ break
error_counter += 1
sleep(self.retry_time)
else:
error_counter = 0
self._read_loop(resp)
except timeout:
+ print 'timeout!'
if self.running is False:
break
conn.close()
# turn json data into status object
if 'in_reply_to_status_id' in data:
status = parse_status(data, self.api)
- self.callback('status', status)
+ self.listener.on_status(status)
elif 'delete' in data:
- self.callback('delete', json.loads(data)['delete']['status'])
+ delete = json.loads(data)['delete']['status']
+ self.listener.on_delete(delete['id'], delete['user_id'])
elif 'limit' in data:
- self.callback('limit', json.loads(data)['limit'])
+ self.listener.on_limit(json.loads(data)['limit']['track'])
def firehose(self, count=None, ):
if self.running:
raise TweepError('Stream object already connected!')
- self.url = '/firehose.json?delimited=length'
- if count:
- self.url += '&count=%s' % count
- self.running = True
- Thread(target=self._run).start()
-
- def gardenhose(self):
- if self.running:
- raise TweepError('Stream object already connected!')
- self.url = '/gardenhose.json?delimited=length'
- self.running = True
- Thread(target=self._run).start()
-
- def birddog(self, follow, count=None):
- if self.running:
- raise TweepError('Stream object already connected!')
- self.url = '/birddog.json?delimited=length&follow=%s' % str(follow).strip('[]').replace(' ', '')
+ self.url = '/%i/statuses/firehose.json?delimited=length' % STREAM_VERSION
if count:
self.url += '&count=%s' % count
self.running = True
Thread(target=self._run).start()
- def shadow(self, follow, count=None):
+ def sample(self, count=None):
if self.running:
raise TweepError('Stream object already connected!')
- self.url = '/shadow.json?delimited=length&follow=%s' % str(follow).strip('[]').replace(' ', '')
+ self.url = '/%i/statuses/sample.json?delimited=length' % STREAM_VERSION
if count:
self.url += '&count=%s' % count
self.running = True
Thread(target=self._run).start()
- def spritzer(self):
- if self.running:
- raise TweepError('Stream object already connected!')
- self.url = '/spritzer.json?delimited=length'
- self.running = True
- Thread(target=self._run).start()
-
- def follow(self, follow=None):
+ def filter(self, follow=None, track=None):
if self.running:
raise TweepError('Stream object already connected!')
- self.url = '/follow.json?delimited=length'
+ self.url = '/%i/statuses/filter.json?delimited=length' % STREAM_VERSION
if follow:
- self.url += '&follow=%s' % str(follow).strip('[]').replace(' ', '')
- self.running = True
- Thread(target=self._run).start()
-
- def track(self, track=None):
- if self.running:
- raise TweepError('Stream object already connected!')
- self.url = '/track.json?delimited=length'
+ self.url += '&follow=%s' % ','.join(follow)
if track:
- self.url += '&track=%s' % str(track).strip('[]').replace(' ', '')
+ self.url += '&track=%s' % ','.join(track)
+ print self.url
self.running = True
Thread(target=self._run).start()