# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
+# Use an ordered dict if we can. If not, we'll just use a normal dict
+# later.
+try:
+ from collections import OrderedDict
+except:
+ OrderedDict = None
+
import logging
import os
-from mediagoblin.db.util import atomic_update
-from mediagoblin import mg_globals as mgg
+import six
+from mediagoblin import mg_globals as mgg
+from mediagoblin.db.util import atomic_update
+from mediagoblin.db.models import MediaEntry
+from mediagoblin.tools.pluginapi import hook_handle
from mediagoblin.tools.translate import lazy_pass_to_ugettext as _
_log = logging.getLogger(__name__)
def create_pub_filepath(entry, filename):
return mgg.public_store.get_unique_filepath(
['media_entries',
- unicode(entry.id),
+ six.text_type(entry.id),
filename])
ext=self.ext)
-class ProcessingState(object):
- def __init__(self, entry):
+
+class MediaProcessor(object):
+ """A particular processor for this media type.
+
+ While the ProcessingManager handles all types of MediaProcessing
+ possible for a particular media type, a MediaProcessor can be
+ thought of as a *particular* processing action for a media type.
+ For example, you may have separate MediaProcessors for:
+
+ - initial_processing: the intial processing of a media
+ - gen_thumb: generate a thumbnail
+ - resize: resize an image
+ - transcode: transcode a video
+
+ ... etc.
+
+ Some information on producing a new MediaProcessor for your media type:
+
+ - You *must* supply a name attribute. This must be a class level
+ attribute, and a string. This will be used to determine the
+ subcommand of your process
+ - It's recommended that you supply a class level description
+ attribute.
+ - Supply a media_is_eligible classmethod. This will be used to
+ determine whether or not a media entry is eligible to use this
+ processor type. See the method documentation for details.
+ - To give "./bin/gmg reprocess run" abilities to this media type,
+ supply both gnerate_parser and parser_to_request classmethods.
+ - The process method will be what actually processes your media.
+ """
+ # You MUST override this in the child MediaProcessor!
+ name = None
+
+ # Optional, but will be used in various places to describe the
+ # action this MediaProcessor provides
+ description = None
+
+ def __init__(self, manager, entry):
+ self.manager = manager
self.entry = entry
+ self.entry_orig_state = entry.state
+
+ # Should be initialized at time of processing, at least
self.workbench = None
- self.queued_filename = None
- # Monkey patch us onto the entry
- entry.proc_state = self
+ def __enter__(self):
+ self.workbench = mgg.workbench_manager.create()
+ return self
- def set_workbench(self, wb):
- self.workbench = wb
+ def __exit__(self, *args):
+ self.workbench.destroy()
+ self.workbench = None
- def get_queued_filename(self):
+ # @with_workbench
+ def process(self, **kwargs):
"""
- Get the a filename for the original, on local storage
+ Actually process this media entry.
"""
- if self.queued_filename is not None:
- return self.queued_filename
- queued_filepath = self.entry.queued_media_file
- queued_filename = self.workbench.localized_file(
- mgg.queue_store, queued_filepath,
- 'source')
- self.queued_filename = queued_filename
- return queued_filename
+ raise NotImplementedError
+
+ @classmethod
+ def media_is_eligible(cls, entry=None, state=None):
+ raise NotImplementedError
+
+ ###############################
+ # Command line interface things
+ ###############################
+
+ @classmethod
+ def generate_parser(cls):
+ raise NotImplementedError
+
+ @classmethod
+ def args_to_request(cls, args):
+ raise NotImplementedError
+
+ ##########################################
+ # THE FUTURE: web interface things here :)
+ ##########################################
+
+ #####################
+ # Some common "steps"
+ #####################
def delete_queue_file(self):
+ # Remove queued media file from storage and database.
+ # queued_filepath is in the task_id directory which should
+ # be removed too, but fail if the directory is not empty to be on
+ # the super-safe side.
queued_filepath = self.entry.queued_media_file
- mgg.queue_store.delete_file(queued_filepath)
- self.entry.queued_media_file = []
+ if queued_filepath:
+ mgg.queue_store.delete_file(queued_filepath) # rm file
+ mgg.queue_store.delete_dir(queued_filepath[:-1]) # rm dir
+ self.entry.queued_media_file = []
+
+
+class ProcessingKeyError(Exception): pass
+class ProcessorDoesNotExist(ProcessingKeyError): pass
+class ProcessorNotEligible(ProcessingKeyError): pass
+class ProcessingManagerDoesNotExist(ProcessingKeyError): pass
+
+
+
+class ProcessingManager(object):
+ """Manages all the processing actions available for a media type
+
+ Specific processing actions, MediaProcessor subclasses, are added
+ to the ProcessingManager.
+ """
+ def __init__(self):
+ # Dict of all MediaProcessors of this media type
+ if OrderedDict is not None:
+ self.processors = OrderedDict()
+ else:
+ self.processors = {}
+
+ def add_processor(self, processor):
+ """
+ Add a processor class to this media type
+ """
+ name = processor.name
+ if name is None:
+ raise AttributeError("Processor class's .name attribute not set")
+
+ self.processors[name] = processor
+
+ def list_eligible_processors(self, entry):
+ """
+ List all processors that this media entry is eligible to be processed
+ for.
+ """
+ return [
+ processor
+ for processor in self.processors.values()
+ if processor.media_is_eligible(entry=entry)]
+
+ def list_all_processors_by_state(self, state):
+ """
+ List all processors that this media state is eligible to be processed
+ for.
+ """
+ return [
+ processor
+ for processor in self.processors.values()
+ if processor.media_is_eligible(state=state)]
+
+
+ def list_all_processors(self):
+ return self.processors.values()
+
+ def gen_process_request_via_cli(self, subparser):
+ # Got to figure out what actually goes here before I can write this properly
+ pass
+
+ def get_processor(self, key, entry=None):
+ """
+ Get the processor with this key.
+
+ If entry supplied, make sure this entry is actually compatible;
+ otherwise raise error.
+ """
+ try:
+ processor = self.processors[key]
+ except KeyError:
+ raise ProcessorDoesNotExist(
+ "'%s' processor does not exist for this media type" % key)
+
+ if entry and not processor.media_is_eligible(entry):
+ raise ProcessorNotEligible(
+ "This entry is not eligible for processor with name '%s'" % key)
+
+ return processor
+
+
+def request_from_args(args, which_args):
+ """
+ Generate a request from the values of some argparse parsed args
+ """
+ request = {}
+ for arg in which_args:
+ request[arg] = getattr(args, arg)
+
+ return request
+
+
+class MediaEntryNotFound(Exception): pass
+
+
+def get_processing_manager_for_type(media_type):
+ """
+ Get the appropriate media manager for this type
+ """
+ manager_class = hook_handle(('reprocess_manager', media_type))
+ if not manager_class:
+ raise ProcessingManagerDoesNotExist(
+ "A processing manager does not exist for {0}".format(media_type))
+ manager = manager_class()
+
+ return manager
+
+
+def get_entry_and_processing_manager(media_id):
+ """
+ Get a MediaEntry, its media type, and its manager all in one go.
+
+ Returns a tuple of: `(entry, media_type, media_manager)`
+ """
+ entry = MediaEntry.query.filter_by(id=media_id).first()
+ if entry is None:
+ raise MediaEntryNotFound("Can't find media with id '%s'" % media_id)
+
+ manager = get_processing_manager_for_type(entry.media_type)
+
+ return entry, manager
def mark_entry_failed(entry_id, exc):
atomic_update(mgg.database.MediaEntry,
{'id': entry_id},
{u'state': u'failed',
- u'fail_error': unicode(exc.exception_path),
+ u'fail_error': six.text_type(exc.exception_path),
u'fail_metadata': exc.metadata})
else:
_log.warn("No idea what happened here, but it failed: %r", exc)
u'fail_metadata': {}})
+def get_process_filename(entry, workbench, acceptable_files):
+ """
+ Try and get the queued file if available, otherwise return the first file
+ in the acceptable_files that we have.
+
+ If no acceptable_files, raise ProcessFileNotFound
+ """
+ if entry.queued_media_file:
+ filepath = entry.queued_media_file
+ storage = mgg.queue_store
+ else:
+ for keyname in acceptable_files:
+ if entry.media_files.get(keyname):
+ filepath = entry.media_files[keyname]
+ storage = mgg.public_store
+ break
+
+ if not filepath:
+ raise ProcessFileNotFound()
+
+ filename = workbench.localized_file(
+ storage, filepath,
+ 'source')
+
+ if not os.path.exists(filename):
+ raise ProcessFileNotFound()
+
+ return filename
+
+
+def store_public(entry, keyname, local_file, target_name=None,
+ delete_if_exists=True):
+ if target_name is None:
+ target_name = os.path.basename(local_file)
+ target_filepath = create_pub_filepath(entry, target_name)
+
+ if keyname in entry.media_files:
+ _log.warn("store_public: keyname %r already used for file %r, "
+ "replacing with %r", keyname,
+ entry.media_files[keyname], target_filepath)
+ if delete_if_exists:
+ mgg.public_store.delete_file(entry.media_files[keyname])
+ try:
+ mgg.public_store.copy_local_to_storage(local_file, target_filepath)
+ except Exception as e:
+ _log.error(u'Exception happened: {0}'.format(e))
+ raise PublicStoreFail(keyname=keyname)
+ # raise an error if the file failed to copy
+ if not mgg.public_store.file_exists(target_filepath):
+ raise PublicStoreFail(keyname=keyname)
+
+ entry.media_files[keyname] = target_filepath
+
+
+def copy_original(entry, orig_filename, target_name, keyname=u"original"):
+ store_public(entry, keyname, orig_filename, target_name)
+
+
class BaseProcessingFail(Exception):
"""
Base exception that all other processing failure messages should
subclass from.
You shouldn't call this itself; instead you should subclass it
- and provid the exception_path and general_message applicable to
+ and provide the exception_path and general_message applicable to
this error.
"""
general_message = u''
def __init__(self, **metadata):
self.metadata = metadata or {}
-
class BadMediaFail(BaseProcessingFail):
"""
Error that should be raised when an inappropriate file was given
for the media type specified.
"""
general_message = _(u'Invalid file given for media type.')
+
+
+class PublicStoreFail(BaseProcessingFail):
+ """
+ Error that should be raised when copying to public store fails
+ """
+ general_message = _('Copying to public storage failed.')
+
+
+class ProcessFileNotFound(BaseProcessingFail):
+ """
+ Error that should be raised when an acceptable file for processing
+ is not found.
+ """
+ general_message = _(u'An acceptable processing file was not found')