Merge remote-tracking branch 'refs/remotes/rodney757/reprocessing'
author Christopher Allan Webber <cwebber@dustycloud.org>
Wed, 21 Aug 2013 17:39:31 +0000 (12:39 -0500)
committer Christopher Allan Webber <cwebber@dustycloud.org>
Wed, 21 Aug 2013 17:39:38 +0000 (12:39 -0500)
Conflicts:
mediagoblin/processing/task.py
mediagoblin/submit/lib.py

mediagoblin/processing/__init__.py
mediagoblin/processing/task.py
mediagoblin/submit/lib.py

index 454eb09b2ba45a689801565487233edb858b8ef0,746f4d8ee3d780cd62b7001168074e59eb8bcf08..361a9736e65e025971538f9367d56160d90f6eaa
  # You should have received a copy of the GNU Affero General Public License
  # along with this program.  If not, see <http://www.gnu.org/licenses/>.
  
+ from collections import OrderedDict
  import logging
  import os
  
- from mediagoblin.db.util import atomic_update
  from mediagoblin import mg_globals as mgg
+ from mediagoblin.db.util import atomic_update
+ from mediagoblin.db.models import MediaEntry
+ from mediagoblin.tools.pluginapi import hook_handle
  from mediagoblin.tools.translate import lazy_pass_to_ugettext as _
  
  _log = logging.getLogger(__name__)
@@@ -74,49 -76,89 +76,89 @@@ class FilenameBuilder(object)
                               ext=self.ext)
  
  
- class ProcessingState(object):
-     """
-     The first and only argument to the "processor" of a media type
  
-     This could be thought of as a "request" to the processor
-     function. It has the main info for the request (media entry)
-     and a bunch of tools for the request on it.
-     It can get more fancy without impacting old media types.
+ class MediaProcessor(object):
+     """A particular processor for this media type.
+     While the ProcessingManager handles all types of MediaProcessing
+     possible for a particular media type, a MediaProcessor can be
+     thought of as a *particular* processing action for a media type.
+     For example, you may have separate MediaProcessors for:
+     - initial_processing: the intial processing of a media
+     - gen_thumb: generate a thumbnail
+     - resize: resize an image
+     - transcode: transcode a video
+     ... etc.
+     Some information on producing a new MediaProcessor for your media type:
+     - You *must* supply a name attribute.  This must be a class level
+       attribute, and a string.  This will be used to determine the
+       subcommand of your process
+     - It's recommended that you supply a class level description
+       attribute.
+     - Supply a media_is_eligible classmethod.  This will be used to
+       determine whether or not a media entry is eligible to use this
+       processor type.  See the method documentation for details.
+     - To give "./bin/gmg reprocess run" abilities to this media type,
+       supply both gnerate_parser and parser_to_request classmethods.
+     - The process method will be what actually processes your media.
      """
-     def __init__(self, entry):
+     # You MUST override this in the child MediaProcessor!
+     name = None
+     # Optional, but will be used in various places to describe the
+     # action this MediaProcessor provides
+     description = None
+ 
+     def __init__(self, manager, entry):
+         self.manager = manager
          self.entry = entry
+         self.entry_orig_state = entry.state
+         # Should be initialized at time of processing, at least
          self.workbench = None
-         self.queued_filename = None
  
-     def set_workbench(self, wb):
-         self.workbench = wb
+     def __enter__(self):
+         self.workbench = mgg.workbench_manager.create()
+         return self
+ 
+     def __exit__(self, *args):
+         self.workbench.destroy()
+         self.workbench = None
  
-     def get_queued_filename(self):
+     # @with_workbench
+     def process(self, **kwargs):
          """
-         Get the a filename for the original, on local storage
+         Actually process this media entry.
          """
-         if self.queued_filename is not None:
-             return self.queued_filename
-         queued_filepath = self.entry.queued_media_file
-         queued_filename = self.workbench.localized_file(
-             mgg.queue_store, queued_filepath,
-             'source')
-         self.queued_filename = queued_filename
-         return queued_filename
-     def copy_original(self, target_name, keyname=u"original"):
-         self.store_public(keyname, self.get_queued_filename(), target_name)
-     def store_public(self, keyname, local_file, target_name=None):
-         if target_name is None:
-             target_name = os.path.basename(local_file)
-         target_filepath = create_pub_filepath(self.entry, target_name)
-         if keyname in self.entry.media_files:
-             _log.warn("store_public: keyname %r already used for file %r, "
-                       "replacing with %r", keyname,
-                       self.entry.media_files[keyname], target_filepath)
-         mgg.public_store.copy_local_to_storage(local_file, target_filepath)
-         self.entry.media_files[keyname] = target_filepath
+         raise NotImplementedError
+ 
+     @classmethod
+     def media_is_eligible(cls, entry=None, state=None):
+         raise NotImplementedError
+ 
+     ###############################
+     # Command line interface things
+     ###############################
+ 
+     @classmethod
+     def generate_parser(cls):
+         raise NotImplementedError
+ 
+     @classmethod
+     def args_to_request(cls, args):
+         raise NotImplementedError
+ 
+     ##########################################
+     # THE FUTURE: web interface things here :)
+     ##########################################
+ 
+     #####################
+     # Some common "steps"
+     #####################
  
      def delete_queue_file(self):
          # Remove queued media file from storage and database.
          # queued_filepath is in the task_id directory which should
          # be removed too, but fail if the directory is not empty to be on
          # the super-safe side.
          queued_filepath = self.entry.queued_media_file
-         mgg.queue_store.delete_file(queued_filepath)      # rm file
-         mgg.queue_store.delete_dir(queued_filepath[:-1])  # rm dir
-         self.entry.queued_media_file = []
+         if queued_filepath:
+             mgg.queue_store.delete_file(queued_filepath)      # rm file
+             mgg.queue_store.delete_dir(queued_filepath[:-1])  # rm dir
+             self.entry.queued_media_file = []
+ 
+ 
+ class ProcessingKeyError(Exception): pass
+ class ProcessorDoesNotExist(ProcessingKeyError): pass
+ class ProcessorNotEligible(ProcessingKeyError): pass
+ class ProcessingManagerDoesNotExist(ProcessingKeyError): pass
+ 
+ 
+ class ProcessingManager(object):
+     """Manages all the processing actions available for a media type
+ 
+     Specific processing actions, MediaProcessor subclasses, are added
+     to the ProcessingManager.
+     """
+     def __init__(self):
+         # Dict of all MediaProcessors of this media type
+         self.processors = OrderedDict()
+     def add_processor(self, processor):
+         """
+         Add a processor class to this media type
+         """
+         name = processor.name
+         if name is None:
+             raise AttributeError("Processor class's .name attribute not set")
+ 
+         self.processors[name] = processor
+ 
+     def list_eligible_processors(self, entry):
+         """
+         List all processors this media entry is eligible to be processed with.
+         """
+         return [
+             processor
+             for processor in self.processors.values()
+             if processor.media_is_eligible(entry=entry)]
+ 
+     def list_all_processors_by_state(self, state):
+         """
+         List all processors eligible for media in the given state.
+         """
+         return [
+             processor
+             for processor in self.processors.values()
+             if processor.media_is_eligible(state=state)]
+ 
+     def list_all_processors(self):
+         return self.processors.values()
+ 
+     def gen_process_request_via_cli(self, subparser):
+         # Got to figure out what actually goes here before I can write this properly
+         pass
+     def get_processor(self, key, entry=None):
+         """
+         Get the processor with this key.
+ 
+         If entry supplied, make sure this entry is actually compatible;
+         otherwise raise error.
+         """
+         try:
+             processor = self.processors[key]
+         except KeyError:
+             raise ProcessorDoesNotExist(
+                 "'%s' processor does not exist for this media type" % key)
+ 
+         if entry and not processor.media_is_eligible(entry):
+             raise ProcessorNotEligible(
+                 "This entry is not eligible for processor with name '%s'" % key)
+ 
+         return processor
+ 
+ 
+ def request_from_args(args, which_args):
+     """
+     Generate a request dict from the values of some argparse parsed args
+     """
+     request = {}
+     for arg in which_args:
+         request[arg] = getattr(args, arg)
+ 
+     return request
+ 
+ 
+ class MediaEntryNotFound(Exception): pass
+ 
+ 
+ def get_processing_manager_for_type(media_type):
+     """
+     Get the appropriate processing manager for this media type
+     """
+     manager_class = hook_handle(('reprocess_manager', media_type))
+     if not manager_class:
+         raise ProcessingManagerDoesNotExist(
+             "A processing manager does not exist for {0}".format(media_type))
+     manager = manager_class()
+ 
+     return manager
+ 
+ 
+ def get_entry_and_processing_manager(media_id):
+     """
+     Get a MediaEntry and its processing manager all in one go.
+ 
+     Returns a tuple of: `(entry, manager)`
+     """
+     entry = MediaEntry.query.filter_by(id=media_id).first()
+     if entry is None:
+         raise MediaEntryNotFound("Can't find media with id '%s'" % media_id)
+ 
+     manager = get_processing_manager_for_type(entry.media_type)
+ 
+     return entry, manager
  
  
  def mark_entry_failed(entry_id, exc):
               u'fail_metadata': {}})
  
  
+ def get_process_filename(entry, workbench, acceptable_files):
+     """
+     Try to get the queued file if available, otherwise return the first
+     file in acceptable_files that the entry has.
+ 
+     If no acceptable file is found, raise ProcessFileNotFound.
+     """
+     # Initialize filepath so a miss below raises ProcessFileNotFound
+     # rather than UnboundLocalError.
+     filepath = None
+ 
+     if entry.queued_media_file:
+         filepath = entry.queued_media_file
+         storage = mgg.queue_store
+     else:
+         for keyname in acceptable_files:
+             if entry.media_files.get(keyname):
+                 filepath = entry.media_files[keyname]
+                 storage = mgg.public_store
+                 break
+ 
+     if not filepath:
+         raise ProcessFileNotFound()
+ 
+     filename = workbench.localized_file(
+         storage, filepath,
+         'source')
+ 
+     if not os.path.exists(filename):
+         raise ProcessFileNotFound()
+ 
+     return filename
+ 
+ 
+ def store_public(entry, keyname, local_file, target_name=None,
+                  delete_if_exists=True):
+     if target_name is None:
+         target_name = os.path.basename(local_file)
+     target_filepath = create_pub_filepath(entry, target_name)
+ 
+     if keyname in entry.media_files:
+         _log.warn("store_public: keyname %r already used for file %r, "
+                   "replacing with %r", keyname,
+                   entry.media_files[keyname], target_filepath)
+         if delete_if_exists:
+             mgg.public_store.delete_file(entry.media_files[keyname])
+ 
+     try:
+         mgg.public_store.copy_local_to_storage(local_file, target_filepath)
+     except Exception:
+         raise PublicStoreFail(keyname=keyname)
+ 
+     # raise an error if the file failed to copy
+     copied_filepath = mgg.public_store.get_local_path(target_filepath)
+     if not os.path.exists(copied_filepath):
+         raise PublicStoreFail(keyname=keyname)
+ 
+     entry.media_files[keyname] = target_filepath
+ 
+ 
+ def copy_original(entry, orig_filename, target_name, keyname=u"original"):
+     store_public(entry, keyname, orig_filename, target_name)
  class BaseProcessingFail(Exception):
      """
      Base exception that all other processing failure messages should
      subclass from.
      """
  
      def __init__(self, **metadata):
          self.metadata = metadata or {}
  
 -
  class BadMediaFail(BaseProcessingFail):
      """
      Error that should be raised when an inappropriate file was given
      for the media type specified.
      """
      general_message = _(u'Invalid file given for media type.')
+ 
+ 
+ class PublicStoreFail(BaseProcessingFail):
+     """
+     Error that should be raised when copying to public store fails
+     """
+     general_message = _(u'Copying to public storage failed.')
+ 
+ 
+ class ProcessFileNotFound(BaseProcessingFail):
+     """
+     Error that should be raised when an acceptable file for processing
+     is not found.
+     """
+     general_message = _(u'An acceptable processing file was not found')
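
The MediaProcessor / ProcessingManager split above is easiest to see with a
toy processor. The following is a minimal sketch, not part of this merge:
the ResizeImage class, its 'resize' name, and the --size option are
invented for illustration, while MediaProcessor, ProcessingManager, and
request_from_args are the names merged in mediagoblin/processing/__init__.py
above.

    import argparse

    from mediagoblin.processing import (MediaProcessor, ProcessingManager,
                                        request_from_args)


    class ResizeImage(MediaProcessor):
        """Hypothetical processor that resizes an already-processed image."""
        name = 'resize'                  # required: becomes the CLI subcommand
        description = 'Resize an image'  # optional, used in command listings

        @classmethod
        def media_is_eligible(cls, entry=None, state=None):
            # Only entries that finished initial processing can be resized.
            if not state:
                state = entry.state
            return state == 'processed'

        @classmethod
        def generate_parser(cls):
            parser = argparse.ArgumentParser(
                description=cls.description, prog=cls.name)
            parser.add_argument('--size', nargs=2, type=int,
                                metavar=('width', 'height'))
            return parser

        @classmethod
        def args_to_request(cls, args):
            # Pull only the arguments this processor cares about.
            return request_from_args(args, ['size'])

        def process(self, size=None):
            # Real resizing work would go here; self.workbench was created
            # by MediaProcessor.__enter__ before this runs.
            pass


    manager = ProcessingManager()
    manager.add_processor(ResizeImage)
    # manager.get_processor('resize', entry) now returns ResizeImage for
    # eligible entries, or raises ProcessorNotEligible / ProcessorDoesNotExist.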
index 05cac844aa0ca0e7ccda4813f49bfb6039d6d3e6,df44dd7ac0bdfffa07d015996b585cc41edfda99..7f68348566b5897b68d23c0c4a8b634c08b07edd
@@@ -18,21 -18,19 +18,20 @@@ import logging
  import urllib
  import urllib2
  
 -from celery import registry, task
 +import celery
 +from celery.registry import tasks
  
  from mediagoblin import mg_globals as mgg
- from mediagoblin.db.models import MediaEntry
- from mediagoblin.processing import (mark_entry_failed, BaseProcessingFail,
-                                     ProcessingState)
+ from . import mark_entry_failed, BaseProcessingFail
  from mediagoblin.tools.processing import json_processing_callback
+ from mediagoblin.processing import get_entry_and_processing_manager
  
  _log = logging.getLogger(__name__)
  logging.basicConfig()
  _log.setLevel(logging.DEBUG)
  
  
 -@task.task(default_retry_delay=2 * 60)
 +@celery.task(default_retry_delay=2 * 60)
  def handle_push_urls(feed_url):
      """Subtask, notifying the PuSH servers of new content
  
                            'Giving up.'.format(feed_url))
                  return False
  
 +
  ################################
  # Media processing initial steps
  ################################
 -
 -class ProcessMedia(task.Task):
 +class ProcessMedia(celery.Task):
      """
      Pass this entry off for processing.
      """
-     track_started=True
-     def run(self, media_id, feed_url):
+     def run(self, media_id, feed_url, reprocess_action, reprocess_info=None):
          """
          Pass the media entry off to the appropriate processing function
          (via the media type's ProcessingManager)
  
          :param feed_url: The feed URL that the PuSH server needs to be
              updated for.
+         :param reprocess_action: The particular processing action to run.
+         :param reprocess_info: A dict containing all of the necessary
+             reprocessing info for the media_type.
          """
-         entry = MediaEntry.query.get(media_id)
+         reprocess_info = reprocess_info or {}
+         entry, manager = get_entry_and_processing_manager(media_id)
  
          # Try to process, and handle expected errors.
          try:
-             entry.state = u'processing'
-             entry.save()
-             _log.debug('Processing {0}'.format(entry))
-             proc_state = ProcessingState(entry)
-             with mgg.workbench_manager.create() as workbench:
-                 proc_state.set_workbench(workbench)
-                 # run the processing code
-                 entry.media_manager.processor(proc_state)
+             processor_class = manager.get_processor(reprocess_action, entry)
+             with processor_class(manager, entry) as processor:
+                 # Initial state change has to be here because
+                 # the entry.state gets recorded on processor_class init
+                 entry.state = u'processing'
+                 entry.save()
+                 _log.debug('Processing {0}'.format(entry))
+                 try:
+                     processor.process(**reprocess_info)
+                 except Exception as exc:
+                     if processor.entry_orig_state == 'processed':
+                         _log.error(
+                             'Entry {0} failed to process due to the following'
+                             ' error: {1}'.format(entry.id, exc))
+                         _log.info(
+                             'Setting entry.state back to "processed"')
+                     else:
+                         raise
  
              # We set the state to processed and save the entry here so there's
              # no need to save at the end of the processing stage, probably ;)
          entry = mgg.database.MediaEntry.query.filter_by(id=entry_id).first()
          json_processing_callback(entry)
  
 -# Register the task
 -process_media = registry.tasks[ProcessMedia.name]
 -
 +tasks.register(ProcessMedia)
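
For reference, the control flow ProcessMedia.run now follows can be
sketched outside of celery like this; the media_id value and the 'initial'
action name are placeholders for illustration, while
get_entry_and_processing_manager is the helper merged above.

    from mediagoblin.processing import get_entry_and_processing_manager

    media_id = 42  # hypothetical MediaEntry id
    entry, manager = get_entry_and_processing_manager(media_id)

    # Raises ProcessorDoesNotExist or ProcessorNotEligible on bad input.
    processor_class = manager.get_processor('initial', entry)

    with processor_class(manager, entry) as processor:
        # __enter__ created processor.workbench; __exit__ destroys it even
        # if process() raises.
        processor.process()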
index 33687a72e1405f7ea09eae0262d3d8975130788d,1a45e44796767b8cc4b281b5376fcf3cdbff3a9e..1bbf2cb8ef013fe846e1cb841c7f5c58ed443df7
@@@ -21,7 -21,7 +21,7 @@@ from werkzeug.datastructures import Fil
  
  from mediagoblin.db.models import MediaEntry
  from mediagoblin.processing import mark_entry_failed
 -from mediagoblin.processing.task import process_media
 +from mediagoblin.processing.task import ProcessMedia
  
  
  _log = logging.getLogger(__name__)
@@@ -76,17 -76,21 +76,21 @@@ def prepare_queue_task(app, entry, filename
      return queue_file
  
  
- def run_process_media(entry, feed_url=None):
+ def run_process_media(entry, feed_url=None,
+                       reprocess_action="initial", reprocess_info=None):
      """Process the media asynchronously
  
      :param entry: MediaEntry() instance to be processed.
      :param feed_url: A string indicating the feed_url that the PuSH servers
          should be notified of. This will be something like: `request.urlgen(
              'mediagoblin.user_pages.atom_feed',qualified=True,
-             user=request.user.username)`"""
+             user=request.user.username)`
+     :param reprocess_action: What particular action should be run.
+     :param reprocess_info: A dict containing all of the necessary reprocessing
+         info for the given media_type"""
      try:
 -        process_media.apply_async(
 +        ProcessMedia().apply_async(
-             [entry.id, feed_url], {},
+             [entry.id, feed_url, reprocess_action, reprocess_info], {},
              task_id=entry.queued_task_id)
      except BaseException as exc:
          # The purpose of this section is because when running in "lazy"
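
Existing call sites of run_process_media keep working because of the new
defaults; reprocessing passes the extra arguments explicitly. A usage
sketch, assuming entry and feed_url come from the surrounding request
context and reusing the hypothetical 'resize' action from the earlier
example:

    from mediagoblin.submit.lib import run_process_media

    # Initial processing: identical to the old call sites.
    run_process_media(entry, feed_url=feed_url)

    # Reprocessing with a particular action and its parsed arguments.
    run_process_media(entry, reprocess_action='resize',
                      reprocess_info={'size': [640, 480]})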