Fixed tests
[mediagoblin.git] / mediagoblin / decorators.py
1 # GNU MediaGoblin -- federated, autonomous media hosting
2 # Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16
17 from functools import wraps
18
19 from webob import exc
20
21 from mediagoblin.db.util import ObjectId, InvalidId
22 from mediagoblin.tools.response import redirect, render_404
23
24
25 def require_active_login(controller):
26 """
27 Require an active login from the user.
28 """
29 @wraps(controller)
30 def new_controller_func(request, *args, **kwargs):
31 if request.user and \
32 request.user.get('status') == u'needs_email_verification':
33 return redirect(
34 request, 'mediagoblin.user_pages.user_home',
35 user=request.user.username)
36 elif not request.user or request.user.get('status') != u'active':
37 return exc.HTTPFound(
38 location="%s?next=%s" % (
39 request.urlgen("mediagoblin.auth.login"),
40 request.full_path))
41
42 return controller(request, *args, **kwargs)
43
44 return new_controller_func
45
46
47 def user_may_delete_media(controller):
48 """
49 Require user ownership of the MediaEntry to delete.
50 """
51 @wraps(controller)
52 def wrapper(request, *args, **kwargs):
53 uploader_id = request.db.MediaEntry.find_one(
54 {'_id': ObjectId(request.matchdict['media'])}).uploader
55 if not (request.user.is_admin or
56 request.user._id == uploader_id):
57 return exc.HTTPForbidden()
58
59 return controller(request, *args, **kwargs)
60
61 return wrapper
62
63
64 def uses_pagination(controller):
65 """
66 Check request GET 'page' key for wrong values
67 """
68 @wraps(controller)
69 def wrapper(request, *args, **kwargs):
70 try:
71 page = int(request.GET.get('page', 1))
72 if page < 0:
73 return render_404(request)
74 except ValueError:
75 return render_404(request)
76
77 return controller(request, page=page, *args, **kwargs)
78
79 return wrapper
80
81
82 def get_user_media_entry(controller):
83 """
84 Pass in a MediaEntry based off of a url component
85 """
86 @wraps(controller)
87 def wrapper(request, *args, **kwargs):
88 user = request.db.User.find_one(
89 {'username': request.matchdict['user']})
90
91 if not user:
92 return render_404(request)
93 media = request.db.MediaEntry.find_one(
94 {'slug': request.matchdict['media'],
95 'state': u'processed',
96 'uploader': user._id})
97
98 # no media via slug? Grab it via ObjectId
99 if not media:
100 try:
101 media = request.db.MediaEntry.find_one(
102 {'_id': ObjectId(request.matchdict['media']),
103 'state': u'processed',
104 'uploader': user._id})
105 except InvalidId:
106 return render_404(request)
107
108 # Still no media? Okay, 404.
109 if not media:
110 return render_404(request)
111
112 return controller(request, media=media, *args, **kwargs)
113
114 return wrapper
115
116
117 def get_media_entry_by_id(controller):
118 """
119 Pass in a MediaEntry based off of a url component
120 """
121 @wraps(controller)
122 def wrapper(request, *args, **kwargs):
123 try:
124 media = request.db.MediaEntry.find_one(
125 {'_id': ObjectId(request.matchdict['media']),
126 'state': u'processed'})
127 except InvalidId:
128 return render_404(request)
129
130 # Still no media? Okay, 404.
131 if not media:
132 return render_404(request)
133
134 return controller(request, media=media, *args, **kwargs)
135
136 return wrapper