Celery Priority testing with debug statements
author: vijeth-aradhya <vijthaaa@gmail.com>
Mon, 12 Jun 2017 20:13:43 +0000 (01:43 +0530)
committer: vijeth-aradhya <vijthaaa@gmail.com>
Mon, 12 Jun 2017 20:13:43 +0000 (01:43 +0530)
Error occurs at this line:
`self.entry.set_file_metadata(self.curr_file, **file_metadata)`
(`file_metadata` does not appear to be defined at that point in `transcode()` — verify.)
Otherwise, the Celery part should work fine.

mediagoblin/init/celery/__init__.py
mediagoblin/media_types/ascii/processing.py
mediagoblin/media_types/audio/processing.py
mediagoblin/media_types/image/processing.py
mediagoblin/media_types/pdf/processing.py
mediagoblin/media_types/raw_image/processing.py
mediagoblin/media_types/stl/processing.py
mediagoblin/media_types/video/processing.py
mediagoblin/processing/__init__.py
mediagoblin/submit/lib.py

index 9a67942cbf2b23e1633ced8decdbf65ffb523b13..a33359583a01f3f98e3c976089c97cbfa631c725 100644 (file)
@@ -55,6 +55,9 @@ def get_celery_settings_dict(app_config, global_config,
               queue_arguments={'x-max-priority': 10}),
     )
 
+    print "CELERY_ACKS_LATE", celery_conf['CELERY_ACKS_LATE']
+    print "CELERYD_PREFETCH_MULTIPLIER", celery_conf['CELERYD_PREFETCH_MULTIPLIER']
+
     celery_settings = {}
 
     # Add all celery settings from config
index 823dc4fd2caf25f4935770565fd1fdacb063c2f4..c9b47fb5d620f788b44b2f723763456edb48fd01 100644 (file)
@@ -274,8 +274,7 @@ class AsciiProcessingManager(ProcessingManager):
         self.add_processor(InitialProcessor)
         self.add_processor(Resizer)
 
-    def workflow(self, entry, manager, feed_url, reprocess_action,
-                 reprocess_info=None):
+    def workflow(self, entry_id, feed_url, reprocess_action, reprocess_info=None):
         ProcessMedia().apply_async(
             [entry.id, feed_url, reprocess_action, reprocess_info], {},
             task_id=entry.queued_task_id)
index b74364bca36e8d7a5d44a0577a4049aac67506fe..15d0b0a7773c55083effda2c67be8ae4eb9ac39f 100644 (file)
@@ -366,8 +366,7 @@ class AudioProcessingManager(ProcessingManager):
         self.add_processor(Resizer)
         self.add_processor(Transcoder)
 
-    def workflow(self, entry, manager, feed_url, reprocess_action,
-                 reprocess_info=None):
+    def workflow(self, entry_id, feed_url, reprocess_action, reprocess_info=None):
         ProcessMedia().apply_async(
             [entry.id, feed_url, reprocess_action, reprocess_info], {},
             task_id=entry.queued_task_id)
index a189fef3e7f09fb163e270e0df9ea25f467a1f8e..7224a8fdc7e87df9db59beff7375c386bcb6ec1d 100644 (file)
@@ -431,8 +431,7 @@ class ImageProcessingManager(ProcessingManager):
         self.add_processor(Resizer)
         self.add_processor(MetadataProcessing)
 
-    def workflow(self, entry, manager, feed_url, reprocess_action,
-                 reprocess_info=None):
+    def workflow(self, entry_id, feed_url, reprocess_action, reprocess_info=None):
         ProcessMedia().apply_async(
             [entry.id, feed_url, reprocess_action, reprocess_info], {},
             task_id=entry.queued_task_id)
index 6a13c8e3656f5e017ad0ade6d2a21817d51e7440..e6e6e0a9b8e8a5033f02ace5abf9cf070d7e011a 100644 (file)
@@ -471,8 +471,7 @@ class PdfProcessingManager(ProcessingManager):
         self.add_processor(InitialProcessor)
         self.add_processor(Resizer)
 
-    def workflow(self, entry, manager, feed_url, reprocess_action,
-                 reprocess_info=None):
+    def workflow(self, entry_id, feed_url, reprocess_action, reprocess_info=None):
         ProcessMedia().apply_async(
             [entry.id, feed_url, reprocess_action, reprocess_info], {},
             task_id=entry.queued_task_id)
index 7f2d155a043c6c2ae7b85774b353d71cc90c0637..4bfd9f3a87b10030bbe6c5f28e0da90551af5b67 100644 (file)
@@ -81,8 +81,7 @@ class RawImageProcessingManager(ProcessingManager):
         self.add_processor(InitialRawProcessor)
         self.add_processor(Resizer)
 
-    def workflow(self, entry, manager, feed_url, reprocess_action,
-                 reprocess_info=None):
+    def workflow(self, entry_id, feed_url, reprocess_action, reprocess_info=None):
         ProcessMedia().apply_async(
             [entry.id, feed_url, reprocess_action, reprocess_info], {},
             task_id=entry.queued_task_id)
index 9dd6d49ba07f273c68b4a9a227d13e393f389957..cd3ffd8c84852a9f50bb679953e879a9e50b4f07 100644 (file)
@@ -369,8 +369,7 @@ class StlProcessingManager(ProcessingManager):
         self.add_processor(InitialProcessor)
         self.add_processor(Resizer)
 
-    def workflow(self, entry, manager, feed_url, reprocess_action,
-                 reprocess_info=None):
+    def workflow(self, entry_id, feed_url, reprocess_action, reprocess_info=None):
         ProcessMedia().apply_async(
             [entry.id, feed_url, reprocess_action, reprocess_info], {},
             task_id=entry.queued_task_id)
index 64cacb5f0a2ff879f1750c549c9f77d7fc2e23a3..c3257c8407c5c0ff540bdf2fc8b97027866f8be0 100644 (file)
@@ -29,7 +29,7 @@ from mediagoblin.processing import (
     ProgressCallback, MediaProcessor,
     ProcessingManager, request_from_args,
     get_process_filename, store_public,
-    copy_original)
+    copy_original, get_entry_and_processing_manager)
 from mediagoblin.processing.task import ProcessMedia
 from mediagoblin.tools.translate import lazy_pass_to_ugettext as _
 from mediagoblin.media_types import MissingComponents
@@ -166,27 +166,35 @@ def store_metadata(media_entry, metadata):
 
 
 @celery.task()
-def main_task(resolution, medium_size, **process_info):
-    processor = CommonVideoProcessor(process_info['manager'], process_info['entry'])
-    processor.common_setup(resolution)
-    processor.transcode(medium_size=medium_size, vp8_quality=process_info['vp8_quality'],
-                        vp8_threads=process_info['vp8_threads'], vorbis_quality=process_info['vorbis_quality'])
-    processor.generate_thumb(thumb_size=process_info['thumb_size'])
-    processor.store_orig_metadata()
+def main_task(entry_id, resolution, medium_size, **process_info):
+    entry, manager = get_entry_and_processing_manager(entry_id)
+    print "\nEntered main_task\n"
+    with CommonVideoProcessor(manager, entry) as processor:
+        processor.common_setup(resolution)
+        processor.transcode(medium_size=tuple(medium_size), vp8_quality=process_info['vp8_quality'],
+                            vp8_threads=process_info['vp8_threads'], vorbis_quality=process_info['vorbis_quality'])
+        processor.generate_thumb(thumb_size=process_info['thumb_size'])
+        processor.store_orig_metadata()
+        print "\nExited main_task\n"
 
 
 @celery.task()
-def complimentary_task(resolution, medium_size, **process_info):
-    processor = CommonVideoProcessor(process_info['manager'], process_info['entry'])
-    processor.common_setup(resolution)
-    processor.transcode(medium_size=medium_size, vp8_quality=process_info['vp8_quality'],
-                        vp8_threads=process_info['vp8_threads'], vorbis_quality=process_info['vorbis_quality'])
+def complimentary_task(entry_id, resolution, medium_size, **process_info):    
+    entry, manager = get_entry_and_processing_manager(entry_id)
+    print "\nEntered complimentary_task\n"
+    with CommonVideoProcessor(manager, entry) as processor:
+        processor.common_setup(resolution)
+        processor.transcode(medium_size=tuple(medium_size), vp8_quality=process_info['vp8_quality'],
+                            vp8_threads=process_info['vp8_threads'], vorbis_quality=process_info['vorbis_quality'])
+        print "\nExited complimentary_task\n"
 
 
 @celery.task()
-def processing_cleanup(entry, manager):
-    processor = CommonVideoProcessor(manager, entry)
-    processor.delete_queue_file()
+def processing_cleanup(entry_id):
+    entry, manager = get_entry_and_processing_manager(entry_id)
+    with CommonVideoProcessor(manager, entry) as processor:
+        processor.delete_queue_file()
+        print "\nDeleted queue_file\n"
 
 # =====================
 
@@ -206,7 +214,7 @@ class CommonVideoProcessor(MediaProcessor):
         self.process_filename = get_process_filename(
             self.entry, self.workbench, self.acceptable_files)
         self.name_builder = FilenameBuilder(self.process_filename)
-
+        
         self.transcoder = transcoders.VideoTranscoder()
         self.did_transcode = False
 
@@ -218,6 +226,8 @@ class CommonVideoProcessor(MediaProcessor):
             self.curr_file = 'webm_video'
             self.part_filename = self.name_builder.fill('{basename}.medium.webm')
 
+        print self.curr_file, ":      Done common_setup()"
+
     def copy_original(self):
         # If we didn't transcode, then we need to keep the original
         raise NotImplementedError
@@ -254,6 +264,7 @@ class CommonVideoProcessor(MediaProcessor):
 
     def transcode(self, medium_size=None, vp8_quality=None, vp8_threads=None,
                   vorbis_quality=None):
+        print self.curr_file, ":      Enter transcode"
         progress_callback = ProgressCallback(self.entry)
         tmp_dst = os.path.join(self.workbench.dir, self.part_filename)
 
@@ -292,25 +303,34 @@ class CommonVideoProcessor(MediaProcessor):
                 self.entry.media_files[self.curr_file].delete()
 
         else:
+            print self.curr_file, ":      ->1.1"
+            print type(medium_size)
+            medium_size = tuple(medium_size)
+            print type(medium_size)
+            print self.curr_file, ":      ->1.2"
             self.transcoder.transcode(self.process_filename, tmp_dst,
                                       vp8_quality=vp8_quality,
                                       vp8_threads=vp8_threads,
                                       vorbis_quality=vorbis_quality,
                                       progress_callback=progress_callback,
                                       dimensions=tuple(medium_size))
+            print self.curr_file, ":      ->2"
             if self.transcoder.dst_data:
+                print self.curr_file, ":      ->3"
                 # Push transcoded video to public storage
                 _log.debug('Saving medium...')
-                store_public(self.entry, 'webm_video', tmp_dst,
-                             self.name_builder.fill('{basename}.medium.webm'))
+                store_public(self.entry, 'webm_video', tmp_dst, self.part_filename)
                 _log.debug('Saved medium')
 
+                print self.curr_file, ":      ->4"
                 # Is this the file_metadata that paroneayea was talking about?
                 self.entry.set_file_metadata(self.curr_file, **file_metadata)
 
                 self.did_transcode = True
+                print self.curr_file, ":      Done transcode()"
 
     def generate_thumb(self, thumb_size=None):
+        print self.curr_file, ":      Enter generate_thumb()"
         # Temporary file for the video thumbnail (cleaned up with workbench)
         tmp_thumb = os.path.join(self.workbench.dir,
                                  self.name_builder.fill(
@@ -339,9 +359,10 @@ class CommonVideoProcessor(MediaProcessor):
                      self.name_builder.fill('{basename}.thumbnail.jpg'))
 
         self.entry.set_file_metadata('thumb', thumb_size=thumb_size)
+        print self.curr_file, ":      Done generate_thumb()"
 
     def store_orig_metadata(self):
-
+        print self.curr_file, ":      2"
         # Extract metadata and keep a record of it
         metadata = transcoders.discover(self.process_filename)
 
@@ -524,25 +545,41 @@ class VideoProcessingManager(ProcessingManager):
         self.add_processor(Resizer)
         self.add_processor(Transcoder)
 
-    def workflow(self, entry, manager, feed_url, reprocess_action,
-                 reprocess_info=None):
+    def workflow(self, entry_id, feed_url, reprocess_action, reprocess_info=None):
 
-        reprocess_info['entry'] = entry
-        reprocess_info['manager'] = manager
+        reprocess_info = reprocess_info or {}
+        if 'vp8_quality' not in reprocess_info:
+            reprocess_info['vp8_quality'] = None
+        if 'vorbis_quality' not in reprocess_info:
+            reprocess_info['vorbis_quality'] = None
+        if 'vp8_threads' not in reprocess_info:
+            reprocess_info['vp8_threads'] = None
+        if 'thumb_size' not in reprocess_info:
+            reprocess_info['thumb_size'] = None
 
-        transcoding_tasks = group(
-            main_task.signature(args=('480p', ACCEPTED_RESOLUTIONS['480p']),
+        transcoding_tasks = group([
+            main_task.signature(args=(entry_id, '480p', ACCEPTED_RESOLUTIONS['480p']),
                                 kwargs=reprocess_info, queue='default',
                                 priority=5, immutable=True),
-            complimentary_task.signature(args=('360p', ACCEPTED_RESOLUTIONS['360p']),
+        ])
+
+        cleanup_task = processing_cleanup.signature(args=(entry_id,),
+                                                    queue='default', immutable=True)
+
+        """
+            complimentary_task.signature(args=(entry_id, '360p', ACCEPTED_RESOLUTIONS['360p']),
                                          kwargs=reprocess_info, queue='default',
                                          priority=4, immutable=True),
-            complimentary_task.signature(args=('720p', ACCEPTED_RESOLUTIONS['720p']),
+            complimentary_task.signature(args=(entry_id, '720p', ACCEPTED_RESOLUTIONS['720p']),
                                          kwargs=reprocess_info, queue='default',
                                          priority=3, immutable=True),
-        )
+        main_task.apply_async(args=(entry_id, '480p', ACCEPTED_RESOLUTIONS['480p']),
+                              kwargs=reprocess_info, queue='default',
+                              priority=5, immutable=True)
+        processing_cleanup.apply_async(args=(entry_id,), queue='default', immutable=True)
+        """
 
-        cleanup_task = processing_cleanup.signature(args=(entry, manager),
-                                                    queue='default', immutable=True)
-        
         chord(transcoding_tasks)(cleanup_task)
+
+        # main_task(entry_id, '480p', ACCEPTED_RESOLUTIONS['480p'], **reprocess_info)
+        # processing_cleanup(entry_id)
index 76f81faa53352d6fd93a6771d09c574ccae77cdb..98031bbcaba183bd91049915f7af50f4a14f5b35 100644 (file)
@@ -257,8 +257,7 @@ class ProcessingManager(object):
 
         return processor
 
-    def workflow(self, entry, manager, feed_url, reprocess_action,
-                 reprocess_info=None):
+    def workflow(self, entry_id, feed_url, reprocess_action, reprocess_info=None):
         """
         Returns the Celery command needed to proceed with media processing
         *This method has to be implemented in all media types*
index 1c78f73acf66a421621895cfab278ea3dbb61ad8..f347e715944e4be4f78c4f44ae5dcefb407acf70 100644 (file)
@@ -267,7 +267,7 @@ def run_process_media(entry, feed_url=None,
     entry, manager = get_entry_and_processing_manager(entry.id)
 
     try:
-        manager.workflow(entry, manager, feed_url, reprocess_action, reprocess_info)
+        manager.workflow(entry.id, feed_url, reprocess_action, reprocess_info)
     except BaseException as exc:
         # The purpose of this section is because when running in "lazy"
         # or always-eager-with-exceptions-propagated celery mode that