1 # GNU MediaGoblin -- federated, autonomous media hosting
2 # Copyright (C) 2011 Free Software Foundation, Inc
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 from email
.MIMEText
import MIMEText
25 from string
import strip
29 from babel
.localedata
import exists
32 from webob
import Response
, exc
33 from lxml
.html
.clean
import Cleaner
36 from mediagoblin
import mg_globals
37 from mediagoblin
import messages
38 from mediagoblin
.db
.util
import ObjectId
41 def _activate_testing():
43 Call this to activate testing in util.py
def clear_test_buckets():
    """
    We store some things for testing purposes that should be cleared
    when we want a "clean slate" of information for our next round of
    tests.  Call this function to wipe all that stuff clean.

    Also wipes out some other things we might redefine during testing:
    the Jinja environments cached per locale in SETUP_JINJA_ENVS.
    """
    global SETUP_JINJA_ENVS
    # Rebind to a fresh dict so no environment built during an earlier
    # test is handed out again.
    SETUP_JINJA_ENVS = {}

    # Delegate to the dedicated helpers so each reset lives in one place.
    _clear_test_inboxes()
    clear_test_template_context()
72 def get_jinja_env(template_loader
, locale
):
74 Set up the Jinja environment,
76 (In the future we may have another system for providing theming;
77 for now this is good enough.)
81 # If we have a jinja environment set up with this locale, just
83 if SETUP_JINJA_ENVS
.has_key(locale
):
84 return SETUP_JINJA_ENVS
[locale
]
86 template_env
= jinja2
.Environment(
87 loader
=template_loader
, autoescape
=True,
88 extensions
=['jinja2.ext.i18n', 'jinja2.ext.autoescape'])
90 template_env
.install_gettext_callables(
91 mg_globals
.translations
.gettext
,
92 mg_globals
.translations
.ngettext
)
94 # All templates will know how to ...
95 # ... fetch all waiting messages and remove them from the queue
96 template_env
.globals['fetch_messages'] = messages
.fetch_messages
99 SETUP_JINJA_ENVS
[locale
] = template_env
# While unit testing, render_template() records the context it rendered
# with in this mapping, keyed by template path.
TEMPLATE_TEST_CONTEXT = dict()
def render_template(request, template_path, context):
    """
    Render a template with context.

    Always inserts the request into the context, so you don't have to.
    Also stores the context if we're doing unit tests.  Helpful!

    - request: the current request; its template_env is asked for the
      template
    - template_path: which template to fetch from request.template_env
    - context: dict of template variables; gains a 'request' key as a
      side effect

    Returns whatever the template's render() produced.
    """
    template = request.template_env.get_template(template_path)
    context['request'] = request
    rendered = template.render(context)

    # Only remember contexts while testing; otherwise this mapping
    # would pick up an entry per template path during normal operation.
    if TESTS_ENABLED:
        TEMPLATE_TEST_CONTEXT[template_path] = context

    return rendered
def clear_test_template_context():
    """
    Forget every template context recorded so far.

    Rebinds the module-level TEMPLATE_TEST_CONTEXT to a new, empty dict.
    """
    global TEMPLATE_TEST_CONTEXT
    TEMPLATE_TEST_CONTEXT = dict()
def render_to_response(request, template, context):
    """
    Render `template` with `context` via render_template() and wrap the
    result in a Response.  Much like Django's shortcut.render().
    """
    body = render_template(request, template, context)
    return Response(body)
def redirect(request, *args, **kwargs):
    """
    Build an HTTPFound() for a generated URL.

    Takes a request; every remaining positional and keyword argument is
    handed to request.urlgen() to produce the redirect location.
    """
    target = request.urlgen(*args, **kwargs)
    return exc.HTTPFound(location=target)
141 def setup_user_in_request(request
):
143 Examine a request and tack on a request.user parameter if that's
146 if not request
.session
.has_key('user_id'):
151 user
= request
.app
.db
.User
.one(
152 {'_id': ObjectId(request
.session
['user_id'])})
155 # Something's wrong... this user doesn't exist? Invalidate
157 request
.session
.invalidate()
def import_component(import_string):
    """
    Import a module component defined by STRING.  Probably a method,
    class, or global variable.

    Args:
     - import_string: a string that defines what to import.  Written
       in the format of "module1.module2:component"

    Returns the attribute named after the colon, looked up on the
    module named before it.

    Raises ValueError when import_string has no ':', ImportError when
    the module cannot be imported, and AttributeError when the module
    has no attribute of that name.
    """
    module_name, func_name = import_string.split(':', 1)
    # __import__ hands back the top-level package for dotted names, so
    # the requested (sub)module is fetched from sys.modules instead.
    __import__(module_name)
    module = sys.modules[module_name]
    return getattr(module, func_name)
# Matches one or more consecutive separator characters (tab, space and
# assorted ASCII punctuation); used to split text into words.
_punct_re = re.compile(
    r'[\t !"#$%&\'()*\-/<=>?@\[\\\]^_`{|},.]+')
def slugify(text, delim=u'-'):
    """
    Generates an ASCII-only slug. Taken from http://flask.pocoo.org/snippets/5/

    - text: the text to turn into a slug
    - delim: string placed between the words of the slug

    Returns the slug as a unicode string.
    """
    words = []
    for word in _punct_re.split(text.lower()):
        # NOTE(review): 'translit/long' is not a built-in codec;
        # presumably it is registered by importing translitcodec --
        # confirm that import exists at the top of this file.
        word = word.encode('translit/long')
        # Splitting on punctuation leaves empty fragments at the edges
        # of the text; dropping them avoids leading, trailing or
        # doubled delimiters in the slug.
        if word:
            words.append(word)
    return unicode(delim.join(words))
190 ### ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
191 ### Special email test stuff begins HERE
192 ### ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
194 # We have two "test inboxes" here:
# If you're writing test views, you'll probably want to check this.
199 # It contains a list of MIMEText messages.
201 # EMAIL_TEST_MBOX_INBOX:
202 # ----------------------
# This collects the messages from the FakeMhost inbox.  It's really
# just here for testing the send_email method itself.
206 # Anyway this contains:
208 # - to: a list of email recipient addresses
209 # - message: not just the body, but the whole message, including
214 # Before running tests that call functions which send email, you should
215 # always call _clear_test_inboxes() to "wipe" the inboxes clean.
# Both test inboxes start out as separate, empty lists.
EMAIL_TEST_INBOX, EMAIL_TEST_MBOX_INBOX = [], []
221 class FakeMhost(object):
223 Just a fake mail host so we can capture and test messages
229 def sendmail(self
, from_addr
, to_addrs
, message
):
230 EMAIL_TEST_MBOX_INBOX
.append(
def _clear_test_inboxes():
    """
    Wipe both email test inboxes by rebinding EMAIL_TEST_INBOX and
    EMAIL_TEST_MBOX_INBOX to fresh, empty lists.
    """
    global EMAIL_TEST_INBOX, EMAIL_TEST_MBOX_INBOX
    EMAIL_TEST_INBOX, EMAIL_TEST_MBOX_INBOX = [], []
241 ### ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
242 ### </Special email test stuff>
243 ### ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
245 def send_email(from_addr
, to_addrs
, subject
, message_body
):
247 Simple email sending wrapper, use this so we can capture messages
248 for unit testing purposes.
251 - from_addr: address you're sending the email from
252 - to_addrs: list of recipient email addresses
253 - subject: subject of the email
254 - message_body: email body text
256 # TODO: make a mock mhost if testing is enabled
257 if TESTS_ENABLED
or mg_globals
.email_debug_mode
:
259 elif not mg_globals
.email_debug_mode
:
260 mhost
= smtplib
.SMTP()
264 message
= MIMEText(message_body
.encode('utf-8'), 'plain', 'utf-8')
265 message
['Subject'] = subject
266 message
['From'] = from_addr
267 message
['To'] = ', '.join(to_addrs
)
270 EMAIL_TEST_INBOX
.append(message
)
272 if getattr(mg_globals
, 'email_debug_mode', False):
273 print u
"===== Email ====="
274 print u
"From address: %s" % message
['From']
275 print u
"To addresses: %s" % message
['To']
276 print u
"Subject: %s" % message
['Subject']
278 print message
.get_payload(decode
=True)
280 return mhost
.sendmail(from_addr
, to_addrs
, message
.as_string())
# Path of the 'translations' resource inside the 'mediagoblin' package,
# as resolved by pkg_resources; setup_gettext() loads catalogs from it.
TRANSLATIONS_PATH = pkg_resources.resource_filename(
    'mediagoblin',
    'translations')
def locale_to_lower_upper(locale):
    """
    Take a locale, regardless of style, and format it like "en_US"

    - locale: a locale string such as "en-us", "EN_us" or "en"

    The part before the first '-' (or, when there is no '-', the first
    '_') is lowercased and the remainder uppercased, joined by '_'.  A
    locale containing neither separator is simply lowercased.
    """
    # '-' is tried before '_', so a string containing both is split at
    # its first '-'.
    for separator in ('-', '_'):
        lang, found, country = locale.partition(separator)
        if found:
            return '%s_%s' % (lang.lower(), country.upper())

    return locale.lower()
def locale_to_lower_lower(locale):
    """
    Take a locale, regardless of style, and format it like "en-us"

    - locale: a locale string such as "en_US", "en-US" or "en"

    Only the first '_' is turned into '-'; the whole string is
    lowercased.  A locale without '_' is just lowercased.
    """
    return locale.replace('_', '-', 1).lower()
def get_locale_from_request(request):
    """
    Figure out what target language is most appropriate based on the
    request.

    Sources are consulted in this order and the first one present wins:
      1. a 'lang' field in request.GET (or request.POST when GET is
         empty)
      2. a 'locale' entry in request.matchdict
      3. 'target_lang' stored in request.session
      4. the first entry of request.accept_language.best_matches()
      5. English ('en')

    Returns the chosen locale formatted by locale_to_lower_upper().
    """
    request_form = request.GET or request.POST

    if 'lang' in request_form:
        return locale_to_lower_upper(request_form['lang'])

    # Your routing can explicitly specify a target language
    if 'locale' in request.matchdict:
        target_lang = request.matchdict['locale']
    elif 'target_lang' in request.session:
        target_lang = request.session['target_lang']
    else:
        # Pull the first acceptable language, or fall back to English
        # when the client offered none, so target_lang is always bound.
        accept_lang_matches = request.accept_language.best_matches()
        if accept_lang_matches:
            target_lang = accept_lang_matches[0]
        else:
            target_lang = 'en'

    return locale_to_lower_upper(target_lang)
344 # A super strict version of the lxml.html cleaner class
345 HTML_CLEANER
= Cleaner(
352 processing_instructions
=True,
358 'div', 'b', 'i', 'em', 'strong', 'p', 'ul', 'ol', 'li', 'a', 'br'],
359 remove_unknown_tags
=False, # can't be used with allow_tags
360 safe_attrs_only
=True,
361 add_nofollow
=True, # for now
363 whitelist_tags
=set([]))
def clean_html(html):
    """
    Run a piece of HTML through the module's HTML_CLEANER.

    - html: the markup to clean

    Returns the cleaned markup, or an empty unicode string when there is
    nothing to clean.
    """
    # clean_html barfs on an empty string
    if not html:
        return u''

    return HTML_CLEANER.clean_html(html)
def convert_to_tag_list_of_dicts(tag_string):
    """
    Filter input from incoming string containing user tags,

    Strips trailing, leading, and internal whitespace, and also converts
    the "tags" text into an array of tags

    - tag_string: the raw tags text, separated by the configured
      'tags_delimiter'; may be empty or None

    Returns a list of {'name': ..., 'slug': ...} dicts in input order,
    without empty or duplicate names.  Unless 'tags_case_sensitive' is
    set, names are lowercased before they are compared and stored.
    """
    taglist = []
    if not tag_string:
        return taglist

    # Strip out internal, trailing, and leading whitespace
    stripped_tag_string = u' '.join(tag_string.strip().split())

    case_sensitive = mg_globals.app_config['tags_case_sensitive']
    delimiter = mg_globals.app_config['tags_delimiter']
    seen_names = set()

    # Split the tag string into a list of tags
    for tag in stripped_tag_string.split(delimiter):
        name = tag.strip()
        if not case_sensitive:
            name = name.lower()

        # Ignore empty or duplicate tags.  The comparison uses the
        # normalised name, so "foo, Foo" collapses to a single tag when
        # tags are case-insensitive.
        if not name or name in seen_names:
            continue

        seen_names.add(name)
        taglist.append({'name': name, 'slug': slugify(name)})

    return taglist
def media_tags_as_string(media_entry_tags):
    """
    Generate a string from a media item's tags, stored as a list of dicts

    This is the opposite of convert_to_tag_list_of_dicts

    - media_entry_tags: list of dicts that each carry a 'name' key; may
      be empty or None

    Returns the tag names joined by the configured 'tags_delimiter', or
    '' when there are no tags.
    """
    if not media_entry_tags:
        return ''

    delimiter = mg_globals.app_config['tags_delimiter']
    return delimiter.join([tag['name'] for tag in media_entry_tags])
# Error text raised by tag_length_validator(); the placeholders take the
# maximum tag length and the comma-separated tags that exceed it.
TOO_LONG_TAG_WARNING = (
    u'Tags must be shorter than %s characters. Tags that are too long: %s')
def tag_length_validator(form, field):
    """
    Make sure tags do not exceed the maximum tag length.

    - form: the form being validated (not used here)
    - field: the field whose `data` holds the raw tag string

    Raises wtforms.ValidationError naming every tag whose name is longer
    than the configured 'tags_max_length'; returns None otherwise.
    """
    max_length = mg_globals.app_config['tags_max_length']
    tags = convert_to_tag_list_of_dicts(field.data)
    too_long_tags = [
        tag['name'] for tag in tags
        if len(tag['name']) > max_length]

    # Only complain when at least one tag is actually over the limit.
    if too_long_tags:
        raise wtforms.ValidationError(
            TOO_LONG_TAG_WARNING % (max_length, ', '.join(too_long_tags)))
# One shared Markdown converter, created with safe_mode='escape' and
# reused by cleaned_markdown_conversion().
MARKDOWN_INSTANCE = markdown.Markdown(
    safe_mode='escape')
def cleaned_markdown_conversion(text):
    """
    Take a block of text, run it through MarkDown, and clean its HTML.

    - text: the Markdown source; may be empty or None

    Returns the converted and cleaned HTML, or an empty unicode string
    when there is no text.
    """
    # Markdown will do nothing with and clean_html can do nothing with
    # an empty string, so skip both.
    if not text:
        return u''

    return clean_html(MARKDOWN_INSTANCE.convert(text))
449 def setup_gettext(locale
):
451 Setup the gettext instance based on this locale
453 # Later on when we have plugins we may want to enable the
454 # multi-translations system they have so we can handle plugin
457 # TODO: fallback nicely on translations from pt_PT to pt if not
459 if SETUP_GETTEXTS
.has_key(locale
):
460 this_gettext
= SETUP_GETTEXTS
[locale
]
462 this_gettext
= gettext
.translation(
463 'mediagoblin', TRANSLATIONS_PATH
, [locale
], fallback
=True)
465 SETUP_GETTEXTS
[locale
] = this_gettext
467 mg_globals
.setup_globals(
468 translations
=this_gettext
)
# Number of objects per page that Pagination uses when the caller does
# not pass per_page.
PAGINATION_DEFAULT_PER_PAGE = 30
473 class Pagination(object):
475 Pagination class for mongodb queries.
477 Initialization through __init__(self, cursor, page=1, per_page=2),
478 get actual data slice through __call__().
481 def __init__(self
, page
, cursor
, per_page
=PAGINATION_DEFAULT_PER_PAGE
):
483 Initializes Pagination
486 - page: requested page
487 - per_page: number of objects per page
491 self
.per_page
= per_page
493 self
.total_count
= self
.cursor
.count()
497 Returns slice of objects for the requested page
499 return self
.cursor
.skip(
500 (self
.page
- 1) * self
.per_page
).limit(self
.per_page
)
504 return int(ceil(self
.total_count
/ float(self
.per_page
)))
512 return self
.page
< self
.pages
514 def iter_pages(self
, left_edge
=2, left_current
=2,
515 right_current
=5, right_edge
=2):
517 for num
in xrange(1, self
.pages
+ 1):
518 if num
<= left_edge
or \
519 (num
> self
.page
- left_current
- 1 and \
520 num
< self
.page
+ right_current
) or \
521 num
> self
.pages
- right_edge
:
527 def get_page_url_explicit(self
, base_url
, get_params
, page_no
):
529 Get a page url by adding a page= parameter to the base url
531 new_get_params
= copy
.copy(get_params
or {})
532 new_get_params
['page'] = page_no
534 base_url
, urllib
.urlencode(new_get_params
))
536 def get_page_url(self
, request
, page_no
):
538 Get a new page url based of the request, and the new page number.
540 This is a nice wrapper around get_page_url_explicit()
542 return self
.get_page_url_explicit(
543 request
.path_info
, request
.GET
, page_no
)