From: Jessica Tallon Date: Thu, 10 Jul 2014 17:17:47 +0000 (+0100) Subject: Require uploader privileges to upload media to API X-Git-Url: https://vcs.fsf.org/?a=commitdiff_plain;h=967df5eff0c00fe7cd860ebfb297ee1f2e0bcdaf;p=mediagoblin.git Require uploader privileges to upload media to API --- diff --git a/mediagoblin/federation/views.py b/mediagoblin/federation/views.py index 8af5565b..6e4d81d4 100644 --- a/mediagoblin/federation/views.py +++ b/mediagoblin/federation/views.py @@ -1,4 +1,3 @@ - import json import io import mimetypes @@ -7,6 +6,7 @@ from werkzeug.datastructures import FileStorage from mediagoblin.media_types import sniff_media from mediagoblin.decorators import oauth_required +from mediagoblin.federation.decorators import user_has_privilege from mediagoblin.db.models import User, MediaEntry, MediaComment from mediagoblin.tools.response import redirect, json_response from mediagoblin.meddleware.csrf import csrf_exempt @@ -46,6 +46,7 @@ def user(request): @oauth_required @csrf_exempt +@user_has_privilege(u'uploader') def uploads(request): """ Endpoint for file uploads """ user = request.matchdict["username"] diff --git a/mediagoblin/tests/test_api.py b/mediagoblin/tests/test_api.py index e1ca688b..21222304 100644 --- a/mediagoblin/tests/test_api.py +++ b/mediagoblin/tests/test_api.py @@ -19,21 +19,16 @@ import json import pytest import mock +from webtest import AppError + from mediagoblin import mg_globals from .resources import GOOD_JPG +from mediagoblin.db.models import User from mediagoblin.tests.tools import fixture_add_user from mediagoblin.moderation.tools import take_away_privileges from .resources import GOOD_JPG, GOOD_PNG, EVIL_FILE, EVIL_JPG, EVIL_PNG, \ BIG_BLUE -def mocked_oauth_required(*args, **kwargs): - """ Mocks mediagoblin.decorator.oauth_required to always validate """ - - def oauth_required(controller): - return controller - - return oauth_required - class TestAPI(object): @pytest.fixture(autouse=True) @@ -42,6 +37,18 @@ class TestAPI(object): self.db = mg_globals.database self.user = fixture_add_user(privileges=[u'active', u'uploader']) + def mocked_oauth_required(self, *args, **kwargs): + """ Mocks mediagoblin.decorator.oauth_required to always validate """ + + def fake_controller(controller, request, *args, **kwargs): + request.user = User.query.filter_by(id=self.user.id).first() + return controller(request, *args, **kwargs) + + def oauth_required(c): + return lambda *args, **kwargs: fake_controller(c, *args, **kwargs) + + return oauth_required + def test_can_post_image(self, test_app): """ Tests that an image can be posted to the API """ # First request we need to do is to upload the image @@ -52,7 +59,7 @@ class TestAPI(object): } - with mock.patch("mediagoblin.decorators.oauth_required", new_callable=mocked_oauth_required): + with mock.patch("mediagoblin.decorators.oauth_required", new_callable=self.mocked_oauth_required): response = test_app.post( "/api/user/{0}/uploads".format(self.user.username), data, @@ -98,15 +105,13 @@ class TestAPI(object): "Content-Length": str(len(data)), } - with mock.patch("mediagoblin.decorators.oauth_required", new_callable=mocked_oauth_required): - response = test_app.post( - "/api/user/{0}/uploads".format(self.user.username), - data, - headers=headers - ) - - error = json.loads(response.body) + with mock.patch("mediagoblin.decorators.oauth_required", new_callable=self.mocked_oauth_required): + with pytest.raises(AppError) as excinfo: + response = test_app.post( + "/api/user/{0}/uploads".format(self.user.username), + data, + headers=headers + ) # Assert that we've got a 403 - assert response.status_code == 403 - assert "error" in error + assert "403 FORBIDDEN" in excinfo.value.message diff --git a/mediagoblin/tests/test_oauth1.py b/mediagoblin/tests/test_oauth1.py index 073c2884..568036e5 100644 --- a/mediagoblin/tests/test_oauth1.py +++ b/mediagoblin/tests/test_oauth1.py @@ -52,8 +52,8 @@ class TestOAuth(object): def register_client(self, **kwargs): """ Regiters a client with the API """ - - kwargs["type"] = "client_associate" + + kwargs["type"] = "client_associate" kwargs["application_type"] = kwargs.get("application_type", "native") return self.test_app.post("/api/client/register", kwargs) @@ -63,7 +63,7 @@ class TestOAuth(object): client_info = response.json client = self.db.Client.query.filter_by(id=client_info["client_id"]).first() - + assert response.status_int == 200 assert client is not None @@ -81,7 +81,7 @@ class TestOAuth(object): client_info = response.json client = self.db.Client.query.filter_by(id=client_info["client_id"]).first() - + assert client is not None assert client.secret == client_info["client_secret"] assert client.application_type == query["application_type"] @@ -163,4 +163,3 @@ class TestOAuth(object): assert request_token.client == client.id assert request_token.used == False assert request_token.callback == request_query["oauth_callback"] - diff --git a/mediagoblin/tools/request.py b/mediagoblin/tools/request.py index 2de0b32f..d2cb0f6a 100644 --- a/mediagoblin/tools/request.py +++ b/mediagoblin/tools/request.py @@ -16,7 +16,9 @@ import json import logging -from mediagoblin.db.models import User + +from mediagoblin.db.models import User, AccessToken +from mediagoblin.oauth.tools.request import decode_authorization_header _log = logging.getLogger(__name__) @@ -31,6 +33,18 @@ def setup_user_in_request(request): Examine a request and tack on a request.user parameter if that's appropriate. """ + # If API request the user will be associated with the access token + authorization = decode_authorization_header(request.headers) + + if authorization.get(u"access_token"): + # Check authorization header. + token = authorization[u"oauth_token"] + token = AccessToken.query.filter_by(token=token).first() + if token is not None: + request.user = token.user + return + + if 'user_id' not in request.session: request.user = None return @@ -46,7 +60,7 @@ def setup_user_in_request(request): def decode_request(request): """ Decodes a request based on MIME-Type """ data = request.data - + if request.content_type == json_encoded: data = json.loads(data) elif request.content_type == form_encoded or request.content_type == "":