1 # GNU MediaGoblin -- federated, autonomous media hosting
2 # Copyright (C) 2011 Free Software Foundation, Inc
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 from __future__
import division
19 from email
.MIMEText
import MIMEText
26 from math
import ceil
, floor
30 from babel
.localedata
import exists
31 from babel
.support
import LazyProxy
34 from webob
import Response
, exc
35 from lxml
.html
.clean
import Cleaner
37 from wtforms
.form
import Form
39 from mediagoblin
import mg_globals
40 from mediagoblin
import messages
41 from mediagoblin
.db
.util
import ObjectId
43 from itertools
import izip
, count
45 DISPLAY_IMAGE_FETCHING_ORDER
= [u
'medium', u
'original', u
'thumb']
48 def _activate_testing():
50 Call this to activate testing in util.py
56 def clear_test_buckets():
58 We store some things for testing purposes that should be cleared
59 when we want a "clean slate" of information for our next round of
60 tests. Call this function to wipe all that stuff clean.
62 Also wipes out some other things we might redefine during testing,
65 global SETUP_JINJA_ENVS
68 global EMAIL_TEST_INBOX
69 global EMAIL_TEST_MBOX_INBOX
71 EMAIL_TEST_MBOX_INBOX
= []
73 clear_test_template_context()
79 def get_jinja_env(template_loader
, locale
):
81 Set up the Jinja environment,
83 (In the future we may have another system for providing theming;
84 for now this is good enough.)
88 # If we have a jinja environment set up with this locale, just
90 if SETUP_JINJA_ENVS
.has_key(locale
):
91 return SETUP_JINJA_ENVS
[locale
]
93 template_env
= jinja2
.Environment(
94 loader
=template_loader
, autoescape
=True,
95 extensions
=['jinja2.ext.i18n', 'jinja2.ext.autoescape'])
97 template_env
.install_gettext_callables(
98 mg_globals
.translations
.ugettext
,
99 mg_globals
.translations
.ungettext
)
101 # All templates will know how to ...
102 # ... fetch all waiting messages and remove them from the queue
103 # ... construct a grid of thumbnails or other media
104 template_env
.globals['fetch_messages'] = messages
.fetch_messages
105 template_env
.globals['gridify_list'] = gridify_list
106 template_env
.globals['gridify_cursor'] = gridify_cursor
109 SETUP_JINJA_ENVS
[locale
] = template_env
114 # We'll store context information here when doing unit tests
115 TEMPLATE_TEST_CONTEXT
= {}
118 def render_template(request
, template_path
, context
):
120 Render a template with context.
122 Always inserts the request into the context, so you don't have to.
123 Also stores the context if we're doing unit tests. Helpful!
125 template
= request
.template_env
.get_template(
127 context
['request'] = request
128 rendered
= template
.render(context
)
131 TEMPLATE_TEST_CONTEXT
[template_path
] = context
def clear_test_template_context():
    """
    Forget every template context captured so far.

    Rebinds the module-level TEMPLATE_TEST_CONTEXT name to a brand new
    empty dict; the previously bound dict object is left untouched.
    """
    global TEMPLATE_TEST_CONTEXT
    TEMPLATE_TEST_CONTEXT = dict()
def render_to_response(request, template, context):
    """
    Render a template and wrap the result in a Response.

    Much like Django's shortcut.render(): the template path and context
    are handed to render_template() together with the request, and the
    rendered output becomes the body of the returned Response.
    """
    rendered_body = render_template(request, template, context)
    return Response(rendered_body)
146 def redirect(request
, *args
, **kwargs
):
147 """Returns a HTTPFound(), takes a request and then urlgen params"""
150 if kwargs
.get('querystring'):
151 querystring
= kwargs
.get('querystring')
152 del kwargs
['querystring']
154 return exc
.HTTPFound(
156 request
.urlgen(*args
, **kwargs
),
157 querystring
if querystring
else '']))
160 def setup_user_in_request(request
):
162 Examine a request and tack on a request.user parameter if that's
165 if not request
.session
.has_key('user_id'):
170 user
= request
.app
.db
.User
.one(
171 {'_id': ObjectId(request
.session
['user_id'])})
174 # Something's wrong... this user doesn't exist? Invalidate
176 request
.session
.invalidate()
181 def import_component(import_string
):
183 Import a module component defined by STRING. Probably a method,
184 class, or global variable.
187 - import_string: a string that defines what to import. Written
188 in the format of "module1.module2:component"
190 module_name
, func_name
= import_string
.split(':', 1)
191 __import__(module_name
)
192 module
= sys
.modules
[module_name
]
193 func
= getattr(module
, func_name
)
196 _punct_re
= re
.compile(r
'[\t !"#$%&\'()*\
-/<=>?
@\
[\\\
]^_`
{|
},.]+')
198 def slugify(text, delim=u'-'):
200 Generates an ASCII-only slug. Taken from http://flask.pocoo.org/snippets/5/
203 for word in _punct_re.split(text.lower()):
204 word = word.encode('translit
/long')
207 return unicode(delim.join(result))
209 ### ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
210 ### Special email test stuff begins HERE
211 ### ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
213 # We have two "test inboxes" here:
217 # If you're writing test views
, you
'll probably want to check this.
218 # It contains a list of MIMEText messages.
220 # EMAIL_TEST_MBOX_INBOX:
221 # ----------------------
# This collects the messages from the FakeMhost inbox.  It's really
223 # just here for testing the send_email method itself.
225 # Anyway this contains:
227 # - to: a list of email recipient addresses
228 # - message: not just the body, but the whole message, including
233 # Before running tests that call functions which send email, you should
234 # always call _clear_test_inboxes() to "wipe" the inboxes clean.
236 EMAIL_TEST_INBOX
= []
237 EMAIL_TEST_MBOX_INBOX
= []
240 class FakeMhost(object):
242 Just a fake mail host so we can capture and test messages
248 def sendmail(self
, from_addr
, to_addrs
, message
):
249 EMAIL_TEST_MBOX_INBOX
.append(
254 def _clear_test_inboxes():
255 global EMAIL_TEST_INBOX
256 global EMAIL_TEST_MBOX_INBOX
257 EMAIL_TEST_INBOX
= []
258 EMAIL_TEST_MBOX_INBOX
= []
260 ### ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
261 ### </Special email test stuff>
262 ### ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
264 def send_email(from_addr
, to_addrs
, subject
, message_body
):
266 Simple email sending wrapper, use this so we can capture messages
267 for unit testing purposes.
270 - from_addr: address you're sending the email from
271 - to_addrs: list of recipient email addresses
272 - subject: subject of the email
273 - message_body: email body text
275 # TODO: make a mock mhost if testing is enabled
276 if TESTS_ENABLED
or mg_globals
.app_config
['email_debug_mode']:
278 elif not mg_globals
.app_config
['email_debug_mode']:
279 mhost
= smtplib
.SMTP()
283 message
= MIMEText(message_body
.encode('utf-8'), 'plain', 'utf-8')
284 message
['Subject'] = subject
285 message
['From'] = from_addr
286 message
['To'] = ', '.join(to_addrs
)
289 EMAIL_TEST_INBOX
.append(message
)
291 if mg_globals
.app_config
['email_debug_mode']:
292 print u
"===== Email ====="
293 print u
"From address: %s" % message
['From']
294 print u
"To addresses: %s" % message
['To']
295 print u
"Subject: %s" % message
['Subject']
297 print message
.get_payload(decode
=True)
299 return mhost
.sendmail(from_addr
, to_addrs
, message
.as_string())
307 TRANSLATIONS_PATH
= pkg_resources
.resource_filename(
308 'mediagoblin', 'i18n')
311 def locale_to_lower_upper(locale
):
    Take a locale, regardless of style, and format it like "en_US"
316 lang
, country
= locale
.split('-', 1)
317 return '%s_%s' % (lang
.lower(), country
.upper())
319 lang
, country
= locale
.split('_', 1)
320 return '%s_%s' % (lang
.lower(), country
.upper())
322 return locale
.lower()
325 def locale_to_lower_lower(locale
):
    Take a locale, regardless of style, and format it like "en-us"
330 lang
, country
= locale
.split('_', 1)
331 return '%s-%s' % (lang
.lower(), country
.lower())
333 return locale
.lower()
336 def get_locale_from_request(request
):
338 Figure out what target language is most appropriate based on the
341 request_form
= request
.GET
or request
.POST
343 if request_form
.has_key('lang'):
344 return locale_to_lower_upper(request_form
['lang'])
346 accept_lang_matches
= request
.accept_language
.best_matches()
348 # Your routing can explicitly specify a target language
349 if request
.matchdict
.has_key('locale'):
350 target_lang
= request
.matchdict
['locale']
351 elif request
.session
.has_key('target_lang'):
352 target_lang
= request
.session
['target_lang']
353 # Pull the first acceptable language
354 elif accept_lang_matches
:
355 target_lang
= accept_lang_matches
[0]
356 # Fall back to English
360 return locale_to_lower_upper(target_lang
)
363 # A super strict version of the lxml.html cleaner class
364 HTML_CLEANER
= Cleaner(
371 processing_instructions
=True,
377 'div', 'b', 'i', 'em', 'strong', 'p', 'ul', 'ol', 'li', 'a', 'br'],
378 remove_unknown_tags
=False, # can't be used with allow_tags
379 safe_attrs_only
=True,
380 add_nofollow
=True, # for now
382 whitelist_tags
=set([]))
385 def clean_html(html
):
386 # clean_html barfs on an empty string
390 return HTML_CLEANER
.clean_html(html
)
393 def convert_to_tag_list_of_dicts(tag_string
):
395 Filter input from incoming string containing user tags,
397 Strips trailing, leading, and internal whitespace, and also converts
398 the "tags" text into an array of tags
403 # Strip out internal, trailing, and leading whitespace
404 stripped_tag_string
= u
' '.join(tag_string
.strip().split())
406 # Split the tag string into a list of tags
407 for tag
in stripped_tag_string
.split(
408 mg_globals
.app_config
['tags_delimiter']):
410 # Ignore empty or duplicate tags
411 if tag
.strip() and tag
.strip() not in [t
['name'] for t
in taglist
]:
413 taglist
.append({'name': tag
.strip(),
414 'slug': slugify(tag
.strip())})
418 def media_tags_as_string(media_entry_tags
):
420 Generate a string from a media item's tags, stored as a list of dicts
422 This is the opposite of convert_to_tag_list_of_dicts
424 media_tag_string
= ''
426 media_tag_string
= mg_globals
.app_config
['tags_delimiter'].join(
427 [tag
['name'] for tag
in media_entry_tags
])
428 return media_tag_string
430 TOO_LONG_TAG_WARNING
= \
431 u
'Tags must be shorter than %s characters. Tags that are too long: %s'
433 def tag_length_validator(form
, field
):
435 Make sure tags do not exceed the maximum tag length.
437 tags
= convert_to_tag_list_of_dicts(field
.data
)
439 tag
['name'] for tag
in tags
440 if len(tag
['name']) > mg_globals
.app_config
['tags_max_length']]
443 raise wtforms
.ValidationError(
444 TOO_LONG_TAG_WARNING
% (mg_globals
.app_config
['tags_max_length'], \
445 ', '.join(too_long_tags
)))
448 MARKDOWN_INSTANCE
= markdown
.Markdown(safe_mode
='escape')
450 def cleaned_markdown_conversion(text
):
452 Take a block of text, run it through MarkDown, and clean its HTML.
454 # Markdown will do nothing with and clean_html can do nothing with
459 return clean_html(MARKDOWN_INSTANCE
.convert(text
))
464 def setup_gettext(locale
):
466 Setup the gettext instance based on this locale
468 # Later on when we have plugins we may want to enable the
469 # multi-translations system they have so we can handle plugin
472 # TODO: fallback nicely on translations from pt_PT to pt if not
474 if SETUP_GETTEXTS
.has_key(locale
):
475 this_gettext
= SETUP_GETTEXTS
[locale
]
477 this_gettext
= gettext
.translation(
478 'mediagoblin', TRANSLATIONS_PATH
, [locale
], fallback
=True)
480 SETUP_GETTEXTS
[locale
] = this_gettext
482 mg_globals
.setup_globals(
483 translations
=this_gettext
)
486 # Force en to be setup before anything else so that
487 # mg_globals.translations is never None
491 def pass_to_ugettext(*args
, **kwargs
):
493 Pass a translation on to the appropriate ugettext method.
495 The reason we can't have a global ugettext method is because
496 mg_globals gets swapped out by the application per-request.
498 return mg_globals
.translations
.ugettext(
def lazy_pass_to_ugettext(*args, **kwargs):
    """
    Build a lazy stand-in for a pass_to_ugettext() call.

    Handy for translations that must be declared at module level but
    should not actually be translated at definition time.  All
    positional and keyword arguments are forwarded unchanged.
    """
    lazy_translation = LazyProxy(pass_to_ugettext, *args, **kwargs)
    return lazy_translation
513 def pass_to_ngettext(*args
, **kwargs
):
515 Pass a translation on to the appropriate ngettext method.
517 The reason we can't have a global ngettext method is because
518 mg_globals gets swapped out by the application per-request.
520 return mg_globals
.translations
.ngettext(
def lazy_pass_to_ngettext(*args, **kwargs):
    """
    Build a lazy stand-in for a pass_to_ngettext() call.

    Handy for translations that must be declared at module level but
    should not actually be translated at definition time.  All
    positional and keyword arguments are forwarded unchanged.
    """
    lazy_translation = LazyProxy(pass_to_ngettext, *args, **kwargs)
    return lazy_translation
535 def fake_ugettext_passthrough(string
):
537 Fake a ugettext call for extraction's sake ;)
539 In wtforms there's a separate way to define a method to translate
540 things... so we just need to mark up the text so that it can be
541 extracted, not so that it's actually run through gettext.
546 PAGINATION_DEFAULT_PER_PAGE
= 30
548 class Pagination(object):
550 Pagination class for mongodb queries.
    Initialization through
    __init__(self, page, cursor, per_page=PAGINATION_DEFAULT_PER_PAGE, ...),
553 get actual data slice through __call__().
556 def __init__(self
, page
, cursor
, per_page
=PAGINATION_DEFAULT_PER_PAGE
,
559 Initializes Pagination
562 - page: requested page
563 - per_page: number of objects per page
565 - jump_to_id: ObjectId, sets the page to the page containing the object
566 with _id == jump_to_id.
569 self
.per_page
= per_page
571 self
.total_count
= self
.cursor
.count()
572 self
.active_id
= None
575 cursor
= copy
.copy(self
.cursor
)
577 for (doc
, increment
) in izip(cursor
, count(0)):
578 if doc
['_id'] == jump_to_id
:
579 self
.page
= 1 + int(floor(increment
/ self
.per_page
))
581 self
.active_id
= jump_to_id
587 Returns slice of objects for the requested page
589 return self
.cursor
.skip(
590 (self
.page
- 1) * self
.per_page
).limit(self
.per_page
)
594 return int(ceil(self
.total_count
/ float(self
.per_page
)))
602 return self
.page
< self
.pages
604 def iter_pages(self
, left_edge
=2, left_current
=2,
605 right_current
=5, right_edge
=2):
607 for num
in xrange(1, self
.pages
+ 1):
608 if num
<= left_edge
or \
609 (num
> self
.page
- left_current
- 1 and \
610 num
< self
.page
+ right_current
) or \
611 num
> self
.pages
- right_edge
:
617 def get_page_url_explicit(self
, base_url
, get_params
, page_no
):
619 Get a page url by adding a page= parameter to the base url
621 new_get_params
= copy
.copy(get_params
or {})
622 new_get_params
['page'] = page_no
624 base_url
, urllib
.urlencode(new_get_params
))
626 def get_page_url(self
, request
, page_no
):
        Get a new page url based on the request, and the new page number.
630 This is a nice wrapper around get_page_url_explicit()
632 return self
.get_page_url_explicit(
633 request
.path_info
, request
.GET
, page_no
)
636 def gridify_list(this_list
, num_cols
=5):
638 Generates a list of lists where each sub-list's length depends on
639 the number of columns in the list
643 # Figure out how many rows we should have
644 num_rows
= int(ceil(float(len(this_list
)) / num_cols
))
646 for row_num
in range(num_rows
):
647 slice_min
= row_num
* num_cols
648 slice_max
= (row_num
+ 1) * num_cols
650 row
= this_list
[slice_min
:slice_max
]
def gridify_cursor(this_cursor, num_cols=5):
    """
    Lay the items of a cursor out as a grid (a list of row lists).

    The cursor is first drained into a plain list, which is then handed
    to gridify_list() along with the requested number of columns.
    """
    cursor_items = list(this_cursor)
    return gridify_list(cursor_items, num_cols)