Add image URL's (thumb & full)
[mediagoblin.git] / mediagoblin / tests / test_oauth2.py
CommitLineData
a11aa2d1
JW
1# GNU MediaGoblin -- federated, autonomous media hosting
2# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
3#
4# This program is free software: you can redistribute it and/or modify
5# it under the terms of the GNU Affero General Public License as published by
6# the Free Software Foundation, either version 3 of the License, or
7# (at your option) any later version.
8#
9# This program is distributed in the hope that it will be useful,
10# but WITHOUT ANY WARRANTY; without even the implied warranty of
11# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12# GNU Affero General Public License for more details.
13#
14# You should have received a copy of the GNU Affero General Public License
15# along with this program. If not, see <http://www.gnu.org/licenses/>.
16
a6ec38c3 17import json
a11aa2d1
JW
18import logging
19
2455a54f 20import pytest
a6ec38c3
JW
21from urlparse import parse_qs, urlparse
22
a11aa2d1 23from mediagoblin import mg_globals
a11aa2d1 24from mediagoblin.tools import template, pluginapi
5c2ece74 25from mediagoblin.tests.tools import fixture_add_user
a11aa2d1
JW
26
27
28_log = logging.getLogger(__name__)
29
30
class TestOAuth(object):
    ''' Tests for the '/oauth-2/' client registration, authorization and
    access token endpoints.

    Several tests build on each other: a test may call an earlier test
    method directly and use its return value (e.g. the registered client
    or the token data) as the starting point for its own requests.
    '''

    @pytest.fixture(autouse=True)
    def setup(self, test_app):
        ''' Store the test app, create an active user and log it in '''
        self.test_app = test_app

        self.db = mg_globals.database

        self.pman = pluginapi.PluginManager()

        self.user_password = u'4cc355_70k3N'
        self.user = fixture_add_user(u'joauth', self.user_password,
                                     privileges=[u'active'])

        self.login()

    def login(self):
        ''' Post the fixture user's credentials to the login view '''
        self.test_app.post(
            '/auth/login/', {
                'username': self.user.username,
                'password': self.user_password})

    def register_client(self, name, client_type, description=None,
                        redirect_uri=''):
        ''' Post a client registration form and return the response '''
        return self.test_app.post(
            '/oauth-2/client/register', {
                'name': name,
                'description': description,
                'type': client_type,
                'redirect_uri': redirect_uri})

    def get_context(self, template_name):
        ''' Return the test context recorded for ``template_name`` '''
        return template.TEMPLATE_TEST_CONTEXT[template_name]

    def _client_by_name(self, name):
        ''' Return the first OAuthClient with the given name, or None '''
        return self.db.OAuthClient.query.filter(
            self.db.OAuthClient.name == name).first()

    def _client_by_identifier(self, identifier):
        ''' Return the first OAuthClient with the given identifier, or None '''
        return self.db.OAuthClient.query.filter(
            self.db.OAuthClient.identifier == unicode(identifier)).first()

    def _assert_valid_token_data(self, token_data):
        ''' Assert that a decoded token response describes a usable token '''
        assert 'error' not in token_data
        assert 'access_token' in token_data
        assert 'token_type' in token_data
        assert 'expires_in' in token_data

        expires_in = token_data['expires_in']

        # bool is a subclass of int; reject it explicitly so this stays as
        # strict as an exact type comparison against int.
        assert isinstance(expires_in, int)
        assert not isinstance(expires_in, bool)
        assert expires_in > 0

    def test_1_public_client_registration_without_redirect_uri(self):
        ''' Test 'public' OAuth client registration without any redirect uri '''
        response = self.register_client(
            u'OMGOMGOMG', 'public', 'OMGOMG Apache License v2')

        ctx = self.get_context('oauth/client/register.html')

        client = self._client_by_name(u'OMGOMGOMG')

        assert response.status_int == 200

        # Should display an error
        assert ctx['form'].redirect_uri.errors

        # Should not pass through
        assert not client

    def test_2_successful_public_client_registration(self):
        ''' Successfully register a public client '''
        uri = 'http://foo.example'
        self.register_client(
            u'OMGOMG', 'public', 'OMG!', uri)

        client = self._client_by_name(u'OMGOMG')

        # Client should have been registered. Checked first so that a failed
        # registration fails this assertion instead of raising
        # AttributeError on the attribute access below.
        assert client

        # redirect_uri should be set
        assert client.redirect_uri == uri

    def test_3_successful_confidential_client_reg(self):
        ''' Register a confidential OAuth client

        Returns the registered client so that other tests can build on it.
        '''
        response = self.register_client(
            u'GMOGMO', 'confidential', 'NO GMO!')

        assert response.status_int == 302

        client = self._client_by_name(u'GMOGMO')

        # Client should have been registered
        assert client

        return client

    def test_4_authorize_confidential_client(self):
        ''' Authorize a confidential client as a logged in user

        Returns a tuple of (authorization response, client identifier).
        '''
        client = self.test_3_successful_confidential_client_reg()

        # Read the identifier once, before any further requests are made,
        # and use that value from here on.
        client_identifier = client.identifier

        redirect_uri = 'https://foo.example'
        response = self.test_app.get('/oauth-2/authorize', {
            'client_id': client_identifier,
            'scope': 'all',
            'redirect_uri': redirect_uri})

        # User-agent should NOT be redirected
        assert response.status_int == 200

        ctx = self.get_context('oauth/authorize.html')

        form = ctx['form']

        # Short for client authorization post response
        capr = self.test_app.post(
            '/oauth-2/client/authorize', {
                'client_id': form.client_id.data,
                'allow': 'Allow',
                'next': form.next.data})

        assert capr.status_int == 302

        authorization_response = capr.follow()

        assert authorization_response.location.startswith(redirect_uri)

        return authorization_response, client_identifier

    def get_code_from_redirect_uri(self, uri):
        ''' Get the value of ?code= from an URI '''
        return parse_qs(urlparse(uri).query)['code'][0]

    def test_token_endpoint_successful_confidential_request(self):
        ''' Successful request against token endpoint

        Returns a tuple of (client identifier, decoded token data).
        '''
        code_redirect, client_id = self.test_4_authorize_confidential_client()

        code = self.get_code_from_redirect_uri(code_redirect.location)

        client = self._client_by_identifier(client_id)

        # Pass the parameters as a dict so they are URL-encoded by the test
        # app rather than interpolated into the query string by hand.
        token_res = self.test_app.get('/oauth-2/access_token', {
            'client_id': client_id,
            'code': code,
            'client_secret': client.secret})

        assert token_res.status_int == 200

        token_data = json.loads(token_res.body)

        self._assert_valid_token_data(token_data)

        # There should be a refresh token provided in the token data
        assert token_data.get('refresh_token')

        return client_id, token_data

    def test_token_endpont_missing_id_confidential_request(self):
        ''' Unsuccessful request against token endpoint, missing client_id '''
        code_redirect, client_id = self.test_4_authorize_confidential_client()

        code = self.get_code_from_redirect_uri(code_redirect.location)

        client = self._client_by_identifier(client_id)

        # client_id is deliberately left out of the request
        token_res = self.test_app.get('/oauth-2/access_token', {
            'code': code,
            'client_secret': client.secret})

        assert token_res.status_int == 200

        token_data = json.loads(token_res.body)

        assert 'error' in token_data
        assert 'access_token' not in token_data
        assert token_data['error'] == 'invalid_request'
        assert token_data.get('error_description')

    def test_refresh_token(self):
        ''' Try to get a new access token using the refresh token '''
        # Get an access token and a refresh token
        client_id, token_data = \
            self.test_token_endpoint_successful_confidential_request()

        client = self._client_by_identifier(client_id)

        token_res = self.test_app.get('/oauth-2/access_token', {
            'refresh_token': token_data['refresh_token'],
            'client_id': client_id,
            'client_secret': client.secret})

        assert token_res.status_int == 200

        new_token_data = json.loads(token_res.body)

        self._assert_valid_token_data(new_token_data)