1 # GNU MediaGoblin -- federated, autonomous media hosting
2 # Copyright (C) 2011 Free Software Foundation, Inc
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 from __future__
import division
19 from email
.MIMEText
import MIMEText
26 from math
import ceil
, floor
30 from babel
.localedata
import exists
31 from babel
.support
import LazyProxy
34 from webob
import Response
, exc
35 from lxml
.html
.clean
import Cleaner
37 from wtforms
.form
import Form
39 from mediagoblin
import mg_globals
40 from mediagoblin
import messages
41 from mediagoblin
.db
.util
import ObjectId
43 from itertools
import izip
, count
45 DISPLAY_IMAGE_FETCHING_ORDER
= [u
'medium', u
'original', u
'thumb']
48 def _activate_testing():
50 Call this to activate testing in util.py
56 def clear_test_buckets():
58 We store some things for testing purposes that should be cleared
59 when we want a "clean slate" of information for our next round of
60 tests. Call this function to wipe all that stuff clean.
62 Also wipes out some other things we might redefine during testing,
65 global SETUP_JINJA_ENVS
68 global EMAIL_TEST_INBOX
69 global EMAIL_TEST_MBOX_INBOX
71 EMAIL_TEST_MBOX_INBOX
= []
73 clear_test_template_context()
79 def get_jinja_env(template_loader
, locale
):
81 Set up the Jinja environment,
83 (In the future we may have another system for providing theming;
84 for now this is good enough.)
88 # If we have a jinja environment set up with this locale, just
90 if SETUP_JINJA_ENVS
.has_key(locale
):
91 return SETUP_JINJA_ENVS
[locale
]
93 template_env
= jinja2
.Environment(
94 loader
=template_loader
, autoescape
=True,
95 extensions
=['jinja2.ext.i18n', 'jinja2.ext.autoescape'])
97 template_env
.install_gettext_callables(
98 mg_globals
.translations
.ugettext
,
99 mg_globals
.translations
.ungettext
)
101 # All templates will know how to ...
102 # ... fetch all waiting messages and remove them from the queue
103 # ... construct a grid of thumbnails or other media
104 template_env
.globals['fetch_messages'] = messages
.fetch_messages
105 template_env
.globals['gridify_list'] = gridify_list
106 template_env
.globals['gridify_cursor'] = gridify_cursor
109 SETUP_JINJA_ENVS
[locale
] = template_env
114 # We'll store context information here when doing unit tests
115 TEMPLATE_TEST_CONTEXT
= {}
118 def render_template(request
, template_path
, context
):
120 Render a template with context.
122 Always inserts the request into the context, so you don't have to.
123 Also stores the context if we're doing unit tests. Helpful!
125 template
= request
.template_env
.get_template(
127 context
['request'] = request
128 rendered
= template
.render(context
)
131 TEMPLATE_TEST_CONTEXT
[template_path
] = context
136 def clear_test_template_context():
137 global TEMPLATE_TEST_CONTEXT
138 TEMPLATE_TEST_CONTEXT
= {}
141 def render_to_response(request
, template
, context
, status
=200):
142 """Much like Django's shortcut.render()"""
144 render_template(request
, template
, context
),
148 def redirect(request
, *args
, **kwargs
):
149 """Returns a HTTPFound(), takes a request and then urlgen params"""
152 if kwargs
.get('querystring'):
153 querystring
= kwargs
.get('querystring')
154 del kwargs
['querystring']
156 return exc
.HTTPFound(
158 request
.urlgen(*args
, **kwargs
),
159 querystring
if querystring
else '']))
162 def setup_user_in_request(request
):
164 Examine a request and tack on a request.user parameter if that's
167 if not request
.session
.has_key('user_id'):
172 user
= request
.app
.db
.User
.one(
173 {'_id': ObjectId(request
.session
['user_id'])})
176 # Something's wrong... this user doesn't exist? Invalidate
178 request
.session
.invalidate()
183 def import_component(import_string
):
185 Import a module component defined by STRING. Probably a method,
186 class, or global variable.
189 - import_string: a string that defines what to import. Written
190 in the format of "module1.module2:component"
192 module_name
, func_name
= import_string
.split(':', 1)
193 __import__(module_name
)
194 module
= sys
.modules
[module_name
]
195 func
= getattr(module
, func_name
)
198 _punct_re
= re
.compile(r
'[\t !"#$%&\'()*\
-/<=>?
@\
[\\\
]^_`
{|
},.]+')
200 def slugify(text, delim=u'-'):
202 Generates an ASCII-only slug. Taken from http://flask.pocoo.org/snippets/5/
205 for word in _punct_re.split(text.lower()):
206 word = word.encode('translit
/long')
209 return unicode(delim.join(result))
211 ### ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
212 ### Special email test stuff begins HERE
213 ### ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
215 # We have two "test inboxes" here:
219 # If you're writing test views
, you
'll probably want to check this.
220 # It contains a list of MIMEText messages.
222 # EMAIL_TEST_MBOX_INBOX:
223 # ----------------------
224 # This collects the messages from the FakeMhost inbox. It's reslly
225 # just here for testing the send_email method itself.
227 # Anyway this contains:
229 # - to: a list of email recipient addresses
230 # - message: not just the body, but the whole message, including
235 # Before running tests that call functions which send email, you should
236 # always call _clear_test_inboxes() to "wipe" the inboxes clean.
238 EMAIL_TEST_INBOX
= []
239 EMAIL_TEST_MBOX_INBOX
= []
242 class FakeMhost(object):
244 Just a fake mail host so we can capture and test messages
250 def sendmail(self
, from_addr
, to_addrs
, message
):
251 EMAIL_TEST_MBOX_INBOX
.append(
256 def _clear_test_inboxes():
257 global EMAIL_TEST_INBOX
258 global EMAIL_TEST_MBOX_INBOX
259 EMAIL_TEST_INBOX
= []
260 EMAIL_TEST_MBOX_INBOX
= []
262 ### ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
263 ### </Special email test stuff>
264 ### ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
266 def send_email(from_addr
, to_addrs
, subject
, message_body
):
268 Simple email sending wrapper, use this so we can capture messages
269 for unit testing purposes.
272 - from_addr: address you're sending the email from
273 - to_addrs: list of recipient email addresses
274 - subject: subject of the email
275 - message_body: email body text
277 # TODO: make a mock mhost if testing is enabled
278 if TESTS_ENABLED
or mg_globals
.app_config
['email_debug_mode']:
280 elif not mg_globals
.app_config
['email_debug_mode']:
281 mhost
= smtplib
.SMTP()
285 message
= MIMEText(message_body
.encode('utf-8'), 'plain', 'utf-8')
286 message
['Subject'] = subject
287 message
['From'] = from_addr
288 message
['To'] = ', '.join(to_addrs
)
291 EMAIL_TEST_INBOX
.append(message
)
293 if mg_globals
.app_config
['email_debug_mode']:
294 print u
"===== Email ====="
295 print u
"From address: %s" % message
['From']
296 print u
"To addresses: %s" % message
['To']
297 print u
"Subject: %s" % message
['Subject']
299 print message
.get_payload(decode
=True)
301 return mhost
.sendmail(from_addr
, to_addrs
, message
.as_string())
309 TRANSLATIONS_PATH
= pkg_resources
.resource_filename(
310 'mediagoblin', 'i18n')
313 def locale_to_lower_upper(locale
):
315 Take a locale, regardless of style, and format it like "en-us"
318 lang
, country
= locale
.split('-', 1)
319 return '%s_%s' % (lang
.lower(), country
.upper())
321 lang
, country
= locale
.split('_', 1)
322 return '%s_%s' % (lang
.lower(), country
.upper())
324 return locale
.lower()
327 def locale_to_lower_lower(locale
):
329 Take a locale, regardless of style, and format it like "en_US"
332 lang
, country
= locale
.split('_', 1)
333 return '%s-%s' % (lang
.lower(), country
.lower())
335 return locale
.lower()
338 def get_locale_from_request(request
):
340 Figure out what target language is most appropriate based on the
343 request_form
= request
.GET
or request
.POST
345 if request_form
.has_key('lang'):
346 return locale_to_lower_upper(request_form
['lang'])
348 accept_lang_matches
= request
.accept_language
.best_matches()
350 # Your routing can explicitly specify a target language
351 matchdict
= request
.matchdict
or {}
353 if matchdict
.has_key('locale'):
354 target_lang
= matchdict
['locale']
355 elif request
.session
.has_key('target_lang'):
356 target_lang
= request
.session
['target_lang']
357 # Pull the first acceptable language
358 elif accept_lang_matches
:
359 target_lang
= accept_lang_matches
[0]
360 # Fall back to English
364 return locale_to_lower_upper(target_lang
)
367 # A super strict version of the lxml.html cleaner class
368 HTML_CLEANER
= Cleaner(
375 processing_instructions
=True,
381 'div', 'b', 'i', 'em', 'strong', 'p', 'ul', 'ol', 'li', 'a', 'br'],
382 remove_unknown_tags
=False, # can't be used with allow_tags
383 safe_attrs_only
=True,
384 add_nofollow
=True, # for now
386 whitelist_tags
=set([]))
389 def clean_html(html
):
390 # clean_html barfs on an empty string
394 return HTML_CLEANER
.clean_html(html
)
397 def convert_to_tag_list_of_dicts(tag_string
):
399 Filter input from incoming string containing user tags,
401 Strips trailing, leading, and internal whitespace, and also converts
402 the "tags" text into an array of tags
407 # Strip out internal, trailing, and leading whitespace
408 stripped_tag_string
= u
' '.join(tag_string
.strip().split())
410 # Split the tag string into a list of tags
411 for tag
in stripped_tag_string
.split(
412 mg_globals
.app_config
['tags_delimiter']):
414 # Ignore empty or duplicate tags
415 if tag
.strip() and tag
.strip() not in [t
['name'] for t
in taglist
]:
417 taglist
.append({'name': tag
.strip(),
418 'slug': slugify(tag
.strip())})
422 def media_tags_as_string(media_entry_tags
):
424 Generate a string from a media item's tags, stored as a list of dicts
426 This is the opposite of convert_to_tag_list_of_dicts
428 media_tag_string
= ''
430 media_tag_string
= mg_globals
.app_config
['tags_delimiter'].join(
431 [tag
['name'] for tag
in media_entry_tags
])
432 return media_tag_string
434 TOO_LONG_TAG_WARNING
= \
435 u
'Tags must be shorter than %s characters. Tags that are too long: %s'
437 def tag_length_validator(form
, field
):
439 Make sure tags do not exceed the maximum tag length.
441 tags
= convert_to_tag_list_of_dicts(field
.data
)
443 tag
['name'] for tag
in tags
444 if len(tag
['name']) > mg_globals
.app_config
['tags_max_length']]
447 raise wtforms
.ValidationError(
448 TOO_LONG_TAG_WARNING
% (mg_globals
.app_config
['tags_max_length'], \
449 ', '.join(too_long_tags
)))
452 MARKDOWN_INSTANCE
= markdown
.Markdown(safe_mode
='escape')
454 def cleaned_markdown_conversion(text
):
456 Take a block of text, run it through MarkDown, and clean its HTML.
458 # Markdown will do nothing with and clean_html can do nothing with
463 return clean_html(MARKDOWN_INSTANCE
.convert(text
))
468 def setup_gettext(locale
):
470 Setup the gettext instance based on this locale
472 # Later on when we have plugins we may want to enable the
473 # multi-translations system they have so we can handle plugin
476 # TODO: fallback nicely on translations from pt_PT to pt if not
478 if SETUP_GETTEXTS
.has_key(locale
):
479 this_gettext
= SETUP_GETTEXTS
[locale
]
481 this_gettext
= gettext
.translation(
482 'mediagoblin', TRANSLATIONS_PATH
, [locale
], fallback
=True)
484 SETUP_GETTEXTS
[locale
] = this_gettext
486 mg_globals
.setup_globals(
487 translations
=this_gettext
)
490 # Force en to be setup before anything else so that
491 # mg_globals.translations is never None
495 def pass_to_ugettext(*args
, **kwargs
):
497 Pass a translation on to the appropriate ugettext method.
499 The reason we can't have a global ugettext method is because
500 mg_globals gets swapped out by the application per-request.
502 return mg_globals
.translations
.ugettext(
506 def lazy_pass_to_ugettext(*args
, **kwargs
):
508 Lazily pass to ugettext.
510 This is useful if you have to define a translation on a module
511 level but you need it to not translate until the time that it's
514 return LazyProxy(pass_to_ugettext
, *args
, **kwargs
)
517 def pass_to_ngettext(*args
, **kwargs
):
519 Pass a translation on to the appropriate ngettext method.
521 The reason we can't have a global ngettext method is because
522 mg_globals gets swapped out by the application per-request.
524 return mg_globals
.translations
.ngettext(
528 def lazy_pass_to_ngettext(*args
, **kwargs
):
530 Lazily pass to ngettext.
532 This is useful if you have to define a translation on a module
533 level but you need it to not translate until the time that it's
536 return LazyProxy(pass_to_ngettext
, *args
, **kwargs
)
539 def fake_ugettext_passthrough(string
):
541 Fake a ugettext call for extraction's sake ;)
543 In wtforms there's a separate way to define a method to translate
544 things... so we just need to mark up the text so that it can be
545 extracted, not so that it's actually run through gettext.
550 PAGINATION_DEFAULT_PER_PAGE
= 30
552 class Pagination(object):
554 Pagination class for mongodb queries.
556 Initialization through __init__(self, cursor, page=1, per_page=2),
557 get actual data slice through __call__().
560 def __init__(self
, page
, cursor
, per_page
=PAGINATION_DEFAULT_PER_PAGE
,
563 Initializes Pagination
566 - page: requested page
567 - per_page: number of objects per page
569 - jump_to_id: ObjectId, sets the page to the page containing the object
570 with _id == jump_to_id.
573 self
.per_page
= per_page
575 self
.total_count
= self
.cursor
.count()
576 self
.active_id
= None
579 cursor
= copy
.copy(self
.cursor
)
581 for (doc
, increment
) in izip(cursor
, count(0)):
582 if doc
['_id'] == jump_to_id
:
583 self
.page
= 1 + int(floor(increment
/ self
.per_page
))
585 self
.active_id
= jump_to_id
591 Returns slice of objects for the requested page
593 return self
.cursor
.skip(
594 (self
.page
- 1) * self
.per_page
).limit(self
.per_page
)
598 return int(ceil(self
.total_count
/ float(self
.per_page
)))
606 return self
.page
< self
.pages
608 def iter_pages(self
, left_edge
=2, left_current
=2,
609 right_current
=5, right_edge
=2):
611 for num
in xrange(1, self
.pages
+ 1):
612 if num
<= left_edge
or \
613 (num
> self
.page
- left_current
- 1 and \
614 num
< self
.page
+ right_current
) or \
615 num
> self
.pages
- right_edge
:
621 def get_page_url_explicit(self
, base_url
, get_params
, page_no
):
623 Get a page url by adding a page= parameter to the base url
625 new_get_params
= copy
.copy(get_params
or {})
626 new_get_params
['page'] = page_no
628 base_url
, urllib
.urlencode(new_get_params
))
630 def get_page_url(self
, request
, page_no
):
632 Get a new page url based of the request, and the new page number.
634 This is a nice wrapper around get_page_url_explicit()
636 return self
.get_page_url_explicit(
637 request
.path_info
, request
.GET
, page_no
)
640 def gridify_list(this_list
, num_cols
=5):
642 Generates a list of lists where each sub-list's length depends on
643 the number of columns in the list
647 # Figure out how many rows we should have
648 num_rows
= int(ceil(float(len(this_list
)) / num_cols
))
650 for row_num
in range(num_rows
):
651 slice_min
= row_num
* num_cols
652 slice_max
= (row_num
+ 1) * num_cols
654 row
= this_list
[slice_min
:slice_max
]
661 def gridify_cursor(this_cursor
, num_cols
=5):
663 Generates a list of lists where each sub-list's length depends on
664 the number of columns in the list
666 return gridify_list(list(this_cursor
), num_cols
)
669 def render_404(request
):
673 return render_to_response(
674 request
, 'mediagoblin/404.html', {}, status
=400)