From 77ea4c9bd1e8372fb7206596ca5125738033ced5 Mon Sep 17 00:00:00 2001 From: Christopher Allan Webber Date: Sun, 11 Aug 2013 14:34:45 -0500 Subject: [PATCH] Updating to the point where we can allllmost run with the new reprocessing code MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit This commit sponsored by Odin Hørthe Omdal. Thank you! --- mediagoblin/gmg_commands/reprocess.py | 38 +++++++++---------------- mediagoblin/processing/__init__.py | 40 +++++++++++++++++++++++++-- mediagoblin/processing/task.py | 26 ++++++----------- mediagoblin/submit/lib.py | 8 ++++-- 4 files changed, 64 insertions(+), 48 deletions(-) diff --git a/mediagoblin/gmg_commands/reprocess.py b/mediagoblin/gmg_commands/reprocess.py index 6d04427e..24fcde37 100644 --- a/mediagoblin/gmg_commands/reprocess.py +++ b/mediagoblin/gmg_commands/reprocess.py @@ -19,9 +19,12 @@ import os from mediagoblin import mg_globals from mediagoblin.db.models import MediaEntry from mediagoblin.gmg_commands import util as commands_util +from mediagoblin.submit.lib import run_process_media from mediagoblin.tools.translate import lazy_pass_to_ugettext as _ from mediagoblin.tools.pluginapi import hook_handle -from mediagoblin.processing import ProcessorDoesNotExist, ProcessorNotEligible +from mediagoblin.processing import ( + ProcessorDoesNotExist, ProcessorNotEligible, + get_entry_and_manager, get_manager_for_type) def reprocess_parser_setup(subparser): @@ -205,31 +208,16 @@ def _set_media_state(args): args[0].state = 'processed' -class MediaEntryNotFound(Exception): pass - -def extract_entry_and_type(media_id): - """ - Fetch a media entry, as well as its media type - """ - entry = MediaEntry.query.filter_by(id=media_id).first() - if entry is None: - raise MediaEntryNotFound("Can't find media with id '%s'" % media_id) - - return entry.media_type, entry - - def available(args): # Get the media type, either by looking up media id, or by specific type try: - media_id = int(args.id_or_type) - media_type, media_entry = extract_entry_and_type(media_id) + media_entry, manager = get_entry_and_manager(args.id_or_type) + media_type = media_entry.type except ValueError: media_type = args.id_or_type media_entry = None + manager = get_manager_for_type(media_type) - manager_class = hook_handle(('reprocess_manager', media_type)) - manager = manager_class() - if media_entry is None: processors = manager.list_all_processors() else: @@ -257,10 +245,7 @@ def available(args): def run(args): - media_type, media_entry = extract_entry_and_type(args.media_id) - - manager_class = hook_handle(('reprocess_manager', media_type)) - manager = manager_class() + media_entry, manager = get_entry_and_manager(args.media_id) # TODO: (maybe?) This could probably be handled entirely by the # processor class... @@ -279,8 +264,11 @@ def run(args): reprocess_parser = processor_class.generate_parser() reprocess_args = reprocess_parser.parse_args(args.reprocess_args) reprocess_request = processor_class.args_to_request(reprocess_args) - processor = processor_class(manager, media_entry) - processor.process(**reprocess_request) + run_process_media( + media_entry, + reprocess_action=args.reprocess_command, + reprocess_info=reprocess_request) + manager.process(media_entry, args.reprocess_command, **reprocess_request) def reprocess(args): diff --git a/mediagoblin/processing/__init__.py b/mediagoblin/processing/__init__.py index 1c8f7202..b668baa7 100644 --- a/mediagoblin/processing/__init__.py +++ b/mediagoblin/processing/__init__.py @@ -18,9 +18,10 @@ from collections import OrderedDict import logging import os -from mediagoblin.db.util import atomic_update from mediagoblin import mg_globals as mgg - +from mediagoblin.db.util import atomic_update +from mediagoblin.db.models import MediaEntry +from mediagoblin.tools.pluginapi import hook_handle from mediagoblin.tools.translate import lazy_pass_to_ugettext as _ _log = logging.getLogger(__name__) @@ -208,7 +209,7 @@ class ProcessingManager(object): return processor - def process(self, entry, directive, request): + def process_from_args(self, entry, reprocess_command, request): """ Process a media entry. """ @@ -226,6 +227,39 @@ def request_from_args(args, which_args): return request +class MediaEntryNotFound(Exception): pass + + +def get_manager_for_type(media_type): + """ + Get the appropriate media manager for this type + """ + manager_class = hook_handle(('reprocess_manager', media_type)) + manager = manager_class() + + return manager + + +def get_entry_and_manager(media_id): + """ + Get a MediaEntry, its media type, and its manager all in one go. + + Returns a tuple of: `(entry, media_type, media_manager)` + """ + entry = MediaEntry.query.filter_by(id=media_id).first() + if entry is None: + raise MediaEntryNotFound("Can't find media with id '%s'" % media_id) + + manager = get_manager_for_type(entry.media_type) + + return entry, manager + + +################################################ +# TODO: This ProcessingState is OUTDATED, +# and needs to be refactored into other tools! +################################################ + class ProcessingState(object): """ The first and only argument to the "processor" of a media type diff --git a/mediagoblin/processing/task.py b/mediagoblin/processing/task.py index 36ee31fd..240be4e5 100644 --- a/mediagoblin/processing/task.py +++ b/mediagoblin/processing/task.py @@ -21,9 +21,9 @@ import urllib2 from celery import registry, task from mediagoblin import mg_globals as mgg -from mediagoblin.db.models import MediaEntry -from . import mark_entry_failed, BaseProcessingFail, ProcessingState +from . import mark_entry_failed, BaseProcessingFail from mediagoblin.tools.processing import json_processing_callback +from mediagoblin.processing import get_entry_and_manager _log = logging.getLogger(__name__) logging.basicConfig() @@ -68,7 +68,7 @@ class ProcessMedia(task.Task): """ Pass this entry off for processing. """ - def run(self, media_id, feed_url, reprocess_info=None): + def run(self, media_id, feed_url, reprocess_action, reprocess_info=None): """ Pass the media entry off to the appropriate processing function (for now just process_image...) @@ -78,28 +78,20 @@ class ProcessMedia(task.Task): :param reprocess: A dict containing all of the necessary reprocessing info for the media_type. """ - entry = MediaEntry.query.get(media_id) + reprocess_info = reprocess_info or {} + entry, manager = get_entry_and_manager(media_id) # Try to process, and handle expected errors. try: + processor_class = manager.get_processor(reprocess_action, entry) + entry.state = u'processing' entry.save() _log.debug('Processing {0}'.format(entry)) - proc_state = ProcessingState(entry) - with mgg.workbench_manager.create() as workbench: - - proc_state.set_workbench(workbench) - processor = entry.media_manager.processor(proc_state) - - # If we have reprocess_info, let's reprocess - if reprocess_info: - processor.reprocess(reprocess_info) - - # Run initial processing - else: - processor.initial_processing() + with processor_class(manager, entry) as processor: + processor.process(**reprocess_info) # We set the state to processed and save the entry here so there's # no need to save at the end of the processing stage, probably ;) diff --git a/mediagoblin/submit/lib.py b/mediagoblin/submit/lib.py index 3619a329..ad37203d 100644 --- a/mediagoblin/submit/lib.py +++ b/mediagoblin/submit/lib.py @@ -76,7 +76,8 @@ def prepare_queue_task(app, entry, filename): return queue_file -def run_process_media(entry, feed_url=None, reprocess_info=None): +def run_process_media(entry, feed_url=None, + reprocess_action="inital", reprocess_info=None): """Process the media asynchronously :param entry: MediaEntry() instance to be processed. @@ -84,11 +85,12 @@ def run_process_media(entry, feed_url=None, reprocess_info=None): should be notified of. This will be sth like: `request.urlgen( 'mediagoblin.user_pages.atom_feed',qualified=True, user=request.user.username)` - :param reprocess: A dict containing all of the necessary reprocessing + :param reprocess_action: What particular action should be run. + :param reprocess_info: A dict containing all of the necessary reprocessing info for the given media_type""" try: process_media.apply_async( - [entry.id, feed_url, reprocess_info], {}, + [entry.id, feed_url, reprocess_action, reprocess_info], {}, task_id=entry.queued_task_id) except BaseException as exc: # The purpose of this section is because when running in "lazy" -- 2.25.1