This attribute in quotes, too. :)
[mediagoblin.git] / mediagoblin / util.py
... / ...
CommitLineData
1# GNU MediaGoblin -- federated, autonomous media hosting
2# Copyright (C) 2011 Free Software Foundation, Inc
3#
4# This program is free software: you can redistribute it and/or modify
5# it under the terms of the GNU Affero General Public License as published by
6# the Free Software Foundation, either version 3 of the License, or
7# (at your option) any later version.
8#
9# This program is distributed in the hope that it will be useful,
10# but WITHOUT ANY WARRANTY; without even the implied warranty of
11# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12# GNU Affero General Public License for more details.
13#
14# You should have received a copy of the GNU Affero General Public License
15# along with this program. If not, see <http://www.gnu.org/licenses/>.
16
17from email.MIMEText import MIMEText
18import gettext
19import pkg_resources
20import smtplib
21import os
22import sys
23import re
24import urllib
25from math import ceil
26import copy
27
28from babel.localedata import exists
29import jinja2
30import translitcodec
31from paste.deploy.loadwsgi import NicerConfigParser
32from webob import Response, exc
33
34from mediagoblin import globals as mgoblin_globals
35from mediagoblin.db.util import ObjectId
36
37
TESTS_ENABLED = False


def _activate_testing():
    """
    Switch util.py into testing mode.

    Flips the module-level TESTS_ENABLED flag to True; other helpers
    in this module check that flag to decide whether to record data
    for unit tests.
    """
    global TESTS_ENABLED
    TESTS_ENABLED = True
45
46
def clear_test_buckets():
    """
    Reset all module state that tests may have filled or redefined.

    Replaces the cached jinja environments and both email test
    inboxes with fresh empty containers, then clears the stored
    template contexts, giving the next round of tests a "clean slate".
    """
    global SETUP_JINJA_ENVS, EMAIL_TEST_INBOX, EMAIL_TEST_MBOX_INBOX

    # Each name is rebound to a brand new container; the previous
    # objects are left untouched.
    EMAIL_TEST_MBOX_INBOX = []
    EMAIL_TEST_INBOX = []
    SETUP_JINJA_ENVS = {}

    clear_test_template_context()
65
66
def get_jinja_loader(user_template_path=None):
    """
    Build the Jinja template loader for mediagoblin.

    Args:
     - user_template_path: optional directory of user supplied
       templates.  When given, it is placed ahead of the templates
       bundled with the 'mediagoblin' package in a ChoiceLoader.

    Without a user path, only the package loader is returned.

    (In the future we may have another system for providing theming;
    for now this is good enough.)
    """
    package_loader = jinja2.PackageLoader('mediagoblin', 'templates')

    if not user_template_path:
        return package_loader

    user_loader = jinja2.FileSystemLoader(user_template_path)
    return jinja2.ChoiceLoader([user_loader, package_loader])
81
82
# Cache of jinja environments that have already been built, keyed by
# locale.
SETUP_JINJA_ENVS = {}


def get_jinja_env(template_loader, locale):
    """
    Return a Jinja environment for this locale, building it if needed.

    setup_gettext(locale) is always called first, so the translations
    in mgoblin_globals match the requested locale even on a cache hit.

    Args:
     - template_loader: jinja2 loader used when a new environment has
       to be created
     - locale: locale string the environment is set up for

    NOTE(review): the cache is keyed by locale only, so on a cache hit
    the template_loader argument is not used.  Confirm callers always
    pass the same loader.
    """
    setup_gettext(locale)

    # If we have a jinja environment set up with this locale, just
    # return that one.
    if locale in SETUP_JINJA_ENVS:
        return SETUP_JINJA_ENVS[locale]

    template_env = jinja2.Environment(
        loader=template_loader, autoescape=True,
        extensions=['jinja2.ext.i18n'])

    template_env.install_gettext_callables(
        mgoblin_globals.translations.gettext,
        mgoblin_globals.translations.ngettext)

    # Only locales that babel knows about are cached.
    if exists(locale):
        SETUP_JINJA_ENVS[locale] = template_env

    return template_env
112
113
# While unit tests are enabled, render_template() records the context
# of each rendered template here, keyed by template path.
TEMPLATE_TEST_CONTEXT = {}


def render_template(request, template_path, context):
    """
    Render the template at template_path with the given context.

    The request is always added to the context under the 'request'
    key (the passed-in dict is modified), so callers don't have to.
    When TESTS_ENABLED is set, the context is also remembered in
    TEMPLATE_TEST_CONTEXT for later inspection by tests.
    """
    loaded = request.template_env.get_template(template_path)
    context['request'] = request
    output = loaded.render(context)

    if not TESTS_ENABLED:
        return output

    TEMPLATE_TEST_CONTEXT[template_path] = context
    return output
134
135
def clear_test_template_context():
    """
    Forget all template contexts recorded for tests.

    Rebinds TEMPLATE_TEST_CONTEXT to a new empty dict.
    """
    global TEMPLATE_TEST_CONTEXT
    TEMPLATE_TEST_CONTEXT = dict()
139
140
def render_to_response(request, template, context):
    """
    Render a template and wrap the result in a Response.

    Much like Django's shortcut.render()
    """
    body = render_template(request, template, context)
    return Response(body)
144
145
def redirect(request, *args, **kwargs):
    """
    Build an HTTPFound() redirect.

    Takes a request; all remaining positional and keyword arguments
    are handed to request.urlgen() to produce the target location.
    """
    target = request.urlgen(*args, **kwargs)
    return exc.HTTPFound(location=target)
149
150
def setup_user_in_request(request):
    """
    Examine a request and tack on a request.user parameter if that's
    appropriate.

    request.user is set to None when the session carries no 'user_id'.
    Otherwise it is set to the result of looking that id up in the
    database; if the lookup finds nothing the session is invalidated.
    """
    if 'user_id' not in request.session:
        request.user = None
        return

    user = request.app.db.User.one(
        {'_id': ObjectId(request.session['user_id'])})

    if not user:
        # Something's wrong... this user doesn't exist?  Invalidate
        # this session.
        request.session.invalidate()

    request.user = user
170
171
def import_component(import_string):
    """
    Import and return a single component of a module, such as a
    method, class, or global variable.

    Args:
     - import_string: what to import, written in the format of
       "module1.module2:component"
    """
    module_name, component_name = import_string.split(':', 1)

    # __import__ returns the top-level package for dotted names, so
    # the actual module is fetched from sys.modules afterwards.
    __import__(module_name)
    return getattr(sys.modules[module_name], component_name)
186
_punct_re = re.compile(r'[\t !"#$%&\'()*\-/<=>?@\[\\\]^_`{|},.]+')


def slugify(text, delim=u'-'):
    """
    Generates an ASCII-only slug. Taken from http://flask.pocoo.org/snippets/5/

    The lowercased text is split on runs of punctuation/whitespace,
    each piece is encoded with the 'translit/long' codec, and the
    non-empty pieces are joined with delim.
    """
    encoded_words = (
        piece.encode('translit/long')
        for piece in _punct_re.split(text.lower()))
    kept = [piece for piece in encoded_words if piece]
    return unicode(delim.join(kept))
199
200### ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
201### Special email test stuff begins HERE
202### ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
203
204# We have two "test inboxes" here:
205#
206# EMAIL_TEST_INBOX:
207# ----------------
208# If you're writing test views, you'll probably want to check this.
209# It contains a list of MIMEText messages.
210#
211# EMAIL_TEST_MBOX_INBOX:
212# ----------------------
# This collects the messages from the FakeMhost inbox. It's really
214# just here for testing the send_email method itself.
215#
216# Anyway this contains:
217# - from
218# - to: a list of email recipient addresses
219# - message: not just the body, but the whole message, including
220# headers, etc.
221#
222# ***IMPORTANT!***
223# ----------------
224# Before running tests that call functions which send email, you should
225# always call _clear_test_inboxes() to "wipe" the inboxes clean.
226
EMAIL_TEST_INBOX = []
EMAIL_TEST_MBOX_INBOX = []


class FakeMhost(object):
    """
    Stand-in mail host that records messages instead of sending
    them, so the output of send_email can be captured and tested.
    """

    def connect(self):
        """Accept a connect() call and do nothing."""
        return None

    def sendmail(self, from_addr, to_addrs, message):
        """
        Record the message in EMAIL_TEST_MBOX_INBOX as a dict with
        'from', 'to' and 'message' keys.
        """
        record = {
            'from': from_addr,
            'to': to_addrs,
            'message': message}
        EMAIL_TEST_MBOX_INBOX.append(record)
244
def _clear_test_inboxes():
    """
    Wipe both email test inboxes by rebinding them to new empty lists.
    """
    global EMAIL_TEST_INBOX, EMAIL_TEST_MBOX_INBOX
    EMAIL_TEST_INBOX, EMAIL_TEST_MBOX_INBOX = [], []
250
251### ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
252### </Special email test stuff>
253### ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
254
def send_email(from_addr, to_addrs, subject, message_body):
    """
    Simple email sending wrapper, use this so we can capture messages
    for unit testing purposes.

    When tests are enabled or email debug mode is on, a FakeMhost is
    used in place of a real SMTP connection.  When tests are enabled
    the built message is also appended to EMAIL_TEST_INBOX, and in
    debug mode the message is printed.

    Args:
     - from_addr: address you're sending the email from
     - to_addrs: list of recipient email addresses
     - subject: subject of the email
     - message_body: email body text

    Returns whatever the mail host's sendmail() returns.
    """
    # Look the flag up once, with a default, so that globals which
    # never set email_debug_mode mean "off" instead of raising
    # AttributeError.
    debug_mode = getattr(mgoblin_globals, 'email_debug_mode', False)

    if TESTS_ENABLED or debug_mode:
        mhost = FakeMhost()
    else:
        mhost = smtplib.SMTP()

    mhost.connect()

    message = MIMEText(message_body.encode('utf-8'), 'plain', 'utf-8')
    message['Subject'] = subject
    message['From'] = from_addr
    message['To'] = ', '.join(to_addrs)

    if TESTS_ENABLED:
        EMAIL_TEST_INBOX.append(message)

    if debug_mode:
        # Single-argument print(...) gives the same output whether
        # print is a statement or a function.
        print(u"===== Email =====")
        print(u"From address: %s" % message['From'])
        print(u"To addresses: %s" % message['To'])
        print(u"Subject: %s" % message['Subject'])
        print(u"-- Body: --")
        print(message.get_payload(decode=True))

    return mhost.sendmail(from_addr, to_addrs, message.as_string())
291
292
293###################
294# Translation tools
295###################
296
297
# Filesystem location of the 'translations' resource of the
# 'mediagoblin' package, as resolved by pkg_resources.  setup_gettext()
# passes this to gettext.translation() as the catalog directory.
TRANSLATIONS_PATH = pkg_resources.resource_filename(
    'mediagoblin', 'translations')
300
301
def locale_to_lower_upper(locale):
    """
    Take a locale, regardless of style, and format it like "en_US"

    The part before the first '-' (or, failing that, the first '_')
    is lowercased and the rest is uppercased.  A locale without
    either separator is simply lowercased.
    """
    # '-' is tried before '_', so a locale containing both is split
    # on its first '-'.
    for separator in ('-', '_'):
        if separator in locale:
            lang, country = locale.split(separator, 1)
            return '%s_%s' % (lang.lower(), country.upper())

    return locale.lower()
314
315
def locale_to_lower_lower(locale):
    """
    Take a locale, regardless of style, and format it like "en-us"

    The locale is lowercased and its first '_' (if any) is turned
    into a '-'.
    """
    lang, separator, country = locale.partition('_')
    if not separator:
        return locale.lower()

    return '%s-%s' % (lang.lower(), country.lower())
325
326
def get_locale_from_request(request):
    """
    Figure out what target language is most appropriate based on the
    request

    Order of preference: a 'lang' form parameter, a 'locale' entry in
    the routing matchdict, 'target_lang' stored in the session, the
    first match from the request's accepted languages, then 'en'.

    The result is normalized with locale_to_lower_upper().
    """
    # POST is only consulted when GET is empty.
    request_form = request.GET or request.POST

    if 'lang' in request_form:
        return locale_to_lower_upper(request_form['lang'])

    accept_lang_matches = request.accept_language.best_matches()

    # Your routing can explicitly specify a target language
    if 'locale' in request.matchdict:
        target_lang = request.matchdict['locale']
    elif 'target_lang' in request.session:
        target_lang = request.session['target_lang']
    # Pull the first acceptable language
    elif accept_lang_matches:
        target_lang = accept_lang_matches[0]
    # Fall back to English
    else:
        target_lang = 'en'

    return locale_to_lower_upper(target_lang)
352
353
def read_config_file(conf_file):
    """
    Read a paste deploy style config file and process it.

    Args:
     - conf_file: path of the config file to read

    Returns a dict mapping each section name to a dict of that
    section's items.

    Raises IOError if conf_file is empty/None or does not exist.
    """
    # An unset path (None or '') gets the same IOError as a missing
    # file, instead of failing inside os.path.exists().
    if not conf_file or not os.path.exists(conf_file):
        raise IOError(
            "MEDIAGOBLIN_CONFIG not set or file does not exist")

    conf_path = os.path.abspath(conf_file)

    parser = NicerConfigParser(conf_file)
    parser.read(conf_file)
    # setdefault() keeps any 'here' / '__file__' default the parser
    # already has.
    parser._defaults.setdefault('here', os.path.dirname(conf_path))
    parser._defaults.setdefault('__file__', conf_path)

    mgoblin_conf = dict(
        (section_name, dict(parser.items(section_name)))
        for section_name in parser.sections())

    return mgoblin_conf
374
375
# Cache of gettext translation objects that have already been loaded,
# keyed by locale.
SETUP_GETTEXTS = {}


def setup_gettext(locale):
    """
    Setup the gettext instance based on this locale

    The translations object for the locale is loaded (or taken from
    SETUP_GETTEXTS) and installed via
    mgoblin_globals.setup_globals(translations=...).
    """
    # Later on when we have plugins we may want to enable the
    # multi-translations system they have so we can handle plugin
    # translations too

    # TODO: fallback nicely on translations from pt_PT to pt if not
    # available, etc.
    if locale in SETUP_GETTEXTS:
        this_gettext = SETUP_GETTEXTS[locale]
    else:
        # fallback=True: no exception if no catalog is found for
        # this locale.
        this_gettext = gettext.translation(
            'mediagoblin', TRANSLATIONS_PATH, [locale], fallback=True)
        # Only locales that babel knows about are cached.
        if exists(locale):
            SETUP_GETTEXTS[locale] = this_gettext

    mgoblin_globals.setup_globals(
        translations=this_gettext)
398
399
PAGINATION_DEFAULT_PER_PAGE = 30


class Pagination(object):
    """
    Pagination class for mongodb queries.

    Initialization through
    __init__(self, page, cursor, per_page=PAGINATION_DEFAULT_PER_PAGE),
    get actual data slice through __call__().
    """

    def __init__(self, page, cursor, per_page=PAGINATION_DEFAULT_PER_PAGE):
        """
        Initializes Pagination

        Args:
         - page: requested page
         - cursor: db cursor
         - per_page: number of objects per page
        """
        self.page = page
        self.per_page = per_page
        self.cursor = cursor
        # Counted once here; later calls reuse this value.
        self.total_count = self.cursor.count()

    def __call__(self):
        """
        Returns slice of objects for the requested page
        """
        offset = (self.page - 1) * self.per_page
        return self.cursor.skip(offset).limit(self.per_page)

    @property
    def pages(self):
        """Total number of pages, rounding a partial last page up."""
        return int(ceil(self.total_count / float(self.per_page)))

    @property
    def has_prev(self):
        """True if there is a page before the current one."""
        return self.page > 1

    @property
    def has_next(self):
        """True if there is a page after the current one."""
        return self.page < self.pages

    def iter_pages(self, left_edge=2, left_current=2,
                   right_current=5, right_edge=2):
        """
        Yield the page numbers to show in a pager, with None marking
        each gap of skipped pages.

        Args:
         - left_edge: how many pages to always show at the start
         - left_current: how many pages to show before the current one
         - right_current: pages from the current one up to (but not
           including) current + right_current are shown
         - right_edge: how many pages to always show at the end
        """
        total_pages = self.pages
        last = 0
        for num in range(1, total_pages + 1):
            at_left_edge = num <= left_edge
            around_current = (
                self.page - left_current - 1 < num
                < self.page + right_current)
            at_right_edge = num > total_pages - right_edge

            if at_left_edge or around_current or at_right_edge:
                # A jump in numbering means pages were skipped.
                if last + 1 != num:
                    yield None
                yield num
                last = num

    def get_page_url_explicit(self, base_url, get_params, page_no):
        """
        Get a page url by adding a page= parameter to the base url

        get_params is copied, not modified; any 'page' entry it
        already has is replaced by page_no.
        """
        new_get_params = copy.copy(get_params or {})
        new_get_params['page'] = page_no
        return "%s?%s" % (
            base_url, urllib.urlencode(new_get_params))

    def get_page_url(self, request, page_no):
        """
        Get a new page url based of the request, and the new page number.

        This is a nice wrapper around get_page_url_explicit()
        """
        return self.get_page_url_explicit(
            request.path_info, request.GET, page_no)