Merge remote branch 'remotes/elrond/dev/mount_storage'
[mediagoblin.git] / mediagoblin / util.py
index b675662e4ac8a6f6d3aebcceade77c4bf91abeed..c9f4a0ac72a48d5264b3eb692ca93cd4fe75a7e9 100644 (file)
 # You should have received a copy of the GNU Affero General Public License
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
+from __future__ import division
+
 from email.MIMEText import MIMEText
 import gettext
 import pkg_resources
 import smtplib
-import os
 import sys
 import re
 import urllib
-from math import ceil
+from math import ceil, floor
 import copy
+import wtforms
 
 from babel.localedata import exists
 import jinja2
 import translitcodec
-from paste.deploy.loadwsgi import NicerConfigParser
+from webob import Response, exc
+from lxml.html.clean import Cleaner
+import markdown
 
-from mediagoblin import globals as mgoblin_globals
+from mediagoblin import mg_globals
+from mediagoblin import messages
 from mediagoblin.db.util import ObjectId
 
+from itertools import izip, count
+
+DISPLAY_IMAGE_FETCHING_ORDER = [u'medium', u'original', u'thumb']
 
 TESTS_ENABLED = False
 def _activate_testing():
@@ -43,20 +51,24 @@ def _activate_testing():
     TESTS_ENABLED = True
 
 
-def get_jinja_loader(user_template_path=None):
+def clear_test_buckets():
     """
-    Set up the Jinja template loaders, possibly allowing for user
-    overridden templates.
+    We store some things for testing purposes that should be cleared
+    when we want a "clean slate" of information for our next round of
+    tests.  Call this function to wipe all that stuff clean.
 
-    (In the future we may have another system for providing theming;
-    for now this is good enough.)
+    Also wipes out some other things we might redefine during testing,
+    like the jinja envs.
     """
-    if user_template_path:
-        return jinja2.ChoiceLoader(
-            [jinja2.FileSystemLoader(user_template_path),
-             jinja2.PackageLoader('mediagoblin', 'templates')])
-    else:
-        return jinja2.PackageLoader('mediagoblin', 'templates')
+    global SETUP_JINJA_ENVS
+    SETUP_JINJA_ENVS = {}
+
+    global EMAIL_TEST_INBOX
+    global EMAIL_TEST_MBOX_INBOX
+    EMAIL_TEST_INBOX = []
+    EMAIL_TEST_MBOX_INBOX = []
+
+    clear_test_template_context()
 
 
 SETUP_JINJA_ENVS = {}
@@ -78,11 +90,15 @@ def get_jinja_env(template_loader, locale):
 
     template_env = jinja2.Environment(
         loader=template_loader, autoescape=True,
-        extensions=['jinja2.ext.i18n'])
+        extensions=['jinja2.ext.i18n', 'jinja2.ext.autoescape'])
 
     template_env.install_gettext_callables(
-        mgoblin_globals.translations.gettext,
-        mgoblin_globals.translations.ngettext)
+        mg_globals.translations.gettext,
+        mg_globals.translations.ngettext)
+
+    # All templates will know how to ...
+    # ... fetch all waiting messages and remove them from the queue
+    template_env.globals['fetch_messages'] = messages.fetch_messages
 
     if exists(locale):
         SETUP_JINJA_ENVS[locale] = template_env
@@ -117,6 +133,25 @@ def clear_test_template_context():
     TEMPLATE_TEST_CONTEXT = {}
 
 
+def render_to_response(request, template, context):
+    """Much like Django's shortcut.render()"""
+    return Response(render_template(request, template, context))
+
+
+def redirect(request, *args, **kwargs):
+    """Returns a HTTPFound(), takes a request and then urlgen params"""
+    
+    querystring = None
+    if kwargs.get('querystring'):
+        querystring = kwargs.get('querystring')
+        del kwargs['querystring']
+
+    return exc.HTTPFound(
+        location=''.join([
+                request.urlgen(*args, **kwargs),
+                querystring if querystring else '']))
+
+
 def setup_user_in_request(request):
     """
     Examine a request and tack on a request.user parameter if that's
@@ -233,9 +268,9 @@ def send_email(from_addr, to_addrs, subject, message_body):
      - message_body: email body text
     """
     # TODO: make a mock mhost if testing is enabled
-    if TESTS_ENABLED or mgoblin_globals.email_debug_mode:
+    if TESTS_ENABLED or mg_globals.app_config['email_debug_mode']:
         mhost = FakeMhost()
-    elif not mgoblin_globals.email_debug_mode:
+    elif not mg_globals.app_config['email_debug_mode']:
         mhost = smtplib.SMTP()
 
     mhost.connect()
@@ -248,7 +283,7 @@ def send_email(from_addr, to_addrs, subject, message_body):
     if TESTS_ENABLED:
         EMAIL_TEST_INBOX.append(message)
 
-    if getattr(mgoblin_globals, 'email_debug_mode', False):
+    if mg_globals.app_config['email_debug_mode']:
         print u"===== Email ====="
         print u"From address: %s" % message['From']
         print u"To addresses: %s" % message['To']
@@ -265,7 +300,7 @@ def send_email(from_addr, to_addrs, subject, message_body):
 
 
 TRANSLATIONS_PATH = pkg_resources.resource_filename(
-    'mediagoblin', 'translations')
+    'mediagoblin', 'i18n')
 
 
 def locale_to_lower_upper(locale):
@@ -320,26 +355,103 @@ def get_locale_from_request(request):
     return locale_to_lower_upper(target_lang)
 
 
-def read_config_file(conf_file):
+# A super strict version of the lxml.html cleaner class
+HTML_CLEANER = Cleaner(
+    scripts=True,
+    javascript=True,
+    comments=True,
+    style=True,
+    links=True,
+    page_structure=True,
+    processing_instructions=True,
+    embedded=True,
+    frames=True,
+    forms=True,
+    annoying_tags=True,
+    allow_tags=[
+        'div', 'b', 'i', 'em', 'strong', 'p', 'ul', 'ol', 'li', 'a', 'br'],
+    remove_unknown_tags=False, # can't be used with allow_tags
+    safe_attrs_only=True,
+    add_nofollow=True, # for now
+    host_whitelist=(),
+    whitelist_tags=set([]))
+
+
+def clean_html(html):
+    # clean_html barfs on an empty string
+    if not html:
+        return u''
+
+    return HTML_CLEANER.clean_html(html)
+
+
+def convert_to_tag_list_of_dicts(tag_string):
     """
-    Read a paste deploy style config file and process it.
+    Filter input from incoming string containing user tags,
+
+    Strips trailing, leading, and internal whitespace, and also converts
+    the "tags" text into an array of tags
     """
-    if not os.path.exists(conf_file):
-        raise IOError(
-            "MEDIAGOBLIN_CONFIG not set or file does not exist")
+    taglist = []
+    if tag_string:
 
-    parser = NicerConfigParser(conf_file)
-    parser.read(conf_file)
-    parser._defaults.setdefault(
-        'here', os.path.dirname(os.path.abspath(conf_file)))
-    parser._defaults.setdefault(
-        '__file__', os.path.abspath(conf_file))
+        # Strip out internal, trailing, and leading whitespace
+        stripped_tag_string = u' '.join(tag_string.strip().split())
 
-    mgoblin_conf = dict(
-        [(section_name, dict(parser.items(section_name)))
-         for section_name in parser.sections()])
+        # Split the tag string into a list of tags
+        for tag in stripped_tag_string.split(
+                                       mg_globals.app_config['tags_delimiter']):
 
-    return mgoblin_conf
+            # Ignore empty or duplicate tags
+            if tag.strip() and tag.strip() not in [t['name'] for t in taglist]:
+
+                taglist.append({'name': tag.strip(),
+                                'slug': slugify(tag.strip())})
+    return taglist
+
+
+def media_tags_as_string(media_entry_tags):
+    """
+    Generate a string from a media item's tags, stored as a list of dicts
+
+    This is the opposite of convert_to_tag_list_of_dicts
+    """
+    media_tag_string = ''
+    if media_entry_tags:
+        media_tag_string = mg_globals.app_config['tags_delimiter'].join(
+                                      [tag['name'] for tag in media_entry_tags])
+    return media_tag_string
+
+TOO_LONG_TAG_WARNING = \
+    u'Tags must be shorter than %s characters.  Tags that are too long: %s'
+
+def tag_length_validator(form, field):
+    """
+    Make sure tags do not exceed the maximum tag length.
+    """
+    tags = convert_to_tag_list_of_dicts(field.data)
+    too_long_tags = [
+        tag['name'] for tag in tags
+        if len(tag['name']) > mg_globals.app_config['tags_max_length']]
+
+    if too_long_tags:
+        raise wtforms.ValidationError(
+            TOO_LONG_TAG_WARNING % (mg_globals.app_config['tags_max_length'], \
+                                    ', '.join(too_long_tags)))
+
+
+MARKDOWN_INSTANCE = markdown.Markdown(safe_mode='escape')
+
+def cleaned_markdown_conversion(text):
+    """
+    Take a block of text, run it through MarkDown, and clean its HTML.
+    """
+    # Markdown will do nothing with and clean_html can do nothing with
+    # an empty string :)
+    if not text:
+        return u''
+
+    return clean_html(MARKDOWN_INSTANCE.convert(text))
 
 
 SETUP_GETTEXTS = {}
@@ -362,7 +474,7 @@ def setup_gettext(locale):
         if exists(locale):
             SETUP_GETTEXTS[locale] = this_gettext
 
-    mgoblin_globals.setup_globals(
+    mg_globals.setup_globals(
         translations=this_gettext)
 
 
@@ -376,7 +488,8 @@ class Pagination(object):
     get actual data slice through __call__().
     """
 
-    def __init__(self, page, cursor, per_page=PAGINATION_DEFAULT_PER_PAGE):
+    def __init__(self, page, cursor, per_page=PAGINATION_DEFAULT_PER_PAGE,
+                 jump_to_id=False):
         """
         Initializes Pagination
 
@@ -384,11 +497,25 @@ class Pagination(object):
          - page: requested page
          - per_page: number of objects per page
          - cursor: db cursor 
+         - jump_to_id: ObjectId, sets the page to the page containing the object
+           with _id == jump_to_id.
         """
-        self.page = page    
+        self.page = page
         self.per_page = per_page
         self.cursor = cursor
         self.total_count = self.cursor.count()
+        self.active_id = None
+
+        if jump_to_id:
+            cursor = copy.copy(self.cursor)
+
+            for (doc, increment) in izip(cursor, count(0)):
+                if doc['_id'] == jump_to_id:
+                    self.page = 1 + int(floor(increment / self.per_page))
+
+                    self.active_id = jump_to_id
+                    break
+
 
     def __call__(self):
         """