Simple hack to handle main workflow problem
[mediagoblin.git] / mediagoblin / processing / __init__.py
1 # GNU MediaGoblin -- federated, autonomous media hosting
2 # Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16
17 # Use an ordered dict if we can. If not, we'll just use a normal dict
18 # later.
19 try:
20 from collections import OrderedDict
21 except:
22 OrderedDict = None
23
24 import logging
25 import os
26
27 import six
28
29 from mediagoblin import mg_globals as mgg
30 from mediagoblin.db.util import atomic_update
31 from mediagoblin.db.models import MediaEntry
32 from mediagoblin.tools.pluginapi import hook_handle
33 from mediagoblin.tools.translate import lazy_pass_to_ugettext as _
34
# Module-level logger named after this module (``__name__``).
_log = logging.getLogger(name=__name__)
36
37
class ProgressCallback(object):
    """Callable that records transcoding progress on a media entry.

    Calling an instance with a truthy ``progress`` stores it as
    ``entry.transcoding_progress`` and saves the entry. Falsy values
    (such as 0 or None) are ignored, and the entry is not saved.
    """
    def __init__(self, entry):
        self.entry = entry

    def __call__(self, progress):
        if not progress:
            return
        target = self.entry
        target.transcoding_progress = progress
        target.save()
46
47
def create_pub_filepath(entry, filename):
    """Return a unique public-storage path for ``filename`` of ``entry``.

    The path is built from the components
    ``['media_entries', <entry id as text>, filename]`` and passed to
    ``mgg.public_store.get_unique_filepath``.
    """
    path_components = ['media_entries', six.text_type(entry.id), filename]
    return mgg.public_store.get_unique_filepath(path_components)
53
54
class FilenameBuilder(object):
    """Easily slice and dice filenames.

    Initialize this class with an original file path, then use the fill()
    method to create new filenames based on the original.

    """
    MAX_FILENAME_LENGTH = 255  # VFAT's maximum filename length

    def __init__(self, path):
        """Initialize a builder from an original file path.

        Sets ``dirpath`` (the directory part), ``basename`` (the file name
        without its extension) and ``ext`` (the extension, lowercased,
        including the leading dot, or '' if there is none).
        """
        self.dirpath, self.basename = os.path.split(path)
        self.basename, self.ext = os.path.splitext(self.basename)
        self.ext = self.ext.lower()

    def fill(self, fmtstr):
        """Build a new filename based on the original.

        The fmtstr argument can include the following:
        {basename} -- the original basename, with the extension removed
        {ext} -- the original extension, always lowercase

        If necessary, {basename} will be truncated so the filename does not
        exceed this class' MAX_FILENAME_LENGTH in length. If the rest of
        fmtstr alone already reaches that length, {basename} is replaced by
        an empty string (the result may then still exceed the limit).
        The truncation budget assumes {basename} appears at most once.

        """
        # Length of everything in the result except the basename itself.
        fixed_len = len(fmtstr.format(basename='', ext=self.ext))
        # Clamp at zero: a negative slice bound would cut characters off the
        # *end* of the basename rather than dropping it entirely.
        basename_len = max(0, self.MAX_FILENAME_LENGTH - fixed_len)
        return fmtstr.format(basename=self.basename[:basename_len],
                             ext=self.ext)
85
86
87
class MediaProcessor(object):
    """A particular processor for this media type.

    While the ProcessingManager handles all types of MediaProcessing
    possible for a particular media type, a MediaProcessor can be
    thought of as a *particular* processing action for a media type.
    For example, you may have separate MediaProcessors for:

    - initial_processing: the initial processing of a media
    - gen_thumb: generate a thumbnail
    - resize: resize an image
    - transcode: transcode a video

    ... etc.

    Some information on producing a new MediaProcessor for your media type:

    - You *must* supply a name attribute. This must be a class level
      attribute, and a string. This will be used to determine the
      subcommand of your process.
    - It's recommended that you supply a class level description
      attribute.
    - Supply a media_is_eligible classmethod. This will be used to
      determine whether or not a media entry is eligible to use this
      processor type. See the method documentation for details.
    - To give "./bin/gmg reprocess run" abilities to this media type,
      supply both generate_parser and args_to_request classmethods.
    - The process method will be what actually processes your media.

    Instances are context managers: entering creates a workbench with
    ``mgg.workbench_manager.create()`` and stores it on ``self.workbench``;
    exiting destroys it and resets ``self.workbench`` to None.
    """
    # You MUST override this in the child MediaProcessor!
    name = None

    # Optional, but will be used in various places to describe the
    # action this MediaProcessor provides
    description = None

    def __init__(self, manager, entry):
        self.manager = manager
        self.entry = entry
        # Remember the entry's state as it was before this processor ran.
        self.entry_orig_state = entry.state

        # Should be initialized at time of processing, at least
        self.workbench = None

    def __enter__(self):
        self.workbench = mgg.workbench_manager.create()
        return self

    def __exit__(self, *args):
        self.workbench.destroy()
        self.workbench = None

    def process(self, **kwargs):
        """
        Actually process this media entry.

        ``self.workbench`` is only set while inside a ``with`` block on
        this processor (see ``__enter__``). Subclasses must override this.
        """
        raise NotImplementedError

    @classmethod
    def media_is_eligible(cls, entry=None, state=None):
        """Return whether this processor may be used.

        The ProcessingManager calls this with either ``entry=`` (a media
        entry) or ``state=`` (a media state). Subclasses must override
        this.
        """
        raise NotImplementedError

    ###############################
    # Command line interface things
    ###############################

    @classmethod
    def generate_parser(cls):
        """Return the argument parser for this processor's CLI subcommand."""
        raise NotImplementedError

    @classmethod
    def args_to_request(cls, args):
        """Convert parsed CLI ``args`` into a processing request."""
        raise NotImplementedError

    ##########################################
    # THE FUTURE: web interface things here :)
    ##########################################

    #####################
    # Some common "steps"
    #####################

    def delete_queue_file(self):
        """Remove the queued media file from storage and the database.

        queued_filepath is in the task_id directory, which is removed too.
        Removing that directory is meant to fail if it is not empty, to be
        on the super-safe side.
        """
        queued_filepath = self.entry.queued_media_file
        if queued_filepath:
            mgg.queue_store.delete_file(queued_filepath)      # rm file
            mgg.queue_store.delete_dir(queued_filepath[:-1])  # rm dir
            self.entry.queued_media_file = []
181
182
class ProcessingKeyError(Exception):
    """Base class for failures to look up a processor or processing manager."""


class ProcessorDoesNotExist(ProcessingKeyError):
    """No processor is registered under the requested key."""


class ProcessorNotEligible(ProcessingKeyError):
    """The requested processor is not eligible for the given entry."""


class ProcessingManagerDoesNotExist(ProcessingKeyError):
    """No processing manager is registered for the requested media type."""
187
188
189
class ProcessingManager(object):
    """Manages all the processing actions available for a media type

    Specific processing actions, MediaProcessor subclasses, are added
    to the ProcessingManager.
    """
    def __init__(self):
        # Dict of all MediaProcessors of this media type, keyed by name.
        # An OrderedDict (when available) keeps registration order.
        if OrderedDict is not None:
            self.processors = OrderedDict()
        else:
            self.processors = {}

    def add_processor(self, processor):
        """
        Add a processor class to this media type

        :raises AttributeError: if the processor's ``name`` is not set
        """
        name = processor.name
        if name is None:
            raise AttributeError("Processor class's .name attribute not set")

        self.processors[name] = processor

    def list_eligible_processors(self, entry):
        """
        List all processors that this media entry is eligible to be processed
        for.
        """
        return [
            processor
            for processor in self.processors.values()
            if processor.media_is_eligible(entry=entry)]

    def list_all_processors_by_state(self, state):
        """
        List all processors that this media state is eligible to be processed
        for.
        """
        return [
            processor
            for processor in self.processors.values()
            if processor.media_is_eligible(state=state)]

    def list_all_processors(self):
        """
        List every registered processor, regardless of eligibility.

        Returns a list (not a dict view) for consistency with the other
        ``list_*`` methods.
        """
        return list(self.processors.values())

    def gen_process_request_via_cli(self, subparser):
        # Got to figure out what actually goes here before I can write this properly
        pass

    def get_processor(self, key, entry=None):
        """
        Get the processor with this key.

        If entry supplied, make sure this entry is actually compatible;
        otherwise raise error.

        :raises ProcessorDoesNotExist: if no processor is registered as ``key``
        :raises ProcessorNotEligible: if ``entry`` is given and the processor
            reports it is not eligible
        """
        try:
            processor = self.processors[key]
        except KeyError:
            raise ProcessorDoesNotExist(
                "'%s' processor does not exist for this media type" % key)

        # Pass entry by keyword, matching list_eligible_processors, so a
        # media_is_eligible(entry=None, state=None) override is called the
        # same way everywhere.
        if entry is not None and not processor.media_is_eligible(entry=entry):
            raise ProcessorNotEligible(
                "This entry is not eligible for processor with name '%s'" % key)

        return processor

    def workflow(self, entry, feed_url, reprocess_action, reprocess_info=None):
        """
        Returns the Celery command needed to proceed with media processing

        The base implementation has no workflow and returns None.
        """
        return None
265
266
def request_from_args(args, which_args):
    """Build a request dict from selected argparse-parsed arguments.

    :param args: the parsed argument object to read attributes from
    :param which_args: iterable of attribute names to copy
    :returns: dict mapping each name in ``which_args`` to ``getattr(args, name)``
    """
    return dict((name, getattr(args, name)) for name in which_args)
276
277
class MediaEntryNotFound(Exception):
    """No MediaEntry exists with the requested id."""
279
280
def get_processing_manager_for_type(media_type):
    """Instantiate the processing manager registered for ``media_type``.

    The manager class is looked up with the
    ``('reprocess_manager', media_type)`` plugin hook.

    :raises ProcessingManagerDoesNotExist: if the hook yields nothing
    """
    manager_class = hook_handle(('reprocess_manager', media_type))
    if manager_class:
        return manager_class()
    raise ProcessingManagerDoesNotExist(
        "A processing manager does not exist for {0}".format(media_type))
292
293
def get_entry_and_processing_manager(media_id):
    """
    Get a MediaEntry and the processing manager for its media type in one go.

    Returns a tuple of: `(entry, processing_manager)`

    :raises MediaEntryNotFound: if no MediaEntry has id ``media_id``
    :raises ProcessingManagerDoesNotExist: if no processing manager is
        registered for the entry's media type
    """
    entry = MediaEntry.query.filter_by(id=media_id).first()
    if entry is None:
        raise MediaEntryNotFound("Can't find media with id '%s'" % media_id)

    manager = get_processing_manager_for_type(entry.media_type)

    return entry, manager
307
308
def mark_entry_failed(entry_id, exc):
    """
    Mark a media entry as having failed in its conversion.

    Uses the exception that was raised to record more information. If
    the exception is a derivative of BaseProcessingFail then we can
    store extra information that can be useful for users, telling them
    why their media failed to process. Any other exception is logged as a
    warning and stored as an "Unhandled exception" message with empty
    metadata.

    :param entry_id: The id of the media entry
    :param exc: The exception that made processing fail; ideally an
        instance of a BaseProcessingFail subclass, but any exception is
        accepted
    """
    # Was this a BaseProcessingFail? In other words, was this a
    # type of error that we know how to handle?
    if isinstance(exc, BaseProcessingFail):
        # Looks like yes, so record information about that failure and any
        # metadata the user might have supplied.
        fail_error = six.text_type(exc.exception_path)
        fail_metadata = exc.metadata
    else:
        # Logger.warn is a deprecated alias of Logger.warning.
        _log.warning("No idea what happened here, but it failed: %r", exc)
        # Looks like no, let's record it so that admin could ask us about the
        # reason
        fail_error = u'Unhandled exception: {0}'.format(six.text_type(exc))
        fail_metadata = {}

    atomic_update(mgg.database.MediaEntry,
                  {'id': entry_id},
                  {u'state': u'failed',
                   u'fail_error': fail_error,
                   u'fail_metadata': fail_metadata})
342
343
def get_process_filename(entry, workbench, acceptable_files):
    """
    Try and get the queued file if available, otherwise return the first file
    in the acceptable_files that we have.

    The chosen file is localized into ``workbench`` (via
    ``workbench.localized_file(..., 'source')``) and that local filename is
    returned.

    :param entry: the media entry whose source file is wanted
    :param workbench: workbench used to localize the file
    :param acceptable_files: ``entry.media_files`` keynames to try, in order
    :returns: the local filename of the source file
    :raises ProcessFileNotFound: if there is no queued file and none of
        acceptable_files is set on the entry, or if the localized file
        does not exist
    """
    # Initialise both so that an entry with no queued file and no matching
    # acceptable file reaches the ProcessFileNotFound check below instead
    # of raising UnboundLocalError.
    filepath = None
    storage = None
    if entry.queued_media_file:
        filepath = entry.queued_media_file
        storage = mgg.queue_store
    else:
        for keyname in acceptable_files:
            if entry.media_files.get(keyname):
                filepath = entry.media_files[keyname]
                storage = mgg.public_store
                break

    if not filepath:
        raise ProcessFileNotFound()

    filename = workbench.localized_file(
        storage, filepath,
        'source')

    if not os.path.exists(filename):
        raise ProcessFileNotFound()

    return filename
372
373
def store_public(entry, keyname, local_file, target_name=None,
                 delete_if_exists=True):
    """Copy a local file into public storage and record it on the entry.

    :param entry: media entry whose ``media_files`` is updated
    :param keyname: key under which the stored path is recorded in
        ``entry.media_files``
    :param local_file: path of the local file to copy
    :param target_name: filename to use in public storage; defaults to the
        basename of ``local_file``
    :param delete_if_exists: if ``keyname`` is already present, delete the
        previously stored file from public storage before copying
    :raises PublicStoreFail: if the copy raises, or if the file does not
        exist in public storage afterwards
    """
    if target_name is None:
        target_name = os.path.basename(local_file)
    target_filepath = create_pub_filepath(entry, target_name)

    if keyname in entry.media_files:
        # Logger.warn is a deprecated alias of Logger.warning.
        _log.warning("store_public: keyname %r already used for file %r, "
                     "replacing with %r", keyname,
                     entry.media_files[keyname], target_filepath)
        if delete_if_exists:
            mgg.public_store.delete_file(entry.media_files[keyname])
    try:
        mgg.public_store.copy_local_to_storage(local_file, target_filepath)
    except Exception as e:
        # Log with the traceback: PublicStoreFail below does not carry the
        # underlying error.
        _log.exception(u'Exception happened: %s', e)
        raise PublicStoreFail(keyname=keyname)
    # raise an error if the file failed to copy
    if not mgg.public_store.file_exists(target_filepath):
        raise PublicStoreFail(keyname=keyname)

    entry.media_files[keyname] = target_filepath
396
397
def copy_original(entry, orig_filename, target_name, keyname=u"original"):
    """Store ``orig_filename`` publicly as ``target_name`` under ``keyname``.

    Thin wrapper around ``store_public``; ``keyname`` defaults to
    ``u"original"``.
    """
    store_public(entry, keyname, orig_filename, target_name=target_name)
400
401
class BaseProcessingFail(Exception):
    """
    Root of the processing-failure exception hierarchy.

    Don't raise this class directly: subclass it and set a
    ``general_message`` suited to the specific failure. ``exception_path``
    identifies the concrete subclass as ``"module:ClassName"``. Keyword
    arguments passed to the constructor are kept in ``metadata``, together
    with ``message`` when one is given.
    """
    general_message = u''

    @property
    def exception_path(self):
        klass = self.__class__
        return u"%s:%s" % (klass.__module__, klass.__name__)

    def __init__(self, message=None, **metadata):
        self.metadata = metadata
        if message is None:
            return
        super(BaseProcessingFail, self).__init__(message)
        self.metadata['message'] = message
423
class BadMediaFail(BaseProcessingFail):
    """
    Raise this when the supplied file is not appropriate for the media
    type it was submitted as.
    """
    general_message = _(u'Invalid file given for media type.')
430
431
class PublicStoreFail(BaseProcessingFail):
    """
    Error that should be raised when copying to public store fails
    """
    # u'' prefix for consistency with the sibling failures' translatable
    # messages (these are unicode literals under Python 2 as well).
    general_message = _(u'Copying to public storage failed.')
437
438
class ProcessFileNotFound(BaseProcessingFail):
    """
    Raise this when no acceptable source file can be found for
    processing.
    """
    general_message = _(u'An acceptable processing file was not found')