86ba4168 |
1 | # GNU MediaGoblin -- federated, autonomous media hosting |
2 | # Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. |
3 | # |
4 | # This program is free software: you can redistribute it and/or modify |
5 | # it under the terms of the GNU Affero General Public License as published by |
6 | # the Free Software Foundation, either version 3 of the License, or |
7 | # (at your option) any later version. |
8 | # |
9 | # This program is distributed in the hope that it will be useful, |
10 | # but WITHOUT ANY WARRANTY; without even the implied warranty of |
11 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
12 | # GNU Affero General Public License for more details. |
13 | # |
14 | # You should have received a copy of the GNU Affero General Public License |
15 | # along with this program. If not, see <http://www.gnu.org/licenses/>. |
16 | |
89d5b44e |
17 | import cgi |
86ba4168 |
18 | |
19 | import pytest |
20 | from urlparse import parse_qs, urlparse |
21 | |
89d5b44e |
22 | from oauthlib.oauth1 import Client |
23 | |
86ba4168 |
24 | from mediagoblin import mg_globals |
25 | from mediagoblin.tools import template, pluginapi |
26 | from mediagoblin.tests.tools import fixture_add_user |
27 | |
28 | |
29 | class TestOAuth(object): |
89d5b44e |
30 | |
31 | MIME_FORM = "application/x-www-form-urlencoded" |
32 | MIME_JSON = "application/json" |
33 | |
86ba4168 |
34 | @pytest.fixture(autouse=True) |
35 | def setup(self, test_app): |
36 | self.test_app = test_app |
37 | |
38 | self.db = mg_globals.database |
39 | |
40 | self.pman = pluginapi.PluginManager() |
41 | |
42 | self.user_password = "AUserPassword123" |
43 | self.user = fixture_add_user("OAuthy", self.user_password) |
44 | |
45 | self.login() |
46 | |
47 | def login(self): |
48 | self.test_app.post( |
49 | "/auth/login/", { |
50 | "username": self.user.username, |
51 | "password": self.user_password}) |
52 | |
53 | def register_client(self, **kwargs): |
54 | """ Regiters a client with the API """ |
55 | |
56 | kwargs["type"] = "client_associate" |
57 | kwargs["application_type"] = kwargs.get("application_type", "native") |
58 | return self.test_app.post("/api/client/register", kwargs) |
59 | |
60 | def test_client_client_register_limited_info(self): |
61 | """ Tests that a client can be registered with limited information """ |
62 | response = self.register_client() |
89d5b44e |
63 | client_info = response.json |
86ba4168 |
64 | |
65 | client = self.db.Client.query.filter_by(id=client_info["client_id"]).first() |
66 | |
67 | assert response.status_int == 200 |
68 | assert client is not None |
69 | |
70 | def test_client_register_full_info(self): |
71 | """ Provides every piece of information possible to register client """ |
72 | query = { |
73 | "application_name": "Testificate MD", |
74 | "application_type": "web", |
75 | "contacts": "someone@someplace.com tuteo@tsengeo.lu", |
76 | "logo_url": "http://ayrel.com/utral.png", |
77 | "redirect_uris": "http://navi-kosman.lu http://gmg-yawne-oeru.lu", |
78 | } |
79 | |
80 | response = self.register_client(**query) |
89d5b44e |
81 | client_info = response.json |
86ba4168 |
82 | |
83 | client = self.db.Client.query.filter_by(id=client_info["client_id"]).first() |
84 | |
85 | assert client is not None |
86 | assert client.secret == client_info["client_secret"] |
87 | assert client.application_type == query["application_type"] |
88 | assert client.redirect_uri == query["redirect_uris"].split() |
89 | assert client.logo_url == query["logo_url"] |
90 | assert client.contacts == query["contacts"].split() |
91 | |
92 | |
93 | def test_client_update(self): |
94 | """ Tests that you can update a client """ |
95 | # first we need to register a client |
96 | response = self.register_client() |
97 | |
89d5b44e |
98 | client_info = response.json |
86ba4168 |
99 | client = self.db.Client.query.filter_by(id=client_info["client_id"]).first() |
100 | |
101 | # Now update |
102 | update_query = { |
103 | "type": "client_update", |
104 | "application_name": "neytiri", |
105 | "contacts": "someone@someplace.com abc@cba.com", |
106 | "logo_url": "http://place.com/picture.png", |
107 | "application_type": "web", |
108 | "redirect_uris": "http://blah.gmg/whatever https://inboxen.org/", |
109 | } |
110 | |
111 | update_response = self.register_client(**update_query) |
112 | |
113 | assert update_response.status_int == 200 |
89d5b44e |
114 | client_info = update_response.json |
115 | client = self.db.Client.query.filter_by(id=client_info["client_id"]).first() |
86ba4168 |
116 | |
117 | assert client.secret == client_info["client_secret"] |
118 | assert client.application_type == update_query["application_type"] |
119 | assert client.application_name == update_query["application_name"] |
120 | assert client.contacts == update_query["contacts"].split() |
121 | assert client.logo_url == update_query["logo_url"] |
122 | assert client.redirect_uri == update_query["redirect_uris"].split() |
123 | |
89d5b44e |
124 | def to_authorize_headers(self, data): |
125 | headers = "" |
126 | for key, value in data.items(): |
127 | headers += '{0}="{1}",'.format(key, value) |
128 | return {"Authorization": "OAuth " + headers[:-1]} |
129 | |
130 | def test_request_token(self): |
86ba4168 |
131 | """ Test a request for a request token """ |
132 | response = self.register_client() |
133 | |
89d5b44e |
134 | client_id = response.json["client_id"] |
135 | |
136 | endpoint = "/oauth/request_token" |
137 | request_query = { |
138 | "oauth_consumer_key": client_id, |
139 | "oauth_nonce": "abcdefghij", |
140 | "oauth_timestamp": 123456789.0, |
141 | "oauth_callback": "https://some.url/callback", |
142 | } |
143 | |
144 | headers = self.to_authorize_headers(request_query) |
145 | |
146 | headers["Content-Type"] = self.MIME_FORM |
147 | |
148 | response = self.test_app.post(endpoint, headers=headers) |
149 | response = cgi.parse_qs(response.body) |
150 | |
151 | # each element is a list, reduce it to a string |
152 | for key, value in response.items(): |
153 | response[key] = value[0] |
154 | |
155 | request_token = self.db.RequestToken.query.filter_by( |
156 | token=response["oauth_token"] |
157 | ).first() |
158 | |
159 | client = self.db.Client.query.filter_by(id=client_id).first() |
160 | |
161 | assert request_token is not None |
162 | assert request_token.secret == response["oauth_token_secret"] |
163 | assert request_token.client == client.id |
164 | assert request_token.used == False |
165 | assert request_token.callback == request_query["oauth_callback"] |
86ba4168 |
166 | |