bdbe0441bda352970bb748d180801fd68c9be8cc
[mediagoblin.git] / mediagoblin / processing / __init__.py
1 # GNU MediaGoblin -- federated, autonomous media hosting
2 # Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16
17 from collections import OrderedDict
18 import logging
19 import os
20
21 from mediagoblin import mg_globals as mgg
22 from mediagoblin.db.util import atomic_update
23 from mediagoblin.db.models import MediaEntry
24 from mediagoblin.tools.pluginapi import hook_handle
25 from mediagoblin.tools.translate import lazy_pass_to_ugettext as _
26
27 _log = logging.getLogger(__name__)
28
29
class ProgressCallback(object):
    """Callable that records transcoding progress on a media entry.

    Instances are handed to a transcoder, which calls them with a
    progress value; each truthy value is saved onto the entry.
    """
    def __init__(self, entry):
        self.entry = entry

    def __call__(self, progress):
        # Skip falsy progress values (None / 0) rather than storing them.
        if not progress:
            return
        self.entry.transcoding_progress = progress
        self.entry.save()
38
39
def create_pub_filepath(entry, filename):
    """Return a unique public-store filepath for ``filename``.

    The path lives under ``media_entries/<entry id>/``; uniqueness is
    delegated to the public store.
    """
    path = ['media_entries', unicode(entry.id), filename]
    return mgg.public_store.get_unique_filepath(path)
45
46
class FilenameBuilder(object):
    """Derive new filenames from an original file path.

    Construct with the original path, then call fill() with a format
    string to produce related filenames (thumbnails, transcodes, ...).
    """
    # VFAT caps filenames at 255 characters; keep results within that.
    MAX_FILENAME_LENGTH = 255

    def __init__(self, path):
        """Split ``path`` into directory, bare basename and extension."""
        self.dirpath, name = os.path.split(path)
        self.basename, ext = os.path.splitext(name)
        self.ext = ext.lower()

    def fill(self, fmtstr):
        """Build a new filename from ``fmtstr``.

        Recognized placeholders:
          {basename} -- the original basename, extension stripped
          {ext}      -- the original extension, always lowercase

        {basename} is truncated as needed so the result never exceeds
        MAX_FILENAME_LENGTH characters.
        """
        # Room left for the basename once the rest of the template
        # (including the extension) is accounted for.
        room = self.MAX_FILENAME_LENGTH - len(
            fmtstr.format(basename='', ext=self.ext))
        return fmtstr.format(basename=self.basename[:room], ext=self.ext)
77
78
79
class MediaProcessor(object):
    """One particular processing action for a media type.

    A ProcessingManager holds every kind of processing available for a
    media type; each MediaProcessor is a *single* such action.  For
    example, a media type might provide separate MediaProcessors for:

     - initial_processing: the initial processing of a media
     - gen_thumb: generate a thumbnail
     - resize: resize an image
     - transcode: transcode a video

    ... etc.

    Writing a new MediaProcessor for your media type:

     - You *must* supply a class-level ``name`` string; it becomes the
       subcommand name of your process.
     - A class-level ``description`` is recommended.
     - Supply a media_is_eligible classmethod, used to decide whether a
       media entry may use this processor type (see its docstring).
     - To hook into "./bin/gmg reprocess run", supply both the
       generate_parser and args_to_request classmethods.
     - The process method is what actually processes the media.
    """
    # You MUST override this in the child MediaProcessor!
    name = None

    # Optional; used in various places to describe the action this
    # MediaProcessor provides.
    description = None

    def __init__(self, manager, entry):
        self.manager = manager
        self.entry = entry
        self.entry_orig_state = entry.state

        # A workbench is only attached while processing (see __enter__).
        self.workbench = None

    def __enter__(self):
        self.workbench = mgg.workbench_manager.create()
        return self

    def __exit__(self, *args):
        self.workbench.destroy()
        self.workbench = None

    # @with_workbench
    def process(self, **kwargs):
        """
        Actually process this media entry.
        """
        raise NotImplementedError

    @classmethod
    def media_is_eligible(cls, entry=None, state=None):
        raise NotImplementedError

    ###############################
    # Command line interface things
    ###############################

    @classmethod
    def generate_parser(cls):
        raise NotImplementedError

    @classmethod
    def args_to_request(cls, args):
        raise NotImplementedError

    ##########################################
    # THE FUTURE: web interface things here :)
    ##########################################

    #####################
    # Some common "steps"
    #####################

    def delete_queue_file(self):
        # Remove the queued media file from storage and the database.
        # queued_filepath sits inside the task_id directory, which should
        # also be removed -- but delete_dir fails on a non-empty directory,
        # which keeps us on the super-safe side.
        queued_filepath = self.entry.queued_media_file
        if queued_filepath:
            mgg.queue_store.delete_file(queued_filepath)      # rm file
            mgg.queue_store.delete_dir(queued_filepath[:-1])  # rm dir
        self.entry.queued_media_file = []
173
174
class ProcessingKeyError(Exception):
    """Base for errors when looking up processors/managers by key."""


class ProcessorDoesNotExist(ProcessingKeyError):
    """No processor is registered under the requested key."""


class ProcessorNotEligible(ProcessingKeyError):
    """The processor exists but cannot run on the given entry."""


class ProcessingManagerDoesNotExist(ProcessingKeyError):
    """No processing manager is registered for the media type."""
179
180
181
class ProcessingManager(object):
    """Manages all the processing actions available for a media type

    Specific processing actions, MediaProcessor subclasses, are added
    to the ProcessingManager.
    """
    def __init__(self):
        # All MediaProcessors of this media type, keyed by processor
        # name, kept in registration order.
        self.processors = OrderedDict()

    def add_processor(self, processor):
        """
        Add a processor class to this media type.

        Raises:
            AttributeError: the processor class has no .name set.
        """
        name = processor.name
        if name is None:
            raise AttributeError("Processor class's .name attribute not set")

        self.processors[name] = processor

    def list_eligible_processors(self, entry):
        """
        List all processors that this media entry is eligible to be processed
        for.
        """
        return [
            processor
            for processor in self.processors.values()
            if processor.media_is_eligible(entry=entry)]

    def list_all_processors_by_state(self, state):
        """
        List all processors that this media state is eligible to be processed
        for.
        """
        return [
            processor
            for processor in self.processors.values()
            if processor.media_is_eligible(state=state)]

    def list_all_processors(self):
        """List every registered processor, regardless of eligibility."""
        return self.processors.values()

    def gen_process_request_via_cli(self, subparser):
        # Got to figure out what actually goes here before I can write this properly
        pass

    def get_processor(self, key, entry=None):
        """
        Get the processor with this key.

        If entry supplied, make sure this entry is actually compatible;
        otherwise raise error.

        Raises:
            ProcessorDoesNotExist: no processor registered under key.
            ProcessorNotEligible: entry given but not compatible.
        """
        try:
            processor = self.processors[key]
        except KeyError:
            # NOTE: removed a stray `import pdb; pdb.set_trace()` left
            # here from debugging -- it would hang production workers
            # whenever an unknown key was requested.
            raise ProcessorDoesNotExist(
                "'%s' processor does not exist for this media type" % key)

        if entry and not processor.media_is_eligible(entry):
            raise ProcessorNotEligible(
                "This entry is not eligible for processor with name '%s'" % key)

        return processor
250
251
def request_from_args(args, which_args):
    """
    Generate a request dict from the values of some argparse parsed args.

    Each name in which_args becomes a key whose value is read off of
    the args namespace.
    """
    return {arg: getattr(args, arg) for arg in which_args}
261
262
263 class MediaEntryNotFound(Exception): pass
264
265
def get_processing_manager_for_type(media_type):
    """
    Get the appropriate media manager for this type.

    Raises:
        ProcessingManagerDoesNotExist: no reprocess_manager hook is
        registered for media_type.
    """
    manager_class = hook_handle(('reprocess_manager', media_type))
    if not manager_class:
        raise ProcessingManagerDoesNotExist(
            "A processing manager does not exist for {0}".format(media_type))

    # Instantiate and hand back the manager.
    return manager_class()
277
278
def get_entry_and_processing_manager(media_id):
    """
    Get a MediaEntry and its processing manager in one go.

    Returns a tuple of: `(entry, manager)`

    Raises:
        MediaEntryNotFound: no MediaEntry exists with this id.
    """
    entry = MediaEntry.query.filter_by(id=media_id).first()
    if entry is None:
        raise MediaEntryNotFound("Can't find media with id '%s'" % media_id)

    # Manager is looked up from the entry's own media_type.
    manager = get_processing_manager_for_type(entry.media_type)

    return entry, manager
292
293
def mark_entry_failed(entry_id, exc):
    """
    Mark a media entry as having failed in its conversion.

    Uses the exception that was raised to mark more information. If
    the exception is a derivative of BaseProcessingFail then we can
    store extra information that can be useful for users telling them
    why their media failed to process.

    Args:
      - entry_id: The id of the media entry
      - exc: the exception that caused the failure
    """
    if isinstance(exc, BaseProcessingFail):
        # A failure type we know how to handle: keep its import path
        # and any metadata supplied so it can be shown to the user.
        update_values = {
            u'state': u'failed',
            u'fail_error': unicode(exc.exception_path),
            u'fail_metadata': exc.metadata}
    else:
        _log.warn("No idea what happened here, but it failed: %r", exc)
        # Unknown error: mark the entry failed but record no
        # fail_error (it wasn't handled) and wipe any fail_metadata
        # left over from earlier attempts.
        update_values = {
            u'state': u'failed',
            u'fail_error': None,
            u'fail_metadata': {}}

    atomic_update(mgg.database.MediaEntry, {'id': entry_id}, update_values)
328
329
def get_process_filename(entry, workbench, acceptable_files):
    """
    Try and get the queued file if available, otherwise return the first file
    in the acceptable_files that we have.

    Args:
      - entry: the MediaEntry being processed
      - workbench: workbench used to localize the chosen file
      - acceptable_files: media_files keynames to fall back on, in
        order of preference

    Raises:
        ProcessFileNotFound: no usable file could be found, or the
        localized file does not exist on disk.
    """
    # Initialize up front: previously, when queued_media_file was empty
    # and no acceptable keyname matched, `filepath` stayed unbound and
    # the check below raised UnboundLocalError instead of the intended
    # ProcessFileNotFound.
    filepath = None
    storage = None

    if entry.queued_media_file:
        filepath = entry.queued_media_file
        storage = mgg.queue_store
    else:
        for keyname in acceptable_files:
            if entry.media_files.get(keyname):
                filepath = entry.media_files[keyname]
                storage = mgg.public_store
                break

    if not filepath:
        raise ProcessFileNotFound()

    filename = workbench.localized_file(
        storage, filepath,
        'source')

    if not os.path.exists(filename):
        raise ProcessFileNotFound()

    return filename
358
359
def store_public(entry, keyname, local_file, target_name=None,
                 delete_if_exists=True):
    """
    Copy a local file into public storage and record it on the entry.

    Args:
      - entry: MediaEntry whose media_files mapping is updated
      - keyname: media_files key the new filepath is stored under
      - local_file: path of the local file to copy
      - target_name: public filename; defaults to local_file's basename
      - delete_if_exists: delete any file previously stored under
        keyname before replacing it

    Raises:
        PublicStoreFail: the copy failed, or the file is missing from
        public storage afterwards.
    """
    if target_name is None:
        target_name = os.path.basename(local_file)
    target_filepath = create_pub_filepath(entry, target_name)

    if keyname in entry.media_files:
        _log.warn("store_public: keyname %r already used for file %r, "
                  "replacing with %r", keyname,
                  entry.media_files[keyname], target_filepath)
        if delete_if_exists:
            mgg.public_store.delete_file(entry.media_files[keyname])

    try:
        mgg.public_store.copy_local_to_storage(local_file, target_filepath)
    except Exception:
        # Was a bare `except:`, which also swallowed SystemExit and
        # KeyboardInterrupt and hid the underlying cause; catch only
        # real errors and log the traceback before re-raising.
        _log.exception("store_public: copying %r to %r failed",
                       local_file, target_filepath)
        raise PublicStoreFail(keyname=keyname)

    # raise an error if the file failed to copy
    if not mgg.public_store.file_exists(target_filepath):
        raise PublicStoreFail(keyname=keyname)

    entry.media_files[keyname] = target_filepath
383
384
def copy_original(entry, orig_filename, target_name, keyname=u"original"):
    """Store the original media file publicly under ``keyname``."""
    store_public(entry, keyname, orig_filename, target_name)
387
388
class BaseProcessingFail(Exception):
    """
    Base exception that all other processing failure messages should
    subclass from.

    Don't raise this directly; subclass it and provide the
    exception_path and general_message appropriate to the error.
    """
    general_message = u''

    def __init__(self, **metadata):
        # Arbitrary keyword metadata describing the failure; stored on
        # the media entry so it can be shown to the user later.
        self.metadata = metadata or {}

    @property
    def exception_path(self):
        """'module:ClassName' string identifying this failure type."""
        return u"%s:%s" % (
            self.__class__.__module__, self.__class__.__name__)
407
class BadMediaFail(BaseProcessingFail):
    """
    Raised when the file supplied is inappropriate for the media type
    specified.
    """
    general_message = _(u'Invalid file given for media type.')
414
415
class PublicStoreFail(BaseProcessingFail):
    """
    Error that should be raised when copying to public store fails.
    """
    # u-prefixed for consistency with the other failure messages here;
    # under Python 2 this makes the message unicode like its siblings.
    general_message = _(u'Copying to public storage failed.')
421
422
class ProcessFileNotFound(BaseProcessingFail):
    """
    Raised when no acceptable file for processing can be located.
    """
    general_message = _(u'An acceptable processing file was not found')