import tweepy
class StreamWatcherListener(tweepy.StreamListener):
- def on_status(self, status):
- print status.text
+ def on_status(self, status):
+ print status.text
- def on_error(self, status_code):
- print 'An error has occured! Status code = %s' % status_code
- return True # keep stream alive
+ def on_error(self, status_code):
+ print 'An error has occured! Status code = %s' % status_code
+ return True # keep stream alive
- def on_timeout(self):
- print 'Snoozing Zzzzzz'
+ def on_timeout(self):
+ print 'Snoozing Zzzzzz'
# Prompt for login credentials and setup stream object
username = raw_input('Twitter username: ')
# Prompt for mode of streaming and connect
while True:
- mode = raw_input('Mode? [sample/filter] ')
- if mode == 'sample':
- stream.sample()
- break
- elif mode == 'filter':
- follow_list = raw_input('Users to follow (comma separated): ').strip()
- track_list = raw_input('Keywords to track (comma seperated): ').strip()
- if follow_list:
- follow_list = [u for u in follow_list.split(',')]
- else:
- follow_list = None
- if track_list:
- track_list = [k for k in track_list.split(',')]
+ mode = raw_input('Mode? [sample/filter] ')
+ if mode == 'sample':
+ stream.sample()
+ break
+ elif mode == 'filter':
+ follow_list = raw_input('Users to follow (comma separated): ').strip()
+ track_list = raw_input('Keywords to track (comma seperated): ').strip()
+ if follow_list:
+ follow_list = [u for u in follow_list.split(',')]
+ else:
+ follow_list = None
+ if track_list:
+ track_list = [k for k in track_list.split(',')]
+ else:
+ track_list = None
+ stream.filter(follow_list, track_list)
+ break
- track_list = None
- stream.filter(follow_list, track_list)
- break
- else:
- print 'Invalid choice! Try again.'
+ print 'Invalid choice! Try again.'
# Run in a loop until termination
while True:
- try:
- if stream.running is False:
- print 'Stream stopped!'
- break
- time.sleep(1)
- except KeyboardInterrupt:
- break
+ try:
+ if stream.running is False:
+ print 'Stream stopped!'
+ break
+ time.sleep(1)
+ except KeyboardInterrupt:
+ break
# Shutdown connection
print 'Bye!'
import unittest
import random
from time import sleep
+import os
from tweepy import *
# Must supply twitter account credentials for tests
-username = ''
-password = ''
+username = 'tweebly'
+password = 'josh1987twitter'
"""Unit tests"""
-# API tests
class TweepyAPITests(unittest.TestCase):
- def setUp(self):
- self.api = API(BasicAuthHandler(username, password))
- def testpublictimeline(self):
- self.assertEqual(len(self.api.public_timeline()), 20)
- def testfriendstimeline(self):
- self.assert_(len(self.api.friends_timeline()) > 0)
- def testusertimeline(self):
- s = self.api.user_timeline(screen_name='twitter')
- self.assert_(len(s) > 0)
- self.assertEqual(s[0].author.screen_name, 'twitter')
- def testmentions(self):
- s = self.api.mentions()
- self.assert_(len(s) > 0)
- self.assert_(s[0].text.find(username) >= 0)
- def testgetstatus(self):
- s = self.api.get_status(id=123)
- self.assertEqual(, 17)
- def testupdateanddestroystatus(self):
- # test update
- text = 'testing %i' % random.randint(0,1000)
- update = self.api.update_status(status=text)
- self.assertEqual(update.text, text)
- # test destroy
- deleted = self.api.destroy_status(
- self.assertEqual(,
- def testgetuser(self):
- u = self.api.get_user(screen_name='twitter')
- self.assertEqual(u.screen_name, 'twitter')
- def testme(self):
- me =
- self.assertEqual(me.screen_name, username)
- def testfriends(self):
- friends = self.api.friends()
- self.assert_(len(friends) > 0)
- def testfollowers(self):
- followers = self.api.followers()
- self.assert_(len(followers) > 0)
- def testdirectmessages(self):
- dms = self.api.direct_messages()
- self.assert_(len(dms) > 0)
- def testsendanddestroydirectmessage(self):
- # send
- sent_dm = self.api.send_direct_message(username, 'test message')
- self.assertEqual(sent_dm.text, 'test message')
- self.assertEqual(sent_dm.sender.screen_name, username)
- self.assertEqual(sent_dm.recipient.screen_name, username)
- # destroy
- destroyed_dm = self.api.destroy_direct_message(
- self.assertEqual(destroyed_dm.text, sent_dm.text)
- self.assertEqual(,
- self.assertEqual(destroyed_dm.sender.screen_name, username)
- self.assertEqual(destroyed_dm.recipient.screen_name, username)
- def testcreatefriendship(self):
- friend = self.api.create_friendship('twitter')
- self.assertEqual(friend.screen_name, 'twitter')
- self.assertTrue(self.api.exists_friendship(username, 'twitter'))
- def testdestroyfriendship(self):
- enemy = self.api.destroy_friendship('twitter')
- self.assertEqual(enemy.screen_name, 'twitter')
- self.assertFalse(self.api.exists_friendship(username, 'twitter'))
- def testshowfriendship(self):
- source, target = self.api.show_friendship(target_screen_name='twtiter')
- self.assert_(isinstance(source, Friendship))
- self.assert_(isinstance(target, Friendship))
-# Authentication tests
+ def setUp(self):
+ self.api = API(BasicAuthHandler(username, password))
+ def testpublictimeline(self):
+ self.assertEqual(len(self.api.public_timeline()), 20)
+ def testfriendstimeline(self):
+ self.assert_(len(self.api.friends_timeline()) > 0)
+ def testusertimeline(self):
+ s = self.api.user_timeline(screen_name='twitter')
+ self.assert_(len(s) > 0)
+ self.assertEqual(s[0].author.screen_name, 'twitter')
+ def testmentions(self):
+ s = self.api.mentions()
+ self.assert_(len(s) > 0)
+ self.assert_(s[0].text.find(username) >= 0)
+ def testgetstatus(self):
+ s = self.api.get_status(id=123)
+ self.assertEqual(, 17)
+ def testupdateanddestroystatus(self):
+ # test update
+ text = 'testing %i' % random.randint(0, 1000)
+ update = self.api.update_status(status=text)
+ self.assertEqual(update.text, text)
+ # test destroy
+ deleted = self.api.destroy_status(
+ self.assertEqual(,
+ def testgetuser(self):
+ u = self.api.get_user(screen_name='twitter')
+ self.assertEqual(u.screen_name, 'twitter')
+ def testme(self):
+ me =
+ self.assertEqual(me.screen_name, username)
+ def testfriends(self):
+ friends = self.api.friends()
+ self.assert_(len(friends) > 0)
+ def testfollowers(self):
+ followers = self.api.followers()
+ self.assert_(len(followers) > 0)
+ def testdirectmessages(self):
+ dms = self.api.direct_messages()
+ self.assert_(len(dms) > 0)
+ def testsendanddestroydirectmessage(self):
+ # send
+ sent_dm = self.api.send_direct_message(username, 'test message')
+ self.assertEqual(sent_dm.text, 'test message')
+ self.assertEqual(sent_dm.sender.screen_name, username)
+ self.assertEqual(sent_dm.recipient.screen_name, username)
+ # destroy
+ destroyed_dm = self.api.destroy_direct_message(
+ self.assertEqual(destroyed_dm.text, sent_dm.text)
+ self.assertEqual(,
+ self.assertEqual(destroyed_dm.sender.screen_name, username)
+ self.assertEqual(destroyed_dm.recipient.screen_name, username)
+ def testcreatefriendship(self):
+ friend = self.api.create_friendship('twitter')
+ self.assertEqual(friend.screen_name, 'twitter')
+ self.assertTrue(self.api.exists_friendship(username, 'twitter'))
+ def testdestroyfriendship(self):
+ enemy = self.api.destroy_friendship('twitter')
+ self.assertEqual(enemy.screen_name, 'twitter')
+ self.assertFalse(self.api.exists_friendship(username, 'twitter'))
+ def testshowfriendship(self):
+ source, target = self.api.show_friendship(target_screen_name='twtiter')
+ self.assert_(isinstance(source, Friendship))
+ self.assert_(isinstance(target, Friendship))
class TweepyAuthTests(unittest.TestCase):
- consumer_key = 'ZbzSsdQj7t68VYlqIFvdcA'
- consumer_secret = '4yDWgrBiRs2WIx3bfvF9UWCRmtQ2YKpKJKBahtZcU'
+ consumer_key = 'ZbzSsdQj7t68VYlqIFvdcA'
+ consumer_secret = '4yDWgrBiRs2WIx3bfvF9UWCRmtQ2YKpKJKBahtZcU'
- def testoauth(self):
- auth = OAuthHandler(self.consumer_key, self.consumer_secret)
+ def testoauth(self):
+ auth = OAuthHandler(self.consumer_key, self.consumer_secret)
- # test getting access token
- auth_url = auth.get_authorization_url()
- self.assert_(auth_url.startswith(''))
- print 'Please authorize: ' + auth_url
- verifier = raw_input('PIN: ').strip()
- self.assert_(len(verifier) > 0)
- access_token = auth.get_access_token(verifier)
- self.assert_(access_token is not None)
+ # test getting access token
+ auth_url = auth.get_authorization_url()
+ self.assert_(auth_url.startswith(''))
+ print 'Please authorize: ' + auth_url
+ verifier = raw_input('PIN: ').strip()
+ self.assert_(len(verifier) > 0)
+ access_token = auth.get_access_token(verifier)
+ self.assert_(access_token is not None)
- # build api object test using oauth
- api = API(auth)
- api.update_status('test %i' % random.randint(0,1000))
+ # build api object test using oauth
+ api = API(auth)
+ api.update_status('test %i' % random.randint(0, 1000))
- def testbasicauth(self):
- auth = BasicAuthHandler(username, password)
+ def testbasicauth(self):
+ auth = BasicAuthHandler(username, password)
- # test accessing twitter API
- api = API(auth)
- api.update_status('test %i' % random.randint(1,1000))
+ # test accessing twitter API
+ api = API(auth)
+ api.update_status('test %i' % random.randint(1, 1000))
-# Cache tests
class TweepyCacheTests(unittest.TestCase):
- timeout = 2.0
- memcache_servers = [''] # must be running for test to pass
- def _run_tests(self, do_cleanup=True):
- # test store and get
-'testkey', 'testvalue')
- self.assertEqual(self.cache.get('testkey'), 'testvalue', 'Stored value does not match retrieved value')
- # test timeout
- sleep(self.timeout)
- self.assertEqual(self.cache.get('testkey'), None, 'Cache entry should have expired')
- # test cleanup
- if do_cleanup:
-'testkey', 'testvalue')
- sleep(self.timeout)
- self.cache.cleanup()
- self.assertEqual(self.cache.count(), 0, 'Cache cleanup failed')
- # test count
- for i in range(0,20):
-'testkey%i' % i, 'testvalue')
- self.assertEqual(self.cache.count(), 20, 'Count is wrong')
- # test flush
- self.cache.flush()
- self.assertEqual(self.cache.count(), 0, 'Cache failed to flush')
- def testmemorycache(self):
- self.cache = MemoryCache(timeout=self.timeout)
- self._run_tests()
- def testfilecache(self):
- os.mkdir('cache_test_dir')
- self.cache = FileCache('cache_test_dir', self.timeout)
- self._run_tests()
- self.cache.flush()
- os.rmdir('cache_test_dir')
- def testmemcache(self):
- self.cache = MemCache(self.memcache_servers, self.timeout)
- self._run_tests(do_cleanup=False)
+ timeout = 2.0
+ memcache_servers = [''] # must be running for test to pass
+ def _run_tests(self, do_cleanup=True):
+ # test store and get
+'testkey', 'testvalue')
+ self.assertEqual(self.cache.get('testkey'), 'testvalue',
+ 'Stored value does not match retrieved value')
+ # test timeout
+ sleep(self.timeout)
+ self.assertEqual(self.cache.get('testkey'), None,
+ 'Cache entry should have expired')
+ # test cleanup
+ if do_cleanup:
+'testkey', 'testvalue')
+ sleep(self.timeout)
+ self.cache.cleanup()
+ self.assertEqual(self.cache.count(), 0, 'Cache cleanup failed')
+ # test count
+ for i in range(0, 20):
+'testkey%i' % i, 'testvalue')
+ self.assertEqual(self.cache.count(), 20, 'Count is wrong')
+ # test flush
+ self.cache.flush()
+ self.assertEqual(self.cache.count(), 0, 'Cache failed to flush')
+ def testmemorycache(self):
+ self.cache = MemoryCache(timeout=self.timeout)
+ self._run_tests()
+ def testfilecache(self):
+ os.mkdir('cache_test_dir')
+ self.cache = FileCache('cache_test_dir', self.timeout)
+ self._run_tests()
+ self.cache.flush()
+ os.rmdir('cache_test_dir')
+ def testmemcache(self):
+ self.cache = MemCache(self.memcache_servers, self.timeout)
+ self._run_tests(do_cleanup=False)
if __name__ == '__main__':
- unittest.main()
+ unittest.main()
public_timeline = no_auth_api.public_timeline()
print 'Public timeline...'
for status in public_timeline:
- print status.text
- print 'from: %s' %
+ print status.text
+ print 'from: %s' %
Tweepy provides a non-authenticated instance of the API for you already
friends_timeline = auth_api.friends_timeline()
print 'Friends timeline...'
for status in friends_timeline:
- print status.text
- print 'from: %s' %
+ print status.text
+ print 'from: %s' %
""" The End
class MyStatus(tweepy.Status):
- def length(self):
- """Return length of status text"""
- return len(self.text)
+ def length(self):
+ """Return length of status text"""
+ return len(self.text)
We must now register our implementation of Status with tweepy.
Here's a demo...
- u = tweepy.api.get_user('twitter')
+ u = tweepy.api.get_user('twitter')
except TweepError, e:
- # will be raised if user is invalid OR request failed
- print 'Failed to get user: %s' % e
+ # will be raised if user is invalid OR request failed
+ print 'Failed to get user: %s' % e
To disable auto validation...
u = tweepy.api.get_user('twitter')
friends = u.friends()
for friend in friends:
- print friend.screen_name
+ print friend.screen_name
To learn about all shortcuts check out the reference documentation.
Here is an example:
- tweepy.api.update_status('this will fail since we are not authenticated!')
+ tweepy.api.update_status('this will fail since we are not authenticated!')
except tweepy.TweepError, e:
- print 'Failed to update! %s' % e
+ print 'Failed to update! %s' % e
TweepError's can be casted to string format which will
# Global, unauthenticated instance of API
api = API()
from . auth import BasicAuthHandler, OAuthHandler
from tweepy.parsers import *
-"""Twitter API"""
class API(object):
+ """Twitter API"""
+ def __init__(self, auth_handler=None, host='', cache=None,
+ secure=False, api_root='', validate=True):
+ # you may access these freely
+ self.auth_handler = auth_handler
+ = host
+ self.api_root = api_root
+ self.cache = cache
+ = secure
+ self.validate = validate
+ # not a good idea to touch these
+ self._username = None
+ @staticmethod
+ def new(auth='basic', *args, **kargs):
+ if auth == 'basic':
+ return API(BasicAuthHandler(*args, **kargs))
+ elif auth == 'oauth':
+ return API(OAuthHandler(*args, **kargs))
+ else:
+ raise TweepError('Invalid auth type')
+ """Get public timeline"""
+ public_timeline = bind_api(
+ path = '/statuses/public_timeline.json',
+ parser = parse_statuses,
+ allowed_param = []
+ )
+ """Get friends timeline"""
+ friends_timeline = bind_api(
+ path = '/statuses/friends_timeline.json',
+ parser = parse_statuses,
+ allowed_param = ['since_id', 'max_id', 'count', 'page'],
+ require_auth = True
+ )
+ """Get user timeline"""
+ user_timeline = bind_api(
+ path = '/statuses/user_timeline.json',
+ parser = parse_statuses,
+ allowed_param = ['id', 'user_id', 'screen_name', 'since_id',
+ 'max_id', 'count', 'page']
+ )
+ """Get mentions"""
+ mentions = bind_api(
+ path = '/statuses/mentions.json',
+ parser = parse_statuses,
+ allowed_param = ['since_id', 'max_id', 'count', 'page'],
+ require_auth = True
+ )
+ """Show status"""
+ get_status = bind_api(
+ path = '/statuses/show.json',
+ parser = parse_status,
+ allowed_param = ['id']
+ )
+ """Update status"""
+ update_status = bind_api(
+ path = '/statuses/update.json',
+ method = 'POST',
+ parser = parse_status,
+ allowed_param = ['status', 'in_reply_to_status_id'],
+ require_auth = True
+ )
+ """Destroy status"""
+ destroy_status = bind_api(
+ path = '/statuses/destroy.json',
+ method = 'DELETE',
+ parser = parse_status,
+ allowed_param = ['id'],
+ require_auth = True
+ )
+ """Show user"""
+ get_user = bind_api(
+ path = '/users/show.json',
+ parser = parse_user,
+ allowed_param = ['id', 'user_id', 'screen_name']
+ )
+ """Get authenticated user"""
+ def me(self):
+ # if username not fetched, go get it...
+ if self._username is None:
+ if self.auth_handler is None:
+ raise TweepError('Authentication required')
+ try:
+ user = bind_api(
+ path = '/account/verify_credentials.json',
+ parser = parse_user
+ )(self)
+ except TweepError, e:
+ raise TweepError('Failed to fetch username: %s' % e)
+ self._username = user.screen_name
+ return self.get_user(screen_name=self._username)
+ """Show friends"""
+ friends = bind_api(
+ path = '/statuses/friends.json',
+ parser = parse_users,
+ allowed_param = ['id', 'user_id', 'screen_name', 'page']
+ )
+ """Show followers"""
+ followers = bind_api(
+ path = '/statuses/followers.json',
+ parser = parse_users,
+ allowed_param = ['id', 'user_id', 'screen_name', 'page'],
+ require_auth = True
+ )
+ """Get direct messages"""
+ direct_messages = bind_api(
+ path = '/direct_messages.json',
+ parser = parse_directmessages,
+ allowed_param = ['since_id', 'max_id', 'count', 'page'],
+ require_auth = True
+ )
+ """Sent direct messages"""
+ sent_direct_messages = bind_api(
+ path = '/direct_messages/sent.json',
+ parser = parse_directmessages,
+ allowed_param = ['since_id', 'max_id', 'count', 'page'],
+ require_auth = True
+ )
- def __init__(self, auth_handler=None, host='', cache=None,
- secure=False, api_root='', validate=True):
- # you may access these freely
- self.auth_handler = auth_handler
- = host
- self.api_root = api_root
- self.cache = cache
- = secure
- self.validate = validate
- # not a good idea to touch these
- self._username = None
- @staticmethod
- def new(auth='basic', *args, **kargs):
- if auth == 'basic':
- return API(BasicAuthHandler(*args, **kargs))
- elif auth == 'oauth':
- return API(OAuthHandler(*args, **kargs))
- else:
- raise TweepError('Invalid auth type')
- """Get public timeline"""
- public_timeline = bind_api(
- path = '/statuses/public_timeline.json',
- parser = parse_statuses,
- allowed_param = []
- )
- """Get friends timeline"""
- friends_timeline = bind_api(
- path = '/statuses/friends_timeline.json',
- parser = parse_statuses,
- allowed_param = ['since_id', 'max_id', 'count', 'page'],
- require_auth = True
- )
- """Get user timeline"""
- user_timeline = bind_api(
- path = '/statuses/user_timeline.json',
- parser = parse_statuses,
- allowed_param = ['id', 'user_id', 'screen_name', 'since_id',
- 'max_id', 'count', 'page']
- )
- """Get mentions"""
- mentions = bind_api(
- path = '/statuses/mentions.json',
- parser = parse_statuses,
- allowed_param = ['since_id', 'max_id', 'count', 'page'],
- require_auth = True
- )
- """Show status"""
- get_status = bind_api(
- path = '/statuses/show.json',
- parser = parse_status,
- allowed_param = ['id']
- )
- """Update status"""
- update_status = bind_api(
- path = '/statuses/update.json',
- method = 'POST',
- parser = parse_status,
- allowed_param = ['status', 'in_reply_to_status_id'],
- require_auth = True
- )
- """Destroy status"""
- destroy_status = bind_api(
- path = '/statuses/destroy.json',
- method = 'DELETE',
- parser = parse_status,
- allowed_param = ['id'],
- require_auth = True
- )
- """Show user"""
- get_user = bind_api(
- path = '/users/show.json',
- parser = parse_user,
- allowed_param = ['id', 'user_id', 'screen_name']
- )
- """Get authenticated user"""
- def me(self):
- # if username not fetched, go get it...
- if self._username is None:
- if self.auth_handler is None:
- raise TweepError('Authentication required')
- try:
- user = bind_api(path='/account/verify_credentials.json', parser=parse_user)(self)
- except TweepError, e:
- raise TweepError('Failed to fetch username: %s' % e)
- self._username = user.screen_name
- return self.get_user(screen_name=self._username)
- """Show friends"""
- friends = bind_api(
- path = '/statuses/friends.json',
- parser = parse_users,
- allowed_param = ['id', 'user_id', 'screen_name', 'page']
- )
- """Show followers"""
- followers = bind_api(
- path = '/statuses/followers.json',
- parser = parse_users,
- allowed_param = ['id', 'user_id', 'screen_name', 'page'],
- require_auth = True
- )
- """Get direct messages"""
- direct_messages = bind_api(
- path = '/direct_messages.json',
- parser = parse_directmessages,
- allowed_param = ['since_id', 'max_id', 'count', 'page'],
- require_auth = True
- )
- """Sent direct messages"""
- sent_direct_messages = bind_api(
- path = '/direct_messages/sent.json',
- parser = parse_directmessages,
- allowed_param = ['since_id', 'max_id', 'count', 'page'],
- require_auth = True
- )
- """Send direct message"""
- send_direct_message = bind_api(
- path = '/direct_messages/new.json',
- method = 'POST',
- parser = parse_dm,
- allowed_param = ['user', 'text'],
- require_auth = True
- )
- """Destroy direct message"""
- destroy_direct_message = bind_api(
- path = '/direct_messages/destroy.json',
- method = 'DELETE',
- parser = parse_dm,
- allowed_param = ['id'],
- require_auth = True
- )
- """Create friendship"""
- create_friendship = bind_api(
- path = '/friendships/create.json',
- method = 'POST',
- parser = parse_user,
- allowed_param = ['id', 'user_id', 'screen_name', 'follow'],
- require_auth = True
- )
- """Destroy friendship"""
- destroy_friendship = bind_api(
- path = '/friendships/destroy.json',
- method = 'DELETE',
- parser = parse_user,
- allowed_param = ['id', 'user_id', 'screen_name'],
- require_auth = True
- )
- """Check if friendship exists"""
- exists_friendship = bind_api(
- path = '/friendships/exists.json',
- parser = parse_json,
- allowed_param = ['user_a', 'user_b']
- )
- """Show friendship details"""
- show_friendship = bind_api(
- path = '/friendships/show.json',
- parser = parse_friendship,
- allowed_param = ['source_id', 'source_screen_name',
- 'target_id', 'target_screen_name']
- )
- """Get list of IDs of users the specified user is following"""
- friends_ids = bind_api(
- path = '/friends/ids.json',
- parser = parse_json,
- allowed_param = ['id', 'user_id', 'screen_name', 'page']
- )
- """Get list of IDs of users following the specified user"""
- followers_ids = bind_api(
- path = '/followers/ids.json',
- parser = parse_json,
- allowed_param = ['id', 'user_id', 'screen_name', 'page']
- )
- """Verify credentials"""
- def verify_credentials(self):
- try:
- return bind_api(
- path = '/account/verify_credentials.json',
- parser = parse_return_true,
- require_auth = True)(self)
- except TweepError:
- return False
- """Rate limit status"""
- rate_limit_status = bind_api(
- path = '/account/rate_limit_status.json',
- parser = parse_json
- )
- """Update delivery device"""
- set_delivery_device = bind_api(
- path = '/account/update_delivery_device.json',
- method = 'POST',
- allowed_param = ['device'],
- parser = parse_user,
- require_auth = True
- )
- """Update profile colors"""
- update_profile_colors = bind_api(
- path = '/account/update_profile_colors.json',
- method = 'POST',
- parser = parse_user,
- allowed_param = ['profile_background_color', 'profile_text_color',
- 'profile_link_color', 'profile_sidebar_fill_color',
- 'profile_sidebar_border_color'],
- require_auth = True
- )
- """Update profile image"""
- def update_profile_image(self, filename):
- headers, post_data = _pack_image(filename, 700)
- bind_api(
- path = '/account/update_profile_image.json',
+ """Send direct message"""
+ send_direct_message = bind_api(
+ path = '/direct_messages/new.json',
method = 'POST',
- parser = parse_none,
+ parser = parse_dm,
+ allowed_param = ['user', 'text'],
require_auth = True
- )(self, post_data=post_data, headers=headers)
+ )
- """Update profile background image"""
- def update_profile_background_image(self, filename, *args, **kargs):
- headers, post_data = _pack_image(filename, 800)
- bind_api(
- path = '/account/update_profile_background_image.json',
+ """Destroy direct message"""
+ destroy_direct_message = bind_api(
+ path = '/direct_messages/destroy.json',
+ method = 'DELETE',
+ parser = parse_dm,
+ allowed_param = ['id'],
+ require_auth = True
+ )
+ """Create friendship"""
+ create_friendship = bind_api(
+ path = '/friendships/create.json',
method = 'POST',
- parser = parse_none,
- allowed_param = ['tile'],
+ parser = parse_user,
+ allowed_param = ['id', 'user_id', 'screen_name', 'follow'],
require_auth = True
- )(self, post_data=post_data, headers=headers)
- """Update profile"""
- update_profile = bind_api(
- path = '/account/update_profile.json',
- method = 'POST',
- parser = parse_user,
- allowed_param = ['name', 'email', 'url', 'location', 'description'],
- require_auth = True
- )
- """Get favorites"""
- favorites = bind_api(
- path = '/favorites.json',
- parser = parse_statuses,
- allowed_param = ['id', 'page']
- )
- """Create favorite"""
- create_favorite = bind_api(
- path = '/favorites/create.json',
- method = 'POST',
- parser = parse_status,
- allowed_param = ['id'],
- require_auth = True
- )
- """Destroy favorite"""
- destroy_favorite = bind_api(
- path = '/favorites/destroy.json',
- method = 'DELETE',
- parser = parse_status,
- allowed_param = ['id'],
- require_auth = True
- )
- """Enable device notifications"""
- enable_notifications = bind_api(
- path = '/notifications/follow.json',
- method = 'POST',
- parser = parse_user,
- allowed_param = ['id', 'user_id', 'screen_name'],
- require_auth = True
- )
- """Disable device notifications"""
- disable_notifications = bind_api(
- path = '/notifications/leave.json',
- method = 'POST',
- parser = parse_user,
- allowed_param = ['id', 'user_id', 'screen_name'],
- require_auth = True
- )
- """Create a block"""
- create_block = bind_api(
- path = '/blocks/create.json',
- method = 'POST',
- parser = parse_user,
- allowed_param = ['id'],
- require_auth = True
- )
- """Destroy a block"""
- destroy_block = bind_api(
- path = '/blocks/destroy.json',
- method = 'DELETE',
- parser = parse_user,
- allowed_param = ['id'],
- require_auth = True
- )
- """Check if block exists"""
- def exists_block(self, **kargs):
- try:
- bind_api(
- path = '/blocks/exists.json',
- parser = parse_none,
- allowed_param = ['id', 'user_id', 'screen_name'],
- require_auth = True
- )(self, **kargs)
- except TweepError:
- return False
- return True
- """Get list of users that are blocked"""
- blocks = bind_api(
- path = '/blocks/blocking.json',
- parser = parse_users,
- allowed_param = ['page'],
- require_auth = True
- )
- """Get list of ids of users that are blocked"""
- blocks_ids = bind_api(
- path = '/blocks/blocking/ids.json',
- parser = parse_json,
- require_auth = True
- )
- """Get list of saved searches"""
- saved_searches = bind_api(
- path = '/saved_searches.json',
- parser = parse_saved_searches,
- require_auth = True
- )
- """Get a single saved search by id"""
- def get_saved_search(self, id):
- return bind_api(
- path = '/saved_searches/show/%s.json' % id,
- parser = parse_saved_search,
+ )
+ """Destroy friendship"""
+ destroy_friendship = bind_api(
+ path = '/friendships/destroy.json',
+ method = 'DELETE',
+ parser = parse_user,
+ allowed_param = ['id', 'user_id', 'screen_name'],
+ require_auth = True
+ )
+ """Check if friendship exists"""
+ exists_friendship = bind_api(
+ path = '/friendships/exists.json',
+ parser = parse_json,
+ allowed_param = ['user_a', 'user_b']
+ )
+ """Show friendship details"""
+ show_friendship = bind_api(
+ path = '/friendships/show.json',
+ parser = parse_friendship,
+ allowed_param = ['source_id', 'source_screen_name',
+ 'target_id', 'target_screen_name']
+ )
+ """Get list of IDs of users the specified user is following"""
+ friends_ids = bind_api(
+ path = '/friends/ids.json',
+ parser = parse_json,
+ allowed_param = ['id', 'user_id', 'screen_name', 'page']
+ )
+ """Get list of IDs of users following the specified user"""
+ followers_ids = bind_api(
+ path = '/followers/ids.json',
+ parser = parse_json,
+ allowed_param = ['id', 'user_id', 'screen_name', 'page']
+ )
+ """Verify credentials"""
+ def verify_credentials(self):
+ try:
+ return bind_api(
+ path = '/account/verify_credentials.json',
+ parser = parse_return_true,
+ require_auth = True
+ )(self)
+ except TweepError:
+ return False
+ """Rate limit status"""
+ rate_limit_status = bind_api(
+ path = '/account/rate_limit_status.json',
+ parser = parse_json
+ )
+ """Update delivery device"""
+ set_delivery_device = bind_api(
+ path = '/account/update_delivery_device.json',
+ method = 'POST',
+ allowed_param = ['device'],
+ parser = parse_user,
+ require_auth = True
+ )
+ """Update profile colors"""
+ update_profile_colors = bind_api(
+ path = '/account/update_profile_colors.json',
+ method = 'POST',
+ parser = parse_user,
+ allowed_param = ['profile_background_color', 'profile_text_color',
+ 'profile_link_color', 'profile_sidebar_fill_color',
+ 'profile_sidebar_border_color'],
+ require_auth = True
+ )
+ """Update profile image"""
+ def update_profile_image(self, filename):
+ headers, post_data = _pack_image(filename, 700)
+ bind_api(
+ path = '/account/update_profile_image.json',
+ method = 'POST',
+ parser = parse_none,
+ require_auth = True
+ )(self, post_data=post_data, headers=headers)
+ """Update profile background image"""
+ def update_profile_background_image(self, filename, *args, **kargs):
+ headers, post_data = _pack_image(filename, 800)
+ bind_api(
+ path = '/account/update_profile_background_image.json',
+ method = 'POST',
+ parser = parse_none,
+ allowed_param = ['tile'],
+ require_auth = True
+ )(self, post_data=post_data, headers=headers)
+ """Update profile"""
+ update_profile = bind_api(
+ path = '/account/update_profile.json',
+ method = 'POST',
+ parser = parse_user,
+ allowed_param = ['name', 'email', 'url', 'location', 'description'],
require_auth = True
- )(self)
- """Create new saved search"""
- create_saved_search = bind_api(
- path = '/saved_searches/create.json',
- method = 'POST',
- parser = parse_saved_search,
- allowed_param = ['query'],
- require_auth = True
- )
- """Destroy a saved search"""
- def destroy_saved_search(self, id):
- return bind_api(
- path = '/saved_searches/destroy/%s.json' % id,
+ )
+ """Get favorites"""
+ favorites = bind_api(
+ path = '/favorites.json',
+ parser = parse_statuses,
+ allowed_param = ['id', 'page']
+ )
+ """Create favorite"""
+ create_favorite = bind_api(
+ path = '/favorites/create.json',
+ method = 'POST',
+ parser = parse_status,
+ allowed_param = ['id'],
+ require_auth = True
+ )
+ """Destroy favorite"""
+ destroy_favorite = bind_api(
+ path = '/favorites/destroy.json',
method = 'DELETE',
- parser = parse_saved_search,
+ parser = parse_status,
allowed_param = ['id'],
require_auth = True
- )(self)
- def test(self):
- return bind_api(
- path = '/help/test.json',
- parser = parse_return_true
- )(self)
- """Search API"""
- def search(self, *args, **kargs):
- return bind_api(
- host = 'search.' +,
- path = '/search.json',
- parser = parse_search_results,
- allowed_param = ['q', 'lang', 'rpp', 'page', 'since_id', 'geocode', 'show_user'],
- )(self, *args, **kargs)
- def trends(self):
- return bind_api(
- host = 'search.' +,
- path = '/trends.json',
- parser = parse_trend_results
- )(self)
-""" Pack image file into multipart-formdata request"""
-def _pack_image(filename, max_size):
- """Pack image from file into multipart-formdata post body"""
- # image must be less than 700kb in size
- try:
- if os.path.getsize(filename) > (max_size * 1024):
- raise TweepError('File is too big, must be less than 700kb.')
- except os.error, e:
- raise TweepError('Unable to access file')
- # image must be gif, jpeg, or png
- file_type = mimetypes.guess_type(filename)
- if file_type is None:
- raise TweepError('Could not determine file type')
- file_type = file_type[0]
- if file_type != 'image/gif' and file_type != 'image/jpeg' and file_type != 'image/png':
- raise TweepError('Invalid file type for image: %s' % file_type)
- # build the mulitpart-formdata body
- fp = open(filename, 'rb')
- BOUNDARY = 'Tw3ePy'
- body = []
- body.append('--' + BOUNDARY)
- body.append('Content-Disposition: form-data; name="image"; filename="%s"' % filename)
- body.append('Content-Type: %s' % file_type)
- body.append('')
- body.append(
- body.append('--' + BOUNDARY + '--')
- body.append('')
- fp.close()
- body = '\r\n'.join(body)
- # build headers
- headers = {
- 'Content-Type': 'multipart/form-data; boundary=Tw3ePy',
- 'Content-Length': len(body)
- }
- return headers, body
+ )
+ """Enable device notifications"""
+ enable_notifications = bind_api(
+ path = '/notifications/follow.json',
+ method = 'POST',
+ parser = parse_user,
+ allowed_param = ['id', 'user_id', 'screen_name'],
+ require_auth = True
+ )
+ """Disable device notifications"""
+ disable_notifications = bind_api(
+ path = '/notifications/leave.json',
+ method = 'POST',
+ parser = parse_user,
+ allowed_param = ['id', 'user_id', 'screen_name'],
+ require_auth = True
+ )
+ """Create a block"""
+ create_block = bind_api(
+ path = '/blocks/create.json',
+ method = 'POST',
+ parser = parse_user,
+ allowed_param = ['id'],
+ require_auth = True
+ )
+ """Destroy a block"""
+ destroy_block = bind_api(
+ path = '/blocks/destroy.json',
+ method = 'DELETE',
+ parser = parse_user,
+ allowed_param = ['id'],
+ require_auth = True
+ )
+ """Check if block exists"""
+ def exists_block(self, **kargs):
+ try:
+ bind_api(
+ path = '/blocks/exists.json',
+ parser = parse_none,
+ allowed_param = ['id', 'user_id', 'screen_name'],
+ require_auth = True
+ )(self, **kargs)
+ except TweepError:
+ return False
+ return True
+ """Get list of users that are blocked"""
+ blocks = bind_api(
+ path = '/blocks/blocking.json',
+ parser = parse_users,
+ allowed_param = ['page'],
+ require_auth = True
+ )
+ """Get list of ids of users that are blocked"""
+ blocks_ids = bind_api(
+ path = '/blocks/blocking/ids.json',
+ parser = parse_json,
+ require_auth = True
+ )
+ """Get list of saved searches"""
+ saved_searches = bind_api(
+ path = '/saved_searches.json',
+ parser = parse_saved_searches,
+ require_auth = True
+ )
+ """Get a single saved search by id"""
+ def get_saved_search(self, id):
+ return bind_api(
+ path = '/saved_searches/show/%s.json' % id,
+ parser = parse_saved_search,
+ require_auth = True
+ )(self)
+ """Create new saved search"""
+ create_saved_search = bind_api(
+ path = '/saved_searches/create.json',
+ method = 'POST',
+ parser = parse_saved_search,
+ allowed_param = ['query'],
+ require_auth = True
+ )
+ """Destroy a saved search"""
+ def destroy_saved_search(self, id):
+ return bind_api(
+ path = '/saved_searches/destroy/%s.json' % id,
+ method = 'DELETE',
+ parser = parse_saved_search,
+ allowed_param = ['id'],
+ require_auth = True
+ )(self)
+ def test(self):
+ return bind_api(
+ path = '/help/test.json',
+ parser = parse_return_true
+ )(self)
+ """Search API"""
+ def search(self, *args, **kargs):
+ return bind_api(
+ host = 'search.' +,
+ path = '/search.json',
+ parser = parse_search_results,
+ allowed_param = ['q', 'lang', 'rpp', 'page', 'since_id', 'geocode', 'show_user'],
+ )(self, *args, **kargs)
+ def trends(self):
+ return bind_api(
+ host = 'search.' +,
+ path = '/trends.json',
+ parser = parse_trend_results
+ )(self)
+ def _pack_image(filename, max_size):
+ """Pack image from file into multipart-formdata post body"""
+ # image must be less than 700kb in size
+ try:
+ if os.path.getsize(filename) > (max_size * 1024):
+ raise TweepError('File is too big, must be less than 700kb.')
+ except os.error, e:
+ raise TweepError('Unable to access file')
+ # image must be gif, jpeg, or png
+ file_type = mimetypes.guess_type(filename)
+ if file_type is None:
+ raise TweepError('Could not determine file type')
+ file_type = file_type[0]
+ if file_type not in ['image/gif', 'image/jpeg', 'image/png']:
+ raise TweepError('Invalid file type for image: %s' % file_type)
+ # build the mulitpart-formdata body
+ fp = open(filename, 'rb')
+ BOUNDARY = 'Tw3ePy'
+ body = []
+ body.append('--' + BOUNDARY)
+ body.append('Content-Disposition: form-data; name="image"; filename="%s"' % filename)
+ body.append('Content-Type: %s' % file_type)
+ body.append('')
+ body.append(
+ body.append('--' + BOUNDARY + '--')
+ body.append('')
+ fp.close()
+ body = '\r\n'.join(body)
+ # build headers
+ headers = {
+ 'Content-Type': 'multipart/form-data; boundary=Tw3ePy',
+ 'Content-Length': len(body)
+ }
+ return headers, body
from urllib2 import Request, urlopen
-from urllib import quote
import base64
from . import oauth
from . error import TweepError
class AuthHandler(object):
- def apply_auth(self, url, method, headers, parameters):
- """Apply authentication headers to request"""
- raise NotImplemented
+ def apply_auth(self, url, method, headers, parameters):
+ """Apply authentication headers to request"""
+ raise NotImplemented
class BasicAuthHandler(AuthHandler):
- def __init__(self, username, password):
- self._b64up = base64.b64encode('%s:%s' % (username, password))
+ def __init__(self, username, password):
+ self._b64up = base64.b64encode('%s:%s' % (username, password))
- def apply_auth(self, url, method, headers, parameters):
- headers['Authorization'] = 'Basic %s' % self._b64up
+ def apply_auth(self, url, method, headers, parameters):
+ headers['Authorization'] = 'Basic %s' % self._b64up
-"""OAuth authentication handler"""
-class OAuthHandler(AuthHandler):
- def __init__(self, consumer_key, consumer_secret, callback=None):
- self._consumer = oauth.OAuthConsumer(consumer_key, consumer_secret)
- self._sigmethod = oauth.OAuthSignatureMethod_HMAC_SHA1()
- self.request_token = None
- self.access_token = None
- self.callback = callback
- def apply_auth(self, url, method, headers, parameters):
- request = oauth.OAuthRequest.from_consumer_and_token(self._consumer,
- http_url=url, http_method=method, token=self.access_token, parameters=parameters)
- request.sign_request(self._sigmethod, self._consumer, self.access_token)
- headers.update(request.to_header())
- def _get_request_token(self):
- try:
- request = oauth.OAuthRequest.from_consumer_and_token(self._consumer,
- http_url = self.REQUEST_TOKEN_URL, callback=self.callback)
- request.sign_request(self._sigmethod, self._consumer, None)
- resp = urlopen(Request(self.REQUEST_TOKEN_URL, headers=request.to_header()))
- return oauth.OAuthToken.from_string(
- except Exception, e:
- raise TweepError(e)
- def set_access_token(self, key, secret):
- self.access_token = oauth.OAuthToken(key, secret)
- def get_authorization_url(self):
- """Get the authorization URL to redirect the user"""
- try:
- # get the request token
- self.request_token = self._get_request_token()
- # build auth request and return as url
- request = oauth.OAuthRequest.from_token_and_callback(
- token=self.request_token, http_url=self.AUTHORIZATION_URL)
- return request.to_url()
- except Exception, e:
- raise TweepError(e)
- def get_access_token(self, verifier):
- """After user has authorized the request token, get access token with user supplied verifier."""
- try:
- # build request
- request = oauth.OAuthRequest.from_consumer_and_token(self._consumer,
- token=self.request_token, http_url=self.ACCESS_TOKEN_URL, verifier=str(verifier))
- request.sign_request(self._sigmethod, self._consumer, self.request_token)
- # send request
- resp = urlopen(Request(self.ACCESS_TOKEN_URL, headers=request.to_header()))
- self.access_token = oauth.OAuthToken.from_string(
- return self.access_token
- except Exception, e:
- raise TweepError(e)
+class OAuthHandler(AuthHandler):
+ """OAuth authentication handler"""
+ def __init__(self, consumer_key, consumer_secret, callback=None):
+ self._consumer = oauth.OAuthConsumer(consumer_key, consumer_secret)
+ self._sigmethod = oauth.OAuthSignatureMethod_HMAC_SHA1()
+ self.request_token = None
+ self.access_token = None
+ self.callback = callback
+ def apply_auth(self, url, method, headers, parameters):
+ request = oauth.OAuthRequest.from_consumer_and_token(
+ self._consumer, http_url=url, http_method=method,
+ token=self.access_token, parameters=parameters
+ )
+ request.sign_request(self._sigmethod, self._consumer, self.access_token)
+ headers.update(request.to_header())
+ def _get_request_token(self):
+ try:
+ request = oauth.OAuthRequest.from_consumer_and_token(
+ self._consumer, http_url=self.REQUEST_TOKEN_URL, callback=self.callback
+ )
+ request.sign_request(self._sigmethod, self._consumer, None)
+ resp = urlopen(Request(self.REQUEST_TOKEN_URL, headers=request.to_header()))
+ return oauth.OAuthToken.from_string(
+ except Exception, e:
+ raise TweepError(e)
+ def set_access_token(self, key, secret):
+ self.access_token = oauth.OAuthToken(key, secret)
+ def get_authorization_url(self):
+ """Get the authorization URL to redirect the user"""
+ try:
+ # get the request token
+ self.request_token = self._get_request_token()
+ # build auth request and return as url
+ request = oauth.OAuthRequest.from_token_and_callback(
+ token=self.request_token, http_url=self.AUTHORIZATION_URL
+ )
+ return request.to_url()
+ except Exception, e:
+ raise TweepError(e)
+ def get_access_token(self, verifier):
+ """
+ After user has authorized the request token, get access token
+ with user supplied verifier.
+ """
+ try:
+ # build request
+ request = oauth.OAuthRequest.from_consumer_and_token(
+ self._consumer,
+ token=self.request_token, http_url=self.ACCESS_TOKEN_URL,
+ verifier=str(verifier)
+ )
+ request.sign_request(self._sigmethod, self._consumer, self.request_token)
+ # send request
+ resp = urlopen(Request(self.ACCESS_TOKEN_URL, headers=request.to_header()))
+ self.access_token = oauth.OAuthToken.from_string(
+ return self.access_token
+ except Exception, e:
+ raise TweepError(e)
from . parsers import parse_error
from . error import TweepError
def bind_api(path, parser, allowed_param=None, method='GET', require_auth=False,
timeout=None, host=None):
- def _call(api, *args, **kargs):
- # If require auth, throw exception if credentials not provided
- if require_auth and not api.auth_handler:
- raise TweepError('Authentication required!')
- # check for post_data parameter
- if 'post_data' in kargs:
- post_data = kargs['post_data']
- del kargs['post_data']
- else:
- post_data = None
- # check for headers
- if 'headers' in kargs:
- headers = dict(kargs['headers'])
- del kargs['headers']
- else:
- headers = {}
- # build parameter dict
- if allowed_param:
- parameters = {}
- for idx, arg in enumerate(args):
- try:
- parameters[allowed_param[idx]] = arg
- except IndexError:
- raise TweepError('Too many parameters supplied!')
- for k, arg in kargs.items():
- if arg is None:
- continue
- if k in parameters:
- raise TweepError('Multiple values for parameter %s supplied!' % k)
- if k not in allowed_param:
- raise TweepError('Invalid parameter %s supplied!' % k)
- parameters[k] = arg
- else:
- if len(args) > 0 or len(kargs) > 0:
- raise TweepError('This method takes no parameters!')
- parameters = None
- # Build url with parameters
- if parameters:
- url = '%s?%s' % (api.api_root + path, urllib.urlencode(parameters))
- else:
- url = api.api_root + path
- # get scheme and host
- if
- scheme = 'https://'
- else:
- scheme = 'http://'
- _host = host or
- # Apply authentication
- if api.auth_handler:
- api.auth_handler.apply_auth(scheme + _host + url, method, headers, parameters)
- # Check cache if caching enabled and method is GET
- if api.cache and method == 'GET':
- cache_result = api.cache.get(url, timeout)
- if cache_result:
- # if cache result found and not expired, return it
- # must restore api reference
- if isinstance(cache_result, list):
- for result in cache_result:
- result._api = api
+ def _call(api, *args, **kargs):
+ # If require auth, throw exception if credentials not provided
+ if require_auth and not api.auth_handler:
+ raise TweepError('Authentication required!')
+ # check for post_data parameter
+ if 'post_data' in kargs:
+ post_data = kargs['post_data']
+ del kargs['post_data']
+ else:
+ post_data = None
+ # check for headers
+ if 'headers' in kargs:
+ headers = dict(kargs['headers'])
+ del kargs['headers']
+ else:
+ headers = {}
+ # build parameter dict
+ if allowed_param:
+ parameters = {}
+ for idx, arg in enumerate(args):
+ try:
+ parameters[allowed_param[idx]] = arg
+ except IndexError:
+ raise TweepError('Too many parameters supplied!')
+ for k, arg in kargs.items():
+ if arg is None:
+ continue
+ if k in parameters:
+ raise TweepError('Multiple values for parameter %s supplied!' % k)
+ if k not in allowed_param:
+ raise TweepError('Invalid parameter %s supplied!' % k)
+ parameters[k] = arg
- cache_result._api = api
- return cache_result
- # Open connection
- # FIXME: add timeout
- if
- conn = httplib.HTTPSConnection(_host)
- else:
- conn = httplib.HTTPConnection(_host)
- # Build request
- conn.request(method, url, headers=headers, body=post_data)
- # Get response
- resp = conn.getresponse()
- # If an error was returned, throw an exception
- if resp.status != 200:
- try:
- error_msg = parse_error(
- except Exception:
- error_msg = "Unkown twitter error response received: status=%s" % resp.status
- raise TweepError(error_msg)
- # Pass returned body into parser and return parser output
- out = parser(, api)
- conn.close()
- # validate result
- if api.validate:
- # list of results
- if isinstance(out, list) and len(out) > 0:
- if hasattr(out[0], 'validate'):
- for result in out:
- result.validate()
- # single result
- else:
- if hasattr(out, 'validate'):
- out.validate()
- # store result in cache
- if api.cache and method == 'GET':
-, out)
- return out
- return _call
+ if len(args) > 0 or len(kargs) > 0:
+ raise TweepError('This method takes no parameters!')
+ parameters = None
+ # Build url with parameters
+ if parameters:
+ url = '%s?%s' % (api.api_root + path, urllib.urlencode(parameters))
+ else:
+ url = api.api_root + path
+ # get scheme and host
+ if
+ scheme = 'https://'
+ else:
+ scheme = 'http://'
+ _host = host or
+ # Apply authentication
+ if api.auth_handler:
+ api.auth_handler.apply_auth(scheme + _host + url, method, headers, parameters)
+ # Check cache if caching enabled and method is GET
+ if api.cache and method == 'GET':
+ cache_result = api.cache.get(url, timeout)
+ # if cache result found and not expired, return it
+ if cache_result:
+ # must restore api reference
+ if isinstance(cache_result, list):
+ for result in cache_result:
+ result._api = api
+ else:
+ cache_result._api = api
+ return cache_result
+ # Open connection
+ # FIXME: add timeout
+ if
+ conn = httplib.HTTPSConnection(_host)
+ else:
+ conn = httplib.HTTPConnection(_host)
+ # Build request
+ conn.request(method, url, headers=headers, body=post_data)
+ # Get response
+ resp = conn.getresponse()
+ # If an error was returned, throw an exception
+ if resp.status != 200:
+ try:
+ error_msg = parse_error(
+ except Exception:
+ error_msg = "Twitter error response: status code = %s" % resp.status
+ raise TweepError(error_msg)
+ # Pass returned body into parser and return parser output
+ out = parser(, api)
+ conn.close()
+ # validate result
+ if api.validate:
+ # list of results
+ if isinstance(out, list) and len(out) > 0:
+ if hasattr(out[0], 'validate'):
+ for result in out:
+ result.validate()
+ # single result
+ else:
+ if hasattr(out, 'validate'):
+ out.validate()
+ # store result in cache
+ if api.cache and method == 'GET':
+, out)
+ return out
+ return _call
import cPickle as pickle
from . import memcache
-from . error import TweepError
-"""Cache interface"""
class Cache(object):
+ """Cache interface"""
+ def __init__(self, timeout=60):
+ """Initialize the cache
+ timeout: number of seconds to keep a cached entry
+ """
+ self.timeout = timeout
+ def store(self, key, value):
+ """Add new record to cache
+ key: entry key
+ value: data of entry
+ """
+ raise NotImplementedError
+ def get(self, key, timeout=None):
+ """Get cached entry if exists and not expired
+ key: which entry to get
+ timeout: override timeout with this value [optional]
+ """
+ raise NotImplementedError
+ def count(self):
+ """Get count of entries currently stored in cache"""
+ raise NotImplementedError
+ def cleanup(self):
+ """Delete any expired entries in cache."""
+ raise NotImplementedError
+ def flush(self):
+ """Delete all cached entries"""
+ raise NotImplementedError
- def __init__(self, timeout=60):
- """Init the cache
- timeout: number of seconds to keep a cached entry
- """
- self.timeout = timeout
- def store(self, key, value):
- """Add new record to cache
- key: entry key
- value: data of entry
- """
- raise NotImplementedError
- def get(self, key, timeout=None):
- """Get cached entry if exists and not expired
- key: which entry to get
- timeout: override timeout with this value [optional]
- """
- raise NotImplementedError
- def count(self):
- """Get count of entries currently stored in cache"""
- raise NotImplementedError
- def cleanup(self):
- """Delete any expired entries in cache."""
- raise NotImplementedError
- def flush(self):
- """Delete all cached entries"""
- raise NotImplementedError
-"""In-memory cache"""
class MemoryCache(Cache):
+ """In-memory cache"""
- def __init__(self, timeout=60):
- Cache.__init__(self, timeout)
- self._entries = {}
- self.lock = threading.Lock()
- def __getstate__(self):
- # pickle
- return {'entries': self._entries, 'timeout': self.timeout}
- def __setstate__(self, state):
- # unpickle
- self.lock = threading.Lock()
- self._entries = state['entries']
- self.timeout = state['timeout']
- def _is_expired(self, entry, timeout):
- return timeout > 0 and (time.time() - entry[0]) >= timeout
- def store(self, key, value):
- with self.lock:
- self._entries[key] = (time.time(), value)
- def get(self, key, timeout=None):
- with self.lock:
- # check to see if we have this key
- entry = self._entries.get(key)
- if not entry:
- # no hit, return nothing
- return None
- # use provided timeout in arguments if provided
- # otherwise use the one provided during init.
- _timeout = self.timeout if timeout is None else timeout
- # make sure entry is not expired
- if self._is_expired(entry, _timeout):
- # entry expired, delete and return nothing
- del self._entries[key]
- return None
- # entry found and not expired, return it
- return entry[1]
- def count(self):
- return len(self._entries)
- def cleanup(self):
- with self.lock:
- for k,v in self._entries.items():
- if self._is_expired(v, self.timeout):
- del self._entries[k]
- def flush(self):
- with self.lock:
- self._entries.clear()
-"""File-based cache"""
-class FileCache(Cache):
+ def __init__(self, timeout=60):
+ Cache.__init__(self, timeout)
+ self._entries = {}
+ self.lock = threading.Lock()
- # locks used to make cache thread-safe
- cache_locks = {}
- def __init__(self, cache_dir, timeout=60):
- Cache.__init__(self, timeout)
- if os.path.exists(cache_dir) is False:
- os.mkdir(cache_dir)
- self.cache_dir = cache_dir
- if cache_dir in FileCache.cache_locks:
- self.lock = FileCache.cache_locks[cache_dir]
- else:
- self.lock = threading.Lock()
- FileCache.cache_locks[cache_dir] = self.lock
- def _get_path(self, key):
- md5 = hashlib.md5()
- md5.update(key)
- return os.path.join(self.cache_dir, md5.hexdigest())
- def _lock_file(self, path, exclusive=True):
- lock_path = path + '.lock'
- if exclusive is True:
- f_lock = open(lock_path, 'w')
- fcntl.lockf(f_lock, fcntl.LOCK_EX)
- else:
- f_lock = open(lock_path, 'r')
- fcntl.lockf(f_lock, fcntl.LOCK_SH)
- if os.path.exists(lock_path) is False:
- f_lock.close()
- return None
- return f_lock
- def _delete_file(self, path):
- os.remove(path)
- os.remove(path + '.lock')
- def store(self, key, value):
- path = self._get_path(key)
- with self.lock:
- # acquire lock and open file
- f_lock = self._lock_file(path)
- datafile = open(path, 'wb')
- # write data
- pickle.dump((time.time(), value), datafile)
- # close and unlock file
- datafile.close()
- f_lock.close()
- def get(self, key, timeout=None):
- return self._get(self._get_path(key), timeout)
- def _get(self, path, timeout):
- if os.path.exists(path) is False:
- # no record
- return None
- while self.lock:
- # acquire lock and open
- f_lock = self._lock_file(path, False)
- if f_lock is None:
- # does not exist
- return None
- datafile = open(path, 'rb')
- # read pickled object
- created_time, value = pickle.load(datafile)
- datafile.close()
- # check if value is expired
- _timeout = self.timeout if timeout is None else timeout
- if _timeout > 0 and (time.time() - created_time) >= _timeout:
- # expired! delete from cache
- value = None
- self._delete_file(path)
- # unlock and return result
- f_lock.close()
- return value
- def count(self):
- c = 0
- for entry in os.listdir(self.cache_dir):
- if entry.endswith('.lock'): continue
- c += 1
- return c
- def cleanup(self):
- for entry in os.listdir(self.cache_dir):
- if entry.endswith('.lock'): continue
- self._get(os.path.join(self.cache_dir, entry), None)
- def flush(self):
- for entry in os.listdir(self.cache_dir):
- if entry.endswith('.lock'): continue
- self._delete_file(os.path.join(self.cache_dir, entry))
-"""Memcache client"""
-class MemCache(Cache):
+ def __getstate__(self):
+ # pickle
+ return {'entries': self._entries, 'timeout': self.timeout}
+ def __setstate__(self, state):
+ # unpickle
+ self.lock = threading.Lock()
+ self._entries = state['entries']
+ self.timeout = state['timeout']
+ def _is_expired(self, entry, timeout):
+ return timeout > 0 and (time.time() - entry[0]) >= timeout
+ def store(self, key, value):
+ with self.lock:
+ self._entries[key] = (time.time(), value)
+ def get(self, key, timeout=None):
+ with self.lock:
+ # check to see if we have this key
+ entry = self._entries.get(key)
+ if not entry:
+ # no hit, return nothing
+ return None
- def __init__(self, servers, timeout=60):
- Cache.__init__(self, timeout)
- self.client = memcache.Client(servers)
+ # use provided timeout in arguments if provided
+ # otherwise use the one provided during init.
+ _timeout = self.timeout if timeout is None else timeout
- def store(self, key, value):
- self.client.set(key, (time.time(), value), time=self.timeout)
+ # make sure entry is not expired
+ if self._is_expired(entry, _timeout):
+ # entry expired, delete and return nothing
+ del self._entries[key]
+ return None
- def get(self, key, timeout=None):
- obj = self.client.get(key)
- if obj is None:
- return None
- created_time, value = obj
+ # entry found and not expired, return it
+ return entry[1]
- # check if value is expired
- _timeout = self.timeout if timeout is None else timeout
- if _timeout > 0 and (time.time() - created_time) >= _timeout:
- # expired! delete from cache
- self.client.delete(key)
- return None
+ def count(self):
+ return len(self._entries)
- return value
+ def cleanup(self):
+ with self.lock:
+ for k, v in self._entries.items():
+ if self._is_expired(v, self.timeout):
+ del self._entries[k]
- def count(self):
- count = 0
- for sid, stats in self.client.get_stats():
- count += int(stats.get('curr_items', 0))
- return count
+ def flush(self):
+ with self.lock:
+ self._entries.clear()
- def cleanup(self):
- # not implemented for this cache since server handles it
- return
- def flush(self):
- self.client.flush_all()
+class FileCache(Cache):
+ """File-based cache"""
+ # locks used to make cache thread-safe
+ cache_locks = {}
+ def __init__(self, cache_dir, timeout=60):
+ Cache.__init__(self, timeout)
+ if os.path.exists(cache_dir) is False:
+ os.mkdir(cache_dir)
+ self.cache_dir = cache_dir
+ if cache_dir in FileCache.cache_locks:
+ self.lock = FileCache.cache_locks[cache_dir]
+ else:
+ self.lock = threading.Lock()
+ FileCache.cache_locks[cache_dir] = self.lock
+ def _get_path(self, key):
+ md5 = hashlib.md5()
+ md5.update(key)
+ return os.path.join(self.cache_dir, md5.hexdigest())
+ def _lock_file(self, path, exclusive=True):
+ lock_path = path + '.lock'
+ if exclusive is True:
+ f_lock = open(lock_path, 'w')
+ fcntl.lockf(f_lock, fcntl.LOCK_EX)
+ else:
+ f_lock = open(lock_path, 'r')
+ fcntl.lockf(f_lock, fcntl.LOCK_SH)
+ if os.path.exists(lock_path) is False:
+ f_lock.close()
+ return None
+ return f_lock
+ def _delete_file(self, path):
+ os.remove(path)
+ os.remove(path + '.lock')
+ def store(self, key, value):
+ path = self._get_path(key)
+ with self.lock:
+ # acquire lock and open file
+ f_lock = self._lock_file(path)
+ datafile = open(path, 'wb')
+ # write data
+ pickle.dump((time.time(), value), datafile)
+ # close and unlock file
+ datafile.close()
+ f_lock.close()
+ def get(self, key, timeout=None):
+ return self._get(self._get_path(key), timeout)
+ def _get(self, path, timeout):
+ if os.path.exists(path) is False:
+ # no record
+ return None
+ while self.lock:
+ # acquire lock and open
+ f_lock = self._lock_file(path, False)
+ if f_lock is None:
+ # does not exist
+ return None
+ datafile = open(path, 'rb')
+ # read pickled object
+ created_time, value = pickle.load(datafile)
+ datafile.close()
+ # check if value is expired
+ _timeout = self.timeout if timeout is None else timeout
+ if _timeout > 0 and (time.time() - created_time) >= _timeout:
+ # expired! delete from cache
+ value = None
+ self._delete_file(path)
+ # unlock and return result
+ f_lock.close()
+ return value
+ def count(self):
+ c = 0
+ for entry in os.listdir(self.cache_dir):
+ if entry.endswith('.lock'):
+ continue
+ c += 1
+ return c
+ def cleanup(self):
+ for entry in os.listdir(self.cache_dir):
+ if entry.endswith('.lock'):
+ continue
+ self._get(os.path.join(self.cache_dir, entry), None)
+ def flush(self):
+ for entry in os.listdir(self.cache_dir):
+ if entry.endswith('.lock'):
+ continue
+ self._delete_file(os.path.join(self.cache_dir, entry))
+class MemCache(Cache):
+ """Memcache client"""
+ def __init__(self, servers, timeout=60):
+ Cache.__init__(self, timeout)
+ self.client = memcache.Client(servers)
+ def store(self, key, value):
+ self.client.set(key, (time.time(), value), time=self.timeout)
+ def get(self, key, timeout=None):
+ obj = self.client.get(key)
+ if obj is None:
+ return None
+ created_time, value = obj
+ # check if value is expired
+ _timeout = self.timeout if timeout is None else timeout
+ if _timeout > 0 and (time.time() - created_time) >= _timeout:
+ # expired! delete from cache
+ self.client.delete(key)
+ return None
+ return value
+ def count(self):
+ count = 0
+ for sid, stats in self.client.get_stats():
+ count += int(stats.get('curr_items', 0))
+ return count
+ def cleanup(self):
+ # not implemented for this cache since server handles it
+ return
+ def flush(self):
+ self.client.flush_all()
# Copyright 2009 Joshua Roesslein
-Tweepy exception
class TweepError(Exception):
+ """Tweepy exception"""
- def __init__(self, reason):
- self.reason = str(reason)
+ def __init__(self, reason):
+ self.reason = str(reason)
+ def __str__(self):
+ return self.reason
- def __str__(self):
- return self.reason
from . error import TweepError
class Model(object):
- def __getstate__(self):
- # pickle
- pickle = {}
- for k,v in self.__dict__.items():
- if k == '_api': continue # do not pickle the api reference
- pickle[k] = v
- return pickle
- @staticmethod
- def _validate(model, attributes):
- missing = []
- for attr in attributes:
- if not hasattr(model, attr):
- missing.append(attr)
- if len(missing) > 0:
- raise TweepError('Missing required attribute(s) %s' % str(missing).strip('[]'))
- def validate(self):
- return
+ def __getstate__(self):
+ # pickle
+ pickle = {}
+ for k, v in self.__dict__.items():
+ if k == '_api':
+ # do not pickle the api reference
+ continue
+ pickle[k] = v
+ return pickle
+ @staticmethod
+ def _validate(model, attributes):
+ missing = []
+ for attr in attributes:
+ if not hasattr(model, attr):
+ missing.append(attr)
+ if len(missing) > 0:
+ raise TweepError('Missing required attribute(s) %s' % \
+ str(missing).strip('[]'))
+ def validate(self):
+ return
class Status(Model):
- @staticmethod
- def _validate(status):
- Model._validate(status, [
- 'created_at', 'id', 'text', 'source', 'truncated', 'in_reply_to_status_id',
- 'in_reply_to_user_id', 'favorited', 'in_reply_to_screen_name'
- ])
- if hasattr(status, 'user'):
- User._validate(status.user)
- def validate(self):
- Status._validate(self)
+ @staticmethod
+ def _validate(status):
+ Model._validate(status, [
+ 'created_at', 'id', 'text', 'source', 'truncated', 'in_reply_to_status_id',
+ 'in_reply_to_user_id', 'favorited', 'in_reply_to_screen_name'
+ ])
+ if hasattr(status, 'user'):
+ User._validate(status.user)
+ def validate(self):
+ Status._validate(self)
+ def destroy(self):
+ return self._api.destroy_status(
- def destroy(self):
- return self._api.destroy_status(
class User(Model):
- @staticmethod
- def _validate(user):
- Model._validate(user, [
- 'id', 'name', 'screen_name', 'location', 'description', 'profile_image_url',
- 'url', 'protected', 'followers_count', 'profile_background_color',
- 'profile_text_color', 'profile_sidebar_fill_color', 'profile_sidebar_border_color',
- 'friends_count', 'created_at', 'favourites_count', 'utc_offset', 'time_zone',
- 'profile_background_image_url', 'statuses_count', 'notifications', 'following',
- 'verified'
- ])
- if hasattr(user, 'status'):
- Status._validate(user.status)
- def validate(self):
- User._validate(self)
- def timeline(self, **kargs):
- return self._api.user_timeline(**kargs)
- def mentions(self, **kargs):
- return self._api.mentions(**kargs)
- def friends(self, **kargs):
- return self._api.friends(, **kargs)
- def followers(self, **kargs):
- return self._api.followers(, **kargs)
- def follow(self):
- self._api.create_friendship(
- self.following = True
- def unfollow(self):
- self._api.destroy_friendship(
- self.following = False
+ @staticmethod
+ def _validate(user):
+ Model._validate(user, [
+ 'id', 'name', 'screen_name', 'location', 'description', 'profile_image_url',
+ 'url', 'protected', 'followers_count', 'profile_background_color',
+ 'profile_text_color', 'profile_sidebar_fill_color',
+ 'profile_sidebar_border_color', 'friends_count', 'created_at',
+ 'favourites_count', 'utc_offset', 'time_zone',
+ 'profile_background_image_url', 'statuses_count',
+ 'notifications', 'following', 'verified'
+ ])
+ if hasattr(user, 'status'):
+ Status._validate(user.status)
+ def validate(self):
+ User._validate(self)
+ def timeline(self, **kargs):
+ return self._api.user_timeline(**kargs)
+ def mentions(self, **kargs):
+ return self._api.mentions(**kargs)
+ def friends(self, **kargs):
+ return self._api.friends(, **kargs)
+ def followers(self, **kargs):
+ return self._api.followers(, **kargs)
+ def follow(self):
+ self._api.create_friendship(
+ self.following = True
+ def unfollow(self):
+ self._api.destroy_friendship(
+ self.following = False
class DirectMessage(Model):
- def destroy(self):
- return self._api.destroy_direct_message(
+ def destroy(self):
+ return self._api.destroy_direct_message(
class Friendship(Model):
- pass
+ pass
class SavedSearch(Model):
- pass
+ pass
class SearchResult(Model):
- pass
+ pass
# link up default model implementations.
models = {
- 'status': Status,
- 'user': User,
- 'direct_message': DirectMessage,
- 'friendship': Friendship,
- 'saved_search': SavedSearch,
- 'search_result': SearchResult
+ 'status': Status,
+ 'user': User,
+ 'direct_message': DirectMessage,
+ 'friendship': Friendship,
+ 'saved_search': SavedSearch,
+ 'search_result': SearchResult
from . models import models
- import json
+ import json
except ImportError:
- import simplejson as json
+ import simplejson as json
def parse_json(data, api):
- return json.loads(data)
+ return json.loads(data)
def parse_return_true(data, api):
- return True
+ return True
def parse_none(data, api):
- return None
+ return None
def parse_error(data):
- return json.loads(data)['error']
+ return json.loads(data)['error']
def _parse_datetime(str):
- return datetime.strptime(str, '%a %b %d %H:%M:%S +0000 %Y')
+ return datetime.strptime(str, '%a %b %d %H:%M:%S +0000 %Y')
def _parse_search_datetime(str):
- return datetime.strptime(str, '%a, %d %b %Y %H:%M:%S +0000')
+ return datetime.strptime(str, '%a, %d %b %Y %H:%M:%S +0000')
def _parse_html_value(html):
- return html[html.find('>')+1:html.rfind('<')]
+ return html[html.find('>')+1:html.rfind('<')]
def _parse_a_href(atag):
- return atag[atag.find('"')+1:atag.find('>')-1]
+ return atag[atag.find('"')+1:atag.find('>')-1]
def _parse_user(obj, api):
- user = models['user']()
- user._api = api
- for k,v in obj.items():
- if k == 'created_at':
- setattr(user, k, _parse_datetime(v))
- elif k == 'status':
- setattr(user, k, _parse_status(v, api))
- elif k == 'following':
- # twitter sets this to null if it is false
- if v is True:
- setattr(user, k, True)
- else:
- setattr(user, k, False)
- else:
- setattr(user, k, v)
- return user
+ user = models['user']()
+ user._api = api
+ for k, v in obj.items():
+ if k == 'created_at':
+ setattr(user, k, _parse_datetime(v))
+ elif k == 'status':
+ setattr(user, k, _parse_status(v, api))
+ elif k == 'following':
+ # twitter sets this to null if it is false
+ if v is True:
+ setattr(user, k, True)
+ else:
+ setattr(user, k, False)
+ else:
+ setattr(user, k, v)
+ return user
def parse_user(data, api):
- return _parse_user(json.loads(data), api)
+ return _parse_user(json.loads(data), api)
def parse_users(data, api):
- users = []
- for obj in json.loads(data):
- users.append(_parse_user(obj, api))
- return users
+ users = []
+ for obj in json.loads(data):
+ users.append(_parse_user(obj, api))
+ return users
def _parse_status(obj, api):
- status = models['status']()
- status._api = api
- for k,v in obj.items():
- if k == 'user':
- setattr(status, 'author', _parse_user(v, api))
- elif k == 'created_at':
- setattr(status, k, _parse_datetime(v))
- elif k == 'source':
- setattr(status, k, _parse_html_value(v))
- setattr(status, 'source_url', _parse_a_href(v))
- else:
- setattr(status, k, v)
- return status
+ status = models['status']()
+ status._api = api
+ for k, v in obj.items():
+ if k == 'user':
+ setattr(status, 'author', _parse_user(v, api))
+ elif k == 'created_at':
+ setattr(status, k, _parse_datetime(v))
+ elif k == 'source':
+ setattr(status, k, _parse_html_value(v))
+ setattr(status, 'source_url', _parse_a_href(v))
+ else:
+ setattr(status, k, v)
+ return status
def parse_status(data, api):
- return _parse_status(json.loads(data), api)
+ return _parse_status(json.loads(data), api)
def parse_statuses(data, api):
- statuses = []
- for obj in json.loads(data):
- statuses.append(_parse_status(obj, api))
- return statuses
+ statuses = []
+ for obj in json.loads(data):
+ statuses.append(_parse_status(obj, api))
+ return statuses
def _parse_dm(obj, api):
- dm = models['direct_message']()
- dm._api = api
- for k,v in obj.items():
- if k == 'sender' or k == 'recipient':
- setattr(dm, k, _parse_user(v, api))
- elif k == 'created_at':
- setattr(dm, k, _parse_datetime(v))
- else:
- setattr(dm, k, v)
- return dm
+ dm = models['direct_message']()
+ dm._api = api
+ for k, v in obj.items():
+ if k == 'sender' or k == 'recipient':
+ setattr(dm, k, _parse_user(v, api))
+ elif k == 'created_at':
+ setattr(dm, k, _parse_datetime(v))
+ else:
+ setattr(dm, k, v)
+ return dm
def parse_dm(data, api):
- return _parse_dm(json.loads(data), api)
+ return _parse_dm(json.loads(data), api)
def parse_directmessages(data, api):
- directmessages = []
- for obj in json.loads(data):
- directmessages.append(_parse_dm(obj, api))
- return directmessages
+ directmessages = []
+ for obj in json.loads(data):
+ directmessages.append(_parse_dm(obj, api))
+ return directmessages
def parse_friendship(data, api):
- relationship = json.loads(data)['relationship']
+ relationship = json.loads(data)['relationship']
- # parse source
- source = models['friendship']()
- for k,v in relationship['source'].items():
- setattr(source, k, v)
+ # parse source
+ source = models['friendship']()
+ for k, v in relationship['source'].items():
+ setattr(source, k, v)
- # parse target
- target = models['friendship']()
- for k,v in relationship['target'].items():
- setattr(target, k, v)
+ # parse target
+ target = models['friendship']()
+ for k, v in relationship['target'].items():
+ setattr(target, k, v)
+ return source, target
- return source, target
def _parse_saved_search(obj, api):
- ss = models['saved_search']()
- ss._api = api
- for k,v in obj.items():
- if k == 'created_at':
- setattr(ss, k, _parse_datetime(v))
- else:
- setattr(ss, k, v)
- return ss
+ ss = models['saved_search']()
+ ss._api = api
+ for k, v in obj.items():
+ if k == 'created_at':
+ setattr(ss, k, _parse_datetime(v))
+ else:
+ setattr(ss, k, v)
+ return ss
def parse_saved_search(data, api):
- return _parse_saved_search(json.loads(data), api)
+ return _parse_saved_search(json.loads(data), api)
def parse_saved_searches(data, api):
- saved_searches = []
- saved_search = models['saved_search']()
- for obj in json.loads(data):
- saved_searches.append(_parse_saved_search(obj, api))
- return saved_searches
+ saved_searches = []
+ saved_search = models['saved_search']()
+ for obj in json.loads(data):
+ saved_searches.append(_parse_saved_search(obj, api))
+ return saved_searches
def _parse_search_result(obj, api):
- result = models['search_result']()
- for k,v in obj.items():
- if k == 'created_at':
- setattr(result, k, _parse_search_datetime(v))
- else:
- setattr(result, k, v)
- return result
+ result = models['search_result']()
+ for k, v in obj.items():
+ if k == 'created_at':
+ setattr(result, k, _parse_search_datetime(v))
+ else:
+ setattr(result, k, v)
+ return result
def parse_search_results(data, api):
- results = json.loads(data)['results']
- result_objects = []
- for obj in results:
- result_objects.append(_parse_search_result(obj, api))
- return result_objects
+ results = json.loads(data)['results']
+ result_objects = []
+ for obj in results:
+ result_objects.append(_parse_search_result(obj, api))
+ return result_objects
def parse_trend_results(data, api):
- return json.loads(data)['trends']
+ return json.loads(data)['trends']
from . error import TweepError
- import json
+ import json
except ImportError:
- import simplejson as json
+ import simplejson as json
class StreamListener(object):
- def on_status(self, status):
- """Called when a new status arrives"""
- return
+ def on_status(self, status):
+ """Called when a new status arrives"""
+ return
+ def on_delete(self, status_id, user_id):
+ """Called when a delete notice arrives for a status"""
+ return
- def on_delete(self, status_id, user_id):
- """Called when a delete notice arrives for a status"""
- return
+ def on_limit(self, track):
+ """Called when a limitation notice arrvies"""
+ return
- def on_limit(self, track):
- """Called when a limitation notice arrvies"""
- return
+ def on_error(self, status_code):
+ """Called when a non-200 status code is returned"""
+ return False
- def on_error(self, status_code):
- """Called when a non-200 status code is returned"""
- return False
+ def on_timeout(self):
+ """Called when stream connection times out"""
+ return
- def on_timeout(self):
- """Called when stream connection times out"""
- return
class Stream(object):
- host = ''
- def __init__(self, username, password, listener, timeout=5.0, retry_count = None,
- retry_time = 10.0, snooze_time = 5.0, buffer_size=1500):
- self.auth = BasicAuthHandler(username, password)
- self.running = False
- self.timeout = timeout
- self.retry_count = retry_count
- self.retry_time = retry_time
- self.snooze_time = snooze_time
- self.buffer_size = buffer_size
- self.listener = listener
- self.api = API()
- def _run(self):
- # setup
- headers = {}
- self.auth.apply_auth(None, None, headers, None)
- # enter loop
- error_counter = 0
- conn = None
- while self.running:
- if self.retry_count and error_counter > self.retry_count:
- # quit if error count greater than retry count
- break
- try:
- conn = httplib.HTTPConnection(
- conn.connect()
- conn.sock.settimeout(self.timeout)
- conn.request('POST', self.url, headers=headers)
- resp = conn.getresponse()
- if resp.status != 200:
- if self.listener.on_error(resp.status) is False:
- break
- error_counter += 1
- sleep(self.retry_time)
- else:
- error_counter = 0
- self._read_loop(resp)
- except timeout:
- if self.listener.on_timeout() == False:
- break
+ host = ''
+ def __init__(self, username, password, listener, timeout=5.0, retry_count = None,
+ retry_time = 10.0, snooze_time = 5.0, buffer_size=1500):
+ self.auth = BasicAuthHandler(username, password)
+ self.running = False
+ self.timeout = timeout
+ self.retry_count = retry_count
+ self.retry_time = retry_time
+ self.snooze_time = snooze_time
+ self.buffer_size = buffer_size
+ self.listener = listener
+ self.api = API()
+ def _run(self):
+ # setup
+ headers = {}
+ self.auth.apply_auth(None, None, headers, None)
+ # enter loop
+ error_counter = 0
+ conn = None
+ while self.running:
+ if self.retry_count and error_counter > self.retry_count:
+ # quit if error count greater than retry count
+ break
+ try:
+ conn = httplib.HTTPConnection(
+ conn.connect()
+ conn.sock.settimeout(self.timeout)
+ conn.request('POST', self.url, headers=headers)
+ resp = conn.getresponse()
+ if resp.status != 200:
+ if self.listener.on_error(resp.status) is False:
+ break
+ error_counter += 1
+ sleep(self.retry_time)
+ else:
+ error_counter = 0
+ self._read_loop(resp)
+ except timeout:
+ if self.listener.on_timeout() == False:
+ break
+ if self.running is False:
+ break
+ conn.close()
+ sleep(self.snooze_time)
+ except Exception:
+ # any other exception is fatal, so kill loop
+ break
+ # cleanup
+ self.running = False
+ if conn:
+ conn.close()
+ def _read_loop(self, resp):
+ data = ''
+ while self.running:
+ if resp.isclosed():
+ break
+ # read length
+ length = ''
+ while True:
+ c =
+ if c == '\n':
+ break
+ length += c
+ length = length.strip()
+ if length.isdigit():
+ length = int(length)
+ else:
+ continue
+ # read data
+ data =
+ # turn json data into status object
+ if 'in_reply_to_status_id' in data:
+ status = parse_status(data, self.api)
+ if self.listener.on_status(status) == False:
+ self.running = False
+ elif 'delete' in data:
+ delete = json.loads(data)['delete']['status']
+ if self.listener.on_delete(delete['id'], delete['user_id']) == False:
+ self.running = False
+ elif 'limit' in data:
+ if self.listener.on_limit(json.loads(data)['limit']['track']) == False:
+ self.running = False
+ def firehose(self, count=None):
+ if self.running:
+ raise TweepError('Stream object already connected!')
+ self.url = '/%i/statuses/firehose.json?delimited=length' % STREAM_VERSION
+ if count:
+ self.url += '&count=%s' % count
+ self.running = True
+ Thread(target=self._run).start()
+ def sample(self, count=None):
+ if self.running:
+ raise TweepError('Stream object already connected!')
+ self.url = '/%i/statuses/sample.json?delimited=length' % STREAM_VERSION
+ if count:
+ self.url += '&count=%s' % count
+ self.running = True
+ Thread(target=self._run).start()
+ def filter(self, follow=None, track=None):
+ if self.running:
+ raise TweepError('Stream object already connected!')
+ self.url = '/%i/statuses/filter.json?delimited=length' % STREAM_VERSION
+ if follow:
+ self.url += '&follow=%s' % ','.join(follow)
+ if track:
+ self.url += '&track=%s' % ','.join(track)
+ print self.url
+ self.running = True
+ Thread(target=self._run).start()
+ def disconnect(self):
if self.running is False:
- break
- conn.close()
- sleep(self.snooze_time)
- except Exception:
- # any other exception is fatal, so kill loop
- break
- # cleanup
- self.running = False
- if conn:
- conn.close()
- def _read_loop(self, resp):
- data = ''
- while self.running:
- if resp.isclosed():
- break
- # read length
- length = ''
- while True:
- c =
- if c == '\n':
- break
- length += c
- length = length.strip()
- if length.isdigit():
- length = int(length)
- else:
- continue
- # read data
- data =
- # turn json data into status object
- if 'in_reply_to_status_id' in data:
- status = parse_status(data, self.api)
- if self.listener.on_status(status) == False:
- self.running = False
- elif 'delete' in data:
- delete = json.loads(data)['delete']['status']
- if self.listener.on_delete(delete['id'], delete['user_id']) == False:
- self.running = False
- elif 'limit' in data:
- if self.listener.on_limit(json.loads(data)['limit']['track']) == False:
- self.running = False
- def firehose(self, count=None, ):
- if self.running:
- raise TweepError('Stream object already connected!')
- self.url = '/%i/statuses/firehose.json?delimited=length' % STREAM_VERSION
- if count:
- self.url += '&count=%s' % count
- self.running = True
- Thread(target=self._run).start()
- def sample(self, count=None):
- if self.running:
- raise TweepError('Stream object already connected!')
- self.url = '/%i/statuses/sample.json?delimited=length' % STREAM_VERSION
- if count:
- self.url += '&count=%s' % count
- self.running = True
- Thread(target=self._run).start()
- def filter(self, follow=None, track=None):
- if self.running:
- raise TweepError('Stream object already connected!')
- self.url = '/%i/statuses/filter.json?delimited=length' % STREAM_VERSION
- if follow:
- self.url += '&follow=%s' % ','.join(follow)
- if track:
- self.url += '&track=%s' % ','.join(track)
- print self.url
- self.running = True
- Thread(target=self._run).start()
- def disconnect(self):
- if self.running is False:
- return
- self.running = False
+ return
+ self.running = False
if len(sys.argv) != 3:
- print 'Usage: tweepyshell <username> <password>'
- exit(1)
+ print 'Usage: tweepyshell <username> <password>'
+ exit(1)
api ='basic', username=sys.argv[1], password=sys.argv[2])
if api.verify_credentials() is False:
- print 'Invalid username and/or password!'
- exit(1)
+ print 'Invalid username and/or password!'
+ exit(1)
code.interact('<Tweepy shell>', local={'tweepy': tweepy, 'api': api})