Add priority to the celery tasks
author vijeth-aradhya <vijthaaa@gmail.com>
Mon, 12 Jun 2017 15:23:23 +0000 (20:53 +0530)
committer vijeth-aradhya <vijthaaa@gmail.com>
Mon, 12 Jun 2017 15:23:23 +0000 (20:53 +0530)
A few more changes need to be made before the tasks can be executed.
Also, #1 should be handled soon after this.
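
For context, a minimal self-contained sketch of the pattern introduced in VideoProcessingManager.workflow() below (hypothetical task names and broker URLs, not MediaGoblin code): per-signature priority only takes effect on a broker that supports it, and chord(group)(callback) runs the callback once every task in the group has finished.

    # priority_chord_sketch.py -- hypothetical standalone example, not MediaGoblin code.
    # Assumes RabbitMQ as broker (native priorities) and Redis as result backend
    # (needed for chords); the URLs below are placeholders.
    from celery import Celery, group, chord

    app = Celery('sketch', broker='amqp://localhost', backend='redis://localhost')
    # On RabbitMQ, message priority only works on queues declared with
    # x-max-priority; this setting applies it to all queues Celery declares.
    app.conf.task_queue_max_priority = 10

    @app.task
    def transcode(resolution):
        return 'transcoded %s' % resolution

    @app.task
    def cleanup():
        return 'queue file deleted'

    if __name__ == '__main__':
        transcodes = group(
            # Higher numbers are more urgent on RabbitMQ; brokers that only
            # emulate priorities (e.g. Redis) may order them differently.
            transcode.signature(('480p',), queue='default', priority=5, immutable=True),
            transcode.signature(('720p',), queue='default', priority=3, immutable=True),
        )
        # The chord callback is sent only after every task in the group has
        # finished; immutable=True keeps the group results out of its args.
        chord(transcodes)(cleanup.signature(queue='default', immutable=True))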

mediagoblin/media_types/ascii/processing.py
mediagoblin/media_types/audio/processing.py
mediagoblin/media_types/image/processing.py
mediagoblin/media_types/pdf/processing.py
mediagoblin/media_types/raw_image/processing.py
mediagoblin/media_types/stl/processing.py
mediagoblin/media_types/video/processing.py
mediagoblin/processing/__init__.py
mediagoblin/submit/lib.py

mediagoblin/media_types/ascii/processing.py
index 71ccc86ef07b406ea013f035b0a9b36c67bf87ed..823dc4fd2caf25f4935770565fd1fdacb063c2f4 100644 (file)
@@ -274,7 +274,8 @@ class AsciiProcessingManager(ProcessingManager):
         self.add_processor(InitialProcessor)
         self.add_processor(Resizer)
 
-    def workflow(self, entry, feed_url, reprocess_action, reprocess_info=None):
+    def workflow(self, entry, manager, feed_url, reprocess_action,
+                 reprocess_info=None):
         ProcessMedia().apply_async(
             [entry.id, feed_url, reprocess_action, reprocess_info], {},
             task_id=entry.queued_task_id)
mediagoblin/media_types/audio/processing.py
index a83d60f750539000d372e5cdb892a713032b6cb1..b74364bca36e8d7a5d44a0577a4049aac67506fe 100644 (file)
@@ -366,7 +366,8 @@ class AudioProcessingManager(ProcessingManager):
         self.add_processor(Resizer)
         self.add_processor(Transcoder)
 
-    def workflow(self, entry, feed_url, reprocess_action, reprocess_info=None):
+    def workflow(self, entry, manager, feed_url, reprocess_action,
+                 reprocess_info=None):
         ProcessMedia().apply_async(
             [entry.id, feed_url, reprocess_action, reprocess_info], {},
             task_id=entry.queued_task_id)
mediagoblin/media_types/image/processing.py
index 42234eff12288c778e2974a92203bdf3e6313888..a189fef3e7f09fb163e270e0df9ea25f467a1f8e 100644 (file)
@@ -431,7 +431,8 @@ class ImageProcessingManager(ProcessingManager):
         self.add_processor(Resizer)
         self.add_processor(MetadataProcessing)
 
-    def workflow(self, entry, feed_url, reprocess_action, reprocess_info=None):
+    def workflow(self, entry, manager, feed_url, reprocess_action,
+                 reprocess_info=None):
         ProcessMedia().apply_async(
             [entry.id, feed_url, reprocess_action, reprocess_info], {},
             task_id=entry.queued_task_id)
mediagoblin/media_types/pdf/processing.py
index d93b19bb70c8fcf29156e29df0e2f123ed6d6202..6a13c8e3656f5e017ad0ade6d2a21817d51e7440 100644 (file)
@@ -471,7 +471,8 @@ class PdfProcessingManager(ProcessingManager):
         self.add_processor(InitialProcessor)
         self.add_processor(Resizer)
 
-    def workflow(self, entry, feed_url, reprocess_action, reprocess_info=None):
+    def workflow(self, entry, manager, feed_url, reprocess_action,
+                 reprocess_info=None):
         ProcessMedia().apply_async(
             [entry.id, feed_url, reprocess_action, reprocess_info], {},
             task_id=entry.queued_task_id)
mediagoblin/media_types/raw_image/processing.py
index a385d563b7294beec5cb4f8017b9168645368801..7f2d155a043c6c2ae7b85774b353d71cc90c0637 100644 (file)
@@ -81,7 +81,8 @@ class RawImageProcessingManager(ProcessingManager):
         self.add_processor(InitialRawProcessor)
         self.add_processor(Resizer)
 
-    def workflow(self, entry, feed_url, reprocess_action, reprocess_info=None):
+    def workflow(self, entry, manager, feed_url, reprocess_action,
+                 reprocess_info=None):
         ProcessMedia().apply_async(
             [entry.id, feed_url, reprocess_action, reprocess_info], {},
             task_id=entry.queued_task_id)
mediagoblin/media_types/stl/processing.py
index 7f2f350d41a872ac51dce1a290c810b1f3d8fcca..9dd6d49ba07f273c68b4a9a227d13e393f389957 100644 (file)
@@ -369,7 +369,8 @@ class StlProcessingManager(ProcessingManager):
         self.add_processor(InitialProcessor)
         self.add_processor(Resizer)
 
-    def workflow(self, entry, feed_url, reprocess_action, reprocess_info=None):
+    def workflow(self, entry, manager, feed_url, reprocess_action,
+                 reprocess_info=None):
         ProcessMedia().apply_async(
             [entry.id, feed_url, reprocess_action, reprocess_info], {},
             task_id=entry.queued_task_id)
mediagoblin/media_types/video/processing.py
index 5cae42f5454a23fb7952c98b09f5fcf69cdd95b9..d039c24b57d2314cdba9c247452b8b12b8b05789 100644 (file)
@@ -22,6 +22,7 @@ import celery
 
 import six
 
+from celery import group, chord
 from mediagoblin import mg_globals as mgg
 from mediagoblin.processing import (
     FilenameBuilder, BaseProcessingFail,
@@ -34,7 +35,7 @@ from mediagoblin.tools.translate import lazy_pass_to_ugettext as _
 from mediagoblin.media_types import MissingComponents
 
 from . import transcoders
-from .util import skip_transcode
+from .util import skip_transcode, ACCEPTED_RESOLUTIONS
 
 _log = logging.getLogger(__name__)
 _log.setLevel(logging.DEBUG)
@@ -165,26 +166,26 @@ def store_metadata(media_entry, metadata):
 
 
 @celery.task()
-def main_task(**process_info):
+def main_task(resolution, medium_size, **process_info):
     processor = CommonVideoProcessor(process_info['manager'], process_info['entry'])
-    processor.common_setup(process_info['resolution'])
-    processor.transcode(medium_size=process_info['medium_size'], vp8_quality=process_info['vp8_quality'],
+    processor.common_setup(resolution)
+    processor.transcode(medium_size=medium_size, vp8_quality=process_info['vp8_quality'],
                         vp8_threads=process_info['vp8_threads'], vorbis_quality=process_info['vorbis_quality'])
     processor.generate_thumb(thumb_size=process_info['thumb_size'])
     processor.store_orig_metadata()
 
 
 @celery.task()
-def complimentary_task(**process_info):
+def complimentary_task(resolution, medium_size, **process_info):
     processor = CommonVideoProcessor(process_info['manager'], process_info['entry'])
-    processor.common_setup(process_info['resolution'])
-    processor.transcode(medium_size=process_info['medium_size'], vp8_quality=process_info['vp8_quality'],
+    processor.common_setup(resolution)
+    processor.transcode(medium_size=medium_size, vp8_quality=process_info['vp8_quality'],
                         vp8_threads=process_info['vp8_threads'], vorbis_quality=process_info['vorbis_quality'])
 
 
 @celery.task()
-def processing_cleanup(**process_info):
-    processor = CommonVideoProcessor(process_info['manager'], process_info['entry'])
+def processing_cleanup(entry, manager):
+    processor = CommonVideoProcessor(manager, entry) # is it manager, entry or entry, manager?
     processor.delete_queue_file()
 
 # =====================
@@ -523,7 +524,20 @@ class VideoProcessingManager(ProcessingManager):
         self.add_processor(Resizer)
         self.add_processor(Transcoder)
 
-    def workflow(self, entry, feed_url, reprocess_action, reprocess_info=None):
-        ProcessMedia().apply_async(
-            [entry.id, feed_url, reprocess_action, reprocess_info], {},
-            task_id=entry.queued_task_id)
+    def workflow(self, entry, manager, feed_url, reprocess_action,
+                 reprocess_info=None):
+
+        reprocess_info['entry'] = entry.id # ?
+        reprocess_info['manager'] = manager # can celery serialize this?
+
+        # Add args
+
+        transcoding_tasks = group(
+            main_task.signature(queue='default', priority=5, immutable=True),
+            complimentary_task.signature(queue='default', priority=4, immutable=True),
+            complimentary_task.signature(queue='default', priority=3, immutable=True),
+            complimentary_task.signature(queue='default', priority=2, immutable=True),
+            complimentary_task.signature(queue='default', priority=1, immutable=True)
+        )
+
+        chord(transcoding_tasks)(processing_cleanup.signature(queue='default', immutable=True))
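
On the open questions in the hunk above: CommonVideoProcessor is instantiated as (manager, entry) in main_task, so the argument order in processing_cleanup matches it; and with Celery's default JSON serializer (Celery 4+) a ProcessingManager instance is unlikely to round-trip, so sending entry.id and rebuilding the manager inside the task is the safer pattern. For the "# Add args" placeholder, a hypothetical sketch of attaching args/kwargs to one signature, assuming ACCEPTED_RESOLUTIONS maps labels like '480p' to (width, height) tuples:

    # Hypothetical sketch only -- mirrors main_task(resolution, medium_size, **process_info).
    main_task.signature(
        args=('480p', ACCEPTED_RESOLUTIONS['480p']),
        kwargs=reprocess_info,
        queue='default', priority=5, immutable=True)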
mediagoblin/processing/__init__.py
index 4e5853c113384645cbfbcf72fc20b13852243151..76f81faa53352d6fd93a6771d09c574ccae77cdb 100644 (file)
@@ -257,7 +257,8 @@ class ProcessingManager(object):
 
         return processor
 
-    def workflow(self, entry, feed_url, reprocess_action, reprocess_info=None):
+    def workflow(self, entry, manager, feed_url, reprocess_action,
+                 reprocess_info=None):
         """
         Returns the Celery command needed to proceed with media processing
         *This method has to be implemented in all media types*
mediagoblin/submit/lib.py
index 402eb8515bfd3bcce41619c329034e91ec83e10b..1c78f73acf66a421621895cfab278ea3dbb61ad8 100644 (file)
@@ -267,7 +267,7 @@ def run_process_media(entry, feed_url=None,
     entry, manager = get_entry_and_processing_manager(entry.id)
 
     try:
-        manager.workflow(entry, feed_url, reprocess_action, reprocess_info)
+        manager.workflow(entry, manager, feed_url, reprocess_action, reprocess_info)
     except BaseException as exc:
         # The purpose of this section is because when running in "lazy"
         # or always-eager-with-exceptions-propagated celery mode that