Commit | Line | Data |
---|---|---|
a11aa2d1 JW |
1 | # GNU MediaGoblin -- federated, autonomous media hosting |
2 | # Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. | |
3 | # | |
4 | # This program is free software: you can redistribute it and/or modify | |
5 | # it under the terms of the GNU Affero General Public License as published by | |
6 | # the Free Software Foundation, either version 3 of the License, or | |
7 | # (at your option) any later version. | |
8 | # | |
9 | # This program is distributed in the hope that it will be useful, | |
10 | # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
11 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
12 | # GNU Affero General Public License for more details. | |
13 | # | |
14 | # You should have received a copy of the GNU Affero General Public License | |
15 | # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
16 | ||
a6ec38c3 | 17 | import json |
a11aa2d1 JW |
18 | import logging |
19 | ||
a6ec38c3 JW |
20 | from urlparse import parse_qs, urlparse |
21 | ||
a11aa2d1 | 22 | from mediagoblin import mg_globals |
a11aa2d1 | 23 | from mediagoblin.tools import template, pluginapi |
5c2ece74 | 24 | from mediagoblin.tests.tools import fixture_add_user |
a11aa2d1 JW |
25 | |
26 | ||
27 | _log = logging.getLogger(__name__) | |
28 | ||
29 | ||
30 | class TestOAuth(object): | |
5c2ece74 | 31 | def _setup(self, test_app): |
a11aa2d1 JW |
32 | self.db = mg_globals.database |
33 | ||
34 | self.pman = pluginapi.PluginManager() | |
35 | ||
fc7b1b17 SS |
36 | self.user_password = u'4cc355_70k3N' |
37 | self.user = fixture_add_user(u'joauth', self.user_password) | |
a11aa2d1 | 38 | |
5c2ece74 | 39 | self.login(test_app) |
a11aa2d1 | 40 | |
5c2ece74 CAW |
41 | def login(self, test_app): |
42 | test_app.post( | |
a11aa2d1 JW |
43 | '/auth/login/', { |
44 | 'username': self.user.username, | |
45 | 'password': self.user_password}) | |
46 | ||
5c2ece74 | 47 | def register_client(self, test_app, name, client_type, description=None, |
a11aa2d1 | 48 | redirect_uri=''): |
5c2ece74 | 49 | return test_app.post( |
a11aa2d1 JW |
50 | '/oauth/client/register', { |
51 | 'name': name, | |
52 | 'description': description, | |
53 | 'type': client_type, | |
54 | 'redirect_uri': redirect_uri}) | |
55 | ||
56 | def get_context(self, template_name): | |
57 | return template.TEMPLATE_TEST_CONTEXT[template_name] | |
58 | ||
5c2ece74 | 59 | def test_1_public_client_registration_without_redirect_uri(self, test_app): |
a11aa2d1 | 60 | ''' Test 'public' OAuth client registration without any redirect uri ''' |
5c2ece74 CAW |
61 | self._setup(test_app) |
62 | ||
63 | response = self.register_client(test_app, u'OMGOMGOMG', 'public', | |
a11aa2d1 JW |
64 | 'OMGOMG Apache License v2') |
65 | ||
66 | ctx = self.get_context('oauth/client/register.html') | |
67 | ||
68 | client = self.db.OAuthClient.query.filter( | |
af6a43d1 | 69 | self.db.OAuthClient.name == u'OMGOMGOMG').first() |
a11aa2d1 JW |
70 | |
71 | assert response.status_int == 200 | |
72 | ||
73 | # Should display an error | |
74 | assert ctx['form'].redirect_uri.errors | |
75 | ||
76 | # Should not pass through | |
77 | assert not client | |
78 | ||
5c2ece74 | 79 | def test_2_successful_public_client_registration(self, test_app): |
a11aa2d1 | 80 | ''' Successfully register a public client ''' |
5c2ece74 CAW |
81 | self._setup(test_app) |
82 | self.register_client(test_app, u'OMGOMG', 'public', 'OMG!', | |
a11aa2d1 JW |
83 | 'http://foo.example') |
84 | ||
85 | client = self.db.OAuthClient.query.filter( | |
af6a43d1 | 86 | self.db.OAuthClient.name == u'OMGOMG').first() |
a11aa2d1 JW |
87 | |
88 | # Client should have been registered | |
89 | assert client | |
90 | ||
5c2ece74 | 91 | def test_3_successful_confidential_client_reg(self, test_app): |
a11aa2d1 | 92 | ''' Register a confidential OAuth client ''' |
5c2ece74 CAW |
93 | self._setup(test_app) |
94 | ||
95 | response = self.register_client( | |
96 | test_app, u'GMOGMO', 'confidential', 'NO GMO!') | |
a11aa2d1 JW |
97 | |
98 | assert response.status_int == 302 | |
99 | ||
100 | client = self.db.OAuthClient.query.filter( | |
af6a43d1 | 101 | self.db.OAuthClient.name == u'GMOGMO').first() |
a11aa2d1 JW |
102 | |
103 | # Client should have been registered | |
104 | assert client | |
105 | ||
106 | return client | |
107 | ||
5c2ece74 | 108 | def test_4_authorize_confidential_client(self, test_app): |
a11aa2d1 | 109 | ''' Authorize a confidential client as a logged in user ''' |
5c2ece74 | 110 | self._setup(test_app) |
af6a43d1 | 111 | |
5c2ece74 | 112 | client = self.test_3_successful_confidential_client_reg(test_app) |
a11aa2d1 | 113 | |
0df00eb6 JW |
114 | client_identifier = client.identifier |
115 | ||
a11aa2d1 | 116 | redirect_uri = 'https://foo.example' |
5c2ece74 | 117 | response = test_app.get('/oauth/authorize', { |
a11aa2d1 JW |
118 | 'client_id': client.identifier, |
119 | 'scope': 'admin', | |
120 | 'redirect_uri': redirect_uri}) | |
121 | ||
122 | # User-agent should NOT be redirected | |
123 | assert response.status_int == 200 | |
124 | ||
125 | ctx = self.get_context('oauth/authorize.html') | |
126 | ||
127 | form = ctx['form'] | |
128 | ||
129 | # Short for client authorization post reponse | |
5c2ece74 | 130 | capr = test_app.post( |
a11aa2d1 JW |
131 | '/oauth/client/authorize', { |
132 | 'client_id': form.client_id.data, | |
133 | 'allow': 'Allow', | |
134 | 'next': form.next.data}) | |
135 | ||
136 | assert capr.status_int == 302 | |
137 | ||
138 | authorization_response = capr.follow() | |
139 | ||
140 | assert authorization_response.location.startswith(redirect_uri) | |
141 | ||
0df00eb6 | 142 | return authorization_response, client_identifier |
a6ec38c3 JW |
143 | |
144 | def get_code_from_redirect_uri(self, uri): | |
145 | return parse_qs(urlparse(uri).query)['code'][0] | |
146 | ||
5c2ece74 | 147 | def test_token_endpoint_successful_confidential_request(self, test_app): |
a6ec38c3 | 148 | ''' Successful request against token endpoint ''' |
5c2ece74 CAW |
149 | self._setup(test_app) |
150 | ||
151 | code_redirect, client_id = self.test_4_authorize_confidential_client( | |
152 | test_app) | |
a6ec38c3 JW |
153 | |
154 | code = self.get_code_from_redirect_uri(code_redirect.location) | |
155 | ||
156 | client = self.db.OAuthClient.query.filter( | |
157 | self.db.OAuthClient.identifier == unicode(client_id)).first() | |
158 | ||
5c2ece74 | 159 | token_res = test_app.get('/oauth/access_token?client_id={0}&\ |
a6ec38c3 JW |
160 | code={1}&client_secret={2}'.format(client_id, code, client.secret)) |
161 | ||
162 | assert token_res.status_int == 200 | |
163 | ||
164 | token_data = json.loads(token_res.body) | |
165 | ||
166 | assert not 'error' in token_data | |
167 | assert 'access_token' in token_data | |
168 | assert 'token_type' in token_data | |
169 | assert 'expires_in' in token_data | |
170 | assert type(token_data['expires_in']) == int | |
171 | assert token_data['expires_in'] > 0 | |
172 | ||
5c2ece74 | 173 | def test_token_endpont_missing_id_confidential_request(self, test_app): |
a6ec38c3 | 174 | ''' Unsuccessful request against token endpoint, missing client_id ''' |
5c2ece74 CAW |
175 | self._setup(test_app) |
176 | ||
177 | code_redirect, client_id = self.test_4_authorize_confidential_client( | |
178 | test_app) | |
a6ec38c3 JW |
179 | |
180 | code = self.get_code_from_redirect_uri(code_redirect.location) | |
181 | ||
182 | client = self.db.OAuthClient.query.filter( | |
183 | self.db.OAuthClient.identifier == unicode(client_id)).first() | |
184 | ||
5c2ece74 | 185 | token_res = test_app.get('/oauth/access_token?\ |
a6ec38c3 JW |
186 | code={0}&client_secret={1}'.format(code, client.secret)) |
187 | ||
188 | assert token_res.status_int == 200 | |
189 | ||
190 | token_data = json.loads(token_res.body) | |
191 | ||
192 | assert 'error' in token_data | |
193 | assert not 'access_token' in token_data | |
194 | assert token_data['error'] == 'invalid_request' | |
195 | assert token_data['error_description'] == 'Missing client_id in request' |