Remove self.entry in VideoTranscoder
[mediagoblin.git] / mediagoblin / processing / __init__.py
1 # GNU MediaGoblin -- federated, autonomous media hosting
2 # Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16
17 # Use an ordered dict if we can. If not, we'll just use a normal dict
18 # later.
try:
    from collections import OrderedDict
except ImportError:
    # Pre-2.7 Python: no OrderedDict; fall back to a plain dict later.
    # Catch only ImportError — a bare `except:` would also hide
    # KeyboardInterrupt/SystemExit and real bugs.
    OrderedDict = None
23
24 import logging
25 import os
26
27 import six
28
29 from mediagoblin import mg_globals as mgg
30 from mediagoblin.db.util import atomic_update
31 from mediagoblin.db.models import MediaEntry
32 from mediagoblin.tools.pluginapi import hook_handle
33 from mediagoblin.tools.translate import lazy_pass_to_ugettext as _
34
35 _log = logging.getLogger(__name__)
36
37
class ProgressCallback(object):
    """Callable that accumulates transcoding progress onto a media entry.

    Each call adds the given progress increment to the entry's
    ``transcoding_progress`` and persists the entry.
    """
    def __init__(self, entry):
        self.entry = entry

    def __call__(self, progress):
        """Record *progress* (a percentage increment) on the entry."""
        if not progress:
            return
        total = self.entry.transcoding_progress + progress
        # Snap to exactly 100 once within a hundredth of a percent,
        # so float drift can't leave us at 99.999...
        self.entry.transcoding_progress = 100 if 100 - total < 0.01 else total
        self.entry.save()
49
50
def create_pub_filepath(entry, filename):
    """Return a unique public-store filepath for *filename* under this entry.

    Files are namespaced by the entry's id below ``media_entries``.
    """
    path = ['media_entries', six.text_type(entry.id), filename]
    return mgg.public_store.get_unique_filepath(path)
56
57
class FilenameBuilder(object):
    """Easily slice and dice filenames.

    Initialize this class with an original file path, then use the fill()
    method to create new filenames based on the original.
    """
    MAX_FILENAME_LENGTH = 255  # VFAT's maximum filename length

    def __init__(self, path):
        """Split *path* into dirpath, basename (no extension) and extension."""
        dirpath, name = os.path.split(path)
        stem, extension = os.path.splitext(name)
        self.dirpath = dirpath
        self.basename = stem
        self.ext = extension.lower()

    def fill(self, fmtstr):
        """Build a new filename based on the original.

        The fmtstr argument can include the following:
        {basename} -- the original basename, with the extension removed
        {ext} -- the original extension, always lowercase

        If necessary, {basename} will be truncated so the filename does not
        exceed this class' MAX_FILENAME_LENGTH in length.
        """
        # How many characters the format string consumes without the basename.
        overhead = len(fmtstr.format(basename='', ext=self.ext))
        allowed = self.MAX_FILENAME_LENGTH - overhead
        return fmtstr.format(basename=self.basename[:allowed], ext=self.ext)
88
89
90
class MediaProcessor(object):
    """A particular processor for this media type.

    While the ProcessingManager handles all types of MediaProcessing
    possible for a particular media type, a MediaProcessor can be
    thought of as a *particular* processing action for a media type.
    For example, you may have separate MediaProcessors for:

    - initial_processing: the initial processing of a media
    - gen_thumb: generate a thumbnail
    - resize: resize an image
    - transcode: transcode a video

    ... etc.

    Some information on producing a new MediaProcessor for your media type:

    - You *must* supply a name attribute.  This must be a class level
      attribute, and a string.  This will be used to determine the
      subcommand of your process.
    - It's recommended that you supply a class level description
      attribute.
    - Supply a media_is_eligible classmethod.  This will be used to
      determine whether or not a media entry is eligible to use this
      processor type.  See the method documentation for details.
    - To give "./bin/gmg reprocess run" abilities to this media type,
      supply both generate_parser and parser_to_request classmethods.
    - The process method will be what actually processes your media.
    """
    # You MUST override this in the child MediaProcessor!
    name = None

    # Optional, but will be used in various places to describe the
    # action this MediaProcessor provides.
    description = None

    def __init__(self, manager, entry):
        self.manager = manager
        self.entry = entry
        # Remember the entry's state at construction time so later
        # steps can see whether processing changed it.
        self.entry_orig_state = entry.state

        # Created lazily in __enter__; only valid while processing.
        self.workbench = None

    def __enter__(self):
        self.workbench = mgg.workbench_manager.create()
        return self

    def __exit__(self, *args):
        self.workbench.destroy()
        self.workbench = None

    # @with_workbench
    def process(self, **kwargs):
        """Actually process this media entry."""
        raise NotImplementedError

    @classmethod
    def media_is_eligible(cls, entry=None, state=None):
        """Decide whether an entry (or a bare state) may use this processor."""
        raise NotImplementedError

    ###############################
    # Command line interface things
    ###############################

    @classmethod
    def generate_parser(cls):
        raise NotImplementedError

    @classmethod
    def args_to_request(cls, args):
        raise NotImplementedError

    ##########################################
    # THE FUTURE: web interface things here :)
    ##########################################

    #####################
    # Some common "steps"
    #####################

    def delete_queue_file(self):
        """Remove the queued media file from storage and the database.

        The queued file lives in a per-task directory that should be
        removed too, but deletion fails if the directory is not empty,
        to be on the super-safe side.
        """
        filepath = self.entry.queued_media_file
        if filepath:
            mgg.queue_store.delete_file(filepath)      # rm file
            mgg.queue_store.delete_dir(filepath[:-1])  # rm dir
            self.entry.queued_media_file = []
184
185
class ProcessingKeyError(Exception):
    """Base error for bad processor/manager lookups."""

class ProcessorDoesNotExist(ProcessingKeyError):
    """Raised when no processor is registered under the requested key."""

class ProcessorNotEligible(ProcessingKeyError):
    """Raised when an entry is not eligible for the requested processor."""

class ProcessingManagerDoesNotExist(ProcessingKeyError):
    """Raised when no processing manager exists for a media type."""
190
191
192
class ProcessingManager(object):
    """Registry of all the processing actions available for a media type.

    Specific processing actions, MediaProcessor subclasses, are added
    to the ProcessingManager via add_processor().
    """
    def __init__(self):
        # Maps processor name -> MediaProcessor class for this media
        # type, preserving registration order when OrderedDict exists.
        self.processors = OrderedDict() if OrderedDict is not None else {}

    def add_processor(self, processor):
        """Register a processor class with this media type."""
        if processor.name is None:
            raise AttributeError("Processor class's .name attribute not set")
        self.processors[processor.name] = processor

    def list_eligible_processors(self, entry):
        """Return every processor this media entry is eligible for."""
        return [proc for proc in self.processors.values()
                if proc.media_is_eligible(entry=entry)]

    def list_all_processors_by_state(self, state):
        """Return every processor eligible for entries in this state."""
        return [proc for proc in self.processors.values()
                if proc.media_is_eligible(state=state)]

    def list_all_processors(self):
        """Return all registered processors, regardless of eligibility."""
        return self.processors.values()

    def gen_process_request_via_cli(self, subparser):
        # Got to figure out what actually goes here before I can write this properly
        pass

    def get_processor(self, key, entry=None):
        """Get the processor registered under *key*.

        If *entry* is supplied, also make sure this entry is actually
        compatible with that processor; otherwise raise an error.
        """
        try:
            processor = self.processors[key]
        except KeyError:
            raise ProcessorDoesNotExist(
                "'%s' processor does not exist for this media type" % key)

        if entry and not processor.media_is_eligible(entry):
            raise ProcessorNotEligible(
                "This entry is not eligible for processor with name '%s'" % key)

        return processor

    def workflow(self, entry, feed_url, reprocess_action, reprocess_info=None):
        """Return the Celery command needed to proceed with media processing."""
        return None
268
269
def request_from_args(args, which_args):
    """Build a request dict from selected attributes of parsed argparse args.

    :param args: an argparse Namespace (or any attribute-bearing object)
    :param which_args: iterable of attribute names to copy into the request
    """
    return {name: getattr(args, name) for name in which_args}
279
280
281 class MediaEntryNotFound(Exception): pass
282
283
def get_processing_manager_for_type(media_type):
    """Instantiate the appropriate processing manager for this media type.

    :raises ProcessingManagerDoesNotExist: if no plugin provides a
        reprocess_manager hook for *media_type*
    """
    manager_class = hook_handle(('reprocess_manager', media_type))
    if manager_class:
        return manager_class()
    raise ProcessingManagerDoesNotExist(
        "A processing manager does not exist for {0}".format(media_type))
295
296
def get_entry_and_processing_manager(media_id):
    """
    Get a MediaEntry and the processing manager for its media type.

    :param media_id: id of the MediaEntry to look up
    :raises MediaEntryNotFound: if no entry exists with this id

    Returns a tuple of: `(entry, manager)`
    """
    # NOTE: the old docstring claimed a 3-tuple `(entry, media_type,
    # media_manager)` was returned; the function has always returned
    # only (entry, manager).
    entry = MediaEntry.query.filter_by(id=media_id).first()
    if entry is None:
        raise MediaEntryNotFound("Can't find media with id '%s'" % media_id)

    manager = get_processing_manager_for_type(entry.media_type)

    return entry, manager
310
311
def mark_entry_failed(entry_id, exc):
    """
    Mark a media entry as having failed in its conversion.

    Uses the exception that was raised to mark more information.  If
    the exception is a derivative of BaseProcessingFail then we can
    store extra information that can be useful for users telling them
    why their media failed to process.

    :param entry_id: The id of the media entry
    :param exc: the exception that caused the failure (ideally an
        instance of BaseProcessingFail)
    """
    if isinstance(exc, BaseProcessingFail):
        # A known failure type: record its exception path and any
        # metadata the user might have supplied.
        update_values = {
            u'state': u'failed',
            u'fail_error': six.text_type(exc.exception_path),
            u'fail_metadata': exc.metadata}
    else:
        _log.warn("No idea what happened here, but it failed: %r", exc)
        # An unexpected error: record it so an admin can be asked
        # about the reason later.
        update_values = {
            u'state': u'failed',
            u'fail_error': u'Unhandled exception: {0}'.format(
                six.text_type(exc)),
            u'fail_metadata': {}}

    atomic_update(mgg.database.MediaEntry, {'id': entry_id}, update_values)
345
346
def get_process_filename(entry, workbench, acceptable_files):
    """
    Try and get the queued file if available, otherwise return the first file
    in the acceptable_files that we have.

    :param entry: the MediaEntry whose file we want
    :param workbench: workbench used to localize the file to disk
    :param acceptable_files: ordered iterable of media_files keynames to try

    If no acceptable_files, raise ProcessFileNotFound
    """
    # Initialize so a fruitless search raises ProcessFileNotFound below
    # instead of UnboundLocalError (the original left `filepath` unbound
    # when no acceptable key matched).
    filepath = None
    storage = None

    if entry.queued_media_file:
        filepath = entry.queued_media_file
        storage = mgg.queue_store
    else:
        for keyname in acceptable_files:
            if entry.media_files.get(keyname):
                filepath = entry.media_files[keyname]
                storage = mgg.public_store
                break

    if not filepath:
        raise ProcessFileNotFound()

    filename = workbench.localized_file(
        storage, filepath,
        'source')

    if not os.path.exists(filename):
        raise ProcessFileNotFound()

    return filename
375
376
def store_public(entry, keyname, local_file, target_name=None,
                 delete_if_exists=True):
    """Copy a local file into public storage and record it on the entry.

    :param entry: MediaEntry to attach the file to
    :param keyname: media_files key to store the new filepath under
    :param local_file: path of the file to copy
    :param target_name: stored filename; defaults to local_file's basename
    :param delete_if_exists: remove any file already stored under keyname
    :raises PublicStoreFail: if the copy raises or the target is missing
    """
    if target_name is None:
        target_name = os.path.basename(local_file)
    dest_filepath = create_pub_filepath(entry, target_name)

    if keyname in entry.media_files:
        _log.warn("store_public: keyname %r already used for file %r, "
                  "replacing with %r", keyname,
                  entry.media_files[keyname], dest_filepath)
        if delete_if_exists:
            mgg.public_store.delete_file(entry.media_files[keyname])

    try:
        mgg.public_store.copy_local_to_storage(local_file, dest_filepath)
    except Exception as exc:
        _log.error(u'Exception happened: {0}'.format(exc))
        raise PublicStoreFail(keyname=keyname)

    # The copy call returned, but double-check the file really landed.
    if not mgg.public_store.file_exists(dest_filepath):
        raise PublicStoreFail(keyname=keyname)

    entry.media_files[keyname] = dest_filepath
399
400
def copy_original(entry, orig_filename, target_name, keyname=u"original"):
    """Store the entry's original file in public storage under *keyname*."""
    store_public(entry, keyname, orig_filename, target_name)
403
404
class BaseProcessingFail(Exception):
    """
    Root of the processing-failure exception hierarchy.

    Don't raise this directly; subclass it and provide the
    exception_path and general_message applicable to the error.
    """
    general_message = u''

    @property
    def exception_path(self):
        """Dotted ``module:ClassName`` identifier for this failure type."""
        cls = self.__class__
        return u"%s:%s" % (cls.__module__, cls.__name__)

    def __init__(self, message=None, **metadata):
        if message is not None:
            super(BaseProcessingFail, self).__init__(message)
            # Keep the human-readable message alongside any other metadata.
            metadata['message'] = message
        self.metadata = metadata
426
class BadMediaFail(BaseProcessingFail):
    """
    Raised when the supplied file is inappropriate for the declared
    media type.
    """
    general_message = _(u'Invalid file given for media type.')
433
434
class PublicStoreFail(BaseProcessingFail):
    """
    Raised when copying a file into public storage fails.
    """
    general_message = _('Copying to public storage failed.')
440
441
class ProcessFileNotFound(BaseProcessingFail):
    """
    Raised when no acceptable file for processing can be found.
    """
    general_message = _(u'An acceptable processing file was not found')