1 # GNU MediaGoblin -- federated, autonomous media hosting
2 # Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
20 from urlparse
import parse_qs
, urlparse
22 from oauthlib
.oauth1
import Client
24 from mediagoblin
import mg_globals
25 from mediagoblin
.tools
import template
, pluginapi
26 from mediagoblin
.tests
.tools
import fixture_add_user
29 class TestOAuth(object):
31 MIME_FORM
= "application/x-www-form-urlencoded"
32 MIME_JSON
= "application/json"
34 @pytest.fixture(autouse
=True)
35 def setup(self
, test_app
):
36 self
.test_app
= test_app
38 self
.db
= mg_globals
.database
40 self
.pman
= pluginapi
.PluginManager()
42 self
.user_password
= "AUserPassword123"
43 self
.user
= fixture_add_user("OAuthy", self
.user_password
)
50 "username": self
.user
.username
,
51 "password": self
.user_password
})
53 def register_client(self
, **kwargs
):
54 """ Regiters a client with the API """
56 kwargs
["type"] = "client_associate"
57 kwargs
["application_type"] = kwargs
.get("application_type", "native")
58 return self
.test_app
.post("/api/client/register", kwargs
)
60 def test_client_client_register_limited_info(self
):
61 """ Tests that a client can be registered with limited information """
62 response
= self
.register_client()
63 client_info
= response
.json
65 client
= self
.db
.Client
.query
.filter_by(id=client_info
["client_id"]).first()
67 assert response
.status_int
== 200
68 assert client
is not None
70 def test_client_register_full_info(self
):
71 """ Provides every piece of information possible to register client """
73 "application_name": "Testificate MD",
74 "application_type": "web",
75 "contacts": "someone@someplace.com tuteo@tsengeo.lu",
76 "logo_url": "http://ayrel.com/utral.png",
77 "redirect_uris": "http://navi-kosman.lu http://gmg-yawne-oeru.lu",
80 response
= self
.register_client(**query
)
81 client_info
= response
.json
83 client
= self
.db
.Client
.query
.filter_by(id=client_info
["client_id"]).first()
85 assert client
is not None
86 assert client
.secret
== client_info
["client_secret"]
87 assert client
.application_type
== query
["application_type"]
88 assert client
.redirect_uri
== query
["redirect_uris"].split()
89 assert client
.logo_url
== query
["logo_url"]
90 assert client
.contacts
== query
["contacts"].split()
93 def test_client_update(self
):
94 """ Tests that you can update a client """
95 # first we need to register a client
96 response
= self
.register_client()
98 client_info
= response
.json
99 client
= self
.db
.Client
.query
.filter_by(id=client_info
["client_id"]).first()
103 "type": "client_update",
104 "application_name": "neytiri",
105 "contacts": "someone@someplace.com abc@cba.com",
106 "logo_url": "http://place.com/picture.png",
107 "application_type": "web",
108 "redirect_uris": "http://blah.gmg/whatever https://inboxen.org/",
111 update_response
= self
.register_client(**update_query
)
113 assert update_response
.status_int
== 200
114 client_info
= update_response
.json
115 client
= self
.db
.Client
.query
.filter_by(id=client_info
["client_id"]).first()
117 assert client
.secret
== client_info
["client_secret"]
118 assert client
.application_type
== update_query
["application_type"]
119 assert client
.application_name
== update_query
["application_name"]
120 assert client
.contacts
== update_query
["contacts"].split()
121 assert client
.logo_url
== update_query
["logo_url"]
122 assert client
.redirect_uri
== update_query
["redirect_uris"].split()
124 def to_authorize_headers(self
, data
):
126 for key
, value
in data
.items():
127 headers
+= '{0}="{1}",'.format(key
, value
)
128 return {"Authorization": "OAuth " + headers
[:-1]}
130 def test_request_token(self
):
131 """ Test a request for a request token """
132 response
= self
.register_client()
134 client_id
= response
.json
["client_id"]
136 endpoint
= "/oauth/request_token"
138 "oauth_consumer_key": client_id
,
139 "oauth_nonce": "abcdefghij",
140 "oauth_timestamp": 123456789.0,
141 "oauth_callback": "https://some.url/callback",
144 headers
= self
.to_authorize_headers(request_query
)
146 headers
["Content-Type"] = self
.MIME_FORM
148 response
= self
.test_app
.post(endpoint
, headers
=headers
)
149 response
= cgi
.parse_qs(response
.body
)
151 # each element is a list, reduce it to a string
152 for key
, value
in response
.items():
153 response
[key
] = value
[0]
155 request_token
= self
.db
.RequestToken
.query
.filter_by(
156 token
=response
["oauth_token"]
159 client
= self
.db
.Client
.query
.filter_by(id=client_id
).first()
161 assert request_token
is not None
162 assert request_token
.secret
== response
["oauth_token_secret"]
163 assert request_token
.client
== client
.id
164 assert request_token
.used
== False
165 assert request_token
.callback
== request_query
["oauth_callback"]