From d77eb56280f57e547294e29e6a1b2b4d46c15ac6 Mon Sep 17 00:00:00 2001 From: vijeth-aradhya Date: Tue, 13 Jun 2017 01:43:43 +0530 Subject: [PATCH] Celery Priority testing with debug statements Error at this line: `self.entry.set_file_metadata(self.curr_file, **file_metadata)` Otherwise, celery part should work fine. --- mediagoblin/init/celery/__init__.py | 3 + mediagoblin/media_types/ascii/processing.py | 3 +- mediagoblin/media_types/audio/processing.py | 3 +- mediagoblin/media_types/image/processing.py | 3 +- mediagoblin/media_types/pdf/processing.py | 3 +- .../media_types/raw_image/processing.py | 3 +- mediagoblin/media_types/stl/processing.py | 3 +- mediagoblin/media_types/video/processing.py | 101 ++++++++++++------ mediagoblin/processing/__init__.py | 3 +- mediagoblin/submit/lib.py | 2 +- 10 files changed, 80 insertions(+), 47 deletions(-) diff --git a/mediagoblin/init/celery/__init__.py b/mediagoblin/init/celery/__init__.py index 9a67942c..a3335958 100644 --- a/mediagoblin/init/celery/__init__.py +++ b/mediagoblin/init/celery/__init__.py @@ -55,6 +55,9 @@ def get_celery_settings_dict(app_config, global_config, queue_arguments={'x-max-priority': 10}), ) + print "CELERY_ACKS_LATE", celery_conf['CELERY_ACKS_LATE'] + print "CELERYD_PREFETCH_MULTIPLIER", celery_conf['CELERYD_PREFETCH_MULTIPLIER'] + celery_settings = {} # Add all celery settings from config diff --git a/mediagoblin/media_types/ascii/processing.py b/mediagoblin/media_types/ascii/processing.py index 823dc4fd..c9b47fb5 100644 --- a/mediagoblin/media_types/ascii/processing.py +++ b/mediagoblin/media_types/ascii/processing.py @@ -274,8 +274,7 @@ class AsciiProcessingManager(ProcessingManager): self.add_processor(InitialProcessor) self.add_processor(Resizer) - def workflow(self, entry, manager, feed_url, reprocess_action, - reprocess_info=None): + def workflow(self, entry_id, feed_url, reprocess_action, reprocess_info=None): ProcessMedia().apply_async( [entry.id, feed_url, reprocess_action, reprocess_info], {}, task_id=entry.queued_task_id) diff --git a/mediagoblin/media_types/audio/processing.py b/mediagoblin/media_types/audio/processing.py index b74364bc..15d0b0a7 100644 --- a/mediagoblin/media_types/audio/processing.py +++ b/mediagoblin/media_types/audio/processing.py @@ -366,8 +366,7 @@ class AudioProcessingManager(ProcessingManager): self.add_processor(Resizer) self.add_processor(Transcoder) - def workflow(self, entry, manager, feed_url, reprocess_action, - reprocess_info=None): + def workflow(self, entry_id, feed_url, reprocess_action, reprocess_info=None): ProcessMedia().apply_async( [entry.id, feed_url, reprocess_action, reprocess_info], {}, task_id=entry.queued_task_id) diff --git a/mediagoblin/media_types/image/processing.py b/mediagoblin/media_types/image/processing.py index a189fef3..7224a8fd 100644 --- a/mediagoblin/media_types/image/processing.py +++ b/mediagoblin/media_types/image/processing.py @@ -431,8 +431,7 @@ class ImageProcessingManager(ProcessingManager): self.add_processor(Resizer) self.add_processor(MetadataProcessing) - def workflow(self, entry, manager, feed_url, reprocess_action, - reprocess_info=None): + def workflow(self, entry_id, feed_url, reprocess_action, reprocess_info=None): ProcessMedia().apply_async( [entry.id, feed_url, reprocess_action, reprocess_info], {}, task_id=entry.queued_task_id) diff --git a/mediagoblin/media_types/pdf/processing.py b/mediagoblin/media_types/pdf/processing.py index 6a13c8e3..e6e6e0a9 100644 --- a/mediagoblin/media_types/pdf/processing.py +++ b/mediagoblin/media_types/pdf/processing.py @@ -471,8 +471,7 @@ class PdfProcessingManager(ProcessingManager): self.add_processor(InitialProcessor) self.add_processor(Resizer) - def workflow(self, entry, manager, feed_url, reprocess_action, - reprocess_info=None): + def workflow(self, entry_id, feed_url, reprocess_action, reprocess_info=None): ProcessMedia().apply_async( [entry.id, feed_url, reprocess_action, reprocess_info], {}, task_id=entry.queued_task_id) diff --git a/mediagoblin/media_types/raw_image/processing.py b/mediagoblin/media_types/raw_image/processing.py index 7f2d155a..4bfd9f3a 100644 --- a/mediagoblin/media_types/raw_image/processing.py +++ b/mediagoblin/media_types/raw_image/processing.py @@ -81,8 +81,7 @@ class RawImageProcessingManager(ProcessingManager): self.add_processor(InitialRawProcessor) self.add_processor(Resizer) - def workflow(self, entry, manager, feed_url, reprocess_action, - reprocess_info=None): + def workflow(self, entry_id, feed_url, reprocess_action, reprocess_info=None): ProcessMedia().apply_async( [entry.id, feed_url, reprocess_action, reprocess_info], {}, task_id=entry.queued_task_id) diff --git a/mediagoblin/media_types/stl/processing.py b/mediagoblin/media_types/stl/processing.py index 9dd6d49b..cd3ffd8c 100644 --- a/mediagoblin/media_types/stl/processing.py +++ b/mediagoblin/media_types/stl/processing.py @@ -369,8 +369,7 @@ class StlProcessingManager(ProcessingManager): self.add_processor(InitialProcessor) self.add_processor(Resizer) - def workflow(self, entry, manager, feed_url, reprocess_action, - reprocess_info=None): + def workflow(self, entry_id, feed_url, reprocess_action, reprocess_info=None): ProcessMedia().apply_async( [entry.id, feed_url, reprocess_action, reprocess_info], {}, task_id=entry.queued_task_id) diff --git a/mediagoblin/media_types/video/processing.py b/mediagoblin/media_types/video/processing.py index 64cacb5f..c3257c84 100644 --- a/mediagoblin/media_types/video/processing.py +++ b/mediagoblin/media_types/video/processing.py @@ -29,7 +29,7 @@ from mediagoblin.processing import ( ProgressCallback, MediaProcessor, ProcessingManager, request_from_args, get_process_filename, store_public, - copy_original) + copy_original, get_entry_and_processing_manager) from mediagoblin.processing.task import ProcessMedia from mediagoblin.tools.translate import lazy_pass_to_ugettext as _ from mediagoblin.media_types import MissingComponents @@ -166,27 +166,35 @@ def store_metadata(media_entry, metadata): @celery.task() -def main_task(resolution, medium_size, **process_info): - processor = CommonVideoProcessor(process_info['manager'], process_info['entry']) - processor.common_setup(resolution) - processor.transcode(medium_size=medium_size, vp8_quality=process_info['vp8_quality'], - vp8_threads=process_info['vp8_threads'], vorbis_quality=process_info['vorbis_quality']) - processor.generate_thumb(thumb_size=process_info['thumb_size']) - processor.store_orig_metadata() +def main_task(entry_id, resolution, medium_size, **process_info): + entry, manager = get_entry_and_processing_manager(entry_id) + print "\nEntered main_task\n" + with CommonVideoProcessor(manager, entry) as processor: + processor.common_setup(resolution) + processor.transcode(medium_size=tuple(medium_size), vp8_quality=process_info['vp8_quality'], + vp8_threads=process_info['vp8_threads'], vorbis_quality=process_info['vorbis_quality']) + processor.generate_thumb(thumb_size=process_info['thumb_size']) + processor.store_orig_metadata() + print "\nExited main_task\n" @celery.task() -def complimentary_task(resolution, medium_size, **process_info): - processor = CommonVideoProcessor(process_info['manager'], process_info['entry']) - processor.common_setup(resolution) - processor.transcode(medium_size=medium_size, vp8_quality=process_info['vp8_quality'], - vp8_threads=process_info['vp8_threads'], vorbis_quality=process_info['vorbis_quality']) +def complimentary_task(entry_id, resolution, medium_size, **process_info): + entry, manager = get_entry_and_processing_manager(entry_id) + print "\nEntered complimentary_task\n" + with CommonVideoProcessor(manager, entry) as processor: + processor.common_setup(resolution) + processor.transcode(medium_size=tuple(medium_size), vp8_quality=process_info['vp8_quality'], + vp8_threads=process_info['vp8_threads'], vorbis_quality=process_info['vorbis_quality']) + print "\nExited complimentary_task\n" @celery.task() -def processing_cleanup(entry, manager): - processor = CommonVideoProcessor(manager, entry) - processor.delete_queue_file() +def processing_cleanup(entry_id): + entry, manager = get_entry_and_processing_manager(entry_id) + with CommonVideoProcessor(manager, entry) as processor: + processor.delete_queue_file() + print "\nDeleted queue_file\n" # ===================== @@ -206,7 +214,7 @@ class CommonVideoProcessor(MediaProcessor): self.process_filename = get_process_filename( self.entry, self.workbench, self.acceptable_files) self.name_builder = FilenameBuilder(self.process_filename) - + self.transcoder = transcoders.VideoTranscoder() self.did_transcode = False @@ -218,6 +226,8 @@ class CommonVideoProcessor(MediaProcessor): self.curr_file = 'webm_video' self.part_filename = self.name_builder.fill('{basename}.medium.webm') + print self.curr_file, ": Done common_setup()" + def copy_original(self): # If we didn't transcode, then we need to keep the original raise NotImplementedError @@ -254,6 +264,7 @@ class CommonVideoProcessor(MediaProcessor): def transcode(self, medium_size=None, vp8_quality=None, vp8_threads=None, vorbis_quality=None): + print self.curr_file, ": Enter transcode" progress_callback = ProgressCallback(self.entry) tmp_dst = os.path.join(self.workbench.dir, self.part_filename) @@ -292,25 +303,34 @@ class CommonVideoProcessor(MediaProcessor): self.entry.media_files[self.curr_file].delete() else: + print self.curr_file, ": ->1.1" + print type(medium_size) + medium_size = tuple(medium_size) + print type(medium_size) + print self.curr_file, ": ->1.2" self.transcoder.transcode(self.process_filename, tmp_dst, vp8_quality=vp8_quality, vp8_threads=vp8_threads, vorbis_quality=vorbis_quality, progress_callback=progress_callback, dimensions=tuple(medium_size)) + print self.curr_file, ": ->2" if self.transcoder.dst_data: + print self.curr_file, ": ->3" # Push transcoded video to public storage _log.debug('Saving medium...') - store_public(self.entry, 'webm_video', tmp_dst, - self.name_builder.fill('{basename}.medium.webm')) + store_public(self.entry, 'webm_video', tmp_dst, self.part_filename) _log.debug('Saved medium') + print self.curr_file, ": ->4" # Is this the file_metadata that paroneayea was talking about? self.entry.set_file_metadata(self.curr_file, **file_metadata) self.did_transcode = True + print self.curr_file, ": Done transcode()" def generate_thumb(self, thumb_size=None): + print self.curr_file, ": Enter generate_thumb()" # Temporary file for the video thumbnail (cleaned up with workbench) tmp_thumb = os.path.join(self.workbench.dir, self.name_builder.fill( @@ -339,9 +359,10 @@ class CommonVideoProcessor(MediaProcessor): self.name_builder.fill('{basename}.thumbnail.jpg')) self.entry.set_file_metadata('thumb', thumb_size=thumb_size) + print self.curr_file, ": Done generate_thumb()" def store_orig_metadata(self): - + print self.curr_file, ": 2" # Extract metadata and keep a record of it metadata = transcoders.discover(self.process_filename) @@ -524,25 +545,41 @@ class VideoProcessingManager(ProcessingManager): self.add_processor(Resizer) self.add_processor(Transcoder) - def workflow(self, entry, manager, feed_url, reprocess_action, - reprocess_info=None): + def workflow(self, entry_id, feed_url, reprocess_action, reprocess_info=None): - reprocess_info['entry'] = entry - reprocess_info['manager'] = manager + reprocess_info = reprocess_info or {} + if 'vp8_quality' not in reprocess_info: + reprocess_info['vp8_quality'] = None + if 'vorbis_quality' not in reprocess_info: + reprocess_info['vorbis_quality'] = None + if 'vp8_threads' not in reprocess_info: + reprocess_info['vp8_threads'] = None + if 'thumb_size' not in reprocess_info: + reprocess_info['thumb_size'] = None - transcoding_tasks = group( - main_task.signature(args=('480p', ACCEPTED_RESOLUTIONS['480p']), + transcoding_tasks = group([ + main_task.signature(args=(entry_id, '480p', ACCEPTED_RESOLUTIONS['480p']), kwargs=reprocess_info, queue='default', priority=5, immutable=True), - complimentary_task.signature(args=('360p', ACCEPTED_RESOLUTIONS['360p']), + ]) + + cleanup_task = processing_cleanup.signature(args=(entry_id,), + queue='default', immutable=True) + + """ + complimentary_task.signature(args=(entry_id, '360p', ACCEPTED_RESOLUTIONS['360p']), kwargs=reprocess_info, queue='default', priority=4, immutable=True), - complimentary_task.signature(args=('720p', ACCEPTED_RESOLUTIONS['720p']), + complimentary_task.signature(args=(entry_id, '720p', ACCEPTED_RESOLUTIONS['720p']), kwargs=reprocess_info, queue='default', priority=3, immutable=True), - ) + main_task.apply_async(args=(entry_id, '480p', ACCEPTED_RESOLUTIONS['480p']), + kwargs=reprocess_info, queue='default', + priority=5, immutable=True) + processing_cleanup.apply_async(args=(entry_id,), queue='default', immutable=True) + """ - cleanup_task = processing_cleanup.signature(args=(entry, manager), - queue='default', immutable=True) - chord(transcoding_tasks)(cleanup_task) + + # main_task(entry_id, '480p', ACCEPTED_RESOLUTIONS['480p'], **reprocess_info) + # processing_cleanup(entry_id) diff --git a/mediagoblin/processing/__init__.py b/mediagoblin/processing/__init__.py index 76f81faa..98031bbc 100644 --- a/mediagoblin/processing/__init__.py +++ b/mediagoblin/processing/__init__.py @@ -257,8 +257,7 @@ class ProcessingManager(object): return processor - def workflow(self, entry, manager, feed_url, reprocess_action, - reprocess_info=None): + def workflow(self, entry_id, feed_url, reprocess_action, reprocess_info=None): """ Returns the Celery command needed to proceed with media processing *This method has to be implemented in all media types* diff --git a/mediagoblin/submit/lib.py b/mediagoblin/submit/lib.py index 1c78f73a..f347e715 100644 --- a/mediagoblin/submit/lib.py +++ b/mediagoblin/submit/lib.py @@ -267,7 +267,7 @@ def run_process_media(entry, feed_url=None, entry, manager = get_entry_and_processing_manager(entry.id) try: - manager.workflow(entry, manager, feed_url, reprocess_action, reprocess_info) + manager.workflow(entry.id, feed_url, reprocess_action, reprocess_info) except BaseException as exc: # The purpose of this section is because when running in "lazy" # or always-eager-with-exceptions-propagated celery mode that -- 2.25.1