Changing datetime formatting from |format to .strftime()
[mediagoblin.git] / mediagoblin / util.py
1 # GNU MediaGoblin -- federated, autonomous media hosting
2 # Copyright (C) 2011 Free Software Foundation, Inc
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16
17 from __future__ import division
18
19 from email.MIMEText import MIMEText
20 import gettext
21 import pkg_resources
22 import smtplib
23 import sys
24 import re
25 import urllib
26 from math import ceil, floor
27 import copy
28 import wtforms
29
30 from babel.localedata import exists
31 from babel.support import LazyProxy
32 import jinja2
33 import translitcodec
34 from webob import Response, exc
35 from lxml.html.clean import Cleaner
36 import markdown
37 from wtforms.form import Form
38
39 from mediagoblin import mg_globals
40 from mediagoblin import messages
41 from mediagoblin.db.util import ObjectId
42
43 from itertools import izip, count
44
# NOTE(review): presumably the order in which stored image sizes are tried
# when picking one to display -- confirm against the callers.
DISPLAY_IMAGE_FETCHING_ORDER = [u'medium', u'original', u'thumb']

# Module-wide testing switch; stays False until _activate_testing() runs.
TESTS_ENABLED = False


def _activate_testing():
    """
    Turn on testing mode for util.py by setting TESTS_ENABLED to True.
    """
    global TESTS_ENABLED
    TESTS_ENABLED = True
54
55
def clear_test_buckets():
    """
    Reset the module-level state that accumulates while tests run, so the
    next round of tests starts from a "clean slate".

    Rebinds the cached jinja environments and both email test inboxes to
    fresh empty containers, then clears the stored template contexts.
    """
    global SETUP_JINJA_ENVS, EMAIL_TEST_INBOX, EMAIL_TEST_MBOX_INBOX

    # Jinja envs might have been redefined during testing; drop them all.
    SETUP_JINJA_ENVS = {}

    EMAIL_TEST_INBOX, EMAIL_TEST_MBOX_INBOX = [], []

    clear_test_template_context()
74
75
# Cache of already-configured jinja2 environments, keyed by locale.
SETUP_JINJA_ENVS = {}


def get_jinja_env(template_loader, locale):
    """
    Set up the Jinja environment for a locale, reusing a cached one
    when available.

    (In the future we may have another system for providing theming;
    for now this is good enough.)

    Args:
     - template_loader: jinja2 loader for a newly built environment.
       Ignored when an environment for this locale is already cached.
     - locale: locale name; selects the translations and is the cache key

    Returns the jinja2.Environment to render templates with.
    """
    # Done on every call, cache hit or not: this is what points
    # mg_globals.translations at the requested locale.
    setup_gettext(locale)

    # If we have a jinja environment set up with this locale, just
    # return that one.
    if locale in SETUP_JINJA_ENVS:
        return SETUP_JINJA_ENVS[locale]

    template_env = jinja2.Environment(
        loader=template_loader, autoescape=True,
        extensions=['jinja2.ext.i18n', 'jinja2.ext.autoescape'])

    template_env.install_gettext_callables(
        mg_globals.translations.ugettext,
        mg_globals.translations.ungettext)

    # All templates will know how to ...
    # ... fetch all waiting messages and remove them from the queue
    # ... construct a grid of thumbnails or other media
    template_env.globals['fetch_messages'] = messages.fetch_messages
    template_env.globals['gridify_list'] = gridify_list
    template_env.globals['gridify_cursor'] = gridify_cursor

    # Only locales babel knows about are cached.
    if exists(locale):
        SETUP_JINJA_ENVS[locale] = template_env

    return template_env
112
113
# Filled in by render_template() while unit tests are running:
# template path -> the context that template was rendered with.
TEMPLATE_TEST_CONTEXT = {}


def render_template(request, template_path, context):
    """
    Render the template at template_path with the given context and
    return the rendered output.

    The request is always added to the context under the 'request' key
    (the passed-in dict is modified), so callers don't have to do it.
    When TESTS_ENABLED is set, the context is also remembered in
    TEMPLATE_TEST_CONTEXT so tests can inspect it.
    """
    tmpl = request.template_env.get_template(template_path)

    context['request'] = request
    output = tmpl.render(context)

    if TESTS_ENABLED:
        TEMPLATE_TEST_CONTEXT[template_path] = context

    return output
134
135
def clear_test_template_context():
    """
    Forget every template context captured so far.

    Rebinds TEMPLATE_TEST_CONTEXT to a brand new dict rather than
    emptying the existing one in place.
    """
    global TEMPLATE_TEST_CONTEXT
    TEMPLATE_TEST_CONTEXT = dict()
139
140
def render_to_response(request, template, context):
    """
    Render a template via render_template() and wrap the output in a
    Response.  Much like Django's shortcut.render()
    """
    body = render_template(request, template, context)
    return Response(body)
144
145
def redirect(request, *args, **kwargs):
    """
    Returns a HTTPFound(), takes a request and then urlgen params

    Args:
     - request: the current request; request.urlgen builds the target url
     - *args, **kwargs: handed on to request.urlgen
     - querystring: optional keyword; a string appended verbatim to the
       generated url.  It is never passed on to urlgen.
    """
    # pop() removes the key even when its value is None or empty, so a
    # falsy 'querystring' can no longer leak into urlgen() as a parameter.
    querystring = kwargs.pop('querystring', None) or ''

    return exc.HTTPFound(
        location=''.join([
            request.urlgen(*args, **kwargs),
            querystring]))
158
159
def setup_user_in_request(request):
    """
    Examine a request and tack on a request.user parameter if that's
    appropriate.

    request.user ends up as the matching user document, or None when the
    session has no 'user_id' or no user matches it.  In the latter case
    the session is invalidated as well.
    """
    if 'user_id' not in request.session:
        request.user = None
        return

    user = request.app.db.User.one(
        {'_id': ObjectId(request.session['user_id'])})

    if not user:
        # Something's wrong... this user doesn't exist?  Invalidate
        # this session.
        request.session.invalidate()

    request.user = user
179
180
def import_component(import_string):
    """
    Resolve "module1.module2:component" to the object it names --
    probably a method, class, or global variable.

    Args:
     - import_string: module path and attribute name separated by a
       colon.  Only the first colon is treated as the separator.

    Returns the attribute looked up on the imported module.
    """
    module_path, attr_name = import_string.split(':', 1)

    # __import__ returns the top-level package for dotted paths, so the
    # module itself is fetched from sys.modules afterwards.
    __import__(module_path)
    return getattr(sys.modules[module_path], attr_name)
195
# Runs of whitespace/punctuation that separate the words of a slug.
_punct_re = re.compile(r'[\t !"#$%&\'()*\-/<=>?@\[\\\]^_`{|},.]+')


def slugify(text, delim=u'-'):
    """
    Build an ASCII-only slug from text: lowercase it, split it on
    punctuation, transliterate each word and join the non-empty results
    with delim.  Taken from http://flask.pocoo.org/snippets/5/
    """
    # NOTE(review): the 'translit/long' codec is presumably registered by
    # the translitcodec import at the top of the file -- confirm.
    transliterated = (
        chunk.encode('translit/long')
        for chunk in _punct_re.split(text.lower()))

    # Words that come out empty are dropped.
    return unicode(delim.join([word for word in transliterated if word]))
208
209 ### ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
210 ### Special email test stuff begins HERE
211 ### ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
212
213 # We have two "test inboxes" here:
214 #
215 # EMAIL_TEST_INBOX:
216 # ----------------
217 # If you're writing test views, you'll probably want to check this.
218 # It contains a list of MIMEText messages.
219 #
220 # EMAIL_TEST_MBOX_INBOX:
221 # ----------------------
222 # This collects the messages from the FakeMhost inbox. It's really
223 # just here for testing the send_email method itself.
224 #
225 # Anyway this contains:
226 # - from
227 # - to: a list of email recipient addresses
228 # - message: not just the body, but the whole message, including
229 # headers, etc.
230 #
231 # ***IMPORTANT!***
232 # ----------------
233 # Before running tests that call functions which send email, you should
234 # always call _clear_test_inboxes() to "wipe" the inboxes clean.
235
EMAIL_TEST_INBOX = []
EMAIL_TEST_MBOX_INBOX = []


class FakeMhost(object):
    """
    Stand-in mail host: records what send_email hands over instead of
    delivering it, so the messages can be captured and tested.
    """
    def connect(self):
        """Do nothing; there is no server to connect to."""

    def sendmail(self, from_addr, to_addrs, message):
        """
        Store the message in EMAIL_TEST_MBOX_INBOX as a dict with the
        keys 'from', 'to' and 'message'.
        """
        captured = {
            'from': from_addr,
            'to': to_addrs,
            'message': message}
        EMAIL_TEST_MBOX_INBOX.append(captured)
253
def _clear_test_inboxes():
    """
    Wipe both email test inboxes by rebinding them to new empty lists.
    """
    global EMAIL_TEST_INBOX, EMAIL_TEST_MBOX_INBOX
    EMAIL_TEST_INBOX, EMAIL_TEST_MBOX_INBOX = [], []
259
260 ### ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
261 ### </Special email test stuff>
262 ### ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
263
def send_email(from_addr, to_addrs, subject, message_body):
    """
    Simple email sending wrapper, use this so we can capture messages
    for unit testing purposes.

    Args:
     - from_addr: address you're sending the email from
     - to_addrs: list of recipient email addresses
     - subject: subject of the email
     - message_body: email body text

    Returns whatever the mail host's sendmail() returns.

    When TESTS_ENABLED is set or the 'email_debug_mode' config option is
    on, a FakeMhost is used instead of a real SMTP connection.
    """
    debug_mode = mg_globals.app_config['email_debug_mode']
    use_fake_host = TESTS_ENABLED or debug_mode

    if use_fake_host:
        mhost = FakeMhost()
    else:
        mhost = smtplib.SMTP()

    mhost.connect()

    try:
        message = MIMEText(message_body.encode('utf-8'), 'plain', 'utf-8')
        message['Subject'] = subject
        message['From'] = from_addr
        message['To'] = ', '.join(to_addrs)

        if TESTS_ENABLED:
            EMAIL_TEST_INBOX.append(message)

        if debug_mode:
            # Single parenthesised argument: prints the same text whether
            # print is a statement (Python 2) or a function.
            print(u"===== Email =====")
            print(u"From address: %s" % message['From'])
            print(u"To addresses: %s" % message['To'])
            print(u"Subject: %s" % message['Subject'])
            print(u"-- Body: --")
            print(message.get_payload(decode=True))

        return mhost.sendmail(from_addr, to_addrs, message.as_string())
    finally:
        # A real SMTP connection must be closed again; FakeMhost has
        # nothing to close (and no quit() method).
        if not use_fake_host:
            try:
                mhost.quit()
            except smtplib.SMTPServerDisconnected:
                # The connection is already gone; don't let that mask
                # the outcome of sendmail().
                pass
300
301
302 ###################
303 # Translation tools
304 ###################
305
306
# Location of the 'i18n' resource of the 'mediagoblin' package, as
# resolved by pkg_resources.
TRANSLATIONS_PATH = pkg_resources.resource_filename('mediagoblin', 'i18n')
309
310
def locale_to_lower_upper(locale):
    """
    Take a locale, regardless of style, and format it like "en_US"

    The part before the first separator is lowercased, the rest is
    uppercased, and the two are joined with an underscore.  A '-'
    anywhere in the locale takes precedence over '_' as the separator.
    A locale without any separator is simply lowercased.
    """
    for separator in ('-', '_'):
        if separator in locale:
            lang, country = locale.split(separator, 1)
            return '%s_%s' % (lang.lower(), country.upper())

    return locale.lower()
323
324
def locale_to_lower_lower(locale):
    """
    Take a locale, regardless of style, and format it like "en-us"

    Everything is lowercased and the first underscore, if any, becomes
    a dash.
    """
    # Lowercasing never touches '_', so replacing the first one after
    # lower() matches splitting on it first and lowercasing both halves.
    return locale.lower().replace('_', '-', 1)
334
335
def get_locale_from_request(request):
    """
    Figure out what target language is most appropriate based on the
    request

    The first of these that applies wins:
     1. a 'lang' field in request.GET (or request.POST when GET is empty)
     2. a 'locale' entry in request.matchdict
     3. a 'target_lang' entry in the session
     4. the first of request.accept_language.best_matches()
     5. 'en'

    The result is normalized with locale_to_lower_upper().
    """
    request_form = request.GET or request.POST

    if 'lang' in request_form:
        return locale_to_lower_upper(request_form['lang'])

    accept_lang_matches = request.accept_language.best_matches()

    # Your routing can explicitly specify a target language
    if 'locale' in request.matchdict:
        target_lang = request.matchdict['locale']
    elif 'target_lang' in request.session:
        target_lang = request.session['target_lang']
    # Pull the first acceptable language
    elif accept_lang_matches:
        target_lang = accept_lang_matches[0]
    # Fall back to English
    else:
        target_lang = 'en'

    return locale_to_lower_upper(target_lang)
361
362
# A super strict version of the lxml.html cleaner class
HTML_CLEANER = Cleaner(
    # Every removal switch is turned on ...
    scripts=True, javascript=True, comments=True, style=True,
    links=True, page_structure=True, processing_instructions=True,
    embedded=True, frames=True, forms=True, annoying_tags=True,
    # ... and only this short list of tags is allowed.
    allow_tags=[
        'div', 'b', 'i', 'em', 'strong', 'p', 'ul', 'ol', 'li', 'a', 'br'],
    remove_unknown_tags=False,  # can't be used with allow_tags
    safe_attrs_only=True,
    add_nofollow=True,  # for now
    host_whitelist=(),
    whitelist_tags=set())


def clean_html(html):
    """
    Run html through HTML_CLEANER and return the cleaned markup.

    Falsy input (such as an empty string) yields u'' without calling
    the cleaner.
    """
    # clean_html barfs on an empty string
    return HTML_CLEANER.clean_html(html) if html else u''
391
392
def convert_to_tag_list_of_dicts(tag_string):
    """
    Filter input from incoming string containing user tags,

    Strips leading and trailing whitespace, collapses runs of internal
    whitespace to single spaces, and converts the "tags" text into a
    list of {'name': ..., 'slug': ...} dicts.  Empty tags and repeats of
    an earlier tag are dropped; first occurrences keep their order.

    Returns an empty list for a falsy tag_string.
    """
    taglist = []
    if not tag_string:
        return taglist

    # Strip out internal, trailing, and leading whitespace
    stripped_tag_string = u' '.join(tag_string.strip().split())

    # Names already emitted; a set makes the duplicate check constant
    # time instead of rebuilding a list of names for every tag.
    seen_names = set()

    # Split the tag string into a list of tags
    for raw_tag in stripped_tag_string.split(
            mg_globals.app_config['tags_delimiter']):
        name = raw_tag.strip()

        # Ignore empty or duplicate tags
        if not name or name in seen_names:
            continue

        seen_names.add(name)
        taglist.append({'name': name,
                        'slug': slugify(name)})

    return taglist
416
417
def media_tags_as_string(media_entry_tags):
    """
    Join the names of a media item's tags (a list of dicts with a 'name'
    key) into one string, using the configured 'tags_delimiter'.

    This is the opposite of convert_to_tag_list_of_dicts.  Returns ''
    when there are no tags.
    """
    if not media_entry_tags:
        return ''

    delimiter = mg_globals.app_config['tags_delimiter']
    return delimiter.join([entry['name'] for entry in media_entry_tags])
429
# Filled in with (maximum tag length, comma separated offending tags).
TOO_LONG_TAG_WARNING = (
    u'Tags must be shorter than %s characters. Tags that are too long: %s')


def tag_length_validator(form, field):
    """
    Form validator: reject the field when any of its tags is longer than
    the configured 'tags_max_length'.

    Raises wtforms.ValidationError naming every tag that is too long.
    """
    offenders = []
    for tag in convert_to_tag_list_of_dicts(field.data):
        if len(tag['name']) > mg_globals.app_config['tags_max_length']:
            offenders.append(tag['name'])

    if not offenders:
        return

    raise wtforms.ValidationError(
        TOO_LONG_TAG_WARNING % (
            mg_globals.app_config['tags_max_length'],
            ', '.join(offenders)))
446
447
# One shared converter, created once at import time.
MARKDOWN_INSTANCE = markdown.Markdown(safe_mode='escape')


def cleaned_markdown_conversion(text):
    """
    Take a block of text, run it through MarkDown, and clean its HTML.

    Returns u'' for falsy input, otherwise the converted markup after it
    has been passed through clean_html().
    """
    # Markdown will do nothing with and clean_html can do nothing with
    # an empty string :)
    if not text:
        return u''

    # The instance is shared between calls, so clear whatever state the
    # previous document left behind before converting the next one.
    MARKDOWN_INSTANCE.reset()

    return clean_html(MARKDOWN_INSTANCE.convert(text))
460
461
# Cache of gettext translation objects, keyed by locale.
SETUP_GETTEXTS = {}


def setup_gettext(locale):
    """
    Setup the gettext instance based on this locale

    Looks the translations up in (or adds them to) SETUP_GETTEXTS and
    installs them as mg_globals.translations.
    """
    # Later on when we have plugins we may want to enable the
    # multi-translations system they have so we can handle plugin
    # translations too

    # TODO: fallback nicely on translations from pt_PT to pt if not
    # available, etc.
    if locale in SETUP_GETTEXTS:
        this_gettext = SETUP_GETTEXTS[locale]
    else:
        this_gettext = gettext.translation(
            'mediagoblin', TRANSLATIONS_PATH, [locale], fallback=True)
        # Only locales babel knows about are cached.
        if exists(locale):
            SETUP_GETTEXTS[locale] = this_gettext

    mg_globals.setup_globals(
        translations=this_gettext)


# Force en to be setup before anything else so that
# mg_globals.translations is never None
setup_gettext('en')
489
490
def pass_to_ugettext(*args, **kwargs):
    """
    Forward a translation call to the ugettext of whatever translations
    are current at call time.

    The lookup happens on every call because there cannot be a global
    ugettext method: mg_globals gets swapped out by the application
    per-request.
    """
    translate = mg_globals.translations.ugettext
    return translate(*args, **kwargs)
500
501
def lazy_pass_to_ugettext(*args, **kwargs):
    """
    Lazy variant of pass_to_ugettext: returns a LazyProxy wrapping the
    call instead of translating right away.

    Handy for translations defined at module level that must not be
    translated until the moment they are used as a string.
    """
    deferred = LazyProxy(pass_to_ugettext, *args, **kwargs)
    return deferred
511
512
def pass_to_ngettext(*args, **kwargs):
    """
    Forward a plural translation call to the ngettext of whatever
    translations are current at call time.

    The lookup happens on every call because there cannot be a global
    ngettext method: mg_globals gets swapped out by the application
    per-request.
    """
    # NOTE(review): pass_to_ugettext uses the unicode variant (ugettext)
    # while this calls ngettext rather than ungettext -- confirm that the
    # difference is intended.
    translate_plural = mg_globals.translations.ngettext
    return translate_plural(*args, **kwargs)
522
523
def lazy_pass_to_ngettext(*args, **kwargs):
    """
    Lazy variant of pass_to_ngettext: returns a LazyProxy wrapping the
    call instead of translating right away.

    Handy for translations defined at module level that must not be
    translated until the moment they are used as a string.
    """
    deferred = LazyProxy(pass_to_ngettext, *args, **kwargs)
    return deferred
533
534
def fake_ugettext_passthrough(string):
    """
    Return string untouched -- a ugettext look-alike that exists for
    extraction's sake only ;)

    wtforms has its own way to define a method that translates things,
    so text only needs to be marked up for extraction here; it must not
    actually be run through gettext.
    """
    return string
544
545
PAGINATION_DEFAULT_PER_PAGE = 30


class Pagination(object):
    """
    Pagination class for mongodb queries.

    Initialization through
    __init__(self, page, cursor, per_page=PAGINATION_DEFAULT_PER_PAGE,
             jump_to_id=False),
    get actual data slice through __call__().
    """

    def __init__(self, page, cursor, per_page=PAGINATION_DEFAULT_PER_PAGE,
                 jump_to_id=False):
        """
        Initializes Pagination

        Args:
         - page: requested page
         - per_page: number of objects per page
         - cursor: db cursor
         - jump_to_id: ObjectId, sets the page to the page containing the
           object with _id == jump_to_id.  When no object matches, the
           requested page is kept and active_id stays None.
        """
        self.page = page
        self.per_page = per_page
        self.cursor = cursor
        self.total_count = self.cursor.count()
        self.active_id = None

        if jump_to_id:
            # Iterate over a copy rather than self.cursor itself
            # (presumably so the cursor used by __call__ isn't consumed).
            for position, doc in enumerate(copy.copy(self.cursor)):
                if doc['_id'] == jump_to_id:
                    # Zero-based position -> one-based page number.
                    self.page = 1 + int(position // self.per_page)
                    self.active_id = jump_to_id
                    break

    def __call__(self):
        """
        Returns slice of objects for the requested page
        """
        return self.cursor.skip(
            (self.page - 1) * self.per_page).limit(self.per_page)

    @property
    def pages(self):
        """Total number of pages; a partly filled last page counts."""
        return int(ceil(self.total_count / float(self.per_page)))

    @property
    def has_prev(self):
        """True when a page comes before the current one."""
        return self.page > 1

    @property
    def has_next(self):
        """True when a page comes after the current one."""
        return self.page < self.pages

    def iter_pages(self, left_edge=2, left_current=2,
                   right_current=5, right_edge=2):
        """
        Yield the page numbers to show in a pager, with None marking
        each gap of skipped pages.

        Shown are the first left_edge pages, the last right_edge pages,
        and a window around the current page reaching left_current pages
        back and up to right_current - 1 pages ahead.
        """
        last = 0
        for num in range(1, self.pages + 1):
            in_left_edge = num <= left_edge
            in_window = (num > self.page - left_current - 1 and
                         num < self.page + right_current)
            in_right_edge = num > self.pages - right_edge

            if in_left_edge or in_window or in_right_edge:
                # Pages were skipped since the last one yielded.
                if last + 1 != num:
                    yield None
                yield num
                last = num

    def get_page_url_explicit(self, base_url, get_params, page_no):
        """
        Get a page url by adding a page= parameter to the base url

        get_params may be None or empty; it is copied, never modified.
        """
        new_get_params = copy.copy(get_params or {})
        new_get_params['page'] = page_no
        return "%s?%s" % (
            base_url, urllib.urlencode(new_get_params))

    def get_page_url(self, request, page_no):
        """
        Get a new page url based of the request, and the new page number.

        This is a nice wrapper around get_page_url_explicit()
        """
        return self.get_page_url_explicit(
            request.path_info, request.GET, page_no)
634
635
def gridify_list(this_list, num_cols=5):
    """
    Chop this_list into consecutive rows of num_cols items and return
    the rows as a list of lists.  The last row holds whatever is left
    over, so it may be shorter.
    """
    # Figure out how many rows we should have
    num_rows = int(ceil(float(len(this_list)) / num_cols))

    return [
        this_list[row_num * num_cols:(row_num + 1) * num_cols]
        for row_num in range(num_rows)]
655
656
def gridify_cursor(this_cursor, num_cols=5):
    """
    Read everything out of this_cursor into a list and arrange it into
    rows of num_cols items with gridify_list().
    """
    items = list(this_cursor)
    return gridify_list(items, num_cols)