Remove self.entry in VideoTranscoder
diff --git a/mediagoblin/processing/__init__.py b/mediagoblin/processing/__init__.py
index 19e88199a44cbf5a73395df61943c47295d4fea7..a9d5442b40abd7ee8d2289dbd25dc00468b9a378 100644
--- a/mediagoblin/processing/__init__.py
+++ b/mediagoblin/processing/__init__.py
 # You should have received a copy of the GNU Affero General Public License
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
-from collections import OrderedDict
+# Use an ordered dict if we can.  If not, we'll just use a normal dict
+# later.
+try:
+    from collections import OrderedDict
+except ImportError:
+    OrderedDict = None
+
 import logging
 import os
 
+import six
+
 from mediagoblin import mg_globals as mgg
 from mediagoblin.db.util import atomic_update
 from mediagoblin.db.models import MediaEntry
@@ -33,14 +41,17 @@ class ProgressCallback(object):
 
     def __call__(self, progress):
         if progress:
-            self.entry.transcoding_progress = progress
+            if 100 - (self.entry.transcoding_progress + progress) < 0.01:
+                self.entry.transcoding_progress = 100
+            else:
+                self.entry.transcoding_progress += progress
             self.entry.save()
 
 
 def create_pub_filepath(entry, filename):
     return mgg.public_store.get_unique_filepath(
             ['media_entries',
-             unicode(entry.id),
+             six.text_type(entry.id),
              filename])
 
 
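A note on the incremental ProgressCallback logic above: callers are now
expected to report percentage deltas per transcoding pass rather than an
absolute value. A minimal usage sketch (the entry object and the per-pass
shares are hypothetical):

    callback = ProgressCallback(entry)   # entry: a MediaEntry, progress at 0

    for share in [30.0, 30.0, 40.0]:     # hypothetical per-pass percentages
        # ... run one transcoding pass here ...
        callback(share)                  # adds share to transcoding_progress

    # After the last call, 100 - (60 + 40) < 0.01, so progress snaps to 100.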
@@ -137,7 +148,7 @@ class MediaProcessor(object):
         raise NotImplementedError
 
     @classmethod
-    def media_is_eligible(cls, entry):
+    def media_is_eligible(cls, entry=None, state=None):
         raise NotImplementedError
 
     ###############################
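
With the keyword arguments above, subclasses can answer eligibility from a
full entry or from a bare state string. A sketch of what an implementation
might look like (the u'unprocessed' state value is an assumption):

    class InitialProcessor(MediaProcessor):
        @classmethod
        def media_is_eligible(cls, entry=None, state=None):
            # Fall back to the entry's own state when only an entry is given
            if not state:
                state = entry.state
            return state in (u'unprocessed',)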
@@ -166,14 +177,17 @@ class MediaProcessor(object):
         # be removed too, but fail if the directory is not empty to be on
         # the super-safe side.
         queued_filepath = self.entry.queued_media_file
-        mgg.queue_store.delete_file(queued_filepath)      # rm file
-        mgg.queue_store.delete_dir(queued_filepath[:-1])  # rm dir
-        self.entry.queued_media_file = []
+        if queued_filepath:
+            mgg.queue_store.delete_file(queued_filepath)      # rm file
+            mgg.queue_store.delete_dir(queued_filepath[:-1])  # rm dir
+            self.entry.queued_media_file = []
 
 
 class ProcessingKeyError(Exception): pass
 class ProcessorDoesNotExist(ProcessingKeyError): pass
 class ProcessorNotEligible(ProcessingKeyError): pass
+class ProcessingManagerDoesNotExist(ProcessingKeyError): pass
 
 
 class ProcessingManager(object):
@@ -184,7 +198,10 @@ class ProcessingManager(object):
     """
     def __init__(self):
         # Dict of all MediaProcessors of this media type
-        self.processors = OrderedDict()
+        if OrderedDict is not None:
+            self.processors = OrderedDict()
+        else:
+            self.processors = {}
 
     def add_processor(self, processor):
         """
@@ -204,7 +221,18 @@ class ProcessingManager(object):
         return [
             processor
             for processor in self.processors.values()
-            if processor.media_is_eligible(entry)]
+            if processor.media_is_eligible(entry=entry)]
+
+    def list_all_processors_by_state(self, state):
+        """
+        List all processors that media in the given state is eligible to
+        be processed with.
+        """
+        return [
+            processor
+            for processor in self.processors.values()
+            if processor.media_is_eligible(state=state)]
 
     def list_all_processors(self):
         return self.processors.values()
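
The state-based listing makes it possible to query processors without
loading a MediaEntry at all. A hedged usage sketch (the media type string
and the processor `name` attribute are assumptions):

    manager = get_processing_manager_for_type(
        u'mediagoblin.media_types.image')
    for processor in manager.list_all_processors_by_state(u'processed'):
        print(processor.name)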
@@ -223,8 +251,6 @@ class ProcessingManager(object):
         try:
             processor = self.processors[key]
         except KeyError:
-            import pdb
-            pdb.set_trace()
             raise ProcessorDoesNotExist(
                 "'%s' processor does not exist for this media type" % key)
 
@@ -234,6 +260,12 @@ class ProcessingManager(object):
 
         return processor
 
+    def workflow(self, entry, feed_url, reprocess_action, reprocess_info=None):
+        """
+        Returns the Celery command needed to proceed with media processing;
+        the base implementation returns None, meaning no custom workflow.
+        """
+        return None
+
 
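The base workflow hook returns None, which callers can treat as "no custom
workflow; run the default processing task". A subclass could hand back a
Celery signature instead; a sketch under the assumption that transcode_task
and cleanup_task are Celery tasks defined elsewhere:

    from celery import chain

    class VideoProcessingManager(ProcessingManager):
        def workflow(self, entry, feed_url, reprocess_action,
                     reprocess_info=None):
            # .si() builds immutable signatures so results don't chain along
            return chain(transcode_task.si(entry.id, reprocess_action),
                         cleanup_task.si(entry.id))
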
 def request_from_args(args, which_args):
     """
@@ -254,6 +286,9 @@ def get_processing_manager_for_type(media_type):
     Get the appropriate media manager for this type
     """
     manager_class = hook_handle(('reprocess_manager', media_type))
+    if not manager_class:
+        raise ProcessingManagerDoesNotExist(
+            "A processing manager does not exist for {0}".format(media_type))
     manager = manager_class()
 
     return manager
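
Before this guard, an unknown media type made manager_class() fail with a
bare TypeError on None; the lookup now names the offending type. A usage
sketch (the media type string is hypothetical):

    try:
        manager = get_processing_manager_for_type(
            u'mediagoblin.media_types.nope')
    except ProcessingManagerDoesNotExist as exc:
        _log.error(exc)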
@@ -283,8 +318,8 @@ def mark_entry_failed(entry_id, exc):
     store extra information that can be useful for users telling them
     why their media failed to process.
 
-    Args:
-     - entry_id: The id of the media entry
+    :param entry_id: The id of the media entry
+    :param exc: An instance of BaseProcessingFail
 
     """
     # Was this a BaseProcessingFail?  In other words, was this a
@@ -295,44 +330,48 @@ def mark_entry_failed(entry_id, exc):
         atomic_update(mgg.database.MediaEntry,
             {'id': entry_id},
             {u'state': u'failed',
-             u'fail_error': unicode(exc.exception_path),
+             u'fail_error': six.text_type(exc.exception_path),
              u'fail_metadata': exc.metadata})
     else:
         _log.warn("No idea what happened here, but it failed: %r", exc)
-        # Looks like no, so just mark it as failed and don't record a
-        # failure_error (we'll assume it wasn't handled) and don't record
-        # metadata (in fact overwrite it if somehow it had previous info
-        # here)
+        # Looks like no, so record the exception itself so that an admin
+        # can later work out what went wrong
         atomic_update(mgg.database.MediaEntry,
             {'id': entry_id},
             {u'state': u'failed',
-             u'fail_error': None,
+             u'fail_error': u'Unhandled exception: {0}'.format(
+                 six.text_type(exc)),
              u'fail_metadata': {}})
 
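Both branches of mark_entry_failed now leave a non-empty fail_error behind,
so there is always something to show for a failed entry. A sketch of the
two call shapes (run_processing is a hypothetical processing call):

    try:
        run_processing(entry)
    except BaseProcessingFail as exc:
        mark_entry_failed(entry.id, exc)  # records exception_path + metadata
    except Exception as exc:
        mark_entry_failed(entry.id, exc)  # records u'Unhandled exception: ...'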
 
-def get_orig_filename(entry, workbench):
+def get_process_filename(entry, workbench, acceptable_files):
     """
-    Get the a filename for the original, on local storage
+    Try to get the queued file if available; otherwise return the first
+    file from acceptable_files that the entry actually has.
 
-    If the media entry has a queued_media_file, use that, otherwise
-    use the original.
-
-    In the future, this will return the highest quality file available
-    if neither the original or queued file are available by checking
-    some ordered list of preferred keys.
+    If no acceptable file is found, raise ProcessFileNotFound.
     """
     if entry.queued_media_file:
-        orig_filepath = entry.queued_media_file
+        filepath = entry.queued_media_file
         storage = mgg.queue_store
     else:
-        orig_filepath = entry.media_files['original']
-        storage = mgg.public_store
+        filepath = None
+        for keyname in acceptable_files:
+            if entry.media_files.get(keyname):
+                filepath = entry.media_files[keyname]
+                storage = mgg.public_store
+                break
+
+    if not filepath:
+        raise ProcessFileNotFound()
 
-    orig_filename = workbench.localized_file(
-        storage, orig_filepath,
+    filename = workbench.localized_file(
+        storage, filepath,
         'source')
 
-    return orig_filename
+    if not os.path.exists(filename):
+        raise ProcessFileNotFound()
+
+    return filename
 
 
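Callers of get_process_filename now pass an ordered preference list rather
than assuming an 'original' key exists. For example, a video processor
might prefer an already-transcoded file (the media_files key names here
are assumptions):

    acceptable_files = ['webm_video', 'original']
    process_filename = get_process_filename(entry, workbench,
                                            acceptable_files)
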
 def store_public(entry, keyname, local_file, target_name=None,
@@ -340,13 +379,22 @@ def store_public(entry, keyname, local_file, target_name=None,
     if target_name is None:
         target_name = os.path.basename(local_file)
     target_filepath = create_pub_filepath(entry, target_name)
+
     if keyname in entry.media_files:
         _log.warn("store_public: keyname %r already used for file %r, "
                   "replacing with %r", keyname,
                   entry.media_files[keyname], target_filepath)
         if delete_if_exists:
             mgg.public_store.delete_file(entry.media_files[keyname])
-    mgg.public_store.copy_local_to_storage(local_file, target_filepath)
+    try:
+        mgg.public_store.copy_local_to_storage(local_file, target_filepath)
+    except Exception as e:
+        _log.error(u'Exception while copying to public storage: {0}'.format(e))
+        raise PublicStoreFail(keyname=keyname)
+    # raise an error if the file failed to copy
+    if not mgg.public_store.file_exists(target_filepath):
+        raise PublicStoreFail(keyname=keyname)
+
     entry.media_files[keyname] = target_filepath
 
 
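A sketch of how a processor might publish a result through store_public
(the keyname and local file path are hypothetical); PublicStoreFail now
surfaces both copy exceptions and silently missing files:

    store_public(entry, 'webm_video', '/tmp/transcoded.webm',
                 target_name='video.webm')
    entry.save()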
@@ -360,7 +408,7 @@ class BaseProcessingFail(Exception):
     subclass from.
 
     You shouldn't call this itself; instead you should subclass it
-    and provid the exception_path and general_message applicable to
+    and provide the exception_path and general_message applicable to
     this error.
     """
     general_message = u''
@@ -370,9 +418,11 @@ class BaseProcessingFail(Exception):
         return u"%s:%s" % (
             self.__class__.__module__, self.__class__.__name__)
 
-    def __init__(self, **metadata):
-        self.metadata = metadata or {}
-
+    def __init__(self, message=None, **metadata):
+        if message is not None:
+            super(BaseProcessingFail, self).__init__(message)
+            metadata['message'] = message
+        self.metadata = metadata
+
 
 class BadMediaFail(BaseProcessingFail):
     """
@@ -380,3 +430,18 @@ class BadMediaFail(BaseProcessingFail):
     for the media type specified.
     """
     general_message = _(u'Invalid file given for media type.')
+
+
+class PublicStoreFail(BaseProcessingFail):
+    """
+    Error that should be raised when copying to public store fails
+    """
+    general_message = _(u'Copying to public storage failed.')
+
+
+class ProcessFileNotFound(BaseProcessingFail):
+    """
+    Error that should be raised when an acceptable file for processing
+    is not found.
+    """
+    general_message = _(u'An acceptable processing file was not found.')
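
With the reworked BaseProcessingFail.__init__ above, an optional message
now reaches the normal Exception machinery and is mirrored into the
failure metadata that mark_entry_failed stores. A small sketch:

    try:
        raise PublicStoreFail(u'copy failed', keyname='webm_video')
    except BaseProcessingFail as exc:
        # the message doubles as metadata['message'] for fail_metadata
        assert exc.metadata == {'keyname': 'webm_video',
                                'message': u'copy failed'}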