docs: Document video resolution config.
mediagoblin/processing/__init__.py
# GNU MediaGoblin -- federated, autonomous media hosting
# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

# Use an ordered dict if we can. If not, we'll just use a normal dict
# later.
try:
    from collections import OrderedDict
except ImportError:
    OrderedDict = None

import logging
import os

import six

from mediagoblin import mg_globals as mgg
from mediagoblin.db.util import atomic_update
from mediagoblin.db.models import MediaEntry
from mediagoblin.tools.pluginapi import hook_handle
from mediagoblin.tools.translate import lazy_pass_to_ugettext as _

_log = logging.getLogger(__name__)


class ProgressCallback(object):
    """Callable that records transcoding progress on a media entry.

    Each call adds `progress` percent to the entry's overall
    transcoding_progress (capping it at 100) and, if given, records the
    progress of the default-quality transcode, then saves the entry.
    """
    def __init__(self, entry):
        self.entry = entry

    def __call__(self, progress, default_quality_progress=None):
        if progress:
            if 100 - (self.entry.transcoding_progress + progress) < 0.01:
                self.entry.transcoding_progress = 100
            else:
                self.entry.transcoding_progress += round(progress, 2)
            if default_quality_progress:
                self.entry.main_transcoding_progress = default_quality_progress
            self.entry.save()


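# A minimal usage sketch (illustrative only, not called by MediaGoblin): a
# processor typically constructs a ProgressCallback for its entry and hands
# it to whatever transcoding routine it drives, which then reports percentages
# back through it. The `transcode` callable here is a hypothetical stand-in.
def _progress_callback_sketch(entry, transcode, source_path, dest_path):
    progress_callback = ProgressCallback(entry)
    # The transcoder is expected to invoke progress_callback(percent) as it
    # works; each call bumps entry.transcoding_progress and saves the entry.
    transcode(source_path, dest_path, progress_callback=progress_callback)

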
def create_pub_filepath(entry, filename):
    return mgg.public_store.get_unique_filepath(
        ['media_entries',
         six.text_type(entry.id),
         filename])


class FilenameBuilder(object):
    """Easily slice and dice filenames.

    Initialize this class with an original file path, then use the fill()
    method to create new filenames based on the original.

    """
    MAX_FILENAME_LENGTH = 255  # VFAT's maximum filename length

    def __init__(self, path):
        """Initialize a builder from an original file path."""
        self.dirpath, self.basename = os.path.split(path)
        self.basename, self.ext = os.path.splitext(self.basename)
        self.ext = self.ext.lower()

    def fill(self, fmtstr):
        """Build a new filename based on the original.

        The fmtstr argument can include the following:
        {basename} -- the original basename, with the extension removed
        {ext} -- the original extension, always lowercase

        If necessary, {basename} will be truncated so the filename does not
        exceed this class' MAX_FILENAME_LENGTH in length.

        """
        basename_len = (self.MAX_FILENAME_LENGTH -
                        len(fmtstr.format(basename='', ext=self.ext)))
        return fmtstr.format(basename=self.basename[:basename_len],
                             ext=self.ext)


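# Illustrative sketch (not called by MediaGoblin): FilenameBuilder is normally
# used to derive names for generated files from the uploaded file's name.
def _filename_builder_sketch():
    name_builder = FilenameBuilder('/queue/Foo.JPG')
    # Yields 'Foo.thumbnail.jpg' -- the extension is lowercased, and the
    # basename would be truncated if the result exceeded 255 characters.
    return name_builder.fill('{basename}.thumbnail{ext}')

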
class MediaProcessor(object):
    """A particular processor for this media type.

    While the ProcessingManager handles all types of MediaProcessing
    possible for a particular media type, a MediaProcessor can be
    thought of as a *particular* processing action for a media type.
    For example, you may have separate MediaProcessors for:

    - initial_processing: the initial processing of a media entry
    - gen_thumb: generate a thumbnail
    - resize: resize an image
    - transcode: transcode a video

    ... etc.

    Some information on producing a new MediaProcessor for your media type
    (see also the illustrative sketch after this class):

    - You *must* supply a name attribute. This must be a class level
      attribute, and a string. This will be used to determine the
      subcommand for your processor.
    - It's recommended that you supply a class level description
      attribute.
    - Supply a media_is_eligible classmethod. This will be used to
      determine whether or not a media entry is eligible to use this
      processor type. See the method documentation for details.
    - To give "./bin/gmg reprocess run" abilities to this media type,
      supply both generate_parser and args_to_request classmethods.
    - The process method will be what actually processes your media.
    """
    # You MUST override this in the child MediaProcessor!
    name = None

    # Optional, but will be used in various places to describe the
    # action this MediaProcessor provides
    description = None

    def __init__(self, manager, entry):
        self.manager = manager
        self.entry = entry
        self.entry_orig_state = entry.state

        # Should be initialized at time of processing, at least
        self.workbench = None

    def __enter__(self):
        self.workbench = mgg.workbench_manager.create()
        return self

    def __exit__(self, *args):
        self.workbench.destroy()
        self.workbench = None

    # @with_workbench
    def process(self, **kwargs):
        """
        Actually process this media entry.
        """
        raise NotImplementedError

    @classmethod
    def media_is_eligible(cls, entry=None, state=None):
        raise NotImplementedError

    ###############################
    # Command line interface things
    ###############################

    @classmethod
    def generate_parser(cls):
        raise NotImplementedError

    @classmethod
    def args_to_request(cls, args):
        raise NotImplementedError

    ##########################################
    # THE FUTURE: web interface things here :)
    ##########################################

    #####################
    # Some common "steps"
    #####################

    def delete_queue_file(self):
        # Remove queued media file from storage and database.
        # queued_filepath is in the task_id directory which should
        # be removed too, but fail if the directory is not empty to be on
        # the super-safe side.
        queued_filepath = self.entry.queued_media_file
        if queued_filepath:
            mgg.queue_store.delete_file(queued_filepath)      # rm file
            mgg.queue_store.delete_dir(queued_filepath[:-1])  # rm dir
            self.entry.queued_media_file = []


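# Illustrative sketch of a MediaProcessor subclass; the names below are
# assumptions and nothing here exists in MediaGoblin itself. A real processor
# (e.g. for images or video) follows the same shape: class-level name and
# description, an eligibility check, argparse wiring for
# "./bin/gmg reprocess run", and a process() method doing the actual work.
class _ExampleResizeProcessor(MediaProcessor):
    name = 'example_resize'
    description = 'Resize an already-processed media file (sketch only)'

    @classmethod
    def media_is_eligible(cls, entry=None, state=None):
        # Only media that has already been processed once is eligible.
        if not state:
            state = entry.state
        return state in ['processed']

    @classmethod
    def generate_parser(cls):
        import argparse  # local import keeps this sketch self-contained
        parser = argparse.ArgumentParser(
            description=cls.description, prog=cls.name)
        parser.add_argument(
            '--size', nargs=2, metavar=('max_width', 'max_height'))
        return parser

    @classmethod
    def args_to_request(cls, args):
        return request_from_args(args, ['size'])

    def process(self, size=None):
        # A real processor would locate its source file with
        # get_process_filename(), work inside self.workbench, and publish
        # results with store_public(); see the helpers further down.
        raise NotImplementedError

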
class ProcessingKeyError(Exception): pass
class ProcessorDoesNotExist(ProcessingKeyError): pass
class ProcessorNotEligible(ProcessingKeyError): pass
class ProcessingManagerDoesNotExist(ProcessingKeyError): pass


class ProcessingManager(object):
    """Manages all the processing actions available for a media type

    Specific processing actions, MediaProcessor subclasses, are added
    to the ProcessingManager.
    """
    def __init__(self):
        # Dict of all MediaProcessors of this media type
        if OrderedDict is not None:
            self.processors = OrderedDict()
        else:
            self.processors = {}

    def add_processor(self, processor):
        """
        Add a processor class to this media type
        """
        name = processor.name
        if name is None:
            raise AttributeError("Processor class's .name attribute not set")

        self.processors[name] = processor

    def list_eligible_processors(self, entry):
        """
        List all processors for which this media entry is eligible.
        """
        return [
            processor
            for processor in self.processors.values()
            if processor.media_is_eligible(entry=entry)]

    def list_all_processors_by_state(self, state):
        """
        List all processors that are eligible for media in the given state.
        """
        return [
            processor
            for processor in self.processors.values()
            if processor.media_is_eligible(state=state)]

    def list_all_processors(self):
        return self.processors.values()

    def gen_process_request_via_cli(self, subparser):
        # Got to figure out what actually goes here before I can write this properly
        pass

    def get_processor(self, key, entry=None):
        """
        Get the processor with this key.

        If entry supplied, make sure this entry is actually compatible;
        otherwise raise error.
        """
        try:
            processor = self.processors[key]
        except KeyError:
            raise ProcessorDoesNotExist(
                "'%s' processor does not exist for this media type" % key)

        if entry and not processor.media_is_eligible(entry):
            raise ProcessorNotEligible(
                "This entry is not eligible for processor with name '%s'" % key)

        return processor

    def workflow(self, entry, feed_url, reprocess_action, reprocess_info=None):
        """
        Returns the Celery command needed to proceed with media processing
        """
        return None


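# Illustrative sketch (assumed names): a media type plugin typically defines
# its own ProcessingManager subclass whose __init__ registers that type's
# MediaProcessor subclasses, and exposes it through the 'reprocess_manager'
# hook so get_processing_manager_for_type() below can find it.
class _ExampleProcessingManager(ProcessingManager):
    def __init__(self):
        super(_ExampleProcessingManager, self).__init__()
        # Register each processing action this media type supports.
        self.add_processor(_ExampleResizeProcessor)

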
def request_from_args(args, which_args):
    """
    Generate a request from the values of some argparse parsed args
    """
    request = {}
    for arg in which_args:
        request[arg] = getattr(args, arg)

    return request


class MediaEntryNotFound(Exception): pass


def get_processing_manager_for_type(media_type):
    """
    Get the appropriate processing manager for this media type
    """
    manager_class = hook_handle(('reprocess_manager', media_type))
    if not manager_class:
        raise ProcessingManagerDoesNotExist(
            "A processing manager does not exist for {0}".format(media_type))
    manager = manager_class()

    return manager


def get_entry_and_processing_manager(media_id):
    """
    Get a MediaEntry and its processing manager in one go.

    Returns a tuple of: `(entry, manager)`
    """
    entry = MediaEntry.query.filter_by(id=media_id).first()
    if entry is None:
        raise MediaEntryNotFound("Can't find media with id '%s'" % media_id)

    manager = get_processing_manager_for_type(entry.media_type)

    return entry, manager


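# Illustrative sketch (argument names are assumptions) of roughly how the
# processing task drives these helpers: resolve the entry and its manager,
# look up a processor by name, then run it inside its workbench context
# manager.
def _run_processor_sketch(media_id, action_name, request=None):
    entry, manager = get_entry_and_processing_manager(media_id)
    processor_class = manager.get_processor(action_name, entry)
    with processor_class(manager, entry) as processor:
        # __enter__ created a workbench; __exit__ will destroy it.
        processor.process(**(request or {}))

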
def mark_entry_failed(entry_id, exc):
    """
    Mark a media entry as having failed in its conversion.

    Uses the exception that was raised to record more information. If
    the exception is a derivative of BaseProcessingFail then we can
    store extra information that can be shown to users to explain why
    their media failed to process.

    :param entry_id: The id of the media entry
    :param exc: The exception that was raised (ideally an instance of
        BaseProcessingFail)

    """
    # Was this a BaseProcessingFail? In other words, was this a
    # type of error that we know how to handle?
    if isinstance(exc, BaseProcessingFail):
        # Looks like yes, so record information about that failure and any
        # metadata the user might have supplied.
        atomic_update(mgg.database.MediaEntry,
                      {'id': entry_id},
                      {u'state': u'failed',
                       u'fail_error': six.text_type(exc.exception_path),
                       u'fail_metadata': exc.metadata})
    else:
        _log.warning("No idea what happened here, but it failed: %r", exc)
        # Looks like no, so just record the failure so that an admin can
        # look up the reason later.
        atomic_update(mgg.database.MediaEntry,
                      {'id': entry_id},
                      {u'state': u'failed',
                       u'fail_error': u'Unhandled exception: {0}'.format(
                           six.text_type(exc)),
                       u'fail_metadata': {}})


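# Illustrative sketch: code that drives processing (e.g. the Celery task)
# typically wraps the work -- something like _run_processor_sketch() above --
# in a try/except and reports failures via mark_entry_failed(), which flips
# the entry's state to u'failed'. mark_entry_failed() itself distinguishes
# BaseProcessingFail errors from unexpected ones.
def _mark_failure_sketch(media_id, action_name):
    try:
        _run_processor_sketch(media_id, action_name)
    except Exception as exc:
        mark_entry_failed(media_id, exc)
        raise

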
def get_process_filename(entry, workbench, acceptable_files):
    """
    Try and get the queued file if available, otherwise return the first file
    in acceptable_files that we have.

    If no acceptable file is found, raise ProcessFileNotFound.
    """
    if entry.queued_media_file:
        filepath = entry.queued_media_file
        storage = mgg.queue_store
    else:
        # Make sure filepath is defined even if none of the acceptable
        # files are present.
        filepath = None
        for keyname in acceptable_files:
            if entry.media_files.get(keyname):
                filepath = entry.media_files[keyname]
                storage = mgg.public_store
                break

    if not filepath:
        raise ProcessFileNotFound()

    filename = workbench.localized_file(
        storage, filepath,
        'source')

    if not os.path.exists(filename):
        raise ProcessFileNotFound()

    return filename


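# Illustrative sketch (attribute and key names are assumptions): a processor's
# setup step usually localizes the file it should work on via
# get_process_filename(), trying the queued upload first and falling back to
# already-stored files, then builds derived names from it.
def _common_setup_sketch(processor, acceptable_files=('original',)):
    processor.process_filename = get_process_filename(
        processor.entry, processor.workbench, acceptable_files)
    processor.name_builder = FilenameBuilder(processor.process_filename)

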
def store_public(entry, keyname, local_file, target_name=None,
                 delete_if_exists=True):
    if target_name is None:
        target_name = os.path.basename(local_file)
    target_filepath = create_pub_filepath(entry, target_name)

    if keyname in entry.media_files:
        _log.warning("store_public: keyname %r already used for file %r, "
                     "replacing with %r", keyname,
                     entry.media_files[keyname], target_filepath)
        if delete_if_exists:
            mgg.public_store.delete_file(entry.media_files[keyname])
    try:
        mgg.public_store.copy_local_to_storage(local_file, target_filepath)
    except Exception as e:
        _log.error(u'Exception happened: {0}'.format(e))
        raise PublicStoreFail(keyname=keyname)
    # raise an error if the file failed to copy
    if not mgg.public_store.file_exists(target_filepath):
        raise PublicStoreFail(keyname=keyname)

    entry.media_files[keyname] = target_filepath


def copy_original(entry, orig_filename, target_name, keyname=u"original"):
    store_public(entry, keyname, orig_filename, target_name)


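# Illustrative sketch (assumed keynames and filename patterns): after
# generating a derivative file in the workbench, a processor publishes it
# with store_public(), and keeps a copy of the upload with copy_original().
def _store_results_sketch(processor, tmp_thumb_path):
    store_public(processor.entry, 'thumb', tmp_thumb_path,
                 processor.name_builder.fill('{basename}.thumbnail{ext}'))
    copy_original(processor.entry, processor.process_filename,
                  processor.name_builder.fill('{basename}{ext}'))

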
class BaseProcessingFail(Exception):
    """
    Base exception that all other processing failure messages should
    subclass from.

    You shouldn't raise this directly; instead you should subclass it
    and provide the exception_path and general_message applicable to
    this error.
    """
    general_message = u''

    @property
    def exception_path(self):
        return u"%s:%s" % (
            self.__class__.__module__, self.__class__.__name__)

    def __init__(self, message=None, **metadata):
        if message is not None:
            super(BaseProcessingFail, self).__init__(message)
            metadata['message'] = message
        self.metadata = metadata


class BadMediaFail(BaseProcessingFail):
    """
    Error that should be raised when an inappropriate file was given
    for the media type specified.
    """
    general_message = _(u'Invalid file given for media type.')


class PublicStoreFail(BaseProcessingFail):
    """
    Error that should be raised when copying to public store fails
    """
    general_message = _('Copying to public storage failed.')


class ProcessFileNotFound(BaseProcessingFail):
    """
    Error that should be raised when an acceptable file for processing
    is not found.
    """
    general_message = _(u'An acceptable processing file was not found')