Few more changes to be made before executing the tasks.
Also #1 should be handled soon after this.
self.add_processor(InitialProcessor)
self.add_processor(Resizer)
- def workflow(self, entry, feed_url, reprocess_action, reprocess_info=None):
+ def workflow(self, entry, manager, feed_url, reprocess_action,
+ reprocess_info=None):
ProcessMedia().apply_async(
[entry.id, feed_url, reprocess_action, reprocess_info], {},
task_id=entry.queued_task_id)
self.add_processor(Resizer)
self.add_processor(Transcoder)
- def workflow(self, entry, feed_url, reprocess_action, reprocess_info=None):
+ def workflow(self, entry, manager, feed_url, reprocess_action,
+ reprocess_info=None):
ProcessMedia().apply_async(
[entry.id, feed_url, reprocess_action, reprocess_info], {},
task_id=entry.queued_task_id)
self.add_processor(Resizer)
self.add_processor(MetadataProcessing)
- def workflow(self, entry, feed_url, reprocess_action, reprocess_info=None):
+ def workflow(self, entry, manager, feed_url, reprocess_action,
+ reprocess_info=None):
ProcessMedia().apply_async(
[entry.id, feed_url, reprocess_action, reprocess_info], {},
task_id=entry.queued_task_id)
self.add_processor(InitialProcessor)
self.add_processor(Resizer)
- def workflow(self, entry, feed_url, reprocess_action, reprocess_info=None):
+ def workflow(self, entry, manager, feed_url, reprocess_action,
+ reprocess_info=None):
ProcessMedia().apply_async(
[entry.id, feed_url, reprocess_action, reprocess_info], {},
task_id=entry.queued_task_id)
self.add_processor(InitialRawProcessor)
self.add_processor(Resizer)
- def workflow(self, entry, feed_url, reprocess_action, reprocess_info=None):
+ def workflow(self, entry, manager, feed_url, reprocess_action,
+ reprocess_info=None):
ProcessMedia().apply_async(
[entry.id, feed_url, reprocess_action, reprocess_info], {},
task_id=entry.queued_task_id)
self.add_processor(InitialProcessor)
self.add_processor(Resizer)
- def workflow(self, entry, feed_url, reprocess_action, reprocess_info=None):
+ def workflow(self, entry, manager, feed_url, reprocess_action,
+ reprocess_info=None):
ProcessMedia().apply_async(
[entry.id, feed_url, reprocess_action, reprocess_info], {},
task_id=entry.queued_task_id)
import six
+from celery import group, chord
from mediagoblin import mg_globals as mgg
from mediagoblin.processing import (
FilenameBuilder, BaseProcessingFail,
from mediagoblin.media_types import MissingComponents
from . import transcoders
-from .util import skip_transcode
+from .util import skip_transcode, ACCEPTED_RESOLUTIONS
_log = logging.getLogger(__name__)
_log.setLevel(logging.DEBUG)
@celery.task()
-def main_task(**process_info):
+def main_task(resolution, medium_size, **process_info):
processor = CommonVideoProcessor(process_info['manager'], process_info['entry'])
- processor.common_setup(process_info['resolution'])
- processor.transcode(medium_size=process_info['medium_size'], vp8_quality=process_info['vp8_quality'],
+ processor.common_setup(resolution)
+ processor.transcode(medium_size=medium_size, vp8_quality=process_info['vp8_quality'],
vp8_threads=process_info['vp8_threads'], vorbis_quality=process_info['vorbis_quality'])
processor.generate_thumb(thumb_size=process_info['thumb_size'])
processor.store_orig_metadata()
@celery.task()
-def complimentary_task(**process_info):
+def complimentary_task(resolution, medium_size, **process_info):
processor = CommonVideoProcessor(process_info['manager'], process_info['entry'])
- processor.common_setup(process_info['resolution'])
- processor.transcode(medium_size=process_info['medium_size'], vp8_quality=process_info['vp8_quality'],
+ processor.common_setup(resolution)
+ processor.transcode(medium_size=medium_size, vp8_quality=process_info['vp8_quality'],
vp8_threads=process_info['vp8_threads'], vorbis_quality=process_info['vorbis_quality'])
@celery.task()
-def processing_cleanup(**process_info):
- processor = CommonVideoProcessor(process_info['manager'], process_info['entry'])
+def processing_cleanup(entry, manager):
+ processor = CommonVideoProcessor(manager, entry) # is it manager, entry or entry, manager?
processor.delete_queue_file()
# =====================
self.add_processor(Resizer)
self.add_processor(Transcoder)
- def workflow(self, entry, feed_url, reprocess_action, reprocess_info=None):
- ProcessMedia().apply_async(
- [entry.id, feed_url, reprocess_action, reprocess_info], {},
- task_id=entry.queued_task_id)
+ def workflow(self, entry, manager, feed_url, reprocess_action,
+ reprocess_info=None):
+
+ reprocess_info['entry'] = entry.id # ?
+ reprocess_info['manager'] = manager # can celery serialize this?
+
+ # Add args
+
+ transcoding_tasks = group(
+ main_task.signature(queue='default', priority=5, immutable=True),
+ complimentary_task.signature(queue='default', priority=4, immutable=True),
+ complimentary_task.signature(queue='default', priority=3, immutable=True),
+ complimentary_task.signature(queue='default', priority=2, immutable=True)
+ complimentary_task.signature(queue='default', priority=1, immutable=True)
+ )
+
+ chord(transcoding_tasks)(processing_cleanup.signature(queue='default', immutable=True))
return processor
- def workflow(self, entry, feed_url, reprocess_action, reprocess_info=None):
+ def workflow(self, entry, manager, feed_url, reprocess_action,
+ reprocess_info=None):
"""
Returns the Celery command needed to proceed with media processing
*This method has to be implemented in all media types*
entry, manager = get_entry_and_processing_manager(entry.id)
try:
- manager.workflow(entry, feed_url, reprocess_action, reprocess_info)
+ manager.workflow(entry, manager, feed_url, reprocess_action, reprocess_info)
except BaseException as exc:
# The purpose of this section is because when running in "lazy"
# or always-eager-with-exceptions-propagated celery mode that