Adds test for request_tokens
authorxray7224 <jessica@megworld.co.uk>
Thu, 18 Jul 2013 18:15:05 +0000 (19:15 +0100)
committerxray7224 <jessica@megworld.co.uk>
Thu, 18 Jul 2013 18:15:05 +0000 (19:15 +0100)
mediagoblin/federation/oauth.py
mediagoblin/federation/tools/request.py
mediagoblin/federation/views.py
mediagoblin/tests/test_oauth1.py

index ea0fea2cb84695ed7725aa5b1752a06702bce86d..764b8535ced5dabb7cfb410d8b007d587fdec1c8 100644 (file)
@@ -26,8 +26,9 @@ class GMGRequestValidator(RequestValidator):
 
     enforce_ssl = False
 
-    def __init__(self, data=None):
+    def __init__(self, data=None, *args, **kwargs):
         self.POST = data
+        super(GMGRequestValidator, self).__init__(*args, **kwargs)
 
     def save_request_token(self, token, request):
         """ Saves request token in db """
@@ -38,7 +39,8 @@ class GMGRequestValidator(RequestValidator):
                 secret=token["oauth_token_secret"],
                 )
         request_token.client = client_id
-        request_token.callback = token.get("oauth_callback", None)
+        if u"oauth_callback" in self.POST:
+            request_token.callback = self.POST[u"oauth_callback"]
         request_token.save()
 
     def save_verifier(self, token, verifier, request):
index 4f5be2770cdea0a8a410ad6915f364be77e0856f..6e484bb674d199038d4cf7e3edc8383dbffc5cca 100644 (file)
 # You should have received a copy of the GNU Affero General Public License
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
-import re
-
-# Regex for parsing Authorization string
-auth_header_re = re.compile('(\w+)[:=] ?"?(\w+)"?')
-
 def decode_authorization_header(header):
     """ Decodes a HTTP Authorization Header to python dictionary """
-    authorization = header.get("Authorization", "")
-    tokens = dict(auth_header_re.findall(authorization))
+    authorization = header.get("Authorization", "").lstrip(" ").lstrip("OAuth")
+    tokens = {}
+
+    for param in authorization.split(","):
+        key, value = param.split("=")
+        
+        key = key.lstrip(" ")
+        value = value.lstrip(" ").lstrip('"')
+        value = value.rstrip(" ").rstrip('"')
+
+        tokens[key] = value
+
     return tokens
 
index 7eb9f1487a61d123532b5c3db643dadd920ebf44..633a19d49ca10e12c2433711a40421a5c1d7223a 100644 (file)
@@ -198,7 +198,6 @@ def request_token(request):
 
     authorization = decode_authorization_header(data)
 
-
     if authorization == dict() or u"oauth_consumer_key" not in authorization:
         error = "Missing required parameter."
         return json_response({"error": error}, status=400)
@@ -206,12 +205,13 @@ def request_token(request):
     # check the client_id
     client_id = authorization[u"oauth_consumer_key"]
     client = Client.query.filter_by(id=client_id).first()
-    if client is None:
+
+    if client == None:
         # client_id is invalid
         error = "Invalid client_id"
         return json_response({"error": error}, status=400)
 
-    # make request token and return to client
+   # make request token and return to client
     request_validator = GMGRequestValidator(authorization)
     rv = RequestTokenEndpoint(request_validator)
     tokens = rv.create_request_token(request, authorization)
@@ -219,7 +219,7 @@ def request_token(request):
     # store the nonce & timestamp before we return back
     nonce = authorization[u"oauth_nonce"]
     timestamp = authorization[u"oauth_timestamp"]
-    timestamp = datetime.datetime.fromtimestamp(int(timestamp))
+    timestamp = datetime.datetime.fromtimestamp(float(timestamp))
 
     nc = NonceTimestamp(nonce=nonce, timestamp=timestamp)
     nc.save()
index f3b44850efa02723ee456fe298943e382fa877e8..073c288458e59d701d6c77a15eb98f5a58e8d78b 100644 (file)
 # You should have received a copy of the GNU Affero General Public License
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
-import json
+import cgi
 
 import pytest
 from urlparse import parse_qs, urlparse
 
+from oauthlib.oauth1 import Client
+
 from mediagoblin import mg_globals
 from mediagoblin.tools import template, pluginapi
 from mediagoblin.tests.tools import fixture_add_user
 
 
 class TestOAuth(object):
+
+    MIME_FORM = "application/x-www-form-urlencoded"
+    MIME_JSON = "application/json"
+
     @pytest.fixture(autouse=True)
     def setup(self, test_app):
         self.test_app = test_app
@@ -54,7 +60,7 @@ class TestOAuth(object):
     def test_client_client_register_limited_info(self):
         """ Tests that a client can be registered with limited information """
         response = self.register_client()
-        client_info = json.loads(response.body)
+        client_info = response.json
 
         client = self.db.Client.query.filter_by(id=client_info["client_id"]).first()
         
@@ -72,7 +78,7 @@ class TestOAuth(object):
                 }
 
         response = self.register_client(**query)
-        client_info = json.loads(response.body)
+        client_info = response.json
 
         client = self.db.Client.query.filter_by(id=client_info["client_id"]).first()
         
@@ -89,7 +95,7 @@ class TestOAuth(object):
         # first we need to register a client
         response = self.register_client()
 
-        client_info = json.loads(response.body)
+        client_info = response.json
         client = self.db.Client.query.filter_by(id=client_info["client_id"]).first()
 
         # Now update
@@ -105,8 +111,8 @@ class TestOAuth(object):
         update_response = self.register_client(**update_query)
 
         assert update_response.status_int == 200
-        client_info = json.loads(update_response.body)
-        client = self.Client.query.filter_by(id=client_info["client_id"]).first()
+        client_info = update_response.json
+        client = self.db.Client.query.filter_by(id=client_info["client_id"]).first()
 
         assert client.secret == client_info["client_secret"]
         assert client.application_type == update_query["application_type"]
@@ -115,8 +121,46 @@ class TestOAuth(object):
         assert client.logo_url == update_query["logo_url"]
         assert client.redirect_uri == update_query["redirect_uris"].split()
 
-    def request_token(self):
+    def to_authorize_headers(self, data):
+        headers = ""
+        for key, value in data.items():
+            headers += '{0}="{1}",'.format(key, value)
+        return {"Authorization": "OAuth " + headers[:-1]}
+
+    def test_request_token(self):
         """ Test a request for a request token """
         response = self.register_client()
 
+        client_id = response.json["client_id"]
+
+        endpoint = "/oauth/request_token"
+        request_query = {
+                "oauth_consumer_key": client_id,
+                "oauth_nonce": "abcdefghij",
+                "oauth_timestamp": 123456789.0,
+                "oauth_callback": "https://some.url/callback",
+                }
+
+        headers = self.to_authorize_headers(request_query)
+
+        headers["Content-Type"] = self.MIME_FORM
+
+        response = self.test_app.post(endpoint, headers=headers)
+        response = cgi.parse_qs(response.body)
+
+        # each element is a list, reduce it to a string
+        for key, value in response.items():
+            response[key] = value[0]
+
+        request_token = self.db.RequestToken.query.filter_by(
+                token=response["oauth_token"]
+                ).first()
+
+        client = self.db.Client.query.filter_by(id=client_id).first()
+
+        assert request_token is not None
+        assert request_token.secret == response["oauth_token_secret"]
+        assert request_token.client == client.id
+        assert request_token.used == False
+        assert request_token.callback == request_query["oauth_callback"]