Tweak Celery Task
authorSebastian Spaeth <Sebastian@SSpaeth.de>
Thu, 20 Dec 2012 12:42:37 +0000 (13:42 +0100)
committerRodney Ewing <ewing.rj@gmail.com>
Mon, 19 Aug 2013 21:42:13 +0000 (14:42 -0700)
- Make sure Exceptions are pickleable (not sure if this was not the
  case but this is the pattern as documented in the celery docs.
- Don't create a task_id in the GMG code, but save the one
  implicitely created by celery.
- Don't create a task-id directory per upload. Just store queued uploads
  in a single directory (this is the most controversial change and might
  need discussion!!!)

Signed-off-by: Sebastian Spaeth <Sebastian@SSpaeth.de>
mediagoblin/db/models_v0.py
mediagoblin/processing/__init__.py
mediagoblin/processing/task.py
mediagoblin/submit/views.py

index bdedec2e4b65070762432c9c9a0635eb47b6d318..5e463d452aafba69f963eabea98767a54d761a14 100644 (file)
@@ -109,7 +109,7 @@ class MediaEntry(Base_v0):
 
     queued_media_file = Column(PathTupleWithSlashes)
 
-    queued_task_id = Column(Unicode)
+    queued_task_id = Column(Unicode, default=None)
 
     __table_args__ = (
         UniqueConstraint('uploader', 'slug'),
index f3a85940ca74f454e49932f99f0bb95330c4f7a5..ae3652cf04a27a6b205da58948aa6b0e7d931157 100644 (file)
@@ -181,9 +181,10 @@ class BaseProcessingFail(Exception):
         return u"%s:%s" % (
             self.__class__.__module__, self.__class__.__name__)
 
-    def __init__(self, **metadata):
-        self.metadata = metadata or {}
-
+    def __init__(self, *args, **kwargs):
+        # next line is REQUIRED to have pickable exceptions if you want
+        # to be able to pass in custom arguments (see celery docs)
+        Exception.__init__(self, *args, **metadata)
 
 class BadMediaFail(BaseProcessingFail):
     """
index 9af192edb97f0f902dc15b253c1918dcae40cd40..550906d03a765727df54b64800795f84b1871999 100644 (file)
@@ -18,11 +18,13 @@ import logging
 import urllib
 import urllib2
 
-from celery import registry, task
+#TODO: newer celeries use from celery import Task. Change when we upgrade
+from celery.task import Task
+from celery.registry import tasks
 
 from mediagoblin import mg_globals as mgg
-from mediagoblin.db.models import MediaEntry
-from . import mark_entry_failed, BaseProcessingFail, ProcessingState
+from mediagoblin.db.sql.models import MediaEntry
+from mediagoblin.processing import mark_entry_failed, BaseProcessingFail
 from mediagoblin.tools.processing import json_processing_callback
 
 _log = logging.getLogger(__name__)
@@ -63,12 +65,10 @@ def handle_push_urls(feed_url):
 ################################
 # Media processing initial steps
 ################################
+class ProcessMedia(Task):
+    track_started=True
 
-class ProcessMedia(task.Task):
-    """
-    Pass this entry off for processing.
-    """
-    def run(self, media_id, feed_url):
+    def run(self, media_id):
         """
         Pass the media entry off to the appropriate processing function
         (for now just process_image...)
@@ -81,8 +81,8 @@ class ProcessMedia(task.Task):
         # Try to process, and handle expected errors.
         try:
             entry.state = u'processing'
+            entry.queued_task_id = self.request.id
             entry.save()
-
             _log.debug('Processing {0}'.format(entry))
 
             proc_state = ProcessingState(entry)
@@ -140,6 +140,4 @@ class ProcessMedia(task.Task):
         entry = mgg.database.MediaEntry.query.filter_by(id=entry_id).first()
         json_processing_callback(entry)
 
-# Register the task
-process_media = registry.tasks[ProcessMedia.name]
-
+tasks.register(ProcessMedia)
index 3f9d5b2df2ac0adf654934d52cb4e8075586ffa7..6bb95ecb07875bc82cdaddc5675ff516440cadac 100644 (file)
@@ -89,7 +89,7 @@ def submit_start(request):
                 # Save now so we have this data before kicking off processing
                 entry.save()
 
-                # Pass off to processing
+                # Pass off to async processing
                 #
                 # (... don't change entry after this point to avoid race
                 # conditions with changes to the document via processing code)
@@ -97,6 +97,7 @@ def submit_start(request):
                     'mediagoblin.user_pages.atom_feed',
                     qualified=True, user=request.user.username)
                 run_process_media(entry, feed_url)
+
                 add_message(request, SUCCESS, _('Woohoo! Submitted!'))
 
                 add_comment_subscription(request.user, entry)