eb13183042608fc0ff382b172deb9a755c1a6ca8
[mediagoblin.git] / mediagoblin / util.py
1 # GNU MediaGoblin -- federated, autonomous media hosting
2 # Copyright (C) 2011 Free Software Foundation, Inc
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16
17 from email.MIMEText import MIMEText
18 import gettext
19 import pkg_resources
20 import smtplib
21 import os
22 import sys
23 import re
24 import urllib
25 from math import ceil
26 import copy
27
28 from babel.localedata import exists
29 import jinja2
30 import translitcodec
31 from paste.deploy.loadwsgi import NicerConfigParser
32 from webob import Response, exc
33 from lxml.html.clean import Cleaner
34
35 from mediagoblin import mg_globals
36 from mediagoblin.db.util import ObjectId
37
# Module-wide test-mode switch.  It stays False unless
# _activate_testing() is called.
TESTS_ENABLED = False


def _activate_testing():
    """
    Switch this module into test mode by setting TESTS_ENABLED to True.
    """
    global TESTS_ENABLED
    TESTS_ENABLED = True
45
46
def clear_test_buckets():
    """
    Give the next round of tests a clean slate.

    Rebinds the cached jinja environments to an empty dict, empties
    both email test inboxes and drops any stored template contexts.
    """
    global SETUP_JINJA_ENVS
    SETUP_JINJA_ENVS = {}

    # Rebinds EMAIL_TEST_INBOX and EMAIL_TEST_MBOX_INBOX to fresh lists.
    _clear_test_inboxes()

    clear_test_template_context()
65
66
def get_jinja_loader(user_template_path=None):
    """
    Build the Jinja template loader.

    Without a user template path this is just the loader for the
    templates packaged with mediagoblin.  With one, a ChoiceLoader is
    returned that lists the user's directory before the packaged
    templates.

    (In the future we may have another system for providing theming;
    for now this is good enough.)

    Args:
     - user_template_path: optional filesystem path with override
       templates
    """
    package_loader = jinja2.PackageLoader('mediagoblin', 'templates')
    if not user_template_path:
        return package_loader

    user_loader = jinja2.FileSystemLoader(user_template_path)
    return jinja2.ChoiceLoader([user_loader, package_loader])
81
82
# Jinja environments that have already been built, keyed by locale.
SETUP_JINJA_ENVS = {}


def get_jinja_env(template_loader, locale):
    """
    Return a Jinja environment for this locale, building one if needed.

    setup_gettext(locale) is called on every invocation, before the
    cache is consulted.  A newly built environment is only stored in
    SETUP_JINJA_ENVS when babel's exists() accepts the locale.

    Args:
     - template_loader: jinja2 loader used when a new environment has
       to be built
     - locale: locale identifier used for gettext and as the cache key
    """
    setup_gettext(locale)

    # NOTE(review): the cache is keyed on the locale alone, so a cached
    # environment is returned regardless of which template_loader was
    # passed in.
    if locale in SETUP_JINJA_ENVS:
        return SETUP_JINJA_ENVS[locale]

    env = jinja2.Environment(
        loader=template_loader,
        autoescape=True,
        extensions=['jinja2.ext.i18n', 'jinja2.ext.autoescape'])

    translations = mg_globals.translations
    env.install_gettext_callables(
        translations.gettext, translations.ngettext)

    if exists(locale):
        SETUP_JINJA_ENVS[locale] = env

    return env
112
113
# While TESTS_ENABLED is set, render_template() records the context of
# each rendered template here, keyed by template path.
TEMPLATE_TEST_CONTEXT = {}


def render_template(request, template_path, context):
    """
    Render the template at template_path with context and return the
    rendered result.

    The request is always added to the context under the 'request'
    key (the dict that was passed in is modified).  When unit tests
    are enabled the context is also kept in TEMPLATE_TEST_CONTEXT.

    Args:
     - request: current request; must carry a template_env
     - template_path: template name to look up in request.template_env
     - context: dict of template variables
    """
    env = request.template_env
    template = env.get_template(template_path)

    context['request'] = request
    output = template.render(context)

    if TESTS_ENABLED:
        TEMPLATE_TEST_CONTEXT[template_path] = context

    return output
134
135
def clear_test_template_context():
    """
    Forget all stored template contexts by rebinding
    TEMPLATE_TEST_CONTEXT to a new, empty dict.
    """
    global TEMPLATE_TEST_CONTEXT
    TEMPLATE_TEST_CONTEXT = {}
139
140
def render_to_response(request, template, context):
    """
    Render a template and wrap the result in a Response, much like
    Django's shortcut.render().
    """
    body = render_template(request, template, context)
    return Response(body)
144
145
def redirect(request, *args, **kwargs):
    """
    Return an HTTPFound() whose location is built by request.urlgen().

    All positional and keyword arguments after the request are passed
    straight through to request.urlgen().
    """
    target = request.urlgen(*args, **kwargs)
    return exc.HTTPFound(location=target)
149
150
def setup_user_in_request(request):
    """
    Attach a request.user attribute based on the session.

    request.user is None when the session holds no 'user_id'.
    Otherwise it is the result of looking that id up in the database;
    if the lookup finds nothing the session is invalidated as well.
    """
    if 'user_id' not in request.session:
        request.user = None
        return

    found = request.app.db.User.one(
        {'_id': ObjectId(request.session['user_id'])})

    if not found:
        # The session points at a user that doesn't exist (any more),
        # so it can't be trusted: throw it away.
        request.session.invalidate()

    request.user = found
170
171
def import_component(import_string):
    """
    Import and return a single component of a module, e.g. a function,
    class, or global variable.

    Args:
     - import_string: a string that defines what to import.  Written
       in the format of "module1.module2:component"
    """
    module_path, attr_name = import_string.split(':', 1)

    # __import__ returns the top-level package for dotted names, so
    # fetch the module that was actually asked for from sys.modules.
    __import__(module_path)
    return getattr(sys.modules[module_path], attr_name)
186
# Runs of punctuation / whitespace that separate the words of a slug.
_punct_re = re.compile(r'[\t !"#$%&\'()*\-/<=>?@\[\\\]^_`{|},.]+')


def slugify(text, delim=u'-'):
    """
    Generates an ASCII-only slug. Taken from http://flask.pocoo.org/snippets/5/

    The lowercased text is split on _punct_re, each piece is encoded
    with the 'translit/long' codec (presumably registered by the
    translitcodec import), empty pieces are dropped and the rest is
    joined with delim.
    """
    pieces = _punct_re.split(text.lower())
    encoded = (piece.encode('translit/long') for piece in pieces)
    return unicode(delim.join(word for word in encoded if word))
199
200 ### ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
201 ### Special email test stuff begins HERE
202 ### ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
203
204 # We have two "test inboxes" here:
205 #
206 # EMAIL_TEST_INBOX:
207 # ----------------
208 # If you're writing test views, you'll probably want to check this.
209 # It contains a list of MIMEText messages.
210 #
211 # EMAIL_TEST_MBOX_INBOX:
212 # ----------------------
213 # This collects the messages from the FakeMhost inbox. It's really
214 # just here for testing the send_email method itself.
215 #
216 # Anyway this contains:
217 # - from
218 # - to: a list of email recipient addresses
219 # - message: not just the body, but the whole message, including
220 # headers, etc.
221 #
222 # ***IMPORTANT!***
223 # ----------------
224 # Before running tests that call functions which send email, you should
225 # always call _clear_test_inboxes() to "wipe" the inboxes clean.
226
# Messages appended by send_email() while tests are enabled.
EMAIL_TEST_INBOX = []
# One dict per FakeMhost.sendmail() call ('from', 'to', 'message').
EMAIL_TEST_MBOX_INBOX = []


class FakeMhost(object):
    """
    Stand-in mail host that records messages from send_email instead
    of delivering them, so they can be inspected in tests.
    """
    def connect(self):
        """Nothing to connect to; does nothing."""
        return None

    def sendmail(self, from_addr, to_addrs, message):
        """
        Append the sender, recipients and full message to
        EMAIL_TEST_MBOX_INBOX.
        """
        captured = {
            'from': from_addr,
            'to': to_addrs,
            'message': message}
        EMAIL_TEST_MBOX_INBOX.append(captured)
244
def _clear_test_inboxes():
    """
    Wipe both email test inboxes by rebinding EMAIL_TEST_INBOX and
    EMAIL_TEST_MBOX_INBOX to new, empty lists.
    """
    global EMAIL_TEST_INBOX, EMAIL_TEST_MBOX_INBOX
    EMAIL_TEST_INBOX, EMAIL_TEST_MBOX_INBOX = [], []
250
251 ### ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
252 ### </Special email test stuff>
253 ### ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
254
def send_email(from_addr, to_addrs, subject, message_body):
    """
    Simple email sending wrapper, use this so we can capture messages
    for unit testing purposes.

    When TESTS_ENABLED or mg_globals.email_debug_mode is set, the
    message goes to a FakeMhost (and so ends up in
    EMAIL_TEST_MBOX_INBOX) instead of a real SMTP server.  With
    TESTS_ENABLED the MIMEText message is also appended to
    EMAIL_TEST_INBOX; in debug mode it is printed as well.

    Args:
     - from_addr: address you're sending the email from
     - to_addrs: list of recipient email addresses
     - subject: subject of the email
     - message_body: email body text

    Returns whatever the mail host's sendmail() returns.
    """
    # Read the flag once, tolerating globals that never defined it, so
    # host selection and debug printing always agree.
    debug_mode = getattr(mg_globals, 'email_debug_mode', False)

    message = MIMEText(message_body.encode('utf-8'), 'plain', 'utf-8')
    message['Subject'] = subject
    message['From'] = from_addr
    message['To'] = ', '.join(to_addrs)

    if TESTS_ENABLED:
        EMAIL_TEST_INBOX.append(message)

    if debug_mode:
        print(u"===== Email =====")
        print(u"From address: %s" % message['From'])
        print(u"To addresses: %s" % message['To'])
        print(u"Subject: %s" % message['Subject'])
        print(u"-- Body: --")
        print(message.get_payload(decode=True))

    if TESTS_ENABLED or debug_mode:
        mhost = FakeMhost()
        mhost.connect()
        return mhost.sendmail(from_addr, to_addrs, message.as_string())

    mhost = smtplib.SMTP()
    mhost.connect()
    try:
        return mhost.sendmail(from_addr, to_addrs, message.as_string())
    finally:
        # Always end the SMTP session so the connection isn't leaked.
        # A failure to say goodbye (e.g. the server already hung up)
        # must not hide the outcome of sendmail() itself.
        try:
            mhost.quit()
        except smtplib.SMTPException:
            pass
291
292
293 ###################
294 # Translation tools
295 ###################
296
297
# Location of the 'translations' resource of the mediagoblin package,
# as resolved by pkg_resources.
TRANSLATIONS_PATH = pkg_resources.resource_filename(
    'mediagoblin',
    'translations')
300
301
def locale_to_lower_upper(locale):
    """
    Take a locale, regardless of style, and format it like "en_US"

    The language part is lowercased, the country part uppercased and
    the two are joined with an underscore.  A locale without a
    separator ("-" or "_") is simply lowercased.

    Args:
     - locale: locale string such as "en-us", "en_US" or "en"
    """
    # "-" is looked for before "_"; only the first occurrence of the
    # separator that is found splits the locale.
    for separator in ('-', '_'):
        if separator in locale:
            lang, country = locale.split(separator, 1)
            return '%s_%s' % (lang.lower(), country.upper())

    return locale.lower()
314
315
def locale_to_lower_lower(locale):
    """
    Take a locale, regardless of style, and format it like "en-us"

    Everything is lowercased and the first underscore, if there is
    one, becomes a dash.  Locales already written with a dash only
    need the lowercasing.

    Args:
     - locale: locale string such as "en_US", "en-US" or "en"
    """
    return locale.lower().replace('_', '-', 1)
325
326
def get_locale_from_request(request):
    """
    Figure out what target language is most appropriate based on the
    request

    Sources are tried in this order:
     - a 'lang' field in request.GET (or in request.POST when GET is
       empty)
     - a 'locale' entry in the routing matchdict
     - 'target_lang' stored in the session
     - the first match of the request's accept_language
     - 'en' as the final fallback

    The result is always passed through locale_to_lower_upper().
    """
    request_form = request.GET or request.POST
    if 'lang' in request_form:
        return locale_to_lower_upper(request_form['lang'])

    accept_lang_matches = request.accept_language.best_matches()

    if 'locale' in request.matchdict:
        # Your routing can explicitly specify a target language
        target_lang = request.matchdict['locale']
    elif 'target_lang' in request.session:
        target_lang = request.session['target_lang']
    elif accept_lang_matches:
        # Pull the first acceptable language
        target_lang = accept_lang_matches[0]
    else:
        # Fall back to English
        target_lang = 'en'

    return locale_to_lower_upper(target_lang)
352
353
def read_config_file(conf_file):
    """
    Read a paste deploy style config file and return its contents as a
    dict that maps each section name to a dict of that section's items.

    'here' (the directory of the config file) and '__file__' (its
    absolute path) are added to the parser defaults unless they are
    already set.

    Args:
     - conf_file: path of the config file

    Raises IOError if conf_file does not exist.
    """
    if not os.path.exists(conf_file):
        raise IOError("MEDIAGOBLIN_CONFIG not set or file does not exist")

    parser = NicerConfigParser(conf_file)
    parser.read(conf_file)

    conf_path = os.path.abspath(conf_file)
    parser._defaults.setdefault('here', os.path.dirname(conf_path))
    parser._defaults.setdefault('__file__', conf_path)

    sections = {}
    for section_name in parser.sections():
        sections[section_name] = dict(parser.items(section_name))

    return sections
374
375
# A super strict lxml.html Cleaner: every removal option is switched
# on and only the tags listed in allow_tags are allowed.
HTML_CLEANER = Cleaner(
    scripts=True, javascript=True, comments=True, style=True,
    links=True, page_structure=True, processing_instructions=True,
    embedded=True, frames=True, forms=True, annoying_tags=True,
    allow_tags=[
        'div', 'b', 'i', 'em', 'strong', 'p', 'ul', 'ol', 'li', 'a', 'br'],
    remove_unknown_tags=False,  # can't be used with allow_tags
    safe_attrs_only=True,
    add_nofollow=True,  # for now
    host_whitelist=(),
    whitelist_tags=set())


def clean_html(html):
    """
    Run html through HTML_CLEANER and return the cleaned result.
    """
    return HTML_CLEANER.clean_html(html)
400
401
# gettext translation objects that were already loaded, keyed by locale.
SETUP_GETTEXTS = {}


def setup_gettext(locale):
    """
    Make the translations for this locale the active ones by storing
    them in mg_globals.

    Translation objects are loaded from TRANSLATIONS_PATH with
    fallback=True and cached in SETUP_GETTEXTS, but only for locales
    that babel's exists() accepts.
    """
    # Later on when we have plugins we may want to enable the
    # multi-translations system they have so we can handle plugin
    # translations too

    # TODO: fallback nicely on translations from pt_PT to pt if not
    # available, etc.
    try:
        translations = SETUP_GETTEXTS[locale]
    except KeyError:
        translations = gettext.translation(
            'mediagoblin', TRANSLATIONS_PATH, [locale], fallback=True)
        if exists(locale):
            SETUP_GETTEXTS[locale] = translations

    mg_globals.setup_globals(translations=translations)
424
425
# Page size used by Pagination when the caller doesn't pass per_page.
PAGINATION_DEFAULT_PER_PAGE = 30


class Pagination(object):
    """
    Pagination class for mongodb queries.

    Initialization through __init__(self, page, cursor,
    per_page=PAGINATION_DEFAULT_PER_PAGE), get actual data slice
    through __call__().
    """

    def __init__(self, page, cursor, per_page=PAGINATION_DEFAULT_PER_PAGE):
        """
        Initializes Pagination

        Args:
         - page: requested page
         - cursor: db cursor
         - per_page: number of objects per page
        """
        self.page = page
        self.per_page = per_page
        self.cursor = cursor
        # Counted once, up front; every page calculation reuses it.
        self.total_count = self.cursor.count()

    def __call__(self):
        """
        Returns slice of objects for the requested page
        """
        offset = (self.page - 1) * self.per_page
        return self.cursor.skip(offset).limit(self.per_page)

    @property
    def pages(self):
        """Total number of pages, rounded up."""
        return int(ceil(self.total_count / float(self.per_page)))

    @property
    def has_prev(self):
        """True if there is a page before the requested one."""
        return self.page > 1

    @property
    def has_next(self):
        """True if there is a page after the requested one."""
        return self.page < self.pages

    def iter_pages(self, left_edge=2, left_current=2,
                   right_current=5, right_edge=2):
        """
        Yield the page numbers to show in a pagination widget.

        Pages near the start, around the current page and near the end
        are yielded; each run of skipped pages is represented by a
        single None.

        Args:
         - left_edge: number of pages always shown at the start
         - left_current: number of pages shown before the current page
         - right_current: pages shown from the current page onwards
           (the current page counts as one of them)
         - right_edge: number of pages always shown at the end
        """
        # The page count doesn't change while iterating; work it out
        # once instead of on every pass through the loop.
        page_count = self.pages
        last = 0
        for num in xrange(1, page_count + 1):
            at_left_edge = num <= left_edge
            around_current = (
                self.page - left_current - 1 < num
                < self.page + right_current)
            at_right_edge = num > page_count - right_edge

            if at_left_edge or around_current or at_right_edge:
                if last + 1 != num:
                    # One or more pages were skipped since the last
                    # yielded number.
                    yield None
                yield num
                last = num

    def get_page_url_explicit(self, base_url, get_params, page_no):
        """
        Get a page url by adding a page= parameter to the base url

        Args:
         - base_url: url the query string is appended to
         - get_params: existing GET parameters (may be empty or None);
           they are copied, not modified
         - page_no: page number to put in the page= parameter
        """
        new_get_params = copy.copy(get_params or {})
        new_get_params['page'] = page_no
        return "%s?%s" % (
            base_url, urllib.urlencode(new_get_params))

    def get_page_url(self, request, page_no):
        """
        Get a new page url based of the request, and the new page number.

        This is a nice wrapper around get_page_url_explicit()
        """
        return self.get_page_url_explicit(
            request.path_info, request.GET, page_no)