Proper handling of processor failures, working as hoped!
author Christopher Allan Webber <cwebber@dustycloud.org>
Sat, 13 Aug 2011 15:59:34 +0000 (10:59 -0500)
committer Christopher Allan Webber <cwebber@dustycloud.org>
Sat, 13 Aug 2011 15:59:34 +0000 (10:59 -0500)
BaseProcessingFail-based exceptions are recorded and marked appropriately
in the database.  Other exceptions are also caught and marked (or rather
not marked) appropriately in the database as well.

mediagoblin/process_media/__init__.py
mediagoblin/submit/views.py

index 00402d7ef6cfac41e765269d66880c53f9aab053..d6cdd74761eae236a46fec1f9660f61b5a841155 100644 (file)
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
 import Image
-from mediagoblin.db.util import ObjectId
-from celery.task import task
 
-from mediagoblin import mg_globals as mgg
 from contextlib import contextmanager
+from celery.task import task, Task
+from celery import registry
 
-from mediagoblin.process_media.errors import BadMediaFail
+from mediagoblin.db.util import ObjectId
+from mediagoblin import mg_globals as mgg
+from mediagoblin.process_media.errors import BaseProcessingFail, BadMediaFail
 
 
 THUMB_SIZE = 180, 180
@@ -34,6 +35,7 @@ def create_pub_filepath(entry, filename):
              unicode(entry['_id']),
              filename])
 
+
 @contextmanager
 def closing(callback):
     try:
@@ -41,12 +43,66 @@ def closing(callback):
     finally:
         pass
 
-@task
-def process_media_initial(media_id):
-    workbench = mgg.workbench_manager.create_workbench()
 
-    entry = mgg.database.MediaEntry.one(
-        {'_id': ObjectId(media_id)})
+################################
+# Media processing initial steps
+################################
+
+class ProcessMedia(Task):
+    """
+    Pass this entry off for processing.
+    """
+    def run(self, media_id):
+        """
+        Pass the media entry off to the appropriate processing function
+        (for now just process_image...)
+        """
+        entry = mgg.database.MediaEntry.one(
+            {'_id': ObjectId(media_id)})
+        process_image(entry)
+        entry['state'] = u'processed'
+        entry.save()
+
+    def on_failure(self, exc, task_id, args, kwargs, einfo):
+        """
+        If the processing failed we should mark that in the database.
+
+        Assuming that the exception raised is a subclass of BaseProcessingFail,
+        we can use that to get more information about the failure and store that
+        for conveying information to users about the failure, etc.
+        """
+        media_id = args[0]
+        entry = mgg.database.MediaEntry.one(
+            {'_id': ObjectId(media_id)})
+
+        entry[u'state'] = u'failed'
+
+        # Was this a BaseProcessingFail?  In other words, was this a
+        # type of error that we know how to handle?
+        if isinstance(exc, BaseProcessingFail):
+            # Looks like yes, so record information about that failure and any
+            # metadata the user might have supplied.
+            entry[u'fail_error'] = exc.exception_path
+            entry[u'fail_metadata'] = exc.metadata
+        else:
+            # Looks like no, so just mark it as failed and don't record a
+            # fail_error (we'll assume it wasn't handled) and don't record
+            # metadata (in fact overwrite it if somehow it had previous info
+            # here)
+            entry[u'fail_error'] = None
+            entry[u'fail_metadata'] = {}
+
+        entry.save()
+
+
+process_media = registry.tasks[ProcessMedia.name]
+
+
+def process_image(entry):
+    """
+    Code to process an image
+    """
+    workbench = mgg.workbench_manager.create_workbench()
 
     queued_filepath = entry['queued_media_file']
     queued_filename = workbench.localized_file(
@@ -107,8 +163,6 @@ def process_media_initial(media_id):
     media_files_dict['original'] = original_filepath
     if medium_processed:
         media_files_dict['medium'] = medium_filepath
-    entry['state'] = u'processed'
-    entry.save()
 
     # clean up workbench
     workbench.destroy_self()
index 1e8c6a68a81a952cd05fb9d9ce3cf44c28efe3cd..25b3664b5cad1db63c4a64a01a36bf702a9caf7e 100644 (file)
 # You should have received a copy of the GNU Affero General Public License
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
+import uuid
+
 from os.path import splitext
 from cgi import FieldStorage
-from string import split
 
 from werkzeug.utils import secure_filename
 
@@ -27,7 +28,7 @@ from mediagoblin.util import (
 from mediagoblin.util import pass_to_ugettext as _
 from mediagoblin.decorators import require_active_login
 from mediagoblin.submit import forms as submit_forms, security
-from mediagoblin.process_media import process_media_initial
+from mediagoblin.process_media import process_media
 from mediagoblin.messages import add_message, SUCCESS
 
 
@@ -87,15 +88,24 @@ def submit_start(request):
             # Add queued filename to the entry
             entry['queued_media_file'] = queue_filepath
 
-            # Save now so we have this data before kicking off processing
-            entry.save(validate=False)
-
-            result = process_media_initial.delay(unicode(entry['_id']))
+            # We generate this ourselves so we know what the task id is for
+            # retrieval later.
+            # (If we got it off the task's auto-generation, there'd be a risk of
+            # a race condition when we'd save after sending off the task)
+            task_id = unicode(uuid.uuid4())
+            entry['queued_task_id'] = task_id
 
-            # Save the task id
-            entry['queued_task_id'] = unicode(result.task_id)
+            # Save now so we have this data before kicking off processing
             entry.save(validate=True)
 
+            # Pass off to processing
+            #
+            # (... don't change entry after this point to avoid race
+            # conditions with changes to the document via processing code)
+            process_media.apply_async(
+                [unicode(entry['_id'])], {},
+                task_id=task_id)
+
             add_message(request, SUCCESS, _('Woohoo! Submitted!'))
 
             return redirect(request, "mediagoblin.user_pages.user_home",