-skip persona test is requests is not installed
[mediagoblin.git] / mediagoblin / decorators.py
1 # GNU MediaGoblin -- federated, autonomous media hosting
2 # Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16
17 from functools import wraps
18
19 from urlparse import urljoin
20 from werkzeug.exceptions import Forbidden, NotFound
21 from oauthlib.oauth1 import ResourceEndpoint
22
23 from mediagoblin import mg_globals as mgg
24 from mediagoblin import messages
25 from mediagoblin.db.models import MediaEntry, User
26 from mediagoblin.tools.response import json_response, redirect, render_404
27 from mediagoblin.tools.translate import pass_to_ugettext as _
28
29 from mediagoblin.oauth.tools.request import decode_authorization_header
30 from mediagoblin.oauth.oauth import GMGRequestValidator
31
32 def require_active_login(controller):
33 """
34 Require an active login from the user.
35 """
36 @wraps(controller)
37 def new_controller_func(request, *args, **kwargs):
38 if request.user and \
39 request.user.status == u'needs_email_verification':
40 return redirect(
41 request, 'mediagoblin.user_pages.user_home',
42 user=request.user.username)
43 elif not request.user or request.user.status != u'active':
44 next_url = urljoin(
45 request.urlgen('mediagoblin.auth.login',
46 qualified=True),
47 request.url)
48
49 return redirect(request, 'mediagoblin.auth.login',
50 next=next_url)
51
52 return controller(request, *args, **kwargs)
53
54 return new_controller_func
55
56 def active_user_from_url(controller):
57 """Retrieve User() from <user> URL pattern and pass in as url_user=...
58
59 Returns a 404 if no such active user has been found"""
60 @wraps(controller)
61 def wrapper(request, *args, **kwargs):
62 user = User.query.filter_by(username=request.matchdict['user']).first()
63 if user is None:
64 return render_404(request)
65
66 return controller(request, *args, url_user=user, **kwargs)
67
68 return wrapper
69
70
71 def user_may_delete_media(controller):
72 """
73 Require user ownership of the MediaEntry to delete.
74 """
75 @wraps(controller)
76 def wrapper(request, *args, **kwargs):
77 uploader_id = kwargs['media'].uploader
78 if not (request.user.is_admin or
79 request.user.id == uploader_id):
80 raise Forbidden()
81
82 return controller(request, *args, **kwargs)
83
84 return wrapper
85
86
87 def user_may_alter_collection(controller):
88 """
89 Require user ownership of the Collection to modify.
90 """
91 @wraps(controller)
92 def wrapper(request, *args, **kwargs):
93 creator_id = request.db.User.query.filter_by(
94 username=request.matchdict['user']).first().id
95 if not (request.user.is_admin or
96 request.user.id == creator_id):
97 raise Forbidden()
98
99 return controller(request, *args, **kwargs)
100
101 return wrapper
102
103
104 def uses_pagination(controller):
105 """
106 Check request GET 'page' key for wrong values
107 """
108 @wraps(controller)
109 def wrapper(request, *args, **kwargs):
110 try:
111 page = int(request.GET.get('page', 1))
112 if page < 0:
113 return render_404(request)
114 except ValueError:
115 return render_404(request)
116
117 return controller(request, page=page, *args, **kwargs)
118
119 return wrapper
120
121
122 def get_user_media_entry(controller):
123 """
124 Pass in a MediaEntry based off of a url component
125 """
126 @wraps(controller)
127 def wrapper(request, *args, **kwargs):
128 user = User.query.filter_by(username=request.matchdict['user']).first()
129 if not user:
130 raise NotFound()
131
132 media = None
133
134 # might not be a slug, might be an id, but whatever
135 media_slug = request.matchdict['media']
136
137 # if it starts with id: it actually isn't a slug, it's an id.
138 if media_slug.startswith(u'id:'):
139 try:
140 media = MediaEntry.query.filter_by(
141 id=int(media_slug[3:]),
142 state=u'processed',
143 uploader=user.id).first()
144 except ValueError:
145 raise NotFound()
146 else:
147 # no magical id: stuff? It's a slug!
148 media = MediaEntry.query.filter_by(
149 slug=media_slug,
150 state=u'processed',
151 uploader=user.id).first()
152
153 if not media:
154 # Didn't find anything? Okay, 404.
155 raise NotFound()
156
157 return controller(request, media=media, *args, **kwargs)
158
159 return wrapper
160
161
162 def get_user_collection(controller):
163 """
164 Pass in a Collection based off of a url component
165 """
166 @wraps(controller)
167 def wrapper(request, *args, **kwargs):
168 user = request.db.User.query.filter_by(
169 username=request.matchdict['user']).first()
170
171 if not user:
172 return render_404(request)
173
174 collection = request.db.Collection.query.filter_by(
175 slug=request.matchdict['collection'],
176 creator=user.id).first()
177
178 # Still no collection? Okay, 404.
179 if not collection:
180 return render_404(request)
181
182 return controller(request, collection=collection, *args, **kwargs)
183
184 return wrapper
185
186
187 def get_user_collection_item(controller):
188 """
189 Pass in a CollectionItem based off of a url component
190 """
191 @wraps(controller)
192 def wrapper(request, *args, **kwargs):
193 user = request.db.User.query.filter_by(
194 username=request.matchdict['user']).first()
195
196 if not user:
197 return render_404(request)
198
199 collection_item = request.db.CollectionItem.query.filter_by(
200 id=request.matchdict['collection_item']).first()
201
202 # Still no collection item? Okay, 404.
203 if not collection_item:
204 return render_404(request)
205
206 return controller(request, collection_item=collection_item, *args, **kwargs)
207
208 return wrapper
209
210
211 def get_media_entry_by_id(controller):
212 """
213 Pass in a MediaEntry based off of a url component
214 """
215 @wraps(controller)
216 def wrapper(request, *args, **kwargs):
217 media = MediaEntry.query.filter_by(
218 id=request.matchdict['media_id'],
219 state=u'processed').first()
220 # Still no media? Okay, 404.
221 if not media:
222 return render_404(request)
223
224 given_username = request.matchdict.get('user')
225 if given_username and (given_username != media.get_uploader.username):
226 return render_404(request)
227
228 return controller(request, media=media, *args, **kwargs)
229
230 return wrapper
231
232
233 def get_workbench(func):
234 """Decorator, passing in a workbench as kwarg which is cleaned up afterwards"""
235
236 @wraps(func)
237 def new_func(*args, **kwargs):
238 with mgg.workbench_manager.create() as workbench:
239 return func(*args, workbench=workbench, **kwargs)
240
241 return new_func
242
243
244 def allow_registration(controller):
245 """ Decorator for if registration is enabled"""
246 @wraps(controller)
247 def wrapper(request, *args, **kwargs):
248 if not mgg.app_config["allow_registration"]:
249 messages.add_message(
250 request,
251 messages.WARNING,
252 _('Sorry, registration is disabled on this instance.'))
253 return redirect(request, "index")
254
255 return controller(request, *args, **kwargs)
256
257 return wrapper
258
259
260 def auth_enabled(controller):
261 """Decorator for if an auth plugin is enabled"""
262 @wraps(controller)
263 def wrapper(request, *args, **kwargs):
264 if not mgg.app.auth:
265 messages.add_message(
266 request,
267 messages.WARNING,
268 _('Sorry, authentication is disabled on this instance.'))
269 return redirect(request, 'index')
270
271 return controller(request, *args, **kwargs)
272
273 return wrapper
274
275 def oauth_required(controller):
276 """ Used to wrap API endpoints where oauth is required """
277 @wraps(controller)
278 def wrapper(request, *args, **kwargs):
279 data = request.headers
280 authorization = decode_authorization_header(data)
281
282 if authorization == dict():
283 error = "Missing required parameter."
284 return json_response({"error": error}, status=400)
285
286
287 request_validator = GMGRequestValidator()
288 resource_endpoint = ResourceEndpoint(request_validator)
289 valid, request = resource_endpoint.validate_protected_resource_request(
290 uri=request.url,
291 http_method=request.method,
292 body=request.get_data(),
293 headers=dict(request.headers),
294 )
295
296 if not valid:
297 error = "Invalid oauth prarameter."
298 return json_response({"error": error}, status=400)
299
300 return controller(request, *args, **kwargs)
301
302 return wrapper