From 25ecdec9971394064063db397232eb7f0e89fae3 Mon Sep 17 00:00:00 2001 From: vijeth-aradhya Date: Mon, 12 Jun 2017 20:53:23 +0530 Subject: [PATCH] Add priority to the celery tasks Few more changes to be made before executing the tasks. Also #1 should be handled soon after this. --- mediagoblin/media_types/ascii/processing.py | 3 +- mediagoblin/media_types/audio/processing.py | 3 +- mediagoblin/media_types/image/processing.py | 3 +- mediagoblin/media_types/pdf/processing.py | 3 +- .../media_types/raw_image/processing.py | 3 +- mediagoblin/media_types/stl/processing.py | 3 +- mediagoblin/media_types/video/processing.py | 40 +++++++++++++------ mediagoblin/processing/__init__.py | 3 +- mediagoblin/submit/lib.py | 2 +- 9 files changed, 42 insertions(+), 21 deletions(-) diff --git a/mediagoblin/media_types/ascii/processing.py b/mediagoblin/media_types/ascii/processing.py index 71ccc86e..823dc4fd 100644 --- a/mediagoblin/media_types/ascii/processing.py +++ b/mediagoblin/media_types/ascii/processing.py @@ -274,7 +274,8 @@ class AsciiProcessingManager(ProcessingManager): self.add_processor(InitialProcessor) self.add_processor(Resizer) - def workflow(self, entry, feed_url, reprocess_action, reprocess_info=None): + def workflow(self, entry, manager, feed_url, reprocess_action, + reprocess_info=None): ProcessMedia().apply_async( [entry.id, feed_url, reprocess_action, reprocess_info], {}, task_id=entry.queued_task_id) diff --git a/mediagoblin/media_types/audio/processing.py b/mediagoblin/media_types/audio/processing.py index a83d60f7..b74364bc 100644 --- a/mediagoblin/media_types/audio/processing.py +++ b/mediagoblin/media_types/audio/processing.py @@ -366,7 +366,8 @@ class AudioProcessingManager(ProcessingManager): self.add_processor(Resizer) self.add_processor(Transcoder) - def workflow(self, entry, feed_url, reprocess_action, reprocess_info=None): + def workflow(self, entry, manager, feed_url, reprocess_action, + reprocess_info=None): ProcessMedia().apply_async( [entry.id, feed_url, reprocess_action, reprocess_info], {}, task_id=entry.queued_task_id) diff --git a/mediagoblin/media_types/image/processing.py b/mediagoblin/media_types/image/processing.py index 42234eff..a189fef3 100644 --- a/mediagoblin/media_types/image/processing.py +++ b/mediagoblin/media_types/image/processing.py @@ -431,7 +431,8 @@ class ImageProcessingManager(ProcessingManager): self.add_processor(Resizer) self.add_processor(MetadataProcessing) - def workflow(self, entry, feed_url, reprocess_action, reprocess_info=None): + def workflow(self, entry, manager, feed_url, reprocess_action, + reprocess_info=None): ProcessMedia().apply_async( [entry.id, feed_url, reprocess_action, reprocess_info], {}, task_id=entry.queued_task_id) diff --git a/mediagoblin/media_types/pdf/processing.py b/mediagoblin/media_types/pdf/processing.py index d93b19bb..6a13c8e3 100644 --- a/mediagoblin/media_types/pdf/processing.py +++ b/mediagoblin/media_types/pdf/processing.py @@ -471,7 +471,8 @@ class PdfProcessingManager(ProcessingManager): self.add_processor(InitialProcessor) self.add_processor(Resizer) - def workflow(self, entry, feed_url, reprocess_action, reprocess_info=None): + def workflow(self, entry, manager, feed_url, reprocess_action, + reprocess_info=None): ProcessMedia().apply_async( [entry.id, feed_url, reprocess_action, reprocess_info], {}, task_id=entry.queued_task_id) diff --git a/mediagoblin/media_types/raw_image/processing.py b/mediagoblin/media_types/raw_image/processing.py index a385d563..7f2d155a 100644 --- a/mediagoblin/media_types/raw_image/processing.py +++ b/mediagoblin/media_types/raw_image/processing.py @@ -81,7 +81,8 @@ class RawImageProcessingManager(ProcessingManager): self.add_processor(InitialRawProcessor) self.add_processor(Resizer) - def workflow(self, entry, feed_url, reprocess_action, reprocess_info=None): + def workflow(self, entry, manager, feed_url, reprocess_action, + reprocess_info=None): ProcessMedia().apply_async( [entry.id, feed_url, reprocess_action, reprocess_info], {}, task_id=entry.queued_task_id) diff --git a/mediagoblin/media_types/stl/processing.py b/mediagoblin/media_types/stl/processing.py index 7f2f350d..9dd6d49b 100644 --- a/mediagoblin/media_types/stl/processing.py +++ b/mediagoblin/media_types/stl/processing.py @@ -369,7 +369,8 @@ class StlProcessingManager(ProcessingManager): self.add_processor(InitialProcessor) self.add_processor(Resizer) - def workflow(self, entry, feed_url, reprocess_action, reprocess_info=None): + def workflow(self, entry, manager, feed_url, reprocess_action, + reprocess_info=None): ProcessMedia().apply_async( [entry.id, feed_url, reprocess_action, reprocess_info], {}, task_id=entry.queued_task_id) diff --git a/mediagoblin/media_types/video/processing.py b/mediagoblin/media_types/video/processing.py index 5cae42f5..d039c24b 100644 --- a/mediagoblin/media_types/video/processing.py +++ b/mediagoblin/media_types/video/processing.py @@ -22,6 +22,7 @@ import celery import six +from celery import group, chord from mediagoblin import mg_globals as mgg from mediagoblin.processing import ( FilenameBuilder, BaseProcessingFail, @@ -34,7 +35,7 @@ from mediagoblin.tools.translate import lazy_pass_to_ugettext as _ from mediagoblin.media_types import MissingComponents from . import transcoders -from .util import skip_transcode +from .util import skip_transcode, ACCEPTED_RESOLUTIONS _log = logging.getLogger(__name__) _log.setLevel(logging.DEBUG) @@ -165,26 +166,26 @@ def store_metadata(media_entry, metadata): @celery.task() -def main_task(**process_info): +def main_task(resolution, medium_size, **process_info): processor = CommonVideoProcessor(process_info['manager'], process_info['entry']) - processor.common_setup(process_info['resolution']) - processor.transcode(medium_size=process_info['medium_size'], vp8_quality=process_info['vp8_quality'], + processor.common_setup(resolution) + processor.transcode(medium_size=medium_size, vp8_quality=process_info['vp8_quality'], vp8_threads=process_info['vp8_threads'], vorbis_quality=process_info['vorbis_quality']) processor.generate_thumb(thumb_size=process_info['thumb_size']) processor.store_orig_metadata() @celery.task() -def complimentary_task(**process_info): +def complimentary_task(resolution, medium_size, **process_info): processor = CommonVideoProcessor(process_info['manager'], process_info['entry']) - processor.common_setup(process_info['resolution']) - processor.transcode(medium_size=process_info['medium_size'], vp8_quality=process_info['vp8_quality'], + processor.common_setup(resolution) + processor.transcode(medium_size=medium_size, vp8_quality=process_info['vp8_quality'], vp8_threads=process_info['vp8_threads'], vorbis_quality=process_info['vorbis_quality']) @celery.task() -def processing_cleanup(**process_info): - processor = CommonVideoProcessor(process_info['manager'], process_info['entry']) +def processing_cleanup(entry, manager): + processor = CommonVideoProcessor(manager, entry) # is it manager, entry or entry, manager? processor.delete_queue_file() # ===================== @@ -523,7 +524,20 @@ class VideoProcessingManager(ProcessingManager): self.add_processor(Resizer) self.add_processor(Transcoder) - def workflow(self, entry, feed_url, reprocess_action, reprocess_info=None): - ProcessMedia().apply_async( - [entry.id, feed_url, reprocess_action, reprocess_info], {}, - task_id=entry.queued_task_id) + def workflow(self, entry, manager, feed_url, reprocess_action, + reprocess_info=None): + + reprocess_info['entry'] = entry.id # ? + reprocess_info['manager'] = manager # can celery serialize this? + + # Add args + + transcoding_tasks = group( + main_task.signature(queue='default', priority=5, immutable=True), + complimentary_task.signature(queue='default', priority=4, immutable=True), + complimentary_task.signature(queue='default', priority=3, immutable=True), + complimentary_task.signature(queue='default', priority=2, immutable=True) + complimentary_task.signature(queue='default', priority=1, immutable=True) + ) + + chord(transcoding_tasks)(processing_cleanup.signature(queue='default', immutable=True)) diff --git a/mediagoblin/processing/__init__.py b/mediagoblin/processing/__init__.py index 4e5853c1..76f81faa 100644 --- a/mediagoblin/processing/__init__.py +++ b/mediagoblin/processing/__init__.py @@ -257,7 +257,8 @@ class ProcessingManager(object): return processor - def workflow(self, entry, feed_url, reprocess_action, reprocess_info=None): + def workflow(self, entry, manager, feed_url, reprocess_action, + reprocess_info=None): """ Returns the Celery command needed to proceed with media processing *This method has to be implemented in all media types* diff --git a/mediagoblin/submit/lib.py b/mediagoblin/submit/lib.py index 402eb851..1c78f73a 100644 --- a/mediagoblin/submit/lib.py +++ b/mediagoblin/submit/lib.py @@ -267,7 +267,7 @@ def run_process_media(entry, feed_url=None, entry, manager = get_entry_and_processing_manager(entry.id) try: - manager.workflow(entry, feed_url, reprocess_action, reprocess_info) + manager.workflow(entry, manager, feed_url, reprocess_action, reprocess_info) except BaseException as exc: # The purpose of this section is because when running in "lazy" # or always-eager-with-exceptions-propagated celery mode that -- 2.25.1