Support Unicode characters in configuration values
[mediagoblin.git] / mediagoblin / decorators.py
1 # GNU MediaGoblin -- federated, autonomous media hosting
2 # Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16
17 from functools import wraps
18
19 from werkzeug.exceptions import Forbidden, NotFound
20 from oauthlib.oauth1 import ResourceEndpoint
21
22 from six.moves.urllib.parse import urljoin
23
24 from mediagoblin import mg_globals as mgg
25 from mediagoblin import messages
26 from mediagoblin.db.models import MediaEntry, LocalUser, TextComment, \
27 AccessToken, Comment
28 from mediagoblin.tools.response import (
29 redirect, render_404,
30 render_user_banned, json_response)
31 from mediagoblin.tools.translate import pass_to_ugettext as _
32
33 from mediagoblin.oauth.tools.request import decode_authorization_header
34 from mediagoblin.oauth.oauth import GMGRequestValidator
35
36
37 def user_not_banned(controller):
38 """
39 Requires that the user has not been banned. Otherwise redirects to the page
40 explaining why they have been banned
41 """
42 @wraps(controller)
43 def wrapper(request, *args, **kwargs):
44 if request.user:
45 if request.user.is_banned():
46 return render_user_banned(request)
47 return controller(request, *args, **kwargs)
48
49 return wrapper
50
51
52 def require_active_login(controller):
53 """
54 Require an active login from the user. If the user is banned, redirects to
55 the "You are Banned" page.
56 """
57 @wraps(controller)
58 @user_not_banned
59 def new_controller_func(request, *args, **kwargs):
60 if request.user and \
61 not request.user.has_privilege(u'active'):
62 return redirect(
63 request, 'mediagoblin.user_pages.user_home',
64 user=request.user.username)
65 elif not request.user or not request.user.has_privilege(u'active'):
66 next_url = urljoin(
67 request.urlgen('mediagoblin.auth.login',
68 qualified=True),
69 request.url)
70
71 return redirect(request, 'mediagoblin.auth.login',
72 next=next_url)
73
74 return controller(request, *args, **kwargs)
75
76 return new_controller_func
77
78
79 def user_has_privilege(privilege_name, allow_admin=True):
80 """
81 Requires that a user have a particular privilege in order to access a page.
82 In order to require that a user have multiple privileges, use this
83 decorator twice on the same view. This decorator also makes sure that the
84 user is not banned, or else it redirects them to the "You are Banned" page.
85
86 :param privilege_name A unicode object that is that represents
87 the privilege object. This object is
88 the name of the privilege, as assigned
89 in the Privilege.privilege_name column
90
91 :param allow_admin If this is true then if the user is an admin
92 it will allow the user even if the user doesn't
93 have the privilage given in privilage_name.
94 """
95
96 def user_has_privilege_decorator(controller):
97 @wraps(controller)
98 @require_active_login
99 def wrapper(request, *args, **kwargs):
100 if not request.user.has_privilege(privilege_name, allow_admin):
101 raise Forbidden()
102
103 return controller(request, *args, **kwargs)
104
105 return wrapper
106 return user_has_privilege_decorator
107
108
109 def active_user_from_url(controller):
110 """Retrieve LocalUser() from <user> URL pattern and pass in as url_user=...
111
112 Returns a 404 if no such active user has been found"""
113 @wraps(controller)
114 def wrapper(request, *args, **kwargs):
115 user = LocalUser.query.filter_by(username=request.matchdict['user']).first()
116 if user is None:
117 return render_404(request)
118
119 return controller(request, *args, url_user=user, **kwargs)
120
121 return wrapper
122
123
124 def user_may_delete_media(controller):
125 """
126 Require user ownership of the MediaEntry to delete.
127 """
128 @wraps(controller)
129 def wrapper(request, *args, **kwargs):
130 uploader_id = kwargs['media'].actor
131 if not (request.user.has_privilege(u'admin') or
132 request.user.id == uploader_id):
133 raise Forbidden()
134
135 return controller(request, *args, **kwargs)
136
137 return wrapper
138
139
140 def user_may_alter_collection(controller):
141 """
142 Require user ownership of the Collection to modify.
143 """
144 @wraps(controller)
145 def wrapper(request, *args, **kwargs):
146 creator_id = request.db.LocalUser.query.filter_by(
147 username=request.matchdict['user']).first().id
148 if not (request.user.has_privilege(u'admin') or
149 request.user.id == creator_id):
150 raise Forbidden()
151
152 return controller(request, *args, **kwargs)
153
154 return wrapper
155
156
157 def uses_pagination(controller):
158 """
159 Check request GET 'page' key for wrong values
160 """
161 @wraps(controller)
162 def wrapper(request, *args, **kwargs):
163 try:
164 page = int(request.GET.get('page', 1))
165 if page < 0:
166 return render_404(request)
167 except ValueError:
168 return render_404(request)
169
170 return controller(request, page=page, *args, **kwargs)
171
172 return wrapper
173
174
175 def get_user_media_entry(controller):
176 """
177 Pass in a MediaEntry based off of a url component
178 """
179 @wraps(controller)
180 def wrapper(request, *args, **kwargs):
181 user = LocalUser.query.filter_by(username=request.matchdict['user']).first()
182 if not user:
183 raise NotFound()
184
185 media = None
186
187 # might not be a slug, might be an id, but whatever
188 media_slug = request.matchdict['media']
189
190 # if it starts with id: it actually isn't a slug, it's an id.
191 if media_slug.startswith(u'id:'):
192 try:
193 media = MediaEntry.query.filter_by(
194 id=int(media_slug[3:]),
195 state=u'processed',
196 actor=user.id).first()
197 except ValueError:
198 raise NotFound()
199 else:
200 # no magical id: stuff? It's a slug!
201 media = MediaEntry.query.filter_by(
202 slug=media_slug,
203 state=u'processed',
204 actor=user.id).first()
205
206 if not media:
207 # Didn't find anything? Okay, 404.
208 raise NotFound()
209
210 return controller(request, media=media, *args, **kwargs)
211
212 return wrapper
213
214
215 def get_user_collection(controller):
216 """
217 Pass in a Collection based off of a url component
218 """
219 @wraps(controller)
220 def wrapper(request, *args, **kwargs):
221 user = request.db.LocalUser.query.filter_by(
222 username=request.matchdict['user']).first()
223
224 if not user:
225 return render_404(request)
226
227 collection = request.db.Collection.query.filter_by(
228 slug=request.matchdict['collection'],
229 actor=user.id).first()
230
231 # Still no collection? Okay, 404.
232 if not collection:
233 return render_404(request)
234
235 return controller(request, collection=collection, *args, **kwargs)
236
237 return wrapper
238
239
240 def get_user_collection_item(controller):
241 """
242 Pass in a CollectionItem based off of a url component
243 """
244 @wraps(controller)
245 def wrapper(request, *args, **kwargs):
246 user = request.db.LocalUser.query.filter_by(
247 username=request.matchdict['user']).first()
248
249 if not user:
250 return render_404(request)
251
252 collection_item = request.db.CollectionItem.query.filter_by(
253 id=request.matchdict['collection_item']).first()
254
255 # Still no collection item? Okay, 404.
256 if not collection_item:
257 return render_404(request)
258
259 return controller(request, collection_item=collection_item, *args, **kwargs)
260
261 return wrapper
262
263
264 def get_media_entry_by_id(controller):
265 """
266 Pass in a MediaEntry based off of a url component
267 """
268 @wraps(controller)
269 def wrapper(request, *args, **kwargs):
270 media = MediaEntry.query.filter_by(
271 id=request.matchdict['media_id']).first()
272 # Still no media? Okay, 404.
273 if not media:
274 return render_404(request)
275
276 given_username = request.matchdict.get('user')
277 if given_username and (given_username != media.get_actor.username):
278 return render_404(request)
279
280 return controller(request, media=media, *args, **kwargs)
281
282 return wrapper
283
284
285 def get_workbench(func):
286 """Decorator, passing in a workbench as kwarg which is cleaned up afterwards"""
287
288 @wraps(func)
289 def new_func(*args, **kwargs):
290 with mgg.workbench_manager.create() as workbench:
291 return func(*args, workbench=workbench, **kwargs)
292
293 return new_func
294
295
296 def allow_registration(controller):
297 """ Decorator for if registration is enabled"""
298 @wraps(controller)
299 def wrapper(request, *args, **kwargs):
300 if not mgg.app_config["allow_registration"]:
301 messages.add_message(
302 request,
303 messages.WARNING,
304 _('Sorry, registration is disabled on this instance.'))
305 return redirect(request, "index")
306
307 return controller(request, *args, **kwargs)
308
309 return wrapper
310
311 def allow_reporting(controller):
312 """ Decorator for if reporting is enabled"""
313 @wraps(controller)
314 def wrapper(request, *args, **kwargs):
315 if not mgg.app_config["allow_reporting"]:
316 messages.add_message(
317 request,
318 messages.WARNING,
319 _('Sorry, reporting is disabled on this instance.'))
320 return redirect(request, 'index')
321
322 return controller(request, *args, **kwargs)
323
324 return wrapper
325
326 def get_optional_media_comment_by_id(controller):
327 """
328 Pass in a Comment based off of a url component. Because of this decor-
329 -ator's use in filing Reports, it has two valid outcomes.
330
331 :returns The view function being wrapped with kwarg `comment` set to
332 the Comment who's id is in the URL. If there is a
333 comment id in the URL and if it is valid.
334 :returns The view function being wrapped with kwarg `comment` set to
335 None. If there is no comment id in the URL.
336 :returns A 404 Error page, if there is a comment if in the URL and it
337 is invalid.
338 """
339 @wraps(controller)
340 def wrapper(request, *args, **kwargs):
341 if 'comment' in request.matchdict:
342 comment = Comment.query.filter_by(
343 id=request.matchdict['comment']
344 ).first()
345
346 if comment is None:
347 return render_404(request)
348
349 return controller(request, comment=comment, *args, **kwargs)
350 else:
351 return controller(request, comment=None, *args, **kwargs)
352 return wrapper
353
354
355 def auth_enabled(controller):
356 """Decorator for if an auth plugin is enabled"""
357 @wraps(controller)
358 def wrapper(request, *args, **kwargs):
359 if not mgg.app.auth:
360 messages.add_message(
361 request,
362 messages.WARNING,
363 _('Sorry, authentication is disabled on this instance.'))
364 return redirect(request, 'index')
365
366 return controller(request, *args, **kwargs)
367
368 return wrapper
369
370 def require_admin_or_moderator_login(controller):
371 """
372 Require a login from an administrator or a moderator.
373 """
374 @wraps(controller)
375 def new_controller_func(request, *args, **kwargs):
376 if request.user and \
377 not (request.user.has_privilege(u'admin')
378 or request.user.has_privilege(u'moderator')):
379
380 raise Forbidden()
381 elif not request.user:
382 next_url = urljoin(
383 request.urlgen('mediagoblin.auth.login',
384 qualified=True),
385 request.url)
386
387 return redirect(request, 'mediagoblin.auth.login',
388 next=next_url)
389
390 return controller(request, *args, **kwargs)
391
392 return new_controller_func
393
394
395
396 def oauth_required(controller):
397 """ Used to wrap API endpoints where oauth is required """
398 @wraps(controller)
399 def wrapper(request, *args, **kwargs):
400 data = request.headers
401 authorization = decode_authorization_header(data)
402
403 if authorization == dict():
404 error = "Missing required parameter."
405 return json_response({"error": error}, status=400)
406
407
408 request_validator = GMGRequestValidator()
409 resource_endpoint = ResourceEndpoint(request_validator)
410 valid, r = resource_endpoint.validate_protected_resource_request(
411 uri=request.url,
412 http_method=request.method,
413 body=request.data,
414 headers=dict(request.headers),
415 )
416
417 if not valid:
418 error = "Invalid oauth parameter."
419 return json_response({"error": error}, status=400)
420
421 # Fill user if not already
422 token = authorization[u"oauth_token"]
423 request.access_token = AccessToken.query.filter_by(token=token).first()
424 if request.access_token is not None and request.user is None:
425 user_id = request.access_token.actor
426 request.user = LocalUser.query.filter_by(id=user_id).first()
427
428 return controller(request, *args, **kwargs)
429
430 return wrapper