Merge branch '216_cwebber_style_unique_slugs'
[mediagoblin.git] / mediagoblin / decorators.py
1 # GNU MediaGoblin -- federated, autonomous media hosting
2 # Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16
17 from functools import wraps
18
19 from urlparse import urljoin
20 from werkzeug.exceptions import Forbidden, NotFound
21 from werkzeug.urls import url_quote
22
23 from mediagoblin import mg_globals as mgg
24 from mediagoblin.db.models import MediaEntry, User
25 from mediagoblin.tools.response import redirect, render_404
26
27
def require_active_login(controller):
    """
    Only let users whose status is u'active' reach the controller.

    - A logged-in user whose status is u'needs_email_verification' is
      redirected to the 'mediagoblin.user_pages.user_home' route for
      their own username.
    - Anyone else who is not logged in, or whose status is not
      u'active', is redirected to the 'mediagoblin.auth.login' route
      with a quoted ``next`` URL built from the current request URL.
    - Otherwise the wrapped controller is called unchanged.
    """
    @wraps(controller)
    def new_controller_func(request, *args, **kwargs):
        current_user = request.user

        # Logged in but e-mail not verified yet: send them to their page.
        if current_user and \
                current_user.status == u'needs_email_verification':
            return redirect(
                request, 'mediagoblin.user_pages.user_home',
                user=current_user.username)

        # The happy path: a logged-in, active user.
        if current_user and current_user.status == u'active':
            return controller(request, *args, **kwargs)

        # Everybody else has to log in; remember where they wanted to go.
        login_url = request.urlgen('mediagoblin.auth.login',
                                   qualified=True)
        next_url = urljoin(login_url, request.url)

        return redirect(request, 'mediagoblin.auth.login',
                        next=url_quote(next_url))

    return new_controller_func
51
def active_user_from_url(controller):
    """Look up the User named by the <user> URL component.

    The matching User is handed to the controller as the ``url_user``
    keyword argument.  If no User with that username exists, the result
    of ``render_404(request)`` is returned instead.

    NOTE(review): despite the decorator's name, only the username is
    matched here; the user's status is not inspected.
    """
    @wraps(controller)
    def wrapper(request, *args, **kwargs):
        username = request.matchdict['user']
        url_user = User.query.filter_by(username=username).first()

        if url_user is not None:
            return controller(request, *args, url_user=url_user, **kwargs)

        return render_404(request)

    return wrapper
65
66
def user_may_delete_media(controller):
    """
    Allow the call only for admins or the uploader of the MediaEntry.

    Expects the MediaEntry to be passed to the wrapped controller as
    the ``media`` keyword argument.  Raises Forbidden when the
    requesting user is neither an admin nor the entry's uploader.
    """
    @wraps(controller)
    def wrapper(request, *args, **kwargs):
        # Read the owner first so a missing 'media' kwarg fails early.
        uploader_id = kwargs['media'].uploader
        current_user = request.user

        if not current_user.is_admin and current_user.id != uploader_id:
            raise Forbidden()

        return controller(request, *args, **kwargs)

    return wrapper
81
82
def user_may_alter_collection(controller):
    """
    Allow the call only for admins or the user named in the URL.

    The <user> URL component is resolved to a User via
    ``request.db.User.find_one``; the requesting user must either be an
    admin or have the same id as that User, otherwise Forbidden is
    raised.
    """
    @wraps(controller)
    def wrapper(request, *args, **kwargs):
        url_username = request.matchdict['user']
        creator = request.db.User.find_one({'username': url_username})
        # NOTE(review): find_one presumably yields None for an unknown
        # username, in which case the attribute access below fails --
        # confirm callers always 404 on a missing user beforehand.
        creator_id = creator.id

        current_user = request.user
        if current_user.is_admin or current_user.id == creator_id:
            return controller(request, *args, **kwargs)

        raise Forbidden()

    return wrapper
98
99
def uses_pagination(controller):
    """
    Validate the 'page' GET parameter and pass it on as ``page``.

    The value defaults to 1 when the parameter is absent.  A value that
    cannot be converted with int(), or that is negative, yields the
    result of ``render_404(request)`` instead of calling the controller.
    """
    @wraps(controller)
    def wrapper(request, *args, **kwargs):
        raw_page = request.GET.get('page', 1)

        try:
            page = int(raw_page)
        except ValueError:
            # Not a number at all.
            return render_404(request)

        # NOTE(review): page 0 passes this check -- confirm that is
        # intended, given that the default page is 1.
        if page < 0:
            return render_404(request)

        return controller(request, *args, page=page, **kwargs)

    return wrapper
116
117
def get_user_media_entry(controller):
    """
    Resolve the <user>/<media> URL components to a MediaEntry.

    The <media> component is first tried as a slug and, failing that,
    as an integer id.  Only entries in state u'processed' that were
    uploaded by the named user are considered.  The entry is passed to
    the controller as the ``media`` keyword argument; NotFound is
    raised when the user or the entry cannot be found.
    """
    @wraps(controller)
    def wrapper(request, *args, **kwargs):
        username = request.matchdict['user']
        owner = User.query.filter_by(username=username).first()
        if not owner:
            raise NotFound()

        media_ref = request.matchdict['media']

        # First attempt: treat the URL component as a slug.
        media = MediaEntry.query.filter_by(
            slug=media_ref,
            state=u'processed',
            uploader=owner.id).first()

        if not media:
            # Second attempt: treat the URL component as an object id.
            try:
                media = MediaEntry.query.filter_by(
                    id=int(media_ref),
                    state=u'processed',
                    uploader=owner.id).first()
            except ValueError:
                # The component was not an integer, so it cannot be an id.
                raise NotFound()

            if not media:
                # Neither a slug nor an id matched.
                raise NotFound()

        return controller(request, *args, media=media, **kwargs)

    return wrapper
151
152
def get_user_collection(controller):
    """
    Resolve the <user>/<collection> URL components to a Collection.

    The collection is looked up by slug among those whose creator is
    the named user and is passed to the controller as the
    ``collection`` keyword argument.  If either the user or the
    collection is missing, the result of ``render_404(request)`` is
    returned instead.
    """
    @wraps(controller)
    def wrapper(request, *args, **kwargs):
        url_parts = request.matchdict

        creator = request.db.User.find_one(
            {'username': url_parts['user']})
        if not creator:
            return render_404(request)

        lookup = {'slug': url_parts['collection'],
                  'creator': creator.id}
        collection = request.db.Collection.find_one(lookup)
        if not collection:
            # User exists, but has no collection with that slug.
            return render_404(request)

        return controller(request, *args, collection=collection, **kwargs)

    return wrapper
176
177
def get_user_collection_item(controller):
    """
    Resolve the URL components to a CollectionItem.

    The <user> and <collection> components must name an existing user
    and one of that user's collections (matched by slug); the
    <collection_item> component is then looked up by id.  The item is
    passed to the controller as the ``collection_item`` keyword
    argument.  If the user, the collection or the item is missing, the
    result of ``render_404(request)`` is returned instead.
    """
    @wraps(controller)
    def wrapper(request, *args, **kwargs):
        user = request.db.User.find_one(
            {'username': request.matchdict['user']})

        if not user:
            return render_404(request)

        collection = request.db.Collection.find_one(
            {'slug': request.matchdict['collection'],
             'creator': user.id})

        # The collection lookup used to be ignored, so a URL naming a
        # collection that does not exist was still served.  Treat it
        # like get_user_collection does: no collection, no page.
        if not collection:
            return render_404(request)

        # NOTE(review): the item is fetched by id alone; nothing here
        # verifies that it belongs to `collection`.  Confirm whether the
        # lookup should also be restricted to that collection.
        collection_item = request.db.CollectionItem.find_one(
            {'id': request.matchdict['collection_item']})

        # Still no collection item? Okay, 404.
        if not collection_item:
            return render_404(request)

        return controller(
            request, *args, collection_item=collection_item, **kwargs)

    return wrapper
204
205
def get_media_entry_by_id(controller):
    """
    Resolve the <media_id> URL component to a MediaEntry.

    Only entries in state u'processed' are considered.  When the URL
    also carries a <user> component, it must equal the username of the
    entry's uploader.  The entry is passed to the controller as the
    ``media`` keyword argument; otherwise the result of
    ``render_404(request)`` is returned.
    """
    @wraps(controller)
    def wrapper(request, *args, **kwargs):
        wanted_id = request.matchdict['media_id']
        media = MediaEntry.query.filter_by(
            id=wanted_id,
            state=u'processed').first()

        if not media:
            # Nothing processed under that id.
            return render_404(request)

        # A username in the URL is optional, but if present it has to
        # match the uploader of the entry we just found.
        url_username = request.matchdict.get('user')
        if url_username and url_username != media.get_uploader.username:
            return render_404(request)

        return controller(request, *args, media=media, **kwargs)

    return wrapper
226
227
def get_workbench(func):
    """Decorator that supplies a workbench as the ``workbench`` kwarg.

    The workbench comes from ``mgg.workbench_manager.create()`` used as
    a context manager, so it is released by that context manager's exit
    once the wrapped function returns or raises.
    """

    @wraps(func)
    def new_func(*args, **kwargs):
        manager = mgg.workbench_manager
        with manager.create() as workbench:
            result = func(*args, workbench=workbench, **kwargs)
            return result

    return new_func