X-Git-Url: https://vcs.fsf.org/?a=blobdiff_plain;f=mediagoblin%2Fprocessing%2F__init__.py;h=b7e36027ef7ccf412cead1ffcf5bc877e3430eae;hb=91f5f5e791e5bc3680cac2b0103429713517f7d4;hp=ae3652cf04a27a6b205da58948aa6b0e7d931157;hpb=bf2dafd1a04ef8050ebf08bb512862a1592998c0;p=mediagoblin.git

diff --git a/mediagoblin/processing/__init__.py b/mediagoblin/processing/__init__.py
index ae3652cf..b7e36027 100644
--- a/mediagoblin/processing/__init__.py
+++ b/mediagoblin/processing/__init__.py
@@ -14,12 +14,22 @@
 # You should have received a copy of the GNU Affero General Public License
 # along with this program. If not, see <http://www.gnu.org/licenses/>.
 
+# Use an ordered dict if we can. If not, we'll just use a normal dict
+# later.
+try:
+    from collections import OrderedDict
+except:
+    OrderedDict = None
+
 import logging
 import os
 
-from mediagoblin.db.util import atomic_update
-from mediagoblin import mg_globals as mgg
+import six
+from mediagoblin import mg_globals as mgg
+from mediagoblin.db.util import atomic_update
+from mediagoblin.db.models import MediaEntry
+from mediagoblin.tools.pluginapi import hook_handle
 from mediagoblin.tools.translate import lazy_pass_to_ugettext as _
 
 _log = logging.getLogger(__name__)
@@ -38,7 +48,7 @@ class ProgressCallback(object):
 def create_pub_filepath(entry, filename):
     return mgg.public_store.get_unique_filepath(
             ['media_entries',
-             unicode(entry.id),
+             six.text_type(entry.id),
              filename])
 
 
@@ -74,49 +84,89 @@ class FilenameBuilder(object):
                                  ext=self.ext)
 
 
-class ProcessingState(object):
-    """
-    The first and only argument to the "processor" of a media type
-    This could be thought of as a "request" to the processor
-    function. It has the main info for the request (media entry)
-    and a bunch of tools for the request on it.
-    It can get more fancy without impacting old media types.
+class MediaProcessor(object):
+    """A particular processor for this media type.
+
+    While the ProcessingManager handles all types of MediaProcessing
+    possible for a particular media type, a MediaProcessor can be
+    thought of as a *particular* processing action for a media type.
+    For example, you may have separate MediaProcessors for:
+
+    - initial_processing: the initial processing of a media entry
+    - gen_thumb: generate a thumbnail
+    - resize: resize an image
+    - transcode: transcode a video
+
+    ... etc.
+
+    Some information on producing a new MediaProcessor for your media type:
+
+    - You *must* supply a name attribute. This must be a class level
+      attribute, and a string. This will be used to determine the
+      subcommand of your processor.
+    - It's recommended that you supply a class level description
+      attribute.
+    - Supply a media_is_eligible classmethod. This will be used to
+      determine whether or not a media entry is eligible to use this
+      processor type. See the method documentation for details.
+    - To give "./bin/gmg reprocess run" abilities to this media type,
+      supply both generate_parser and args_to_request classmethods.
+    - The process method will be what actually processes your media.
     """
-    def __init__(self, entry):
+    # You MUST override this in the child MediaProcessor!
+ name = None + + # Optional, but will be used in various places to describe the + # action this MediaProcessor provides + description = None + + def __init__(self, manager, entry): + self.manager = manager self.entry = entry + self.entry_orig_state = entry.state + + # Should be initialized at time of processing, at least self.workbench = None - self.queued_filename = None - def set_workbench(self, wb): - self.workbench = wb + def __enter__(self): + self.workbench = mgg.workbench_manager.create() + return self + + def __exit__(self, *args): + self.workbench.destroy() + self.workbench = None - def get_queued_filename(self): + # @with_workbench + def process(self, **kwargs): """ - Get the a filename for the original, on local storage + Actually process this media entry. """ - if self.queued_filename is not None: - return self.queued_filename - queued_filepath = self.entry.queued_media_file - queued_filename = self.workbench.localized_file( - mgg.queue_store, queued_filepath, - 'source') - self.queued_filename = queued_filename - return queued_filename - - def copy_original(self, target_name, keyname=u"original"): - self.store_public(keyname, self.get_queued_filename(), target_name) - - def store_public(self, keyname, local_file, target_name=None): - if target_name is None: - target_name = os.path.basename(local_file) - target_filepath = create_pub_filepath(self.entry, target_name) - if keyname in self.entry.media_files: - _log.warn("store_public: keyname %r already used for file %r, " - "replacing with %r", keyname, - self.entry.media_files[keyname], target_filepath) - mgg.public_store.copy_local_to_storage(local_file, target_filepath) - self.entry.media_files[keyname] = target_filepath + raise NotImplementedError + + @classmethod + def media_is_eligible(cls, entry=None, state=None): + raise NotImplementedError + + ############################### + # Command line interface things + ############################### + + @classmethod + def generate_parser(cls): + raise NotImplementedError + + @classmethod + def args_to_request(cls, args): + raise NotImplementedError + + ########################################## + # THE FUTURE: web interface things here :) + ########################################## + + ##################### + # Some common "steps" + ##################### def delete_queue_file(self): # Remove queued media file from storage and database. @@ -124,9 +174,130 @@ class ProcessingState(object): # be removed too, but fail if the directory is not empty to be on # the super-safe side. queued_filepath = self.entry.queued_media_file - mgg.queue_store.delete_file(queued_filepath) # rm file - mgg.queue_store.delete_dir(queued_filepath[:-1]) # rm dir - self.entry.queued_media_file = [] + if queued_filepath: + mgg.queue_store.delete_file(queued_filepath) # rm file + mgg.queue_store.delete_dir(queued_filepath[:-1]) # rm dir + self.entry.queued_media_file = [] + + +class ProcessingKeyError(Exception): pass +class ProcessorDoesNotExist(ProcessingKeyError): pass +class ProcessorNotEligible(ProcessingKeyError): pass +class ProcessingManagerDoesNotExist(ProcessingKeyError): pass + + + +class ProcessingManager(object): + """Manages all the processing actions available for a media type + + Specific processing actions, MediaProcessor subclasses, are added + to the ProcessingManager. 
+ """ + def __init__(self): + # Dict of all MediaProcessors of this media type + if OrderedDict is not None: + self.processors = OrderedDict() + else: + self.processors = {} + + def add_processor(self, processor): + """ + Add a processor class to this media type + """ + name = processor.name + if name is None: + raise AttributeError("Processor class's .name attribute not set") + + self.processors[name] = processor + + def list_eligible_processors(self, entry): + """ + List all processors that this media entry is eligible to be processed + for. + """ + return [ + processor + for processor in self.processors.values() + if processor.media_is_eligible(entry=entry)] + + def list_all_processors_by_state(self, state): + """ + List all processors that this media state is eligible to be processed + for. + """ + return [ + processor + for processor in self.processors.values() + if processor.media_is_eligible(state=state)] + + + def list_all_processors(self): + return self.processors.values() + + def gen_process_request_via_cli(self, subparser): + # Got to figure out what actually goes here before I can write this properly + pass + + def get_processor(self, key, entry=None): + """ + Get the processor with this key. + + If entry supplied, make sure this entry is actually compatible; + otherwise raise error. + """ + try: + processor = self.processors[key] + except KeyError: + raise ProcessorDoesNotExist( + "'%s' processor does not exist for this media type" % key) + + if entry and not processor.media_is_eligible(entry): + raise ProcessorNotEligible( + "This entry is not eligible for processor with name '%s'" % key) + + return processor + + +def request_from_args(args, which_args): + """ + Generate a request from the values of some argparse parsed args + """ + request = {} + for arg in which_args: + request[arg] = getattr(args, arg) + + return request + + +class MediaEntryNotFound(Exception): pass + + +def get_processing_manager_for_type(media_type): + """ + Get the appropriate media manager for this type + """ + manager_class = hook_handle(('reprocess_manager', media_type)) + if not manager_class: + raise ProcessingManagerDoesNotExist( + "A processing manager does not exist for {0}".format(media_type)) + manager = manager_class() + + return manager + + +def get_entry_and_processing_manager(media_id): + """ + Get a MediaEntry, its media type, and its manager all in one go. + + Returns a tuple of: `(entry, media_type, media_manager)` + """ + entry = MediaEntry.query.filter_by(id=media_id).first() + if entry is None: + raise MediaEntryNotFound("Can't find media with id '%s'" % media_id) + + manager = get_processing_manager_for_type(entry.media_type) + + return entry, manager def mark_entry_failed(entry_id, exc): @@ -150,7 +321,7 @@ def mark_entry_failed(entry_id, exc): atomic_update(mgg.database.MediaEntry, {'id': entry_id}, {u'state': u'failed', - u'fail_error': unicode(exc.exception_path), + u'fail_error': six.text_type(exc.exception_path), u'fail_metadata': exc.metadata}) else: _log.warn("No idea what happened here, but it failed: %r", exc) @@ -165,13 +336,71 @@ def mark_entry_failed(entry_id, exc): u'fail_metadata': {}}) +def get_process_filename(entry, workbench, acceptable_files): + """ + Try and get the queued file if available, otherwise return the first file + in the acceptable_files that we have. 
+ + If no acceptable_files, raise ProcessFileNotFound + """ + if entry.queued_media_file: + filepath = entry.queued_media_file + storage = mgg.queue_store + else: + for keyname in acceptable_files: + if entry.media_files.get(keyname): + filepath = entry.media_files[keyname] + storage = mgg.public_store + break + + if not filepath: + raise ProcessFileNotFound() + + filename = workbench.localized_file( + storage, filepath, + 'source') + + if not os.path.exists(filename): + raise ProcessFileNotFound() + + return filename + + +def store_public(entry, keyname, local_file, target_name=None, + delete_if_exists=True): + if target_name is None: + target_name = os.path.basename(local_file) + target_filepath = create_pub_filepath(entry, target_name) + + if keyname in entry.media_files: + _log.warn("store_public: keyname %r already used for file %r, " + "replacing with %r", keyname, + entry.media_files[keyname], target_filepath) + if delete_if_exists: + mgg.public_store.delete_file(entry.media_files[keyname]) + try: + mgg.public_store.copy_local_to_storage(local_file, target_filepath) + except Exception as e: + _log.error(u'Exception happened: {0}'.format(e)) + raise PublicStoreFail(keyname=keyname) + # raise an error if the file failed to copy + if not mgg.public_store.file_exists(target_filepath): + raise PublicStoreFail(keyname=keyname) + + entry.media_files[keyname] = target_filepath + + +def copy_original(entry, orig_filename, target_name, keyname=u"original"): + store_public(entry, keyname, orig_filename, target_name) + + class BaseProcessingFail(Exception): """ Base exception that all other processing failure messages should subclass from. You shouldn't call this itself; instead you should subclass it - and provid the exception_path and general_message applicable to + and provide the exception_path and general_message applicable to this error. """ general_message = u'' @@ -181,10 +410,8 @@ class BaseProcessingFail(Exception): return u"%s:%s" % ( self.__class__.__module__, self.__class__.__name__) - def __init__(self, *args, **kwargs): - # next line is REQUIRED to have pickable exceptions if you want - # to be able to pass in custom arguments (see celery docs) - Exception.__init__(self, *args, **metadata) + def __init__(self, **metadata): + self.metadata = metadata or {} class BadMediaFail(BaseProcessingFail): """ @@ -192,3 +419,18 @@ class BadMediaFail(BaseProcessingFail): for the media type specified. """ general_message = _(u'Invalid file given for media type.') + + +class PublicStoreFail(BaseProcessingFail): + """ + Error that should be raised when copying to public store fails + """ + general_message = _('Copying to public storage failed.') + + +class ProcessFileNotFound(BaseProcessingFail): + """ + Error that should be raised when an acceptable file for processing + is not found. + """ + general_message = _(u'An acceptable processing file was not found')
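
To make the new MediaProcessor / ProcessingManager API concrete, here is a minimal sketch of what a media-type plugin might supply. Only the attributes and classmethods named in the MediaProcessor docstring above (name, description, media_is_eligible, generate_parser, args_to_request, process) come from this diff; the class names, the --size option, and the eligibility rule are made-up illustrations, not code from any actual MediaGoblin media type.

# Illustrative only: a hypothetical processor for an imaginary media type.
# The MediaProcessor / ProcessingManager API is from the diff above; names
# such as ResizeExampleProcessor and ExampleProcessingManager are made up.
import argparse

from mediagoblin.processing import (
    MediaProcessor, ProcessingManager, request_from_args)


class ResizeExampleProcessor(MediaProcessor):
    """Hypothetical 'resize' action for an imaginary media type."""
    name = 'resize'                    # required; becomes the reprocess subcommand
    description = 'Resize an image'    # optional, but recommended

    @classmethod
    def media_is_eligible(cls, entry=None, state=None):
        # Assumption: only entries that finished initial processing qualify.
        if not state:
            state = entry.state
        return state == u'processed'

    @classmethod
    def generate_parser(cls):
        parser = argparse.ArgumentParser(
            description=cls.description, prog=cls.name)
        parser.add_argument('--size', nargs=2, type=int, metavar='PIXELS',
                            help='maximum width and height')
        return parser

    @classmethod
    def args_to_request(cls, args):
        # Only the listed argparse attributes are forwarded to process().
        return request_from_args(args, ['size'])

    def process(self, size=None):
        # A real processor would work with self.entry and self.workbench here,
        # roughly along the lines of the helpers defined in this module:
        #   filename = get_process_filename(self.entry, self.workbench, ['medium'])
        #   ... resize into some local output file ...
        #   store_public(self.entry, 'medium', output_file)
        pass


class ExampleProcessingManager(ProcessingManager):
    """Hypothetical manager a plugin would expose for its media type."""
    def __init__(self):
        super(ExampleProcessingManager, self).__init__()
        self.add_processor(ResizeExampleProcessor)

A plugin would then return the ExampleProcessingManager class from its ('reprocess_manager', media_type) hook, which is how get_processing_manager_for_type() above locates and instantiates it.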
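
The module-level helpers near the bottom of the diff (get_entry_and_processing_manager, ProcessingManager.get_processor, mark_entry_failed) are what a caller, such as the processing task or the "./bin/gmg reprocess run" command, would chain together. That driving code is not part of this diff, so the sketch below is an assumption about how the pieces fit, not MediaGoblin's actual task implementation.

# Rough sketch of a caller; the surrounding task/CLI code is outside this
# diff, so treat the control flow as an assumption, not the real task.
from mediagoblin.processing import (
    get_entry_and_processing_manager, mark_entry_failed, BaseProcessingFail)


def run_processing(media_id, action_name, request=None):
    # Look up the MediaEntry and the ProcessingManager registered for its
    # media type (found through the ('reprocess_manager', media_type) hook).
    entry, manager = get_entry_and_processing_manager(media_id)

    # Raises ProcessorDoesNotExist for an unknown action, or
    # ProcessorNotEligible if the entry's state rules it out.
    processor_class = manager.get_processor(action_name, entry)

    try:
        # Entering the processor creates its workbench; leaving destroys it
        # (see MediaProcessor.__enter__ / __exit__ above).
        with processor_class(manager, entry) as processor:
            processor.process(**(request or {}))
    except BaseProcessingFail as exc:
        # Records state=u'failed' plus the exception path and metadata
        # on the entry, as mark_entry_failed does above.
        mark_entry_failed(entry.id, exc)

Since PublicStoreFail and ProcessFileNotFound subclass BaseProcessingFail, a failure inside store_public() or get_process_filename() is recorded on the entry through the same mark_entry_failed() path.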