From: Jessica Tallon Date: Fri, 1 Aug 2014 21:26:12 +0000 (+0100) Subject: Add more security checks when updating objects and tests X-Git-Url: https://vcs.fsf.org/?a=commitdiff_plain;h=8d75091de225fdae94213fc0079a5cddc4a8171a;p=mediagoblin.git Add more security checks when updating objects and tests --- diff --git a/mediagoblin/federation/views.py b/mediagoblin/federation/views.py index d3ded448..f178a3fb 100644 --- a/mediagoblin/federation/views.py +++ b/mediagoblin/federation/views.py @@ -20,7 +20,6 @@ import mimetypes from werkzeug.datastructures import FileStorage -from mediagoblin.media_types import sniff_media from mediagoblin.decorators import oauth_required from mediagoblin.federation.decorators import user_has_privilege from mediagoblin.db.models import User, MediaEntry, MediaComment @@ -36,18 +35,19 @@ from mediagoblin.media_types.image import MEDIA_TYPE as IMAGE_MEDIA_TYPE def profile(request, raw=False): """ This is /api/user//profile - This will give profile info """ user = request.matchdict["username"] - requested_user = User.query.filter_by(username=user) + requested_user = User.query.filter_by(username=user).first() - if requested_user is None: - return json_error("No such 'user' with id '{0}'".format(user), 404) - - user = requested_user[0] + if user is None: + return json_error( + "No such 'user' with id '{0}'".format(user), + status=404 + ) if raw: - return (user, user.serialize(request)) + return (requested_user.username, requested_user.serialize(request)) # user profiles are public so return information - return json_response(user.serialize(request)) + return json_response(requested_user.serialize(request)) @oauth_required def user(request): @@ -68,12 +68,11 @@ def user(request): def uploads(request): """ Endpoint for file uploads """ user = request.matchdict["username"] - requested_user = User.query.filter_by(username=user) + requested_user = User.query.filter_by(username=user).first() if requested_user is None: return json_error("No such 'user' with id '{0}'".format(user), 404) - requested_user = requested_user[0] if request.method == "POST": # Ensure that the user is only able to upload to their own # upload endpoint. @@ -82,7 +81,7 @@ def uploads(request): "Not able to post to another users feed.", status=403 ) - + # Wrap the data in the werkzeug file wrapper if "Content-Type" not in request.headers: return json_error( @@ -97,7 +96,6 @@ def uploads(request): ) # Find media manager - media_type, media_manager = sniff_media(file_data, filename) entry = new_upload_entry(request.user) entry.media_type = IMAGE_MEDIA_TYPE return api_upload_request(request, file_data, entry) @@ -109,13 +107,12 @@ def uploads(request): def feed(request): """ Handles the user's outbox - /api/user//feed """ user = request.matchdict["username"] - requested_user = User.query.filter_by(username=user) + requested_user = User.query.filter_by(username=user).first() # check if the user exists if requested_user is None: return json_error("No such 'user' with id '{0}'".format(user), 404) - requested_user = requested_user[0] if request.data: data = json.loads(request.data) else: @@ -123,7 +120,9 @@ def feed(request): # We need to check that the user they're posting to is # the person that they are. - if request.method in ["POST", "PUT"] and requested_user.id != request.user.id: + if request.method in ["POST", "PUT"] and \ + requested_user.id != request.user.id: + return json_error( "Not able to post to another users feed.", status=403 @@ -151,14 +150,13 @@ def feed(request): elif obj.get("objectType", None) == "image": # Posting an image to the feed media_id = int(data["object"]["id"]) - media = MediaEntry.query.filter_by(id=media_id) + media = MediaEntry.query.filter_by(id=media_id).first() if media is None: return json_response( "No such 'image' with id '{0}'".format(id=media_id), status=404 ) - media = media.first() if not media.unserialize(data["object"]): return json_error( "Invalid 'image' with id '{0}'".format(media_id) @@ -203,13 +201,20 @@ def feed(request): status=403 ) - comment = MediaComment.query.filter_by(id=obj_id) + comment = MediaComment.query.filter_by(id=obj_id).first() if comment is None: return json_error( "No such 'comment' with id '{0}'.".format(obj_id) ) - comment = comment[0] + # Check that the person trying to update the comment is + # the author of the comment. + if comment.author != request.user.id: + return json_error( + "Only author of comment is able to update comment.", + status=403 + ) + if not comment.unserialize(data["object"]): return json_error( "Invalid 'comment' with id '{0}'".format(obj_id) @@ -224,13 +229,20 @@ def feed(request): return json_response(activity) elif obj["objectType"] == "image": - image = MediaEntry.query.filter_by(id=obj_id) + image = MediaEntry.query.filter_by(id=obj_id).first() if image is None: return json_error( "No such 'image' with the id '{0}'.".format(obj_id) ) - image = image[0] + # Check that the person trying to update the comment is + # the author of the comment. + if image.uploader != request.user.id: + return json_error( + "Only uploader of image is able to update image.", + status=403 + ) + if not image.unserialize(obj): return json_error( "Invalid 'image' with id '{0}'".format(obj_id) @@ -314,7 +326,10 @@ def object(request, raw_obj=False): if object_type not in ["image"]: # not sure why this is 404, maybe ask evan. Maybe 400? - return json_error("Unknown type: {0}".format(object_type), status=404) + return json_error( + "Unknown type: {0}".format(object_type), + status=404 + ) media = MediaEntry.query.filter_by(id=object_id).first() if media is None: diff --git a/mediagoblin/tests/test_api.py b/mediagoblin/tests/test_api.py index a003e66d..bda9459c 100644 --- a/mediagoblin/tests/test_api.py +++ b/mediagoblin/tests/test_api.py @@ -22,7 +22,7 @@ from webtest import AppError from .resources import GOOD_JPG from mediagoblin import mg_globals -from mediagoblin.db.models import User, MediaEntry +from mediagoblin.db.models import User, MediaEntry, MediaComment from mediagoblin.tests.tools import fixture_add_user from mediagoblin.moderation.tools import take_away_privileges @@ -119,7 +119,7 @@ class TestAPI(object): # Check that we got the response we're expecting response, _ = self._post_image_to_feed(test_app, image) assert response.status_code == 200 - + def test_unable_to_upload_as_someone_else(self, test_app): """ Test that can't upload as someoen else """ data = open(GOOD_JPG, "rb").read() @@ -127,10 +127,10 @@ class TestAPI(object): "Content-Type": "image/jpeg", "Content-Length": str(len(data)) } - + with mock.patch("mediagoblin.decorators.oauth_required", new_callable=self.mocked_oauth_required): - + # Will be self.user trying to upload as self.other_user with pytest.raises(AppError) as excinfo: test_app.post( @@ -138,22 +138,22 @@ class TestAPI(object): data, headers=headers ) - + assert "403 FORBIDDEN" in excinfo.value.message - + def test_unable_to_post_feed_as_someone_else(self, test_app): """ Tests that can't post an image to someone else's feed """ response, data = self._upload_image(test_app, GOOD_JPG) - + activity = { "verb": "post", "object": data } - + headers = { "Content-Type": "application/json", } - + with mock.patch("mediagoblin.decorators.oauth_required", new_callable=self.mocked_oauth_required): with pytest.raises(AppError) as excinfo: @@ -162,7 +162,40 @@ class TestAPI(object): json.dumps(activity), headers=headers ) - + + assert "403 FORBIDDEN" in excinfo.value.message + + def test_only_able_to_update_own_image(self, test_app): + """ Test's that the uploader is the only person who can update an image """ + response, data = self._upload_image(test_app, GOOD_JPG) + response, data = self._post_image_to_feed(test_app, data) + + activity = { + "verb": "update", + "object": data["object"], + } + + headers = { + "Content-Type": "application/json", + } + + # Lets change the image uploader to be self.other_user, this is easier + # than uploading the image as someone else as the way self.mocked_oauth_required + # and self._upload_image. + media = MediaEntry.query.filter_by(id=data["object"]["id"]).first() + media.uploader = self.other_user.id + media.save() + + # Now lets try and edit the image as self.user, this should produce a 403 error. + with mock.patch("mediagoblin.decorators.oauth_required", + new_callable=self.mocked_oauth_required): + with pytest.raises(AppError) as excinfo: + test_app.post( + "/api/user/{0}/feed".format(self.user.username), + json.dumps(activity), + headers=headers + ) + assert "403 FORBIDDEN" in excinfo.value.message def test_upload_image_with_filename(self, test_app): @@ -292,13 +325,13 @@ class TestAPI(object): # Test that the response is what we should be given assert comment.id == comment_data["object"]["id"] assert comment.content == comment_data["object"]["content"] - + def test_unable_to_post_comment_as_someone_else(self, test_app): """ Tests that you're unable to post a comment as someone else. """ # Upload some media to comment on response, data = self._upload_image(test_app, GOOD_JPG) response, data = self._post_image_to_feed(test_app, data) - + activity = { "verb": "post", "object": { @@ -307,11 +340,11 @@ class TestAPI(object): "inReplyTo": data["object"], } } - + headers = { "Content-Type": "application/json", } - + with mock.patch("mediagoblin.decorators.oauth_required", new_callable=self.mocked_oauth_required): with pytest.raises(AppError) as excinfo: @@ -320,9 +353,53 @@ class TestAPI(object): json.dumps(activity), headers=headers ) - + assert "403 FORBIDDEN" in excinfo.value.message + def test_unable_to_update_someone_elses_comment(self, test_app): + """ Test that you're able to update someoen elses comment. """ + # Upload some media to comment on + response, data = self._upload_image(test_app, GOOD_JPG) + response, data = self._post_image_to_feed(test_app, data) + + activity = { + "verb": "post", + "object": { + "objectType": "comment", + "content": "comment commenty comment ^_^", + "inReplyTo": data["object"], + } + } + + headers = { + "Content-Type": "application/json", + } + + # Post the comment. + response, comment_data = self._activity_to_feed(test_app, activity) + + # change who uploaded the comment as it's easier than changing + comment_id = comment_data["object"]["id"] + comment = MediaComment.query.filter_by(id=comment_id).first() + comment.author = self.other_user.id + + # Update the comment as someone else. + comment_data["object"]["content"] = "Yep" + activity = { + "verb": "update", + "object": comment_data["object"] + } + + with mock.patch("mediagoblin.decorators.oauth_required", + new_callable=self.mocked_oauth_required): + with pytest.raises(AppError) as excinfo: + test_app.post( + "/api/user/{0}/feed".format(self.user.username), + json.dumps(activity), + headers=headers + ) + + assert "403 FORBIDDEN" in excinfo.value.message def test_profile(self, test_app): """ Tests profile endpoint """