This should be my final code update before I am ready for review! Basically, in
[mediagoblin.git] / mediagoblin / decorators.py
1 # GNU MediaGoblin -- federated, autonomous media hosting
2 # Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16
17 from functools import wraps
18
19 from urlparse import urljoin
20 from werkzeug.exceptions import Forbidden, NotFound
21 from oauthlib.oauth1 import ResourceEndpoint
22
23 from mediagoblin import mg_globals as mgg
24 from mediagoblin import messages
25 from mediagoblin.db.models import (MediaEntry, User, MediaComment,
26 UserBan, Privilege)
27 from mediagoblin.tools.response import (redirect, render_404,
28 render_user_banned, json_response)
29 from mediagoblin.tools.translate import pass_to_ugettext as _
30
31 from mediagoblin.oauth.tools.request import decode_authorization_header
32 from mediagoblin.oauth.oauth import GMGRequestValidator
33
34
35 def user_not_banned(controller):
36 """
37 Requires that the user has not been banned. Otherwise redirects to the page
38 explaining why they have been banned
39 """
40 @wraps(controller)
41 def wrapper(request, *args, **kwargs):
42 if request.user:
43 if request.user.is_banned():
44 return render_user_banned(request)
45 return controller(request, *args, **kwargs)
46
47 return wrapper
48
49
50 def require_active_login(controller):
51 """
52 Require an active login from the user. If the user is banned, redirects to
53 the "You are Banned" page.
54 """
55 @wraps(controller)
56 @user_not_banned
57 def new_controller_func(request, *args, **kwargs):
58 if request.user and \
59 not request.user.has_privilege(u'active'):
60 return redirect(
61 request, 'mediagoblin.user_pages.user_home',
62 user=request.user.username)
63 elif not request.user or not request.user.has_privilege(u'active'):
64 next_url = urljoin(
65 request.urlgen('mediagoblin.auth.login',
66 qualified=True),
67 request.url)
68
69 return redirect(request, 'mediagoblin.auth.login',
70 next=next_url)
71
72 return controller(request, *args, **kwargs)
73
74 return new_controller_func
75
76
77 def user_has_privilege(privilege_name):
78 """
79 Requires that a user have a particular privilege in order to access a page.
80 In order to require that a user have multiple privileges, use this
81 decorator twice on the same view. This decorator also makes sure that the
82 user is not banned, or else it redirects them to the "You are Banned" page.
83
84 :param privilege_name A unicode object that is that represents
85 the privilege object. This object is
86 the name of the privilege, as assigned
87 in the Privilege.privilege_name column
88 """
89
90 def user_has_privilege_decorator(controller):
91 @wraps(controller)
92 @require_active_login
93 def wrapper(request, *args, **kwargs):
94 user_id = request.user.id
95 if not request.user.has_privilege(privilege_name):
96 raise Forbidden()
97
98 return controller(request, *args, **kwargs)
99
100 return wrapper
101 return user_has_privilege_decorator
102
103
104 def active_user_from_url(controller):
105 """Retrieve User() from <user> URL pattern and pass in as url_user=...
106
107 Returns a 404 if no such active user has been found"""
108 @wraps(controller)
109 def wrapper(request, *args, **kwargs):
110 user = User.query.filter_by(username=request.matchdict['user']).first()
111 if user is None:
112 return render_404(request)
113
114 return controller(request, *args, url_user=user, **kwargs)
115
116 return wrapper
117
118
119 def user_may_delete_media(controller):
120 """
121 Require user ownership of the MediaEntry to delete.
122 """
123 @wraps(controller)
124 def wrapper(request, *args, **kwargs):
125 uploader_id = kwargs['media'].uploader
126 if not (request.user.has_privilege(u'admin') or
127 request.user.id == uploader_id):
128 raise Forbidden()
129
130 return controller(request, *args, **kwargs)
131
132 return wrapper
133
134
135 def user_may_alter_collection(controller):
136 """
137 Require user ownership of the Collection to modify.
138 """
139 @wraps(controller)
140 def wrapper(request, *args, **kwargs):
141 creator_id = request.db.User.query.filter_by(
142 username=request.matchdict['user']).first().id
143 if not (request.user.has_privilege(u'admin') or
144 request.user.id == creator_id):
145 raise Forbidden()
146
147 return controller(request, *args, **kwargs)
148
149 return wrapper
150
151
152 def uses_pagination(controller):
153 """
154 Check request GET 'page' key for wrong values
155 """
156 @wraps(controller)
157 def wrapper(request, *args, **kwargs):
158 try:
159 page = int(request.GET.get('page', 1))
160 if page < 0:
161 return render_404(request)
162 except ValueError:
163 return render_404(request)
164
165 return controller(request, page=page, *args, **kwargs)
166
167 return wrapper
168
169
170 def get_user_media_entry(controller):
171 """
172 Pass in a MediaEntry based off of a url component
173 """
174 @wraps(controller)
175 def wrapper(request, *args, **kwargs):
176 user = User.query.filter_by(username=request.matchdict['user']).first()
177 if not user:
178 raise NotFound()
179
180 media = None
181
182 # might not be a slug, might be an id, but whatever
183 media_slug = request.matchdict['media']
184
185 # if it starts with id: it actually isn't a slug, it's an id.
186 if media_slug.startswith(u'id:'):
187 try:
188 media = MediaEntry.query.filter_by(
189 id=int(media_slug[3:]),
190 state=u'processed',
191 uploader=user.id).first()
192 except ValueError:
193 raise NotFound()
194 else:
195 # no magical id: stuff? It's a slug!
196 media = MediaEntry.query.filter_by(
197 slug=media_slug,
198 state=u'processed',
199 uploader=user.id).first()
200
201 if not media:
202 # Didn't find anything? Okay, 404.
203 raise NotFound()
204
205 return controller(request, media=media, *args, **kwargs)
206
207 return wrapper
208
209
210 def get_user_collection(controller):
211 """
212 Pass in a Collection based off of a url component
213 """
214 @wraps(controller)
215 def wrapper(request, *args, **kwargs):
216 user = request.db.User.query.filter_by(
217 username=request.matchdict['user']).first()
218
219 if not user:
220 return render_404(request)
221
222 collection = request.db.Collection.query.filter_by(
223 slug=request.matchdict['collection'],
224 creator=user.id).first()
225
226 # Still no collection? Okay, 404.
227 if not collection:
228 return render_404(request)
229
230 return controller(request, collection=collection, *args, **kwargs)
231
232 return wrapper
233
234
235 def get_user_collection_item(controller):
236 """
237 Pass in a CollectionItem based off of a url component
238 """
239 @wraps(controller)
240 def wrapper(request, *args, **kwargs):
241 user = request.db.User.query.filter_by(
242 username=request.matchdict['user']).first()
243
244 if not user:
245 return render_404(request)
246
247 collection_item = request.db.CollectionItem.query.filter_by(
248 id=request.matchdict['collection_item']).first()
249
250 # Still no collection item? Okay, 404.
251 if not collection_item:
252 return render_404(request)
253
254 return controller(request, collection_item=collection_item, *args, **kwargs)
255
256 return wrapper
257
258
259 def get_media_entry_by_id(controller):
260 """
261 Pass in a MediaEntry based off of a url component
262 """
263 @wraps(controller)
264 def wrapper(request, *args, **kwargs):
265 media = MediaEntry.query.filter_by(
266 id=request.matchdict['media_id'],
267 state=u'processed').first()
268 # Still no media? Okay, 404.
269 if not media:
270 return render_404(request)
271
272 given_username = request.matchdict.get('user')
273 if given_username and (given_username != media.get_uploader.username):
274 return render_404(request)
275
276 return controller(request, media=media, *args, **kwargs)
277
278 return wrapper
279
280
281 def get_workbench(func):
282 """Decorator, passing in a workbench as kwarg which is cleaned up afterwards"""
283
284 @wraps(func)
285 def new_func(*args, **kwargs):
286 with mgg.workbench_manager.create() as workbench:
287 return func(*args, workbench=workbench, **kwargs)
288
289 return new_func
290
291
292 def allow_registration(controller):
293 """ Decorator for if registration is enabled"""
294 @wraps(controller)
295 def wrapper(request, *args, **kwargs):
296 if not mgg.app_config["allow_registration"]:
297 messages.add_message(
298 request,
299 messages.WARNING,
300 _('Sorry, registration is disabled on this instance.'))
301 return redirect(request, "index")
302
303 return controller(request, *args, **kwargs)
304
305 return wrapper
306
307 def get_optional_media_comment_by_id(controller):
308 """
309 Pass in a MediaComment based off of a url component. Because of this decor-
310 -ator's use in filing Media or Comment Reports, it has two valid outcomes.
311
312 :returns The view function being wrapped with kwarg `comment` set to
313 the MediaComment who's id is in the URL. If there is a
314 comment id in the URL and if it is valid.
315 :returns The view function being wrapped with kwarg `comment` set to
316 None. If there is no comment id in the URL.
317 :returns A 404 Error page, if there is a comment if in the URL and it
318 is invalid.
319 """
320 @wraps(controller)
321 def wrapper(request, *args, **kwargs):
322 if 'comment' in request.matchdict:
323 comment = MediaComment.query.filter_by(
324 id=request.matchdict['comment']).first()
325
326 if comment is None:
327 return render_404(request)
328
329 return controller(request, comment=comment, *args, **kwargs)
330 else:
331 return controller(request, comment=None, *args, **kwargs)
332 return wrapper
333
334
335 def auth_enabled(controller):
336 """Decorator for if an auth plugin is enabled"""
337 @wraps(controller)
338 def wrapper(request, *args, **kwargs):
339 if not mgg.app.auth:
340 messages.add_message(
341 request,
342 messages.WARNING,
343 _('Sorry, authentication is disabled on this instance.'))
344 return redirect(request, 'index')
345
346 return controller(request, *args, **kwargs)
347
348 return wrapper
349
350 def require_admin_or_moderator_login(controller):
351 """
352 Require a login from an administrator or a moderator.
353 """
354 @wraps(controller)
355 def new_controller_func(request, *args, **kwargs):
356 if request.user and \
357 not request.user.has_privilege(u'admin',u'moderator'):
358
359 raise Forbidden()
360 elif not request.user:
361 next_url = urljoin(
362 request.urlgen('mediagoblin.auth.login',
363 qualified=True),
364 request.url)
365
366 return redirect(request, 'mediagoblin.auth.login',
367 next=next_url)
368
369 return controller(request, *args, **kwargs)
370
371 return new_controller_func
372
373
374
375 def oauth_required(controller):
376 """ Used to wrap API endpoints where oauth is required """
377 @wraps(controller)
378 def wrapper(request, *args, **kwargs):
379 data = request.headers
380 authorization = decode_authorization_header(data)
381
382 if authorization == dict():
383 error = "Missing required parameter."
384 return json_response({"error": error}, status=400)
385
386
387 request_validator = GMGRequestValidator()
388 resource_endpoint = ResourceEndpoint(request_validator)
389 valid, request = resource_endpoint.validate_protected_resource_request(
390 uri=request.url,
391 http_method=request.method,
392 body=request.get_data(),
393 headers=dict(request.headers),
394 )
395
396 if not valid:
397 error = "Invalid oauth prarameter."
398 return json_response({"error": error}, status=400)
399
400 return controller(request, *args, **kwargs)
401
402 return wrapper