Merge remote-tracking branch 'gitorious/master'
[mediagoblin.git] / mediagoblin / util.py
1 # GNU MediaGoblin -- federated, autonomous media hosting
2 # Copyright (C) 2011 Free Software Foundation, Inc
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16
17 from email.MIMEText import MIMEText
18 import gettext
19 import pkg_resources
20 import smtplib
21 import os
22 import sys
23 import re
24 import urllib
25 from math import ceil
26 import copy
27
28 from babel.localedata import exists
29 import jinja2
30 import translitcodec
31 from paste.deploy.loadwsgi import NicerConfigParser
32 from webob import Response, exc
33 from lxml.html.clean import Cleaner
34
35 from mediagoblin import mg_globals
36 from mediagoblin.db.util import ObjectId
37
38
# Module-wide switch. While it is True, render_template() stores the
# contexts it renders and send_email() captures the messages it sends.
TESTS_ENABLED = False


def _activate_testing():
    """
    Turn on testing mode for util.py by setting TESTS_ENABLED to True.
    """
    global TESTS_ENABLED
    TESTS_ENABLED = True
46
47
def clear_test_buckets():
    """
    Wipe everything this module stores for testing purposes, giving
    the next round of tests a "clean slate".

    That covers the cached jinja envs (which tests may redefine), both
    email test inboxes, and the recorded template contexts.
    """
    global SETUP_JINJA_ENVS, EMAIL_TEST_INBOX, EMAIL_TEST_MBOX_INBOX

    # Rebind to brand new containers rather than emptying the old ones
    SETUP_JINJA_ENVS = {}
    EMAIL_TEST_INBOX, EMAIL_TEST_MBOX_INBOX = [], []

    clear_test_template_context()
66
67
def get_jinja_loader(user_template_path=None):
    """
    Build the Jinja template loader.

    Without a user_template_path this is just the loader for the
    templates packaged with mediagoblin.  With one, templates are
    looked up in the user's directory first and the packaged templates
    act as the fallback, so users can override individual templates.

    (In the future we may have another system for providing theming;
    for now this is good enough.)
    """
    if not user_template_path:
        return jinja2.PackageLoader('mediagoblin', 'templates')

    # Order matters: the user's templates take priority
    loaders = [
        jinja2.FileSystemLoader(user_template_path),
        jinja2.PackageLoader('mediagoblin', 'templates')]
    return jinja2.ChoiceLoader(loaders)
82
83
# Jinja environments that have already been built, keyed by locale.
SETUP_JINJA_ENVS = {}


def get_jinja_env(template_loader, locale):
    """
    Return the Jinja environment to use for this locale, building and
    caching it if necessary.

    As a side effect this always calls setup_gettext(locale), so the
    translations in mg_globals match the requested locale afterwards.

    Args:
     - template_loader: jinja loader used when a new environment has
       to be built
     - locale: locale the environment is for

    Returns:
      A jinja2.Environment with autoescaping and the i18n extension
      enabled.
    """
    setup_gettext(locale)

    # If we have a jinja environment set up with this locale, just
    # return that one.
    # NOTE(review): the cache is keyed on locale only, so a different
    # template_loader passed later for the same locale is ignored.
    if locale in SETUP_JINJA_ENVS:
        return SETUP_JINJA_ENVS[locale]

    template_env = jinja2.Environment(
        loader=template_loader, autoescape=True,
        extensions=['jinja2.ext.i18n'])

    template_env.install_gettext_callables(
        mg_globals.translations.gettext,
        mg_globals.translations.ngettext)

    # Only cache environments for locales babel knows about
    if exists(locale):
        SETUP_JINJA_ENVS[locale] = template_env

    return template_env
113
114
# Contexts of rendered templates, keyed by template path.  Only filled
# in while TESTS_ENABLED is set, so unit tests can inspect them.
TEMPLATE_TEST_CONTEXT = {}


def render_template(request, template_path, context):
    """
    Render the template at template_path with the given context and
    return the rendered result.

    The request is always added to the context (under 'request'), so
    callers don't have to do it themselves.  Note that this modifies
    the context that was passed in.  When unit tests are enabled the
    context is remembered in TEMPLATE_TEST_CONTEXT too.
    """
    env = request.template_env
    compiled = env.get_template(template_path)

    context['request'] = request
    output = compiled.render(context)

    if TESTS_ENABLED:
        TEMPLATE_TEST_CONTEXT[template_path] = context

    return output
133
134 return rendered
135
136
def clear_test_template_context():
    """
    Forget all template contexts recorded for unit tests.

    Rebinds TEMPLATE_TEST_CONTEXT to a new empty dict.
    """
    global TEMPLATE_TEST_CONTEXT
    TEMPLATE_TEST_CONTEXT = {}
140
141
def render_to_response(request, template, context):
    """
    Render a template and wrap the result in a Response.

    Much like Django's shortcut.render()
    """
    body = render_template(request, template, context)
    return Response(body)
145
146
def redirect(request, *args, **kwargs):
    """
    Build a HTTPFound() redirect.

    Takes a request; all remaining arguments are handed to
    request.urlgen() to produce the location to redirect to.
    """
    target = request.urlgen(*args, **kwargs)
    return exc.HTTPFound(location=target)
150
151
def setup_user_in_request(request):
    """
    Examine a request and tack on a request.user parameter if that's
    appropriate.

    request.user ends up as None when the session carries no
    'user_id', and as whatever the database lookup returned otherwise.
    If the session names a user that can't be found, the session is
    invalidated.
    """
    if 'user_id' not in request.session:
        request.user = None
        return

    user = request.app.db.User.one(
        {'_id': ObjectId(request.session['user_id'])})

    if not user:
        # Something's wrong... this user doesn't exist?  Invalidate
        # this session.
        request.session.invalidate()

    request.user = user
171
172
def import_component(import_string):
    """
    Import and return a single component of a module.  Probably a
    method, class, or global variable.

    Args:
     - import_string: a string that defines what to import.  Written
       in the format of "module1.module2:component"

    Raises ValueError when import_string contains no ':'.
    """
    module_path, component_name = import_string.split(':', 1)

    # __import__ returns the top-level package for dotted paths, so
    # fetch the actual module from sys.modules instead.
    __import__(module_path)
    return getattr(sys.modules[module_path], component_name)
187
# Runs of whitespace/punctuation that separate the words of a slug
_punct_re = re.compile(r'[\t !"#$%&\'()*\-/<=>?@\[\\\]^_`{|},.]+')


def slugify(text, delim=u'-'):
    """
    Generates an ASCII-only slug.  Taken from http://flask.pocoo.org/snippets/5/

    The lowercased text is split on punctuation, every word is encoded
    with the 'translit/long' codec (presumably registered by the
    translitcodec import at the top of this file) and the non-empty
    results are joined with delim.
    """
    encoded_words = (
        piece.encode('translit/long')
        for piece in _punct_re.split(text.lower()))
    return unicode(delim.join(word for word in encoded_words if word))
200
201 ### ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
202 ### Special email test stuff begins HERE
203 ### ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
204
205 # We have two "test inboxes" here:
206 #
207 # EMAIL_TEST_INBOX:
208 # ----------------
209 # If you're writing test views, you'll probably want to check this.
210 # It contains a list of MIMEText messages.
211 #
212 # EMAIL_TEST_MBOX_INBOX:
213 # ----------------------
214 # This collects the messages from the FakeMhost inbox. It's really
215 # just here for testing the send_email method itself.
216 #
217 # Anyway this contains:
218 # - from
219 # - to: a list of email recipient addresses
220 # - message: not just the body, but the whole message, including
221 # headers, etc.
222 #
223 # ***IMPORTANT!***
224 # ----------------
225 # Before running tests that call functions which send email, you should
226 # always call _clear_test_inboxes() to "wipe" the inboxes clean.
227
EMAIL_TEST_INBOX = []
EMAIL_TEST_MBOX_INBOX = []


class FakeMhost(object):
    """
    A pretend mail host.  Nothing is ever sent; messages handed to it
    are captured so the output of send_email can be tested.
    """
    def connect(self):
        """Do nothing; there is no server to connect to."""
        return None

    def sendmail(self, from_addr, to_addrs, message):
        """Record the would-be email in EMAIL_TEST_MBOX_INBOX."""
        captured = {
            'from': from_addr,
            'to': to_addrs,
            'message': message}
        EMAIL_TEST_MBOX_INBOX.append(captured)
245
def _clear_test_inboxes():
    """
    Wipe both email test inboxes by rebinding them to new empty lists.
    """
    global EMAIL_TEST_INBOX, EMAIL_TEST_MBOX_INBOX
    EMAIL_TEST_INBOX, EMAIL_TEST_MBOX_INBOX = [], []
251
252 ### ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
253 ### </Special email test stuff>
254 ### ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
255
def send_email(from_addr, to_addrs, subject, message_body):
    """
    Simple email sending wrapper, use this so we can capture messages
    for unit testing purposes.

    When tests are enabled or mg_globals.email_debug_mode is set, the
    message goes to a FakeMhost instead of a real SMTP connection.  In
    debug mode the message is also printed.

    Args:
     - from_addr: address you're sending the email from
     - to_addrs: list of recipient email addresses
     - subject: subject of the email
     - message_body: email body text

    Returns:
      Whatever the mail host's sendmail() returns.
    """
    # Read the flag once, and tolerate it not being set up on
    # mg_globals, so both uses below always agree.
    debug_mode = getattr(mg_globals, 'email_debug_mode', False)

    if TESTS_ENABLED or debug_mode:
        mhost = FakeMhost()
    else:
        mhost = smtplib.SMTP()

    mhost.connect()

    message = MIMEText(message_body.encode('utf-8'), 'plain', 'utf-8')
    message['Subject'] = subject
    message['From'] = from_addr
    message['To'] = ', '.join(to_addrs)

    if TESTS_ENABLED:
        EMAIL_TEST_INBOX.append(message)

    if debug_mode:
        # Single-argument print(...) behaves the same as a statement
        # and as a function.
        print(u"===== Email =====")
        print(u"From address: %s" % message['From'])
        print(u"To addresses: %s" % message['To'])
        print(u"Subject: %s" % message['Subject'])
        print(u"-- Body: --")
        print(message.get_payload(decode=True))

    return mhost.sendmail(from_addr, to_addrs, message.as_string())
292
293
294 ###################
295 # Translation tools
296 ###################
297
298
# Filesystem location of the 'translations' resource of the mediagoblin
# package; handed to gettext.translation() by setup_gettext().
TRANSLATIONS_PATH = pkg_resources.resource_filename(
    'mediagoblin', 'translations')
301
302
def locale_to_lower_upper(locale):
    """
    Take a locale, regardless of style, and format it like "en_US"

    The part before the first separator is lowercased and everything
    after it is uppercased.  A '-' separator takes precedence over a
    '_' one.  A locale without any separator is just lowercased.
    """
    for separator in ('-', '_'):
        if separator in locale:
            lang, country = locale.split(separator, 1)
            return '%s_%s' % (lang.lower(), country.upper())

    return locale.lower()
315
316
def locale_to_lower_lower(locale):
    """
    Take a locale, regardless of style, and format it like "en-us"

    Everything is lowercased and the first '_' (if any) becomes a '-'.
    """
    # Lowercasing never touches '_', so replacing the first one after
    # lowercasing equals splitting on it and re-joining with '-'.
    return locale.lower().replace('_', '-', 1)
326
327
def get_locale_from_request(request):
    """
    Figure out what target language is most appropriate based on the
    request

    In order of precedence:
     1. a 'lang' field in the request's GET (or, failing that, POST)
        parameters
     2. a 'locale' entry in the routing matchdict
     3. a 'target_lang' stored in the session
     4. the first of the browser's accepted languages
     5. English

    Returns the chosen locale formatted like "en_US".
    """
    request_form = request.GET or request.POST

    if 'lang' in request_form:
        return locale_to_lower_upper(request_form['lang'])

    accept_lang_matches = request.accept_language.best_matches()

    # Your routing can explicitly specify a target language
    if 'locale' in request.matchdict:
        target_lang = request.matchdict['locale']
    elif 'target_lang' in request.session:
        target_lang = request.session['target_lang']
    # Pull the first acceptable language
    elif accept_lang_matches:
        target_lang = accept_lang_matches[0]
    # Fall back to English
    else:
        target_lang = 'en'

    return locale_to_lower_upper(target_lang)
353
354
def read_config_file(conf_file):
    """
    Read a paste deploy style config file and process it.

    Returns a dict mapping each section name to a dict of that
    section's items.  'here' (the config file's directory) and
    '__file__' (its absolute path) are made available as defaults
    unless the parser already has them.

    Raises IOError if conf_file does not exist.
    """
    if not os.path.exists(conf_file):
        raise IOError(
            "MEDIAGOBLIN_CONFIG not set or file does not exist")

    parser = NicerConfigParser(conf_file)
    parser.read(conf_file)

    absolute_path = os.path.abspath(conf_file)
    parser._defaults.setdefault('here', os.path.dirname(absolute_path))
    parser._defaults.setdefault('__file__', absolute_path)

    mgoblin_conf = {}
    for section_name in parser.sections():
        mgoblin_conf[section_name] = dict(parser.items(section_name))

    return mgoblin_conf
375
376
# A super strict version of the lxml.html cleaner class: every removal
# option is switched on and only a handful of basic tags survive.
HTML_CLEANER = Cleaner(
    scripts=True, javascript=True, comments=True, style=True,
    links=True, page_structure=True, processing_instructions=True,
    embedded=True, frames=True, forms=True, annoying_tags=True,
    allow_tags=[
        'div', 'b', 'i', 'em', 'strong',
        'p', 'ul', 'ol', 'li', 'a', 'br'],
    remove_unknown_tags=False,  # can't be used with allow_tags
    safe_attrs_only=True,
    add_nofollow=True,  # for now
    host_whitelist=(),
    whitelist_tags=set())


def clean_html(html):
    """
    Run the html through HTML_CLEANER and return the cleaned result.
    """
    return HTML_CLEANER.clean_html(html)
401
402
# Translation objects that have already been loaded, keyed by locale.
SETUP_GETTEXTS = {}


def setup_gettext(locale):
    """
    Setup the gettext instance based on this locale

    The translations are loaded (falling back to untranslated strings
    when none are available for the locale) and installed as
    mg_globals.translations.  They are cached per locale, but only
    for locales babel knows about.
    """
    # Later on when we have plugins we may want to enable the
    # multi-translations system they have so we can handle plugin
    # translations too

    # TODO: fallback nicely on translations from pt_PT to pt if not
    #   available, etc.
    if locale in SETUP_GETTEXTS:
        this_gettext = SETUP_GETTEXTS[locale]
    else:
        this_gettext = gettext.translation(
            'mediagoblin', TRANSLATIONS_PATH, [locale], fallback=True)
        # NOTE(review): presumably unknown locales are left out so that
        # arbitrary request-supplied strings can't grow the cache
        # without bound -- confirm.
        if exists(locale):
            SETUP_GETTEXTS[locale] = this_gettext

    mg_globals.setup_globals(
        translations=this_gettext)
425
426
PAGINATION_DEFAULT_PER_PAGE = 30


class Pagination(object):
    """
    Pagination class for mongodb queries.

    Initialization through
    __init__(self, page, cursor, per_page=PAGINATION_DEFAULT_PER_PAGE),
    get actual data slice through __call__().
    """

    def __init__(self, page, cursor, per_page=PAGINATION_DEFAULT_PER_PAGE):
        """
        Initializes Pagination

        Args:
         - page: requested page (pages are numbered starting at 1)
         - cursor: db cursor; must provide count(), skip() and limit()
         - per_page: number of objects per page
        """
        self.page = page
        self.per_page = per_page
        self.cursor = cursor
        # Counted once up front; later changes to the underlying
        # collection are not reflected.
        self.total_count = self.cursor.count()

    def __call__(self):
        """
        Returns slice of objects for the requested page
        """
        return self.cursor.skip(
            (self.page - 1) * self.per_page).limit(self.per_page)

    @property
    def pages(self):
        """Total number of pages (0 when there are no objects)."""
        # float() forces true division so a partial last page counts
        return int(ceil(self.total_count / float(self.per_page)))

    @property
    def has_prev(self):
        """True if there is a page before the requested one."""
        return self.page > 1

    @property
    def has_next(self):
        """True if there is a page after the requested one."""
        return self.page < self.pages

    def iter_pages(self, left_edge=2, left_current=2,
                   right_current=5, right_edge=2):
        """
        Yield the page numbers to show in a page navigation.

        Numbers are shown when they are within left_edge of the first
        page, within right_edge of the last page, or in the window of
        left_current pages before through right_current - 1 pages
        after the current page.  Each skipped run of pages is
        represented by a single None.
        """
        # The page count can't change while iterating; compute it once
        # instead of on every pass through the loop.
        total_pages = self.pages

        last = 0
        for num in xrange(1, total_pages + 1):
            at_left_edge = num <= left_edge
            around_current = (num > self.page - left_current - 1 and
                              num < self.page + right_current)
            at_right_edge = num > total_pages - right_edge

            if at_left_edge or around_current or at_right_edge:
                if last + 1 != num:
                    # There's a gap since the last number we yielded
                    yield None
                yield num
                last = num

    def get_page_url_explicit(self, base_url, get_params, page_no):
        """
        Get a page url by adding a page= parameter to the base url

        Args:
         - base_url: url the query string gets appended to
         - get_params: existing GET parameters to keep (may be None);
           these are copied, not modified
         - page_no: page number to link to
        """
        new_get_params = copy.copy(get_params or {})
        new_get_params['page'] = page_no
        return "%s?%s" % (
            base_url, urllib.urlencode(new_get_params))

    def get_page_url(self, request, page_no):
        """
        Get a new page url based of the request, and the new page number.

        This is a nice wrapper around get_page_url_explicit()
        """
        return self.get_page_url_explicit(
            request.path_info, request.GET, page_no)