From bf2dafd1a04ef8050ebf08bb512862a1592998c0 Mon Sep 17 00:00:00 2001 From: Sebastian Spaeth Date: Thu, 20 Dec 2012 13:42:37 +0100 Subject: [PATCH] Tweak Celery Task - Make sure Exceptions are pickleable (not sure if this was not the case but this is the pattern as documented in the celery docs. - Don't create a task_id in the GMG code, but save the one implicitely created by celery. - Don't create a task-id directory per upload. Just store queued uploads in a single directory (this is the most controversial change and might need discussion!!!) Signed-off-by: Sebastian Spaeth --- mediagoblin/db/models_v0.py | 2 +- mediagoblin/processing/__init__.py | 7 ++++--- mediagoblin/processing/task.py | 22 ++++++++++------------ mediagoblin/submit/views.py | 3 ++- 4 files changed, 17 insertions(+), 17 deletions(-) diff --git a/mediagoblin/db/models_v0.py b/mediagoblin/db/models_v0.py index bdedec2e..5e463d45 100644 --- a/mediagoblin/db/models_v0.py +++ b/mediagoblin/db/models_v0.py @@ -109,7 +109,7 @@ class MediaEntry(Base_v0): queued_media_file = Column(PathTupleWithSlashes) - queued_task_id = Column(Unicode) + queued_task_id = Column(Unicode, default=None) __table_args__ = ( UniqueConstraint('uploader', 'slug'), diff --git a/mediagoblin/processing/__init__.py b/mediagoblin/processing/__init__.py index f3a85940..ae3652cf 100644 --- a/mediagoblin/processing/__init__.py +++ b/mediagoblin/processing/__init__.py @@ -181,9 +181,10 @@ class BaseProcessingFail(Exception): return u"%s:%s" % ( self.__class__.__module__, self.__class__.__name__) - def __init__(self, **metadata): - self.metadata = metadata or {} - + def __init__(self, *args, **kwargs): + # next line is REQUIRED to have pickable exceptions if you want + # to be able to pass in custom arguments (see celery docs) + Exception.__init__(self, *args, **metadata) class BadMediaFail(BaseProcessingFail): """ diff --git a/mediagoblin/processing/task.py b/mediagoblin/processing/task.py index 9af192ed..550906d0 100644 --- a/mediagoblin/processing/task.py +++ b/mediagoblin/processing/task.py @@ -18,11 +18,13 @@ import logging import urllib import urllib2 -from celery import registry, task +#TODO: newer celeries use from celery import Task. Change when we upgrade +from celery.task import Task +from celery.registry import tasks from mediagoblin import mg_globals as mgg -from mediagoblin.db.models import MediaEntry -from . import mark_entry_failed, BaseProcessingFail, ProcessingState +from mediagoblin.db.sql.models import MediaEntry +from mediagoblin.processing import mark_entry_failed, BaseProcessingFail from mediagoblin.tools.processing import json_processing_callback _log = logging.getLogger(__name__) @@ -63,12 +65,10 @@ def handle_push_urls(feed_url): ################################ # Media processing initial steps ################################ +class ProcessMedia(Task): + track_started=True -class ProcessMedia(task.Task): - """ - Pass this entry off for processing. - """ - def run(self, media_id, feed_url): + def run(self, media_id): """ Pass the media entry off to the appropriate processing function (for now just process_image...) @@ -81,8 +81,8 @@ class ProcessMedia(task.Task): # Try to process, and handle expected errors. try: entry.state = u'processing' + entry.queued_task_id = self.request.id entry.save() - _log.debug('Processing {0}'.format(entry)) proc_state = ProcessingState(entry) @@ -140,6 +140,4 @@ class ProcessMedia(task.Task): entry = mgg.database.MediaEntry.query.filter_by(id=entry_id).first() json_processing_callback(entry) -# Register the task -process_media = registry.tasks[ProcessMedia.name] - +tasks.register(ProcessMedia) diff --git a/mediagoblin/submit/views.py b/mediagoblin/submit/views.py index 3f9d5b2d..6bb95ecb 100644 --- a/mediagoblin/submit/views.py +++ b/mediagoblin/submit/views.py @@ -89,7 +89,7 @@ def submit_start(request): # Save now so we have this data before kicking off processing entry.save() - # Pass off to processing + # Pass off to async processing # # (... don't change entry after this point to avoid race # conditions with changes to the document via processing code) @@ -97,6 +97,7 @@ def submit_start(request): 'mediagoblin.user_pages.atom_feed', qualified=True, user=request.user.username) run_process_media(entry, feed_url) + add_message(request, SUCCESS, _('Woohoo! Submitted!')) add_comment_subscription(request.user, entry) -- 2.25.1