enforce_ssl = False
- def __init__(self, data=None):
+ def __init__(self, data=None, *args, **kwargs):
self.POST = data
+ super(GMGRequestValidator, self).__init__(*args, **kwargs)
def save_request_token(self, token, request):
""" Saves request token in db """
secret=token["oauth_token_secret"],
)
request_token.client = client_id
- request_token.callback = token.get("oauth_callback", None)
+ if u"oauth_callback" in self.POST:
+ request_token.callback = self.POST[u"oauth_callback"]
request_token.save()
def save_verifier(self, token, verifier, request):
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import re
-
-# Regex for parsing Authorization string
-auth_header_re = re.compile('(\w+)[:=] ?"?(\w+)"?')
-
def decode_authorization_header(header):
""" Decodes a HTTP Authorization Header to python dictionary """
- authorization = header.get("Authorization", "")
- tokens = dict(auth_header_re.findall(authorization))
+ authorization = header.get("Authorization", "").lstrip(" ").lstrip("OAuth")
+ tokens = {}
+
+ for param in authorization.split(","):
+ key, value = param.split("=")
+
+ key = key.lstrip(" ")
+ value = value.lstrip(" ").lstrip('"')
+ value = value.rstrip(" ").rstrip('"')
+
+ tokens[key] = value
+
return tokens
authorization = decode_authorization_header(data)
-
if authorization == dict() or u"oauth_consumer_key" not in authorization:
error = "Missing required parameter."
return json_response({"error": error}, status=400)
# check the client_id
client_id = authorization[u"oauth_consumer_key"]
client = Client.query.filter_by(id=client_id).first()
- if client is None:
+
+ if client == None:
# client_id is invalid
error = "Invalid client_id"
return json_response({"error": error}, status=400)
- # make request token and return to client
+ # make request token and return to client
request_validator = GMGRequestValidator(authorization)
rv = RequestTokenEndpoint(request_validator)
tokens = rv.create_request_token(request, authorization)
# store the nonce & timestamp before we return back
nonce = authorization[u"oauth_nonce"]
timestamp = authorization[u"oauth_timestamp"]
- timestamp = datetime.datetime.fromtimestamp(int(timestamp))
+ timestamp = datetime.datetime.fromtimestamp(float(timestamp))
nc = NonceTimestamp(nonce=nonce, timestamp=timestamp)
nc.save()
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import json
+import cgi
import pytest
from urlparse import parse_qs, urlparse
+from oauthlib.oauth1 import Client
+
from mediagoblin import mg_globals
from mediagoblin.tools import template, pluginapi
from mediagoblin.tests.tools import fixture_add_user
class TestOAuth(object):
+
+ MIME_FORM = "application/x-www-form-urlencoded"
+ MIME_JSON = "application/json"
+
@pytest.fixture(autouse=True)
def setup(self, test_app):
self.test_app = test_app
def test_client_client_register_limited_info(self):
""" Tests that a client can be registered with limited information """
response = self.register_client()
- client_info = json.loads(response.body)
+ client_info = response.json
client = self.db.Client.query.filter_by(id=client_info["client_id"]).first()
}
response = self.register_client(**query)
- client_info = json.loads(response.body)
+ client_info = response.json
client = self.db.Client.query.filter_by(id=client_info["client_id"]).first()
# first we need to register a client
response = self.register_client()
- client_info = json.loads(response.body)
+ client_info = response.json
client = self.db.Client.query.filter_by(id=client_info["client_id"]).first()
# Now update
update_response = self.register_client(**update_query)
assert update_response.status_int == 200
- client_info = json.loads(update_response.body)
- client = self.Client.query.filter_by(id=client_info["client_id"]).first()
+ client_info = update_response.json
+ client = self.db.Client.query.filter_by(id=client_info["client_id"]).first()
assert client.secret == client_info["client_secret"]
assert client.application_type == update_query["application_type"]
assert client.logo_url == update_query["logo_url"]
assert client.redirect_uri == update_query["redirect_uris"].split()
- def request_token(self):
+ def to_authorize_headers(self, data):
+ headers = ""
+ for key, value in data.items():
+ headers += '{0}="{1}",'.format(key, value)
+ return {"Authorization": "OAuth " + headers[:-1]}
+
+ def test_request_token(self):
""" Test a request for a request token """
response = self.register_client()
+ client_id = response.json["client_id"]
+
+ endpoint = "/oauth/request_token"
+ request_query = {
+ "oauth_consumer_key": client_id,
+ "oauth_nonce": "abcdefghij",
+ "oauth_timestamp": 123456789.0,
+ "oauth_callback": "https://some.url/callback",
+ }
+
+ headers = self.to_authorize_headers(request_query)
+
+ headers["Content-Type"] = self.MIME_FORM
+
+ response = self.test_app.post(endpoint, headers=headers)
+ response = cgi.parse_qs(response.body)
+
+ # each element is a list, reduce it to a string
+ for key, value in response.items():
+ response[key] = value[0]
+
+ request_token = self.db.RequestToken.query.filter_by(
+ token=response["oauth_token"]
+ ).first()
+
+ client = self.db.Client.query.filter_by(id=client_id).first()
+
+ assert request_token is not None
+ assert request_token.secret == response["oauth_token_secret"]
+ assert request_token.client == client.id
+ assert request_token.used == False
+ assert request_token.callback == request_query["oauth_callback"]