Patch by Strum. Ticket #451 - Convert all mongokit style .find, .find_one, .one calls...
[mediagoblin.git] / mediagoblin / tests / test_oauth.py
1 # GNU MediaGoblin -- federated, autonomous media hosting
2 # Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16
17 import json
18 import logging
19
20 import pytest
21 from urlparse import parse_qs, urlparse
22
23 from mediagoblin import mg_globals
24 from mediagoblin.tools import template, pluginapi
25 from mediagoblin.tests.tools import fixture_add_user
26
27
28 _log = logging.getLogger(__name__)
29
30
31 class TestOAuth(object):
@pytest.fixture(autouse=True)
def setup(self, test_app):
    '''
    Autouse fixture that prepares the shared state used by the tests.

    Stores the injected ``test_app``, ``mg_globals.database`` and a new
    ``pluginapi.PluginManager`` on the instance, creates the ``joauth``
    user through ``fixture_add_user`` and finally logs that user in.
    '''
    self.test_app = test_app
    self.db = mg_globals.database
    self.pman = pluginapi.PluginManager()

    # The plain password is kept on the instance because login() posts it.
    password = u'4cc355_70k3N'
    self.user_password = password
    self.user = fixture_add_user(u'joauth', password)

    self.login()
44
def login(self):
    ''' POST the fixture user's credentials to /auth/login/ '''
    credentials = {
        'username': self.user.username,
        'password': self.user_password}
    self.test_app.post('/auth/login/', credentials)
50
def register_client(self, name, client_type, description=None,
                    redirect_uri=''):
    '''
    POST the given values to /oauth/client/register and return the
    response object handed back by the test application.
    '''
    form_fields = {
        'name': name,
        'description': description,
        'type': client_type,
        'redirect_uri': redirect_uri}
    return self.test_app.post('/oauth/client/register', form_fields)
59
def get_context(self, template_name):
    ''' Return the TEMPLATE_TEST_CONTEXT entry stored for template_name '''
    recorded_contexts = template.TEMPLATE_TEST_CONTEXT
    return recorded_contexts[template_name]
62
def test_1_public_client_registration_without_redirect_uri(self):
    ''' Registering a 'public' OAuth client with no redirect uri must fail '''
    response = self.register_client(
        u'OMGOMGOMG', 'public', 'OMGOMG Apache License v2')

    # Both lookups have to happen after the registration request above.
    register_ctx = self.get_context('oauth/client/register.html')

    client_model = self.db.OAuthClient
    client = client_model.query.filter(
        client_model.name == u'OMGOMGOMG').first()

    assert response.status_int == 200

    # The redirect_uri field of the form should carry an error
    redirect_uri_errors = register_ctx['form'].redirect_uri.errors
    assert len(redirect_uri_errors)

    # No client with that name should have been stored
    assert not client
80
def test_2_successful_public_client_registration(self):
    ''' Successfully register a public client '''
    uri = 'http://foo.example'
    self.register_client(
        u'OMGOMG', 'public', 'OMG!', uri)

    client = self.db.OAuthClient.query.filter(
        self.db.OAuthClient.name == u'OMGOMG').first()

    # Client should have been registered.  This is checked first because
    # reading an attribute of a missing client would raise AttributeError
    # and hide the real reason for the failure.
    assert client

    # redirect_uri should be set
    assert client.redirect_uri == uri
95
def test_3_successful_confidential_client_reg(self):
    '''
    Register a confidential OAuth client.

    Returns the client object looked up by name so that other tests can
    build on the registration.
    '''
    response = self.register_client(
        u'GMOGMO', 'confidential', 'NO GMO!')
    assert response.status_int == 302

    client_model = self.db.OAuthClient
    registered = client_model.query.filter(
        client_model.name == u'GMOGMO').first()

    # The lookup must have found the freshly registered client
    assert registered

    return registered
110
def test_4_authorize_confidential_client(self):
    '''
    Authorize a confidential client as a logged in user.

    Returns a tuple of (response obtained by following the authorization
    redirect, identifier of the client that was authorized).
    '''
    client = self.test_3_successful_confidential_client_reg()
    client_identifier = client.identifier
    redirect_uri = 'https://foo.example'

    authorize_params = {
        'client_id': client_identifier,
        'scope': 'all',
        'redirect_uri': redirect_uri}
    response = self.test_app.get('/oauth/authorize', authorize_params)

    # The authorization page itself is expected, not a redirect
    assert response.status_int == 200

    form = self.get_context('oauth/authorize.html')['form']

    # Submit the authorization form with the "Allow" choice
    allow_params = {
        'client_id': form.client_id.data,
        'allow': 'Allow',
        'next': form.next.data}
    allow_response = self.test_app.post(
        '/oauth/client/authorize', allow_params)

    assert allow_response.status_int == 302

    authorization_response = allow_response.follow()

    assert authorization_response.location.startswith(redirect_uri)

    return authorization_response, client_identifier
144
def get_code_from_redirect_uri(self, uri):
    ''' Return the first value of the ?code= query parameter of an URI '''
    query_string = urlparse(uri).query
    code_values = parse_qs(query_string)['code']
    return code_values[0]
148
def test_token_endpoint_successful_confidential_request(self):
    '''
    Successful request against token endpoint.

    Returns a tuple of (client identifier, decoded token data) so that
    other tests can reuse the issued tokens.
    '''
    code_redirect, client_id = self.test_4_authorize_confidential_client()

    code = self.get_code_from_redirect_uri(code_redirect.location)

    # client_id is the identifier read from the client object itself, so it
    # is compared directly, the same way test_refresh_token does.
    client = self.db.OAuthClient.query.filter(
        self.db.OAuthClient.identifier == client_id).first()

    # Hand the parameters over as a dict so the test application builds and
    # escapes the query string, instead of formatting it by hand.
    token_res = self.test_app.get('/oauth/access_token', {
        'client_id': client_id,
        'code': code,
        'client_secret': client.secret})

    assert token_res.status_int == 200

    token_data = json.loads(token_res.body)

    assert 'error' not in token_data
    assert 'access_token' in token_data
    assert 'token_type' in token_data
    assert 'expires_in' in token_data
    assert type(token_data['expires_in']) == int
    assert token_data['expires_in'] > 0

    # There should be a refresh token provided in the token data
    assert len(token_data['refresh_token'])

    return client_id, token_data
176
def test_token_endpont_missing_id_confidential_request(self):
    ''' Unsuccessful request against token endpoint, missing client_id '''
    code_redirect, client_id = self.test_4_authorize_confidential_client()

    code = self.get_code_from_redirect_uri(code_redirect.location)

    # The client is only looked up for its secret; client_id itself is
    # deliberately left out of the request below.
    client = self.db.OAuthClient.query.filter(
        self.db.OAuthClient.identifier == client_id).first()

    # Hand the parameters over as a dict so the test application builds and
    # escapes the query string, instead of formatting it by hand.
    token_res = self.test_app.get('/oauth/access_token', {
        'code': code,
        'client_secret': client.secret})

    # The endpoint is expected to answer 200 with an error payload
    assert token_res.status_int == 200

    token_data = json.loads(token_res.body)

    assert 'error' in token_data
    assert 'access_token' not in token_data
    assert token_data['error'] == 'invalid_request'
    assert len(token_data['error_description'])
197
def test_refresh_token(self):
    ''' Exchange a refresh token for a new access token '''
    # Obtain an access token together with its refresh token
    client_id, token_data = (
        self.test_token_endpoint_successful_confidential_request())

    client_model = self.db.OAuthClient
    client = client_model.query.filter(
        client_model.identifier == client_id).first()

    refresh_params = {
        'refresh_token': token_data['refresh_token'],
        'client_id': client_id,
        'client_secret': client.secret}
    token_res = self.test_app.get('/oauth/access_token', refresh_params)

    assert token_res.status_int == 200

    new_token_data = json.loads(token_res.body)

    assert 'error' not in new_token_data
    for expected_key in ('access_token', 'token_type', 'expires_in'):
        assert expected_key in new_token_data

    expires_in = new_token_data['expires_in']
    assert type(expires_in) is int
    assert expires_in > 0