1 # GNU MediaGoblin -- federated, autonomous media hosting
2 # Copyright (C) 2011 Free Software Foundation, Inc
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 from __future__
import division
19 from email
.MIMEText
import MIMEText
26 from math
import ceil
, floor
30 from babel
.localedata
import exists
33 from webob
import Response
, exc
34 from lxml
.html
.clean
import Cleaner
37 from mediagoblin
import mg_globals
38 from mediagoblin
import messages
39 from mediagoblin
.db
.util
import ObjectId
41 from itertools
import izip
, count
# Image size keys, listed in the order suggested by the constant's name:
# the sequence in which to try fetching an image for display.
DISPLAY_IMAGE_FETCHING_ORDER = [
    u'medium',
    u'original',
    u'thumb',
]
46 def _activate_testing():
48 Call this to activate testing in util.py
def clear_test_buckets():
    """
    Wipe the module-level state kept around for testing purposes so the
    next round of tests starts from a "clean slate".

    Resets the email test inboxes and the captured template contexts,
    and also drops the cached jinja environments, since those are among
    the things that may get redefined during testing.
    """
    global SETUP_JINJA_ENVS, EMAIL_TEST_INBOX, EMAIL_TEST_MBOX_INBOX

    SETUP_JINJA_ENVS = dict()
    EMAIL_TEST_INBOX = list()
    EMAIL_TEST_MBOX_INBOX = list()

    clear_test_template_context()
77 def get_jinja_env(template_loader
, locale
):
79 Set up the Jinja environment,
81 (In the future we may have another system for providing theming;
82 for now this is good enough.)
86 # If we have a jinja environment set up with this locale, just
88 if SETUP_JINJA_ENVS
.has_key(locale
):
89 return SETUP_JINJA_ENVS
[locale
]
91 template_env
= jinja2
.Environment(
92 loader
=template_loader
, autoescape
=True,
93 extensions
=['jinja2.ext.i18n', 'jinja2.ext.autoescape'])
95 template_env
.install_gettext_callables(
96 mg_globals
.translations
.gettext
,
97 mg_globals
.translations
.ngettext
)
99 # All templates will know how to ...
100 # ... fetch all waiting messages and remove them from the queue
101 template_env
.globals['fetch_messages'] = messages
.fetch_messages
104 SETUP_JINJA_ENVS
[locale
] = template_env
# Template contexts captured while running unit tests are kept in this
# dict.
TEMPLATE_TEST_CONTEXT = dict()
def render_template(request, template_path, context):
    """
    Render the template at template_path with the given context.

    The request is always added to the context under the 'request' key,
    so callers don't have to do that themselves.  When unit tests are
    active, the context is additionally recorded in
    TEMPLATE_TEST_CONTEXT, keyed by template path.

    Returns the rendered template output.
    """
    tmpl = request.template_env.get_template(template_path)
    context['request'] = request
    output = tmpl.render(context)

    if TESTS_ENABLED:
        TEMPLATE_TEST_CONTEXT[template_path] = context

    return output
def clear_test_template_context():
    """Rebind TEMPLATE_TEST_CONTEXT to a fresh, empty dict."""
    global TEMPLATE_TEST_CONTEXT
    TEMPLATE_TEST_CONTEXT = dict()
def render_to_response(request, template, context):
    """Much like Django's shortcut.render()

    Renders ``template`` with ``context`` via render_template() and
    wraps the rendered output in a Response.
    """
    rendered_body = render_template(request, template, context)
    return Response(rendered_body)
def redirect(request, *args, **kwargs):
    """
    Returns a HTTPFound(), takes a request and then urlgen params

    All positional and keyword arguments are handed to request.urlgen()
    to build the target URL, except for the optional ``querystring``
    keyword: that one is never forwarded to urlgen and, when truthy, is
    appended verbatim to the generated URL.
    """
    # pop() instead of a conditional get/del, so that a falsy
    # querystring (e.g. '') is not leaked to urlgen as a parameter.
    querystring = kwargs.pop('querystring', None)

    target = ''.join([
        request.urlgen(*args, **kwargs),
        querystring if querystring else ''])

    return exc.HTTPFound(location=target)
155 def setup_user_in_request(request
):
157 Examine a request and tack on a request.user parameter if that's
160 if not request
.session
.has_key('user_id'):
165 user
= request
.app
.db
.User
.one(
166 {'_id': ObjectId(request
.session
['user_id'])})
169 # Something's wrong... this user doesn't exist? Invalidate
171 request
.session
.invalidate()
def import_component(import_string):
    """
    Import a module component defined by STRING.  Probably a method,
    class, or global variable.

    Args:
     - import_string: a string that defines what to import.  Written
       in the format of "module1.module2:component"

    Returns the object named ``component`` looked up on the imported
    module.

    Raises ValueError if import_string contains no ':' separator,
    ImportError if the module cannot be imported, and AttributeError if
    the module has no such component.
    """
    module_name, separator, component_name = import_string.partition(':')
    if not separator:
        raise ValueError(
            'import_string must look like "module1.module2:component", '
            'got %r' % (import_string,))

    __import__(module_name)
    # __import__ returns the top-level package for dotted names, so the
    # actual (sub)module is fetched from sys.modules instead.
    module = sys.modules[module_name]
    return getattr(module, component_name)
# Matches runs of whitespace/punctuation characters; used as the
# separator when splitting text into words.
_punct_re = re.compile(
    r'[\t !"#$%&\'()*\-/<=>?@\[\\\]^_`{|},.]+')
193 def slugify(text, delim=u'-'):
195 Generates an ASCII-only slug. Taken from http://flask.pocoo.org/snippets/5/
198 for word in _punct_re.split(text.lower()):
199 word = word.encode('translit
/long')
202 return unicode(delim.join(result))
204 ### ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
205 ### Special email test stuff begins HERE
206 ### ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
208 # We have two "test inboxes" here:
212 # If you're writing test views
, you
'll probably want to check this.
213 # It contains a list of MIMEText messages.
215 # EMAIL_TEST_MBOX_INBOX:
216 # ----------------------
# This collects the messages from the FakeMhost inbox.  It's really
# just here for testing the send_email method itself.
220 # Anyway this contains:
222 # - to: a list of email recipient addresses
223 # - message: not just the body, but the whole message, including
228 # Before running tests that call functions which send email, you should
229 # always call _clear_test_inboxes() to "wipe" the inboxes clean.
# Both test inboxes start out as separate, empty lists.
EMAIL_TEST_INBOX, EMAIL_TEST_MBOX_INBOX = [], []
235 class FakeMhost(object):
237 Just a fake mail host so we can capture and test messages
243 def sendmail(self
, from_addr
, to_addrs
, message
):
244 EMAIL_TEST_MBOX_INBOX
.append(
def _clear_test_inboxes():
    """Rebind both email test inboxes to fresh, empty lists."""
    global EMAIL_TEST_INBOX, EMAIL_TEST_MBOX_INBOX
    EMAIL_TEST_INBOX, EMAIL_TEST_MBOX_INBOX = [], []
255 ### ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
256 ### </Special email test stuff>
257 ### ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
259 def send_email(from_addr
, to_addrs
, subject
, message_body
):
261 Simple email sending wrapper, use this so we can capture messages
262 for unit testing purposes.
265 - from_addr: address you're sending the email from
266 - to_addrs: list of recipient email addresses
267 - subject: subject of the email
268 - message_body: email body text
270 # TODO: make a mock mhost if testing is enabled
271 if TESTS_ENABLED
or mg_globals
.app_config
['email_debug_mode']:
273 elif not mg_globals
.app_config
['email_debug_mode']:
274 mhost
= smtplib
.SMTP()
278 message
= MIMEText(message_body
.encode('utf-8'), 'plain', 'utf-8')
279 message
['Subject'] = subject
280 message
['From'] = from_addr
281 message
['To'] = ', '.join(to_addrs
)
284 EMAIL_TEST_INBOX
.append(message
)
286 if mg_globals
.app_config
['email_debug_mode']:
287 print u
"===== Email ====="
288 print u
"From address: %s" % message
['From']
289 print u
"To addresses: %s" % message
['To']
290 print u
"Subject: %s" % message
['Subject']
292 print message
.get_payload(decode
=True)
294 return mhost
.sendmail(from_addr
, to_addrs
, message
.as_string())
# Location of the 'translations' resource inside the 'mediagoblin'
# package, as resolved by pkg_resources at import time.
TRANSLATIONS_PATH = pkg_resources.resource_filename(
    'mediagoblin',
    'translations')
def locale_to_lower_upper(locale):
    """
    Take a locale, regardless of style, and format it like "en_US"

    The language part is lowercased and the country part uppercased,
    joined by an underscore.  Either '-' or '_' is accepted as the
    separator in the input (only the first occurrence is treated as the
    separator); a locale without any separator is simply lowercased.
    """
    for separator in ('-', '_'):
        if separator in locale:
            lang, country = locale.split(separator, 1)
            return '%s_%s' % (lang.lower(), country.upper())
    return locale.lower()
def locale_to_lower_lower(locale):
    """
    Take a locale, regardless of style, and format it like "en-us"

    The whole locale is lowercased and the first underscore, if any, is
    replaced by a dash.  Input that already uses a dash, or has no
    separator at all, is just lowercased.
    """
    # Lowercasing never touches '_', so lowering first and then swapping
    # the first underscore equals splitting on it and re-joining.
    return locale.lower().replace('_', '-', 1)
def get_locale_from_request(request):
    """
    Figure out what target language is most appropriate based on the
    request.

    Order of precedence:
     - a 'lang' parameter in the request's GET (or, if GET is empty,
       POST) data
     - a 'locale' entry in the routing matchdict
     - a 'target_lang' entry in the session
     - the first match reported by request.accept_language
     - English

    The chosen value is normalised with locale_to_lower_upper().
    """
    request_form = request.GET or request.POST

    if 'lang' in request_form:
        return locale_to_lower_upper(request_form['lang'])

    accept_lang_matches = request.accept_language.best_matches()

    # Your routing can explicitly specify a target language
    if 'locale' in request.matchdict:
        target_lang = request.matchdict['locale']
    elif 'target_lang' in request.session:
        target_lang = request.session['target_lang']
    # Pull the first acceptable language
    elif accept_lang_matches:
        target_lang = accept_lang_matches[0]
    # Fall back to English
    else:
        target_lang = 'en'

    return locale_to_lower_upper(target_lang)
358 # A super strict version of the lxml.html cleaner class
359 HTML_CLEANER
= Cleaner(
366 processing_instructions
=True,
372 'div', 'b', 'i', 'em', 'strong', 'p', 'ul', 'ol', 'li', 'a', 'br'],
373 remove_unknown_tags
=False, # can't be used with allow_tags
374 safe_attrs_only
=True,
375 add_nofollow
=True, # for now
377 whitelist_tags
=set([]))
380 def clean_html(html
):
381 # clean_html barfs on an empty string
385 return HTML_CLEANER
.clean_html(html
)
def convert_to_tag_list_of_dicts(tag_string):
    """
    Filter input from incoming string containing user tags,

    Strips trailing, leading, and internal whitespace, and also converts
    the "tags" text into an array of tags

    Returns a list of {'name': ..., 'slug': ...} dicts, one per distinct
    non-empty tag, in order of first appearance.  Tags are separated by
    the configured 'tags_delimiter'.  Unless 'tags_case_sensitive' is
    set, names are lowercased before both the duplicate check and
    storage.  An empty (or None) tag_string yields an empty list.
    """
    taglist = []
    if not tag_string:
        return taglist

    delimiter = mg_globals.app_config['tags_delimiter']
    case_sensitive = mg_globals.app_config['tags_case_sensitive']

    # Strip out internal, trailing, and leading whitespace
    stripped_tag_string = u' '.join(tag_string.strip().split())

    # Names already emitted; a set keeps the duplicate check O(1).
    seen_names = set()

    # Split the tag string into a list of tags
    for raw_tag in stripped_tag_string.split(delimiter):
        name = raw_tag.strip()

        # Normalise case *before* checking for duplicates, otherwise
        # "foo, Foo" would be stored twice as 'foo'.
        if not case_sensitive:
            name = name.lower()

        # Ignore empty or duplicate tags
        if not name or name in seen_names:
            continue

        seen_names.add(name)
        taglist.append({'name': name, 'slug': slugify(name)})

    return taglist
def media_tags_as_string(media_entry_tags):
    """
    Build a single string from a media item's tags, which are stored as
    a list of dicts, by joining the tag names with the configured
    'tags_delimiter'.

    This is the opposite of convert_to_tag_list_of_dicts.  When there
    are no tags an empty string is returned.
    """
    if not media_entry_tags:
        return ''

    names = [tag['name'] for tag in media_entry_tags]
    return mg_globals.app_config['tags_delimiter'].join(names)
# Message template for over-long tags; takes two %s arguments: the
# maximum tag length and the comma-separated offending tags.
TOO_LONG_TAG_WARNING = (
    u'Tags must be shorter than %s characters. Tags that are too long: %s')
def tag_length_validator(form, field):
    """
    Make sure tags do not exceed the maximum tag length.

    Parses field.data with convert_to_tag_list_of_dicts() and raises
    wtforms.ValidationError, listing the offending tag names, if any
    name is longer than the configured 'tags_max_length'.
    """
    too_long_tags = []
    for tag in convert_to_tag_list_of_dicts(field.data):
        name = tag['name']
        if len(name) > mg_globals.app_config['tags_max_length']:
            too_long_tags.append(name)

    if not too_long_tags:
        return

    message = TOO_LONG_TAG_WARNING % (
        mg_globals.app_config['tags_max_length'],
        ', '.join(too_long_tags))
    raise wtforms.ValidationError(message)
# Shared Markdown converter, created once at import time with
# safe_mode='escape'.
MARKDOWN_INSTANCE = markdown.Markdown(
    safe_mode='escape')
449 def cleaned_markdown_conversion(text
):
451 Take a block of text, run it through MarkDown, and clean its HTML.
453 # Markdown will do nothing with and clean_html can do nothing with
458 return clean_html(MARKDOWN_INSTANCE
.convert(text
))
463 def setup_gettext(locale
):
465 Setup the gettext instance based on this locale
467 # Later on when we have plugins we may want to enable the
468 # multi-translations system they have so we can handle plugin
471 # TODO: fallback nicely on translations from pt_PT to pt if not
473 if SETUP_GETTEXTS
.has_key(locale
):
474 this_gettext
= SETUP_GETTEXTS
[locale
]
476 this_gettext
= gettext
.translation(
477 'mediagoblin', TRANSLATIONS_PATH
, [locale
], fallback
=True)
479 SETUP_GETTEXTS
[locale
] = this_gettext
481 mg_globals
.setup_globals(
482 translations
=this_gettext
)
# Default number of objects per page; used as the default value of
# Pagination's per_page argument.
PAGINATION_DEFAULT_PER_PAGE = 30
487 class Pagination(object):
489 Pagination class for mongodb queries.
491 Initialization through __init__(self, cursor, page=1, per_page=2),
492 get actual data slice through __call__().
495 def __init__(self
, page
, cursor
, per_page
=PAGINATION_DEFAULT_PER_PAGE
,
498 Initializes Pagination
501 - page: requested page
502 - per_page: number of objects per page
504 - jump_to_id: ObjectId, sets the page to the page containing the object
505 with _id == jump_to_id.
508 self
.per_page
= per_page
510 self
.total_count
= self
.cursor
.count()
511 self
.active_id
= None
514 cursor
= copy
.copy(self
.cursor
)
516 for (doc
, increment
) in izip(cursor
, count(0)):
517 if doc
['_id'] == jump_to_id
:
518 self
.page
= 1 + int(floor(increment
/ self
.per_page
))
520 self
.active_id
= jump_to_id
526 Returns slice of objects for the requested page
528 return self
.cursor
.skip(
529 (self
.page
- 1) * self
.per_page
).limit(self
.per_page
)
533 return int(ceil(self
.total_count
/ float(self
.per_page
)))
541 return self
.page
< self
.pages
543 def iter_pages(self
, left_edge
=2, left_current
=2,
544 right_current
=5, right_edge
=2):
546 for num
in xrange(1, self
.pages
+ 1):
547 if num
<= left_edge
or \
548 (num
> self
.page
- left_current
- 1 and \
549 num
< self
.page
+ right_current
) or \
550 num
> self
.pages
- right_edge
:
556 def get_page_url_explicit(self
, base_url
, get_params
, page_no
):
558 Get a page url by adding a page= parameter to the base url
560 new_get_params
= copy
.copy(get_params
or {})
561 new_get_params
['page'] = page_no
563 base_url
, urllib
.urlencode(new_get_params
))
565 def get_page_url(self
, request
, page_no
):
567 Get a new page url based of the request, and the new page number.
569 This is a nice wrapper around get_page_url_explicit()
571 return self
.get_page_url_explicit(
572 request
.path_info
, request
.GET
, page_no
)