4421bec4436b8b6e1b011a250899e97cc29c78b7
[mediagoblin.git] / mediagoblin / util.py
1 # GNU MediaGoblin -- federated, autonomous media hosting
2 # Copyright (C) 2011 Free Software Foundation, Inc
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16
17 from email.MIMEText import MIMEText
18 import gettext
19 import pkg_resources
20 import smtplib
21 import sys
22 import re
23 import urllib
24 from math import ceil
25 from string import strip
26 import copy
27
28 from babel.localedata import exists
29 import jinja2
30 import translitcodec
31 from webob import Response, exc
32 from lxml.html.clean import Cleaner
33 import markdown
34
35 from mediagoblin import mg_globals
36 from mediagoblin import messages
37 from mediagoblin.db.util import ObjectId
38
# Module-wide switch consulted by the rendering and email helpers;
# flipped on by _activate_testing().
TESTS_ENABLED = False


def _activate_testing():
    """
    Switch util.py into testing mode by setting TESTS_ENABLED to True.
    """
    global TESTS_ENABLED
    TESTS_ENABLED = True
46
47
def clear_test_buckets():
    """
    Reset the module-level state that tests may have filled up.

    Rebinds the jinja environment cache to an empty dict and both
    email test inboxes to empty lists, then clears the stored template
    contexts, so the next round of tests starts from a clean slate.
    """
    global SETUP_JINJA_ENVS, EMAIL_TEST_INBOX, EMAIL_TEST_MBOX_INBOX

    SETUP_JINJA_ENVS = {}
    EMAIL_TEST_INBOX, EMAIL_TEST_MBOX_INBOX = [], []

    clear_test_template_context()
66
67
# Jinja environments that have already been built, keyed by locale.
SETUP_JINJA_ENVS = {}


def get_jinja_env(template_loader, locale):
    """
    Set up the Jinja environment for a locale, reusing a cached one
    when available.

    (In the future we may have another system for providing theming;
    for now this is good enough.)

    Args:
     - template_loader: loader handed to a newly built environment
     - locale: locale name; passed to setup_gettext() and used as the
       cache key

    Returns the jinja2 environment.  Note that the cache is keyed by
    locale only: a cached environment is returned regardless of which
    template_loader was passed in.
    """
    # Always (re)install the translations for this locale, even on a
    # cache hit.
    setup_gettext(locale)

    # If we have a jinja environment set up with this locale, just
    # return that one.
    if locale in SETUP_JINJA_ENVS:
        return SETUP_JINJA_ENVS[locale]

    template_env = jinja2.Environment(
        loader=template_loader, autoescape=True,
        extensions=['jinja2.ext.i18n', 'jinja2.ext.autoescape'])

    template_env.install_gettext_callables(
        mg_globals.translations.gettext,
        mg_globals.translations.ngettext)

    # All templates will know how to ...
    # ... fetch all waiting messages and remove them from the queue
    template_env.globals['fetch_messages'] = messages.fetch_messages

    # Only cache environments for locales babel knows about.
    if exists(locale):
        SETUP_JINJA_ENVS[locale] = template_env

    return template_env
101
102
# Contexts passed to render_template() are kept here while unit tests
# are enabled, keyed by template path.
TEMPLATE_TEST_CONTEXT = {}


def render_template(request, template_path, context):
    """
    Render the template found at template_path with the given context
    and return the rendered result.

    The request is always added to the context under 'request', so
    callers don't have to.  When TESTS_ENABLED is set, the context is
    also recorded in TEMPLATE_TEST_CONTEXT under the template path.
    """
    jinja_template = request.template_env.get_template(template_path)

    context['request'] = request
    output = jinja_template.render(context)

    if TESTS_ENABLED:
        TEMPLATE_TEST_CONTEXT[template_path] = context

    return output
123
124
def clear_test_template_context():
    """Rebind TEMPLATE_TEST_CONTEXT to a fresh, empty dict."""
    global TEMPLATE_TEST_CONTEXT
    TEMPLATE_TEST_CONTEXT = {}
128
129
def render_to_response(request, template, context):
    """
    Render a template and wrap the result in a Response.

    Much like Django's shortcut.render()
    """
    body = render_template(request, template, context)
    return Response(body)
133
134
def redirect(request, *args, **kwargs):
    """
    Build an HTTPFound() pointing at a generated url.

    Takes a request; all remaining arguments are passed straight to
    request.urlgen() to produce the redirect location.
    """
    target = request.urlgen(*args, **kwargs)
    return exc.HTTPFound(location=target)
138
139
def setup_user_in_request(request):
    """
    Examine a request and tack on a request.user parameter if that's
    appropriate.

    request.user is set to None when the session carries no 'user_id'.
    Otherwise the user is looked up by that id; if the lookup comes
    back empty the session is invalidated and request.user ends up
    being that empty result.
    """
    if 'user_id' not in request.session:
        request.user = None
        return

    user = request.app.db.User.one(
        {'_id': ObjectId(request.session['user_id'])})

    if not user:
        # Something's wrong... this user doesn't exist?  Invalidate
        # this session.
        request.session.invalidate()

    request.user = user
159
160
def import_component(import_string):
    """
    Import and return a single component of a module -- probably a
    method, class, or global variable.

    Args:
     - import_string: a string that defines what to import.  Written
       in the format of "module1.module2:component"
    """
    module_path, attr_name = import_string.split(':', 1)

    # __import__ returns the top-level package for dotted names, so
    # fetch the actual module from sys.modules afterwards.
    __import__(module_path)
    return getattr(sys.modules[module_path], attr_name)
175
# Runs of whitespace/punctuation that separate the words of a slug.
_punct_re = re.compile(r'[\t !"#$%&\'()*\-/<=>?@\[\\\]^_`{|},.]+')


def slugify(text, delim=u'-'):
    """
    Generates an ASCII-only slug.  Taken from http://flask.pocoo.org/snippets/5/

    The text is lowercased and split on punctuation; each piece is
    encoded with the 'translit/long' codec and the non-empty results
    are joined with delim.
    """
    encoded_pieces = (
        chunk.encode('translit/long')
        for chunk in _punct_re.split(text.lower()))
    return unicode(delim.join(piece for piece in encoded_pieces if piece))
188
189 ### ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
190 ### Special email test stuff begins HERE
191 ### ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
192
193 # We have two "test inboxes" here:
194 #
195 # EMAIL_TEST_INBOX:
196 # ----------------
197 # If you're writing test views, you'll probably want to check this.
198 # It contains a list of MIMEText messages.
199 #
200 # EMAIL_TEST_MBOX_INBOX:
201 # ----------------------
202 # This collects the messages from the FakeMhost inbox. It's really
203 # just here for testing the send_email method itself.
204 #
205 # Anyway this contains:
206 # - from
207 # - to: a list of email recipient addresses
208 # - message: not just the body, but the whole message, including
209 # headers, etc.
210 #
211 # ***IMPORTANT!***
212 # ----------------
213 # Before running tests that call functions which send email, you should
214 # always call _clear_test_inboxes() to "wipe" the inboxes clean.
215
EMAIL_TEST_INBOX = []
EMAIL_TEST_MBOX_INBOX = []


class FakeMhost(object):
    """
    Stand-in mail host that captures messages from send_email so they
    can be inspected in tests instead of being delivered.
    """
    def connect(self):
        """Do nothing; there is no server to connect to."""

    def sendmail(self, from_addr, to_addrs, message):
        """Record the would-be delivery in EMAIL_TEST_MBOX_INBOX."""
        captured = {
            'from': from_addr,
            'to': to_addrs,
            'message': message}
        EMAIL_TEST_MBOX_INBOX.append(captured)
233
def _clear_test_inboxes():
    """Rebind both email test inboxes to fresh, empty lists."""
    global EMAIL_TEST_INBOX, EMAIL_TEST_MBOX_INBOX
    EMAIL_TEST_INBOX, EMAIL_TEST_MBOX_INBOX = [], []
239
240 ### ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
241 ### </Special email test stuff>
242 ### ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
243
def send_email(from_addr, to_addrs, subject, message_body):
    """
    Simple email sending wrapper, use this so we can capture messages
    for unit testing purposes.

    Args:
     - from_addr: address you're sending the email from
     - to_addrs: list of recipient email addresses
     - subject: subject of the email
     - message_body: email body text

    Returns whatever the mail host's sendmail() returns.

    When tests are enabled or email debug mode is on, a FakeMhost is
    used instead of a real SMTP connection, so nothing is delivered.
    """
    # Read the flag once, tolerating globals where it was never set up.
    debug_mode = getattr(mg_globals, 'email_debug_mode', False)

    if TESTS_ENABLED or debug_mode:
        mhost = FakeMhost()
    else:
        mhost = smtplib.SMTP()

    # NOTE(review): the SMTP connection opened here is never explicitly
    # closed by this function -- confirm whether that is intended.
    mhost.connect()

    message = MIMEText(message_body.encode('utf-8'), 'plain', 'utf-8')
    message['Subject'] = subject
    message['From'] = from_addr
    message['To'] = ', '.join(to_addrs)

    if TESTS_ENABLED:
        EMAIL_TEST_INBOX.append(message)

    if debug_mode:
        # Single-argument print(...) behaves the same as the print
        # statement on Python 2.
        print(u"===== Email =====")
        print(u"From address: %s" % message['From'])
        print(u"To addresses: %s" % message['To'])
        print(u"Subject: %s" % message['Subject'])
        print(u"-- Body: --")
        print(message.get_payload(decode=True))

    return mhost.sendmail(from_addr, to_addrs, message.as_string())
280
281
282 ###################
283 # Translation tools
284 ###################
285
286
# Location of the 'translations' resource of the mediagoblin package,
# as resolved by pkg_resources.
TRANSLATIONS_PATH = pkg_resources.resource_filename(
    'mediagoblin',
    'translations')
289
290
def locale_to_lower_upper(locale):
    """
    Take a locale, regardless of style, and format it like "en_US"

    The part before the first '-' (or, failing that, the first '_')
    is lowercased and the remainder uppercased, joined by '_'.  A
    locale with neither separator is simply lowercased.
    """
    # '-' is checked first, matching the historical precedence when a
    # locale happens to contain both separators.
    for separator in ('-', '_'):
        if separator in locale:
            lang, country = locale.split(separator, 1)
            return '%s_%s' % (lang.lower(), country.upper())

    return locale.lower()
303
304
def locale_to_lower_lower(locale):
    """
    Take a locale, regardless of style, and format it like "en-us"

    The whole locale is lowercased and the first '_' (if any) is
    turned into a '-'.
    """
    return locale.lower().replace('_', '-', 1)
314
315
def get_locale_from_request(request):
    """
    Figure out what target language is most appropriate based on the
    request

    Sources are consulted in this order, first hit wins:
     1. a 'lang' parameter in request.GET (or in request.POST when GET
        is empty)
     2. a 'locale' entry in the routing matchdict
     3. 'target_lang' stored in the session
     4. the first match from the request's accepted languages
     5. plain English ('en')

    The result is normalized with locale_to_lower_upper().
    """
    request_form = request.GET or request.POST

    if 'lang' in request_form:
        return locale_to_lower_upper(request_form['lang'])

    accept_lang_matches = request.accept_language.best_matches()

    # Your routing can explicitly specify a target language
    if 'locale' in request.matchdict:
        target_lang = request.matchdict['locale']
    elif 'target_lang' in request.session:
        target_lang = request.session['target_lang']
    # Pull the first acceptable language
    elif accept_lang_matches:
        target_lang = accept_lang_matches[0]
    # Fall back to English
    else:
        target_lang = 'en'

    return locale_to_lower_upper(target_lang)
341
342
# A super strict version of the lxml.html cleaner class
HTML_CLEANER = Cleaner(
    scripts=True,
    javascript=True,
    comments=True,
    style=True,
    links=True,
    page_structure=True,
    processing_instructions=True,
    embedded=True,
    frames=True,
    forms=True,
    annoying_tags=True,
    allow_tags=[
        'div', 'b', 'i', 'em', 'strong', 'p', 'ul', 'ol', 'li', 'a', 'br'],
    remove_unknown_tags=False,  # can't be used with allow_tags
    safe_attrs_only=True,
    add_nofollow=True,  # for now
    host_whitelist=(),
    whitelist_tags=set())


def clean_html(html):
    """
    Run html through HTML_CLEANER and return the cleaned markup.

    Falsy input yields an empty unicode string without touching the
    cleaner.
    """
    # clean_html barfs on an empty string, so never hand it one
    return HTML_CLEANER.clean_html(html) if html else u''
371
372
TAGS_DELIMITER = u' '


def convert_to_tag_list(tag_string):
    """
    Filter input from a "tags" field,

    Strips trailing, leading, and internal whitespace, and also converts
    the user input into an array of tags

    Returns a list of non-empty tag strings; empty or missing input
    gives an empty list.
    """
    if not tag_string:
        return []

    # Collapse every run of whitespace into one space first, so stray
    # spacing never produces empty or padded tags.
    normalized = u' '.join(tag_string.split())

    return [
        tag.strip()
        for tag in normalized.split(TAGS_DELIMITER)
        if tag.strip()]
388
389
MARKDOWN_INSTANCE = markdown.Markdown(safe_mode='escape')


def cleaned_markdown_conversion(text):
    """
    Take a block of text, run it through MarkDown, and clean its HTML.

    Falsy input yields an empty unicode string.
    """
    # Markdown will do nothing with and clean_html can do nothing with
    # an empty string :)
    if not text:
        return u''

    # The Markdown instance is shared between calls; reset it so that
    # state left over from a previous document (e.g. link reference
    # definitions) cannot leak into this conversion.
    MARKDOWN_INSTANCE.reset()

    return clean_html(MARKDOWN_INSTANCE.convert(text))
402
403
# gettext translation objects that have already been loaded, keyed by
# locale.
SETUP_GETTEXTS = {}


def setup_gettext(locale):
    """
    Setup the gettext instance based on this locale

    The translation object is taken from SETUP_GETTEXTS when present,
    otherwise loaded from TRANSLATIONS_PATH (with fallback enabled) and
    cached if babel knows the locale.  Either way it is installed as
    mg_globals' `translations`.
    """
    # Later on when we have plugins we may want to enable the
    # multi-translations system they have so we can handle plugin
    # translations too

    # TODO: fallback nicely on translations from pt_PT to pt if not
    # available, etc.
    if locale in SETUP_GETTEXTS:
        this_gettext = SETUP_GETTEXTS[locale]
    else:
        this_gettext = gettext.translation(
            'mediagoblin', TRANSLATIONS_PATH, [locale], fallback=True)
        if exists(locale):
            SETUP_GETTEXTS[locale] = this_gettext

    mg_globals.setup_globals(
        translations=this_gettext)
426
427
PAGINATION_DEFAULT_PER_PAGE = 30


class Pagination(object):
    """
    Pagination class for mongodb queries.

    Initialization through
    __init__(self, page, cursor, per_page=PAGINATION_DEFAULT_PER_PAGE),
    get actual data slice through __call__().
    """

    def __init__(self, page, cursor, per_page=PAGINATION_DEFAULT_PER_PAGE):
        """
        Initializes Pagination

        Args:
         - page: requested page
         - cursor: db cursor
         - per_page: number of objects per page
        """
        self.page = page
        self.per_page = per_page
        self.cursor = cursor
        # Counted once up front; the page-count properties rely on it.
        self.total_count = self.cursor.count()

    def __call__(self):
        """
        Returns slice of objects for the requested page
        """
        offset = (self.page - 1) * self.per_page
        return self.cursor.skip(offset).limit(self.per_page)

    @property
    def pages(self):
        """Total number of pages, rounding a partial last page up."""
        return int(ceil(self.total_count / float(self.per_page)))

    @property
    def has_prev(self):
        """True when there is a page before the current one."""
        return self.page > 1

    @property
    def has_next(self):
        """True when there is a page after the current one."""
        return self.page < self.pages

    def iter_pages(self, left_edge=2, left_current=2,
                   right_current=5, right_edge=2):
        """
        Yield the page numbers to show in a pagination widget.

        Numbers near the left edge, around the current page and near
        the right edge are yielded; each skipped stretch in between is
        represented by a single None.
        """
        last = 0
        for num in xrange(1, self.pages + 1):
            near_left_edge = num <= left_edge
            near_current = (
                num > self.page - left_current - 1 and
                num < self.page + right_current)
            near_right_edge = num > self.pages - right_edge

            if near_left_edge or near_current or near_right_edge:
                if last + 1 != num:
                    # Numbers were skipped since the last one yielded.
                    yield None
                yield num
                last = num

    def get_page_url_explicit(self, base_url, get_params, page_no):
        """
        Get a page url by adding a page= parameter to the base url

        get_params is copied, never modified; it may be empty or None.
        """
        new_get_params = copy.copy(get_params or {})
        new_get_params['page'] = page_no
        return "%s?%s" % (
            base_url, urllib.urlencode(new_get_params))

    def get_page_url(self, request, page_no):
        """
        Get a new page url based of the request, and the new page number.

        This is a nice wrapper around get_page_url_explicit()
        """
        return self.get_page_url_explicit(
            request.path_info, request.GET, page_no)