Patch by Strum. Ticket #451 - Convert all mongokit style .find, .find_one, .one calls...
[mediagoblin.git] / mediagoblin / tests / test_oauth.py
index 94ba5dab7d42156dacf269d9ea65244536355607..ea3bd798d265d0990140e1b5d5edfe6ec8ecaf0e 100644 (file)
 import json
 import logging
 
+import pytest
 from urlparse import parse_qs, urlparse
 
 from mediagoblin import mg_globals
 from mediagoblin.tools import template, pluginapi
-from mediagoblin.tests.tools import get_app, fixture_add_user
+from mediagoblin.tests.tools import fixture_add_user
 
 
 _log = logging.getLogger(__name__)
 
 
 class TestOAuth(object):
-    def setUp(self):
-        self.app = get_app()
+    @pytest.fixture(autouse=True)
+    def setup(self, test_app):
+        self.test_app = test_app
+
         self.db = mg_globals.database
 
         self.pman = pluginapi.PluginManager()
@@ -40,14 +43,14 @@ class TestOAuth(object):
         self.login()
 
     def login(self):
-        self.app.post(
-                '/auth/login/', {
-                    'username': self.user.username,
-                    'password': self.user_password})
+        self.test_app.post(
+            '/auth/login/', {
+                'username': self.user.username,
+                'password': self.user_password})
 
     def register_client(self, name, client_type, description=None,
-            redirect_uri=''):
-        return self.app.post(
+                        redirect_uri=''):
+        return self.test_app.post(
                 '/oauth/client/register', {
                     'name': name,
                     'description': description,
@@ -59,8 +62,8 @@ class TestOAuth(object):
 
     def test_1_public_client_registration_without_redirect_uri(self):
         ''' Test 'public' OAuth client registration without any redirect uri '''
-        response = self.register_client(u'OMGOMGOMG', 'public',
-                'OMGOMG Apache License v2')
+        response = self.register_client(
+            u'OMGOMGOMG', 'public', 'OMGOMG Apache License v2')
 
         ctx = self.get_context('oauth/client/register.html')
 
@@ -70,26 +73,30 @@ class TestOAuth(object):
         assert response.status_int == 200
 
         # Should display an error
-        assert ctx['form'].redirect_uri.errors
+        assert len(ctx['form'].redirect_uri.errors)
 
         # Should not pass through
         assert not client
 
     def test_2_successful_public_client_registration(self):
         ''' Successfully register a public client '''
-        self.login()
-        self.register_client(u'OMGOMG', 'public', 'OMG!',
-                'http://foo.example')
+        uri = 'http://foo.example'
+        self.register_client(
+            u'OMGOMG', 'public', 'OMG!', uri)
 
         client = self.db.OAuthClient.query.filter(
                 self.db.OAuthClient.name == u'OMGOMG').first()
 
+        # redirect_uri should be set
+        assert client.redirect_uri == uri
+
         # Client should have been registered
         assert client
 
     def test_3_successful_confidential_client_reg(self):
         ''' Register a confidential OAuth client '''
-        response = self.register_client(u'GMOGMO', 'confidential', 'NO GMO!')
+        response = self.register_client(
+            u'GMOGMO', 'confidential', 'NO GMO!')
 
         assert response.status_int == 302
 
@@ -103,15 +110,14 @@ class TestOAuth(object):
 
     def test_4_authorize_confidential_client(self):
         ''' Authorize a confidential client as a logged in user '''
-
         client = self.test_3_successful_confidential_client_reg()
 
         client_identifier = client.identifier
 
         redirect_uri = 'https://foo.example'
-        response = self.app.get('/oauth/authorize', {
+        response = self.test_app.get('/oauth/authorize', {
                 'client_id': client.identifier,
-                'scope': 'admin',
+                'scope': 'all',
                 'redirect_uri': redirect_uri})
 
         # User-agent should NOT be redirected
@@ -122,7 +128,7 @@ class TestOAuth(object):
         form = ctx['form']
 
         # Short for client authorization post reponse
-        capr = self.app.post(
+        capr = self.test_app.post(
                 '/oauth/client/authorize', {
                     'client_id': form.client_id.data,
                     'allow': 'Allow',
@@ -137,6 +143,7 @@ class TestOAuth(object):
         return authorization_response, client_identifier
 
     def get_code_from_redirect_uri(self, uri):
+        ''' Get the value of ?code= from an URI '''
         return parse_qs(urlparse(uri).query)['code'][0]
 
     def test_token_endpoint_successful_confidential_request(self):
@@ -148,7 +155,7 @@ class TestOAuth(object):
         client = self.db.OAuthClient.query.filter(
                 self.db.OAuthClient.identifier == unicode(client_id)).first()
 
-        token_res = self.app.get('/oauth/access_token?client_id={0}&\
+        token_res = self.test_app.get('/oauth/access_token?client_id={0}&\
 code={1}&client_secret={2}'.format(client_id, code, client.secret))
 
         assert token_res.status_int == 200
@@ -162,6 +169,11 @@ code={1}&client_secret={2}'.format(client_id, code, client.secret))
         assert type(token_data['expires_in']) == int
         assert token_data['expires_in'] > 0
 
+        # There should be a refresh token provided in the token data
+        assert len(token_data['refresh_token'])
+
+        return client_id, token_data
+
     def test_token_endpont_missing_id_confidential_request(self):
         ''' Unsuccessful request against token endpoint, missing client_id '''
         code_redirect, client_id = self.test_4_authorize_confidential_client()
@@ -171,7 +183,7 @@ code={1}&client_secret={2}'.format(client_id, code, client.secret))
         client = self.db.OAuthClient.query.filter(
                 self.db.OAuthClient.identifier == unicode(client_id)).first()
 
-        token_res = self.app.get('/oauth/access_token?\
+        token_res = self.test_app.get('/oauth/access_token?\
 code={0}&client_secret={1}'.format(code, client.secret))
 
         assert token_res.status_int == 200
@@ -181,4 +193,30 @@ code={0}&client_secret={1}'.format(code, client.secret))
         assert 'error' in token_data
         assert not 'access_token' in token_data
         assert token_data['error'] == 'invalid_request'
-        assert token_data['error_description'] == 'Missing client_id in request'
+        assert len(token_data['error_description'])
+
+    def test_refresh_token(self):
+        ''' Try to get a new access token using the refresh token '''
+        # Get an access token and a refresh token
+        client_id, token_data =\
+            self.test_token_endpoint_successful_confidential_request()
+
+        client = self.db.OAuthClient.query.filter(
+            self.db.OAuthClient.identifier == client_id).first()
+
+        token_res = self.test_app.get('/oauth/access_token',
+                     {'refresh_token': token_data['refresh_token'],
+                      'client_id': client_id,
+                      'client_secret': client.secret
+                      })
+
+        assert token_res.status_int == 200
+
+        new_token_data = json.loads(token_res.body)
+
+        assert not 'error' in new_token_data
+        assert 'access_token' in new_token_data
+        assert 'token_type' in new_token_data
+        assert 'expires_in' in new_token_data
+        assert type(new_token_data['expires_in']) == int
+        assert new_token_data['expires_in'] > 0