Update stream API to use new API method URLs. Added StreamListener to replace callbac...
authorJosh Roesslein <jroesslein@gmail.com>
Thu, 3 Sep 2009 17:13:26 +0000 (12:13 -0500)
committerJosh Roesslein <jroesslein@gmail.com>
Thu, 3 Sep 2009 17:13:26 +0000 (12:13 -0500)
CHANGES
streamwatcher.py
tweepy/__init__.py
tweepy/streaming.py

diff --git a/CHANGES b/CHANGES
index 9acb582056634a3490e11ad5dc3a242ba13a76c0..af32436a5fb2e64286c413e8295f513264848e6e 100644 (file)
--- a/CHANGES
+++ b/CHANGES
@@ -13,7 +13,11 @@ during upgrade will be listed here.
     + added new() method. shortcut for setting up new API instances
       example: API.new(auth='basic', username='testuser', password='testpass')
     + update_profile_image() and update_profile_background_image() method added.
++ Streaming:
+    + Update to new streaming API methods
+    + New StreamListener class replacing callback function
 + Fixes
     + User.following is now set to False instead of None
       when user is not followed.
     + python 2.5 import syntax error fixed
+    + python 2.5 timeout support for streaming API
index 91826e8689395e73dc78eccbb6c943b019d46011..8c8df3d5afb4841ea7e2b1e655e67fd34ec0a7eb 100755 (executable)
@@ -5,32 +5,38 @@ from getpass import getpass
 
 import tweepy
 
-def callback(t, stream_object):
-  if t == 'status':
-    print stream_object.text   
-  elif t == 'delete':
-    print 'delete!!!  id = %s' % stream_object['id']
-  elif t == 'limit':
-    print 'limit!!! track=%s' % stream_object['track']
+class StreamWatcherListener(tweepy.StreamListener):
+
+  def on_status(self, status):
+    print status.text
+
+  def on_error(self, status_code):
+    print 'An error has occured! Status code = %s' % status_code
+    return True  # keep stream alive
 
 # Prompt for login credentials and setup stream object
 username = raw_input('Twitter username: ')
 password = getpass('Twitter password: ')
-stream = tweepy.Stream(username, password, callback)
+stream = tweepy.Stream(username, password, StreamWatcherListener())
 
 # Prompt for mode of streaming and connect
 while True:
-  mode = raw_input('Mode? [spritzer/follow/track] ')
-  if mode == 'spritzer':
-    stream.spritzer()
-    break
-  elif mode == 'follow':
-    follow_list = raw_input('Users to follow (comma separated): ')
-    stream.follow(follow_list)
+  mode = raw_input('Mode? [sample/filter] ')
+  if mode == 'sample':
+    stream.sample()
     break
-  elif mode == 'track':
-    track_list = raw_input('Keywords to track (comma separated): ')
-    stream.track(track_list)
+  elif mode == 'filter':
+    follow_list = raw_input('Users to follow (comma separated): ').strip()
+    track_list = raw_input('Keywords to track (comma seperated): ').strip()
+    if follow_list:
+      follow_list = [u for u in follow_list.split(',')]
+    else:
+      follow_list = None
+    if track_list:
+      track_list = [k for k in track_list.split(',')]
+    else:
+      track_list = None
+    stream.filter(follow_list, track_list)
     break
   else:
     print 'Invalid choice! Try again.'
index 3d70bfaa631ccc3fe979914df0a221197d911662..8b5eedb12efee68acd2da6248c60a2dcda730d91 100644 (file)
@@ -12,7 +12,7 @@ from . error import TweepError
 from . api import API
 from . cache import Cache, MemoryCache, FileCache, MemCache
 from . auth import BasicAuthHandler, OAuthHandler
-from . streaming import Stream
+from . streaming import Stream, StreamListener
 
 # Global, unauthenticated instance of API
 api = API()
index d7faf12c6f2bee15112d3f92a1a3542e1ff4cee0..f955118aa43863a810d8498f3c1fae5752871346 100644 (file)
@@ -17,12 +17,32 @@ try:
 except ImportError:
   import simplejson as json
 
+STREAM_VERSION = 1
+
+class StreamListener(object):
+
+  def on_status(self, status):
+    """Called when a new status arrives"""
+    return
+
+  def on_delete(self, status_id, user_id):
+    """Called when a delete notice arrives for a status"""
+    return
+
+  def on_limit(self, track):
+    """Called when a limitation notice arrvies"""
+    return
+
+  def on_error(self, status_code):
+    """Called when a non-200 status code is returned"""
+    return False
+
 class Stream(object):
 
   host = 'stream.twitter.com'
 
-  def __init__(self, username, password, callback, timeout=2.0, retry_count = 3
-                retry_time = 3.0, snooze_time = 10.0, buffer_size=1500):
+  def __init__(self, username, password, listener, timeout=5.0, retry_count = None
+                retry_time = 10.0, snooze_time = 5.0, buffer_size=1500):
     self.auth = BasicAuthHandler(username, password)
     self.running = False
     self.timeout = timeout
@@ -30,7 +50,7 @@ class Stream(object):
     self.retry_time = retry_time
     self.snooze_time = snooze_time
     self.buffer_size = buffer_size
-    self.callback = callback
+    self.listener = listener
     self.api = API()
 
   def _run(self):
@@ -42,7 +62,7 @@ class Stream(object):
     error_counter = 0
     conn = None
     while self.running:
-      if error_counter > self.retry_count:
+      if self.retry_count and error_counter > self.retry_count:
         # quit if error count greater than retry count
         break
       try:
@@ -52,12 +72,15 @@ class Stream(object):
         conn.request('POST', self.url, headers=headers)
         resp = conn.getresponse()
         if resp.status != 200:
+          if self.listener.on_error(resp.status) is False:
+            break
           error_counter += 1
           sleep(self.retry_time)
         else:
           error_counter = 0
           self._read_loop(resp)
       except timeout:
+        print 'timeout!'
         if self.running is False:
           break
         conn.close()
@@ -96,68 +119,40 @@ class Stream(object):
       # turn json data into status object
       if 'in_reply_to_status_id' in data:
         status = parse_status(data, self.api)
-        self.callback('status', status)
+        self.listener.on_status(status)
       elif 'delete' in data:
-        self.callback('delete', json.loads(data)['delete']['status'])
+        delete = json.loads(data)['delete']['status']
+        self.listener.on_delete(delete['id'], delete['user_id'])
       elif 'limit' in data:
-        self.callback('limit', json.loads(data)['limit'])
+        self.listener.on_limit(json.loads(data)['limit']['track'])
 
   def firehose(self, count=None, ):
     if self.running:
       raise TweepError('Stream object already connected!')
-    self.url = '/firehose.json?delimited=length'
-    if count:
-      self.url += '&count=%s' % count
-    self.running = True
-    Thread(target=self._run).start()
-
-  def gardenhose(self):
-    if self.running:
-      raise TweepError('Stream object already connected!')
-    self.url = '/gardenhose.json?delimited=length'
-    self.running = True
-    Thread(target=self._run).start()
-
-  def birddog(self, follow, count=None):
-    if self.running:
-      raise TweepError('Stream object already connected!')
-    self.url = '/birddog.json?delimited=length&follow=%s' % str(follow).strip('[]').replace(' ', '')
+    self.url = '/%i/statuses/firehose.json?delimited=length' % STREAM_VERSION
     if count:
       self.url += '&count=%s' % count
     self.running = True
     Thread(target=self._run).start()
 
-  def shadow(self, follow, count=None):
+  def sample(self, count=None):
     if self.running:
       raise TweepError('Stream object already connected!')
-    self.url = '/shadow.json?delimited=length&follow=%s' % str(follow).strip('[]').replace(' ', '')
+    self.url = '/%i/statuses/sample.json?delimited=length' % STREAM_VERSION
     if count:
       self.url += '&count=%s' % count
     self.running = True
     Thread(target=self._run).start()
 
-  def spritzer(self):
-    if self.running:
-      raise TweepError('Stream object already connected!')
-    self.url = '/spritzer.json?delimited=length'
-    self.running = True
-    Thread(target=self._run).start()
-
-  def follow(self, follow=None):
+  def filter(self, follow=None, track=None):
     if self.running:
       raise TweepError('Stream object already connected!')
-    self.url = '/follow.json?delimited=length'
+    self.url = '/%i/statuses/filter.json?delimited=length' % STREAM_VERSION
     if follow:
-      self.url += '&follow=%s' % str(follow).strip('[]').replace(' ', '')
-    self.running = True
-    Thread(target=self._run).start()
-
-  def track(self, track=None):
-    if self.running:
-      raise TweepError('Stream object already connected!')
-    self.url = '/track.json?delimited=length'
+      self.url += '&follow=%s' % ','.join(follow)
     if track:
-      self.url += '&track=%s' % str(track).strip('[]').replace(' ', '')
+      self.url += '&track=%s' % ','.join(track)
+    print self.url
     self.running = True
     Thread(target=self._run).start()