Convenience functions for callable hooks
[mediagoblin.git] / mediagoblin / tests / test_oauth.py
CommitLineData
a11aa2d1
JW
1# GNU MediaGoblin -- federated, autonomous media hosting
2# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
3#
4# This program is free software: you can redistribute it and/or modify
5# it under the terms of the GNU Affero General Public License as published by
6# the Free Software Foundation, either version 3 of the License, or
7# (at your option) any later version.
8#
9# This program is distributed in the hope that it will be useful,
10# but WITHOUT ANY WARRANTY; without even the implied warranty of
11# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12# GNU Affero General Public License for more details.
13#
14# You should have received a copy of the GNU Affero General Public License
15# along with this program. If not, see <http://www.gnu.org/licenses/>.
16
a6ec38c3 17import json
a11aa2d1
JW
18import logging
19
a6ec38c3
JW
20from urlparse import parse_qs, urlparse
21
a11aa2d1 22from mediagoblin import mg_globals
a11aa2d1 23from mediagoblin.tools import template, pluginapi
5c2ece74 24from mediagoblin.tests.tools import fixture_add_user
a11aa2d1
JW
25
26
27_log = logging.getLogger(__name__)
28
29
class TestOAuth(object):
    """Tests for OAuth client registration, authorization and token issuing.

    Every test receives ``test_app`` (presumably a WebTest-style application
    fixture supplied by the test runner -- confirm) and drives the
    ``/oauth/...`` URLs through it.  The later tests chain the earlier ones:
    the token endpoint tests call the authorization test, which in turn
    calls the confidential client registration test.
    """

    def _setup(self, test_app):
        """Store shared handles on ``self``, add the test user and log in."""
        self.db = mg_globals.database

        self.pman = pluginapi.PluginManager()

        self.user_password = u'4cc355_70k3N'
        self.user = fixture_add_user(u'joauth', self.user_password)

        self.login(test_app)

    def login(self, test_app):
        """POST the test user's credentials to ``/auth/login/``."""
        test_app.post(
            '/auth/login/', {
                'username': self.user.username,
                'password': self.user_password})

    def register_client(self, test_app, name, client_type, description=None,
                        redirect_uri=''):
        """POST a client registration form and return the response."""
        return test_app.post(
            '/oauth/client/register', {
                'name': name,
                'description': description,
                'type': client_type,
                'redirect_uri': redirect_uri})

    def get_context(self, template_name):
        """Return the context recorded for ``template_name``.

        Raises ``KeyError`` if no context is stored under that name.
        """
        return template.TEMPLATE_TEST_CONTEXT[template_name]

    def get_code_from_redirect_uri(self, uri):
        """Return the first ``code`` query parameter found in ``uri``.

        Raises ``KeyError`` if the query string has no ``code`` parameter.
        """
        return parse_qs(urlparse(uri).query)['code'][0]

    def _authorize_and_fetch_code(self, test_app):
        """Run the authorization test and collect what a token request needs.

        Returns a ``(code, client_id, client)`` tuple: the authorization
        code taken from the redirect location, the client identifier, and
        the ``OAuthClient`` row looked up by that identifier.
        """
        code_redirect, client_id = self.test_4_authorize_confidential_client(
            test_app)

        code = self.get_code_from_redirect_uri(code_redirect.location)

        client = self.db.OAuthClient.query.filter(
            self.db.OAuthClient.identifier == unicode(client_id)).first()

        return code, client_id, client

    def test_1_public_client_registration_without_redirect_uri(self, test_app):
        """Test 'public' OAuth client registration without any redirect uri"""
        self._setup(test_app)

        response = self.register_client(
            test_app, u'OMGOMGOMG', 'public', 'OMGOMG Apache License v2')

        ctx = self.get_context('oauth/client/register.html')

        client = self.db.OAuthClient.query.filter(
            self.db.OAuthClient.name == u'OMGOMGOMG').first()

        assert response.status_int == 200

        # Should display an error
        assert ctx['form'].redirect_uri.errors

        # Should not pass through
        assert not client

    def test_2_successful_public_client_registration(self, test_app):
        """Successfully register a public client"""
        self._setup(test_app)
        self.register_client(
            test_app, u'OMGOMG', 'public', 'OMG!', 'http://foo.example')

        client = self.db.OAuthClient.query.filter(
            self.db.OAuthClient.name == u'OMGOMG').first()

        # Client should have been registered
        assert client

    def test_3_successful_confidential_client_reg(self, test_app):
        """Register a confidential OAuth client

        Returns the registered client so that later tests can build on it.
        """
        self._setup(test_app)

        response = self.register_client(
            test_app, u'GMOGMO', 'confidential', 'NO GMO!')

        assert response.status_int == 302

        client = self.db.OAuthClient.query.filter(
            self.db.OAuthClient.name == u'GMOGMO').first()

        # Client should have been registered
        assert client

        return client

    def test_4_authorize_confidential_client(self, test_app):
        """Authorize a confidential client as a logged in user

        Returns ``(authorization_response, client_identifier)`` so that the
        token endpoint tests can reuse the outcome.
        """
        self._setup(test_app)

        client = self.test_3_successful_confidential_client_reg(test_app)

        # NOTE(review): the identifier is read once, before any further
        # request is made, presumably because the ORM object may not be
        # safely readable afterwards -- confirm.
        client_identifier = client.identifier

        redirect_uri = 'https://foo.example'
        response = test_app.get('/oauth/authorize', {
            'client_id': client_identifier,
            'scope': 'admin',
            'redirect_uri': redirect_uri})

        # User-agent should NOT be redirected
        assert response.status_int == 200

        ctx = self.get_context('oauth/authorize.html')

        form = ctx['form']

        # Short for client authorization post response
        capr = test_app.post(
            '/oauth/client/authorize', {
                'client_id': form.client_id.data,
                'allow': 'Allow',
                'next': form.next.data})

        assert capr.status_int == 302

        authorization_response = capr.follow()

        assert authorization_response.location.startswith(redirect_uri)

        return authorization_response, client_identifier

    def test_token_endpoint_successful_confidential_request(self, test_app):
        """Successful request against token endpoint"""
        self._setup(test_app)

        code, client_id, client = self._authorize_and_fetch_code(test_app)

        # Passing the parameters as a dict lets the test client URL-encode
        # them instead of splicing raw values into the query string.
        token_res = test_app.get('/oauth/access_token', {
            'client_id': client_id,
            'code': code,
            'client_secret': client.secret})

        assert token_res.status_int == 200

        token_data = json.loads(token_res.body)

        assert 'error' not in token_data
        assert 'access_token' in token_data
        assert 'token_type' in token_data
        assert 'expires_in' in token_data
        assert isinstance(token_data['expires_in'], int)
        assert token_data['expires_in'] > 0

    def test_token_endpont_missing_id_confidential_request(self, test_app):
        """Unsuccessful request against token endpoint, missing client_id"""
        self._setup(test_app)

        code, _client_id, client = self._authorize_and_fetch_code(test_app)

        # client_id is deliberately left out of the request.
        token_res = test_app.get('/oauth/access_token', {
            'code': code,
            'client_secret': client.secret})

        # The endpoint is expected to report the failure in the JSON body
        # while still answering with HTTP 200.
        assert token_res.status_int == 200

        token_data = json.loads(token_res.body)

        assert 'error' in token_data
        assert 'access_token' not in token_data
        assert token_data['error'] == 'invalid_request'
        assert token_data['error_description'] == 'Missing client_id in request'