More unicode fixes in the test suite
[mediagoblin.git] / mediagoblin / tests / test_oauth.py
CommitLineData
a11aa2d1
JW
1# GNU MediaGoblin -- federated, autonomous media hosting
2# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
3#
4# This program is free software: you can redistribute it and/or modify
5# it under the terms of the GNU Affero General Public License as published by
6# the Free Software Foundation, either version 3 of the License, or
7# (at your option) any later version.
8#
9# This program is distributed in the hope that it will be useful,
10# but WITHOUT ANY WARRANTY; without even the implied warranty of
11# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12# GNU Affero General Public License for more details.
13#
14# You should have received a copy of the GNU Affero General Public License
15# along with this program. If not, see <http://www.gnu.org/licenses/>.
16
a6ec38c3 17import json
a11aa2d1
JW
18import logging
19
a6ec38c3
JW
20from urlparse import parse_qs, urlparse
21
a11aa2d1 22from mediagoblin import mg_globals
a11aa2d1
JW
23from mediagoblin.tools import template, pluginapi
24from mediagoblin.tests.tools import get_test_app, fixture_add_user
a11aa2d1
JW
25
26
27_log = logging.getLogger(__name__)
28
29
30class TestOAuth(object):
31 def setUp(self):
32 self.app = get_test_app()
33 self.db = mg_globals.database
34
35 self.pman = pluginapi.PluginManager()
36
fc7b1b17
SS
37 self.user_password = u'4cc355_70k3N'
38 self.user = fixture_add_user(u'joauth', self.user_password)
a11aa2d1
JW
39
40 self.login()
41
42 def login(self):
43 self.app.post(
44 '/auth/login/', {
45 'username': self.user.username,
46 'password': self.user_password})
47
48 def register_client(self, name, client_type, description=None,
49 redirect_uri=''):
50 return self.app.post(
51 '/oauth/client/register', {
52 'name': name,
53 'description': description,
54 'type': client_type,
55 'redirect_uri': redirect_uri})
56
57 def get_context(self, template_name):
58 return template.TEMPLATE_TEST_CONTEXT[template_name]
59
60 def test_1_public_client_registration_without_redirect_uri(self):
61 ''' Test 'public' OAuth client registration without any redirect uri '''
af6a43d1 62 response = self.register_client(u'OMGOMGOMG', 'public',
a11aa2d1
JW
63 'OMGOMG Apache License v2')
64
65 ctx = self.get_context('oauth/client/register.html')
66
67 client = self.db.OAuthClient.query.filter(
af6a43d1 68 self.db.OAuthClient.name == u'OMGOMGOMG').first()
a11aa2d1
JW
69
70 assert response.status_int == 200
71
72 # Should display an error
73 assert ctx['form'].redirect_uri.errors
74
75 # Should not pass through
76 assert not client
77
78 def test_2_successful_public_client_registration(self):
79 ''' Successfully register a public client '''
80 self.login()
af6a43d1 81 self.register_client(u'OMGOMG', 'public', 'OMG!',
a11aa2d1
JW
82 'http://foo.example')
83
84 client = self.db.OAuthClient.query.filter(
af6a43d1 85 self.db.OAuthClient.name == u'OMGOMG').first()
a11aa2d1
JW
86
87 # Client should have been registered
88 assert client
89
90 def test_3_successful_confidential_client_reg(self):
91 ''' Register a confidential OAuth client '''
af6a43d1 92 response = self.register_client(u'GMOGMO', 'confidential', 'NO GMO!')
a11aa2d1
JW
93
94 assert response.status_int == 302
95
96 client = self.db.OAuthClient.query.filter(
af6a43d1 97 self.db.OAuthClient.name == u'GMOGMO').first()
a11aa2d1
JW
98
99 # Client should have been registered
100 assert client
101
102 return client
103
104 def test_4_authorize_confidential_client(self):
105 ''' Authorize a confidential client as a logged in user '''
af6a43d1 106
a11aa2d1
JW
107 client = self.test_3_successful_confidential_client_reg()
108
0df00eb6
JW
109 client_identifier = client.identifier
110
a11aa2d1
JW
111 redirect_uri = 'https://foo.example'
112 response = self.app.get('/oauth/authorize', {
113 'client_id': client.identifier,
114 'scope': 'admin',
115 'redirect_uri': redirect_uri})
116
117 # User-agent should NOT be redirected
118 assert response.status_int == 200
119
120 ctx = self.get_context('oauth/authorize.html')
121
122 form = ctx['form']
123
124 # Short for client authorization post reponse
125 capr = self.app.post(
126 '/oauth/client/authorize', {
127 'client_id': form.client_id.data,
128 'allow': 'Allow',
129 'next': form.next.data})
130
131 assert capr.status_int == 302
132
133 authorization_response = capr.follow()
134
135 assert authorization_response.location.startswith(redirect_uri)
136
0df00eb6 137 return authorization_response, client_identifier
a6ec38c3
JW
138
139 def get_code_from_redirect_uri(self, uri):
140 return parse_qs(urlparse(uri).query)['code'][0]
141
142 def test_token_endpoint_successful_confidential_request(self):
143 ''' Successful request against token endpoint '''
144 code_redirect, client_id = self.test_4_authorize_confidential_client()
145
146 code = self.get_code_from_redirect_uri(code_redirect.location)
147
148 client = self.db.OAuthClient.query.filter(
149 self.db.OAuthClient.identifier == unicode(client_id)).first()
150
151 token_res = self.app.get('/oauth/access_token?client_id={0}&\
152code={1}&client_secret={2}'.format(client_id, code, client.secret))
153
154 assert token_res.status_int == 200
155
156 token_data = json.loads(token_res.body)
157
158 assert not 'error' in token_data
159 assert 'access_token' in token_data
160 assert 'token_type' in token_data
161 assert 'expires_in' in token_data
162 assert type(token_data['expires_in']) == int
163 assert token_data['expires_in'] > 0
164
165 def test_token_endpont_missing_id_confidential_request(self):
166 ''' Unsuccessful request against token endpoint, missing client_id '''
167 code_redirect, client_id = self.test_4_authorize_confidential_client()
168
169 code = self.get_code_from_redirect_uri(code_redirect.location)
170
171 client = self.db.OAuthClient.query.filter(
172 self.db.OAuthClient.identifier == unicode(client_id)).first()
173
174 token_res = self.app.get('/oauth/access_token?\
175code={0}&client_secret={1}'.format(code, client.secret))
176
177 assert token_res.status_int == 200
178
179 token_data = json.loads(token_res.body)
180
181 assert 'error' in token_data
182 assert not 'access_token' in token_data
183 assert token_data['error'] == 'invalid_request'
184 assert token_data['error_description'] == 'Missing client_id in request'