From: vijeth-aradhya Date: Mon, 12 Jun 2017 17:47:44 +0000 (+0530) Subject: Few more changes to celery priority tasking X-Git-Url: https://vcs.fsf.org/?a=commitdiff_plain;h=bd011c940eeeddd060ccf921ad3519d20d77a015;p=mediagoblin.git Few more changes to celery priority tasking Addition of arguments to the celery tasks. --- diff --git a/mediagoblin/media_types/video/processing.py b/mediagoblin/media_types/video/processing.py index d039c24b..64cacb5f 100644 --- a/mediagoblin/media_types/video/processing.py +++ b/mediagoblin/media_types/video/processing.py @@ -185,7 +185,7 @@ def complimentary_task(resolution, medium_size, **process_info): @celery.task() def processing_cleanup(entry, manager): - processor = CommonVideoProcessor(manager, entry) # is it manager, entry or entry, manager? + processor = CommonVideoProcessor(manager, entry) processor.delete_queue_file() # ===================== @@ -527,17 +527,22 @@ class VideoProcessingManager(ProcessingManager): def workflow(self, entry, manager, feed_url, reprocess_action, reprocess_info=None): - reprocess_info['entry'] = entry.id # ? - reprocess_info['manager'] = manager # can celery serialize this? - - # Add args + reprocess_info['entry'] = entry + reprocess_info['manager'] = manager transcoding_tasks = group( - main_task.signature(queue='default', priority=5, immutable=True), - complimentary_task.signature(queue='default', priority=4, immutable=True), - complimentary_task.signature(queue='default', priority=3, immutable=True), - complimentary_task.signature(queue='default', priority=2, immutable=True) - complimentary_task.signature(queue='default', priority=1, immutable=True) + main_task.signature(args=('480p', ACCEPTED_RESOLUTIONS['480p']), + kwargs=reprocess_info, queue='default', + priority=5, immutable=True), + complimentary_task.signature(args=('360p', ACCEPTED_RESOLUTIONS['360p']), + kwargs=reprocess_info, queue='default', + priority=4, immutable=True), + complimentary_task.signature(args=('720p', ACCEPTED_RESOLUTIONS['720p']), + kwargs=reprocess_info, queue='default', + priority=3, immutable=True), ) - chord(transcoding_tasks)(processing_cleanup.signature(queue='default', immutable=True)) + cleanup_task = processing_cleanup.signature(args=(entry, manager), + queue='default', immutable=True) + + chord(transcoding_tasks)(cleanup_task)