# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
+ from collections import OrderedDict
import logging
import os
- from mediagoblin.db.util import atomic_update
from mediagoblin import mg_globals as mgg
-
+ from mediagoblin.db.util import atomic_update
+ from mediagoblin.db.models import MediaEntry
+ from mediagoblin.tools.pluginapi import hook_handle
from mediagoblin.tools.translate import lazy_pass_to_ugettext as _
_log = logging.getLogger(__name__)
ext=self.ext)
class MediaProcessor(object):
    """A particular processor for this media type.

    While the ProcessingManager handles all types of MediaProcessing
    possible for a particular media type, a MediaProcessor can be
    thought of as a *particular* processing action for a media type.
    For example, you may have separate MediaProcessors for:

    - initial_processing: the initial processing of a media
    - gen_thumb: generate a thumbnail
    - resize: resize an image
    - transcode: transcode a video

    ... etc.

    Some information on producing a new MediaProcessor for your media type:

    - You *must* supply a name attribute. This must be a class level
      attribute, and a string. This will be used to determine the
      subcommand of your process
    - It's recommended that you supply a class level description
      attribute.
    - Supply a media_is_eligible classmethod. This will be used to
      determine whether or not a media entry is eligible to use this
      processor type. See the method documentation for details.
    - To give "./bin/gmg reprocess run" abilities to this media type,
      supply both generate_parser and args_to_request classmethods.
    - The process method will be what actually processes your media.
    """
    # You MUST override this in the child MediaProcessor!
    name = None

    # Optional, but will be used in various places to describe the
    # action this MediaProcessor provides
    description = None

    def __init__(self, manager, entry):
        self.manager = manager
        self.entry = entry
        # Remember the state the entry had before processing began, so
        # failure handling can decide whether to restore it.
        self.entry_orig_state = entry.state

        # Should be initialized at time of processing, at least
        self.workbench = None

    def __enter__(self):
        # Entering the context allocates a scratch workbench for
        # temporary processing files.
        self.workbench = mgg.workbench_manager.create()
        return self

    def __exit__(self, *args):
        # Always tear the workbench down, even if processing raised.
        self.workbench.destroy()
        self.workbench = None

    def process(self, **kwargs):
        """
        Actually process this media entry.

        Subclasses must override this.
        """
        raise NotImplementedError

    @classmethod
    def media_is_eligible(cls, entry=None, state=None):
        """
        Return whether this processor may run for this entry (or state).

        Subclasses must override this.
        """
        raise NotImplementedError

    ###############################
    # Command line interface things
    ###############################

    @classmethod
    def generate_parser(cls):
        raise NotImplementedError

    @classmethod
    def args_to_request(cls, args):
        raise NotImplementedError

    ##########################################
    # THE FUTURE: web interface things here :)
    ##########################################

    #####################
    # Some common "steps"
    #####################

    def delete_queue_file(self):
        # Remove queued media file from storage and database.
        # The file's containing directory should be removed too, but
        # fail if the directory is not empty to be on the super-safe side.
        queued_filepath = self.entry.queued_media_file
        if queued_filepath:
            mgg.queue_store.delete_file(queued_filepath)      # rm file
            mgg.queue_store.delete_dir(queued_filepath[:-1])  # rm dir
            self.entry.queued_media_file = []
+
+
class ProcessingKeyError(Exception):
    """Base error for bad processor / manager lookup keys."""
    pass


class ProcessorDoesNotExist(ProcessingKeyError):
    """Raised when no processor is registered under the requested key."""
    pass


class ProcessorNotEligible(ProcessingKeyError):
    """Raised when a media entry may not use the requested processor."""
    pass


class ProcessingManagerDoesNotExist(ProcessingKeyError):
    """Raised when no ProcessingManager exists for a media type."""
    pass
+
+
+
class ProcessingManager(object):
    """Manages all the processing actions available for a media type

    Specific processing actions, MediaProcessor subclasses, are added
    to the ProcessingManager.
    """
    def __init__(self):
        # Dict of all MediaProcessors of this media type, keyed by
        # processor name, preserving registration order.
        self.processors = OrderedDict()

    def add_processor(self, processor):
        """
        Add a processor class to this media type

        Raises AttributeError if the processor class has no name set.
        """
        name = processor.name
        if name is None:
            raise AttributeError("Processor class's .name attribute not set")

        self.processors[name] = processor

    def list_eligible_processors(self, entry):
        """
        List all processors that this media entry is eligible to be processed
        for.
        """
        return [
            processor
            for processor in self.processors.values()
            if processor.media_is_eligible(entry=entry)]

    def list_all_processors_by_state(self, state):
        """
        List all processors that this media state is eligible to be processed
        for.
        """
        return [
            processor
            for processor in self.processors.values()
            if processor.media_is_eligible(state=state)]

    def list_all_processors(self):
        """List every registered processor, eligible or not."""
        return self.processors.values()

    def gen_process_request_via_cli(self, subparser):
        # TODO: figure out what actually goes here before this can be
        # written properly
        pass

    def get_processor(self, key, entry=None):
        """
        Get the processor with this key.

        If entry supplied, make sure this entry is actually compatible;
        otherwise raise error.

        Raises ProcessorDoesNotExist / ProcessorNotEligible.
        """
        try:
            processor = self.processors[key]
        except KeyError:
            # NOTE: removed a leftover `import pdb; pdb.set_trace()`
            # debugging breakpoint here — it would hang any worker that
            # looked up an unknown key.
            raise ProcessorDoesNotExist(
                "'%s' processor does not exist for this media type" % key)

        if entry and not processor.media_is_eligible(entry):
            raise ProcessorNotEligible(
                "This entry is not eligible for processor with name '%s'" % key)

        return processor
+
+
def request_from_args(args, which_args):
    """
    Generate a request from the values of some argparse parsed args
    """
    # Pull just the named attributes off the parsed-args object.
    return {arg_name: getattr(args, arg_name) for arg_name in which_args}
+
+
+ class MediaEntryNotFound(Exception): pass
+
+
def get_processing_manager_for_type(media_type):
    """
    Look up and instantiate the ProcessingManager for this media type.

    Raises ProcessingManagerDoesNotExist if no plugin provides one.
    """
    manager_class = hook_handle(('reprocess_manager', media_type))
    if manager_class:
        return manager_class()

    raise ProcessingManagerDoesNotExist(
        "A processing manager does not exist for {0}".format(media_type))
+
+
def get_entry_and_processing_manager(media_id):
    """
    Get a MediaEntry and its processing manager all in one go.

    Returns a tuple of: `(entry, manager)`

    Raises MediaEntryNotFound if no entry with this id exists.
    """
    entry = MediaEntry.query.filter_by(id=media_id).first()
    if entry is None:
        raise MediaEntryNotFound("Can't find media with id '%s'" % media_id)

    # The manager is looked up from the entry's media type hook.
    manager = get_processing_manager_for_type(entry.media_type)

    return entry, manager
def mark_entry_failed(entry_id, exc):
u'fail_metadata': {}})
def get_process_filename(entry, workbench, acceptable_files):
    """
    Try and get the queued file if available, otherwise return the first file
    in the acceptable_files that we have.

    If no acceptable_files, raise ProcessFileNotFound
    """
    # Initialize so we can detect "nothing found" below; previously
    # these were unbound and a no-match walk of acceptable_files raised
    # UnboundLocalError instead of ProcessFileNotFound.
    filepath = None
    storage = None

    if entry.queued_media_file:
        filepath = entry.queued_media_file
        storage = mgg.queue_store
    else:
        for keyname in acceptable_files:
            if entry.media_files.get(keyname):
                filepath = entry.media_files[keyname]
                storage = mgg.public_store
                break

    if not filepath:
        raise ProcessFileNotFound()

    filename = workbench.localized_file(
        storage, filepath,
        'source')

    # The entry referenced a file, but it's missing on disk.
    if not os.path.exists(filename):
        raise ProcessFileNotFound()

    return filename
+
+
def store_public(entry, keyname, local_file, target_name=None,
                 delete_if_exists=True):
    """
    Copy a local file into public storage and record it on the entry.

    :param entry: the media entry whose media_files mapping is updated
    :param keyname: key under which the file is recorded (e.g. u"thumb")
    :param local_file: path of the local file to copy
    :param target_name: public filename; defaults to local_file's basename
    :param delete_if_exists: delete any file previously stored under keyname

    Raises PublicStoreFail if the copy fails or the copied file is missing.
    """
    if target_name is None:
        target_name = os.path.basename(local_file)
    target_filepath = create_pub_filepath(entry, target_name)

    if keyname in entry.media_files:
        _log.warn("store_public: keyname %r already used for file %r, "
                  "replacing with %r", keyname,
                  entry.media_files[keyname], target_filepath)
        if delete_if_exists:
            mgg.public_store.delete_file(entry.media_files[keyname])

    try:
        mgg.public_store.copy_local_to_storage(local_file, target_filepath)
    except Exception:
        # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit
        # are not swallowed into a processing failure.
        raise PublicStoreFail(keyname=keyname)

    # raise an error if the file failed to copy
    copied_filepath = mgg.public_store.get_local_path(target_filepath)
    if not os.path.exists(copied_filepath):
        raise PublicStoreFail(keyname=keyname)

    entry.media_files[keyname] = target_filepath
+
+
def copy_original(entry, orig_filename, target_name, keyname=u"original"):
    """Publish the original media file under *keyname* (default u"original")."""
    store_public(entry, keyname=keyname, local_file=orig_filename,
                 target_name=target_name)
+
+
class BaseProcessingFail(Exception):
"""
Base exception that all other processing failure messages should
    def __init__(self, **metadata):
        """Store arbitrary failure details for later inspection/display."""
        # Falls back to a fresh empty dict when no metadata was given.
        self.metadata = metadata or {}
-
class BadMediaFail(BaseProcessingFail):
    """
    Error that should be raised when an inappropriate file was given
    for the media type specified.
    """
    # Translated, user-facing summary for this failure type.
    general_message = _(u'Invalid file given for media type.')
+
+
class PublicStoreFail(BaseProcessingFail):
    """
    Error that should be raised when copying to public store fails
    """
    # u-prefixed for consistency with the other general_message strings
    # in this module (BadMediaFail, ProcessFileNotFound).
    general_message = _(u'Copying to public storage failed.')
+
+
class ProcessFileNotFound(BaseProcessingFail):
    """
    Error that should be raised when an acceptable file for processing
    is not found.
    """
    # Translated, user-facing summary for this failure type.
    general_message = _(u'An acceptable processing file was not found')
import urllib
import urllib2
-from celery import registry, task
+import celery
+from celery.registry import tasks
from mediagoblin import mg_globals as mgg
- from mediagoblin.db.models import MediaEntry
- from mediagoblin.processing import (mark_entry_failed, BaseProcessingFail,
- ProcessingState)
+ from . import mark_entry_failed, BaseProcessingFail
from mediagoblin.tools.processing import json_processing_callback
+ from mediagoblin.processing import get_entry_and_processing_manager
_log = logging.getLogger(__name__)
logging.basicConfig()
_log.setLevel(logging.DEBUG)
-@task.task(default_retry_delay=2 * 60)
+@celery.task(default_retry_delay=2 * 60)
def handle_push_urls(feed_url):
"""Subtask, notifying the PuSH servers of new content
'Giving up.'.format(feed_url))
return False
+
################################
# Media processing initial steps
################################
-
-class ProcessMedia(task.Task):
+class ProcessMedia(celery.Task):
"""
Pass this entry off for processing.
"""
- track_started=True
-
- def run(self, media_id, feed_url):
+ def run(self, media_id, feed_url, reprocess_action, reprocess_info=None):
"""
Pass the media entry off to the appropriate processing function
(for now just process_image...)
:param feed_url: The feed URL that the PuSH server needs to be
updated for.
+ :param reprocess: A dict containing all of the necessary reprocessing
+ info for the media_type.
"""
- entry = MediaEntry.query.get(media_id)
+ reprocess_info = reprocess_info or {}
+ entry, manager = get_entry_and_processing_manager(media_id)
# Try to process, and handle expected errors.
try:
- entry.state = u'processing'
- entry.save()
-
- _log.debug('Processing {0}'.format(entry))
-
- proc_state = ProcessingState(entry)
- with mgg.workbench_manager.create() as workbench:
- proc_state.set_workbench(workbench)
- # run the processing code
- entry.media_manager.processor(proc_state)
+ processor_class = manager.get_processor(reprocess_action, entry)
+
+ with processor_class(manager, entry) as processor:
+ # Initial state change has to be here because
+ # the entry.state gets recorded on processor_class init
+ entry.state = u'processing'
+ entry.save()
+
+ _log.debug('Processing {0}'.format(entry))
+
+ try:
+ processor.process(**reprocess_info)
+ except Exception as exc:
+ if processor.entry_orig_state == 'processed':
+ _log.error(
+ 'Entry {0} failed to process due to the following'
+ ' error: {1}'.format(entry.id, exc))
+ _log.info(
+ 'Setting entry.state back to "processed"')
+ pass
+ else:
+ raise
# We set the state to processed and save the entry here so there's
# no need to save at the end of the processing stage, probably ;)
entry = mgg.database.MediaEntry.query.filter_by(id=entry_id).first()
json_processing_callback(entry)
-# Register the task
-process_media = registry.tasks[ProcessMedia.name]
-
+tasks.register(ProcessMedia)