A more reliable test, check against expected keys, rather than received
[mediagoblin.git] / mediagoblin / tests / test_oauth1.py
1 # GNU MediaGoblin -- federated, autonomous media hosting
2 # Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16
17 import pytest
18
19 from six.moves.urllib.parse import parse_qs, urlparse
20
21 from oauthlib.oauth1 import Client
22
23 from mediagoblin import mg_globals
24 from mediagoblin.tools import template, pluginapi
25 from mediagoblin.tests.tools import fixture_add_user
26
27
28 class TestOAuth(object):
29
30 MIME_FORM = "application/x-www-form-urlencoded"
31 MIME_JSON = "application/json"
32
33 @pytest.fixture(autouse=True)
34 def setup(self, test_app):
35 self.test_app = test_app
36
37 self.db = mg_globals.database
38
39 self.pman = pluginapi.PluginManager()
40
41 self.user_password = "AUserPassword123"
42 self.user = fixture_add_user("OAuthy", self.user_password)
43
44 self.login()
45
46 def login(self):
47 self.test_app.post(
48 "/auth/login/", {
49 "username": self.user.username,
50 "password": self.user_password})
51
52 def register_client(self, **kwargs):
53 """ Regiters a client with the API """
54
55 kwargs["type"] = "client_associate"
56 kwargs["application_type"] = kwargs.get("application_type", "native")
57 return self.test_app.post("/api/client/register", kwargs)
58
59 def test_client_client_register_limited_info(self):
60 """ Tests that a client can be registered with limited information """
61 response = self.register_client()
62 client_info = response.json
63
64 client = self.db.Client.query.filter_by(id=client_info["client_id"]).first()
65
66 assert response.status_int == 200
67 assert client is not None
68
69 def test_client_register_full_info(self):
70 """ Provides every piece of information possible to register client """
71 query = {
72 "application_name": "Testificate MD",
73 "application_type": "web",
74 "contacts": "someone@someplace.com tuteo@tsengeo.lu",
75 "logo_uri": "http://ayrel.com/utral.png",
76 "redirect_uris": "http://navi-kosman.lu http://gmg-yawne-oeru.lu",
77 }
78
79 response = self.register_client(**query)
80 client_info = response.json
81
82 client = self.db.Client.query.filter_by(id=client_info["client_id"]).first()
83
84 assert client is not None
85 assert client.secret == client_info["client_secret"]
86 assert client.application_type == query["application_type"]
87 assert client.redirect_uri == query["redirect_uris"].split()
88 assert client.logo_url == query["logo_uri"]
89 assert client.contacts == query["contacts"].split()
90
91
92 def test_client_update(self):
93 """ Tests that you can update a client """
94 # first we need to register a client
95 response = self.register_client()
96
97 client_info = response.json
98 client = self.db.Client.query.filter_by(id=client_info["client_id"]).first()
99
100 # Now update
101 update_query = {
102 "type": "client_update",
103 "application_name": "neytiri",
104 "contacts": "someone@someplace.com abc@cba.com",
105 "logo_uri": "http://place.com/picture.png",
106 "application_type": "web",
107 "redirect_uris": "http://blah.gmg/whatever https://inboxen.org/",
108 }
109
110 update_response = self.register_client(**update_query)
111
112 assert update_response.status_int == 200
113 client_info = update_response.json
114 client = self.db.Client.query.filter_by(id=client_info["client_id"]).first()
115
116 assert client.secret == client_info["client_secret"]
117 assert client.application_type == update_query["application_type"]
118 assert client.application_name == update_query["application_name"]
119 assert client.contacts == update_query["contacts"].split()
120 assert client.logo_url == update_query["logo_uri"]
121 assert client.redirect_uri == update_query["redirect_uris"].split()
122
123 def to_authorize_headers(self, data):
124 headers = ""
125 for key, value in data.items():
126 headers += '{0}="{1}",'.format(key, value)
127 return {"Authorization": "OAuth " + headers[:-1]}
128
129 def test_request_token(self):
130 """ Test a request for a request token """
131 response = self.register_client()
132
133 client_id = response.json["client_id"]
134
135 endpoint = "/oauth/request_token"
136 request_query = {
137 "oauth_consumer_key": client_id,
138 "oauth_nonce": "abcdefghij",
139 "oauth_timestamp": 123456789.0,
140 "oauth_callback": "https://some.url/callback",
141 }
142
143 headers = self.to_authorize_headers(request_query)
144
145 headers["Content-Type"] = self.MIME_FORM
146
147 response = self.test_app.post(endpoint, headers=headers)
148 response = parse_qs(response.body.decode())
149
150 # each element is a list, reduce it to a string
151 for key, value in response.items():
152 response[key] = value[0]
153
154 request_token = self.db.RequestToken.query.filter_by(
155 token=response["oauth_token"]
156 ).first()
157
158 client = self.db.Client.query.filter_by(id=client_id).first()
159
160 assert request_token is not None
161 assert request_token.secret == response["oauth_token_secret"]
162 assert request_token.client == client.id
163 assert request_token.used == False
164 assert request_token.callback == request_query["oauth_callback"]