Commit | Line | Data |
---|---|---|
a11aa2d1 JW |
1 | # GNU MediaGoblin -- federated, autonomous media hosting |
2 | # Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. | |
3 | # | |
4 | # This program is free software: you can redistribute it and/or modify | |
5 | # it under the terms of the GNU Affero General Public License as published by | |
6 | # the Free Software Foundation, either version 3 of the License, or | |
7 | # (at your option) any later version. | |
8 | # | |
9 | # This program is distributed in the hope that it will be useful, | |
10 | # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
11 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
12 | # GNU Affero General Public License for more details. | |
13 | # | |
14 | # You should have received a copy of the GNU Affero General Public License | |
15 | # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
16 | ||
a6ec38c3 | 17 | import json |
a11aa2d1 JW |
18 | import logging |
19 | ||
a6ec38c3 JW |
20 | from urlparse import parse_qs, urlparse |
21 | ||
a11aa2d1 | 22 | from mediagoblin import mg_globals |
a11aa2d1 | 23 | from mediagoblin.tools import template, pluginapi |
1be247b3 | 24 | from mediagoblin.tests.tools import get_app, fixture_add_user |
a11aa2d1 JW |
25 | |
26 | ||
# Module-level logger, named after this module.
_log = logging.getLogger(__name__)
28 | ||
29 | ||
class TestOAuth(object):
    '''
    Functional tests for OAuth client registration, client authorization
    and the access token endpoint.

    ``setUp`` obtains the application from ``get_app()``, adds a user and
    logs that user in; the tests then inspect both the HTTP responses and
    the ``OAuthClient`` rows reachable through ``mg_globals.database``.
    '''

    def setUp(self):
        ''' Build the test app, create the test user and log it in '''
        self.app = get_app()
        self.db = mg_globals.database

        self.pman = pluginapi.PluginManager()

        self.user_password = u'4cc355_70k3N'
        self.user = fixture_add_user(u'joauth', self.user_password)

        self.login()

    def login(self):
        ''' Post the test user's credentials to the login view '''
        self.app.post(
            '/auth/login/', {
                'username': self.user.username,
                'password': self.user_password})

    def register_client(self, name, client_type, description=None,
                        redirect_uri=''):
        '''
        Submit the OAuth client registration form.

        :param name: value posted as the client ``name``
        :param client_type: value posted as ``type`` (the tests use
            ``'public'`` and ``'confidential'``)
        :param description: value posted as ``description``
        :param redirect_uri: value posted as ``redirect_uri``
        :returns: the response of the POST request
        '''
        return self.app.post(
            '/oauth/client/register', {
                'name': name,
                'description': description,
                'type': client_type,
                'redirect_uri': redirect_uri})

    def get_context(self, template_name):
        '''
        Return the entry stored for ``template_name`` in
        ``template.TEMPLATE_TEST_CONTEXT``.
        '''
        return template.TEMPLATE_TEST_CONTEXT[template_name]

    def test_1_public_client_registration_without_redirect_uri(self):
        ''' Test 'public' OAuth client registration without any redirect uri '''
        response = self.register_client(
            u'OMGOMGOMG', 'public', 'OMGOMG Apache License v2')

        ctx = self.get_context('oauth/client/register.html')

        client = self.db.OAuthClient.query.filter(
            self.db.OAuthClient.name == u'OMGOMGOMG').first()

        assert response.status_int == 200

        # Should display an error
        assert ctx['form'].redirect_uri.errors

        # Should not pass through
        assert not client

    def test_2_successful_public_client_registration(self):
        ''' Successfully register a public client '''
        self.login()
        self.register_client(
            u'OMGOMG', 'public', 'OMG!', 'http://foo.example')

        client = self.db.OAuthClient.query.filter(
            self.db.OAuthClient.name == u'OMGOMG').first()

        # Client should have been registered
        assert client

    def test_3_successful_confidential_client_reg(self):
        '''
        Register a confidential OAuth client

        Returns the registered client row so that
        ``test_4_authorize_confidential_client`` can build on it.
        '''
        response = self.register_client(u'GMOGMO', 'confidential', 'NO GMO!')

        assert response.status_int == 302

        client = self.db.OAuthClient.query.filter(
            self.db.OAuthClient.name == u'GMOGMO').first()

        # Client should have been registered
        assert client

        return client

    def test_4_authorize_confidential_client(self):
        '''
        Authorize a confidential client as a logged in user

        Returns ``(authorization_response, client_identifier)``; the token
        endpoint tests reuse both values.
        '''
        client = self.test_3_successful_confidential_client_reg()

        # Read the identifier once, before issuing further requests, and
        # use that copy from here on.
        # NOTE(review): presumably the row object is not safe to touch after
        # later requests -- confirm against the database session handling.
        client_identifier = client.identifier

        redirect_uri = 'https://foo.example'
        response = self.app.get('/oauth/authorize', {
            'client_id': client_identifier,
            'scope': 'admin',
            'redirect_uri': redirect_uri})

        # User-agent should NOT be redirected
        assert response.status_int == 200

        ctx = self.get_context('oauth/authorize.html')

        form = ctx['form']

        # Short for client authorization post response
        capr = self.app.post(
            '/oauth/client/authorize', {
                'client_id': form.client_id.data,
                'allow': 'Allow',
                'next': form.next.data})

        assert capr.status_int == 302

        authorization_response = capr.follow()

        assert authorization_response.location.startswith(redirect_uri)

        return authorization_response, client_identifier

    def get_code_from_redirect_uri(self, uri):
        ''' Return the first ``code`` query parameter found in ``uri`` '''
        return parse_qs(urlparse(uri).query)['code'][0]

    def _authorized_code_and_client(self):
        '''
        Authorize a confidential client and gather what a token request
        needs.

        :returns: ``(code, client_id, client)`` -- the authorization code
            taken from the redirect location, the client identifier, and
            the ``OAuthClient`` row looked up by that identifier
        '''
        code_redirect, client_id = self.test_4_authorize_confidential_client()

        code = self.get_code_from_redirect_uri(code_redirect.location)

        client = self.db.OAuthClient.query.filter(
            self.db.OAuthClient.identifier == unicode(client_id)).first()

        return code, client_id, client

    def test_token_endpoint_successful_confidential_request(self):
        ''' Successful request against token endpoint '''
        code, client_id, client = self._authorized_code_and_client()

        # Pass the parameters as a dict so the test client builds and
        # URL-encodes the query string, instead of formatting it by hand.
        token_res = self.app.get('/oauth/access_token', {
            'client_id': client_id,
            'code': code,
            'client_secret': client.secret})

        assert token_res.status_int == 200

        token_data = json.loads(token_res.body)

        assert 'error' not in token_data
        assert 'access_token' in token_data
        assert 'token_type' in token_data
        assert 'expires_in' in token_data
        assert isinstance(token_data['expires_in'], int)
        assert token_data['expires_in'] > 0

    # NOTE: "endpont" is a typo, kept so the test id stays stable.
    def test_token_endpont_missing_id_confidential_request(self):
        ''' Unsuccessful request against token endpoint, missing client_id '''
        code, _client_id, client = self._authorized_code_and_client()

        # client_id is deliberately left out of the request.
        token_res = self.app.get('/oauth/access_token', {
            'code': code,
            'client_secret': client.secret})

        assert token_res.status_int == 200

        token_data = json.loads(token_res.body)

        assert 'error' in token_data
        assert 'access_token' not in token_data
        assert token_data['error'] == 'invalid_request'
        assert token_data['error_description'] == 'Missing client_id in request'