From: Josh Roesslein Date: Sun, 13 Sep 2009 19:14:23 +0000 (-0500) Subject: 2 space -> 4 space indents X-Git-Url: https://vcs.fsf.org/?a=commitdiff_plain;h=a00f378d846d547f22d5cc8d781d6feeb10a70ae;p=tweepy.git 2 space -> 4 space indents --- diff --git a/streamwatcher.py b/streamwatcher.py index 3012088..4d84f0a 100755 --- a/streamwatcher.py +++ b/streamwatcher.py @@ -5,17 +5,18 @@ from getpass import getpass import tweepy + class StreamWatcherListener(tweepy.StreamListener): - def on_status(self, status): - print status.text + def on_status(self, status): + print status.text - def on_error(self, status_code): - print 'An error has occured! Status code = %s' % status_code - return True # keep stream alive + def on_error(self, status_code): + print 'An error has occured! Status code = %s' % status_code + return True # keep stream alive - def on_timeout(self): - print 'Snoozing Zzzzzz' + def on_timeout(self): + print 'Snoozing Zzzzzz' # Prompt for login credentials and setup stream object username = raw_input('Twitter username: ') @@ -24,38 +25,37 @@ stream = tweepy.Stream(username, password, StreamWatcherListener()) # Prompt for mode of streaming and connect while True: - mode = raw_input('Mode? [sample/filter] ') - if mode == 'sample': - stream.sample() - break - elif mode == 'filter': - follow_list = raw_input('Users to follow (comma separated): ').strip() - track_list = raw_input('Keywords to track (comma seperated): ').strip() - if follow_list: - follow_list = [u for u in follow_list.split(',')] - else: - follow_list = None - if track_list: - track_list = [k for k in track_list.split(',')] + mode = raw_input('Mode? [sample/filter] ') + if mode == 'sample': + stream.sample() + break + elif mode == 'filter': + follow_list = raw_input('Users to follow (comma separated): ').strip() + track_list = raw_input('Keywords to track (comma seperated): ').strip() + if follow_list: + follow_list = [u for u in follow_list.split(',')] + else: + follow_list = None + if track_list: + track_list = [k for k in track_list.split(',')] + else: + track_list = None + stream.filter(follow_list, track_list) + break else: - track_list = None - stream.filter(follow_list, track_list) - break - else: - print 'Invalid choice! Try again.' + print 'Invalid choice! Try again.' # Run in a loop until termination while True: - try: - if stream.running is False: - print 'Stream stopped!' - break - time.sleep(1) - except KeyboardInterrupt: - break + try: + if stream.running is False: + print 'Stream stopped!' + break + time.sleep(1) + except KeyboardInterrupt: + break # Shutdown connection stream.disconnect() - print 'Bye!' diff --git a/tests.py b/tests.py index 6fe28c0..118944c 100644 --- a/tests.py +++ b/tests.py @@ -5,177 +5,180 @@ import unittest import random from time import sleep +import os from tweepy import * """Configurations""" # Must supply twitter account credentials for tests -username = '' -password = '' +username = 'tweebly' +password = 'josh1987twitter' """Unit tests""" -# API tests + class TweepyAPITests(unittest.TestCase): - def setUp(self): - self.api = API(BasicAuthHandler(username, password)) - - def testpublictimeline(self): - self.assertEqual(len(self.api.public_timeline()), 20) - - def testfriendstimeline(self): - self.assert_(len(self.api.friends_timeline()) > 0) - - def testusertimeline(self): - s = self.api.user_timeline(screen_name='twitter') - self.assert_(len(s) > 0) - self.assertEqual(s[0].author.screen_name, 'twitter') - - def testmentions(self): - s = self.api.mentions() - self.assert_(len(s) > 0) - self.assert_(s[0].text.find(username) >= 0) - - def testgetstatus(self): - s = self.api.get_status(id=123) - self.assertEqual(s.author.id, 17) - - def testupdateanddestroystatus(self): - # test update - text = 'testing %i' % random.randint(0,1000) - update = self.api.update_status(status=text) - self.assertEqual(update.text, text) - - # test destroy - deleted = self.api.destroy_status(id=update.id) - self.assertEqual(deleted.id, update.id) - - def testgetuser(self): - u = self.api.get_user(screen_name='twitter') - self.assertEqual(u.screen_name, 'twitter') - - def testme(self): - me = self.api.me() - self.assertEqual(me.screen_name, username) - - def testfriends(self): - friends = self.api.friends() - self.assert_(len(friends) > 0) - - def testfollowers(self): - followers = self.api.followers() - self.assert_(len(followers) > 0) - - def testdirectmessages(self): - dms = self.api.direct_messages() - self.assert_(len(dms) > 0) - - def testsendanddestroydirectmessage(self): - # send - sent_dm = self.api.send_direct_message(username, 'test message') - self.assertEqual(sent_dm.text, 'test message') - self.assertEqual(sent_dm.sender.screen_name, username) - self.assertEqual(sent_dm.recipient.screen_name, username) - - # destroy - destroyed_dm = self.api.destroy_direct_message(sent_dm.id) - self.assertEqual(destroyed_dm.text, sent_dm.text) - self.assertEqual(destroyed_dm.id, sent_dm.id) - self.assertEqual(destroyed_dm.sender.screen_name, username) - self.assertEqual(destroyed_dm.recipient.screen_name, username) - - def testcreatefriendship(self): - friend = self.api.create_friendship('twitter') - self.assertEqual(friend.screen_name, 'twitter') - self.assertTrue(self.api.exists_friendship(username, 'twitter')) - - def testdestroyfriendship(self): - enemy = self.api.destroy_friendship('twitter') - self.assertEqual(enemy.screen_name, 'twitter') - self.assertFalse(self.api.exists_friendship(username, 'twitter')) - - def testshowfriendship(self): - source, target = self.api.show_friendship(target_screen_name='twtiter') - self.assert_(isinstance(source, Friendship)) - self.assert_(isinstance(target, Friendship)) - -# Authentication tests + def setUp(self): + self.api = API(BasicAuthHandler(username, password)) + + def testpublictimeline(self): + self.assertEqual(len(self.api.public_timeline()), 20) + + def testfriendstimeline(self): + self.assert_(len(self.api.friends_timeline()) > 0) + + def testusertimeline(self): + s = self.api.user_timeline(screen_name='twitter') + self.assert_(len(s) > 0) + self.assertEqual(s[0].author.screen_name, 'twitter') + + def testmentions(self): + s = self.api.mentions() + self.assert_(len(s) > 0) + self.assert_(s[0].text.find(username) >= 0) + + def testgetstatus(self): + s = self.api.get_status(id=123) + self.assertEqual(s.author.id, 17) + + def testupdateanddestroystatus(self): + # test update + text = 'testing %i' % random.randint(0, 1000) + update = self.api.update_status(status=text) + self.assertEqual(update.text, text) + + # test destroy + deleted = self.api.destroy_status(id=update.id) + self.assertEqual(deleted.id, update.id) + + def testgetuser(self): + u = self.api.get_user(screen_name='twitter') + self.assertEqual(u.screen_name, 'twitter') + + def testme(self): + me = self.api.me() + self.assertEqual(me.screen_name, username) + + def testfriends(self): + friends = self.api.friends() + self.assert_(len(friends) > 0) + + def testfollowers(self): + followers = self.api.followers() + self.assert_(len(followers) > 0) + + def testdirectmessages(self): + dms = self.api.direct_messages() + self.assert_(len(dms) > 0) + + def testsendanddestroydirectmessage(self): + # send + sent_dm = self.api.send_direct_message(username, 'test message') + self.assertEqual(sent_dm.text, 'test message') + self.assertEqual(sent_dm.sender.screen_name, username) + self.assertEqual(sent_dm.recipient.screen_name, username) + + # destroy + destroyed_dm = self.api.destroy_direct_message(sent_dm.id) + self.assertEqual(destroyed_dm.text, sent_dm.text) + self.assertEqual(destroyed_dm.id, sent_dm.id) + self.assertEqual(destroyed_dm.sender.screen_name, username) + self.assertEqual(destroyed_dm.recipient.screen_name, username) + + def testcreatefriendship(self): + friend = self.api.create_friendship('twitter') + self.assertEqual(friend.screen_name, 'twitter') + self.assertTrue(self.api.exists_friendship(username, 'twitter')) + + def testdestroyfriendship(self): + enemy = self.api.destroy_friendship('twitter') + self.assertEqual(enemy.screen_name, 'twitter') + self.assertFalse(self.api.exists_friendship(username, 'twitter')) + + def testshowfriendship(self): + source, target = self.api.show_friendship(target_screen_name='twtiter') + self.assert_(isinstance(source, Friendship)) + self.assert_(isinstance(target, Friendship)) + + class TweepyAuthTests(unittest.TestCase): - consumer_key = 'ZbzSsdQj7t68VYlqIFvdcA' - consumer_secret = '4yDWgrBiRs2WIx3bfvF9UWCRmtQ2YKpKJKBahtZcU' + consumer_key = 'ZbzSsdQj7t68VYlqIFvdcA' + consumer_secret = '4yDWgrBiRs2WIx3bfvF9UWCRmtQ2YKpKJKBahtZcU' - def testoauth(self): - auth = OAuthHandler(self.consumer_key, self.consumer_secret) + def testoauth(self): + auth = OAuthHandler(self.consumer_key, self.consumer_secret) - # test getting access token - auth_url = auth.get_authorization_url() - self.assert_(auth_url.startswith('http://twitter.com/oauth/authorize?')) - print 'Please authorize: ' + auth_url - verifier = raw_input('PIN: ').strip() - self.assert_(len(verifier) > 0) - access_token = auth.get_access_token(verifier) - self.assert_(access_token is not None) + # test getting access token + auth_url = auth.get_authorization_url() + self.assert_(auth_url.startswith('http://twitter.com/oauth/authorize?')) + print 'Please authorize: ' + auth_url + verifier = raw_input('PIN: ').strip() + self.assert_(len(verifier) > 0) + access_token = auth.get_access_token(verifier) + self.assert_(access_token is not None) - # build api object test using oauth - api = API(auth) - api.update_status('test %i' % random.randint(0,1000)) + # build api object test using oauth + api = API(auth) + api.update_status('test %i' % random.randint(0, 1000)) - def testbasicauth(self): - auth = BasicAuthHandler(username, password) + def testbasicauth(self): + auth = BasicAuthHandler(username, password) - # test accessing twitter API - api = API(auth) - api.update_status('test %i' % random.randint(1,1000)) + # test accessing twitter API + api = API(auth) + api.update_status('test %i' % random.randint(1, 1000)) -# Cache tests class TweepyCacheTests(unittest.TestCase): - timeout = 2.0 - memcache_servers = ['127.0.0.1:11211'] # must be running for test to pass - - def _run_tests(self, do_cleanup=True): - # test store and get - self.cache.store('testkey', 'testvalue') - self.assertEqual(self.cache.get('testkey'), 'testvalue', 'Stored value does not match retrieved value') - - # test timeout - sleep(self.timeout) - self.assertEqual(self.cache.get('testkey'), None, 'Cache entry should have expired') - - # test cleanup - if do_cleanup: - self.cache.store('testkey', 'testvalue') - sleep(self.timeout) - self.cache.cleanup() - self.assertEqual(self.cache.count(), 0, 'Cache cleanup failed') - - # test count - for i in range(0,20): - self.cache.store('testkey%i' % i, 'testvalue') - self.assertEqual(self.cache.count(), 20, 'Count is wrong') - - # test flush - self.cache.flush() - self.assertEqual(self.cache.count(), 0, 'Cache failed to flush') - - def testmemorycache(self): - self.cache = MemoryCache(timeout=self.timeout) - self._run_tests() - - def testfilecache(self): - os.mkdir('cache_test_dir') - self.cache = FileCache('cache_test_dir', self.timeout) - self._run_tests() - self.cache.flush() - os.rmdir('cache_test_dir') - - def testmemcache(self): - self.cache = MemCache(self.memcache_servers, self.timeout) - self._run_tests(do_cleanup=False) + timeout = 2.0 + memcache_servers = ['127.0.0.1:11211'] # must be running for test to pass + + def _run_tests(self, do_cleanup=True): + # test store and get + self.cache.store('testkey', 'testvalue') + self.assertEqual(self.cache.get('testkey'), 'testvalue', + 'Stored value does not match retrieved value') + + # test timeout + sleep(self.timeout) + self.assertEqual(self.cache.get('testkey'), None, + 'Cache entry should have expired') + + # test cleanup + if do_cleanup: + self.cache.store('testkey', 'testvalue') + sleep(self.timeout) + self.cache.cleanup() + self.assertEqual(self.cache.count(), 0, 'Cache cleanup failed') + + # test count + for i in range(0, 20): + self.cache.store('testkey%i' % i, 'testvalue') + self.assertEqual(self.cache.count(), 20, 'Count is wrong') + + # test flush + self.cache.flush() + self.assertEqual(self.cache.count(), 0, 'Cache failed to flush') + + def testmemorycache(self): + self.cache = MemoryCache(timeout=self.timeout) + self._run_tests() + + def testfilecache(self): + os.mkdir('cache_test_dir') + self.cache = FileCache('cache_test_dir', self.timeout) + self._run_tests() + self.cache.flush() + os.rmdir('cache_test_dir') + + def testmemcache(self): + self.cache = MemCache(self.memcache_servers, self.timeout) + self._run_tests(do_cleanup=False) if __name__ == '__main__': - unittest.main() - + + unittest.main() + diff --git a/tutorial/t2.py b/tutorial/t2.py index 403e72b..7beacb3 100644 --- a/tutorial/t2.py +++ b/tutorial/t2.py @@ -21,8 +21,8 @@ Let's query the public timeline and print it to the console... public_timeline = no_auth_api.public_timeline() print 'Public timeline...' for status in public_timeline: - print status.text - print 'from: %s' % status.author.screen_name + print status.text + print 'from: %s' % status.author.screen_name """ Tweepy provides a non-authenticated instance of the API for you already @@ -50,8 +50,8 @@ and print it to the console... friends_timeline = auth_api.friends_timeline() print 'Friends timeline...' for status in friends_timeline: - print status.text - print 'from: %s' % status.author.screen_name + print status.text + print 'from: %s' % status.author.screen_name """ The End diff --git a/tutorial/t3.py b/tutorial/t3.py index 954e745..044bbaa 100644 --- a/tutorial/t3.py +++ b/tutorial/t3.py @@ -22,9 +22,9 @@ First let's create our own implementation of Status. """ class MyStatus(tweepy.Status): - def length(self): - """Return length of status text""" - return len(self.text) + def length(self): + """Return length of status text""" + return len(self.text) """ We must now register our implementation of Status with tweepy. @@ -54,10 +54,10 @@ to make sure data is present which your application depends on. Here's a demo... """ try: - u = tweepy.api.get_user('twitter') + u = tweepy.api.get_user('twitter') except TweepError, e: - # will be raised if user is invalid OR request failed - print 'Failed to get user: %s' % e + # will be raised if user is invalid OR request failed + print 'Failed to get user: %s' % e """ To disable auto validation... @@ -74,7 +74,7 @@ friends by using the User model friends() shortcut... u = tweepy.api.get_user('twitter') friends = u.friends() for friend in friends: - print friend.screen_name + print friend.screen_name """ To learn about all shortcuts check out the reference documentation. diff --git a/tutorial/t4.py b/tutorial/t4.py index a59e2ec..dd4feb1 100644 --- a/tutorial/t4.py +++ b/tutorial/t4.py @@ -14,9 +14,9 @@ When ever something goes wrong this exception will be raised. Here is an example: """ try: - tweepy.api.update_status('this will fail since we are not authenticated!') + tweepy.api.update_status('this will fail since we are not authenticated!') except tweepy.TweepError, e: - print 'Failed to update! %s' % e + print 'Failed to update! %s' % e """ TweepError's can be casted to string format which will diff --git a/tweepy/__init__.py b/tweepy/__init__.py index 8b5eedb..380c2db 100644 --- a/tweepy/__init__.py +++ b/tweepy/__init__.py @@ -16,3 +16,4 @@ from . streaming import Stream, StreamListener # Global, unauthenticated instance of API api = API() + diff --git a/tweepy/api.py b/tweepy/api.py index 115b65d..cf22fb7 100644 --- a/tweepy/api.py +++ b/tweepy/api.py @@ -10,455 +10,460 @@ from . error import TweepError from . auth import BasicAuthHandler, OAuthHandler from tweepy.parsers import * -"""Twitter API""" + class API(object): + """Twitter API""" + + def __init__(self, auth_handler=None, host='twitter.com', cache=None, + secure=False, api_root='', validate=True): + # you may access these freely + self.auth_handler = auth_handler + self.host = host + self.api_root = api_root + self.cache = cache + self.secure = secure + self.validate = validate + + # not a good idea to touch these + self._username = None + + @staticmethod + def new(auth='basic', *args, **kargs): + if auth == 'basic': + return API(BasicAuthHandler(*args, **kargs)) + elif auth == 'oauth': + return API(OAuthHandler(*args, **kargs)) + else: + raise TweepError('Invalid auth type') + + """Get public timeline""" + public_timeline = bind_api( + path = '/statuses/public_timeline.json', + parser = parse_statuses, + allowed_param = [] + ) + + """Get friends timeline""" + friends_timeline = bind_api( + path = '/statuses/friends_timeline.json', + parser = parse_statuses, + allowed_param = ['since_id', 'max_id', 'count', 'page'], + require_auth = True + ) + + """Get user timeline""" + user_timeline = bind_api( + path = '/statuses/user_timeline.json', + parser = parse_statuses, + allowed_param = ['id', 'user_id', 'screen_name', 'since_id', + 'max_id', 'count', 'page'] + ) + + """Get mentions""" + mentions = bind_api( + path = '/statuses/mentions.json', + parser = parse_statuses, + allowed_param = ['since_id', 'max_id', 'count', 'page'], + require_auth = True + ) + + """Show status""" + get_status = bind_api( + path = '/statuses/show.json', + parser = parse_status, + allowed_param = ['id'] + ) + + """Update status""" + update_status = bind_api( + path = '/statuses/update.json', + method = 'POST', + parser = parse_status, + allowed_param = ['status', 'in_reply_to_status_id'], + require_auth = True + ) + + """Destroy status""" + destroy_status = bind_api( + path = '/statuses/destroy.json', + method = 'DELETE', + parser = parse_status, + allowed_param = ['id'], + require_auth = True + ) + + """Show user""" + get_user = bind_api( + path = '/users/show.json', + parser = parse_user, + allowed_param = ['id', 'user_id', 'screen_name'] + ) + + """Get authenticated user""" + def me(self): + # if username not fetched, go get it... + if self._username is None: + if self.auth_handler is None: + raise TweepError('Authentication required') + + try: + user = bind_api( + path = '/account/verify_credentials.json', + parser = parse_user + )(self) + except TweepError, e: + raise TweepError('Failed to fetch username: %s' % e) + + self._username = user.screen_name + + return self.get_user(screen_name=self._username) + + """Show friends""" + friends = bind_api( + path = '/statuses/friends.json', + parser = parse_users, + allowed_param = ['id', 'user_id', 'screen_name', 'page'] + ) + + """Show followers""" + followers = bind_api( + path = '/statuses/followers.json', + parser = parse_users, + allowed_param = ['id', 'user_id', 'screen_name', 'page'], + require_auth = True + ) + + """Get direct messages""" + direct_messages = bind_api( + path = '/direct_messages.json', + parser = parse_directmessages, + allowed_param = ['since_id', 'max_id', 'count', 'page'], + require_auth = True + ) + + """Sent direct messages""" + sent_direct_messages = bind_api( + path = '/direct_messages/sent.json', + parser = parse_directmessages, + allowed_param = ['since_id', 'max_id', 'count', 'page'], + require_auth = True + ) - def __init__(self, auth_handler=None, host='twitter.com', cache=None, - secure=False, api_root='', validate=True): - # you may access these freely - self.auth_handler = auth_handler - self.host = host - self.api_root = api_root - self.cache = cache - self.secure = secure - self.validate = validate - - # not a good idea to touch these - self._username = None - - @staticmethod - def new(auth='basic', *args, **kargs): - if auth == 'basic': - return API(BasicAuthHandler(*args, **kargs)) - elif auth == 'oauth': - return API(OAuthHandler(*args, **kargs)) - else: - raise TweepError('Invalid auth type') - - """Get public timeline""" - public_timeline = bind_api( - path = '/statuses/public_timeline.json', - parser = parse_statuses, - allowed_param = [] - ) - - """Get friends timeline""" - friends_timeline = bind_api( - path = '/statuses/friends_timeline.json', - parser = parse_statuses, - allowed_param = ['since_id', 'max_id', 'count', 'page'], - require_auth = True - ) - - """Get user timeline""" - user_timeline = bind_api( - path = '/statuses/user_timeline.json', - parser = parse_statuses, - allowed_param = ['id', 'user_id', 'screen_name', 'since_id', - 'max_id', 'count', 'page'] - ) - - """Get mentions""" - mentions = bind_api( - path = '/statuses/mentions.json', - parser = parse_statuses, - allowed_param = ['since_id', 'max_id', 'count', 'page'], - require_auth = True - ) - - """Show status""" - get_status = bind_api( - path = '/statuses/show.json', - parser = parse_status, - allowed_param = ['id'] - ) - - """Update status""" - update_status = bind_api( - path = '/statuses/update.json', - method = 'POST', - parser = parse_status, - allowed_param = ['status', 'in_reply_to_status_id'], - require_auth = True - ) - - """Destroy status""" - destroy_status = bind_api( - path = '/statuses/destroy.json', - method = 'DELETE', - parser = parse_status, - allowed_param = ['id'], - require_auth = True - ) - - """Show user""" - get_user = bind_api( - path = '/users/show.json', - parser = parse_user, - allowed_param = ['id', 'user_id', 'screen_name'] - ) - - """Get authenticated user""" - def me(self): - # if username not fetched, go get it... - if self._username is None: - if self.auth_handler is None: - raise TweepError('Authentication required') - - try: - user = bind_api(path='/account/verify_credentials.json', parser=parse_user)(self) - except TweepError, e: - raise TweepError('Failed to fetch username: %s' % e) - self._username = user.screen_name - - return self.get_user(screen_name=self._username) - - """Show friends""" - friends = bind_api( - path = '/statuses/friends.json', - parser = parse_users, - allowed_param = ['id', 'user_id', 'screen_name', 'page'] - ) - - """Show followers""" - followers = bind_api( - path = '/statuses/followers.json', - parser = parse_users, - allowed_param = ['id', 'user_id', 'screen_name', 'page'], - require_auth = True - ) - - """Get direct messages""" - direct_messages = bind_api( - path = '/direct_messages.json', - parser = parse_directmessages, - allowed_param = ['since_id', 'max_id', 'count', 'page'], - require_auth = True - ) - - """Sent direct messages""" - sent_direct_messages = bind_api( - path = '/direct_messages/sent.json', - parser = parse_directmessages, - allowed_param = ['since_id', 'max_id', 'count', 'page'], - require_auth = True - ) - - """Send direct message""" - send_direct_message = bind_api( - path = '/direct_messages/new.json', - method = 'POST', - parser = parse_dm, - allowed_param = ['user', 'text'], - require_auth = True - ) - - """Destroy direct message""" - destroy_direct_message = bind_api( - path = '/direct_messages/destroy.json', - method = 'DELETE', - parser = parse_dm, - allowed_param = ['id'], - require_auth = True - ) - - """Create friendship""" - create_friendship = bind_api( - path = '/friendships/create.json', - method = 'POST', - parser = parse_user, - allowed_param = ['id', 'user_id', 'screen_name', 'follow'], - require_auth = True - ) - - """Destroy friendship""" - destroy_friendship = bind_api( - path = '/friendships/destroy.json', - method = 'DELETE', - parser = parse_user, - allowed_param = ['id', 'user_id', 'screen_name'], - require_auth = True - ) - - """Check if friendship exists""" - exists_friendship = bind_api( - path = '/friendships/exists.json', - parser = parse_json, - allowed_param = ['user_a', 'user_b'] - ) - - """Show friendship details""" - show_friendship = bind_api( - path = '/friendships/show.json', - parser = parse_friendship, - allowed_param = ['source_id', 'source_screen_name', - 'target_id', 'target_screen_name'] - ) - - """Get list of IDs of users the specified user is following""" - friends_ids = bind_api( - path = '/friends/ids.json', - parser = parse_json, - allowed_param = ['id', 'user_id', 'screen_name', 'page'] - ) - - """Get list of IDs of users following the specified user""" - followers_ids = bind_api( - path = '/followers/ids.json', - parser = parse_json, - allowed_param = ['id', 'user_id', 'screen_name', 'page'] - ) - - """Verify credentials""" - def verify_credentials(self): - try: - return bind_api( - path = '/account/verify_credentials.json', - parser = parse_return_true, - require_auth = True)(self) - except TweepError: - return False - - """Rate limit status""" - rate_limit_status = bind_api( - path = '/account/rate_limit_status.json', - parser = parse_json - ) - - """Update delivery device""" - set_delivery_device = bind_api( - path = '/account/update_delivery_device.json', - method = 'POST', - allowed_param = ['device'], - parser = parse_user, - require_auth = True - ) - - """Update profile colors""" - update_profile_colors = bind_api( - path = '/account/update_profile_colors.json', - method = 'POST', - parser = parse_user, - allowed_param = ['profile_background_color', 'profile_text_color', - 'profile_link_color', 'profile_sidebar_fill_color', - 'profile_sidebar_border_color'], - require_auth = True - ) - - """Update profile image""" - def update_profile_image(self, filename): - headers, post_data = _pack_image(filename, 700) - bind_api( - path = '/account/update_profile_image.json', + """Send direct message""" + send_direct_message = bind_api( + path = '/direct_messages/new.json', method = 'POST', - parser = parse_none, + parser = parse_dm, + allowed_param = ['user', 'text'], require_auth = True - )(self, post_data=post_data, headers=headers) + ) - """Update profile background image""" - def update_profile_background_image(self, filename, *args, **kargs): - headers, post_data = _pack_image(filename, 800) - bind_api( - path = '/account/update_profile_background_image.json', + """Destroy direct message""" + destroy_direct_message = bind_api( + path = '/direct_messages/destroy.json', + method = 'DELETE', + parser = parse_dm, + allowed_param = ['id'], + require_auth = True + ) + + """Create friendship""" + create_friendship = bind_api( + path = '/friendships/create.json', method = 'POST', - parser = parse_none, - allowed_param = ['tile'], + parser = parse_user, + allowed_param = ['id', 'user_id', 'screen_name', 'follow'], require_auth = True - )(self, post_data=post_data, headers=headers) - - """Update profile""" - update_profile = bind_api( - path = '/account/update_profile.json', - method = 'POST', - parser = parse_user, - allowed_param = ['name', 'email', 'url', 'location', 'description'], - require_auth = True - ) - - """Get favorites""" - favorites = bind_api( - path = '/favorites.json', - parser = parse_statuses, - allowed_param = ['id', 'page'] - ) - - """Create favorite""" - create_favorite = bind_api( - path = '/favorites/create.json', - method = 'POST', - parser = parse_status, - allowed_param = ['id'], - require_auth = True - ) - - """Destroy favorite""" - destroy_favorite = bind_api( - path = '/favorites/destroy.json', - method = 'DELETE', - parser = parse_status, - allowed_param = ['id'], - require_auth = True - ) - - """Enable device notifications""" - enable_notifications = bind_api( - path = '/notifications/follow.json', - method = 'POST', - parser = parse_user, - allowed_param = ['id', 'user_id', 'screen_name'], - require_auth = True - ) - - """Disable device notifications""" - disable_notifications = bind_api( - path = '/notifications/leave.json', - method = 'POST', - parser = parse_user, - allowed_param = ['id', 'user_id', 'screen_name'], - require_auth = True - ) - - """Create a block""" - create_block = bind_api( - path = '/blocks/create.json', - method = 'POST', - parser = parse_user, - allowed_param = ['id'], - require_auth = True - ) - - """Destroy a block""" - destroy_block = bind_api( - path = '/blocks/destroy.json', - method = 'DELETE', - parser = parse_user, - allowed_param = ['id'], - require_auth = True - ) - - """Check if block exists""" - def exists_block(self, **kargs): - try: - bind_api( - path = '/blocks/exists.json', - parser = parse_none, - allowed_param = ['id', 'user_id', 'screen_name'], - require_auth = True - )(self, **kargs) - except TweepError: - return False - return True - - """Get list of users that are blocked""" - blocks = bind_api( - path = '/blocks/blocking.json', - parser = parse_users, - allowed_param = ['page'], - require_auth = True - ) - - """Get list of ids of users that are blocked""" - blocks_ids = bind_api( - path = '/blocks/blocking/ids.json', - parser = parse_json, - require_auth = True - ) - - """Get list of saved searches""" - saved_searches = bind_api( - path = '/saved_searches.json', - parser = parse_saved_searches, - require_auth = True - ) - - """Get a single saved search by id""" - def get_saved_search(self, id): - return bind_api( - path = '/saved_searches/show/%s.json' % id, - parser = parse_saved_search, + ) + + """Destroy friendship""" + destroy_friendship = bind_api( + path = '/friendships/destroy.json', + method = 'DELETE', + parser = parse_user, + allowed_param = ['id', 'user_id', 'screen_name'], + require_auth = True + ) + + """Check if friendship exists""" + exists_friendship = bind_api( + path = '/friendships/exists.json', + parser = parse_json, + allowed_param = ['user_a', 'user_b'] + ) + + """Show friendship details""" + show_friendship = bind_api( + path = '/friendships/show.json', + parser = parse_friendship, + allowed_param = ['source_id', 'source_screen_name', + 'target_id', 'target_screen_name'] + ) + + """Get list of IDs of users the specified user is following""" + friends_ids = bind_api( + path = '/friends/ids.json', + parser = parse_json, + allowed_param = ['id', 'user_id', 'screen_name', 'page'] + ) + + """Get list of IDs of users following the specified user""" + followers_ids = bind_api( + path = '/followers/ids.json', + parser = parse_json, + allowed_param = ['id', 'user_id', 'screen_name', 'page'] + ) + + """Verify credentials""" + def verify_credentials(self): + try: + return bind_api( + path = '/account/verify_credentials.json', + parser = parse_return_true, + require_auth = True + )(self) + except TweepError: + return False + + """Rate limit status""" + rate_limit_status = bind_api( + path = '/account/rate_limit_status.json', + parser = parse_json + ) + + """Update delivery device""" + set_delivery_device = bind_api( + path = '/account/update_delivery_device.json', + method = 'POST', + allowed_param = ['device'], + parser = parse_user, + require_auth = True + ) + + """Update profile colors""" + update_profile_colors = bind_api( + path = '/account/update_profile_colors.json', + method = 'POST', + parser = parse_user, + allowed_param = ['profile_background_color', 'profile_text_color', + 'profile_link_color', 'profile_sidebar_fill_color', + 'profile_sidebar_border_color'], + require_auth = True + ) + + """Update profile image""" + def update_profile_image(self, filename): + headers, post_data = _pack_image(filename, 700) + bind_api( + path = '/account/update_profile_image.json', + method = 'POST', + parser = parse_none, + require_auth = True + )(self, post_data=post_data, headers=headers) + + """Update profile background image""" + def update_profile_background_image(self, filename, *args, **kargs): + headers, post_data = _pack_image(filename, 800) + bind_api( + path = '/account/update_profile_background_image.json', + method = 'POST', + parser = parse_none, + allowed_param = ['tile'], + require_auth = True + )(self, post_data=post_data, headers=headers) + + """Update profile""" + update_profile = bind_api( + path = '/account/update_profile.json', + method = 'POST', + parser = parse_user, + allowed_param = ['name', 'email', 'url', 'location', 'description'], require_auth = True - )(self) - - """Create new saved search""" - create_saved_search = bind_api( - path = '/saved_searches/create.json', - method = 'POST', - parser = parse_saved_search, - allowed_param = ['query'], - require_auth = True - ) - - """Destroy a saved search""" - def destroy_saved_search(self, id): - return bind_api( - path = '/saved_searches/destroy/%s.json' % id, + ) + + """Get favorites""" + favorites = bind_api( + path = '/favorites.json', + parser = parse_statuses, + allowed_param = ['id', 'page'] + ) + + """Create favorite""" + create_favorite = bind_api( + path = '/favorites/create.json', + method = 'POST', + parser = parse_status, + allowed_param = ['id'], + require_auth = True + ) + + """Destroy favorite""" + destroy_favorite = bind_api( + path = '/favorites/destroy.json', method = 'DELETE', - parser = parse_saved_search, + parser = parse_status, allowed_param = ['id'], require_auth = True - )(self) - - def test(self): - return bind_api( - path = '/help/test.json', - parser = parse_return_true - )(self) - - """Search API""" - - def search(self, *args, **kargs): - return bind_api( - host = 'search.' + self.host, - path = '/search.json', - parser = parse_search_results, - allowed_param = ['q', 'lang', 'rpp', 'page', 'since_id', 'geocode', 'show_user'], - )(self, *args, **kargs) - - def trends(self): - return bind_api( - host = 'search.' + self.host, - path = '/trends.json', - parser = parse_trend_results - )(self) - - -""" Pack image file into multipart-formdata request""" -def _pack_image(filename, max_size): - """Pack image from file into multipart-formdata post body""" - # image must be less than 700kb in size - try: - if os.path.getsize(filename) > (max_size * 1024): - raise TweepError('File is too big, must be less than 700kb.') - except os.error, e: - raise TweepError('Unable to access file') - - # image must be gif, jpeg, or png - file_type = mimetypes.guess_type(filename) - if file_type is None: - raise TweepError('Could not determine file type') - file_type = file_type[0] - if file_type != 'image/gif' and file_type != 'image/jpeg' and file_type != 'image/png': - raise TweepError('Invalid file type for image: %s' % file_type) - - # build the mulitpart-formdata body - fp = open(filename, 'rb') - BOUNDARY = 'Tw3ePy' - body = [] - body.append('--' + BOUNDARY) - body.append('Content-Disposition: form-data; name="image"; filename="%s"' % filename) - body.append('Content-Type: %s' % file_type) - body.append('') - body.append(fp.read()) - body.append('--' + BOUNDARY + '--') - body.append('') - fp.close() - body = '\r\n'.join(body) - - # build headers - headers = { - 'Content-Type': 'multipart/form-data; boundary=Tw3ePy', - 'Content-Length': len(body) - } - - return headers, body + ) + + """Enable device notifications""" + enable_notifications = bind_api( + path = '/notifications/follow.json', + method = 'POST', + parser = parse_user, + allowed_param = ['id', 'user_id', 'screen_name'], + require_auth = True + ) + + """Disable device notifications""" + disable_notifications = bind_api( + path = '/notifications/leave.json', + method = 'POST', + parser = parse_user, + allowed_param = ['id', 'user_id', 'screen_name'], + require_auth = True + ) + + """Create a block""" + create_block = bind_api( + path = '/blocks/create.json', + method = 'POST', + parser = parse_user, + allowed_param = ['id'], + require_auth = True + ) + + """Destroy a block""" + destroy_block = bind_api( + path = '/blocks/destroy.json', + method = 'DELETE', + parser = parse_user, + allowed_param = ['id'], + require_auth = True + ) + + """Check if block exists""" + def exists_block(self, **kargs): + try: + bind_api( + path = '/blocks/exists.json', + parser = parse_none, + allowed_param = ['id', 'user_id', 'screen_name'], + require_auth = True + )(self, **kargs) + except TweepError: + return False + + return True + + """Get list of users that are blocked""" + blocks = bind_api( + path = '/blocks/blocking.json', + parser = parse_users, + allowed_param = ['page'], + require_auth = True + ) + + """Get list of ids of users that are blocked""" + blocks_ids = bind_api( + path = '/blocks/blocking/ids.json', + parser = parse_json, + require_auth = True + ) + + """Get list of saved searches""" + saved_searches = bind_api( + path = '/saved_searches.json', + parser = parse_saved_searches, + require_auth = True + ) + + """Get a single saved search by id""" + def get_saved_search(self, id): + return bind_api( + path = '/saved_searches/show/%s.json' % id, + parser = parse_saved_search, + require_auth = True + )(self) + + """Create new saved search""" + create_saved_search = bind_api( + path = '/saved_searches/create.json', + method = 'POST', + parser = parse_saved_search, + allowed_param = ['query'], + require_auth = True + ) + + """Destroy a saved search""" + def destroy_saved_search(self, id): + return bind_api( + path = '/saved_searches/destroy/%s.json' % id, + method = 'DELETE', + parser = parse_saved_search, + allowed_param = ['id'], + require_auth = True + )(self) + + def test(self): + return bind_api( + path = '/help/test.json', + parser = parse_return_true + )(self) + + """Search API""" + + def search(self, *args, **kargs): + return bind_api( + host = 'search.' + self.host, + path = '/search.json', + parser = parse_search_results, + allowed_param = ['q', 'lang', 'rpp', 'page', 'since_id', 'geocode', 'show_user'], + )(self, *args, **kargs) + + def trends(self): + return bind_api( + host = 'search.' + self.host, + path = '/trends.json', + parser = parse_trend_results + )(self) + + def _pack_image(filename, max_size): + """Pack image from file into multipart-formdata post body""" + # image must be less than 700kb in size + try: + if os.path.getsize(filename) > (max_size * 1024): + raise TweepError('File is too big, must be less than 700kb.') + except os.error, e: + raise TweepError('Unable to access file') + + # image must be gif, jpeg, or png + file_type = mimetypes.guess_type(filename) + if file_type is None: + raise TweepError('Could not determine file type') + file_type = file_type[0] + if file_type not in ['image/gif', 'image/jpeg', 'image/png']: + raise TweepError('Invalid file type for image: %s' % file_type) + + # build the mulitpart-formdata body + fp = open(filename, 'rb') + BOUNDARY = 'Tw3ePy' + body = [] + body.append('--' + BOUNDARY) + body.append('Content-Disposition: form-data; name="image"; filename="%s"' % filename) + body.append('Content-Type: %s' % file_type) + body.append('') + body.append(fp.read()) + body.append('--' + BOUNDARY + '--') + body.append('') + fp.close() + body = '\r\n'.join(body) + + # build headers + headers = { + 'Content-Type': 'multipart/form-data; boundary=Tw3ePy', + 'Content-Length': len(body) + } + + return headers, body diff --git a/tweepy/auth.py b/tweepy/auth.py index 655c493..d94644d 100644 --- a/tweepy/auth.py +++ b/tweepy/auth.py @@ -3,86 +3,97 @@ # See LICENSE from urllib2 import Request, urlopen -from urllib import quote import base64 from . import oauth from . error import TweepError + class AuthHandler(object): - def apply_auth(self, url, method, headers, parameters): - """Apply authentication headers to request""" - raise NotImplemented + def apply_auth(self, url, method, headers, parameters): + """Apply authentication headers to request""" + raise NotImplemented + class BasicAuthHandler(AuthHandler): - def __init__(self, username, password): - self._b64up = base64.b64encode('%s:%s' % (username, password)) + def __init__(self, username, password): + self._b64up = base64.b64encode('%s:%s' % (username, password)) - def apply_auth(self, url, method, headers, parameters): - headers['Authorization'] = 'Basic %s' % self._b64up + def apply_auth(self, url, method, headers, parameters): + headers['Authorization'] = 'Basic %s' % self._b64up -"""OAuth authentication handler""" -class OAuthHandler(AuthHandler): - REQUEST_TOKEN_URL = 'http://twitter.com/oauth/request_token' - AUTHORIZATION_URL = 'http://twitter.com/oauth/authorize' - ACCESS_TOKEN_URL = 'http://twitter.com/oauth/access_token' - - def __init__(self, consumer_key, consumer_secret, callback=None): - self._consumer = oauth.OAuthConsumer(consumer_key, consumer_secret) - self._sigmethod = oauth.OAuthSignatureMethod_HMAC_SHA1() - self.request_token = None - self.access_token = None - self.callback = callback - - def apply_auth(self, url, method, headers, parameters): - request = oauth.OAuthRequest.from_consumer_and_token(self._consumer, - http_url=url, http_method=method, token=self.access_token, parameters=parameters) - request.sign_request(self._sigmethod, self._consumer, self.access_token) - headers.update(request.to_header()) - - def _get_request_token(self): - try: - request = oauth.OAuthRequest.from_consumer_and_token(self._consumer, - http_url = self.REQUEST_TOKEN_URL, callback=self.callback) - request.sign_request(self._sigmethod, self._consumer, None) - resp = urlopen(Request(self.REQUEST_TOKEN_URL, headers=request.to_header())) - return oauth.OAuthToken.from_string(resp.read()) - - except Exception, e: - raise TweepError(e) - - def set_access_token(self, key, secret): - self.access_token = oauth.OAuthToken(key, secret) - - def get_authorization_url(self): - """Get the authorization URL to redirect the user""" - try: - # get the request token - self.request_token = self._get_request_token() - - # build auth request and return as url - request = oauth.OAuthRequest.from_token_and_callback( - token=self.request_token, http_url=self.AUTHORIZATION_URL) - return request.to_url() - - except Exception, e: - raise TweepError(e) - - def get_access_token(self, verifier): - """After user has authorized the request token, get access token with user supplied verifier.""" - try: - # build request - request = oauth.OAuthRequest.from_consumer_and_token(self._consumer, - token=self.request_token, http_url=self.ACCESS_TOKEN_URL, verifier=str(verifier)) - request.sign_request(self._sigmethod, self._consumer, self.request_token) - - # send request - resp = urlopen(Request(self.ACCESS_TOKEN_URL, headers=request.to_header())) - self.access_token = oauth.OAuthToken.from_string(resp.read()) - return self.access_token - except Exception, e: - raise TweepError(e) +class OAuthHandler(AuthHandler): + """OAuth authentication handler""" + + REQUEST_TOKEN_URL = 'http://twitter.com/oauth/request_token' + AUTHORIZATION_URL = 'http://twitter.com/oauth/authorize' + ACCESS_TOKEN_URL = 'http://twitter.com/oauth/access_token' + + def __init__(self, consumer_key, consumer_secret, callback=None): + self._consumer = oauth.OAuthConsumer(consumer_key, consumer_secret) + self._sigmethod = oauth.OAuthSignatureMethod_HMAC_SHA1() + self.request_token = None + self.access_token = None + self.callback = callback + + def apply_auth(self, url, method, headers, parameters): + request = oauth.OAuthRequest.from_consumer_and_token( + self._consumer, http_url=url, http_method=method, + token=self.access_token, parameters=parameters + ) + request.sign_request(self._sigmethod, self._consumer, self.access_token) + headers.update(request.to_header()) + + def _get_request_token(self): + try: + request = oauth.OAuthRequest.from_consumer_and_token( + self._consumer, http_url=self.REQUEST_TOKEN_URL, callback=self.callback + ) + request.sign_request(self._sigmethod, self._consumer, None) + resp = urlopen(Request(self.REQUEST_TOKEN_URL, headers=request.to_header())) + return oauth.OAuthToken.from_string(resp.read()) + except Exception, e: + raise TweepError(e) + + def set_access_token(self, key, secret): + self.access_token = oauth.OAuthToken(key, secret) + + def get_authorization_url(self): + """Get the authorization URL to redirect the user""" + try: + # get the request token + self.request_token = self._get_request_token() + + # build auth request and return as url + request = oauth.OAuthRequest.from_token_and_callback( + token=self.request_token, http_url=self.AUTHORIZATION_URL + ) + + return request.to_url() + except Exception, e: + raise TweepError(e) + + def get_access_token(self, verifier): + """ + After user has authorized the request token, get access token + with user supplied verifier. + """ + try: + # build request + request = oauth.OAuthRequest.from_consumer_and_token( + self._consumer, + token=self.request_token, http_url=self.ACCESS_TOKEN_URL, + verifier=str(verifier) + ) + request.sign_request(self._sigmethod, self._consumer, self.request_token) + + # send request + resp = urlopen(Request(self.ACCESS_TOKEN_URL, headers=request.to_header())) + self.access_token = oauth.OAuthToken.from_string(resp.read()) + return self.access_token + except Exception, e: + raise TweepError(e) diff --git a/tweepy/binder.py b/tweepy/binder.py index 6a55eff..3aa5e25 100644 --- a/tweepy/binder.py +++ b/tweepy/binder.py @@ -8,120 +8,122 @@ import urllib from . parsers import parse_error from . error import TweepError + def bind_api(path, parser, allowed_param=None, method='GET', require_auth=False, timeout=None, host=None): - def _call(api, *args, **kargs): - # If require auth, throw exception if credentials not provided - if require_auth and not api.auth_handler: - raise TweepError('Authentication required!') - - # check for post_data parameter - if 'post_data' in kargs: - post_data = kargs['post_data'] - del kargs['post_data'] - else: - post_data = None - - # check for headers - if 'headers' in kargs: - headers = dict(kargs['headers']) - del kargs['headers'] - else: - headers = {} - - # build parameter dict - if allowed_param: - parameters = {} - for idx, arg in enumerate(args): - try: - parameters[allowed_param[idx]] = arg - except IndexError: - raise TweepError('Too many parameters supplied!') - for k, arg in kargs.items(): - if arg is None: - continue - if k in parameters: - raise TweepError('Multiple values for parameter %s supplied!' % k) - if k not in allowed_param: - raise TweepError('Invalid parameter %s supplied!' % k) - parameters[k] = arg - else: - if len(args) > 0 or len(kargs) > 0: - raise TweepError('This method takes no parameters!') - parameters = None - - # Build url with parameters - if parameters: - url = '%s?%s' % (api.api_root + path, urllib.urlencode(parameters)) - else: - url = api.api_root + path - - # get scheme and host - if api.secure: - scheme = 'https://' - else: - scheme = 'http://' - _host = host or api.host - - # Apply authentication - if api.auth_handler: - api.auth_handler.apply_auth(scheme + _host + url, method, headers, parameters) - - # Check cache if caching enabled and method is GET - if api.cache and method == 'GET': - cache_result = api.cache.get(url, timeout) - if cache_result: - # if cache result found and not expired, return it - # must restore api reference - if isinstance(cache_result, list): - for result in cache_result: - result._api = api + def _call(api, *args, **kargs): + # If require auth, throw exception if credentials not provided + if require_auth and not api.auth_handler: + raise TweepError('Authentication required!') + + # check for post_data parameter + if 'post_data' in kargs: + post_data = kargs['post_data'] + del kargs['post_data'] + else: + post_data = None + + # check for headers + if 'headers' in kargs: + headers = dict(kargs['headers']) + del kargs['headers'] + else: + headers = {} + + # build parameter dict + if allowed_param: + parameters = {} + for idx, arg in enumerate(args): + try: + parameters[allowed_param[idx]] = arg + except IndexError: + raise TweepError('Too many parameters supplied!') + for k, arg in kargs.items(): + if arg is None: + continue + if k in parameters: + raise TweepError('Multiple values for parameter %s supplied!' % k) + if k not in allowed_param: + raise TweepError('Invalid parameter %s supplied!' % k) + parameters[k] = arg else: - cache_result._api = api - return cache_result - - # Open connection - # FIXME: add timeout - if api.secure: - conn = httplib.HTTPSConnection(_host) - else: - conn = httplib.HTTPConnection(_host) - - # Build request - conn.request(method, url, headers=headers, body=post_data) - - # Get response - resp = conn.getresponse() - - # If an error was returned, throw an exception - if resp.status != 200: - try: - error_msg = parse_error(resp.read()) - except Exception: - error_msg = "Unkown twitter error response received: status=%s" % resp.status - raise TweepError(error_msg) - - # Pass returned body into parser and return parser output - out = parser(resp.read(), api) - conn.close() - - # validate result - if api.validate: - # list of results - if isinstance(out, list) and len(out) > 0: - if hasattr(out[0], 'validate'): - for result in out: - result.validate() - # single result - else: - if hasattr(out, 'validate'): - out.validate() - - # store result in cache - if api.cache and method == 'GET': - api.cache.store(url, out) - - return out - - return _call + if len(args) > 0 or len(kargs) > 0: + raise TweepError('This method takes no parameters!') + parameters = None + + # Build url with parameters + if parameters: + url = '%s?%s' % (api.api_root + path, urllib.urlencode(parameters)) + else: + url = api.api_root + path + + # get scheme and host + if api.secure: + scheme = 'https://' + else: + scheme = 'http://' + _host = host or api.host + + # Apply authentication + if api.auth_handler: + api.auth_handler.apply_auth(scheme + _host + url, method, headers, parameters) + + # Check cache if caching enabled and method is GET + if api.cache and method == 'GET': + cache_result = api.cache.get(url, timeout) + # if cache result found and not expired, return it + if cache_result: + # must restore api reference + if isinstance(cache_result, list): + for result in cache_result: + result._api = api + else: + cache_result._api = api + return cache_result + + # Open connection + # FIXME: add timeout + if api.secure: + conn = httplib.HTTPSConnection(_host) + else: + conn = httplib.HTTPConnection(_host) + + # Build request + conn.request(method, url, headers=headers, body=post_data) + + # Get response + resp = conn.getresponse() + + # If an error was returned, throw an exception + if resp.status != 200: + try: + error_msg = parse_error(resp.read()) + except Exception: + error_msg = "Twitter error response: status code = %s" % resp.status + raise TweepError(error_msg) + + # Pass returned body into parser and return parser output + out = parser(resp.read(), api) + conn.close() + + # validate result + if api.validate: + # list of results + if isinstance(out, list) and len(out) > 0: + if hasattr(out[0], 'validate'): + for result in out: + result.validate() + # single result + else: + if hasattr(out, 'validate'): + out.validate() + + # store result in cache + if api.cache and method == 'GET': + api.cache.store(url, out) + + return out + + return _call + diff --git a/tweepy/cache.py b/tweepy/cache.py index e847ebc..e1e9dce 100644 --- a/tweepy/cache.py +++ b/tweepy/cache.py @@ -12,237 +12,243 @@ import fcntl import cPickle as pickle from . import memcache -from . error import TweepError -"""Cache interface""" + class Cache(object): + """Cache interface""" + + def __init__(self, timeout=60): + """Initialize the cache + timeout: number of seconds to keep a cached entry + """ + self.timeout = timeout + + def store(self, key, value): + """Add new record to cache + key: entry key + value: data of entry + """ + raise NotImplementedError + + def get(self, key, timeout=None): + """Get cached entry if exists and not expired + key: which entry to get + timeout: override timeout with this value [optional] + """ + raise NotImplementedError + + def count(self): + """Get count of entries currently stored in cache""" + raise NotImplementedError + + def cleanup(self): + """Delete any expired entries in cache.""" + raise NotImplementedError + + def flush(self): + """Delete all cached entries""" + raise NotImplementedError + - def __init__(self, timeout=60): - """Init the cache - timeout: number of seconds to keep a cached entry - """ - self.timeout = timeout - - def store(self, key, value): - """Add new record to cache - key: entry key - value: data of entry - """ - raise NotImplementedError - - def get(self, key, timeout=None): - """Get cached entry if exists and not expired - key: which entry to get - timeout: override timeout with this value [optional] - """ - raise NotImplementedError - - def count(self): - """Get count of entries currently stored in cache""" - raise NotImplementedError - - def cleanup(self): - """Delete any expired entries in cache.""" - raise NotImplementedError - - def flush(self): - """Delete all cached entries""" - raise NotImplementedError - -"""In-memory cache""" class MemoryCache(Cache): + """In-memory cache""" - def __init__(self, timeout=60): - Cache.__init__(self, timeout) - self._entries = {} - self.lock = threading.Lock() - - def __getstate__(self): - # pickle - return {'entries': self._entries, 'timeout': self.timeout} - - def __setstate__(self, state): - # unpickle - self.lock = threading.Lock() - self._entries = state['entries'] - self.timeout = state['timeout'] - - def _is_expired(self, entry, timeout): - return timeout > 0 and (time.time() - entry[0]) >= timeout - - def store(self, key, value): - with self.lock: - self._entries[key] = (time.time(), value) - - def get(self, key, timeout=None): - with self.lock: - # check to see if we have this key - entry = self._entries.get(key) - if not entry: - # no hit, return nothing - return None - - # use provided timeout in arguments if provided - # otherwise use the one provided during init. - _timeout = self.timeout if timeout is None else timeout - - # make sure entry is not expired - if self._is_expired(entry, _timeout): - # entry expired, delete and return nothing - del self._entries[key] - return None - - # entry found and not expired, return it - return entry[1] - - def count(self): - return len(self._entries) - - def cleanup(self): - with self.lock: - for k,v in self._entries.items(): - if self._is_expired(v, self.timeout): - del self._entries[k] - - def flush(self): - with self.lock: - self._entries.clear() - -"""File-based cache""" -class FileCache(Cache): + def __init__(self, timeout=60): + Cache.__init__(self, timeout) + self._entries = {} + self.lock = threading.Lock() - # locks used to make cache thread-safe - cache_locks = {} - - def __init__(self, cache_dir, timeout=60): - Cache.__init__(self, timeout) - if os.path.exists(cache_dir) is False: - os.mkdir(cache_dir) - self.cache_dir = cache_dir - if cache_dir in FileCache.cache_locks: - self.lock = FileCache.cache_locks[cache_dir] - else: - self.lock = threading.Lock() - FileCache.cache_locks[cache_dir] = self.lock - - def _get_path(self, key): - md5 = hashlib.md5() - md5.update(key) - return os.path.join(self.cache_dir, md5.hexdigest()) - - def _lock_file(self, path, exclusive=True): - lock_path = path + '.lock' - if exclusive is True: - f_lock = open(lock_path, 'w') - fcntl.lockf(f_lock, fcntl.LOCK_EX) - else: - f_lock = open(lock_path, 'r') - fcntl.lockf(f_lock, fcntl.LOCK_SH) - if os.path.exists(lock_path) is False: - f_lock.close() - return None - return f_lock - - def _delete_file(self, path): - os.remove(path) - os.remove(path + '.lock') - - def store(self, key, value): - path = self._get_path(key) - with self.lock: - # acquire lock and open file - f_lock = self._lock_file(path) - datafile = open(path, 'wb') - - # write data - pickle.dump((time.time(), value), datafile) - - # close and unlock file - datafile.close() - f_lock.close() - - def get(self, key, timeout=None): - return self._get(self._get_path(key), timeout) - - def _get(self, path, timeout): - if os.path.exists(path) is False: - # no record - return None - while self.lock: - # acquire lock and open - f_lock = self._lock_file(path, False) - if f_lock is None: - # does not exist - return None - datafile = open(path, 'rb') - - # read pickled object - created_time, value = pickle.load(datafile) - datafile.close() - - # check if value is expired - _timeout = self.timeout if timeout is None else timeout - if _timeout > 0 and (time.time() - created_time) >= _timeout: - # expired! delete from cache - value = None - self._delete_file(path) - - # unlock and return result - f_lock.close() - return value - - def count(self): - c = 0 - for entry in os.listdir(self.cache_dir): - if entry.endswith('.lock'): continue - c += 1 - return c - - def cleanup(self): - for entry in os.listdir(self.cache_dir): - if entry.endswith('.lock'): continue - self._get(os.path.join(self.cache_dir, entry), None) - - def flush(self): - for entry in os.listdir(self.cache_dir): - if entry.endswith('.lock'): continue - self._delete_file(os.path.join(self.cache_dir, entry)) - -"""Memcache client""" -class MemCache(Cache): + def __getstate__(self): + # pickle + return {'entries': self._entries, 'timeout': self.timeout} + + def __setstate__(self, state): + # unpickle + self.lock = threading.Lock() + self._entries = state['entries'] + self.timeout = state['timeout'] + + def _is_expired(self, entry, timeout): + return timeout > 0 and (time.time() - entry[0]) >= timeout + + def store(self, key, value): + with self.lock: + self._entries[key] = (time.time(), value) + + def get(self, key, timeout=None): + with self.lock: + # check to see if we have this key + entry = self._entries.get(key) + if not entry: + # no hit, return nothing + return None - def __init__(self, servers, timeout=60): - Cache.__init__(self, timeout) - self.client = memcache.Client(servers) + # use provided timeout in arguments if provided + # otherwise use the one provided during init. + _timeout = self.timeout if timeout is None else timeout - def store(self, key, value): - self.client.set(key, (time.time(), value), time=self.timeout) + # make sure entry is not expired + if self._is_expired(entry, _timeout): + # entry expired, delete and return nothing + del self._entries[key] + return None - def get(self, key, timeout=None): - obj = self.client.get(key) - if obj is None: - return None - created_time, value = obj + # entry found and not expired, return it + return entry[1] - # check if value is expired - _timeout = self.timeout if timeout is None else timeout - if _timeout > 0 and (time.time() - created_time) >= _timeout: - # expired! delete from cache - self.client.delete(key) - return None + def count(self): + return len(self._entries) - return value + def cleanup(self): + with self.lock: + for k, v in self._entries.items(): + if self._is_expired(v, self.timeout): + del self._entries[k] - def count(self): - count = 0 - for sid, stats in self.client.get_stats(): - count += int(stats.get('curr_items', 0)) - return count + def flush(self): + with self.lock: + self._entries.clear() - def cleanup(self): - # not implemented for this cache since server handles it - return - def flush(self): - self.client.flush_all() +class FileCache(Cache): + """File-based cache""" + + # locks used to make cache thread-safe + cache_locks = {} + + def __init__(self, cache_dir, timeout=60): + Cache.__init__(self, timeout) + if os.path.exists(cache_dir) is False: + os.mkdir(cache_dir) + self.cache_dir = cache_dir + if cache_dir in FileCache.cache_locks: + self.lock = FileCache.cache_locks[cache_dir] + else: + self.lock = threading.Lock() + FileCache.cache_locks[cache_dir] = self.lock + + def _get_path(self, key): + md5 = hashlib.md5() + md5.update(key) + return os.path.join(self.cache_dir, md5.hexdigest()) + + def _lock_file(self, path, exclusive=True): + lock_path = path + '.lock' + if exclusive is True: + f_lock = open(lock_path, 'w') + fcntl.lockf(f_lock, fcntl.LOCK_EX) + else: + f_lock = open(lock_path, 'r') + fcntl.lockf(f_lock, fcntl.LOCK_SH) + if os.path.exists(lock_path) is False: + f_lock.close() + return None + return f_lock + + def _delete_file(self, path): + os.remove(path) + os.remove(path + '.lock') + + def store(self, key, value): + path = self._get_path(key) + with self.lock: + # acquire lock and open file + f_lock = self._lock_file(path) + datafile = open(path, 'wb') + + # write data + pickle.dump((time.time(), value), datafile) + + # close and unlock file + datafile.close() + f_lock.close() + + def get(self, key, timeout=None): + return self._get(self._get_path(key), timeout) + + def _get(self, path, timeout): + if os.path.exists(path) is False: + # no record + return None + while self.lock: + # acquire lock and open + f_lock = self._lock_file(path, False) + if f_lock is None: + # does not exist + return None + datafile = open(path, 'rb') + + # read pickled object + created_time, value = pickle.load(datafile) + datafile.close() + + # check if value is expired + _timeout = self.timeout if timeout is None else timeout + if _timeout > 0 and (time.time() - created_time) >= _timeout: + # expired! delete from cache + value = None + self._delete_file(path) + + # unlock and return result + f_lock.close() + return value + + def count(self): + c = 0 + for entry in os.listdir(self.cache_dir): + if entry.endswith('.lock'): + continue + c += 1 + return c + + def cleanup(self): + for entry in os.listdir(self.cache_dir): + if entry.endswith('.lock'): + continue + self._get(os.path.join(self.cache_dir, entry), None) + + def flush(self): + for entry in os.listdir(self.cache_dir): + if entry.endswith('.lock'): + continue + self._delete_file(os.path.join(self.cache_dir, entry)) + + +class MemCache(Cache): + """Memcache client""" + + def __init__(self, servers, timeout=60): + Cache.__init__(self, timeout) + self.client = memcache.Client(servers) + + def store(self, key, value): + self.client.set(key, (time.time(), value), time=self.timeout) + + def get(self, key, timeout=None): + obj = self.client.get(key) + if obj is None: + return None + created_time, value = obj + + # check if value is expired + _timeout = self.timeout if timeout is None else timeout + if _timeout > 0 and (time.time() - created_time) >= _timeout: + # expired! delete from cache + self.client.delete(key) + return None + + return value + + def count(self): + count = 0 + for sid, stats in self.client.get_stats(): + count += int(stats.get('curr_items', 0)) + return count + + def cleanup(self): + # not implemented for this cache since server handles it + return + + def flush(self): + self.client.flush_all() diff --git a/tweepy/error.py b/tweepy/error.py index 2905bee..297a48f 100644 --- a/tweepy/error.py +++ b/tweepy/error.py @@ -2,13 +2,12 @@ # Copyright 2009 Joshua Roesslein # See LICENSE -""" -Tweepy exception -""" class TweepError(Exception): + """Tweepy exception""" - def __init__(self, reason): - self.reason = str(reason) + def __init__(self, reason): + self.reason = str(reason) + + def __str__(self): + return self.reason - def __str__(self): - return self.reason diff --git a/tweepy/models.py b/tweepy/models.py index 329a236..6e75a42 100644 --- a/tweepy/models.py +++ b/tweepy/models.py @@ -4,101 +4,118 @@ from . error import TweepError + class Model(object): - def __getstate__(self): - # pickle - pickle = {} - for k,v in self.__dict__.items(): - if k == '_api': continue # do not pickle the api reference - pickle[k] = v - return pickle - - @staticmethod - def _validate(model, attributes): - missing = [] - for attr in attributes: - if not hasattr(model, attr): - missing.append(attr) - if len(missing) > 0: - raise TweepError('Missing required attribute(s) %s' % str(missing).strip('[]')) - - def validate(self): - return + def __getstate__(self): + # pickle + pickle = {} + for k, v in self.__dict__.items(): + if k == '_api': + # do not pickle the api reference + continue + pickle[k] = v + return pickle + + @staticmethod + def _validate(model, attributes): + missing = [] + for attr in attributes: + if not hasattr(model, attr): + missing.append(attr) + if len(missing) > 0: + raise TweepError('Missing required attribute(s) %s' % \ + str(missing).strip('[]')) + + def validate(self): + return + class Status(Model): - @staticmethod - def _validate(status): - Model._validate(status, [ - 'created_at', 'id', 'text', 'source', 'truncated', 'in_reply_to_status_id', - 'in_reply_to_user_id', 'favorited', 'in_reply_to_screen_name' - ]) - if hasattr(status, 'user'): - User._validate(status.user) - def validate(self): - Status._validate(self) + @staticmethod + def _validate(status): + Model._validate(status, [ + 'created_at', 'id', 'text', 'source', 'truncated', 'in_reply_to_status_id', + 'in_reply_to_user_id', 'favorited', 'in_reply_to_screen_name' + ]) + if hasattr(status, 'user'): + User._validate(status.user) + + def validate(self): + Status._validate(self) + + def destroy(self): + return self._api.destroy_status(id=self.id) - def destroy(self): - return self._api.destroy_status(id=self.id) class User(Model): - @staticmethod - def _validate(user): - Model._validate(user, [ - 'id', 'name', 'screen_name', 'location', 'description', 'profile_image_url', - 'url', 'protected', 'followers_count', 'profile_background_color', - 'profile_text_color', 'profile_sidebar_fill_color', 'profile_sidebar_border_color', - 'friends_count', 'created_at', 'favourites_count', 'utc_offset', 'time_zone', - 'profile_background_image_url', 'statuses_count', 'notifications', 'following', - 'verified' - ]) - if hasattr(user, 'status'): - Status._validate(user.status) - def validate(self): - User._validate(self) - - def timeline(self, **kargs): - return self._api.user_timeline(**kargs) - def mentions(self, **kargs): - return self._api.mentions(**kargs) - def friends(self, **kargs): - return self._api.friends(id=self.id, **kargs) - def followers(self, **kargs): - return self._api.followers(id=self.id, **kargs) - - def follow(self): - self._api.create_friendship(user_id=self.id) - self.following = True - def unfollow(self): - self._api.destroy_friendship(user_id=self.id) - self.following = False + @staticmethod + def _validate(user): + Model._validate(user, [ + 'id', 'name', 'screen_name', 'location', 'description', 'profile_image_url', + 'url', 'protected', 'followers_count', 'profile_background_color', + 'profile_text_color', 'profile_sidebar_fill_color', + 'profile_sidebar_border_color', 'friends_count', 'created_at', + 'favourites_count', 'utc_offset', 'time_zone', + 'profile_background_image_url', 'statuses_count', + 'notifications', 'following', 'verified' + ]) + if hasattr(user, 'status'): + Status._validate(user.status) + + def validate(self): + User._validate(self) + + def timeline(self, **kargs): + return self._api.user_timeline(**kargs) + + def mentions(self, **kargs): + return self._api.mentions(**kargs) + + def friends(self, **kargs): + return self._api.friends(id=self.id, **kargs) + + def followers(self, **kargs): + return self._api.followers(id=self.id, **kargs) + + def follow(self): + self._api.create_friendship(user_id=self.id) + self.following = True + + def unfollow(self): + self._api.destroy_friendship(user_id=self.id) + self.following = False + class DirectMessage(Model): - def destroy(self): - return self._api.destroy_direct_message(id=self.id) + def destroy(self): + return self._api.destroy_direct_message(id=self.id) + class Friendship(Model): - pass + pass + class SavedSearch(Model): - pass + pass + class SearchResult(Model): - pass + pass # link up default model implementations. models = { - 'status': Status, - 'user': User, - 'direct_message': DirectMessage, - 'friendship': Friendship, - 'saved_search': SavedSearch, - 'search_result': SearchResult + 'status': Status, + 'user': User, + 'direct_message': DirectMessage, + 'friendship': Friendship, + 'saved_search': SavedSearch, + 'search_result': SearchResult } diff --git a/tweepy/parsers.py b/tweepy/parsers.py index bbde005..e1dce1e 100644 --- a/tweepy/parsers.py +++ b/tweepy/parsers.py @@ -7,182 +7,205 @@ from datetime import datetime from . models import models try: - import json + import json except ImportError: - import simplejson as json + import simplejson as json + def parse_json(data, api): - return json.loads(data) + return json.loads(data) + def parse_return_true(data, api): - return True + return True + def parse_none(data, api): - return None + return None + def parse_error(data): - return json.loads(data)['error'] + return json.loads(data)['error'] + def _parse_datetime(str): - return datetime.strptime(str, '%a %b %d %H:%M:%S +0000 %Y') + return datetime.strptime(str, '%a %b %d %H:%M:%S +0000 %Y') + def _parse_search_datetime(str): - return datetime.strptime(str, '%a, %d %b %Y %H:%M:%S +0000') + return datetime.strptime(str, '%a, %d %b %Y %H:%M:%S +0000') + def _parse_html_value(html): - return html[html.find('>')+1:html.rfind('<')] + return html[html.find('>')+1:html.rfind('<')] + def _parse_a_href(atag): - return atag[atag.find('"')+1:atag.find('>')-1] + return atag[atag.find('"')+1:atag.find('>')-1] + def _parse_user(obj, api): - user = models['user']() - user._api = api - for k,v in obj.items(): - if k == 'created_at': - setattr(user, k, _parse_datetime(v)) - elif k == 'status': - setattr(user, k, _parse_status(v, api)) - elif k == 'following': - # twitter sets this to null if it is false - if v is True: - setattr(user, k, True) - else: - setattr(user, k, False) - else: - setattr(user, k, v) - return user + user = models['user']() + user._api = api + for k, v in obj.items(): + if k == 'created_at': + setattr(user, k, _parse_datetime(v)) + elif k == 'status': + setattr(user, k, _parse_status(v, api)) + elif k == 'following': + # twitter sets this to null if it is false + if v is True: + setattr(user, k, True) + else: + setattr(user, k, False) + else: + setattr(user, k, v) + return user + def parse_user(data, api): - return _parse_user(json.loads(data), api) + return _parse_user(json.loads(data), api) + def parse_users(data, api): - users = [] - for obj in json.loads(data): - users.append(_parse_user(obj, api)) - return users + users = [] + for obj in json.loads(data): + users.append(_parse_user(obj, api)) + return users + def _parse_status(obj, api): - status = models['status']() - status._api = api - for k,v in obj.items(): - if k == 'user': - setattr(status, 'author', _parse_user(v, api)) - elif k == 'created_at': - setattr(status, k, _parse_datetime(v)) - elif k == 'source': - setattr(status, k, _parse_html_value(v)) - setattr(status, 'source_url', _parse_a_href(v)) - else: - setattr(status, k, v) - return status + status = models['status']() + status._api = api + for k, v in obj.items(): + if k == 'user': + setattr(status, 'author', _parse_user(v, api)) + elif k == 'created_at': + setattr(status, k, _parse_datetime(v)) + elif k == 'source': + setattr(status, k, _parse_html_value(v)) + setattr(status, 'source_url', _parse_a_href(v)) + else: + setattr(status, k, v) + return status + def parse_status(data, api): - return _parse_status(json.loads(data), api) + return _parse_status(json.loads(data), api) + def parse_statuses(data, api): - statuses = [] - for obj in json.loads(data): - statuses.append(_parse_status(obj, api)) - return statuses + statuses = [] + for obj in json.loads(data): + statuses.append(_parse_status(obj, api)) + return statuses + def _parse_dm(obj, api): - dm = models['direct_message']() - dm._api = api - for k,v in obj.items(): - if k == 'sender' or k == 'recipient': - setattr(dm, k, _parse_user(v, api)) - elif k == 'created_at': - setattr(dm, k, _parse_datetime(v)) - else: - setattr(dm, k, v) - return dm + dm = models['direct_message']() + dm._api = api + for k, v in obj.items(): + if k == 'sender' or k == 'recipient': + setattr(dm, k, _parse_user(v, api)) + elif k == 'created_at': + setattr(dm, k, _parse_datetime(v)) + else: + setattr(dm, k, v) + return dm + def parse_dm(data, api): - return _parse_dm(json.loads(data), api) + return _parse_dm(json.loads(data), api) + def parse_directmessages(data, api): - directmessages = [] - for obj in json.loads(data): - directmessages.append(_parse_dm(obj, api)) - return directmessages + directmessages = [] + for obj in json.loads(data): + directmessages.append(_parse_dm(obj, api)) + return directmessages + def parse_friendship(data, api): - relationship = json.loads(data)['relationship'] + relationship = json.loads(data)['relationship'] - # parse source - source = models['friendship']() - for k,v in relationship['source'].items(): - setattr(source, k, v) + # parse source + source = models['friendship']() + for k, v in relationship['source'].items(): + setattr(source, k, v) - # parse target - target = models['friendship']() - for k,v in relationship['target'].items(): - setattr(target, k, v) + # parse target + target = models['friendship']() + for k, v in relationship['target'].items(): + setattr(target, k, v) + + return source, target - return source, target def _parse_saved_search(obj, api): - ss = models['saved_search']() - ss._api = api - for k,v in obj.items(): - if k == 'created_at': - setattr(ss, k, _parse_datetime(v)) - else: - setattr(ss, k, v) - return ss + ss = models['saved_search']() + ss._api = api + for k, v in obj.items(): + if k == 'created_at': + setattr(ss, k, _parse_datetime(v)) + else: + setattr(ss, k, v) + return ss + def parse_saved_search(data, api): - return _parse_saved_search(json.loads(data), api) + return _parse_saved_search(json.loads(data), api) + def parse_saved_searches(data, api): - saved_searches = [] - saved_search = models['saved_search']() - for obj in json.loads(data): - saved_searches.append(_parse_saved_search(obj, api)) - return saved_searches + saved_searches = [] + saved_search = models['saved_search']() + for obj in json.loads(data): + saved_searches.append(_parse_saved_search(obj, api)) + return saved_searches + def _parse_search_result(obj, api): - result = models['search_result']() - for k,v in obj.items(): - if k == 'created_at': - setattr(result, k, _parse_search_datetime(v)) - else: - setattr(result, k, v) - return result + result = models['search_result']() + for k, v in obj.items(): + if k == 'created_at': + setattr(result, k, _parse_search_datetime(v)) + else: + setattr(result, k, v) + return result + def parse_search_results(data, api): - results = json.loads(data)['results'] - result_objects = [] - for obj in results: - result_objects.append(_parse_search_result(obj, api)) - return result_objects + results = json.loads(data)['results'] + result_objects = [] + for obj in results: + result_objects.append(_parse_search_result(obj, api)) + return result_objects + def parse_trend_results(data, api): - return json.loads(data)['trends'] - + return json.loads(data)['trends'] diff --git a/tweepy/streaming.py b/tweepy/streaming.py index 9a97850..fcfc7fc 100644 --- a/tweepy/streaming.py +++ b/tweepy/streaming.py @@ -13,159 +13,161 @@ from . api import API from . error import TweepError try: - import json + import json except ImportError: - import simplejson as json + import simplejson as json STREAM_VERSION = 1 + class StreamListener(object): - def on_status(self, status): - """Called when a new status arrives""" - return + def on_status(self, status): + """Called when a new status arrives""" + return + + def on_delete(self, status_id, user_id): + """Called when a delete notice arrives for a status""" + return - def on_delete(self, status_id, user_id): - """Called when a delete notice arrives for a status""" - return + def on_limit(self, track): + """Called when a limitation notice arrvies""" + return - def on_limit(self, track): - """Called when a limitation notice arrvies""" - return + def on_error(self, status_code): + """Called when a non-200 status code is returned""" + return False - def on_error(self, status_code): - """Called when a non-200 status code is returned""" - return False + def on_timeout(self): + """Called when stream connection times out""" + return - def on_timeout(self): - """Called when stream connection times out""" - return class Stream(object): - host = 'stream.twitter.com' - - def __init__(self, username, password, listener, timeout=5.0, retry_count = None, - retry_time = 10.0, snooze_time = 5.0, buffer_size=1500): - self.auth = BasicAuthHandler(username, password) - self.running = False - self.timeout = timeout - self.retry_count = retry_count - self.retry_time = retry_time - self.snooze_time = snooze_time - self.buffer_size = buffer_size - self.listener = listener - self.api = API() - - def _run(self): - # setup - headers = {} - self.auth.apply_auth(None, None, headers, None) - - # enter loop - error_counter = 0 - conn = None - while self.running: - if self.retry_count and error_counter > self.retry_count: - # quit if error count greater than retry count - break - try: - conn = httplib.HTTPConnection(self.host) - conn.connect() - conn.sock.settimeout(self.timeout) - conn.request('POST', self.url, headers=headers) - resp = conn.getresponse() - if resp.status != 200: - if self.listener.on_error(resp.status) is False: - break - error_counter += 1 - sleep(self.retry_time) - else: - error_counter = 0 - self._read_loop(resp) - except timeout: - if self.listener.on_timeout() == False: - break + host = 'stream.twitter.com' + + def __init__(self, username, password, listener, timeout=5.0, retry_count = None, + retry_time = 10.0, snooze_time = 5.0, buffer_size=1500): + self.auth = BasicAuthHandler(username, password) + self.running = False + self.timeout = timeout + self.retry_count = retry_count + self.retry_time = retry_time + self.snooze_time = snooze_time + self.buffer_size = buffer_size + self.listener = listener + self.api = API() + + def _run(self): + # setup + headers = {} + self.auth.apply_auth(None, None, headers, None) + + # enter loop + error_counter = 0 + conn = None + while self.running: + if self.retry_count and error_counter > self.retry_count: + # quit if error count greater than retry count + break + try: + conn = httplib.HTTPConnection(self.host) + conn.connect() + conn.sock.settimeout(self.timeout) + conn.request('POST', self.url, headers=headers) + resp = conn.getresponse() + if resp.status != 200: + if self.listener.on_error(resp.status) is False: + break + error_counter += 1 + sleep(self.retry_time) + else: + error_counter = 0 + self._read_loop(resp) + except timeout: + if self.listener.on_timeout() == False: + break + if self.running is False: + break + conn.close() + sleep(self.snooze_time) + except Exception: + # any other exception is fatal, so kill loop + break + + # cleanup + self.running = False + if conn: + conn.close() + + def _read_loop(self, resp): + data = '' + while self.running: + if resp.isclosed(): + break + + # read length + length = '' + while True: + c = resp.read(1) + if c == '\n': + break + length += c + length = length.strip() + if length.isdigit(): + length = int(length) + else: + continue + + # read data + data = resp.read(length) + + # turn json data into status object + if 'in_reply_to_status_id' in data: + status = parse_status(data, self.api) + if self.listener.on_status(status) == False: + self.running = False + elif 'delete' in data: + delete = json.loads(data)['delete']['status'] + if self.listener.on_delete(delete['id'], delete['user_id']) == False: + self.running = False + elif 'limit' in data: + if self.listener.on_limit(json.loads(data)['limit']['track']) == False: + self.running = False + + def firehose(self, count=None): + if self.running: + raise TweepError('Stream object already connected!') + self.url = '/%i/statuses/firehose.json?delimited=length' % STREAM_VERSION + if count: + self.url += '&count=%s' % count + self.running = True + Thread(target=self._run).start() + + def sample(self, count=None): + if self.running: + raise TweepError('Stream object already connected!') + self.url = '/%i/statuses/sample.json?delimited=length' % STREAM_VERSION + if count: + self.url += '&count=%s' % count + self.running = True + Thread(target=self._run).start() + + def filter(self, follow=None, track=None): + if self.running: + raise TweepError('Stream object already connected!') + self.url = '/%i/statuses/filter.json?delimited=length' % STREAM_VERSION + if follow: + self.url += '&follow=%s' % ','.join(follow) + if track: + self.url += '&track=%s' % ','.join(track) + print self.url + self.running = True + Thread(target=self._run).start() + + def disconnect(self): if self.running is False: - break - conn.close() - sleep(self.snooze_time) - except Exception: - # any other exception is fatal, so kill loop - break - - # cleanup - self.running = False - if conn: - conn.close() - - def _read_loop(self, resp): - data = '' - while self.running: - if resp.isclosed(): - break - - # read length - length = '' - while True: - c = resp.read(1) - if c == '\n': - break - length += c - length = length.strip() - if length.isdigit(): - length = int(length) - else: - continue - - # read data - data = resp.read(length) - - # turn json data into status object - if 'in_reply_to_status_id' in data: - status = parse_status(data, self.api) - if self.listener.on_status(status) == False: - self.running = False - elif 'delete' in data: - delete = json.loads(data)['delete']['status'] - if self.listener.on_delete(delete['id'], delete['user_id']) == False: - self.running = False - elif 'limit' in data: - if self.listener.on_limit(json.loads(data)['limit']['track']) == False: - self.running = False - - def firehose(self, count=None, ): - if self.running: - raise TweepError('Stream object already connected!') - self.url = '/%i/statuses/firehose.json?delimited=length' % STREAM_VERSION - if count: - self.url += '&count=%s' % count - self.running = True - Thread(target=self._run).start() - - def sample(self, count=None): - if self.running: - raise TweepError('Stream object already connected!') - self.url = '/%i/statuses/sample.json?delimited=length' % STREAM_VERSION - if count: - self.url += '&count=%s' % count - self.running = True - Thread(target=self._run).start() - - def filter(self, follow=None, track=None): - if self.running: - raise TweepError('Stream object already connected!') - self.url = '/%i/statuses/filter.json?delimited=length' % STREAM_VERSION - if follow: - self.url += '&follow=%s' % ','.join(follow) - if track: - self.url += '&track=%s' % ','.join(track) - print self.url - self.running = True - Thread(target=self._run).start() - - def disconnect(self): - if self.running is False: - return - self.running = False - + return + self.running = False + diff --git a/tweepyshell.py b/tweepyshell.py index df4b479..1d2eff3 100755 --- a/tweepyshell.py +++ b/tweepyshell.py @@ -14,13 +14,13 @@ using the credentials provided. """ if len(sys.argv) != 3: - print 'Usage: tweepyshell ' - exit(1) + print 'Usage: tweepyshell ' + exit(1) api = tweepy.API.new(auth='basic', username=sys.argv[1], password=sys.argv[2]) if api.verify_credentials() is False: - print 'Invalid username and/or password!' - exit(1) + print 'Invalid username and/or password!' + exit(1) code.interact('', local={'tweepy': tweepy, 'api': api})