-from urllib2 import Request, urlopen
-import urllib
-import base64
-import json
import logging
from tweepy.error import TweepError
from requests.auth import AuthBase
from urlparse import parse_qs
+WARNING_MESSAGE = """Warning! Due to a Twitter API bug, signin_with_twitter
+and access_type don't always play nice together. Details
+"https://dev.twitter.com/discussions/21281"""
+
+
class AuthHandler(object):
def apply_auth(self, url, method, headers, parameters):
self.access_token_secret = None
self.callback = callback
self.username = None
- self.oauth = OAuth1Session(consumer_key, client_secret=consumer_secret, callback_uri=self.callback)
+ self.oauth = OAuth1Session(consumer_key,
+ client_secret=consumer_secret,
+ callback_uri=self.callback)
def _get_oauth_url(self, endpoint):
return 'https://' + self.OAUTH_HOST + self.OAUTH_ROOT + endpoint
def apply_auth(self):
- return OAuth1(self.consumer_key, client_secret=self.consumer_secret, resource_owner_key=self.access_token, resource_owner_secret=self.access_token_secret)
+ return OAuth1(self.consumer_key,
+ client_secret=self.consumer_secret,
+ resource_owner_key=self.access_token,
+ resource_owner_secret=self.access_token_secret)
- def _get_request_token(self, access_type = None):
+ def _get_request_token(self, access_type=None):
try:
url = self._get_oauth_url('request_token')
if access_type:
self.access_token = key
self.access_token_secret = secret
- def get_authorization_url(self, signin_with_twitter = False, access_type = None):
+ def get_authorization_url(self,
+ signin_with_twitter=False,
+ access_type=None):
"""Get the authorization URL to redirect the user"""
try:
if signin_with_twitter:
url = self._get_oauth_url('authenticate')
if access_type:
- logging.warning(
- "Warning! Due to a Twitter API bug, signin_with_twitter "
- "and access_type don't always play nice together. Details: "
- "https://dev.twitter.com/discussions/21281")
+ logging.warning(WARNING_MESSAGE)
else:
url = self._get_oauth_url('authorize')
self.request_token = self._get_request_token(access_type=access_type)
except Exception as e:
raise TweepError(e)
- def get_access_token(self, verifier = None):
+ def get_access_token(self, verifier=None):
"""
After user has authorized the request token, get access token
with user supplied verifier.
"""
try:
url = self._get_oauth_url('access_token')
- self.oauth = OAuth1Session(self.consumer_key, client_secret=self.consumer_secret, resource_owner_key=self.request_token['oauth_token'], resource_owner_secret=self.request_token['oauth_token_secret'], verifier=verifier, callback_uri=self.callback)
+ self.oauth = OAuth1Session(self.consumer_key,
+ client_secret=self.consumer_secret,
+ resource_owner_key=self.request_token['oauth_token'],
+ resource_owner_secret=self.request_token['oauth_token_secret'],
+ verifier=verifier, callback_uri=self.callback)
resp = self.oauth.fetch_access_token(url)
self.access_token = resp['oauth_token']
self.access_token_secret = resp['oauth_token_secret']
- return (self.access_token, self.access_token_secret)
+ return self.access_token, self.access_token_secret
except Exception as e:
raise TweepError(e)
"""
try:
url = self._get_oauth_url('access_token')
- oauth = OAuth1(self.consumer_key, client_secret=self.consumer_secret)
- r = requests.post(url=url, auth=oauth, headers={'x_auth_mode':
- 'client_auth', 'x_auth_username': username, 'x_auth_password':
- password})
+ oauth = OAuth1(self.consumer_key,
+ client_secret=self.consumer_secret)
+ r = requests.post(url=url,
+ auth=oauth,
+ headers={'x_auth_mode': 'client_auth',
+ 'x_auth_username': username,
+ 'x_auth_password': password})
print r.content
credentials = parse_qs(r.content)
- return (credentials.get('oauth_token')[0], credentials.get('oauth_token_secret')[0])
+ return credentials.get('oauth_token')[0], credentials.get('oauth_token_secret')[0]
except Exception as e:
raise TweepError(e)
if user:
self.username = user.screen_name
else:
- raise TweepError('Unable to get username, invalid oauth token!')
+ raise TweepError('Unable to get username,'
+ ' invalid oauth token!')
return self.username
class OAuth2Bearer(AuthBase):
def __init__(self, bearer_token):
self.bearer_token = bearer_token
+
def __call__(self, request):
request.headers['Authorization'] = 'Bearer ' + self.bearer_token
return request
self.consumer_secret = consumer_secret
self._bearer_token = ''
- resp = requests.post(self._get_oauth_url('token'), auth=(self.consumer_key, self.consumer_secret),
- data={'grant_type': 'client_credentials'})
+ resp = requests.post(self._get_oauth_url('token'),
+ auth=(self.consumer_key,
+ self.consumer_secret),
+ data={'grant_type': 'client_credentials'})
data = resp.json()
if data.get('token_type') != 'bearer':
- raise TweepError('Expected token_type to equal "bearer", but got %s \
- instead' % data.get('token_type'))
-
+ raise TweepError('Expected token_type to equal "bearer", '
+ 'but got %s instead' % data.get('token_type'))
self._bearer_token = data['access_token']
-
def _get_oauth_url(self, endpoint):
return 'https://' + self.OAUTH_HOST + self.OAUTH_ROOT + endpoint
-
def apply_auth(self):
return OAuth2Bearer(self._bearer_token)
raise TweepError('Authentication required!')
self.post_data = kwargs.pop('post_data', None)
- self.retry_count = kwargs.pop('retry_count', api.retry_count)
- self.retry_delay = kwargs.pop('retry_delay', api.retry_delay)
- self.retry_errors = kwargs.pop('retry_errors', api.retry_errors)
- self.wait_on_rate_limit = kwargs.pop('wait_on_rate_limit', api.wait_on_rate_limit)
- self.wait_on_rate_limit_notify = kwargs.pop('wait_on_rate_limit_notify', api.wait_on_rate_limit_notify)
+ self.retry_count = kwargs.pop('retry_count',
+ api.retry_count)
+ self.retry_delay = kwargs.pop('retry_delay',
+ api.retry_delay)
+ self.retry_errors = kwargs.pop('retry_errors',
+ api.retry_errors)
+ self.wait_on_rate_limit = kwargs.pop('wait_on_rate_limit',
+ api.wait_on_rate_limit)
+ self.wait_on_rate_limit_notify = kwargs.pop('wait_on_rate_limit_notify',
+ api.wait_on_rate_limit_notify)
self.parser = kwargs.pop('parser', api.parser)
self.session.headers = kwargs.pop('headers', {})
self.build_parameters(args, kwargs)
retries_performed = 0
while retries_performed < self.retry_count + 1:
# handle running out of api calls
- if self.wait_on_rate_limit and self._reset_time is not None and \
- self._remaining_calls is not None and self._remaining_calls < 1:
- sleep_time = self._reset_time - int(time.time())
- if sleep_time > 0:
- if self.wait_on_rate_limit_notify:
- print "Rate limit reached. Sleeping for: " + str(sleep_time)
- time.sleep(sleep_time + 5) # sleep for few extra sec
+ if self.wait_on_rate_limit:
+ if self._reset_time is not None:
+ if self._remaining_calls is not None:
+ if self._remaining_calls < 1:
+ sleep_time = self._reset_time - int(time.time())
+ if sleep_time > 0:
+ if self.wait_on_rate_limit_notify:
+ print "Rate limit reached. Sleeping for: " + str(sleep_time)
+ time.sleep(sleep_time + 5) # sleep for few extra sec
# Apply authentication
if self.api.auth:
# Execute request
try:
- resp = self.session.request(self.method, full_url,
- data=self.post_data, timeout=self.api.timeout,
- auth=auth, proxies=self.api.proxy)
+ resp = self.session.request(self.method,
+ full_url,
+ data=self.post_data,
+ timeout=self.api.timeout,
+ auth=auth,
+ proxies=self.api.proxy)
except Exception, e:
raise TweepError('Failed to send request: %s' % e)
rem_calls = resp.headers.get('x-rate-limit-remaining')
if reset_time is not None:
self._reset_time = int(reset_time)
if self.wait_on_rate_limit and self._remaining_calls == 0 and (
- resp.status == 429 or resp.status == 420): # if ran out of calls before waiting switching retry last call
+ # if ran out of calls before waiting switching retry last call
+ resp.status == 429 or resp.status == 420):
continue
retry_delay = self.retry_delay
# Exit request loop if non-retry error code
# Set pagination mode
if 'cursor' in APIMethod.allowed_param:
_call.pagination_mode = 'cursor'
- elif 'max_id' in APIMethod.allowed_param and \
- 'since_id' in APIMethod.allowed_param:
- _call.pagination_mode = 'id'
+ elif 'max_id' in APIMethod.allowed_param:
+ if 'since_id' in APIMethod.allowed_param:
+ _call.pagination_mode = 'id'
elif 'page' in APIMethod.allowed_param:
_call.pagination_mode = 'page'
# check if value is expired
if timeout is None:
timeout = self.timeout
- if timeout > 0 and (time.time() - created_time) >= timeout:
- # expired! delete from cache
- value = None
- self._delete_file(path)
+ if timeout > 0:
+ if (time.time() - created_time) >= timeout:
+ # expired! delete from cache
+ value = None
+ self._delete_file(path)
# unlock and return result
self._unlock_file(f_lock)
continue
self._delete_file(os.path.join(self.cache_dir, entry))
+
class MemCacheCache(Cache):
"""Cache interface"""
def get(self, key, timeout=None):
"""Get cached entry if exists and not expired
key: which entry to get
- timeout: override timeout with this value [optional]. DOES NOT WORK HERE
+ timeout: override timeout with this value [optional].
+ DOES NOT WORK HERE
"""
return self.client.get(key)
"""Delete all cached entries. NO-OP"""
raise NotImplementedError
+
class RedisCache(Cache):
- '''Cache running in a redis server'''
+ """Cache running in a redis server"""
- def __init__(self, client, timeout=60, keys_container = 'tweepy:keys', pre_identifier = 'tweepy:'):
+ def __init__(self, client,
+ timeout=60,
+ keys_container='tweepy:keys',
+ pre_identifier='tweepy:'):
Cache.__init__(self, timeout)
self.client = client
self.keys_container = keys_container
return timeout > 0 and (time.time() - entry[0]) >= timeout
def store(self, key, value):
- '''Store the key, value pair in our redis server'''
- # Prepend tweepy to our key, this makes it easier to identify tweepy keys in our redis server
+ """Store the key, value pair in our redis server"""
+ # Prepend tweepy to our key,
+ # this makes it easier to identify tweepy keys in our redis server
key = self.pre_identifier + key
# Get a pipe (to execute several redis commands in one step)
pipe = self.client.pipeline()
pipe.execute()
def get(self, key, timeout=None):
- '''Given a key, returns an element from the redis table'''
+ """Given a key, returns an element from the redis table"""
key = self.pre_identifier + key
# Check to see if we have this key
unpickled_entry = self.client.get(key)
return entry[1]
def count(self):
- '''Note: This is not very efficient, since it retreives all the keys from the redis
- server to know how many keys we have'''
+ """Note: This is not very efficient,
+ since it retreives all the keys from the redis
+ server to know how many keys we have"""
return len(self.client.smembers(self.keys_container))
def delete_entry(self, key):
- '''Delete an object from the redis table'''
+ """Delete an object from the redis table"""
pipe = self.client.pipeline()
pipe.srem(self.keys_container, key)
pipe.delete(key)
pipe.execute()
def cleanup(self):
- '''Cleanup all the expired keys'''
+ """Cleanup all the expired keys"""
keys = self.client.smembers(self.keys_container)
for key in keys:
entry = self.client.get(key)
self.delete_entry(key)
def flush(self):
- '''Delete all entries from the cache'''
+ """Delete all entries from the cache"""
keys = self.client.smembers(self.keys_container)
for key in keys:
self.delete_entry(key)
from tweepy.error import TweepError
from tweepy.parsers import ModelParser, RawParser
+
class Cursor(object):
"""Pagination helper class"""
i.limit = limit
return i
+
class BaseIterator(object):
def __init__(self, method, args, kargs):
def __iter__(self):
return self
+
class CursorIterator(BaseIterator):
def __init__(self, method, args, kargs):
def next(self):
if self.next_cursor == 0 or (self.limit and self.num_tweets == self.limit):
raise StopIteration
- data, cursors = self.method(
- cursor=self.next_cursor, *self.args, **self.kargs
- )
+ data, cursors = self.method(cursor=self.next_cursor,
+ *self.args,
+ **self.kargs)
self.prev_cursor, self.next_cursor = cursors
if len(data) == 0:
raise StopIteration
def prev(self):
if self.prev_cursor == 0:
raise TweepError('Can not page back more, at first page')
- data, self.next_cursor, self.prev_cursor = self.method(
- cursor=self.prev_cursor, *self.args, **self.kargs
- )
+ data, self.next_cursor, self.prev_cursor = self.method(cursor=self.prev_cursor,
+ *self.args,
+ **self.kargs)
self.num_tweets -= 1
return data
+
class IdIterator(BaseIterator):
def __init__(self, method, args, kargs):
model = ModelParser().parse(self.method(create=True), data)
self.method.__self__.parser = old_parser
- result = self.method.__self__.parser.parse(self.method(create=True), data)
-
+ result = self.method.__self__.parser.parse(self.method(create=True),
+ data)
+
if len(self.results) != 0:
self.index += 1
self.results.append(result)
self.index += 1
result = self.results[self.index]
model = self.model_results[self.index]
-
+
if len(result) == 0:
raise StopIteration
# TODO: Make this not dependant on the parser making max_id and
# since_id available
- self.max_id = model.max_id
+ self.max_id = model.max_id
self.num_tweets += 1
return result
self.num_tweets += 1
return data
+
class PageIterator(BaseIterator):
def __init__(self, method, args, kargs):
self.current_page = 0
def next(self):
- if self.limit > 0 and self.current_page > self.limit:
- raise StopIteration
+ if self.limit > 0:
+ if self.current_page > self.limit:
+ raise StopIteration
items = self.method(page=self.current_page, *self.args, **self.kargs)
if len(items) == 0:
return items
def prev(self):
- if (self.current_page == 1):
+ if self.current_page == 1:
raise TweepError('Can not page back more, at first page')
self.current_page -= 1
return self.method(page=self.current_page, *self.args, **self.kargs)
+
class ItemIterator(BaseIterator):
def __init__(self, page_iterator):
self.num_tweets = 0
def next(self):
- if self.limit > 0 and self.num_tweets == self.limit:
- raise StopIteration
+ if self.limit > 0:
+ if self.num_tweets == self.limit:
+ raise StopIteration
if self.current_page is None or self.page_index == len(self.current_page) - 1:
# Reached end of current page, get the next page...
self.current_page = self.page_iterator.next()
self.page_index -= 1
self.num_tweets -= 1
return self.current_page[self.page_index]
-
# Copyright 2009-2010 Joshua Roesslein
# See LICENSE for details.
+
class TweepError(Exception):
"""Tweepy exception"""
def __str__(self):
return self.reason
-