1 # GNU MediaGoblin -- federated, autonomous media hosting
2 # Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
20 from urlparse
import parse_qs
, urlparse
22 from mediagoblin
import mg_globals
23 from mediagoblin
.tools
import template
, pluginapi
24 from mediagoblin
.tests
.tools
import get_app
, fixture_add_user
# Module-level logger, named after this module through __name__.
_log = logging.getLogger(__name__)
30 class TestOAuth(object):
33 self
.db
= mg_globals
.database
35 self
.pman
= pluginapi
.PluginManager()
37 self
.user_password
= u
'4cc355_70k3N'
38 self
.user
= fixture_add_user(u
'joauth', self
.user_password
)
45 'username': self
.user
.username
,
46 'password': self
.user_password
})
48 def register_client(self
, name
, client_type
, description
=None,
51 '/oauth/client/register', {
53 'description': description
,
55 'redirect_uri': redirect_uri
})
def get_context(self, template_name):
    ''' Return the entry stored under ``template_name`` in
    ``template.TEMPLATE_TEST_CONTEXT``.
    '''
    recorded_contexts = template.TEMPLATE_TEST_CONTEXT
    return recorded_contexts[template_name]
def test_1_public_client_registration_without_redirect_uri(self):
    ''' Test 'public' OAuth client registration without any redirect uri

    Posts a registration for a 'public' client with no redirect URI and
    checks that the form is shown again (HTTP 200), that the
    ``redirect_uri`` field reports errors, and that no client with that
    name was stored.
    '''
    response = self.register_client(
        u'OMGOMGOMG', 'public', 'OMGOMG Apache License v2')

    ctx = self.get_context('oauth/client/register.html')

    client = self.db.OAuthClient.query.filter(
        self.db.OAuthClient.name == u'OMGOMGOMG').first()

    assert response.status_int == 200

    # Should display an error
    assert ctx['form'].redirect_uri.errors

    # Should not pass through: the query must not find a stored client
    assert not client
78 def test_2_successful_public_client_registration(self
):
79 ''' Successfully register a public client '''
81 self
.register_client(u
'OMGOMG', 'public', 'OMG!',
84 client
= self
.db
.OAuthClient
.query
.filter(
85 self
.db
.OAuthClient
.name
== u
'OMGOMG').first()
87 # Client should have been registered
def test_3_successful_confidential_client_reg(self):
    ''' Register a confidential OAuth client

    Posts a registration for a 'confidential' client, expects a redirect
    (HTTP 302) and checks that a client with that name can be queried.

    Returns the queried client, because
    ``test_4_authorize_confidential_client`` calls this method and reads
    ``identifier`` from its result.
    '''
    response = self.register_client(u'GMOGMO', 'confidential', 'NO GMO!')

    assert response.status_int == 302

    client = self.db.OAuthClient.query.filter(
        self.db.OAuthClient.name == u'GMOGMO').first()

    # Client should have been registered
    assert client

    return client
def test_4_authorize_confidential_client(self):
    ''' Authorize a confidential client as a logged in user

    Registers a confidential client, requests the authorization page for
    it, submits the authorization form and follows the resulting
    redirect.

    Returns a ``(authorization_response, client_identifier)`` tuple; the
    response's ``location`` starts with the redirect URI used here.
    '''
    client = self.test_3_successful_confidential_client_reg()

    client_identifier = client.identifier

    redirect_uri = 'https://foo.example'
    response = self.app.get('/oauth/authorize', {
        'client_id': client_identifier,
        'redirect_uri': redirect_uri})

    # User-agent should NOT be redirected
    assert response.status_int == 200

    ctx = self.get_context('oauth/authorize.html')

    # The rendered form supplies the values that are posted back below.
    # NOTE(review): assumes the form is stored under the 'form' key of the
    # template context, as it is for the register template -- confirm.
    form = ctx['form']

    # Short for client authorization post response
    capr = self.app.post(
        '/oauth/client/authorize', {
            'client_id': form.client_id.data,
            'next': form.next.data})

    assert capr.status_int == 302

    authorization_response = capr.follow()

    assert authorization_response.location.startswith(redirect_uri)

    return authorization_response, client_identifier
def get_code_from_redirect_uri(self, uri):
    ''' Return the first ``code`` query parameter found in ``uri``.

    Raises ``KeyError`` when the query string carries no ``code``.
    '''
    query_string = urlparse(uri).query
    query_params = parse_qs(query_string)
    code_values = query_params['code']
    return code_values[0]
def test_token_endpoint_successful_confidential_request(self):
    ''' Successful request against token endpoint

    Runs the authorization flow, extracts the code from the redirect
    location and exchanges it, together with the client's identifier and
    secret, at ``/oauth/access_token``. The JSON answer must carry an
    access token, a token type and a positive integer ``expires_in``,
    and no ``error``.
    '''
    code_redirect, client_id = self.test_4_authorize_confidential_client()

    code = self.get_code_from_redirect_uri(code_redirect.location)

    client = self.db.OAuthClient.query.filter(
        self.db.OAuthClient.identifier == unicode(client_id)).first()

    # Pass the query parameters as a dict so the test client encodes them,
    # instead of interpolating raw values into a URL literal that is
    # continued over two source lines.
    token_res = self.app.get('/oauth/access_token', {
        'client_id': client_id,
        'code': code,
        'client_secret': client.secret})

    assert token_res.status_int == 200

    token_data = json.loads(token_res.body)

    assert 'error' not in token_data
    assert 'access_token' in token_data
    assert 'token_type' in token_data
    assert 'expires_in' in token_data

    expires_in = token_data['expires_in']
    # bool is a subclass of int, so a JSON ``true`` is ruled out explicitly
    assert isinstance(expires_in, int) and not isinstance(expires_in, bool)
    assert expires_in > 0
def test_token_endpont_missing_id_confidential_request(self):
    ''' Unsuccessful request against token endpoint, missing client_id

    Runs the authorization flow, then asks ``/oauth/access_token`` for a
    token with the code and the client secret but without ``client_id``.
    The endpoint is expected to answer HTTP 200 with a JSON error body of
    type ``invalid_request`` and no access token.
    '''
    code_redirect, client_id = self.test_4_authorize_confidential_client()

    code = self.get_code_from_redirect_uri(code_redirect.location)

    client = self.db.OAuthClient.query.filter(
        self.db.OAuthClient.identifier == unicode(client_id)).first()

    # client_id is left out on purpose. The remaining parameters go in as
    # a dict so the test client encodes them, instead of interpolating raw
    # values into a URL literal that is continued over two source lines.
    token_res = self.app.get('/oauth/access_token', {
        'code': code,
        'client_secret': client.secret})

    assert token_res.status_int == 200

    token_data = json.loads(token_res.body)

    assert 'error' in token_data
    assert 'access_token' not in token_data
    assert token_data['error'] == 'invalid_request'
    assert token_data['error_description'] == 'Missing client_id in request'