Merge remote branch 'origin/master' into bug261-resized-filenames
authorBrett Smith <brettcsmith@brettcsmith.org>
Mon, 26 Mar 2012 18:10:22 +0000 (14:10 -0400)
committerBrett Smith <brettcsmith@brettcsmith.org>
Mon, 26 Mar 2012 18:10:22 +0000 (14:10 -0400)
This merge involved moving the new FilenameBuilder class to
processing/__init__.py, and putting the comment deletion tests back into
test_submission.py using the refactored functions.

1  2 
mediagoblin/media_types/image/processing.py
mediagoblin/processing/__init__.py
mediagoblin/tests/test_submission.py

index 718351d59159f82ebe35ec13e5ef1505c9b3b227,ecdfa8a9473e57afc38ee5cb687df42a1735a6a5..9dee3baa465b23d141c676564539e7c319bc0ad2
  # along with this program.  If not, see <http://www.gnu.org/licenses/>.
  
  import logging
 +import os
  
- from celery.task import Task
- from mediagoblin.db.util import ObjectId, atomic_update
+ from mediagoblin.db.util import atomic_update
  from mediagoblin import mg_globals as mgg
  
  from mediagoblin.tools.translate import lazy_pass_to_ugettext as _
@@@ -38,91 -33,7 +34,38 @@@ def create_pub_filepath(entry, filename
               unicode(entry._id),
               filename])
  
- ################################
- # Media processing initial steps
- ################################
 +class FilenameBuilder(object):
 +    """Easily slice and dice filenames.
 +
 +    Initialize this class with an original file path, then use the fill()
 +    method to create new filenames based on the original.
 +
 +    """
 +    MAX_FILENAME_LENGTH = 255  # VFAT's maximum filename length
 +
 +    def __init__(self, path):
 +        """Initialize a builder from an original file path."""
 +        self.dirpath, self.basename = os.path.split(path)
 +        self.basename, self.ext = os.path.splitext(self.basename)
 +        self.ext = self.ext.lower()
 +
 +    def fill(self, fmtstr):
 +        """Build a new filename based on the original.
 +
 +        The fmtstr argument can include the following:
 +        {basename} -- the original basename, with the extension removed
 +        {ext} -- the original extension, always lowercase
 +
 +        If necessary, {basename} will be truncated so the filename does not
 +        exceed this class' MAX_FILENAME_LENGTH in length.
 +
 +        """
 +        basename_len = (self.MAX_FILENAME_LENGTH -
 +                        len(fmtstr.format(basename='', ext=self.ext)))
 +        return fmtstr.format(basename=self.basename[:basename_len],
 +                             ext=self.ext)
 +
  
- class ProcessMedia(Task):
-     """
-     DEPRECATED -- This now resides in the individual media plugins
-     Pass this entry off for processing.
-     """
-     def run(self, media_id):
-         """
-         Pass the media entry off to the appropriate processing function
-         (for now just process_image...)
-         """
-         entry = mgg.database.MediaEntry.one(
-             {'_id': ObjectId(media_id)})
-         # Try to process, and handle expected errors.
-         try:
-             #__import__(entry.media_type)
-             manager = get_media_manager(entry.media_type)
-             _log.debug('Processing {0}'.format(entry))
-             manager['processor'](entry)
-         except BaseProcessingFail, exc:
-             mark_entry_failed(entry._id, exc)
-             return
-         except ImportError, exc:
-             _log.error(
-                 'Entry {0} failed to process due to an import error: {1}'\
-                     .format(
-                     entry.title,
-                     exc))
-             mark_entry_failed(entry._id, exc)
-         entry.state = u'processed'
-         entry.save()
-     def on_failure(self, exc, task_id, args, kwargs, einfo):
-         """
-         If the processing failed we should mark that in the database.
-         Assuming that the exception raised is a subclass of
-         BaseProcessingFail, we can use that to get more information
-         about the failure and store that for conveying information to
-         users about the failure, etc.
-         """
-         entry_id = args[0]
-         mark_entry_failed(entry_id, exc)
  def mark_entry_failed(entry_id, exc):
      """
      Mark a media entry as having failed in its conversion.
index 8e30d9a2264a83a1672802615d06a96dc30322ed,1f56779e4cae0c202d24a65756691d0202a9ab54..ba80ba2086d65afb06eeefc278da61528676b6fb
@@@ -65,21 -61,6 +65,25 @@@ class TestSubmission
      def logout(self):
          self.test_app.get('/auth/logout/')
  
 +    def do_post(self, data, *context_keys, **kwargs):
 +        url = kwargs.pop('url', '/submit/')
 +        do_follow = kwargs.pop('do_follow', False)
 +        template.clear_test_template_context()
 +        response = self.test_app.post(url, data, **kwargs)
 +        if do_follow:
 +            response.follow()
 +        context_data = template.TEMPLATE_TEST_CONTEXT
 +        for key in context_keys:
 +            context_data = context_data[key]
 +        return response, context_data
 +        
 +    def upload_data(self, filename):
 +        return {'upload_files': [('file', filename)]}
 +
++    def check_comments(self, request, media, count):
++        comments = request.db.MediaComment.find({'media_entry': media._id})
++        assert_equal(count, len(list(comments)))
++
      def test_missing_fields(self):
          # Test blank form
          # ---------------
  
          # Test tags that are too long
          # ---------------
 -        template.clear_test_template_context()
 -        response = self.test_app.post(
 -            '/submit/', {
 -                'title': 'Balanced Goblin',
 -                'tags': BAD_TAG_STRING
 -                }, upload_files=[(
 -                    'file', GOOD_JPG)])
 -
 -        # Too long error should be raised
 -        context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/submit/start.html']
 -        form = context['submit_form']
 -        assert form.tags.errors == [
 -            u'Tags must be shorter than 50 characters.  Tags that are too long'\
 -             ': ffffffffffffffffffffffffffuuuuuuuuuuuuuuuuuuuuuuuuuu']
 +        response, form = self.do_post({'title': 'Balanced Goblin',
 +                                       'tags': BAD_TAG_STRING},
 +                                      *FORM_CONTEXT,
 +                                      **self.upload_data(GOOD_JPG))
 +        assert_equal(form.tags.errors, [
 +                u'Tags must be shorter than 50 characters.  ' \
 +                    'Tags that are too long: ' \
 +                    'ffffffffffffffffffffffffffuuuuuuuuuuuuuuuuuuuuuuuuuu'])
  
      def test_delete(self):
 -        template.clear_test_template_context()
 -        response = self.test_app.post(
 -            '/submit/', {
 -                'title': 'Balanced Goblin',
 -                }, upload_files=[(
 -                    'file', GOOD_JPG)])
 -
 -        # Post image
 -        response.follow()
 -
 -        request = template.TEMPLATE_TEST_CONTEXT[
 -            'mediagoblin/user_pages/user.html']['request']
 -
 -        media = request.db.MediaEntry.find({'title': 'Balanced Goblin'})[0]
 -
 -        # Does media entry exist?
 -        assert_true(media)
 +        response, request = self.do_post({'title': 'Balanced Goblin'},
 +                                         *REQUEST_CONTEXT, do_follow=True,
 +                                         **self.upload_data(GOOD_JPG))
 +        media = self.check_media(request, {'title': 'Balanced Goblin'}, 1)
  
 -        get_comments = lambda: list(
 -            request.db.MediaComment.find({'media_entry': media._id}))
 -        assert_false(get_comments())
 -        response = self.test_app.post(
 -            request.urlgen('mediagoblin.user_pages.media_post_comment',
 -                           user=self.test_user.username,
 -                           media=media._id),
 -            {'comment_content': 'i love this test'})
 -        response.follow()
 -        assert_true(get_comments())
+         # Add a comment, so we can test for its deletion later.
++        self.check_comments(request, media, 0)
++        comment_url = request.urlgen(
++            'mediagoblin.user_pages.media_post_comment',
++            user=self.test_user.username, media=media._id)
++        response = self.do_post({'comment_content': 'i love this test'},
++                                url=comment_url, do_follow=True)[0]
++        self.check_comments(request, media, 1)
          # Do not confirm deletion
          # ---------------------------------------------------
 -        response = self.test_app.post(
 -            request.urlgen('mediagoblin.user_pages.media_confirm_delete',
 -                           # No work: user=media.uploader().username,
 -                           user=self.test_user.username,
 -                           media=media._id),
 -            # no value means no confirm
 -            {})
 -
 -        response.follow()
 -
 -        request = template.TEMPLATE_TEST_CONTEXT[
 -            'mediagoblin/user_pages/user.html']['request']
 -
 -        media = request.db.MediaEntry.find({'title': 'Balanced Goblin'})[0]
 -
 -        # Does media entry still exist?
 -        assert_true(media)
 +        delete_url = request.urlgen(
 +            'mediagoblin.user_pages.media_confirm_delete',
 +            user=self.test_user.username, media=media._id)
 +        # Empty data means don't confirm
 +        response = self.do_post({}, do_follow=True, url=delete_url)[0]
 +        media = self.check_media(request, {'title': 'Balanced Goblin'}, 1)
  
          # Confirm deletion
          # ---------------------------------------------------
 -        response = self.test_app.post(
 -            request.urlgen('mediagoblin.user_pages.media_confirm_delete',
 -                           # No work: user=media.uploader().username,
 -                           user=self.test_user.username,
 -                           media=media._id),
 -            {'confirm': 'y'})
 -
 -        response.follow()
 -
 -        request = template.TEMPLATE_TEST_CONTEXT[
 -            'mediagoblin/user_pages/user.html']['request']
 +        response, request = self.do_post({'confirm': 'y'}, *REQUEST_CONTEXT,
 +                                         do_follow=True, url=delete_url)
 +        self.check_media(request, {'_id': media._id}, 0)
++        self.check_comments(request, media, 0)
  
 -        # Does media entry still exist?
 -        assert_false(
 -            request.db.MediaEntry.find(
 -                {'_id': media._id}).count())
 -
 -        # How about the comment?
 -        assert_false(get_comments())
 -
 -    def test_malicious_uploads(self):
 +    def test_evil_file(self):
          # Test non-suppoerted file with non-supported extension
          # -----------------------------------------------------
 -        template.clear_test_template_context()
 -        response = self.test_app.post(
 -            '/submit/', {
 -                'title': 'Malicious Upload 1'
 -                }, upload_files=[(
 -                    'file', EVIL_FILE)])
 -
 -        context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/submit/start.html']
 -        form = context['submit_form']
 -        assert re.match(r'^Could not extract any file extension from ".*?"$', str(form.file.errors[0]))
 -        assert len(form.file.errors) == 1
 -
 -        # NOTE: The following 2 tests will ultimately fail, but they
 +        response, form = self.do_post({'title': 'Malicious Upload 1'},
 +                                      *FORM_CONTEXT,
 +                                      **self.upload_data(EVIL_FILE))
 +        assert_equal(len(form.file.errors), 1)
 +        assert_true(re.match(
 +                r'^Could not extract any file extension from ".*?"$',
 +                str(form.file.errors[0])))
 +
 +    def check_false_image(self, title, filename):
 +        # NOTE: These images should ultimately fail, but they
          #   *will* pass the initial form submission step.  Instead,
          #   they'll be caught as failures during the processing step.
 +        response, context = self.do_post({'title': title}, do_follow=True,
 +                                         **self.upload_data(filename))
 +        self.check_url(response, '/u/{0}/'.format(self.test_user.username))
 +        entry = mg_globals.database.MediaEntry.find_one({'title': title})
 +        assert_equal(entry.state, 'failed')
 +        assert_equal(entry.fail_error, u'mediagoblin.processing:BadMediaFail')
  
 +    def test_evil_jpg(self):
          # Test non-supported file with .jpg extension
          # -------------------------------------------
 -        template.clear_test_template_context()
 -        response = self.test_app.post(
 -           '/submit/', {
 -               'title': 'Malicious Upload 2'
 -               }, upload_files=[(
 -                   'file', EVIL_JPG)])
 -        response.follow()
 -        assert_equal(
 -            urlparse.urlsplit(response.location)[2],
 -            '/u/chris/')
 -
 -        entry = mg_globals.database.MediaEntry.find_one(
 -            {'title': 'Malicious Upload 2'})
 -        assert_equal(entry.state, 'failed')
 -        assert_equal(
 -            entry.fail_error,
 -            u'mediagoblin.processing:BadMediaFail')
 +        self.check_false_image('Malicious Upload 2', EVIL_JPG)
  
 +    def test_evil_png(self):
          # Test non-supported file with .png extension
          # -------------------------------------------
 -        template.clear_test_template_context()
 -        response = self.test_app.post(
 -           '/submit/', {
 -               'title': 'Malicious Upload 3'
 -               }, upload_files=[(
 -                   'file', EVIL_PNG)])
 -        response.follow()
 -        assert_equal(
 -            urlparse.urlsplit(response.location)[2],
 -            '/u/chris/')
 -
 -        entry = mg_globals.database.MediaEntry.find_one(
 -            {'title': 'Malicious Upload 3'})
 -        assert_equal(entry.state, 'failed')
 -        assert_equal(
 -            entry.fail_error,
 -            u'mediagoblin.processing:BadMediaFail')
 +        self.check_false_image('Malicious Upload 3', EVIL_PNG)
 +
 +    def test_processing(self):
 +        data = {'title': 'Big Blue'}
 +        response, request = self.do_post(data, *REQUEST_CONTEXT, do_follow=True,
 +                                         **self.upload_data(BIG_BLUE))
 +        media = self.check_media(request, data, 1)
 +        last_size = 1024 ** 3  # Needs to be larger than bigblue.png
 +        for key, basename in (('original', 'bigblue.png'),
 +                              ('medium', 'bigblue.medium.png'),
 +                              ('thumb', 'bigblue.thumbnail.png')):
 +            # Does the processed image have a good filename?
 +            filename = resource_filename(
 +                'mediagoblin.tests',
 +                os.path.join('test_user_dev/media/public',
 +                             *media['media_files'].get(key, [])))
 +            assert_true(filename.endswith('_' + basename))
 +            # Is it smaller than the last processed image we looked at?
 +            size = os.stat(filename).st_size
 +            assert_true(last_size > size)
 +            last_size = size