Merge remote branch 'upstream/master' into dev/mount_storage
[mediagoblin.git] / mediagoblin / util.py
1 # GNU MediaGoblin -- federated, autonomous media hosting
2 # Copyright (C) 2011 Free Software Foundation, Inc
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16
17 from __future__ import division
18
19 from email.MIMEText import MIMEText
20 import gettext
21 import pkg_resources
22 import smtplib
23 import sys
24 import re
25 import urllib
26 from math import ceil, floor
27 import copy
28 import wtforms
29
30 from babel.localedata import exists
31 import jinja2
32 import translitcodec
33 from webob import Response, exc
34 from lxml.html.clean import Cleaner
35 import markdown
36
37 from mediagoblin import mg_globals
38 from mediagoblin import messages
39 from mediagoblin.db.util import ObjectId
40
41 from itertools import izip, count
42
# Media size keys; presumably the order in which callers look for an
# image to display -- TODO confirm against the code that reads this.
DISPLAY_IMAGE_FETCHING_ORDER = [u'medium', u'original', u'thumb']

# Flipped to True by _activate_testing().  While it is True,
# render_template() and send_email() keep copies of what they handled
# so tests can inspect them.
TESTS_ENABLED = False


def _activate_testing():
    """
    Switch util.py into testing mode by setting TESTS_ENABLED.
    """
    global TESTS_ENABLED
    TESTS_ENABLED = True
52
53
def clear_test_buckets():
    """
    Reset the module-level stores that tests may have filled or
    redefined, so the next round of tests starts from a "clean slate".

    This empties the cached jinja envs, both email test inboxes, and
    the saved template contexts.
    """
    global SETUP_JINJA_ENVS, EMAIL_TEST_INBOX, EMAIL_TEST_MBOX_INBOX

    SETUP_JINJA_ENVS = {}
    EMAIL_TEST_INBOX, EMAIL_TEST_MBOX_INBOX = [], []

    clear_test_template_context()
72
73
# Jinja environments that have already been built, keyed by locale
SETUP_JINJA_ENVS = {}


def get_jinja_env(template_loader, locale):
    """
    Set up the Jinja environment for this locale, reusing an already
    built one when there is one.

    (In the future we may have another system for providing theming;
    for now this is good enough.)

    Args:
     - template_loader: loader handed to a newly built environment
     - locale: locale whose translations the environment should use

    Returns the jinja2.Environment for this locale.
    """
    # Run this even when the environment is cached: it is what hands
    # this locale's translations to mg_globals.
    setup_gettext(locale)

    # If we have a jinja environment set up with this locale, just
    # return that one.
    # NOTE(review): the cache is keyed by locale only, so a cached
    # environment keeps the loader it was first built with.
    if locale in SETUP_JINJA_ENVS:
        return SETUP_JINJA_ENVS[locale]

    template_env = jinja2.Environment(
        loader=template_loader, autoescape=True,
        extensions=['jinja2.ext.i18n', 'jinja2.ext.autoescape'])

    template_env.install_gettext_callables(
        mg_globals.translations.gettext,
        mg_globals.translations.ngettext)

    # All templates will know how to ...
    # ... fetch all waiting messages and remove them from the queue
    template_env.globals['fetch_messages'] = messages.fetch_messages

    # Only environments for locales that exists() recognises are cached
    if exists(locale):
        SETUP_JINJA_ENVS[locale] = template_env

    return template_env
107
108
# While unit tests are enabled, render_template() files every context
# it rendered in here, keyed by template path
TEMPLATE_TEST_CONTEXT = {}


def render_template(request, template_path, context):
    """
    Render the template found at template_path using context.

    The request is put into the context under the 'request' key before
    rendering, so you don't have to.  When tests are enabled the
    context is also kept in TEMPLATE_TEST_CONTEXT.  Helpful!

    Returns the rendered output.
    """
    template = request.template_env.get_template(template_path)
    context['request'] = request
    output = template.render(context)

    if not TESTS_ENABLED:
        return output

    TEMPLATE_TEST_CONTEXT[template_path] = context
    return output
129
130
def clear_test_template_context():
    """
    Throw away the template contexts collected while testing.
    """
    global TEMPLATE_TEST_CONTEXT
    TEMPLATE_TEST_CONTEXT = dict()
134
135
def render_to_response(request, template, context):
    """
    Render template with context and wrap the output in a Response.

    Much like Django's shortcut.render()
    """
    body = render_template(request, template, context)
    return Response(body)
139
140
def redirect(request, *args, **kwargs):
    """
    Returns a HTTPFound(), takes a request and then urlgen params

    Args:
     - request: the current request; its urlgen builds the location
     - *args, **kwargs: handed to request.urlgen, except for the
       optional 'querystring' keyword, which is appended as-is to the
       generated url instead
    """
    # Always take 'querystring' out of kwargs: an empty or None value
    # must not reach urlgen as though it were a url parameter.
    querystring = kwargs.pop('querystring', None) or ''

    return exc.HTTPFound(
        location=request.urlgen(*args, **kwargs) + querystring)
153
154
def setup_user_in_request(request):
    """
    Examine a request and tack on a request.user parameter if that's
    appropriate.

    request.user is None when the session holds no 'user_id'.
    Otherwise it is the result of looking that id up in the User
    collection; when the lookup comes back empty the session is
    invalidated as well.
    """
    if 'user_id' not in request.session:
        request.user = None
        return

    user = request.app.db.User.one(
        {'_id': ObjectId(request.session['user_id'])})

    if not user:
        # Something's wrong... this user doesn't exist?  Invalidate
        # this session.
        request.session.invalidate()

    request.user = user
174
175
def import_component(import_string):
    """
    Resolve a "module1.module2:component" string to the object it names.

    Args:
     - import_string: dotted module path, a colon, then the name of an
       attribute of that module.  Probably a method, class, or global
       variable.

    Returns the attribute found on the imported module.
    """
    module_path, attribute = import_string.split(':', 1)
    # __import__ hands back the top-level package for dotted paths, so
    # the module itself is looked up in sys.modules afterwards.
    __import__(module_path)
    return getattr(sys.modules[module_path], attribute)
190
# Runs of whitespace and punctuation; slugify() splits words on these
_punct_re = re.compile(r'[\t !"#$%&\'()*\-/<=>?@\[\\\]^_`{|},.]+')


def slugify(text, delim=u'-'):
    """
    Generates an ASCII-only slug. Taken from http://flask.pocoo.org/snippets/5/

    The lowercased text is split on _punct_re, every piece is encoded
    with the 'translit/long' codec, and the pieces that are not empty
    afterwards are joined with delim.
    """
    transliterated = (
        piece.encode('translit/long')
        for piece in _punct_re.split(text.lower()))
    return unicode(delim.join(word for word in transliterated if word))
203
204 ### ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
205 ### Special email test stuff begins HERE
206 ### ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
207
208 # We have two "test inboxes" here:
209 #
210 # EMAIL_TEST_INBOX:
211 # ----------------
212 # If you're writing test views, you'll probably want to check this.
213 # It contains a list of MIMEText messages.
214 #
215 # EMAIL_TEST_MBOX_INBOX:
216 # ----------------------
217 # This collects the messages from the FakeMhost inbox. It's really
218 # just here for testing the send_email method itself.
219 #
220 # Anyway this contains:
221 # - from
222 # - to: a list of email recipient addresses
223 # - message: not just the body, but the whole message, including
224 # headers, etc.
225 #
226 # ***IMPORTANT!***
227 # ----------------
228 # Before running tests that call functions which send email, you should
229 # always call _clear_test_inboxes() to "wipe" the inboxes clean.
230
# Messages as send_email() built them (filled while tests are enabled)
EMAIL_TEST_INBOX = []
# One dict per FakeMhost.sendmail() call
EMAIL_TEST_MBOX_INBOX = []


class FakeMhost(object):
    """
    Stand-in mail host that records messages rather than delivering
    them, so we can capture and test what send_email hands over.
    """
    def connect(self):
        """Nothing to connect to."""
        return None

    def sendmail(self, from_addr, to_addrs, message):
        """
        Append the sender, the recipients and the full message text to
        EMAIL_TEST_MBOX_INBOX as one dict.
        """
        captured = {
            'from': from_addr,
            'to': to_addrs,
            'message': message}
        EMAIL_TEST_MBOX_INBOX.append(captured)
248
def _clear_test_inboxes():
    """
    Replace both email test inboxes with fresh empty lists.
    """
    global EMAIL_TEST_INBOX, EMAIL_TEST_MBOX_INBOX
    EMAIL_TEST_INBOX, EMAIL_TEST_MBOX_INBOX = [], []
254
255 ### ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
256 ### </Special email test stuff>
257 ### ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
258
def send_email(from_addr, to_addrs, subject, message_body):
    """
    Simple email sending wrapper, use this so we can capture messages
    for unit testing purposes.

    While tests are enabled or the 'email_debug_mode' setting is on,
    the message is handed to a FakeMhost instead of a real SMTP
    connection; in debug mode it is printed as well.

    Args:
     - from_addr: address you're sending the email from
     - to_addrs: list of recipient email addresses
     - subject: subject of the email
     - message_body: email body text

    Returns whatever the mail host's sendmail() returns.
    """
    debug_mode = mg_globals.app_config['email_debug_mode']

    if TESTS_ENABLED or debug_mode:
        mhost = FakeMhost()
    else:
        mhost = smtplib.SMTP()

    mhost.connect()

    try:
        message = MIMEText(message_body.encode('utf-8'), 'plain', 'utf-8')
        message['Subject'] = subject
        message['From'] = from_addr
        message['To'] = ', '.join(to_addrs)

        if TESTS_ENABLED:
            EMAIL_TEST_INBOX.append(message)

        if debug_mode:
            print(u"===== Email =====")
            print(u"From address: %s" % message['From'])
            print(u"To addresses: %s" % message['To'])
            print(u"Subject: %s" % message['Subject'])
            print(u"-- Body: --")
            print(message.get_payload(decode=True))

        return mhost.sendmail(from_addr, to_addrs, message.as_string())
    finally:
        # End the SMTP session instead of leaving the connection open.
        # FakeMhost has no quit(), hence the check.
        if hasattr(mhost, 'quit'):
            try:
                mhost.quit()
            except smtplib.SMTPServerDisconnected:
                # The server already dropped the connection
                pass
295
296
297 ###################
298 # Translation tools
299 ###################
300
301
# Location of the 'i18n' resource of the 'mediagoblin' package, as
# resolved by pkg_resources
TRANSLATIONS_PATH = pkg_resources.resource_filename('mediagoblin', 'i18n')
304
305
def locale_to_lower_upper(locale):
    """
    Take a locale, regardless of style, and format it like "en_US"

    The part before the first '-' (or, when there is no '-', the first
    '_') is lowercased and the rest is uppercased.  A locale with
    neither separator is simply lowercased.
    """
    # '-' is checked before '_', matching the historical precedence
    for separator in ('-', '_'):
        if separator in locale:
            lang, country = locale.split(separator, 1)
            return '%s_%s' % (lang.lower(), country.upper())

    return locale.lower()
318
319
def locale_to_lower_lower(locale):
    """
    Take a locale, regardless of style, and format it like "en-us"

    The first '_' (if any) becomes a '-' and everything is lowercased.
    """
    return locale.replace('_', '-', 1).lower()
329
330
def get_locale_from_request(request):
    """
    Figure out what target language is most appropriate based on the
    request

    Sources are consulted in this order: a 'lang' form field (GET, or
    POST when GET is empty), a 'locale' in the routing matchdict, a
    'target_lang' kept in the session, the first of the request's
    accept_language matches, and finally English.

    Returns the chosen locale formatted by locale_to_lower_upper().
    """
    request_form = request.GET or request.POST

    if 'lang' in request_form:
        return locale_to_lower_upper(request_form['lang'])

    # Your routing can explicitly specify a target language
    if 'locale' in request.matchdict:
        target_lang = request.matchdict['locale']
    elif 'target_lang' in request.session:
        target_lang = request.session['target_lang']
    else:
        # Pull the first acceptable language, or fall back to English
        accept_lang_matches = request.accept_language.best_matches()
        target_lang = accept_lang_matches[0] if accept_lang_matches else 'en'

    return locale_to_lower_upper(target_lang)
356
357
# A super strict version of the lxml.html cleaner class: only the
# handful of tags listed in allow_tags is let through
HTML_CLEANER = Cleaner(
    scripts=True,
    javascript=True,
    comments=True,
    style=True,
    links=True,
    page_structure=True,
    processing_instructions=True,
    embedded=True,
    frames=True,
    forms=True,
    annoying_tags=True,
    allow_tags=[
        'div', 'b', 'i', 'em', 'strong', 'p', 'ul', 'ol', 'li', 'a', 'br'],
    remove_unknown_tags=False,  # can't be used with allow_tags
    safe_attrs_only=True,
    add_nofollow=True,  # for now
    host_whitelist=(),
    whitelist_tags=set([]))


def clean_html(html):
    """
    Run html through HTML_CLEANER and return the cleaned markup.

    Empty input yields u'' without touching the cleaner.
    """
    # clean_html barfs on an empty string
    if html:
        return HTML_CLEANER.clean_html(html)

    return u''
386
387
def convert_to_tag_list_of_dicts(tag_string):
    """
    Filter input from incoming string containing user tags,

    Strips leading and trailing whitespace, collapses internal runs of
    whitespace to a single space, and splits the "tags" text on the
    configured 'tags_delimiter'.  Empty and duplicate tags are dropped.

    Returns a list of {'name': ..., 'slug': ...} dicts, in the order
    the tags first appear; an empty list for empty input.
    """
    taglist = []
    if not tag_string:
        return taglist

    # Strip out internal, trailing, and leading whitespace
    normalized = u' '.join(tag_string.strip().split())

    # Names already added, so duplicates are spotted without rescanning
    # taglist for every tag
    seen_names = set()

    # Split the tag string into a list of tags
    for raw_tag in normalized.split(mg_globals.app_config['tags_delimiter']):
        name = raw_tag.strip()

        # Ignore empty or duplicate tags
        if not name or name in seen_names:
            continue

        seen_names.add(name)
        taglist.append({'name': name, 'slug': slugify(name)})

    return taglist
411
412
def media_tags_as_string(media_entry_tags):
    """
    Join the names of a media item's tags (a list of dicts) into one
    string, separated by the configured 'tags_delimiter'.

    This is the opposite of convert_to_tag_list_of_dicts.  Returns ''
    when there are no tags.
    """
    if not media_entry_tags:
        return ''

    delimiter = mg_globals.app_config['tags_delimiter']
    return delimiter.join([tag['name'] for tag in media_entry_tags])
424
# Filled in with the maximum length and the offending tag names
TOO_LONG_TAG_WARNING = (
    u'Tags must be shorter than %s characters. Tags that are too long: %s')


def tag_length_validator(form, field):
    """
    Form validator: make sure no tag in field.data is longer than the
    configured 'tags_max_length'.

    Raises wtforms.ValidationError naming every tag that is too long.
    """
    offenders = []
    for tag in convert_to_tag_list_of_dicts(field.data):
        if len(tag['name']) > mg_globals.app_config['tags_max_length']:
            offenders.append(tag['name'])

    if not offenders:
        return

    limit = mg_globals.app_config['tags_max_length']
    raise wtforms.ValidationError(
        TOO_LONG_TAG_WARNING % (limit, ', '.join(offenders)))
441
442
# Shared converter.  NOTE(review): safe_mode='escape' presumably makes
# Markdown escape raw HTML found in the input -- confirm against the
# installed markdown version.
MARKDOWN_INSTANCE = markdown.Markdown(safe_mode='escape')


def cleaned_markdown_conversion(text):
    """
    Convert a block of MarkDown text to HTML and clean that HTML.

    Returns u'' for empty input.
    """
    # Neither Markdown nor clean_html can do anything with an empty
    # string :)
    if text:
        return clean_html(MARKDOWN_INSTANCE.convert(text))

    return u''
455
456
# gettext translation objects that were already loaded, keyed by locale
SETUP_GETTEXTS = {}


def setup_gettext(locale):
    """
    Setup the gettext instance based on this locale

    The translations are looked up under TRANSLATIONS_PATH (with
    fallback=True, so a missing catalog does not raise), cached when
    exists() recognises the locale, and then handed to
    mg_globals.setup_globals() as 'translations'.
    """
    # Later on when we have plugins we may want to enable the
    # multi-translations system they have so we can handle plugin
    # translations too

    # TODO: fallback nicely on translations from pt_PT to pt if not
    # available, etc.
    if locale in SETUP_GETTEXTS:
        this_gettext = SETUP_GETTEXTS[locale]
    else:
        this_gettext = gettext.translation(
            'mediagoblin', TRANSLATIONS_PATH, [locale], fallback=True)
        if exists(locale):
            SETUP_GETTEXTS[locale] = this_gettext

    mg_globals.setup_globals(
        translations=this_gettext)
479
480
PAGINATION_DEFAULT_PER_PAGE = 30


class Pagination(object):
    """
    Pagination class for mongodb queries.

    Initialization through __init__(self, page, cursor,
    per_page=PAGINATION_DEFAULT_PER_PAGE, jump_to_id=False),
    get actual data slice through __call__().
    """

    def __init__(self, page, cursor, per_page=PAGINATION_DEFAULT_PER_PAGE,
                 jump_to_id=False):
        """
        Initializes Pagination

        Args:
         - page: requested page
         - per_page: number of objects per page
         - cursor: db cursor
         - jump_to_id: ObjectId, sets the page to the page containing the
           object with _id == jump_to_id.  When no such object is found
           the requested page is kept.
        """
        self.page = page
        self.per_page = per_page
        self.cursor = cursor
        self.total_count = self.cursor.count()
        # Set to jump_to_id once the matching object has been located
        self.active_id = None

        if jump_to_id:
            # Iterate over a copy rather than over self.cursor itself
            cursor = copy.copy(self.cursor)

            for position, doc in enumerate(cursor):
                if doc['_id'] == jump_to_id:
                    # position is 0-based, pages are 1-based
                    self.page = 1 + position // self.per_page
                    self.active_id = jump_to_id
                    break

    def __call__(self):
        """
        Returns slice of objects for the requested page
        """
        return self.cursor.skip(
            (self.page - 1) * self.per_page).limit(self.per_page)

    @property
    def pages(self):
        """Total number of pages, counting a partly filled last page."""
        return int(ceil(self.total_count / float(self.per_page)))

    @property
    def has_prev(self):
        """True when there is a page before the current one."""
        return self.page > 1

    @property
    def has_next(self):
        """True when there is a page after the current one."""
        return self.page < self.pages

    def iter_pages(self, left_edge=2, left_current=2,
                   right_current=5, right_edge=2):
        """
        Yield the page numbers to show in a pager, with None standing
        in for every gap between them.

        Args:
         - left_edge: how many pages to always show at the start
         - left_current: how many pages to show before the current one
         - right_current: pages from the current one onwards to show,
           the current page included
         - right_edge: how many pages to always show at the end
        """
        total_pages = self.pages
        last = 0
        for num in range(1, total_pages + 1):
            near_current = (
                self.page - left_current - 1 < num < self.page + right_current)
            if (num <= left_edge or near_current
                    or num > total_pages - right_edge):
                if last + 1 != num:
                    # Some pages were skipped since the last one yielded
                    yield None
                yield num
                last = num

    def get_page_url_explicit(self, base_url, get_params, page_no):
        """
        Get a page url by adding a page= parameter to the base url

        get_params is copied, never modified; any 'page' entry it
        holds is replaced by page_no.
        """
        new_get_params = copy.copy(get_params or {})
        new_get_params['page'] = page_no
        return "%s?%s" % (
            base_url, urllib.urlencode(new_get_params))

    def get_page_url(self, request, page_no):
        """
        Get a new page url based of the request, and the new page number.

        This is a nice wrapper around get_page_url_explicit()
        """
        return self.get_page_url_explicit(
            request.path_info, request.GET, page_no)