+ Removed 'email' parameter from API.update_profile. No longer supported.
+ API.auth_handler -> API.auth
+ Moved memcache implementation to tweepy-more repository.
++ Tweepy now uses the versioned API and the new api.twitter.com subdomain
1.1 -> 1.2 [Current]
=====================
self.api.followers_ids(username)
def testverifycredentials(self):
- self.assertEqual(self.api.verify_credentials(), True)
+ self.assertNotEqual(self.api.verify_credentials(), False)
- api = API.new('basic', 'bad', 'password')
+ api = API(BasicAuthHandler('bad', 'password'))
self.assertEqual(api.verify_credentials(), False)
def testratelimitstatus(self):
self.assertEqual(updated.profile_sidebar_fill_color, '000')
self.assertEqual(updated.profile_sidebar_border_color, '000')
+ """
def testupateprofileimage(self):
self.api.update_profile_image('examples/profile.png')
def testupdateprofilebg(self):
self.api.update_profile_background_image('examples/bg.png')
+ """
def testupdateprofile(self):
original = self.api.me()
profile = {
'name': 'Tweepy test 123',
- 'email': 'test@example.com',
'url': 'http://www.example.com',
'location': 'pytopia',
'description': 'just testing things out'
}
updated = self.api.update_profile(**profile)
self.api.update_profile(
- name = original.name, email = 'hi@example.com', url = original.url,
+ name = original.name, url = original.url,
location = original.location, description = original.description
)
# test getting access token
auth_url = auth.get_authorization_url()
- self.assert_(auth_url.startswith('http://twitter.com/oauth/authorize?'))
+ self.assert_(auth_url.startswith('http://api.twitter.com/oauth/authorize?'))
print 'Please authorize: ' + auth_url
verifier = raw_input('PIN: ').strip()
self.assert_(len(verifier) > 0)
class API(object):
"""Twitter API"""
- def __init__(self, auth_handler=None, host='twitter.com', cache=None,
- secure=False, api_root='',
+ def __init__(self, auth_handler=None,
+ host='api.twitter.com', search_host='search.twitter.com',
+ cache=None, secure=False, api_root='/1', search_root='',
retry_count=0, retry_delay=0, retry_errors=None):
# you may access these freely
self.auth = auth_handler
self.host = host
+ self.search_host = search_host
self.api_root = api_root
+ self.search_root = search_root
self.cache = cache
self.secure = secure
self.retry_count = retry_count
return False
""" search """
-
- def search(self, *args, **kargs):
- return bind_api(
- host = 'search.' + self.host,
- path = '/search.json',
- parser = parse_search_results,
- allowed_param = ['q', 'lang', 'locale', 'rpp', 'page', 'since_id', 'geocode', 'show_user'],
- )(self, *args, **kargs)
- search.pagination_mode = 'page'
+ search = bind_api(
+ search_api = True,
+ path = '/search.json',
+ parser = parse_search_results,
+ allowed_param = ['q', 'lang', 'locale', 'rpp', 'page', 'since_id', 'geocode', 'show_user']
+ )
""" trends """
- def trends(self):
- return bind_api(
- host = 'search.' + self.host,
- path = '/trends.json',
- parser = parse_json
- )(self)
+ trends = bind_api(
+ search_api = True,
+ path = '/trends.json',
+ parser = parse_json
+ )
""" trends/current """
- def trends_current(self, *args, **kargs):
- return bind_api(
- host = 'search.' + self.host,
- path = '/trends/current.json',
- parser = parse_json,
- allowed_param = ['exclude']
- )(self, *args, **kargs)
+ trends_current = bind_api(
+ search_api = True,
+ path = '/trends/current.json',
+ parser = parse_json,
+ allowed_param = ['exclude']
+ )
""" trends/daily """
- def trends_daily(self, *args, **kargs):
- return bind_api(
- host = "search." + self.host,
- path = '/trends/daily.json',
- parser = parse_json,
- allowed_param = ['date', 'exclude']
- )(self, *args, **kargs)
+ trends_daily = bind_api(
+ search_api = True,
+ path = '/trends/daily.json',
+ parser = parse_json,
+ allowed_param = ['date', 'exclude']
+ )
""" trends/weekly """
- def trends_weekly(self, *args, **kargs):
- return bind_api(
- host = "search." + self.host,
- path = '/trends/weekly.json',
- parser = parse_json,
- allowed_param = ['date', 'exclude']
- )(self, *args, **kargs)
+ trends_weekly = bind_api(
+ search_api = True,
+ path = '/trends/weekly.json',
+ parser = parse_json,
+ allowed_param = ['date', 'exclude']
+ )
""" Internal use only """
-
@staticmethod
def _pack_image(filename, max_size):
"""Pack image from file into multipart-formdata post body"""
class OAuthHandler(AuthHandler):
"""OAuth authentication handler"""
- REQUEST_TOKEN_URL = 'http://twitter.com/oauth/request_token'
- AUTHORIZATION_URL = 'http://twitter.com/oauth/authorize'
- AUTHENTICATE_URL = 'http://twitter.com/oauth/authenticate'
- ACCESS_TOKEN_URL = 'http://twitter.com/oauth/access_token'
+ REQUEST_TOKEN_URL = 'http://api.twitter.com/oauth/request_token'
+ AUTHORIZATION_URL = 'http://api.twitter.com/oauth/authorize'
+ AUTHENTICATE_URL = 'http://api.twitter.com/oauth/authenticate'
+ ACCESS_TOKEN_URL = 'http://api.twitter.com/oauth/access_token'
def __init__(self, consumer_key, consumer_secret, callback=None):
self._consumer = oauth.OAuthConsumer(consumer_key, consumer_secret)
def bind_api(path, parser, allowed_param=[], method='GET', require_auth=False,
- timeout=None, host=None):
+ timeout=None, search_api = False):
def _call(api, *args, **kargs):
# If require auth, throw exception if credentials not provided
parameters = None
# Build url with parameters
+ api_root = api.api_root if search_api is False else api.search_root
if parameters:
- url = '%s?%s' % (api.api_root + path, urllib.urlencode(parameters))
+ url = '%s?%s' % (api_root + path, urllib.urlencode(parameters))
else:
- url = api.api_root + path
+ url = api_root + path
# Check cache if caching enabled and method is GET
if api.cache and method == 'GET':
scheme = 'https://'
else:
scheme = 'http://'
- _host = host or api.host
+ host = api.host if search_api is False else api.search_host
# Continue attempting request until successful
# or maximum number of retries is reached.
# Open connection
# FIXME: add timeout
if api.secure:
- conn = httplib.HTTPSConnection(_host)
+ conn = httplib.HTTPSConnection(host)
else:
- conn = httplib.HTTPConnection(_host)
+ conn = httplib.HTTPConnection(host)
# Apply authentication
if api.auth:
api.auth.apply_auth(
- scheme + _host + url,
+ scheme + host + url,
method, headers, parameters
)