f69c91f26a576ef24633e179655c0ddc7df01b35
[mediagoblin.git] / mediagoblin / util.py
1 # GNU MediaGoblin -- federated, autonomous media hosting
2 # Copyright (C) 2011 Free Software Foundation, Inc
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16
17 from email.MIMEText import MIMEText
18 import gettext
19 import pkg_resources
20 import smtplib
21 import os
22 import sys
23 import re
24 import urllib
25 from math import ceil
26 import copy
27
28 from babel.localedata import exists
29 import jinja2
30 import translitcodec
31 from paste.deploy.loadwsgi import NicerConfigParser
32 from webob import Response
33
34 from mediagoblin import globals as mgoblin_globals
35 from mediagoblin.db.util import ObjectId
36
37
38 TESTS_ENABLED = False
39 def _activate_testing():
40 """
41 Call this to activate testing in util.py
42 """
43 global TESTS_ENABLED
44 TESTS_ENABLED = True
45
46
47 def get_jinja_loader(user_template_path=None):
48 """
49 Set up the Jinja template loaders, possibly allowing for user
50 overridden templates.
51
52 (In the future we may have another system for providing theming;
53 for now this is good enough.)
54 """
55 if user_template_path:
56 return jinja2.ChoiceLoader(
57 [jinja2.FileSystemLoader(user_template_path),
58 jinja2.PackageLoader('mediagoblin', 'templates')])
59 else:
60 return jinja2.PackageLoader('mediagoblin', 'templates')
61
62
63 SETUP_JINJA_ENVS = {}
64
65
66 def get_jinja_env(template_loader, locale):
67 """
68 Set up the Jinja environment,
69
70 (In the future we may have another system for providing theming;
71 for now this is good enough.)
72 """
73 setup_gettext(locale)
74
75 # If we have a jinja environment set up with this locale, just
76 # return that one.
77 if SETUP_JINJA_ENVS.has_key(locale):
78 return SETUP_JINJA_ENVS[locale]
79
80 template_env = jinja2.Environment(
81 loader=template_loader, autoescape=True,
82 extensions=['jinja2.ext.i18n'])
83
84 template_env.install_gettext_callables(
85 mgoblin_globals.translations.gettext,
86 mgoblin_globals.translations.ngettext)
87
88 if exists(locale):
89 SETUP_JINJA_ENVS[locale] = template_env
90
91 return template_env
92
93
94 # We'll store context information here when doing unit tests
95 TEMPLATE_TEST_CONTEXT = {}
96
97
98 def render_template(request, template, context):
99 """
100 Render a template with context.
101
102 Always inserts the request into the context, so you don't have to.
103 Also stores the context if we're doing unit tests. Helpful!
104 """
105 template = request.template_env.get_template(
106 template)
107 context['request'] = request
108 rendered = template.render(context)
109
110 if TESTS_ENABLED:
111 TEMPLATE_TEST_CONTEXT[template] = context
112
113 return rendered
114
115
116 def clear_test_template_context():
117 global TEMPLATE_TEST_CONTEXT
118 TEMPLATE_TEST_CONTEXT = {}
119
120
121 def render_to_response(request, template, context):
122 """Much like Django's shortcut.render()"""
123 return Response(render_template(request, template, context))
124
125
126 def setup_user_in_request(request):
127 """
128 Examine a request and tack on a request.user parameter if that's
129 appropriate.
130 """
131 if not request.session.has_key('user_id'):
132 request.user = None
133 return
134
135 user = None
136 user = request.app.db.User.one(
137 {'_id': ObjectId(request.session['user_id'])})
138
139 if not user:
140 # Something's wrong... this user doesn't exist? Invalidate
141 # this session.
142 request.session.invalidate()
143
144 request.user = user
145
146
147 def import_component(import_string):
148 """
149 Import a module component defined by STRING. Probably a method,
150 class, or global variable.
151
152 Args:
153 - import_string: a string that defines what to import. Written
154 in the format of "module1.module2:component"
155 """
156 module_name, func_name = import_string.split(':', 1)
157 __import__(module_name)
158 module = sys.modules[module_name]
159 func = getattr(module, func_name)
160 return func
161
162 _punct_re = re.compile(r'[\t !"#$%&\'()*\-/<=>?@\[\\\]^_`{|},.]+')
163
164 def slugify(text, delim=u'-'):
165 """
166 Generates an ASCII-only slug. Taken from http://flask.pocoo.org/snippets/5/
167 """
168 result = []
169 for word in _punct_re.split(text.lower()):
170 word = word.encode('translit/long')
171 if word:
172 result.append(word)
173 return unicode(delim.join(result))
174
175 ### ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
176 ### Special email test stuff begins HERE
177 ### ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
178
179 # We have two "test inboxes" here:
180 #
181 # EMAIL_TEST_INBOX:
182 # ----------------
183 # If you're writing test views, you'll probably want to check this.
184 # It contains a list of MIMEText messages.
185 #
186 # EMAIL_TEST_MBOX_INBOX:
187 # ----------------------
188 # This collects the messages from the FakeMhost inbox. It's reslly
189 # just here for testing the send_email method itself.
190 #
191 # Anyway this contains:
192 # - from
193 # - to: a list of email recipient addresses
194 # - message: not just the body, but the whole message, including
195 # headers, etc.
196 #
197 # ***IMPORTANT!***
198 # ----------------
199 # Before running tests that call functions which send email, you should
200 # always call _clear_test_inboxes() to "wipe" the inboxes clean.
201
202 EMAIL_TEST_INBOX = []
203 EMAIL_TEST_MBOX_INBOX = []
204
205
206 class FakeMhost(object):
207 """
208 Just a fake mail host so we can capture and test messages
209 from send_email
210 """
211 def connect(self):
212 pass
213
214 def sendmail(self, from_addr, to_addrs, message):
215 EMAIL_TEST_MBOX_INBOX.append(
216 {'from': from_addr,
217 'to': to_addrs,
218 'message': message})
219
220 def _clear_test_inboxes():
221 global EMAIL_TEST_INBOX
222 global EMAIL_TEST_MBOX_INBOX
223 EMAIL_TEST_INBOX = []
224 EMAIL_TEST_MBOX_INBOX = []
225
226 ### ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
227 ### </Special email test stuff>
228 ### ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
229
230 def send_email(from_addr, to_addrs, subject, message_body):
231 """
232 Simple email sending wrapper, use this so we can capture messages
233 for unit testing purposes.
234
235 Args:
236 - from_addr: address you're sending the email from
237 - to_addrs: list of recipient email addresses
238 - subject: subject of the email
239 - message_body: email body text
240 """
241 # TODO: make a mock mhost if testing is enabled
242 if TESTS_ENABLED or mgoblin_globals.email_debug_mode:
243 mhost = FakeMhost()
244 elif not mgoblin_globals.email_debug_mode:
245 mhost = smtplib.SMTP()
246
247 mhost.connect()
248
249 message = MIMEText(message_body.encode('utf-8'), 'plain', 'utf-8')
250 message['Subject'] = subject
251 message['From'] = from_addr
252 message['To'] = ', '.join(to_addrs)
253
254 if TESTS_ENABLED:
255 EMAIL_TEST_INBOX.append(message)
256
257 if getattr(mgoblin_globals, 'email_debug_mode', False):
258 print u"===== Email ====="
259 print u"From address: %s" % message['From']
260 print u"To addresses: %s" % message['To']
261 print u"Subject: %s" % message['Subject']
262 print u"-- Body: --"
263 print message.get_payload(decode=True)
264
265 return mhost.sendmail(from_addr, to_addrs, message.as_string())
266
267
268 ###################
269 # Translation tools
270 ###################
271
272
273 TRANSLATIONS_PATH = pkg_resources.resource_filename(
274 'mediagoblin', 'translations')
275
276
277 def locale_to_lower_upper(locale):
278 """
279 Take a locale, regardless of style, and format it like "en-us"
280 """
281 if '-' in locale:
282 lang, country = locale.split('-', 1)
283 return '%s_%s' % (lang.lower(), country.upper())
284 elif '_' in locale:
285 lang, country = locale.split('_', 1)
286 return '%s_%s' % (lang.lower(), country.upper())
287 else:
288 return locale.lower()
289
290
291 def locale_to_lower_lower(locale):
292 """
293 Take a locale, regardless of style, and format it like "en_US"
294 """
295 if '_' in locale:
296 lang, country = locale.split('_', 1)
297 return '%s-%s' % (lang.lower(), country.lower())
298 else:
299 return locale.lower()
300
301
302 def get_locale_from_request(request):
303 """
304 Figure out what target language is most appropriate based on the
305 request
306 """
307 request_form = request.GET or request.POST
308
309 if request_form.has_key('lang'):
310 return locale_to_lower_upper(request_form['lang'])
311
312 accept_lang_matches = request.accept_language.best_matches()
313
314 # Your routing can explicitly specify a target language
315 if request.matchdict.has_key('locale'):
316 target_lang = request.matchdict['locale']
317 elif request.session.has_key('target_lang'):
318 target_lang = request.session['target_lang']
319 # Pull the first acceptable language
320 elif accept_lang_matches:
321 target_lang = accept_lang_matches[0]
322 # Fall back to English
323 else:
324 target_lang = 'en'
325
326 return locale_to_lower_upper(target_lang)
327
328
329 def read_config_file(conf_file):
330 """
331 Read a paste deploy style config file and process it.
332 """
333 if not os.path.exists(conf_file):
334 raise IOError(
335 "MEDIAGOBLIN_CONFIG not set or file does not exist")
336
337 parser = NicerConfigParser(conf_file)
338 parser.read(conf_file)
339 parser._defaults.setdefault(
340 'here', os.path.dirname(os.path.abspath(conf_file)))
341 parser._defaults.setdefault(
342 '__file__', os.path.abspath(conf_file))
343
344 mgoblin_conf = dict(
345 [(section_name, dict(parser.items(section_name)))
346 for section_name in parser.sections()])
347
348 return mgoblin_conf
349
350
351 SETUP_GETTEXTS = {}
352
353 def setup_gettext(locale):
354 """
355 Setup the gettext instance based on this locale
356 """
357 # Later on when we have plugins we may want to enable the
358 # multi-translations system they have so we can handle plugin
359 # translations too
360
361 # TODO: fallback nicely on translations from pt_PT to pt if not
362 # available, etc.
363 if SETUP_GETTEXTS.has_key(locale):
364 this_gettext = SETUP_GETTEXTS[locale]
365 else:
366 this_gettext = gettext.translation(
367 'mediagoblin', TRANSLATIONS_PATH, [locale], fallback=True)
368 if exists(locale):
369 SETUP_GETTEXTS[locale] = this_gettext
370
371 mgoblin_globals.setup_globals(
372 translations=this_gettext)
373
374
375 PAGINATION_DEFAULT_PER_PAGE = 30
376
377 class Pagination(object):
378 """
379 Pagination class for mongodb queries.
380
381 Initialization through __init__(self, cursor, page=1, per_page=2),
382 get actual data slice through __call__().
383 """
384
385 def __init__(self, page, cursor, per_page=PAGINATION_DEFAULT_PER_PAGE):
386 """
387 Initializes Pagination
388
389 Args:
390 - page: requested page
391 - per_page: number of objects per page
392 - cursor: db cursor
393 """
394 self.page = page
395 self.per_page = per_page
396 self.cursor = cursor
397 self.total_count = self.cursor.count()
398
399 def __call__(self):
400 """
401 Returns slice of objects for the requested page
402 """
403 return self.cursor.skip(
404 (self.page - 1) * self.per_page).limit(self.per_page)
405
406 @property
407 def pages(self):
408 return int(ceil(self.total_count / float(self.per_page)))
409
410 @property
411 def has_prev(self):
412 return self.page > 1
413
414 @property
415 def has_next(self):
416 return self.page < self.pages
417
418 def iter_pages(self, left_edge=2, left_current=2,
419 right_current=5, right_edge=2):
420 last = 0
421 for num in xrange(1, self.pages + 1):
422 if num <= left_edge or \
423 (num > self.page - left_current - 1 and \
424 num < self.page + right_current) or \
425 num > self.pages - right_edge:
426 if last + 1 != num:
427 yield None
428 yield num
429 last = num
430
431 def get_page_url_explicit(self, base_url, get_params, page_no):
432 """
433 Get a page url by adding a page= parameter to the base url
434 """
435 new_get_params = copy.copy(get_params or {})
436 new_get_params['page'] = page_no
437 return "%s?%s" % (
438 base_url, urllib.urlencode(new_get_params))
439
440 def get_page_url(self, request, page_no):
441 """
442 Get a new page url based of the request, and the new page number.
443
444 This is a nice wrapper around get_page_url_explicit()
445 """
446 return self.get_page_url_explicit(
447 request.path_info, request.GET, page_no)