Merge branch 'remotes/gullydwarf-cfdv/f360_tagging' (early part) into mergetags
[mediagoblin.git] / mediagoblin / util.py
1 # GNU MediaGoblin -- federated, autonomous media hosting
2 # Copyright (C) 2011 Free Software Foundation, Inc
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16
17 from __future__ import division
18
19 from email.MIMEText import MIMEText
20 import gettext
21 import pkg_resources
22 import smtplib
23 import sys
24 import re
25 import urllib
26 from math import ceil, floor
27 import copy
28 import wtforms
29
30 from babel.localedata import exists
31 import jinja2
32 import translitcodec
33 from webob import Response, exc
34 from lxml.html.clean import Cleaner
35 import markdown
36
37 from mediagoblin import mg_globals
38 from mediagoblin import messages
39 from mediagoblin.db.util import ObjectId
40
41 from itertools import izip, count
42
# Size labels, in the order they should be tried.  Presumably this is the
# preference order used when picking which stored image to display -- TODO
# confirm against the callers, none of which are in this module.
DISPLAY_IMAGE_FETCHING_ORDER = [u'medium', u'original', u'thumb']
44
# Module-wide switch consulted by render_template() and send_email();
# stays off unless a test run turns it on explicitly.
TESTS_ENABLED = False


def _activate_testing():
    """
    Switch util.py into testing mode by setting TESTS_ENABLED to True.
    """
    global TESTS_ENABLED
    TESTS_ENABLED = True
52
53
def clear_test_buckets():
    """
    Reset everything this module accumulates while tests are running.

    Rebinds the jinja environment cache and both email test inboxes to
    fresh empty containers, then drops the recorded template contexts,
    so the next round of tests starts from a clean slate.
    """
    global SETUP_JINJA_ENVS, EMAIL_TEST_INBOX, EMAIL_TEST_MBOX_INBOX

    # Each bucket is replaced by a brand new object rather than emptied
    # in place.
    EMAIL_TEST_MBOX_INBOX = []
    EMAIL_TEST_INBOX = []
    SETUP_JINJA_ENVS = {}

    clear_test_template_context()
72
73
# Jinja environments that have already been built, keyed by locale.
SETUP_JINJA_ENVS = {}


def get_jinja_env(template_loader, locale):
    """
    Set up the Jinja environment for ``locale`` and return it.

    (In the future we may have another system for providing theming;
    for now this is good enough.)

    Args:
     - template_loader: loader handed to jinja2.Environment
     - locale: locale the environment's gettext callables should use

    Returns:
      A jinja2.Environment; a cached one if this locale was set up before.
    """
    # Always run this, even on a cache hit: it (re)binds
    # mg_globals.translations to the requested locale.
    setup_gettext(locale)

    # If we have a jinja environment set up with this locale, just
    # return that one.
    if locale in SETUP_JINJA_ENVS:
        return SETUP_JINJA_ENVS[locale]

    template_env = jinja2.Environment(
        loader=template_loader, autoescape=True,
        extensions=['jinja2.ext.i18n', 'jinja2.ext.autoescape'])

    template_env.install_gettext_callables(
        mg_globals.translations.gettext,
        mg_globals.translations.ngettext)

    # All templates will know how to ...
    # ... fetch all waiting messages and remove them from the queue
    template_env.globals['fetch_messages'] = messages.fetch_messages

    # Only locales babel knows about are cached (presumably so arbitrary
    # request-supplied strings cannot grow the cache without bound).
    if exists(locale):
        SETUP_JINJA_ENVS[locale] = template_env

    return template_env
107
108
# We'll store context information here when doing unit tests
TEMPLATE_TEST_CONTEXT = {}


def render_template(request, template_path, context):
    """
    Render the template at ``template_path`` with ``context``.

    The request is always added to the context under the 'request' key
    (the dict passed in is modified), so callers don't have to do it.
    When TESTS_ENABLED is set, the context is also remembered in
    TEMPLATE_TEST_CONTEXT under the template path.

    Returns the rendered output.
    """
    template = request.template_env.get_template(template_path)

    context['request'] = request
    output = template.render(context)

    if TESTS_ENABLED:
        TEMPLATE_TEST_CONTEXT[template_path] = context

    return output
129
130
def clear_test_template_context():
    """
    Forget every template context recorded so far by rebinding
    TEMPLATE_TEST_CONTEXT to a fresh, empty dict.
    """
    global TEMPLATE_TEST_CONTEXT
    TEMPLATE_TEST_CONTEXT = {}
134
135
def render_to_response(request, template, context):
    """
    Render ``template`` with ``context`` via render_template() and wrap
    the result in a Response.  Much like Django's shortcut.render().
    """
    body = render_template(request, template, context)
    return Response(body)
139
140
def redirect(request, *args, **kwargs):
    """
    Returns a HTTPFound(), takes a request and then urlgen params.

    Args:
     - request: object providing urlgen()
     - *args, **kwargs: passed through to request.urlgen()
     - querystring (keyword only, optional): text appended verbatim to
       the generated URL; it is never forwarded to urlgen.
    """
    # pop() removes the key even when its value is falsy (e.g. ''), so it
    # cannot leak into urlgen as a bogus routing parameter.
    querystring = kwargs.pop('querystring', None) or ''

    return exc.HTTPFound(
        location=''.join([
            request.urlgen(*args, **kwargs),
            querystring]))
153
154
def setup_user_in_request(request):
    """
    Examine a request and tack on a request.user parameter if that's
    appropriate.

    request.user is set to None when the session has no 'user_id', or
    when no user with that id is found (in which case the session is
    invalidated as well); otherwise it is the user that was looked up.
    """
    if 'user_id' not in request.session:
        request.user = None
        return

    user = request.app.db.User.one(
        {'_id': ObjectId(request.session['user_id'])})

    if not user:
        # Something's wrong... this user doesn't exist?  Invalidate
        # this session.
        request.session.invalidate()

    request.user = user
174
175
def import_component(import_string):
    """
    Import and return a single component of a module: probably a
    method, class, or global variable.

    Args:
     - import_string: what to import, written as
       "module1.module2:component"
    """
    module_path, component_name = import_string.split(':', 1)

    # __import__ returns the top-level package for dotted names, so the
    # module itself is fetched from sys.modules afterwards.
    __import__(module_path)
    return getattr(sys.modules[module_path], component_name)
190
# Runs of these characters separate the words of a slug.
_punct_re = re.compile(r'[\t !"#$%&\'()*\-/<=>?@\[\\\]^_`{|},.]+')


def slugify(text, delim=u'-'):
    """
    Build an ASCII-only slug from ``text``, joining its words with
    ``delim``.  Taken from http://flask.pocoo.org/snippets/5/
    """
    # The 'translit/long' codec is presumably the one registered by the
    # translitcodec import at the top of the file.
    encoded_words = [
        chunk.encode('translit/long')
        for chunk in _punct_re.split(text.lower())]

    # Words that end up empty are dropped.
    return unicode(delim.join([word for word in encoded_words if word]))
203
204 ### ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
205 ### Special email test stuff begins HERE
206 ### ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
207
208 # We have two "test inboxes" here:
209 #
210 # EMAIL_TEST_INBOX:
211 # ----------------
212 # If you're writing test views, you'll probably want to check this.
213 # It contains a list of MIMEText messages.
214 #
215 # EMAIL_TEST_MBOX_INBOX:
216 # ----------------------
217 # This collects the messages from the FakeMhost inbox. It's really
218 # just here for testing the send_email method itself.
219 #
220 # Anyway this contains:
221 # - from
222 # - to: a list of email recipient addresses
223 # - message: not just the body, but the whole message, including
224 # headers, etc.
225 #
226 # ***IMPORTANT!***
227 # ----------------
228 # Before running tests that call functions which send email, you should
229 # always call _clear_test_inboxes() to "wipe" the inboxes clean.
230
EMAIL_TEST_INBOX = []
EMAIL_TEST_MBOX_INBOX = []


class FakeMhost(object):
    """
    Stand-in mail host: instead of delivering anything, it records what
    send_email hands to it so the messages can be captured and tested.
    """
    def connect(self):
        # Nothing to connect to.
        pass

    def sendmail(self, from_addr, to_addrs, message):
        # File the message in EMAIL_TEST_MBOX_INBOX as a plain dict.
        record = {
            'from': from_addr,
            'to': to_addrs,
            'message': message}
        EMAIL_TEST_MBOX_INBOX.append(record)
248
def _clear_test_inboxes():
    """
    Wipe both email test inboxes by rebinding them to new empty lists.
    """
    global EMAIL_TEST_INBOX, EMAIL_TEST_MBOX_INBOX
    EMAIL_TEST_MBOX_INBOX = []
    EMAIL_TEST_INBOX = []
254
255 ### ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
256 ### </Special email test stuff>
257 ### ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
258
def send_email(from_addr, to_addrs, subject, message_body):
    """
    Simple email sending wrapper, use this so we can capture messages
    for unit testing purposes.

    A FakeMhost is used when TESTS_ENABLED is set or the
    'email_debug_mode' config option is on; otherwise a real
    smtplib.SMTP connection is opened and closed again once the message
    has been handed over.

    Args:
     - from_addr: address you're sending the email from
     - to_addrs: list of recipient email addresses
     - subject: subject of the email
     - message_body: email body text (it is encoded to UTF-8 here)

    Returns:
      Whatever the mail host's sendmail() returns.
    """
    debug_mode = mg_globals.app_config['email_debug_mode']
    use_fake_host = bool(TESTS_ENABLED or debug_mode)

    # TODO: make a mock mhost if testing is enabled
    if use_fake_host:
        mhost = FakeMhost()
    else:
        mhost = smtplib.SMTP()

    mhost.connect()

    try:
        message = MIMEText(message_body.encode('utf-8'), 'plain', 'utf-8')
        message['Subject'] = subject
        message['From'] = from_addr
        message['To'] = ', '.join(to_addrs)

        if TESTS_ENABLED:
            EMAIL_TEST_INBOX.append(message)

        if debug_mode:
            # Single-argument print(...) behaves the same with or without
            # print_function.
            print(u"===== Email =====")
            print(u"From address: %s" % message['From'])
            print(u"To addresses: %s" % message['To'])
            print(u"Subject: %s" % message['Subject'])
            print(u"-- Body: --")
            print(message.get_payload(decode=True))

        return mhost.sendmail(from_addr, to_addrs, message.as_string())
    finally:
        # Only the real SMTP connection holds a socket; FakeMhost has
        # nothing to release.
        if not use_fake_host:
            try:
                mhost.quit()
            except smtplib.SMTPServerDisconnected:
                # The server already dropped the connection.
                pass
295
296
297 ###################
298 # Translation tools
299 ###################
300
301
# Filesystem path of the 'translations' resource inside the 'mediagoblin'
# package, as resolved by pkg_resources at import time.  setup_gettext()
# passes it to gettext.translation() as the catalog directory.
TRANSLATIONS_PATH = pkg_resources.resource_filename(
    'mediagoblin', 'translations')
304
305
def locale_to_lower_upper(locale):
    """
    Take a locale, regardless of style, and format it like "en_US"

    The part before the first '-' (or, failing that, the first '_') is
    lowercased, the rest is uppercased, and the two are joined with an
    underscore.  A locale without either separator is just lowercased.
    """
    # '-' is checked before '_', so "a-b_c" splits at the dash.
    for separator in ('-', '_'):
        if separator in locale:
            lang, country = locale.split(separator, 1)
            return '%s_%s' % (lang.lower(), country.upper())

    return locale.lower()
318
319
def locale_to_lower_lower(locale):
    """
    Take a locale, regardless of style, and format it like "en-us"

    Everything is lowercased and the first underscore, if there is one,
    becomes a dash.
    """
    if '_' in locale:
        lang, country = locale.split('_', 1)
        return '%s-%s' % (lang.lower(), country.lower())

    # Already dash-separated or has no country part: lowercasing is enough.
    return locale.lower()
329
330
def get_locale_from_request(request):
    """
    Figure out what target language is most appropriate based on the
    request

    Sources are tried in this order, first hit wins:
     1. a 'lang' parameter in the request's GET (or, if GET is empty,
        POST) data
     2. a 'locale' entry in request.matchdict
     3. 'target_lang' stored in the session
     4. the first entry of the request's accepted-language matches
     5. 'en'

    Returns the chosen locale formatted by locale_to_lower_upper().
    """
    request_form = request.GET or request.POST

    if 'lang' in request_form:
        return locale_to_lower_upper(request_form['lang'])

    accept_lang_matches = request.accept_language.best_matches()

    # Your routing can explicitly specify a target language
    if 'locale' in request.matchdict:
        target_lang = request.matchdict['locale']
    elif 'target_lang' in request.session:
        target_lang = request.session['target_lang']
    # Pull the first acceptable language
    elif accept_lang_matches:
        target_lang = accept_lang_matches[0]
    # Fall back to English
    else:
        target_lang = 'en'

    return locale_to_lower_upper(target_lang)
356
357
# A super strict version of the lxml.html cleaner class
#
# Every boolean removal switch from ``scripts`` through ``annoying_tags``
# is turned on, and ``allow_tags`` names the handful of tags that are
# meant to survive.  Built once at import time and shared by clean_html().
HTML_CLEANER = Cleaner(
    scripts=True,
    javascript=True,
    comments=True,
    style=True,
    links=True,
    page_structure=True,
    processing_instructions=True,
    embedded=True,
    frames=True,
    forms=True,
    annoying_tags=True,
    allow_tags=[
        'div', 'b', 'i', 'em', 'strong', 'p', 'ul', 'ol', 'li', 'a', 'br'],
    remove_unknown_tags=False, # can't be used with allow_tags
    safe_attrs_only=True,
    add_nofollow=True, # for now
    host_whitelist=(),
    whitelist_tags=set([]))
378
379
def clean_html(html):
    """
    Run ``html`` through HTML_CLEANER and return the cleaned markup.

    Falsy input (empty string, None) short-circuits to u''.
    """
    # clean_html barfs on an empty string
    return HTML_CLEANER.clean_html(html) if html else u''
386
387
def convert_to_tag_list_of_dicts(tag_string):
    """
    Filter input from incoming string containing user tags,

    Strips leading and trailing whitespace, collapses internal runs of
    whitespace to a single space, and converts the "tags" text into a
    list of tags.

    Args:
     - tag_string: the raw tags text; tags are separated by the
       'tags_delimiter' config value

    Returns:
      A list of {'name': ..., 'slug': ...} dicts in input order, without
      empty or duplicate tags.  Unless 'tags_case_sensitive' is set,
      names are lowercased first, so tags differing only in case count
      as duplicates.
    """
    taglist = []
    if not tag_string:
        return taglist

    delimiter = mg_globals.app_config['tags_delimiter']
    case_sensitive = mg_globals.app_config['tags_case_sensitive']

    # Strip out internal, trailing, and leading whitespace
    stripped_tag_string = u' '.join(tag_string.strip().split())

    seen_names = set()
    for raw_tag in stripped_tag_string.split(delimiter):
        name = raw_tag.strip()
        if not case_sensitive:
            # Normalise *before* the duplicate check; comparing the raw
            # tag against stored lowercased names let "Foo,FOO" through
            # twice.
            name = name.lower()

        # Ignore empty or duplicate tags
        if not name or name in seen_names:
            continue

        seen_names.add(name)
        taglist.append({'name': name, 'slug': slugify(name)})

    return taglist
415
416
def media_tags_as_string(media_entry_tags):
    """
    Turn a media item's tags (a list of dicts) back into one string.

    The 'name' of every tag is joined with the 'tags_delimiter' config
    value; no tags at all gives ''.  This is the opposite of
    convert_to_tag_list_of_dicts.
    """
    if not media_entry_tags:
        return ''

    delimiter = mg_globals.app_config['tags_delimiter']
    return delimiter.join([tag['name'] for tag in media_entry_tags])
428
TOO_LONG_TAG_WARNING = \
    u'Tags must be shorter than %s characters. Tags that are too long: %s'


def tag_length_validator(form, field):
    """
    Form validator: reject tags longer than the 'tags_max_length' config
    value.

    Raises wtforms.ValidationError naming the offending tags; returns
    nothing when every tag fits.
    """
    offenders = []
    for tag in convert_to_tag_list_of_dicts(field.data):
        if len(tag['name']) > mg_globals.app_config['tags_max_length']:
            offenders.append(tag['name'])

    if not offenders:
        return

    details = (
        mg_globals.app_config['tags_max_length'],
        ', '.join(offenders))
    raise wtforms.ValidationError(TOO_LONG_TAG_WARNING % details)
445
446
# Shared Markdown converter, built once at import time and reused by
# cleaned_markdown_conversion().  It is created with safe_mode='escape'
# (per the Markdown library, raw HTML in the input is escaped rather than
# passed through -- confirm against the installed Markdown version).
MARKDOWN_INSTANCE = markdown.Markdown(safe_mode='escape')
448
def cleaned_markdown_conversion(text):
    """
    Convert ``text`` with MARKDOWN_INSTANCE and pass the resulting HTML
    through clean_html().  Falsy text gives u''.
    """
    # Markdown will do nothing with and clean_html can do nothing with
    # an empty string :)
    if text:
        return clean_html(MARKDOWN_INSTANCE.convert(text))

    return u''
459
460
# gettext translation objects that have already been loaded, keyed by locale.
SETUP_GETTEXTS = {}


def setup_gettext(locale):
    """
    Setup the gettext instance based on this locale

    Loads (or reuses) the 'mediagoblin' translation for ``locale`` and
    installs it as mg_globals' ``translations`` via
    mg_globals.setup_globals().
    """
    # Later on when we have plugins we may want to enable the
    # multi-translations system they have so we can handle plugin
    # translations too

    # TODO: fallback nicely on translations from pt_PT to pt if not
    # available, etc.
    if locale in SETUP_GETTEXTS:
        this_gettext = SETUP_GETTEXTS[locale]
    else:
        # fallback=True means a missing catalog does not raise.
        this_gettext = gettext.translation(
            'mediagoblin', TRANSLATIONS_PATH, [locale], fallback=True)
        # Only locales babel knows about are cached.
        if exists(locale):
            SETUP_GETTEXTS[locale] = this_gettext

    mg_globals.setup_globals(
        translations=this_gettext)
483
484
PAGINATION_DEFAULT_PER_PAGE = 30


class Pagination(object):
    """
    Pagination class for mongodb queries.

    Initialization through
    __init__(self, page, cursor, per_page=PAGINATION_DEFAULT_PER_PAGE,
             jump_to_id=False),
    get actual data slice through __call__().
    """

    def __init__(self, page, cursor, per_page=PAGINATION_DEFAULT_PER_PAGE,
                 jump_to_id=False):
        """
        Initializes Pagination

        Args:
         - page: requested page
         - per_page: number of objects per page
         - cursor: db cursor
         - jump_to_id: ObjectId, sets the page to the page containing the
           object with _id == jump_to_id.
        """
        self.page = page
        self.per_page = per_page
        self.cursor = cursor
        self.total_count = self.cursor.count()
        # Set to jump_to_id only when that object was actually found.
        self.active_id = None

        if jump_to_id:
            # Scan a copy, presumably so the cursor used by __call__ is
            # left untouched.
            for position, doc in enumerate(copy.copy(self.cursor)):
                if doc['_id'] == jump_to_id:
                    # Floor division keeps this correct whether or not
                    # true division is in effect.
                    self.page = 1 + int(position // self.per_page)
                    self.active_id = jump_to_id
                    break

    def __call__(self):
        """
        Returns slice of objects for the requested page
        """
        offset = (self.page - 1) * self.per_page
        return self.cursor.skip(offset).limit(self.per_page)

    @property
    def pages(self):
        """Total number of pages, rounding a partial last page up."""
        return int(ceil(self.total_count / float(self.per_page)))

    @property
    def has_prev(self):
        """True when there is a page before the current one."""
        return self.page > 1

    @property
    def has_next(self):
        """True when there is a page after the current one."""
        return self.page < self.pages

    def iter_pages(self, left_edge=2, left_current=2,
                   right_current=5, right_edge=2):
        """
        Yield the page numbers to show in a pager, with None marking
        each gap of skipped pages.

        Shown are the first ``left_edge`` pages, the last ``right_edge``
        pages, and a window around the current page reaching
        ``left_current`` pages back and up to (not including)
        ``right_current`` pages ahead.
        """
        total_pages = self.pages
        last = 0
        for num in xrange(1, total_pages + 1):
            near_start = num <= left_edge
            near_current = (num > self.page - left_current - 1 and
                            num < self.page + right_current)
            near_end = num > total_pages - right_edge

            if near_start or near_current or near_end:
                if last + 1 != num:
                    yield None
                yield num
                last = num

    def get_page_url_explicit(self, base_url, get_params, page_no):
        """
        Get a page url by adding a page= parameter to the base url
        """
        # Work on a copy so the caller's parameters are not modified.
        new_get_params = copy.copy(get_params or {})
        new_get_params['page'] = page_no
        return "%s?%s" % (
            base_url, urllib.urlencode(new_get_params))

    def get_page_url(self, request, page_no):
        """
        Get a new page url based of the request, and the new page number.

        This is a nice wrapper around get_page_url_explicit()
        """
        return self.get_page_url_explicit(
            request.path_info, request.GET, page_no)