d20aca100f2890d43fe962d1a65451e12f1ed9c3
[mediagoblin.git] / mediagoblin / decorators.py
1 # GNU MediaGoblin -- federated, autonomous media hosting
2 # Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16
17 from functools import wraps
18
19 from werkzeug.exceptions import Forbidden, NotFound
20 from oauthlib.oauth1 import ResourceEndpoint
21
22 from six.moves.urllib.parse import urljoin
23
24 from mediagoblin import mg_globals as mgg
25 from mediagoblin import messages
26 from mediagoblin.db.models import MediaEntry, LocalUser, MediaComment, AccessToken
27 from mediagoblin.tools.response import (
28 redirect, render_404,
29 render_user_banned, json_response)
30 from mediagoblin.tools.translate import pass_to_ugettext as _
31
32 from mediagoblin.oauth.tools.request import decode_authorization_header
33 from mediagoblin.oauth.oauth import GMGRequestValidator
34
35
36 def user_not_banned(controller):
37 """
38 Requires that the user has not been banned. Otherwise redirects to the page
39 explaining why they have been banned
40 """
41 @wraps(controller)
42 def wrapper(request, *args, **kwargs):
43 if request.user:
44 if request.user.is_banned():
45 return render_user_banned(request)
46 return controller(request, *args, **kwargs)
47
48 return wrapper
49
50
51 def require_active_login(controller):
52 """
53 Require an active login from the user. If the user is banned, redirects to
54 the "You are Banned" page.
55 """
56 @wraps(controller)
57 @user_not_banned
58 def new_controller_func(request, *args, **kwargs):
59 if request.user and \
60 not request.user.has_privilege(u'active'):
61 return redirect(
62 request, 'mediagoblin.user_pages.user_home',
63 user=request.user.username)
64 elif not request.user or not request.user.has_privilege(u'active'):
65 next_url = urljoin(
66 request.urlgen('mediagoblin.auth.login',
67 qualified=True),
68 request.url)
69
70 return redirect(request, 'mediagoblin.auth.login',
71 next=next_url)
72
73 return controller(request, *args, **kwargs)
74
75 return new_controller_func
76
77
78 def user_has_privilege(privilege_name, allow_admin=True):
79 """
80 Requires that a user have a particular privilege in order to access a page.
81 In order to require that a user have multiple privileges, use this
82 decorator twice on the same view. This decorator also makes sure that the
83 user is not banned, or else it redirects them to the "You are Banned" page.
84
85 :param privilege_name A unicode object that is that represents
86 the privilege object. This object is
87 the name of the privilege, as assigned
88 in the Privilege.privilege_name column
89
90 :param allow_admin If this is true then if the user is an admin
91 it will allow the user even if the user doesn't
92 have the privilage given in privilage_name.
93 """
94
95 def user_has_privilege_decorator(controller):
96 @wraps(controller)
97 @require_active_login
98 def wrapper(request, *args, **kwargs):
99 if not request.user.has_privilege(privilege_name, allow_admin):
100 raise Forbidden()
101
102 return controller(request, *args, **kwargs)
103
104 return wrapper
105 return user_has_privilege_decorator
106
107
108 def active_user_from_url(controller):
109 """Retrieve LocalUser() from <user> URL pattern and pass in as url_user=...
110
111 Returns a 404 if no such active user has been found"""
112 @wraps(controller)
113 def wrapper(request, *args, **kwargs):
114 user = LocalUser.query.filter_by(username=request.matchdict['user']).first()
115 if user is None:
116 return render_404(request)
117
118 return controller(request, *args, url_user=user, **kwargs)
119
120 return wrapper
121
122
123 def user_may_delete_media(controller):
124 """
125 Require user ownership of the MediaEntry to delete.
126 """
127 @wraps(controller)
128 def wrapper(request, *args, **kwargs):
129 uploader_id = kwargs['media'].uploader
130 if not (request.user.has_privilege(u'admin') or
131 request.user.id == uploader_id):
132 raise Forbidden()
133
134 return controller(request, *args, **kwargs)
135
136 return wrapper
137
138
139 def user_may_alter_collection(controller):
140 """
141 Require user ownership of the Collection to modify.
142 """
143 @wraps(controller)
144 def wrapper(request, *args, **kwargs):
145 creator_id = request.db.LocalUser.query.filter_by(
146 username=request.matchdict['user']).first().id
147 if not (request.user.has_privilege(u'admin') or
148 request.user.id == creator_id):
149 raise Forbidden()
150
151 return controller(request, *args, **kwargs)
152
153 return wrapper
154
155
156 def uses_pagination(controller):
157 """
158 Check request GET 'page' key for wrong values
159 """
160 @wraps(controller)
161 def wrapper(request, *args, **kwargs):
162 try:
163 page = int(request.GET.get('page', 1))
164 if page < 0:
165 return render_404(request)
166 except ValueError:
167 return render_404(request)
168
169 return controller(request, page=page, *args, **kwargs)
170
171 return wrapper
172
173
174 def get_user_media_entry(controller):
175 """
176 Pass in a MediaEntry based off of a url component
177 """
178 @wraps(controller)
179 def wrapper(request, *args, **kwargs):
180 user = LocalUser.query.filter_by(username=request.matchdict['user']).first()
181 if not user:
182 raise NotFound()
183
184 media = None
185
186 # might not be a slug, might be an id, but whatever
187 media_slug = request.matchdict['media']
188
189 # if it starts with id: it actually isn't a slug, it's an id.
190 if media_slug.startswith(u'id:'):
191 try:
192 media = MediaEntry.query.filter_by(
193 id=int(media_slug[3:]),
194 state=u'processed',
195 uploader=user.id).first()
196 except ValueError:
197 raise NotFound()
198 else:
199 # no magical id: stuff? It's a slug!
200 media = MediaEntry.query.filter_by(
201 slug=media_slug,
202 state=u'processed',
203 uploader=user.id).first()
204
205 if not media:
206 # Didn't find anything? Okay, 404.
207 raise NotFound()
208
209 return controller(request, media=media, *args, **kwargs)
210
211 return wrapper
212
213
214 def get_user_collection(controller):
215 """
216 Pass in a Collection based off of a url component
217 """
218 @wraps(controller)
219 def wrapper(request, *args, **kwargs):
220 user = request.db.LocalUser.query.filter_by(
221 username=request.matchdict['user']).first()
222
223 if not user:
224 return render_404(request)
225
226 collection = request.db.Collection.query.filter_by(
227 slug=request.matchdict['collection'],
228 creator=user.id).first()
229
230 # Still no collection? Okay, 404.
231 if not collection:
232 return render_404(request)
233
234 return controller(request, collection=collection, *args, **kwargs)
235
236 return wrapper
237
238
239 def get_user_collection_item(controller):
240 """
241 Pass in a CollectionItem based off of a url component
242 """
243 @wraps(controller)
244 def wrapper(request, *args, **kwargs):
245 user = request.db.LocalUser.query.filter_by(
246 username=request.matchdict['user']).first()
247
248 if not user:
249 return render_404(request)
250
251 collection_item = request.db.CollectionItem.query.filter_by(
252 id=request.matchdict['collection_item']).first()
253
254 # Still no collection item? Okay, 404.
255 if not collection_item:
256 return render_404(request)
257
258 return controller(request, collection_item=collection_item, *args, **kwargs)
259
260 return wrapper
261
262
263 def get_media_entry_by_id(controller):
264 """
265 Pass in a MediaEntry based off of a url component
266 """
267 @wraps(controller)
268 def wrapper(request, *args, **kwargs):
269 media = MediaEntry.query.filter_by(
270 id=request.matchdict['media_id'],
271 state=u'processed').first()
272 # Still no media? Okay, 404.
273 if not media:
274 return render_404(request)
275
276 given_username = request.matchdict.get('user')
277 if given_username and (given_username != media.get_uploader.username):
278 return render_404(request)
279
280 return controller(request, media=media, *args, **kwargs)
281
282 return wrapper
283
284
285 def get_workbench(func):
286 """Decorator, passing in a workbench as kwarg which is cleaned up afterwards"""
287
288 @wraps(func)
289 def new_func(*args, **kwargs):
290 with mgg.workbench_manager.create() as workbench:
291 return func(*args, workbench=workbench, **kwargs)
292
293 return new_func
294
295
296 def allow_registration(controller):
297 """ Decorator for if registration is enabled"""
298 @wraps(controller)
299 def wrapper(request, *args, **kwargs):
300 if not mgg.app_config["allow_registration"]:
301 messages.add_message(
302 request,
303 messages.WARNING,
304 _('Sorry, registration is disabled on this instance.'))
305 return redirect(request, "index")
306
307 return controller(request, *args, **kwargs)
308
309 return wrapper
310
311 def allow_reporting(controller):
312 """ Decorator for if reporting is enabled"""
313 @wraps(controller)
314 def wrapper(request, *args, **kwargs):
315 if not mgg.app_config["allow_reporting"]:
316 messages.add_message(
317 request,
318 messages.WARNING,
319 _('Sorry, reporting is disabled on this instance.'))
320 return redirect(request, 'index')
321
322 return controller(request, *args, **kwargs)
323
324 return wrapper
325
326 def get_optional_media_comment_by_id(controller):
327 """
328 Pass in a MediaComment based off of a url component. Because of this decor-
329 -ator's use in filing Media or Comment Reports, it has two valid outcomes.
330
331 :returns The view function being wrapped with kwarg `comment` set to
332 the MediaComment who's id is in the URL. If there is a
333 comment id in the URL and if it is valid.
334 :returns The view function being wrapped with kwarg `comment` set to
335 None. If there is no comment id in the URL.
336 :returns A 404 Error page, if there is a comment if in the URL and it
337 is invalid.
338 """
339 @wraps(controller)
340 def wrapper(request, *args, **kwargs):
341 if 'comment' in request.matchdict:
342 comment = MediaComment.query.filter_by(
343 id=request.matchdict['comment']).first()
344
345 if comment is None:
346 return render_404(request)
347
348 return controller(request, comment=comment, *args, **kwargs)
349 else:
350 return controller(request, comment=None, *args, **kwargs)
351 return wrapper
352
353
354 def auth_enabled(controller):
355 """Decorator for if an auth plugin is enabled"""
356 @wraps(controller)
357 def wrapper(request, *args, **kwargs):
358 if not mgg.app.auth:
359 messages.add_message(
360 request,
361 messages.WARNING,
362 _('Sorry, authentication is disabled on this instance.'))
363 return redirect(request, 'index')
364
365 return controller(request, *args, **kwargs)
366
367 return wrapper
368
369 def require_admin_or_moderator_login(controller):
370 """
371 Require a login from an administrator or a moderator.
372 """
373 @wraps(controller)
374 def new_controller_func(request, *args, **kwargs):
375 if request.user and \
376 not (request.user.has_privilege(u'admin')
377 or request.user.has_privilege(u'moderator')):
378
379 raise Forbidden()
380 elif not request.user:
381 next_url = urljoin(
382 request.urlgen('mediagoblin.auth.login',
383 qualified=True),
384 request.url)
385
386 return redirect(request, 'mediagoblin.auth.login',
387 next=next_url)
388
389 return controller(request, *args, **kwargs)
390
391 return new_controller_func
392
393
394
395 def oauth_required(controller):
396 """ Used to wrap API endpoints where oauth is required """
397 @wraps(controller)
398 def wrapper(request, *args, **kwargs):
399 data = request.headers
400 authorization = decode_authorization_header(data)
401
402 if authorization == dict():
403 error = "Missing required parameter."
404 return json_response({"error": error}, status=400)
405
406
407 request_validator = GMGRequestValidator()
408 resource_endpoint = ResourceEndpoint(request_validator)
409 valid, r = resource_endpoint.validate_protected_resource_request(
410 uri=request.url,
411 http_method=request.method,
412 body=request.data,
413 headers=dict(request.headers),
414 )
415
416 if not valid:
417 error = "Invalid oauth prarameter."
418 return json_response({"error": error}, status=400)
419
420 # Fill user if not already
421 token = authorization[u"oauth_token"]
422 request.access_token = AccessToken.query.filter_by(token=token).first()
423 if request.access_token is not None and request.user is None:
424 user_id = request.access_token.user
425 request.user = LocalUser.query.filter_by(id=user_id).first()
426
427 return controller(request, *args, **kwargs)
428
429 return wrapper