Porting video to GStreamer 1.0
[mediagoblin.git] / mediagoblin / processing / __init__.py
index 41028fbb4decf3302553f2d8f86b6391df4b39fd..b7e36027ef7ccf412cead1ffcf5bc877e3430eae 100644 (file)
 # You should have received a copy of the GNU Affero General Public License
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
+# Use an ordered dict if we can.  If not, we'll just use a normal dict
+# later.
+try:
+    from collections import OrderedDict
+except:
+    OrderedDict = None
+
 import logging
 import os
 
-from mediagoblin.db.util import atomic_update
-from mediagoblin import mg_globals as mgg
+import six
 
+from mediagoblin import mg_globals as mgg
+from mediagoblin.db.util import atomic_update
+from mediagoblin.db.models import MediaEntry
+from mediagoblin.tools.pluginapi import hook_handle
 from mediagoblin.tools.translate import lazy_pass_to_ugettext as _
 
 _log = logging.getLogger(__name__)
@@ -38,7 +48,7 @@ class ProgressCallback(object):
 def create_pub_filepath(entry, filename):
     return mgg.public_store.get_unique_filepath(
             ['media_entries',
-             unicode(entry.id),
+             six.text_type(entry.id),
              filename])
 
 
@@ -88,7 +98,7 @@ class MediaProcessor(object):
     - resize: resize an image
     - transcode: transcode a video
 
-    ... etc.  
+    ... etc.
 
     Some information on producing a new MediaProcessor for your media type:
 
@@ -111,9 +121,23 @@ class MediaProcessor(object):
     # action this MediaProcessor provides
     description = None
 
-    def __init__(self, manager):
+    def __init__(self, manager, entry):
         self.manager = manager
+        self.entry = entry
+        self.entry_orig_state = entry.state
+
+        # Should be initialized at time of processing, at least
+        self.workbench = None
+
+    def __enter__(self):
+        self.workbench = mgg.workbench_manager.create()
+        return self
+
+    def __exit__(self, *args):
+        self.workbench.destroy()
+        self.workbench = None
 
+    # @with_workbench
     def process(self, **kwargs):
         """
         Actually process this media entry.
@@ -121,7 +145,7 @@ class MediaProcessor(object):
         raise NotImplementedError
 
     @classmethod
-    def media_is_eligibile(self, media_entry):
+    def media_is_eligible(cls, entry=None, state=None):
         raise NotImplementedError
 
     ###############################
@@ -129,28 +153,52 @@ class MediaProcessor(object):
     ###############################
 
     @classmethod
-    def generate_parser(self):
+    def generate_parser(cls):
         raise NotImplementedError
 
     @classmethod
-    def parser_to_request(self, parser):
+    def args_to_request(cls, args):
         raise NotImplementedError
 
     ##########################################
     # THE FUTURE: web interface things here :)
     ##########################################
 
+    #####################
+    # Some common "steps"
+    #####################
+
+    def delete_queue_file(self):
+        # Remove queued media file from storage and database.
+        # queued_filepath is in the task_id directory which should
+        # be removed too, but fail if the directory is not empty to be on
+        # the super-safe side.
+        queued_filepath = self.entry.queued_media_file
+        if queued_filepath:
+            mgg.queue_store.delete_file(queued_filepath)      # rm file
+            mgg.queue_store.delete_dir(queued_filepath[:-1])  # rm dir
+            self.entry.queued_media_file = []
+
+
+class ProcessingKeyError(Exception): pass
+class ProcessorDoesNotExist(ProcessingKeyError): pass
+class ProcessorNotEligible(ProcessingKeyError): pass
+class ProcessingManagerDoesNotExist(ProcessingKeyError): pass
+
+
 
 class ProcessingManager(object):
-    """
-    """
-    def __init__(self, entry):
-        self.entry = entry
-        # May merge these two classes soon....
-        self.state = ProcessingState(entry)
+    """Manages all the processing actions available for a media type
 
+    Specific processing actions, MediaProcessor subclasses, are added
+    to the ProcessingManager.
+    """
+    def __init__(self):
         # Dict of all MediaProcessors of this media type
-        self.processors = {}
+        if OrderedDict is not None:
+            self.processors = OrderedDict()
+        else:
+            self.processors = {}
 
     def add_processor(self, processor):
         """
@@ -159,89 +207,97 @@ class ProcessingManager(object):
         name = processor.name
         if name is None:
             raise AttributeError("Processor class's .name attribute not set")
-        
+
         self.processors[name] = processor
 
-    def list_eligible_processors(self):
+    def list_eligible_processors(self, entry):
         """
         List all processors that this media entry is eligible to be processed
         for.
         """
         return [
             processor
-            for processor in self.processors.keys()
-            if processor.media_is_eligible(self.entry)]
+            for processor in self.processors.values()
+            if processor.media_is_eligible(entry=entry)]
 
-    def process(self, directive, request):
+    def list_all_processors_by_state(self, state):
+        """
+        List all processors that this media state is eligible to be processed
+        for.
+        """
+        return [
+            processor
+            for processor in self.processors.values()
+            if processor.media_is_eligible(state=state)]
+
+
+    def list_all_processors(self):
+        return self.processors.values()
+
+    def gen_process_request_via_cli(self, subparser):
+        # Got to figure out what actually goes here before I can write this properly
         pass
 
+    def get_processor(self, key, entry=None):
+        """
+        Get the processor with this key.
+
+        If entry supplied, make sure this entry is actually compatible;
+        otherwise raise error.
+        """
+        try:
+            processor = self.processors[key]
+        except KeyError:
+            raise ProcessorDoesNotExist(
+                "'%s' processor does not exist for this media type" % key)
+
+        if entry and not processor.media_is_eligible(entry):
+            raise ProcessorNotEligible(
+                "This entry is not eligible for processor with name '%s'" % key)
+
+        return processor
 
-class ProcessingState(object):
+
+def request_from_args(args, which_args):
+    """
+    Generate a request from the values of some argparse parsed args
     """
-    The first and only argument to the "processor" of a media type
+    request = {}
+    for arg in which_args:
+        request[arg] = getattr(args, arg)
+
+    return request
+
+
+class MediaEntryNotFound(Exception): pass
+
 
-    This could be thought of as a "request" to the processor
-    function. It has the main info for the request (media entry)
-    and a bunch of tools for the request on it.
-    It can get more fancy without impacting old media types.
+def get_processing_manager_for_type(media_type):
     """
-    def __init__(self, entry):
-        self.entry = entry
-        self.workbench = None
-        self.orig_filename = None
+    Get the appropriate media manager for this type
+    """
+    manager_class = hook_handle(('reprocess_manager', media_type))
+    if not manager_class:
+        raise ProcessingManagerDoesNotExist(
+            "A processing manager does not exist for {0}".format(media_type))
+    manager = manager_class()
 
-    def set_workbench(self, wb):
-        self.workbench = wb
+    return manager
 
-    def get_orig_filename(self):
-        """
-        Get the a filename for the original, on local storage
 
-        If the media entry has a queued_media_file, use that, otherwise
-        use the original.
+def get_entry_and_processing_manager(media_id):
+    """
+    Get a MediaEntry, its media type, and its manager all in one go.
 
-        In the future, this will return the highest quality file available
-        if neither the original or queued file are available
-        """
-        if self.orig_filename is not None:
-            return self.orig_filename
+    Returns a tuple of: `(entry, media_type, media_manager)`
+    """
+    entry = MediaEntry.query.filter_by(id=media_id).first()
+    if entry is None:
+        raise MediaEntryNotFound("Can't find media with id '%s'" % media_id)
 
-        if self.entry.queued_media_file:
-            orig_filepath = self.entry.queued_media_file
-            storage = mgg.queue_store
-        else:
-            orig_filepath = self.entry.media_files['original']
-            storage = mgg.public_store
-
-        orig_filename = self.workbench.localized_file(
-            storage, orig_filepath,
-            'source')
-        self.orig_filename = orig_filename
-        return orig_filename
-
-    def copy_original(self, target_name, keyname=u"original"):
-        self.store_public(keyname, self.get_orig_filename(), target_name)
-
-    def store_public(self, keyname, local_file, target_name=None):
-        if target_name is None:
-            target_name = os.path.basename(local_file)
-        target_filepath = create_pub_filepath(self.entry, target_name)
-        if keyname in self.entry.media_files:
-            _log.warn("store_public: keyname %r already used for file %r, "
-                      "replacing with %r", keyname,
-                      self.entry.media_files[keyname], target_filepath)
-        mgg.public_store.copy_local_to_storage(local_file, target_filepath)
-        self.entry.media_files[keyname] = target_filepath
+    manager = get_processing_manager_for_type(entry.media_type)
 
-    def delete_queue_file(self):
-        # Remove queued media file from storage and database.
-        # queued_filepath is in the task_id directory which should
-        # be removed too, but fail if the directory is not empty to be on
-        # the super-safe side.
-        queued_filepath = self.entry.queued_media_file
-        mgg.queue_store.delete_file(queued_filepath)      # rm file
-        mgg.queue_store.delete_dir(queued_filepath[:-1])  # rm dir
-        self.entry.queued_media_file = []
+    return entry, manager
 
 
 def mark_entry_failed(entry_id, exc):
@@ -265,7 +321,7 @@ def mark_entry_failed(entry_id, exc):
         atomic_update(mgg.database.MediaEntry,
             {'id': entry_id},
             {u'state': u'failed',
-             u'fail_error': unicode(exc.exception_path),
+             u'fail_error': six.text_type(exc.exception_path),
              u'fail_metadata': exc.metadata})
     else:
         _log.warn("No idea what happened here, but it failed: %r", exc)
@@ -280,13 +336,71 @@ def mark_entry_failed(entry_id, exc):
              u'fail_metadata': {}})
 
 
+def get_process_filename(entry, workbench, acceptable_files):
+    """
+    Try and get the queued file if available, otherwise return the first file
+    in the acceptable_files that we have.
+
+    If no acceptable_files, raise ProcessFileNotFound
+    """
+    if entry.queued_media_file:
+        filepath = entry.queued_media_file
+        storage = mgg.queue_store
+    else:
+        for keyname in acceptable_files:
+            if entry.media_files.get(keyname):
+                filepath = entry.media_files[keyname]
+                storage = mgg.public_store
+                break
+
+    if not filepath:
+        raise ProcessFileNotFound()
+
+    filename = workbench.localized_file(
+        storage, filepath,
+        'source')
+
+    if not os.path.exists(filename):
+        raise ProcessFileNotFound()
+
+    return filename
+
+
+def store_public(entry, keyname, local_file, target_name=None,
+                 delete_if_exists=True):
+    if target_name is None:
+        target_name = os.path.basename(local_file)
+    target_filepath = create_pub_filepath(entry, target_name)
+
+    if keyname in entry.media_files:
+        _log.warn("store_public: keyname %r already used for file %r, "
+                  "replacing with %r", keyname,
+                  entry.media_files[keyname], target_filepath)
+        if delete_if_exists:
+            mgg.public_store.delete_file(entry.media_files[keyname])
+    try:
+        mgg.public_store.copy_local_to_storage(local_file, target_filepath)
+    except Exception as e:
+        _log.error(u'Exception happened: {0}'.format(e))
+        raise PublicStoreFail(keyname=keyname)
+    # raise an error if the file failed to copy
+    if not mgg.public_store.file_exists(target_filepath):
+        raise PublicStoreFail(keyname=keyname)
+
+    entry.media_files[keyname] = target_filepath
+
+
+def copy_original(entry, orig_filename, target_name, keyname=u"original"):
+    store_public(entry, keyname, orig_filename, target_name)
+
+
 class BaseProcessingFail(Exception):
     """
     Base exception that all other processing failure messages should
     subclass from.
 
     You shouldn't call this itself; instead you should subclass it
-    and provid the exception_path and general_message applicable to
+    and provide the exception_path and general_message applicable to
     this error.
     """
     general_message = u''
@@ -299,10 +413,24 @@ class BaseProcessingFail(Exception):
     def __init__(self, **metadata):
         self.metadata = metadata or {}
 
-
 class BadMediaFail(BaseProcessingFail):
     """
     Error that should be raised when an inappropriate file was given
     for the media type specified.
     """
     general_message = _(u'Invalid file given for media type.')
+
+
+class PublicStoreFail(BaseProcessingFail):
+    """
+    Error that should be raised when copying to public store fails
+    """
+    general_message = _('Copying to public storage failed.')
+
+
+class ProcessFileNotFound(BaseProcessingFail):
+    """
+    Error that should be raised when an acceptable file for processing
+    is not found.
+    """
+    general_message = _(u'An acceptable processing file was not found')