"""Generate pseudorandom number."""
return ''.join([str(random.randint(0, 9)) for i in range(length)])
+def generate_verifier(length=8):
+ """Generate pseudorandom number."""
+ return ''.join([str(random.randint(0, 9)) for i in range(length)])
+
class OAuthConsumer(object):
"""Consumer of OAuth authentication.
self.key = key
self.secret = secret
-
+
class OAuthToken(object):
"""OAuthToken is a data type that represents an End User via either an access
or request token.
"""
key = None
secret = None
+ callback = None
+ callback_confirmed = None
+ verifier = None
def __init__(self, key, secret):
self.key = key
self.secret = secret
+ def set_callback(self, callback):
+ self.callback = callback
+ self.callback_confirmed = 'true'
+
+ def set_verifier(self, verifier=None):
+ if verifier is not None:
+ self.verifier = verifier
+ else:
+ self.verifier = generate_verifier()
+
+ def get_callback_url(self):
+ if self.callback and self.verifier:
+ # Append the oauth_verifier.
+ parts = urlparse.urlparse(self.callback)
+ scheme, netloc, path, params, query, fragment = parts[:6]
+ if query:
+ query = '%s&oauth_verifier=%s' % (query, self.verifier)
+ else:
+ query = 'oauth_verifier=%s' % self.verifier
+ return urlparse.urlunparse((scheme, netloc, path, params,
+ query, fragment))
+ return self.callback
+
def to_string(self):
- return urllib.urlencode({'oauth_token': self.key,
- 'oauth_token_secret': self.secret})
+ data = {
+ 'oauth_token': self.key,
+ 'oauth_token_secret': self.secret,
+ }
+ if self.callback_confirmed is not None:
+ data['oauth_callback_confirmed'] = self.callback_confirmed
+ return urllib.urlencode(data)
def from_string(s):
""" Returns a token from something like:
params = cgi.parse_qs(s, keep_blank_values=False)
key = params['oauth_token'][0]
secret = params['oauth_token_secret'][0]
- return OAuthToken(key, secret)
+ token = OAuthToken(key, secret)
+ try:
+ token.callback_confirmed = params['oauth_callback_confirmed'][0]
+ except KeyError:
+ pass # 1.0, no callback confirmed.
+ return token
from_string = staticmethod(from_string)
def __str__(self):
- oauth_timestamp
- oauth_nonce
- oauth_version
+ - oauth_verifier
... any additional parameters, as defined by the Service Provider.
"""
parameters = None # OAuth parameters.
from_request = staticmethod(from_request)
def from_consumer_and_token(oauth_consumer, token=None,
- http_method=HTTP_METHOD, http_url=None, parameters=None):
+ callback=None, verifier=None, http_method=HTTP_METHOD,
+ http_url=None, parameters=None):
if not parameters:
parameters = {}
if token:
parameters['oauth_token'] = token.key
+ parameters['oauth_callback'] = token.callback
+ # 1.0a support for verifier.
+ parameters['oauth_verifier'] = verifier
+ elif callback:
+ # 1.0a support for callback in the request token request.
+ parameters['oauth_callback'] = callback
return OAuthRequest(http_method, http_url, parameters)
from_consumer_and_token = staticmethod(from_consumer_and_token)
# No token required for the initial token request.
version = self._get_version(oauth_request)
consumer = self._get_consumer(oauth_request)
+ try:
+ callback = self.get_callback(oauth_request)
+ except OAuthError:
+ callback = None # 1.0, no callback specified.
self._check_signature(oauth_request, consumer, None)
# Fetch a new token.
- token = self.data_store.fetch_request_token(consumer)
+ token = self.data_store.fetch_request_token(consumer, callback)
return token
def fetch_access_token(self, oauth_request):
"""
version = self._get_version(oauth_request)
consumer = self._get_consumer(oauth_request)
+ verifier = self._get_verifier(oauth_request)
# Get the request token.
token = self._get_token(oauth_request, 'request')
self._check_signature(oauth_request, consumer, token)
- new_token = self.data_store.fetch_access_token(consumer, token)
+ new_token = self.data_store.fetch_access_token(consumer, token, verifier)
return new_token
def verify_request(self, oauth_request):
if not token:
raise OAuthError('Invalid %s token: %s' % (token_type, token_field))
return token
+
+ def _get_verifier(self, oauth_request):
+ return oauth_request.get_parameter('oauth_verifier')
def _check_signature(self, oauth_request, consumer, token):
timestamp, nonce = oauth_request._get_timestamp_nonce()
"""-> OAuthToken."""
raise NotImplementedError
- def fetch_request_token(self, oauth_consumer):
+ def fetch_request_token(self, oauth_consumer, oauth_callback):
"""-> OAuthToken."""
raise NotImplementedError
- def fetch_access_token(self, oauth_consumer, oauth_token):
+ def fetch_access_token(self, oauth_consumer, oauth_token, oauth_verifier):
"""-> OAuthToken."""
raise NotImplementedError