Commit | Line | Data |
---|---|---|
86ba4168 | 1 | # GNU MediaGoblin -- federated, autonomous media hosting |
2 | # Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. | |
3 | # | |
4 | # This program is free software: you can redistribute it and/or modify | |
5 | # it under the terms of the GNU Affero General Public License as published by | |
6 | # the Free Software Foundation, either version 3 of the License, or | |
7 | # (at your option) any later version. | |
8 | # | |
9 | # This program is distributed in the hope that it will be useful, | |
10 | # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
11 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
12 | # GNU Affero General Public License for more details. | |
13 | # | |
14 | # You should have received a copy of the GNU Affero General Public License | |
15 | # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
16 | ||
86ba4168 | 17 | import pytest |
9459fa3c BP |
18 | |
19 | from six.moves.urllib.parse import parse_qs, urlparse | |
86ba4168 | 20 | |
89d5b44e | 21 | from oauthlib.oauth1 import Client |
22 | ||
86ba4168 | 23 | from mediagoblin import mg_globals |
24 | from mediagoblin.tools import template, pluginapi | |
25 | from mediagoblin.tests.tools import fixture_add_user | |
26 | ||
27 | ||
class TestOAuth(object):
    """ Functional tests for client registration and OAuth request tokens.

    Every test runs against the ``test_app`` fixture while logged in as a
    freshly created user (see ``setup``).
    """

    MIME_FORM = "application/x-www-form-urlencoded"
    MIME_JSON = "application/json"

    @pytest.fixture(autouse=True)
    def setup(self, test_app):
        """ Stores the test app, creates a user and logs in before each test """
        self.test_app = test_app

        self.db = mg_globals.database

        self.pman = pluginapi.PluginManager()

        self.user_password = "AUserPassword123"
        self.user = fixture_add_user("OAuthy", self.user_password)

        self.login()

    def login(self):
        """ Logs the test user in through the regular login form """
        credentials = {
            "username": self.user.username,
            "password": self.user_password,
        }
        self.test_app.post("/auth/login/", credentials)

    def register_client(self, **kwargs):
        """ Registers (or updates) a client with the API

        Posts ``kwargs`` to ``/api/client/register`` and returns the
        response. ``type`` defaults to ``"client_associate"`` and
        ``application_type`` to ``"native"``; values supplied by the caller
        are kept, so ``type="client_update"`` is actually sent as such.
        """
        # setdefault, not assignment: an unconditional assignment silently
        # turned every "client_update" request into a new registration.
        kwargs.setdefault("type", "client_associate")
        kwargs.setdefault("application_type", "native")
        return self.test_app.post("/api/client/register", kwargs)

    def test_client_client_register_limited_info(self):
        """ Tests that a client can be registered with limited information """
        response = self.register_client()

        # Check the status before touching the payload so a failed request
        # is reported as such rather than as a KeyError on "client_id".
        assert response.status_int == 200

        client_info = response.json
        client = self.db.Client.query.filter_by(
            id=client_info["client_id"]
        ).first()

        assert client is not None

    def test_client_register_full_info(self):
        """ Provides every piece of information possible to register client """
        query = {
            "application_name": "Testificate MD",
            "application_type": "web",
            "contacts": "someone@someplace.com tuteo@tsengeo.lu",
            "logo_uri": "http://ayrel.com/utral.png",
            "redirect_uris": "http://navi-kosman.lu http://gmg-yawne-oeru.lu",
        }

        response = self.register_client(**query)
        client_info = response.json

        client = self.db.Client.query.filter_by(
            id=client_info["client_id"]
        ).first()

        assert client is not None
        assert client.secret == client_info["client_secret"]
        assert client.application_type == query["application_type"]
        assert client.redirect_uri == query["redirect_uris"].split()
        assert client.logo_url == query["logo_uri"]
        assert client.contacts == query["contacts"].split()

    def test_client_update(self):
        """ Tests that you can update a client """
        # first we need to register a client
        registered = self.register_client().json

        # Now update
        update_query = {
            "type": "client_update",
            # Identify the client being updated.
            # NOTE(review): assumes the endpoint requires both credentials
            # for a client_update -- confirm against the register view.
            "client_id": registered["client_id"],
            "client_secret": registered["client_secret"],
            "application_name": "neytiri",
            "contacts": "someone@someplace.com abc@cba.com",
            "logo_uri": "http://place.com/picture.png",
            "application_type": "web",
            "redirect_uris": "http://blah.gmg/whatever https://inboxen.org/",
        }

        update_response = self.register_client(**update_query)

        assert update_response.status_int == 200
        client_info = update_response.json

        # An update must modify the existing client, not create a new one.
        assert client_info["client_id"] == registered["client_id"]

        client = self.db.Client.query.filter_by(
            id=client_info["client_id"]
        ).first()

        assert client is not None
        assert client.secret == client_info["client_secret"]
        assert client.application_type == update_query["application_type"]
        assert client.application_name == update_query["application_name"]
        assert client.contacts == update_query["contacts"].split()
        assert client.logo_url == update_query["logo_uri"]
        assert client.redirect_uri == update_query["redirect_uris"].split()

    def to_authorize_headers(self, data):
        """ Builds an OAuth ``Authorization`` header from a dict

        Each item of ``data`` is rendered as ``key="value"`` and the pairs
        are joined with commas (no escaping or percent-encoding is applied).
        Returns a dict holding the single ``Authorization`` header.
        """
        pairs = ",".join(
            '{0}="{1}"'.format(key, value) for key, value in data.items()
        )
        return {"Authorization": "OAuth " + pairs}

    def test_request_token(self):
        """ Test a request for a request token """
        client_id = self.register_client().json["client_id"]

        endpoint = "/oauth/request_token"
        request_query = {
            "oauth_consumer_key": client_id,
            "oauth_nonce": "abcdefghij",
            "oauth_timestamp": 123456789.0,
            "oauth_callback": "https://some.url/callback",
        }

        headers = self.to_authorize_headers(request_query)
        headers["Content-Type"] = self.MIME_FORM

        response = self.test_app.post(endpoint, headers=headers)

        # parse_qs maps each key to a list of values; keep only the first
        token_data = dict(
            (key, values[0])
            for key, values in parse_qs(response.body.decode()).items()
        )

        request_token = self.db.RequestToken.query.filter_by(
            token=token_data["oauth_token"]
        ).first()

        client = self.db.Client.query.filter_by(id=client_id).first()

        assert request_token is not None
        assert request_token.secret == token_data["oauth_token_secret"]
        assert request_token.client == client.id
        assert request_token.used == False
        assert request_token.callback == request_query["oauth_callback"]