1 # GNU MediaGoblin -- federated, autonomous media hosting
2 # Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
import json
import logging
from urlparse import parse_qs, urlparse

import pytest

from mediagoblin import mg_globals
from mediagoblin.tools import template, pluginapi
from mediagoblin.tests.tools import fixture_add_user
# Module-level logger, named after this module.
_log = logging.getLogger(__name__)
class TestOAuth(object):
    '''
    Tests for the OAuth endpoints: client registration, client
    authorization by a user, and the access token endpoint.

    Several tests call an earlier test to build their starting state and
    rely on that test's return value, so the return statements of the
    test methods are part of their contract.
    '''

    @pytest.fixture(autouse=True)
    def setup(self, test_app):
        ''' Keep the test app around, create a user and log that user in '''
        self.test_app = test_app
        self.db = mg_globals.database
        self.pman = pluginapi.PluginManager()

        self.user_password = u'4cc355_70k3N'
        self.user = fixture_add_user(u'joauth', self.user_password)

        self.login()

    def login(self):
        ''' Post the fixture user's credentials to the login view '''
        # NOTE(review): the login URL was not present in the listing this
        # method was restored from; '/auth/login/' is assumed -- confirm.
        self.test_app.post(
            '/auth/login/', {
                'username': self.user.username,
                'password': self.user_password})

    def register_client(self, name, client_type, description=None,
                        redirect_uri=''):
        '''
        Post the client registration form and return the response.

        name and client_type are required; description and redirect_uri
        are optional.
        '''
        # NOTE(review): the 'type' key name and the '' default for
        # redirect_uri were missing from the listing and are assumed --
        # confirm against the registration form.
        return self.test_app.post(
            '/oauth/client/register', {
                'name': name,
                'description': description,
                'type': client_type,
                'redirect_uri': redirect_uri})

    def get_context(self, template_name):
        ''' Return the context captured for the given template '''
        return template.TEMPLATE_TEST_CONTEXT[template_name]

    def _client_by_name(self, name):
        ''' Query OAuthClient by name and return the first match '''
        return self.db.OAuthClient.query.filter(
            self.db.OAuthClient.name == name).first()

    def _client_by_identifier(self, identifier):
        ''' Query OAuthClient by identifier and return the first match '''
        return self.db.OAuthClient.query.filter(
            self.db.OAuthClient.identifier == identifier).first()

    def _assert_valid_token_data(self, token_data):
        ''' Assert that a decoded token response describes a usable token '''
        assert 'error' not in token_data
        assert 'access_token' in token_data
        assert 'token_type' in token_data
        assert 'expires_in' in token_data
        assert isinstance(token_data['expires_in'], int)
        assert token_data['expires_in'] > 0

    def test_1_public_client_registration_without_redirect_uri(self):
        ''' Test 'public' OAuth client registration without any redirect uri '''
        response = self.register_client(
            u'OMGOMGOMG', 'public', 'OMGOMG Apache License v2')

        ctx = self.get_context('oauth/client/register.html')

        client = self._client_by_name(u'OMGOMGOMG')

        assert response.status_int == 200

        # Should display an error
        assert ctx['form'].redirect_uri.errors

        # Should not pass through
        assert not client

    def test_2_successful_public_client_registration(self):
        ''' Successfully register a public client '''
        uri = 'http://foo.example'
        self.register_client(u'OMGOMG', 'public', 'OMG!', uri)

        client = self._client_by_name(u'OMGOMG')

        # Client should have been registered; checked first so that a
        # missing client fails here rather than on the attribute access.
        assert client

        # redirect_uri should be set
        assert client.redirect_uri == uri

    def test_3_successful_confidential_client_reg(self):
        '''
        Register a confidential OAuth client

        Returns the registered client so that later tests can build on it.
        '''
        response = self.register_client(
            u'GMOGMO', 'confidential', 'NO GMO!')

        assert response.status_int == 302

        client = self._client_by_name(u'GMOGMO')

        # Client should have been registered
        assert client

        return client

    def test_4_authorize_confidential_client(self):
        '''
        Authorize a confidential client as a logged in user

        Returns a tuple of (authorization response, client identifier).
        '''
        client = self.test_3_successful_confidential_client_reg()

        client_identifier = client.identifier

        redirect_uri = 'https://foo.example'
        # NOTE(review): the listing skipped one entry of this dict;
        # 'scope' is assumed to be the missing one -- confirm.
        response = self.test_app.get('/oauth/authorize', {
            'client_id': client.identifier,
            'scope': 'all',
            'redirect_uri': redirect_uri})

        # User-agent should NOT be redirected
        assert response.status_int == 200

        ctx = self.get_context('oauth/authorize.html')

        form = ctx['form']

        # Short for client authorization post response
        # NOTE(review): the listing skipped one entry of this dict;
        # 'allow' is assumed to be the missing one -- confirm.
        capr = self.test_app.post(
            '/oauth/client/authorize', {
                'client_id': form.client_id.data,
                'allow': 'Allow',
                'next': form.next.data})

        assert capr.status_int == 302

        authorization_response = capr.follow()

        assert authorization_response.location.startswith(redirect_uri)

        return authorization_response, client_identifier

    def get_code_from_redirect_uri(self, uri):
        ''' Get the value of ?code= from an URI '''
        query = urlparse(uri).query
        return parse_qs(query)['code'][0]

    def test_token_endpoint_successful_confidential_request(self):
        '''
        Successful request against token endpoint

        Returns a tuple of (client identifier, decoded token data).
        '''
        code_redirect, client_id = self.test_4_authorize_confidential_client()

        code = self.get_code_from_redirect_uri(code_redirect.location)

        client = self._client_by_identifier(unicode(client_id))

        # Parameters are passed as a dict, as in test_refresh_token, so
        # the test app builds and encodes the query string itself.
        token_res = self.test_app.get('/oauth/access_token', {
            'client_id': client_id,
            'code': code,
            'client_secret': client.secret})

        assert token_res.status_int == 200

        token_data = json.loads(token_res.body)

        self._assert_valid_token_data(token_data)

        # There should be a refresh token provided in the token data
        assert token_data['refresh_token']

        return client_id, token_data

    def test_token_endpont_missing_id_confidential_request(self):
        ''' Unsuccessful request against token endpoint, missing client_id '''
        code_redirect, client_id = self.test_4_authorize_confidential_client()

        code = self.get_code_from_redirect_uri(code_redirect.location)

        client = self._client_by_identifier(unicode(client_id))

        # client_id is deliberately left out of the request
        token_res = self.test_app.get('/oauth/access_token', {
            'code': code,
            'client_secret': client.secret})

        assert token_res.status_int == 200

        token_data = json.loads(token_res.body)

        assert 'error' in token_data
        assert 'access_token' not in token_data
        assert token_data['error'] == 'invalid_request'
        assert token_data['error_description']

    def test_refresh_token(self):
        ''' Try to get a new access token using the refresh token '''
        # Get an access token and a refresh token
        client_id, token_data = \
            self.test_token_endpoint_successful_confidential_request()

        client = self._client_by_identifier(client_id)

        token_res = self.test_app.get('/oauth/access_token', {
            'refresh_token': token_data['refresh_token'],
            'client_id': client_id,
            'client_secret': client.secret})

        assert token_res.status_int == 200

        new_token_data = json.loads(token_res.body)

        self._assert_valid_token_data(new_token_data)