X-Git-Url: https://vcs.fsf.org/?a=blobdiff_plain;f=mediagoblin%2Futil.py;h=c9f4a0ac72a48d5264b3eb692ca93cd4fe75a7e9;hb=1bcf30faa9af876d1a2ccd442d7a96d39582d0f9;hp=8c6ec6ccec3049f729ac03bfe4b169c21beb7bbc;hpb=b77eec653df14059296fc3185ff9817edfa0825b;p=mediagoblin.git diff --git a/mediagoblin/util.py b/mediagoblin/util.py index 8c6ec6cc..c9f4a0ac 100644 --- a/mediagoblin/util.py +++ b/mediagoblin/util.py @@ -14,17 +14,33 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . +from __future__ import division + from email.MIMEText import MIMEText import gettext import pkg_resources import smtplib import sys +import re +import urllib +from math import ceil, floor +import copy +import wtforms +from babel.localedata import exists import jinja2 -import mongokit +import translitcodec +from webob import Response, exc +from lxml.html.clean import Cleaner +import markdown + +from mediagoblin import mg_globals +from mediagoblin import messages +from mediagoblin.db.util import ObjectId -from mediagoblin import globals as mgoblin_globals +from itertools import izip, count +DISPLAY_IMAGE_FETCHING_ORDER = [u'medium', u'original', u'thumb'] TESTS_ENABLED = False def _activate_testing(): @@ -35,20 +51,27 @@ def _activate_testing(): TESTS_ENABLED = True -def get_jinja_loader(user_template_path=None): +def clear_test_buckets(): """ - Set up the Jinja template loaders, possibly allowing for user - overridden templates. + We store some things for testing purposes that should be cleared + when we want a "clean slate" of information for our next round of + tests. Call this function to wipe all that stuff clean. - (In the future we may have another system for providing theming; - for now this is good enough.) + Also wipes out some other things we might redefine during testing, + like the jinja envs. """ - if user_template_path: - return jinja2.ChoiceLoader( - [jinja2.FileSystemLoader(user_template_path), - jinja2.PackageLoader('mediagoblin', 'templates')]) - else: - return jinja2.PackageLoader('mediagoblin', 'templates') + global SETUP_JINJA_ENVS + SETUP_JINJA_ENVS = {} + + global EMAIL_TEST_INBOX + global EMAIL_TEST_MBOX_INBOX + EMAIL_TEST_INBOX = [] + EMAIL_TEST_MBOX_INBOX = [] + + clear_test_template_context() + + +SETUP_JINJA_ENVS = {} def get_jinja_env(template_loader, locale): @@ -60,17 +83,75 @@ def get_jinja_env(template_loader, locale): """ setup_gettext(locale) + # If we have a jinja environment set up with this locale, just + # return that one. + if SETUP_JINJA_ENVS.has_key(locale): + return SETUP_JINJA_ENVS[locale] + template_env = jinja2.Environment( loader=template_loader, autoescape=True, - extensions=['jinja2.ext.i18n']) + extensions=['jinja2.ext.i18n', 'jinja2.ext.autoescape']) template_env.install_gettext_callables( - mgoblin_globals.translations.gettext, - mgoblin_globals.translations.ngettext) + mg_globals.translations.gettext, + mg_globals.translations.ngettext) + + # All templates will know how to ... + # ... fetch all waiting messages and remove them from the queue + template_env.globals['fetch_messages'] = messages.fetch_messages + + if exists(locale): + SETUP_JINJA_ENVS[locale] = template_env return template_env +# We'll store context information here when doing unit tests +TEMPLATE_TEST_CONTEXT = {} + + +def render_template(request, template_path, context): + """ + Render a template with context. + + Always inserts the request into the context, so you don't have to. + Also stores the context if we're doing unit tests. Helpful! + """ + template = request.template_env.get_template( + template_path) + context['request'] = request + rendered = template.render(context) + + if TESTS_ENABLED: + TEMPLATE_TEST_CONTEXT[template_path] = context + + return rendered + + +def clear_test_template_context(): + global TEMPLATE_TEST_CONTEXT + TEMPLATE_TEST_CONTEXT = {} + + +def render_to_response(request, template, context): + """Much like Django's shortcut.render()""" + return Response(render_template(request, template, context)) + + +def redirect(request, *args, **kwargs): + """Returns a HTTPFound(), takes a request and then urlgen params""" + + querystring = None + if kwargs.get('querystring'): + querystring = kwargs.get('querystring') + del kwargs['querystring'] + + return exc.HTTPFound( + location=''.join([ + request.urlgen(*args, **kwargs), + querystring if querystring else ''])) + + def setup_user_in_request(request): """ Examine a request and tack on a request.user parameter if that's @@ -82,7 +163,7 @@ def setup_user_in_request(request): user = None user = request.app.db.User.one( - {'_id': mongokit.ObjectId(request.session['user_id'])}) + {'_id': ObjectId(request.session['user_id'])}) if not user: # Something's wrong... this user doesn't exist? Invalidate @@ -107,6 +188,18 @@ def import_component(import_string): func = getattr(module, func_name) return func +_punct_re = re.compile(r'[\t !"#$%&\'()*\-/<=>?@\[\\\]^_`{|},.]+') + +def slugify(text, delim=u'-'): + """ + Generates an ASCII-only slug. Taken from http://flask.pocoo.org/snippets/5/ + """ + result = [] + for word in _punct_re.split(text.lower()): + word = word.encode('translit/long') + if word: + result.append(word) + return unicode(delim.join(result)) ### ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ### Special email test stuff begins HERE @@ -175,9 +268,9 @@ def send_email(from_addr, to_addrs, subject, message_body): - message_body: email body text """ # TODO: make a mock mhost if testing is enabled - if TESTS_ENABLED or mgoblin_globals.email_debug_mode: + if TESTS_ENABLED or mg_globals.app_config['email_debug_mode']: mhost = FakeMhost() - elif not mgoblin_globals.email_debug_mode: + elif not mg_globals.app_config['email_debug_mode']: mhost = smtplib.SMTP() mhost.connect() @@ -190,7 +283,7 @@ def send_email(from_addr, to_addrs, subject, message_body): if TESTS_ENABLED: EMAIL_TEST_INBOX.append(message) - if getattr(mgoblin_globals, 'email_debug_mode', False): + if mg_globals.app_config['email_debug_mode']: print u"===== Email =====" print u"From address: %s" % message['From'] print u"To addresses: %s" % message['To'] @@ -207,7 +300,7 @@ def send_email(from_addr, to_addrs, subject, message_body): TRANSLATIONS_PATH = pkg_resources.resource_filename( - 'mediagoblin', 'translations') + 'mediagoblin', 'i18n') def locale_to_lower_upper(locale): @@ -248,8 +341,8 @@ def get_locale_from_request(request): accept_lang_matches = request.accept_language.best_matches() # Your routing can explicitly specify a target language - if request.matchdict.has_key('target_lang'): - target_lang = request.matchdict['target_lang'] + if request.matchdict.has_key('locale'): + target_lang = request.matchdict['locale'] elif request.session.has_key('target_lang'): target_lang = request.session['target_lang'] # Pull the first acceptable language @@ -262,6 +355,107 @@ def get_locale_from_request(request): return locale_to_lower_upper(target_lang) +# A super strict version of the lxml.html cleaner class +HTML_CLEANER = Cleaner( + scripts=True, + javascript=True, + comments=True, + style=True, + links=True, + page_structure=True, + processing_instructions=True, + embedded=True, + frames=True, + forms=True, + annoying_tags=True, + allow_tags=[ + 'div', 'b', 'i', 'em', 'strong', 'p', 'ul', 'ol', 'li', 'a', 'br'], + remove_unknown_tags=False, # can't be used with allow_tags + safe_attrs_only=True, + add_nofollow=True, # for now + host_whitelist=(), + whitelist_tags=set([])) + + +def clean_html(html): + # clean_html barfs on an empty string + if not html: + return u'' + + return HTML_CLEANER.clean_html(html) + + +def convert_to_tag_list_of_dicts(tag_string): + """ + Filter input from incoming string containing user tags, + + Strips trailing, leading, and internal whitespace, and also converts + the "tags" text into an array of tags + """ + taglist = [] + if tag_string: + + # Strip out internal, trailing, and leading whitespace + stripped_tag_string = u' '.join(tag_string.strip().split()) + + # Split the tag string into a list of tags + for tag in stripped_tag_string.split( + mg_globals.app_config['tags_delimiter']): + + # Ignore empty or duplicate tags + if tag.strip() and tag.strip() not in [t['name'] for t in taglist]: + + taglist.append({'name': tag.strip(), + 'slug': slugify(tag.strip())}) + return taglist + + +def media_tags_as_string(media_entry_tags): + """ + Generate a string from a media item's tags, stored as a list of dicts + + This is the opposite of convert_to_tag_list_of_dicts + """ + media_tag_string = '' + if media_entry_tags: + media_tag_string = mg_globals.app_config['tags_delimiter'].join( + [tag['name'] for tag in media_entry_tags]) + return media_tag_string + +TOO_LONG_TAG_WARNING = \ + u'Tags must be shorter than %s characters. Tags that are too long: %s' + +def tag_length_validator(form, field): + """ + Make sure tags do not exceed the maximum tag length. + """ + tags = convert_to_tag_list_of_dicts(field.data) + too_long_tags = [ + tag['name'] for tag in tags + if len(tag['name']) > mg_globals.app_config['tags_max_length']] + + if too_long_tags: + raise wtforms.ValidationError( + TOO_LONG_TAG_WARNING % (mg_globals.app_config['tags_max_length'], \ + ', '.join(too_long_tags))) + + +MARKDOWN_INSTANCE = markdown.Markdown(safe_mode='escape') + +def cleaned_markdown_conversion(text): + """ + Take a block of text, run it through MarkDown, and clean its HTML. + """ + # Markdown will do nothing with and clean_html can do nothing with + # an empty string :) + if not text: + return u'' + + return clean_html(MARKDOWN_INSTANCE.convert(text)) + + +SETUP_GETTEXTS = {} + def setup_gettext(locale): """ Setup the gettext instance based on this locale @@ -272,8 +466,103 @@ def setup_gettext(locale): # TODO: fallback nicely on translations from pt_PT to pt if not # available, etc. - this_gettext = gettext.translation( - 'mediagoblin', TRANSLATIONS_PATH, [locale], fallback=True) + if SETUP_GETTEXTS.has_key(locale): + this_gettext = SETUP_GETTEXTS[locale] + else: + this_gettext = gettext.translation( + 'mediagoblin', TRANSLATIONS_PATH, [locale], fallback=True) + if exists(locale): + SETUP_GETTEXTS[locale] = this_gettext - mgoblin_globals.setup_globals( + mg_globals.setup_globals( translations=this_gettext) + + +PAGINATION_DEFAULT_PER_PAGE = 30 + +class Pagination(object): + """ + Pagination class for mongodb queries. + + Initialization through __init__(self, cursor, page=1, per_page=2), + get actual data slice through __call__(). + """ + + def __init__(self, page, cursor, per_page=PAGINATION_DEFAULT_PER_PAGE, + jump_to_id=False): + """ + Initializes Pagination + + Args: + - page: requested page + - per_page: number of objects per page + - cursor: db cursor + - jump_to_id: ObjectId, sets the page to the page containing the object + with _id == jump_to_id. + """ + self.page = page + self.per_page = per_page + self.cursor = cursor + self.total_count = self.cursor.count() + self.active_id = None + + if jump_to_id: + cursor = copy.copy(self.cursor) + + for (doc, increment) in izip(cursor, count(0)): + if doc['_id'] == jump_to_id: + self.page = 1 + int(floor(increment / self.per_page)) + + self.active_id = jump_to_id + break + + + def __call__(self): + """ + Returns slice of objects for the requested page + """ + return self.cursor.skip( + (self.page - 1) * self.per_page).limit(self.per_page) + + @property + def pages(self): + return int(ceil(self.total_count / float(self.per_page))) + + @property + def has_prev(self): + return self.page > 1 + + @property + def has_next(self): + return self.page < self.pages + + def iter_pages(self, left_edge=2, left_current=2, + right_current=5, right_edge=2): + last = 0 + for num in xrange(1, self.pages + 1): + if num <= left_edge or \ + (num > self.page - left_current - 1 and \ + num < self.page + right_current) or \ + num > self.pages - right_edge: + if last + 1 != num: + yield None + yield num + last = num + + def get_page_url_explicit(self, base_url, get_params, page_no): + """ + Get a page url by adding a page= parameter to the base url + """ + new_get_params = copy.copy(get_params or {}) + new_get_params['page'] = page_no + return "%s?%s" % ( + base_url, urllib.urlencode(new_get_params)) + + def get_page_url(self, request, page_no): + """ + Get a new page url based of the request, and the new page number. + + This is a nice wrapper around get_page_url_explicit() + """ + return self.get_page_url_explicit( + request.path_info, request.GET, page_no)