From 89d5b44e0aee5845f816a89a9f8b3364940daea3 Mon Sep 17 00:00:00 2001 From: xray7224 Date: Thu, 18 Jul 2013 19:15:05 +0100 Subject: [PATCH] Adds test for request_tokens --- mediagoblin/federation/oauth.py | 6 ++- mediagoblin/federation/tools/request.py | 19 +++++--- mediagoblin/federation/views.py | 8 ++-- mediagoblin/tests/test_oauth1.py | 58 ++++++++++++++++++++++--- 4 files changed, 71 insertions(+), 20 deletions(-) diff --git a/mediagoblin/federation/oauth.py b/mediagoblin/federation/oauth.py index ea0fea2c..764b8535 100644 --- a/mediagoblin/federation/oauth.py +++ b/mediagoblin/federation/oauth.py @@ -26,8 +26,9 @@ class GMGRequestValidator(RequestValidator): enforce_ssl = False - def __init__(self, data=None): + def __init__(self, data=None, *args, **kwargs): self.POST = data + super(GMGRequestValidator, self).__init__(*args, **kwargs) def save_request_token(self, token, request): """ Saves request token in db """ @@ -38,7 +39,8 @@ class GMGRequestValidator(RequestValidator): secret=token["oauth_token_secret"], ) request_token.client = client_id - request_token.callback = token.get("oauth_callback", None) + if u"oauth_callback" in self.POST: + request_token.callback = self.POST[u"oauth_callback"] request_token.save() def save_verifier(self, token, verifier, request): diff --git a/mediagoblin/federation/tools/request.py b/mediagoblin/federation/tools/request.py index 4f5be277..6e484bb6 100644 --- a/mediagoblin/federation/tools/request.py +++ b/mediagoblin/federation/tools/request.py @@ -14,14 +14,19 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -import re - -# Regex for parsing Authorization string -auth_header_re = re.compile('(\w+)[:=] ?"?(\w+)"?') - def decode_authorization_header(header): """ Decodes a HTTP Authorization Header to python dictionary """ - authorization = header.get("Authorization", "") - tokens = dict(auth_header_re.findall(authorization)) + authorization = header.get("Authorization", "").lstrip(" ").lstrip("OAuth") + tokens = {} + + for param in authorization.split(","): + key, value = param.split("=") + + key = key.lstrip(" ") + value = value.lstrip(" ").lstrip('"') + value = value.rstrip(" ").rstrip('"') + + tokens[key] = value + return tokens diff --git a/mediagoblin/federation/views.py b/mediagoblin/federation/views.py index 7eb9f148..633a19d4 100644 --- a/mediagoblin/federation/views.py +++ b/mediagoblin/federation/views.py @@ -198,7 +198,6 @@ def request_token(request): authorization = decode_authorization_header(data) - if authorization == dict() or u"oauth_consumer_key" not in authorization: error = "Missing required parameter." return json_response({"error": error}, status=400) @@ -206,12 +205,13 @@ def request_token(request): # check the client_id client_id = authorization[u"oauth_consumer_key"] client = Client.query.filter_by(id=client_id).first() - if client is None: + + if client == None: # client_id is invalid error = "Invalid client_id" return json_response({"error": error}, status=400) - # make request token and return to client + # make request token and return to client request_validator = GMGRequestValidator(authorization) rv = RequestTokenEndpoint(request_validator) tokens = rv.create_request_token(request, authorization) @@ -219,7 +219,7 @@ def request_token(request): # store the nonce & timestamp before we return back nonce = authorization[u"oauth_nonce"] timestamp = authorization[u"oauth_timestamp"] - timestamp = datetime.datetime.fromtimestamp(int(timestamp)) + timestamp = datetime.datetime.fromtimestamp(float(timestamp)) nc = NonceTimestamp(nonce=nonce, timestamp=timestamp) nc.save() diff --git a/mediagoblin/tests/test_oauth1.py b/mediagoblin/tests/test_oauth1.py index f3b44850..073c2884 100644 --- a/mediagoblin/tests/test_oauth1.py +++ b/mediagoblin/tests/test_oauth1.py @@ -14,17 +14,23 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -import json +import cgi import pytest from urlparse import parse_qs, urlparse +from oauthlib.oauth1 import Client + from mediagoblin import mg_globals from mediagoblin.tools import template, pluginapi from mediagoblin.tests.tools import fixture_add_user class TestOAuth(object): + + MIME_FORM = "application/x-www-form-urlencoded" + MIME_JSON = "application/json" + @pytest.fixture(autouse=True) def setup(self, test_app): self.test_app = test_app @@ -54,7 +60,7 @@ class TestOAuth(object): def test_client_client_register_limited_info(self): """ Tests that a client can be registered with limited information """ response = self.register_client() - client_info = json.loads(response.body) + client_info = response.json client = self.db.Client.query.filter_by(id=client_info["client_id"]).first() @@ -72,7 +78,7 @@ class TestOAuth(object): } response = self.register_client(**query) - client_info = json.loads(response.body) + client_info = response.json client = self.db.Client.query.filter_by(id=client_info["client_id"]).first() @@ -89,7 +95,7 @@ class TestOAuth(object): # first we need to register a client response = self.register_client() - client_info = json.loads(response.body) + client_info = response.json client = self.db.Client.query.filter_by(id=client_info["client_id"]).first() # Now update @@ -105,8 +111,8 @@ class TestOAuth(object): update_response = self.register_client(**update_query) assert update_response.status_int == 200 - client_info = json.loads(update_response.body) - client = self.Client.query.filter_by(id=client_info["client_id"]).first() + client_info = update_response.json + client = self.db.Client.query.filter_by(id=client_info["client_id"]).first() assert client.secret == client_info["client_secret"] assert client.application_type == update_query["application_type"] @@ -115,8 +121,46 @@ class TestOAuth(object): assert client.logo_url == update_query["logo_url"] assert client.redirect_uri == update_query["redirect_uris"].split() - def request_token(self): + def to_authorize_headers(self, data): + headers = "" + for key, value in data.items(): + headers += '{0}="{1}",'.format(key, value) + return {"Authorization": "OAuth " + headers[:-1]} + + def test_request_token(self): """ Test a request for a request token """ response = self.register_client() + client_id = response.json["client_id"] + + endpoint = "/oauth/request_token" + request_query = { + "oauth_consumer_key": client_id, + "oauth_nonce": "abcdefghij", + "oauth_timestamp": 123456789.0, + "oauth_callback": "https://some.url/callback", + } + + headers = self.to_authorize_headers(request_query) + + headers["Content-Type"] = self.MIME_FORM + + response = self.test_app.post(endpoint, headers=headers) + response = cgi.parse_qs(response.body) + + # each element is a list, reduce it to a string + for key, value in response.items(): + response[key] = value[0] + + request_token = self.db.RequestToken.query.filter_by( + token=response["oauth_token"] + ).first() + + client = self.db.Client.query.filter_by(id=client_id).first() + + assert request_token is not None + assert request_token.secret == response["oauth_token_secret"] + assert request_token.client == client.id + assert request_token.used == False + assert request_token.callback == request_query["oauth_callback"] -- 2.25.1