1 # GNU MediaGoblin -- federated, autonomous media hosting
2 # Copyright (C) 2011 Free Software Foundation, Inc
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 from __future__
import division
19 from email
.MIMEText
import MIMEText
26 from math
import ceil
, floor
30 from babel
.localedata
import exists
31 from babel
.support
import LazyProxy
34 from webob
import Response
, exc
35 from lxml
.html
.clean
import Cleaner
37 from wtforms
.form
import Form
39 from mediagoblin
import mg_globals
40 from mediagoblin
import messages
41 from mediagoblin
.db
.util
import ObjectId
43 from itertools
import izip
, count
# Ordered list of names; presumably the media size keys tried, first to
# last, when choosing which image to display -- confirm against callers.
DISPLAY_IMAGE_FETCHING_ORDER = [
    u'medium',
    u'original',
    u'thumb',
]
48 def _activate_testing():
50 Call this to activate testing in util.py
56 def clear_test_buckets():
58 We store some things for testing purposes that should be cleared
59 when we want a "clean slate" of information for our next round of
60 tests. Call this function to wipe all that stuff clean.
62 Also wipes out some other things we might redefine during testing,
65 global SETUP_JINJA_ENVS
68 global EMAIL_TEST_INBOX
69 global EMAIL_TEST_MBOX_INBOX
71 EMAIL_TEST_MBOX_INBOX
= []
73 clear_test_template_context()
79 def get_jinja_env(template_loader
, locale
):
81 Set up the Jinja environment,
83 (In the future we may have another system for providing theming;
84 for now this is good enough.)
88 # If we have a jinja environment set up with this locale, just
90 if SETUP_JINJA_ENVS
.has_key(locale
):
91 return SETUP_JINJA_ENVS
[locale
]
93 template_env
= jinja2
.Environment(
94 loader
=template_loader
, autoescape
=True,
95 extensions
=['jinja2.ext.i18n', 'jinja2.ext.autoescape'])
97 template_env
.install_gettext_callables(
98 mg_globals
.translations
.ugettext
,
99 mg_globals
.translations
.ungettext
)
101 # All templates will know how to ...
102 # ... fetch all waiting messages and remove them from the queue
103 template_env
.globals['fetch_messages'] = messages
.fetch_messages
106 SETUP_JINJA_ENVS
[locale
] = template_env
# Unit-test aid: render_template() stores the context it rendered with in
# this dict, keyed by template path.
TEMPLATE_TEST_CONTEXT = dict()
115 def render_template(request
, template_path
, context
):
117 Render a template with context.
119 Always inserts the request into the context, so you don't have to.
120 Also stores the context if we're doing unit tests. Helpful!
122 template
= request
.template_env
.get_template(
124 context
['request'] = request
125 rendered
= template
.render(context
)
128 TEMPLATE_TEST_CONTEXT
[template_path
] = context
def clear_test_template_context():
    """
    Reset TEMPLATE_TEST_CONTEXT to a brand new, empty dict.

    The module-level name is rebound rather than cleared in place, so
    any reference to the previous dict keeps what was stored in it.
    """
    global TEMPLATE_TEST_CONTEXT
    TEMPLATE_TEST_CONTEXT = dict()
def render_to_response(request, template, context):
    """
    Render ``template`` with ``context`` via render_template() and wrap
    the result in a Response (much like Django's shortcut.render()).
    """
    rendered = render_template(request, template, context)
    return Response(rendered)
143 def redirect(request
, *args
, **kwargs
):
144 """Returns a HTTPFound(), takes a request and then urlgen params"""
147 if kwargs
.get('querystring'):
148 querystring
= kwargs
.get('querystring')
149 del kwargs
['querystring']
151 return exc
.HTTPFound(
153 request
.urlgen(*args
, **kwargs
),
154 querystring
if querystring
else '']))
157 def setup_user_in_request(request
):
159 Examine a request and tack on a request.user parameter if that's
162 if not request
.session
.has_key('user_id'):
167 user
= request
.app
.db
.User
.one(
168 {'_id': ObjectId(request
.session
['user_id'])})
171 # Something's wrong... this user doesn't exist? Invalidate
173 request
.session
.invalidate()
178 def import_component(import_string
):
180 Import a module component defined by STRING. Probably a method,
181 class, or global variable.
184 - import_string: a string that defines what to import. Written
185 in the format of "module1.module2:component"
187 module_name
, func_name
= import_string
.split(':', 1)
188 __import__(module_name
)
189 module
= sys
.modules
[module_name
]
190 func
= getattr(module
, func_name
)
# Matches a run of one or more tab, space or ASCII punctuation
# characters; slugify() splits lowercased text on it.
_punct_re = re.compile(
    r'[\t !"#$%&\'()*\-/<=>?@\[\\\]^_`{|},.]+')
195 def slugify(text, delim=u'-'):
197 Generates an ASCII-only slug. Taken from http://flask.pocoo.org/snippets/5/
200 for word in _punct_re.split(text.lower()):
201 word = word.encode('translit
/long')
204 return unicode(delim.join(result))
206 ### ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
207 ### Special email test stuff begins HERE
208 ### ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
210 # We have two "test inboxes" here:
214 # If you're writing test views
, you
'll probably want to check this.
215 # It contains a list of MIMEText messages.
217 # EMAIL_TEST_MBOX_INBOX:
218 # ----------------------
# This collects the messages from the FakeMhost inbox.  It's really
220 # just here for testing the send_email method itself.
222 # Anyway this contains:
224 # - to: a list of email recipient addresses
225 # - message: not just the body, but the whole message, including
230 # Before running tests that call functions which send email, you should
231 # always call _clear_test_inboxes() to "wipe" the inboxes clean.
# Test inbox: send_email() appends the MIMEText messages it builds here.
EMAIL_TEST_INBOX = list()
# Test inbox: FakeMhost.sendmail() appends what it captures here.
EMAIL_TEST_MBOX_INBOX = list()
237 class FakeMhost(object):
239 Just a fake mail host so we can capture and test messages
245 def sendmail(self
, from_addr
, to_addrs
, message
):
246 EMAIL_TEST_MBOX_INBOX
.append(
def _clear_test_inboxes():
    """
    Wipe both test inboxes by rebinding EMAIL_TEST_INBOX and
    EMAIL_TEST_MBOX_INBOX to fresh, separate empty lists.
    """
    global EMAIL_TEST_INBOX, EMAIL_TEST_MBOX_INBOX
    EMAIL_TEST_INBOX, EMAIL_TEST_MBOX_INBOX = [], []
257 ### ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
258 ### </Special email test stuff>
259 ### ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
261 def send_email(from_addr
, to_addrs
, subject
, message_body
):
263 Simple email sending wrapper, use this so we can capture messages
264 for unit testing purposes.
267 - from_addr: address you're sending the email from
268 - to_addrs: list of recipient email addresses
269 - subject: subject of the email
270 - message_body: email body text
272 # TODO: make a mock mhost if testing is enabled
273 if TESTS_ENABLED
or mg_globals
.app_config
['email_debug_mode']:
275 elif not mg_globals
.app_config
['email_debug_mode']:
276 mhost
= smtplib
.SMTP()
280 message
= MIMEText(message_body
.encode('utf-8'), 'plain', 'utf-8')
281 message
['Subject'] = subject
282 message
['From'] = from_addr
283 message
['To'] = ', '.join(to_addrs
)
286 EMAIL_TEST_INBOX
.append(message
)
288 if mg_globals
.app_config
['email_debug_mode']:
289 print u
"===== Email ====="
290 print u
"From address: %s" % message
['From']
291 print u
"To addresses: %s" % message
['To']
292 print u
"Subject: %s" % message
['Subject']
294 print message
.get_payload(decode
=True)
296 return mhost
.sendmail(from_addr
, to_addrs
, message
.as_string())
# Filesystem path of the 'i18n' resource inside the 'mediagoblin' package,
# as resolved by pkg_resources; setup_gettext() loads translations from it.
TRANSLATIONS_PATH = pkg_resources.resource_filename('mediagoblin', 'i18n')
308 def locale_to_lower_upper(locale
):
    Take a locale, regardless of style, and format it like "en_US"
313 lang
, country
= locale
.split('-', 1)
314 return '%s_%s' % (lang
.lower(), country
.upper())
316 lang
, country
= locale
.split('_', 1)
317 return '%s_%s' % (lang
.lower(), country
.upper())
319 return locale
.lower()
322 def locale_to_lower_lower(locale
):
    Take a locale, regardless of style, and format it like "en-us"
327 lang
, country
= locale
.split('_', 1)
328 return '%s-%s' % (lang
.lower(), country
.lower())
330 return locale
.lower()
333 def get_locale_from_request(request
):
335 Figure out what target language is most appropriate based on the
338 request_form
= request
.GET
or request
.POST
340 if request_form
.has_key('lang'):
341 return locale_to_lower_upper(request_form
['lang'])
343 accept_lang_matches
= request
.accept_language
.best_matches()
345 # Your routing can explicitly specify a target language
346 if request
.matchdict
.has_key('locale'):
347 target_lang
= request
.matchdict
['locale']
348 elif request
.session
.has_key('target_lang'):
349 target_lang
= request
.session
['target_lang']
350 # Pull the first acceptable language
351 elif accept_lang_matches
:
352 target_lang
= accept_lang_matches
[0]
353 # Fall back to English
357 return locale_to_lower_upper(target_lang
)
360 # A super strict version of the lxml.html cleaner class
361 HTML_CLEANER
= Cleaner(
368 processing_instructions
=True,
374 'div', 'b', 'i', 'em', 'strong', 'p', 'ul', 'ol', 'li', 'a', 'br'],
375 remove_unknown_tags
=False, # can't be used with allow_tags
376 safe_attrs_only
=True,
377 add_nofollow
=True, # for now
379 whitelist_tags
=set([]))
382 def clean_html(html
):
383 # clean_html barfs on an empty string
387 return HTML_CLEANER
.clean_html(html
)
390 def convert_to_tag_list_of_dicts(tag_string
):
392 Filter input from incoming string containing user tags,
394 Strips trailing, leading, and internal whitespace, and also converts
395 the "tags" text into an array of tags
400 # Strip out internal, trailing, and leading whitespace
401 stripped_tag_string
= u
' '.join(tag_string
.strip().split())
403 # Split the tag string into a list of tags
404 for tag
in stripped_tag_string
.split(
405 mg_globals
.app_config
['tags_delimiter']):
407 # Ignore empty or duplicate tags
408 if tag
.strip() and tag
.strip() not in [t
['name'] for t
in taglist
]:
410 taglist
.append({'name': tag
.strip(),
411 'slug': slugify(tag
.strip())})
415 def media_tags_as_string(media_entry_tags
):
417 Generate a string from a media item's tags, stored as a list of dicts
419 This is the opposite of convert_to_tag_list_of_dicts
421 media_tag_string
= ''
423 media_tag_string
= mg_globals
.app_config
['tags_delimiter'].join(
424 [tag
['name'] for tag
in media_entry_tags
])
425 return media_tag_string
# Message raised by tag_length_validator(); the two %s slots receive the
# configured maximum tag length and the comma-joined offending tags.
TOO_LONG_TAG_WARNING = (
    u'Tags must be shorter than %s characters. Tags that are too long: %s')
430 def tag_length_validator(form
, field
):
432 Make sure tags do not exceed the maximum tag length.
434 tags
= convert_to_tag_list_of_dicts(field
.data
)
436 tag
['name'] for tag
in tags
437 if len(tag
['name']) > mg_globals
.app_config
['tags_max_length']]
440 raise wtforms
.ValidationError(
441 TOO_LONG_TAG_WARNING
% (mg_globals
.app_config
['tags_max_length'], \
442 ', '.join(too_long_tags
)))
# Module-wide Markdown converter used by cleaned_markdown_conversion().
# NOTE(review): safe_mode='escape' is expected to make Markdown escape raw
# HTML in the input -- confirm against the installed Markdown version.
MARKDOWN_INSTANCE = markdown.Markdown(
    safe_mode='escape')
447 def cleaned_markdown_conversion(text
):
449 Take a block of text, run it through MarkDown, and clean its HTML.
451 # Markdown will do nothing with and clean_html can do nothing with
456 return clean_html(MARKDOWN_INSTANCE
.convert(text
))
461 def setup_gettext(locale
):
463 Setup the gettext instance based on this locale
465 # Later on when we have plugins we may want to enable the
466 # multi-translations system they have so we can handle plugin
469 # TODO: fallback nicely on translations from pt_PT to pt if not
471 if SETUP_GETTEXTS
.has_key(locale
):
472 this_gettext
= SETUP_GETTEXTS
[locale
]
474 this_gettext
= gettext
.translation(
475 'mediagoblin', TRANSLATIONS_PATH
, [locale
], fallback
=True)
477 SETUP_GETTEXTS
[locale
] = this_gettext
479 mg_globals
.setup_globals(
480 translations
=this_gettext
)
483 # Force en to be setup before anything else so that
484 # mg_globals.translations is never None
488 def pass_to_ugettext(*args
, **kwargs
):
490 Pass a translation on to the appropriate ugettext method.
492 The reason we can't have a global ugettext method is because
493 mg_globals gets swapped out by the application per-request.
495 return mg_globals
.translations
.ugettext(
def lazy_pass_to_ugettext(*args, **kwargs):
    """
    Build a LazyProxy around pass_to_ugettext.

    Handy for translations declared at module level: the arguments are
    captured now and handed to pass_to_ugettext by the proxy, instead of
    being translated at definition time.
    """
    lazy_translation = LazyProxy(pass_to_ugettext, *args, **kwargs)
    return lazy_translation
510 def pass_to_ngettext(*args
, **kwargs
):
512 Pass a translation on to the appropriate ngettext method.
514 The reason we can't have a global ngettext method is because
515 mg_globals gets swapped out by the application per-request.
517 return mg_globals
.translations
.ngettext(
def lazy_pass_to_ngettext(*args, **kwargs):
    """
    Build a LazyProxy around pass_to_ngettext.

    Handy for translations declared at module level: the arguments are
    captured now and handed to pass_to_ngettext by the proxy, instead of
    being translated at definition time.
    """
    lazy_translation = LazyProxy(pass_to_ngettext, *args, **kwargs)
    return lazy_translation
532 def fake_ugettext_passthrough(string
):
534 Fake a ugettext call for extraction's sake ;)
536 In wtforms there's a separate way to define a method to translate
537 things... so we just need to mark up the text so that it can be
538 extracted, not so that it's actually run through gettext.
# Default value of Pagination's ``per_page`` argument (objects per page).
PAGINATION_DEFAULT_PER_PAGE = 30
545 class Pagination(object):
547 Pagination class for mongodb queries.
    Initialization through __init__(self, page, cursor, per_page=..., jump_to_id=...),
550 get actual data slice through __call__().
553 def __init__(self
, page
, cursor
, per_page
=PAGINATION_DEFAULT_PER_PAGE
,
556 Initializes Pagination
559 - page: requested page
560 - per_page: number of objects per page
562 - jump_to_id: ObjectId, sets the page to the page containing the object
563 with _id == jump_to_id.
566 self
.per_page
= per_page
568 self
.total_count
= self
.cursor
.count()
569 self
.active_id
= None
572 cursor
= copy
.copy(self
.cursor
)
574 for (doc
, increment
) in izip(cursor
, count(0)):
575 if doc
['_id'] == jump_to_id
:
576 self
.page
= 1 + int(floor(increment
/ self
.per_page
))
578 self
.active_id
= jump_to_id
584 Returns slice of objects for the requested page
586 return self
.cursor
.skip(
587 (self
.page
- 1) * self
.per_page
).limit(self
.per_page
)
591 return int(ceil(self
.total_count
/ float(self
.per_page
)))
599 return self
.page
< self
.pages
601 def iter_pages(self
, left_edge
=2, left_current
=2,
602 right_current
=5, right_edge
=2):
604 for num
in xrange(1, self
.pages
+ 1):
605 if num
<= left_edge
or \
606 (num
> self
.page
- left_current
- 1 and \
607 num
< self
.page
+ right_current
) or \
608 num
> self
.pages
- right_edge
:
614 def get_page_url_explicit(self
, base_url
, get_params
, page_no
):
616 Get a page url by adding a page= parameter to the base url
618 new_get_params
= copy
.copy(get_params
or {})
619 new_get_params
['page'] = page_no
621 base_url
, urllib
.urlencode(new_get_params
))
623 def get_page_url(self
, request
, page_no
):
        Get a new page url based on the request, and the new page number.
627 This is a nice wrapper around get_page_url_explicit()
629 return self
.get_page_url_explicit(
630 request
.path_info
, request
.GET
, page_no
)