Commit | Line | Data |
---|---|---|
a11aa2d1 JW |
1 | # GNU MediaGoblin -- federated, autonomous media hosting |
2 | # Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. | |
3 | # | |
4 | # This program is free software: you can redistribute it and/or modify | |
5 | # it under the terms of the GNU Affero General Public License as published by | |
6 | # the Free Software Foundation, either version 3 of the License, or | |
7 | # (at your option) any later version. | |
8 | # | |
9 | # This program is distributed in the hope that it will be useful, | |
10 | # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
11 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
12 | # GNU Affero General Public License for more details. | |
13 | # | |
14 | # You should have received a copy of the GNU Affero General Public License | |
15 | # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
16 | ||
a6ec38c3 | 17 | import json |
a11aa2d1 JW |
18 | import logging |
19 | ||
2455a54f | 20 | import pytest |
a6ec38c3 JW |
21 | from urlparse import parse_qs, urlparse |
22 | ||
a11aa2d1 | 23 | from mediagoblin import mg_globals |
a11aa2d1 | 24 | from mediagoblin.tools import template, pluginapi |
5c2ece74 | 25 | from mediagoblin.tests.tools import fixture_add_user |
a11aa2d1 JW |
26 | |
27 | ||
28 | _log = logging.getLogger(__name__) | |
29 | ||
30 | ||
31 | class TestOAuth(object): | |
2455a54f CAW |
32 | @pytest.fixture(autouse=True) |
33 | def setup(self, test_app): | |
34 | self.test_app = test_app | |
35 | ||
a11aa2d1 JW |
36 | self.db = mg_globals.database |
37 | ||
38 | self.pman = pluginapi.PluginManager() | |
39 | ||
fc7b1b17 SS |
40 | self.user_password = u'4cc355_70k3N' |
41 | self.user = fixture_add_user(u'joauth', self.user_password) | |
a11aa2d1 | 42 | |
2455a54f | 43 | self.login() |
a11aa2d1 | 44 | |
2455a54f CAW |
45 | def login(self): |
46 | self.test_app.post( | |
47 | '/auth/login/', { | |
48 | 'username': self.user.username, | |
49 | 'password': self.user_password}) | |
a11aa2d1 | 50 | |
2455a54f CAW |
51 | def register_client(self, name, client_type, description=None, |
52 | redirect_uri=''): | |
53 | return self.test_app.post( | |
1c694fbe | 54 | '/oauth-2/client/register', { |
a11aa2d1 JW |
55 | 'name': name, |
56 | 'description': description, | |
57 | 'type': client_type, | |
58 | 'redirect_uri': redirect_uri}) | |
59 | ||
60 | def get_context(self, template_name): | |
61 | return template.TEMPLATE_TEST_CONTEXT[template_name] | |
62 | ||
2455a54f | 63 | def test_1_public_client_registration_without_redirect_uri(self): |
a11aa2d1 | 64 | ''' Test 'public' OAuth client registration without any redirect uri ''' |
2455a54f CAW |
65 | response = self.register_client( |
66 | u'OMGOMGOMG', 'public', 'OMGOMG Apache License v2') | |
a11aa2d1 JW |
67 | |
68 | ctx = self.get_context('oauth/client/register.html') | |
69 | ||
70 | client = self.db.OAuthClient.query.filter( | |
af6a43d1 | 71 | self.db.OAuthClient.name == u'OMGOMGOMG').first() |
a11aa2d1 JW |
72 | |
73 | assert response.status_int == 200 | |
74 | ||
75 | # Should display an error | |
c121a7d3 | 76 | assert len(ctx['form'].redirect_uri.errors) |
a11aa2d1 JW |
77 | |
78 | # Should not pass through | |
79 | assert not client | |
80 | ||
2455a54f | 81 | def test_2_successful_public_client_registration(self): |
a11aa2d1 | 82 | ''' Successfully register a public client ''' |
c121a7d3 | 83 | uri = 'http://foo.example' |
2455a54f CAW |
84 | self.register_client( |
85 | u'OMGOMG', 'public', 'OMG!', uri) | |
a11aa2d1 JW |
86 | |
87 | client = self.db.OAuthClient.query.filter( | |
af6a43d1 | 88 | self.db.OAuthClient.name == u'OMGOMG').first() |
a11aa2d1 | 89 | |
c121a7d3 JW |
90 | # redirect_uri should be set |
91 | assert client.redirect_uri == uri | |
92 | ||
a11aa2d1 JW |
93 | # Client should have been registered |
94 | assert client | |
95 | ||
2455a54f | 96 | def test_3_successful_confidential_client_reg(self): |
a11aa2d1 | 97 | ''' Register a confidential OAuth client ''' |
5c2ece74 | 98 | response = self.register_client( |
2455a54f | 99 | u'GMOGMO', 'confidential', 'NO GMO!') |
a11aa2d1 JW |
100 | |
101 | assert response.status_int == 302 | |
102 | ||
103 | client = self.db.OAuthClient.query.filter( | |
af6a43d1 | 104 | self.db.OAuthClient.name == u'GMOGMO').first() |
a11aa2d1 JW |
105 | |
106 | # Client should have been registered | |
107 | assert client | |
108 | ||
109 | return client | |
110 | ||
2455a54f | 111 | def test_4_authorize_confidential_client(self): |
a11aa2d1 | 112 | ''' Authorize a confidential client as a logged in user ''' |
2455a54f | 113 | client = self.test_3_successful_confidential_client_reg() |
a11aa2d1 | 114 | |
0df00eb6 JW |
115 | client_identifier = client.identifier |
116 | ||
a11aa2d1 | 117 | redirect_uri = 'https://foo.example' |
1c694fbe | 118 | response = self.test_app.get('/oauth-2/authorize', { |
a11aa2d1 | 119 | 'client_id': client.identifier, |
c121a7d3 | 120 | 'scope': 'all', |
a11aa2d1 JW |
121 | 'redirect_uri': redirect_uri}) |
122 | ||
123 | # User-agent should NOT be redirected | |
124 | assert response.status_int == 200 | |
125 | ||
126 | ctx = self.get_context('oauth/authorize.html') | |
127 | ||
128 | form = ctx['form'] | |
129 | ||
130 | # Short for client authorization post reponse | |
2455a54f | 131 | capr = self.test_app.post( |
1c694fbe | 132 | '/oauth-2/client/authorize', { |
a11aa2d1 JW |
133 | 'client_id': form.client_id.data, |
134 | 'allow': 'Allow', | |
135 | 'next': form.next.data}) | |
136 | ||
137 | assert capr.status_int == 302 | |
138 | ||
139 | authorization_response = capr.follow() | |
140 | ||
141 | assert authorization_response.location.startswith(redirect_uri) | |
142 | ||
0df00eb6 | 143 | return authorization_response, client_identifier |
a6ec38c3 JW |
144 | |
145 | def get_code_from_redirect_uri(self, uri): | |
c121a7d3 | 146 | ''' Get the value of ?code= from an URI ''' |
a6ec38c3 JW |
147 | return parse_qs(urlparse(uri).query)['code'][0] |
148 | ||
2455a54f | 149 | def test_token_endpoint_successful_confidential_request(self): |
a6ec38c3 | 150 | ''' Successful request against token endpoint ''' |
2455a54f | 151 | code_redirect, client_id = self.test_4_authorize_confidential_client() |
a6ec38c3 JW |
152 | |
153 | code = self.get_code_from_redirect_uri(code_redirect.location) | |
154 | ||
155 | client = self.db.OAuthClient.query.filter( | |
156 | self.db.OAuthClient.identifier == unicode(client_id)).first() | |
157 | ||
1c694fbe | 158 | token_res = self.test_app.get('/oauth-2/access_token?client_id={0}&\ |
a6ec38c3 JW |
159 | code={1}&client_secret={2}'.format(client_id, code, client.secret)) |
160 | ||
161 | assert token_res.status_int == 200 | |
162 | ||
163 | token_data = json.loads(token_res.body) | |
164 | ||
165 | assert not 'error' in token_data | |
166 | assert 'access_token' in token_data | |
167 | assert 'token_type' in token_data | |
168 | assert 'expires_in' in token_data | |
169 | assert type(token_data['expires_in']) == int | |
170 | assert token_data['expires_in'] > 0 | |
171 | ||
c121a7d3 JW |
172 | # There should be a refresh token provided in the token data |
173 | assert len(token_data['refresh_token']) | |
174 | ||
175 | return client_id, token_data | |
176 | ||
2455a54f | 177 | def test_token_endpont_missing_id_confidential_request(self): |
a6ec38c3 | 178 | ''' Unsuccessful request against token endpoint, missing client_id ''' |
2455a54f | 179 | code_redirect, client_id = self.test_4_authorize_confidential_client() |
a6ec38c3 JW |
180 | |
181 | code = self.get_code_from_redirect_uri(code_redirect.location) | |
182 | ||
183 | client = self.db.OAuthClient.query.filter( | |
184 | self.db.OAuthClient.identifier == unicode(client_id)).first() | |
185 | ||
1c694fbe | 186 | token_res = self.test_app.get('/oauth-2/access_token?\ |
a6ec38c3 JW |
187 | code={0}&client_secret={1}'.format(code, client.secret)) |
188 | ||
189 | assert token_res.status_int == 200 | |
190 | ||
191 | token_data = json.loads(token_res.body) | |
192 | ||
193 | assert 'error' in token_data | |
194 | assert not 'access_token' in token_data | |
195 | assert token_data['error'] == 'invalid_request' | |
c121a7d3 JW |
196 | assert len(token_data['error_description']) |
197 | ||
2455a54f | 198 | def test_refresh_token(self): |
c121a7d3 JW |
199 | ''' Try to get a new access token using the refresh token ''' |
200 | # Get an access token and a refresh token | |
201 | client_id, token_data =\ | |
2455a54f | 202 | self.test_token_endpoint_successful_confidential_request() |
c121a7d3 JW |
203 | |
204 | client = self.db.OAuthClient.query.filter( | |
205 | self.db.OAuthClient.identifier == client_id).first() | |
206 | ||
1c694fbe | 207 | token_res = self.test_app.get('/oauth-2/access_token', |
c121a7d3 JW |
208 | {'refresh_token': token_data['refresh_token'], |
209 | 'client_id': client_id, | |
210 | 'client_secret': client.secret | |
211 | }) | |
212 | ||
213 | assert token_res.status_int == 200 | |
214 | ||
215 | new_token_data = json.loads(token_res.body) | |
216 | ||
217 | assert not 'error' in new_token_data | |
218 | assert 'access_token' in new_token_data | |
219 | assert 'token_type' in new_token_data | |
220 | assert 'expires_in' in new_token_data | |
221 | assert type(new_token_data['expires_in']) == int | |
222 | assert new_token_data['expires_in'] > 0 |