-
import json
import io
import mimetypes
from mediagoblin.media_types import sniff_media
from mediagoblin.decorators import oauth_required
+from mediagoblin.federation.decorators import user_has_privilege
from mediagoblin.db.models import User, MediaEntry, MediaComment
from mediagoblin.tools.response import redirect, json_response
from mediagoblin.meddleware.csrf import csrf_exempt
@oauth_required
@csrf_exempt
+@user_has_privilege(u'uploader')
def uploads(request):
""" Endpoint for file uploads """
user = request.matchdict["username"]
import pytest
import mock
+from webtest import AppError
+
from mediagoblin import mg_globals
from .resources import GOOD_JPG
+from mediagoblin.db.models import User
from mediagoblin.tests.tools import fixture_add_user
from mediagoblin.moderation.tools import take_away_privileges
from .resources import GOOD_JPG, GOOD_PNG, EVIL_FILE, EVIL_JPG, EVIL_PNG, \
BIG_BLUE
-def mocked_oauth_required(*args, **kwargs):
- """ Mocks mediagoblin.decorator.oauth_required to always validate """
-
- def oauth_required(controller):
- return controller
-
- return oauth_required
-
class TestAPI(object):
@pytest.fixture(autouse=True)
self.db = mg_globals.database
self.user = fixture_add_user(privileges=[u'active', u'uploader'])
+ def mocked_oauth_required(self, *args, **kwargs):
+ """ Mocks mediagoblin.decorator.oauth_required to always validate """
+
+ def fake_controller(controller, request, *args, **kwargs):
+ request.user = User.query.filter_by(id=self.user.id).first()
+ return controller(request, *args, **kwargs)
+
+ def oauth_required(c):
+ return lambda *args, **kwargs: fake_controller(c, *args, **kwargs)
+
+ return oauth_required
+
def test_can_post_image(self, test_app):
""" Tests that an image can be posted to the API """
# First request we need to do is to upload the image
}
- with mock.patch("mediagoblin.decorators.oauth_required", new_callable=mocked_oauth_required):
+ with mock.patch("mediagoblin.decorators.oauth_required", new_callable=self.mocked_oauth_required):
response = test_app.post(
"/api/user/{0}/uploads".format(self.user.username),
data,
"Content-Length": str(len(data)),
}
- with mock.patch("mediagoblin.decorators.oauth_required", new_callable=mocked_oauth_required):
- response = test_app.post(
- "/api/user/{0}/uploads".format(self.user.username),
- data,
- headers=headers
- )
-
- error = json.loads(response.body)
+ with mock.patch("mediagoblin.decorators.oauth_required", new_callable=self.mocked_oauth_required):
+ with pytest.raises(AppError) as excinfo:
+ response = test_app.post(
+ "/api/user/{0}/uploads".format(self.user.username),
+ data,
+ headers=headers
+ )
# Assert that we've got a 403
- assert response.status_code == 403
- assert "error" in error
+ assert "403 FORBIDDEN" in excinfo.value.message
def register_client(self, **kwargs):
""" Regiters a client with the API """
-
- kwargs["type"] = "client_associate"
+
+ kwargs["type"] = "client_associate"
kwargs["application_type"] = kwargs.get("application_type", "native")
return self.test_app.post("/api/client/register", kwargs)
client_info = response.json
client = self.db.Client.query.filter_by(id=client_info["client_id"]).first()
-
+
assert response.status_int == 200
assert client is not None
client_info = response.json
client = self.db.Client.query.filter_by(id=client_info["client_id"]).first()
-
+
assert client is not None
assert client.secret == client_info["client_secret"]
assert client.application_type == query["application_type"]
assert request_token.client == client.id
assert request_token.used == False
assert request_token.callback == request_query["oauth_callback"]
-
import json
import logging
-from mediagoblin.db.models import User
+
+from mediagoblin.db.models import User, AccessToken
+from mediagoblin.oauth.tools.request import decode_authorization_header
_log = logging.getLogger(__name__)
Examine a request and tack on a request.user parameter if that's
appropriate.
"""
+ # If API request the user will be associated with the access token
+ authorization = decode_authorization_header(request.headers)
+
+ if authorization.get(u"access_token"):
+ # Check authorization header.
+ token = authorization[u"oauth_token"]
+ token = AccessToken.query.filter_by(token=token).first()
+ if token is not None:
+ request.user = token.user
+ return
+
+
if 'user_id' not in request.session:
request.user = None
return
def decode_request(request):
""" Decodes a request based on MIME-Type """
data = request.data
-
+
if request.content_type == json_encoded:
data = json.loads(data)
elif request.content_type == form_encoded or request.content_type == "":