Added empty_space class for user profile placeholders
[mediagoblin.git] / mediagoblin / util.py
1 # GNU MediaGoblin -- federated, autonomous media hosting
2 # Copyright (C) 2011 Free Software Foundation, Inc
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16
17 from __future__ import division
18
19 from email.MIMEText import MIMEText
20 import gettext
21 import pkg_resources
22 import smtplib
23 import sys
24 import re
25 import urllib
26 from math import ceil, floor
27 import copy
28 import wtforms
29
30 from babel.localedata import exists
31 from babel.support import LazyProxy
32 import jinja2
33 import translitcodec
34 from webob import Response, exc
35 from lxml.html.clean import Cleaner
36 import markdown
37 from wtforms.form import Form
38
39 from mediagoblin import mg_globals
40 from mediagoblin import messages
41 from mediagoblin.db.util import ObjectId
42
43 from itertools import izip, count
44
DISPLAY_IMAGE_FETCHING_ORDER = [u'medium', u'original', u'thumb']

# Module-wide testing flag; set to True by _activate_testing().
TESTS_ENABLED = False


def _activate_testing():
    """
    Turn on testing mode for util.py by setting TESTS_ENABLED to True.
    """
    global TESTS_ENABLED
    TESTS_ENABLED = True
54
55
def clear_test_buckets():
    """
    Reset the module-level stores that accumulate state during tests.

    Rebinds the cached jinja environments and both email test inboxes
    to fresh empty containers, then clears the stored template
    contexts, so the next round of tests starts from a clean slate.
    """
    global SETUP_JINJA_ENVS, EMAIL_TEST_INBOX, EMAIL_TEST_MBOX_INBOX

    # Things we might have redefined during testing, like the jinja envs
    SETUP_JINJA_ENVS = {}

    EMAIL_TEST_INBOX, EMAIL_TEST_MBOX_INBOX = [], []

    clear_test_template_context()
74
75
SETUP_JINJA_ENVS = {}


def get_jinja_env(template_loader, locale):
    """
    Return the Jinja environment for ``locale``, building it if needed.

    Gettext is (re)configured for the locale on every call, even when a
    cached environment is returned.

    (In the future we may have another system for providing theming;
    for now this is good enough.)

    Args:
     - template_loader: loader handed to jinja2.Environment
     - locale: locale identifier, also used as the cache key
    """
    setup_gettext(locale)

    # Reuse the environment already built for this locale, if any.
    if locale in SETUP_JINJA_ENVS:
        return SETUP_JINJA_ENVS[locale]

    env = jinja2.Environment(
        loader=template_loader,
        autoescape=True,
        extensions=['jinja2.ext.i18n', 'jinja2.ext.autoescape'])

    translations = mg_globals.translations
    env.install_gettext_callables(
        translations.ugettext, translations.ungettext)

    # Every template can fetch all waiting messages (which also removes
    # them from the queue).
    env.globals['fetch_messages'] = messages.fetch_messages

    # Only environments for locales babel knows about are cached.
    if exists(locale):
        SETUP_JINJA_ENVS[locale] = env

    return env
109
110
# We'll store context information here when doing unit tests
TEMPLATE_TEST_CONTEXT = {}


def render_template(request, template_path, context):
    """
    Render the template at ``template_path`` with ``context``.

    The request is always added to the context under the 'request'
    key (the passed-in dict is modified), so callers don't have to.
    When TESTS_ENABLED is set, the context is also remembered in
    TEMPLATE_TEST_CONTEXT, keyed by template path.

    Returns the rendered template output.
    """
    template = request.template_env.get_template(template_path)
    context['request'] = request
    output = template.render(context)

    if not TESTS_ENABLED:
        return output

    TEMPLATE_TEST_CONTEXT[template_path] = context
    return output
131
132
def clear_test_template_context():
    """
    Rebind TEMPLATE_TEST_CONTEXT to a fresh, empty dict.
    """
    global TEMPLATE_TEST_CONTEXT
    TEMPLATE_TEST_CONTEXT = dict()
136
137
def render_to_response(request, template, context):
    """
    Render ``template`` with ``context`` and wrap the result in a Response.

    Much like Django's shortcut.render().
    """
    body = render_template(request, template, context)
    return Response(body)
141
142
def redirect(request, *args, **kwargs):
    """
    Returns a HTTPFound(), takes a request and then urlgen params

    All positional and keyword arguments are passed on to
    request.urlgen(), except for:

     - querystring: optional string appended verbatim to the generated
       url.  A missing or empty value appends nothing.
    """
    # pop() rather than get()+del: the key must never reach urlgen, even
    # when the caller passed an empty/None querystring.
    querystring = kwargs.pop('querystring', None) or ''

    return exc.HTTPFound(
        location=''.join([
            request.urlgen(*args, **kwargs),
            querystring]))
155
156
def setup_user_in_request(request):
    """
    Attach a ``user`` attribute to the request based on its session.

    request.user is None when the session carries no 'user_id'.
    Otherwise it is whatever the User lookup for that id returns; if
    the lookup comes back empty the session is invalidated as well.
    """
    session = request.session

    if not session.has_key('user_id'):
        request.user = None
        return

    found_user = request.app.db.User.one(
        {'_id': ObjectId(session['user_id'])})

    if not found_user:
        # Something's wrong... this user doesn't exist?  Invalidate
        # this session.
        session.invalidate()

    request.user = found_user
176
177
def import_component(import_string):
    """
    Import and return a single component of a module: probably a
    method, class, or global variable.

    Args:
     - import_string: what to import, written in the format of
       "module1.module2:component"

    A string without a ':' fails with ValueError when it is unpacked.
    """
    pieces = import_string.split(':', 1)
    module_name, component_name = pieces

    # __import__ returns the top-level package for dotted names, so the
    # module itself is fetched from sys.modules instead.
    __import__(module_name)
    return getattr(sys.modules[module_name], component_name)
192
_punct_re = re.compile(r'[\t !"#$%&\'()*\-/<=>?@\[\\\]^_`{|},.]+')


def slugify(text, delim=u'-'):
    """
    Generates an ASCII-only slug.  Taken from http://flask.pocoo.org/snippets/5/

    The lowercased text is split on runs of punctuation/whitespace,
    each piece is transliterated with the 'translit/long' codec, and
    the non-empty results are joined with ``delim``.
    """
    transliterated = (
        piece.encode('translit/long')
        for piece in _punct_re.split(text.lower()))

    # Pieces that are empty after transliteration are dropped.
    return unicode(delim.join([piece for piece in transliterated if piece]))
205
206 ### ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
207 ### Special email test stuff begins HERE
208 ### ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
209
210 # We have two "test inboxes" here:
211 #
212 # EMAIL_TEST_INBOX:
213 # ----------------
214 # If you're writing test views, you'll probably want to check this.
215 # It contains a list of MIMEText messages.
216 #
217 # EMAIL_TEST_MBOX_INBOX:
218 # ----------------------
# This collects the messages from the FakeMhost inbox. It's really
# just here for testing the send_email method itself.
221 #
222 # Anyway this contains:
223 # - from
224 # - to: a list of email recipient addresses
225 # - message: not just the body, but the whole message, including
226 # headers, etc.
227 #
228 # ***IMPORTANT!***
229 # ----------------
230 # Before running tests that call functions which send email, you should
231 # always call _clear_test_inboxes() to "wipe" the inboxes clean.
232
EMAIL_TEST_INBOX = []
EMAIL_TEST_MBOX_INBOX = []


class FakeMhost(object):
    """
    Stand-in mail host used to capture and test the messages that
    send_email hands to its mail host.
    """
    def connect(self):
        """Do nothing; there is no server to connect to."""
        pass

    def sendmail(self, from_addr, to_addrs, message):
        """Append the message to EMAIL_TEST_MBOX_INBOX instead of sending."""
        captured = {
            'from': from_addr,
            'to': to_addrs,
            'message': message}
        EMAIL_TEST_MBOX_INBOX.append(captured)
250
def _clear_test_inboxes():
    """
    Rebind both email test inboxes to fresh, empty lists.
    """
    global EMAIL_TEST_INBOX, EMAIL_TEST_MBOX_INBOX
    EMAIL_TEST_INBOX, EMAIL_TEST_MBOX_INBOX = [], []
256
257 ### ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
258 ### </Special email test stuff>
259 ### ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
260
261 def send_email(from_addr, to_addrs, subject, message_body):
262 """
263 Simple email sending wrapper, use this so we can capture messages
264 for unit testing purposes.
265
266 Args:
267 - from_addr: address you're sending the email from
268 - to_addrs: list of recipient email addresses
269 - subject: subject of the email
270 - message_body: email body text
271 """
272 # TODO: make a mock mhost if testing is enabled
273 if TESTS_ENABLED or mg_globals.app_config['email_debug_mode']:
274 mhost = FakeMhost()
275 elif not mg_globals.app_config['email_debug_mode']:
276 mhost = smtplib.SMTP()
277
278 mhost.connect()
279
280 message = MIMEText(message_body.encode('utf-8'), 'plain', 'utf-8')
281 message['Subject'] = subject
282 message['From'] = from_addr
283 message['To'] = ', '.join(to_addrs)
284
285 if TESTS_ENABLED:
286 EMAIL_TEST_INBOX.append(message)
287
288 if mg_globals.app_config['email_debug_mode']:
289 print u"===== Email ====="
290 print u"From address: %s" % message['From']
291 print u"To addresses: %s" % message['To']
292 print u"Subject: %s" % message['Subject']
293 print u"-- Body: --"
294 print message.get_payload(decode=True)
295
296 return mhost.sendmail(from_addr, to_addrs, message.as_string())
297
298
299 ###################
300 # Translation tools
301 ###################
302
303
# Filesystem location of the 'i18n' resource inside the 'mediagoblin'
# package, as resolved by pkg_resources.
TRANSLATIONS_PATH = pkg_resources.resource_filename('mediagoblin', 'i18n')
306
307
def locale_to_lower_upper(locale):
    """
    Take a locale, regardless of style, and format it like "en_US"

    The locale is split on its first '-' (or, if it contains none, its
    first '_'); the language part is lowercased and the rest uppercased.
    A locale without either separator is simply lowercased.
    """
    # '-' is tried before '_' so that mixed input is split the same way
    # it always has been.
    for separator in ('-', '_'):
        if separator in locale:
            lang, country = locale.split(separator, 1)
            return '%s_%s' % (lang.lower(), country.upper())

    return locale.lower()
320
321
def locale_to_lower_lower(locale):
    """
    Take a locale, regardless of style, and format it like "en-us"

    Everything is lowercased and the first '_' (if any) becomes a '-'.
    Input that already uses '-' only needs the lowercasing.
    """
    if '_' not in locale:
        return locale.lower()

    lang, country = locale.split('_', 1)
    return '%s-%s' % (lang.lower(), country.lower())
331
332
def get_locale_from_request(request):
    """
    Pick the most appropriate target language for a request.

    Sources, in order of precedence: a 'lang' form parameter (GET, or
    POST when GET is empty), a 'locale' in the routing matchdict, a
    'target_lang' stored in the session, the first language accepted
    by the client, and finally English.

    The result is normalized with locale_to_lower_upper().
    """
    request_form = request.GET or request.POST

    if request_form.has_key('lang'):
        return locale_to_lower_upper(request_form['lang'])

    accepted = request.accept_language.best_matches()

    if request.matchdict.has_key('locale'):
        # Your routing can explicitly specify a target language
        chosen = request.matchdict['locale']
    elif request.session.has_key('target_lang'):
        chosen = request.session['target_lang']
    elif accepted:
        # Pull the first acceptable language
        chosen = accepted[0]
    else:
        # Fall back to English
        chosen = 'en'

    return locale_to_lower_upper(chosen)
358
359
# A super strict version of the lxml.html cleaner class
_ALLOWED_HTML_TAGS = [
    'div', 'b', 'i', 'em', 'strong', 'p', 'ul', 'ol', 'li', 'a', 'br']

HTML_CLEANER = Cleaner(
    scripts=True,
    javascript=True,
    comments=True,
    style=True,
    links=True,
    page_structure=True,
    processing_instructions=True,
    embedded=True,
    frames=True,
    forms=True,
    annoying_tags=True,
    allow_tags=_ALLOWED_HTML_TAGS,
    remove_unknown_tags=False,  # can't be used with allow_tags
    safe_attrs_only=True,
    add_nofollow=True,  # for now
    host_whitelist=(),
    whitelist_tags=set())


def clean_html(html):
    """
    Run ``html`` through HTML_CLEANER and return the cleaned result.

    Empty input yields u'' without invoking the cleaner.
    """
    # clean_html barfs on an empty string
    return HTML_CLEANER.clean_html(html) if html else u''
388
389
def convert_to_tag_list_of_dicts(tag_string):
    """
    Filter input from incoming string containing user tags,

    Strips leading and trailing whitespace, collapses internal runs of
    whitespace to a single space, and converts the "tags" text into a
    list of tags, split on the configured 'tags_delimiter'.

    Returns a list of {'name': ..., 'slug': ...} dicts in order of first
    appearance; empty and duplicate tags are left out.  An empty or
    missing tag_string gives an empty list.
    """
    taglist = []
    if not tag_string:
        return taglist

    # Strip out internal, trailing, and leading whitespace
    stripped_tag_string = u' '.join(tag_string.strip().split())

    # Names already emitted; a set keeps the duplicate check cheap.
    seen_names = set()

    # Split the tag string into a list of tags
    for raw_tag in stripped_tag_string.split(
            mg_globals.app_config['tags_delimiter']):
        name = raw_tag.strip()

        # Ignore empty or duplicate tags
        if not name or name in seen_names:
            continue

        seen_names.add(name)
        taglist.append({'name': name, 'slug': slugify(name)})

    return taglist
413
414
def media_tags_as_string(media_entry_tags):
    """
    Join the names of a media item's tags into a single string.

    The tags are stored as a list of dicts; their 'name' values are
    joined with the configured 'tags_delimiter'.  No tags gives ''.

    This is the opposite of convert_to_tag_list_of_dicts
    """
    if not media_entry_tags:
        return ''

    delimiter = mg_globals.app_config['tags_delimiter']
    return delimiter.join([tag['name'] for tag in media_entry_tags])
426
TOO_LONG_TAG_WARNING = \
    u'Tags must be shorter than %s characters. Tags that are too long: %s'


def tag_length_validator(form, field):
    """
    Form validator rejecting tags longer than the configured maximum.

    Parses field.data into tags and raises wtforms.ValidationError,
    listing the offending tag names, when any name is longer than the
    'tags_max_length' app config value.
    """
    too_long_tags = []
    for tag in convert_to_tag_list_of_dicts(field.data):
        name = tag['name']
        if len(name) > mg_globals.app_config['tags_max_length']:
            too_long_tags.append(name)

    if not too_long_tags:
        return

    details = (
        mg_globals.app_config['tags_max_length'],
        ', '.join(too_long_tags))
    raise wtforms.ValidationError(TOO_LONG_TAG_WARNING % details)
443
444
MARKDOWN_INSTANCE = markdown.Markdown(safe_mode='escape')


def cleaned_markdown_conversion(text):
    """
    Take a block of text, run it through MarkDown, and clean its HTML.

    Returns u'' for empty input, otherwise the output of clean_html()
    applied to the converted text.
    """
    # Markdown will do nothing with and clean_html can do nothing with
    # an empty string :)
    if not text:
        return u''

    # MARKDOWN_INSTANCE is shared between calls; reset it so state kept
    # from a previous document (e.g. link reference definitions) cannot
    # affect this conversion.  Called as its own statement because the
    # return value of reset() is not relied upon.
    MARKDOWN_INSTANCE.reset()

    return clean_html(MARKDOWN_INSTANCE.convert(text))
457
458
SETUP_GETTEXTS = {}


def setup_gettext(locale):
    """
    Install the gettext translations for ``locale`` into mg_globals.

    Translation objects are looked up in (and, for locales babel knows
    about, stored into) the SETUP_GETTEXTS cache.
    """
    # Later on when we have plugins we may want to enable the
    # multi-translations system they have so we can handle plugin
    # translations too

    # TODO: fallback nicely on translations from pt_PT to pt if not
    # available, etc.
    if locale not in SETUP_GETTEXTS:
        this_gettext = gettext.translation(
            'mediagoblin', TRANSLATIONS_PATH, [locale], fallback=True)
        if exists(locale):
            SETUP_GETTEXTS[locale] = this_gettext
    else:
        this_gettext = SETUP_GETTEXTS[locale]

    mg_globals.setup_globals(translations=this_gettext)


# Force en to be setup before anything else so that
# mg_globals.translations is never None
setup_gettext('en')
486
487
def pass_to_ugettext(*args, **kwargs):
    """
    Forward all arguments to ugettext on the current translations.

    The translations object is looked up on mg_globals at call time
    rather than bound to a global ugettext, because mg_globals gets
    swapped out by the application per-request.
    """
    translations = mg_globals.translations
    return translations.ugettext(*args, **kwargs)
497
498
def lazy_pass_to_ugettext(*args, **kwargs):
    """
    Wrap a pass_to_ugettext call in a babel LazyProxy and return it.

    This is useful if you have to define a translation on a module
    level but you need it to not translate until the time that it's
    used as a string.
    """
    lazy_translation = LazyProxy(pass_to_ugettext, *args, **kwargs)
    return lazy_translation
508
509
def pass_to_ngettext(*args, **kwargs):
    """
    Forward all arguments to ngettext on the current translations.

    The translations object is looked up on mg_globals at call time
    rather than bound to a global ngettext, because mg_globals gets
    swapped out by the application per-request.
    """
    # NOTE(review): this calls ngettext, while the rest of this module
    # uses the unicode variants (ugettext / ungettext) -- confirm that
    # ngettext rather than ungettext is intended here.
    translations = mg_globals.translations
    return translations.ngettext(*args, **kwargs)
519
520
def lazy_pass_to_ngettext(*args, **kwargs):
    """
    Wrap a pass_to_ngettext call in a babel LazyProxy and return it.

    This is useful if you have to define a translation on a module
    level but you need it to not translate until the time that it's
    used as a string.
    """
    lazy_translation = LazyProxy(pass_to_ngettext, *args, **kwargs)
    return lazy_translation
530
531
def fake_ugettext_passthrough(string):
    """
    Return ``string`` unchanged; a ugettext stand-in for extraction's
    sake ;)

    In wtforms there's a separate way to define a method to translate
    things... so the text only needs to be marked up so that it can be
    extracted, not actually run through gettext.
    """
    untranslated = string
    return untranslated
541
542
PAGINATION_DEFAULT_PER_PAGE = 30


class Pagination(object):
    """
    Pagination class for mongodb queries.

    Initialization through
    __init__(self, page, cursor, per_page=PAGINATION_DEFAULT_PER_PAGE,
    jump_to_id=False); get actual data slice through __call__().
    """

    def __init__(self, page, cursor, per_page=PAGINATION_DEFAULT_PER_PAGE,
                 jump_to_id=False):
        """
        Initializes Pagination

        Args:
         - page: requested page
         - per_page: number of objects per page
         - cursor: db cursor
         - jump_to_id: ObjectId, sets the page to the page containing the
           object with _id == jump_to_id.  A falsy value disables the
           lookup; if no object matches, ``page`` is left as passed in.
        """
        self.page = page
        self.per_page = per_page
        self.cursor = cursor
        self.total_count = self.cursor.count()
        # _id of the object that was jumped to, if one was found.
        self.active_id = None

        if jump_to_id:
            # Iterate over a copy rather than over self.cursor itself,
            # which __call__() later applies skip()/limit() to.
            cursor = copy.copy(self.cursor)

            for position, doc in enumerate(cursor):
                if doc['_id'] == jump_to_id:
                    # position is 0-based, pages are 1-based
                    self.page = 1 + int(position // self.per_page)
                    self.active_id = jump_to_id
                    break

    def __call__(self):
        """
        Returns slice of objects for the requested page
        """
        return self.cursor.skip(
            (self.page - 1) * self.per_page).limit(self.per_page)

    @property
    def pages(self):
        """Total number of pages, rounding a partial last page up."""
        return int(ceil(self.total_count / float(self.per_page)))

    @property
    def has_prev(self):
        """True when there is a page before the current one."""
        return self.page > 1

    @property
    def has_next(self):
        """True when there is a page after the current one."""
        return self.page < self.pages

    def iter_pages(self, left_edge=2, left_current=2,
                   right_current=5, right_edge=2):
        """
        Yield the page numbers to show in a pager, in ascending order.

        Numbers within ``left_edge`` of the start, within ``right_edge``
        of the end, or in the window around the current page
        (``left_current`` before it, up to ``right_current`` - 1 after
        it) are yielded.  Each skipped run of numbers is represented by
        a single None.
        """
        last = 0
        for num in xrange(1, self.pages + 1):
            near_start = num <= left_edge
            near_current = (
                self.page - left_current - 1 < num < self.page + right_current)
            near_end = num > self.pages - right_edge

            if near_start or near_current or near_end:
                if last + 1 != num:
                    # Gap since the previously yielded number
                    yield None
                yield num
                last = num

    def get_page_url_explicit(self, base_url, get_params, page_no):
        """
        Get a page url by adding a page= parameter to the base url

        ``get_params`` is copied, so the caller's mapping is not
        modified; it may be None or empty.
        """
        new_get_params = copy.copy(get_params or {})
        new_get_params['page'] = page_no
        return "%s?%s" % (
            base_url, urllib.urlencode(new_get_params))

    def get_page_url(self, request, page_no):
        """
        Get a new page url based of the request, and the new page number.

        This is a nice wrapper around get_page_url_explicit()
        """
        return self.get_page_url_explicit(
            request.path_info, request.GET, page_no)