Updating to the point where we can allllmost run with the new reprocessing code
author Christopher Allan Webber <cwebber@dustycloud.org>
Sun, 11 Aug 2013 19:34:45 +0000 (14:34 -0500)
committer Rodney Ewing <ewing.rj@gmail.com>
Fri, 16 Aug 2013 22:30:16 +0000 (15:30 -0700)
This commit sponsored by Odin Hørthe Omdal.  Thank you!

mediagoblin/gmg_commands/reprocess.py
mediagoblin/processing/__init__.py
mediagoblin/processing/task.py
mediagoblin/submit/lib.py

index 6d04427e0919e2b68ade67cef86dcb1acba0b007..24fcde371b93ae43511a6c1afc61840f25870f14 100644 (file)
@@ -19,9 +19,12 @@ import os
 from mediagoblin import mg_globals
 from mediagoblin.db.models import MediaEntry
 from mediagoblin.gmg_commands import util as commands_util
+from mediagoblin.submit.lib import run_process_media
 from mediagoblin.tools.translate import lazy_pass_to_ugettext as _
 from mediagoblin.tools.pluginapi import hook_handle
-from mediagoblin.processing import ProcessorDoesNotExist, ProcessorNotEligible
+from mediagoblin.processing import (
+    ProcessorDoesNotExist, ProcessorNotEligible,
+    get_entry_and_manager, get_manager_for_type)
 
 
 def reprocess_parser_setup(subparser):
@@ -205,31 +208,16 @@ def _set_media_state(args):
         args[0].state = 'processed'
 
 
-class MediaEntryNotFound(Exception): pass
-
-def extract_entry_and_type(media_id):
-    """
-    Fetch a media entry, as well as its media type
-    """
-    entry = MediaEntry.query.filter_by(id=media_id).first()
-    if entry is None:
-        raise MediaEntryNotFound("Can't find media with id '%s'" % media_id)
-
-    return entry.media_type, entry
-
-
 def available(args):
     # Get the media type, either by looking up media id, or by specific type
     try:
-        media_id = int(args.id_or_type)
-        media_type, media_entry = extract_entry_and_type(media_id)
+        media_entry, manager = get_entry_and_manager(args.id_or_type)
+        media_type = media_entry.media_type
     except ValueError:
         media_type = args.id_or_type
         media_entry = None
+        manager = get_manager_for_type(media_type)
 
-    manager_class = hook_handle(('reprocess_manager', media_type))
-    manager = manager_class()
-    
     if media_entry is None:
         processors = manager.list_all_processors()
     else:
@@ -257,10 +245,7 @@ def available(args):
 
 
 def run(args):
-    media_type, media_entry = extract_entry_and_type(args.media_id)
-
-    manager_class = hook_handle(('reprocess_manager', media_type))
-    manager = manager_class()
+    media_entry, manager = get_entry_and_manager(args.media_id)
 
     # TODO: (maybe?) This could probably be handled entirely by the
     # processor class...
@@ -279,8 +264,11 @@ def run(args):
     reprocess_parser = processor_class.generate_parser()
     reprocess_args = reprocess_parser.parse_args(args.reprocess_args)
     reprocess_request = processor_class.args_to_request(reprocess_args)
-    processor = processor_class(manager, media_entry)
-    processor.process(**reprocess_request)
+    run_process_media(
+        media_entry,
+        reprocess_action=args.reprocess_command,
+        reprocess_info=reprocess_request)
+    manager.process(media_entry, args.reprocess_command, **reprocess_request)
 
 
 def reprocess(args):
index 1c8f72029b06eb287dac76092c6c965b907ce9db..b668baa7ff9672c5f2687fb7a16e992291c372ab 100644 (file)
@@ -18,9 +18,10 @@ from collections import OrderedDict
 import logging
 import os
 
-from mediagoblin.db.util import atomic_update
 from mediagoblin import mg_globals as mgg
-
+from mediagoblin.db.util import atomic_update
+from mediagoblin.db.models import MediaEntry
+from mediagoblin.tools.pluginapi import hook_handle
 from mediagoblin.tools.translate import lazy_pass_to_ugettext as _
 
 _log = logging.getLogger(__name__)
@@ -208,7 +209,7 @@ class ProcessingManager(object):
 
         return processor
 
-    def process(self, entry, directive, request):
+    def process_from_args(self, entry, reprocess_command, request):
         """
         Process a media entry.
         """
@@ -226,6 +227,39 @@ def request_from_args(args, which_args):
     return request
 
 
+class MediaEntryNotFound(Exception): pass
+
+
+def get_manager_for_type(media_type):
+    """
+    Get the appropriate media manager for this type
+    """
+    manager_class = hook_handle(('reprocess_manager', media_type))
+    manager = manager_class()
+
+    return manager
+
+
+def get_entry_and_manager(media_id):
+    """
+    Get a MediaEntry and its manager all in one go.
+
+    Returns a tuple of: `(entry, media_manager)`
+    """
+    entry = MediaEntry.query.filter_by(id=media_id).first()
+    if entry is None:
+        raise MediaEntryNotFound("Can't find media with id '%s'" % media_id)
+
+    manager = get_manager_for_type(entry.media_type)
+
+    return entry, manager
+
+
+################################################
+# TODO: This ProcessingState is OUTDATED,
+#   and needs to be refactored into other tools!
+################################################
+
 class ProcessingState(object):
     """
     The first and only argument to the "processor" of a media type
index 36ee31fd217132b61c0371fd1bf36fe9a56f438a..240be4e5a13f07e975f2098af9c4b23d72f02ed5 100644 (file)
@@ -21,9 +21,9 @@ import urllib2
 from celery import registry, task
 
 from mediagoblin import mg_globals as mgg
-from mediagoblin.db.models import MediaEntry
-from . import mark_entry_failed, BaseProcessingFail, ProcessingState
+from . import mark_entry_failed, BaseProcessingFail
 from mediagoblin.tools.processing import json_processing_callback
+from mediagoblin.processing import get_entry_and_manager
 
 _log = logging.getLogger(__name__)
 logging.basicConfig()
@@ -68,7 +68,7 @@ class ProcessMedia(task.Task):
     """
     Pass this entry off for processing.
     """
-    def run(self, media_id, feed_url, reprocess_info=None):
+    def run(self, media_id, feed_url, reprocess_action, reprocess_info=None):
         """
         Pass the media entry off to the appropriate processing function
         (for now just process_image...)
@@ -78,28 +78,20 @@ class ProcessMedia(task.Task):
         :param reprocess: A dict containing all of the necessary reprocessing
             info for the media_type.
         """
-        entry = MediaEntry.query.get(media_id)
+        reprocess_info = reprocess_info or {}
+        entry, manager = get_entry_and_manager(media_id)
 
         # Try to process, and handle expected errors.
         try:
+            processor_class = manager.get_processor(reprocess_action, entry)
+
             entry.state = u'processing'
             entry.save()
 
             _log.debug('Processing {0}'.format(entry))
 
-            proc_state = ProcessingState(entry)
-            with mgg.workbench_manager.create() as workbench:
-
-                proc_state.set_workbench(workbench)
-                processor = entry.media_manager.processor(proc_state)
-
-                # If we have reprocess_info, let's reprocess
-                if reprocess_info:
-                    processor.reprocess(reprocess_info)
-
-                # Run initial processing
-                else:
-                    processor.initial_processing()
+            with processor_class(manager, entry) as processor:
+                processor.process(**reprocess_info)
 
             # We set the state to processed and save the entry here so there's
             # no need to save at the end of the processing stage, probably ;)
index 3619a3290aab267f0032fb0ad1c5cef52847a129..ad37203db6a528716998523c3b9404893adf1583 100644 (file)
@@ -76,7 +76,8 @@ def prepare_queue_task(app, entry, filename):
     return queue_file
 
 
-def run_process_media(entry, feed_url=None, reprocess_info=None):
+def run_process_media(entry, feed_url=None,
+                      reprocess_action="initial", reprocess_info=None):
     """Process the media asynchronously
 
     :param entry: MediaEntry() instance to be processed.
@@ -84,11 +85,12 @@ def run_process_media(entry, feed_url=None, reprocess_info=None):
         should be notified of. This will be sth like: `request.urlgen(
             'mediagoblin.user_pages.atom_feed',qualified=True,
             user=request.user.username)`
-    :param reprocess: A dict containing all of the necessary reprocessing
+    :param reprocess_action: What particular action should be run.
+    :param reprocess_info: A dict containing all of the necessary reprocessing
         info for the given media_type"""
     try:
         process_media.apply_async(
-            [entry.id, feed_url, reprocess_info], {},
+            [entry.id, feed_url, reprocess_action, reprocess_info], {},
             task_id=entry.queued_task_id)
     except BaseException as exc:
         # The purpose of this section is because when running in "lazy"