Commit | Line | Data |
---|---|---|
a11aa2d1 JW |
1 | # GNU MediaGoblin -- federated, autonomous media hosting |
2 | # Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. | |
3 | # | |
4 | # This program is free software: you can redistribute it and/or modify | |
5 | # it under the terms of the GNU Affero General Public License as published by | |
6 | # the Free Software Foundation, either version 3 of the License, or | |
7 | # (at your option) any later version. | |
8 | # | |
9 | # This program is distributed in the hope that it will be useful, | |
10 | # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
11 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
12 | # GNU Affero General Public License for more details. | |
13 | # | |
14 | # You should have received a copy of the GNU Affero General Public License | |
15 | # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
16 | ||
a6ec38c3 | 17 | import json |
a11aa2d1 JW |
18 | import logging |
19 | ||
2455a54f | 20 | import pytest |
a6ec38c3 JW |
21 | from urlparse import parse_qs, urlparse |
22 | ||
a11aa2d1 | 23 | from mediagoblin import mg_globals |
a11aa2d1 | 24 | from mediagoblin.tools import template, pluginapi |
5c2ece74 | 25 | from mediagoblin.tests.tools import fixture_add_user |
a11aa2d1 JW |
26 | |
27 | ||
28 | _log = logging.getLogger(__name__) | |
29 | ||
30 | ||
31 | class TestOAuth(object): | |
2455a54f CAW |
32 | @pytest.fixture(autouse=True) |
33 | def setup(self, test_app): | |
34 | self.test_app = test_app | |
35 | ||
a11aa2d1 JW |
36 | self.db = mg_globals.database |
37 | ||
38 | self.pman = pluginapi.PluginManager() | |
39 | ||
fc7b1b17 | 40 | self.user_password = u'4cc355_70k3N' |
e1561d04 | 41 | self.user = fixture_add_user(u'joauth', self.user_password, |
42 | privileges=[u'active']) | |
a11aa2d1 | 43 | |
2455a54f | 44 | self.login() |
a11aa2d1 | 45 | |
2455a54f CAW |
46 | def login(self): |
47 | self.test_app.post( | |
48 | '/auth/login/', { | |
49 | 'username': self.user.username, | |
50 | 'password': self.user_password}) | |
a11aa2d1 | 51 | |
2455a54f CAW |
52 | def register_client(self, name, client_type, description=None, |
53 | redirect_uri=''): | |
54 | return self.test_app.post( | |
1c694fbe | 55 | '/oauth-2/client/register', { |
a11aa2d1 JW |
56 | 'name': name, |
57 | 'description': description, | |
58 | 'type': client_type, | |
59 | 'redirect_uri': redirect_uri}) | |
60 | ||
61 | def get_context(self, template_name): | |
62 | return template.TEMPLATE_TEST_CONTEXT[template_name] | |
63 | ||
2455a54f | 64 | def test_1_public_client_registration_without_redirect_uri(self): |
a11aa2d1 | 65 | ''' Test 'public' OAuth client registration without any redirect uri ''' |
2455a54f CAW |
66 | response = self.register_client( |
67 | u'OMGOMGOMG', 'public', 'OMGOMG Apache License v2') | |
a11aa2d1 JW |
68 | |
69 | ctx = self.get_context('oauth/client/register.html') | |
70 | ||
71 | client = self.db.OAuthClient.query.filter( | |
af6a43d1 | 72 | self.db.OAuthClient.name == u'OMGOMGOMG').first() |
a11aa2d1 JW |
73 | |
74 | assert response.status_int == 200 | |
75 | ||
76 | # Should display an error | |
c121a7d3 | 77 | assert len(ctx['form'].redirect_uri.errors) |
a11aa2d1 JW |
78 | |
79 | # Should not pass through | |
80 | assert not client | |
81 | ||
2455a54f | 82 | def test_2_successful_public_client_registration(self): |
a11aa2d1 | 83 | ''' Successfully register a public client ''' |
c121a7d3 | 84 | uri = 'http://foo.example' |
2455a54f CAW |
85 | self.register_client( |
86 | u'OMGOMG', 'public', 'OMG!', uri) | |
a11aa2d1 JW |
87 | |
88 | client = self.db.OAuthClient.query.filter( | |
af6a43d1 | 89 | self.db.OAuthClient.name == u'OMGOMG').first() |
a11aa2d1 | 90 | |
c121a7d3 JW |
91 | # redirect_uri should be set |
92 | assert client.redirect_uri == uri | |
93 | ||
a11aa2d1 JW |
94 | # Client should have been registered |
95 | assert client | |
96 | ||
2455a54f | 97 | def test_3_successful_confidential_client_reg(self): |
a11aa2d1 | 98 | ''' Register a confidential OAuth client ''' |
5c2ece74 | 99 | response = self.register_client( |
2455a54f | 100 | u'GMOGMO', 'confidential', 'NO GMO!') |
a11aa2d1 JW |
101 | |
102 | assert response.status_int == 302 | |
103 | ||
104 | client = self.db.OAuthClient.query.filter( | |
af6a43d1 | 105 | self.db.OAuthClient.name == u'GMOGMO').first() |
a11aa2d1 JW |
106 | |
107 | # Client should have been registered | |
108 | assert client | |
109 | ||
110 | return client | |
111 | ||
2455a54f | 112 | def test_4_authorize_confidential_client(self): |
a11aa2d1 | 113 | ''' Authorize a confidential client as a logged in user ''' |
2455a54f | 114 | client = self.test_3_successful_confidential_client_reg() |
a11aa2d1 | 115 | |
0df00eb6 JW |
116 | client_identifier = client.identifier |
117 | ||
a11aa2d1 | 118 | redirect_uri = 'https://foo.example' |
1c694fbe | 119 | response = self.test_app.get('/oauth-2/authorize', { |
a11aa2d1 | 120 | 'client_id': client.identifier, |
c121a7d3 | 121 | 'scope': 'all', |
a11aa2d1 JW |
122 | 'redirect_uri': redirect_uri}) |
123 | ||
124 | # User-agent should NOT be redirected | |
125 | assert response.status_int == 200 | |
126 | ||
127 | ctx = self.get_context('oauth/authorize.html') | |
128 | ||
129 | form = ctx['form'] | |
130 | ||
131 | # Short for client authorization post reponse | |
2455a54f | 132 | capr = self.test_app.post( |
1c694fbe | 133 | '/oauth-2/client/authorize', { |
a11aa2d1 JW |
134 | 'client_id': form.client_id.data, |
135 | 'allow': 'Allow', | |
136 | 'next': form.next.data}) | |
137 | ||
138 | assert capr.status_int == 302 | |
139 | ||
140 | authorization_response = capr.follow() | |
141 | ||
142 | assert authorization_response.location.startswith(redirect_uri) | |
143 | ||
0df00eb6 | 144 | return authorization_response, client_identifier |
a6ec38c3 JW |
145 | |
146 | def get_code_from_redirect_uri(self, uri): | |
c121a7d3 | 147 | ''' Get the value of ?code= from an URI ''' |
a6ec38c3 JW |
148 | return parse_qs(urlparse(uri).query)['code'][0] |
149 | ||
2455a54f | 150 | def test_token_endpoint_successful_confidential_request(self): |
a6ec38c3 | 151 | ''' Successful request against token endpoint ''' |
2455a54f | 152 | code_redirect, client_id = self.test_4_authorize_confidential_client() |
a6ec38c3 JW |
153 | |
154 | code = self.get_code_from_redirect_uri(code_redirect.location) | |
155 | ||
156 | client = self.db.OAuthClient.query.filter( | |
157 | self.db.OAuthClient.identifier == unicode(client_id)).first() | |
158 | ||
1c694fbe | 159 | token_res = self.test_app.get('/oauth-2/access_token?client_id={0}&\ |
a6ec38c3 JW |
160 | code={1}&client_secret={2}'.format(client_id, code, client.secret)) |
161 | ||
162 | assert token_res.status_int == 200 | |
163 | ||
164 | token_data = json.loads(token_res.body) | |
165 | ||
166 | assert not 'error' in token_data | |
167 | assert 'access_token' in token_data | |
168 | assert 'token_type' in token_data | |
169 | assert 'expires_in' in token_data | |
170 | assert type(token_data['expires_in']) == int | |
171 | assert token_data['expires_in'] > 0 | |
172 | ||
c121a7d3 JW |
173 | # There should be a refresh token provided in the token data |
174 | assert len(token_data['refresh_token']) | |
175 | ||
176 | return client_id, token_data | |
177 | ||
2455a54f | 178 | def test_token_endpont_missing_id_confidential_request(self): |
a6ec38c3 | 179 | ''' Unsuccessful request against token endpoint, missing client_id ''' |
2455a54f | 180 | code_redirect, client_id = self.test_4_authorize_confidential_client() |
a6ec38c3 JW |
181 | |
182 | code = self.get_code_from_redirect_uri(code_redirect.location) | |
183 | ||
184 | client = self.db.OAuthClient.query.filter( | |
185 | self.db.OAuthClient.identifier == unicode(client_id)).first() | |
186 | ||
1c694fbe | 187 | token_res = self.test_app.get('/oauth-2/access_token?\ |
a6ec38c3 JW |
188 | code={0}&client_secret={1}'.format(code, client.secret)) |
189 | ||
190 | assert token_res.status_int == 200 | |
191 | ||
192 | token_data = json.loads(token_res.body) | |
193 | ||
194 | assert 'error' in token_data | |
195 | assert not 'access_token' in token_data | |
196 | assert token_data['error'] == 'invalid_request' | |
c121a7d3 JW |
197 | assert len(token_data['error_description']) |
198 | ||
2455a54f | 199 | def test_refresh_token(self): |
c121a7d3 JW |
200 | ''' Try to get a new access token using the refresh token ''' |
201 | # Get an access token and a refresh token | |
202 | client_id, token_data =\ | |
2455a54f | 203 | self.test_token_endpoint_successful_confidential_request() |
c121a7d3 JW |
204 | |
205 | client = self.db.OAuthClient.query.filter( | |
206 | self.db.OAuthClient.identifier == client_id).first() | |
207 | ||
1c694fbe | 208 | token_res = self.test_app.get('/oauth-2/access_token', |
c121a7d3 JW |
209 | {'refresh_token': token_data['refresh_token'], |
210 | 'client_id': client_id, | |
211 | 'client_secret': client.secret | |
212 | }) | |
213 | ||
214 | assert token_res.status_int == 200 | |
215 | ||
216 | new_token_data = json.loads(token_res.body) | |
217 | ||
218 | assert not 'error' in new_token_data | |
219 | assert 'access_token' in new_token_data | |
220 | assert 'token_type' in new_token_data | |
221 | assert 'expires_in' in new_token_data | |
222 | assert type(new_token_data['expires_in']) == int | |
223 | assert new_token_data['expires_in'] > 0 |