# Extraction residue from the gitweb viewer, kept as comments so the
# module stays importable:
# Merge branch 'master' of git://gitorious.org/mediagoblin/mediagoblin
# [mediagoblin.git] / mediagoblin / processing / __init__.py
1 # GNU MediaGoblin -- federated, autonomous media hosting
2 # Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16
# Use an ordered dict if we can.  If not, we'll just use a normal dict
# later.
try:
    from collections import OrderedDict
except ImportError:
    # Bug fix: was a bare ``except:``.  Only a missing module should
    # trigger the plain-dict fallback; a bare except would also swallow
    # KeyboardInterrupt/SystemExit during import.
    OrderedDict = None
23
24 import logging
25 import os
26
27 from mediagoblin import mg_globals as mgg
28 from mediagoblin.db.util import atomic_update
29 from mediagoblin.db.models import MediaEntry
30 from mediagoblin.tools.pluginapi import hook_handle
31 from mediagoblin.tools.translate import lazy_pass_to_ugettext as _
32
33 _log = logging.getLogger(__name__)
34
35
class ProgressCallback(object):
    """Callable that records transcoding progress on a media entry.

    Handed to transcoders, which invoke it periodically with the
    current progress value; each truthy value is saved to the entry.
    """
    def __init__(self, entry):
        self.entry = entry

    def __call__(self, progress):
        # Skip falsy updates (None / 0) — nothing worth persisting.
        if not progress:
            return
        self.entry.transcoding_progress = progress
        self.entry.save()
44
45
def create_pub_filepath(entry, filename):
    """Return a unique public-store filepath for ``filename``.

    The path lives under ``media_entries/<entry id>/`` in public storage.
    """
    candidate = ['media_entries', unicode(entry.id), filename]
    return mgg.public_store.get_unique_filepath(candidate)
51
52
class FilenameBuilder(object):
    """Easily slice and dice filenames.

    Initialize this class with an original file path, then use the fill()
    method to create new filenames based on the original.

    """
    MAX_FILENAME_LENGTH = 255  # VFAT's maximum filename length

    def __init__(self, path):
        """Split ``path`` into dirpath, bare basename and lowercase ext."""
        self.dirpath, filename = os.path.split(path)
        self.basename, raw_ext = os.path.splitext(filename)
        self.ext = raw_ext.lower()

    def fill(self, fmtstr):
        """Build a new filename based on the original.

        The fmtstr argument can include the following:
        {basename} -- the original basename, with the extension removed
        {ext} -- the original extension, always lowercase

        If necessary, {basename} will be truncated so the filename does not
        exceed this class' MAX_FILENAME_LENGTH in length.

        """
        # Everything except the basename (literal text + extension)
        # counts against the length budget.
        overhead = len(fmtstr.format(basename='', ext=self.ext))
        room = self.MAX_FILENAME_LENGTH - overhead
        return fmtstr.format(basename=self.basename[:room], ext=self.ext)
83
84
85
class MediaProcessor(object):
    """A particular processor for this media type.

    While the ProcessingManager handles all types of MediaProcessing
    possible for a particular media type, a MediaProcessor can be
    thought of as a *particular* processing action for a media type.
    For example, you may have separate MediaProcessors for:

    - initial_processing: the initial processing of a media
    - gen_thumb: generate a thumbnail
    - resize: resize an image
    - transcode: transcode a video

    ... etc.

    Some information on producing a new MediaProcessor for your media type:

    - You *must* supply a name attribute.  This must be a class level
      attribute, and a string.  This will be used to determine the
      subcommand of your process
    - It's recommended that you supply a class level description
      attribute.
    - Supply a media_is_eligible classmethod.  This will be used to
      determine whether or not a media entry is eligible to use this
      processor type.  See the method documentation for details.
    - To give "./bin/gmg reprocess run" abilities to this media type,
      supply both generate_parser and args_to_request classmethods.
    - The process method will be what actually processes your media.
    """
    # You MUST override this in the child MediaProcessor!
    name = None

    # Optional, but will be used in various places to describe the
    # action this MediaProcessor provides
    description = None

    def __init__(self, manager, entry):
        self.manager = manager
        self.entry = entry
        # Remember the state the entry was in before processing began.
        self.entry_orig_state = entry.state

        # Should be initialized at time of processing, at least
        self.workbench = None

    def __enter__(self):
        # Set up a scratch workbench for the duration of processing.
        self.workbench = mgg.workbench_manager.create()
        return self

    def __exit__(self, *args):
        # Tear the workbench down unconditionally, even if processing
        # raised (args carries any in-flight exception info).
        self.workbench.destroy()
        self.workbench = None

    # @with_workbench
    def process(self, **kwargs):
        """
        Actually process this media entry.

        Subclasses must override.
        """
        raise NotImplementedError

    @classmethod
    def media_is_eligible(cls, entry=None, state=None):
        """Report whether this processor can run for ``entry`` or ``state``.

        Called with either a media entry or a bare state string (see
        ProcessingManager's list_* methods).  Subclasses must override.
        """
        raise NotImplementedError

    ###############################
    # Command line interface things
    ###############################

    @classmethod
    def generate_parser(cls):
        """Return the argument parser for this processor's CLI subcommand.

        Subclasses must override to support "./bin/gmg reprocess run".
        """
        raise NotImplementedError

    @classmethod
    def args_to_request(cls, args):
        """Convert parsed CLI args into a processing request.

        Subclasses must override to support "./bin/gmg reprocess run".
        """
        raise NotImplementedError

    ##########################################
    # THE FUTURE: web interface things here :)
    ##########################################

    #####################
    # Some common "steps"
    #####################

    def delete_queue_file(self):
        # Remove queued media file from storage and database.
        # queued_filepath is in the task_id directory which should
        # be removed too, but fail if the directory is not empty to be on
        # the super-safe side.
        queued_filepath = self.entry.queued_media_file
        if queued_filepath:
            mgg.queue_store.delete_file(queued_filepath)      # rm file
            mgg.queue_store.delete_dir(queued_filepath[:-1])  # rm dir
        self.entry.queued_media_file = []
179
180
class ProcessingKeyError(Exception):
    """Base class for bad processor / processing-manager lookups."""


class ProcessorDoesNotExist(ProcessingKeyError):
    """Raised when no processor is registered under the requested key."""


class ProcessorNotEligible(ProcessingKeyError):
    """Raised when an entry is not eligible for the requested processor."""


class ProcessingManagerDoesNotExist(ProcessingKeyError):
    """Raised when no processing manager exists for a media type."""
185
186
187
class ProcessingManager(object):
    """Manages all the processing actions available for a media type

    Specific processing actions, MediaProcessor subclasses, are added
    to the ProcessingManager.
    """
    def __init__(self):
        # Dict of all MediaProcessors of this media type, keyed by
        # processor name.  Ordered (registration order) when the
        # platform provides OrderedDict.
        if OrderedDict is not None:
            self.processors = OrderedDict()
        else:
            self.processors = {}

    def add_processor(self, processor):
        """
        Add a processor class to this media type.

        Raises:
            AttributeError: if the processor class has no .name set.
        """
        name = processor.name
        if name is None:
            raise AttributeError("Processor class's .name attribute not set")

        self.processors[name] = processor

    def list_eligible_processors(self, entry):
        """
        List all processors that this media entry is eligible to be processed
        for.
        """
        return [
            processor
            for processor in self.processors.values()
            if processor.media_is_eligible(entry=entry)]

    def list_all_processors_by_state(self, state):
        """
        List all processors that this media state is eligible to be processed
        for.
        """
        return [
            processor
            for processor in self.processors.values()
            if processor.media_is_eligible(state=state)]

    def list_all_processors(self):
        """List every processor registered for this media type."""
        return self.processors.values()

    def gen_process_request_via_cli(self, subparser):
        # Got to figure out what actually goes here before I can write
        # this properly
        pass

    def get_processor(self, key, entry=None):
        """
        Get the processor with this key.

        If entry supplied, make sure this entry is actually compatible;
        otherwise raise error.

        Raises:
            ProcessorDoesNotExist: no processor registered under ``key``.
            ProcessorNotEligible: ``entry`` is not eligible for it.
        """
        try:
            processor = self.processors[key]
        except KeyError:
            # Bug fix: removed a leftover debugging breakpoint here
            # (``import pdb; pdb.set_trace()``) that would hang any
            # non-interactive processing run on an unknown key.
            raise ProcessorDoesNotExist(
                "'%s' processor does not exist for this media type" % key)

        if entry and not processor.media_is_eligible(entry):
            raise ProcessorNotEligible(
                "This entry is not eligible for processor with name '%s'" % key)

        return processor
259
260
def request_from_args(args, which_args):
    """
    Generate a request from the values of some argparse parsed args
    """
    return dict((name, getattr(args, name)) for name in which_args)
270
271
272 class MediaEntryNotFound(Exception): pass
273
274
def get_processing_manager_for_type(media_type):
    """
    Get the appropriate media manager for this type.

    Raises:
        ProcessingManagerDoesNotExist: no plugin registered a
        reprocess_manager hook for this media type.
    """
    manager_class = hook_handle(('reprocess_manager', media_type))
    if not manager_class:
        raise ProcessingManagerDoesNotExist(
            "A processing manager does not exist for {0}".format(media_type))

    return manager_class()
286
287
def get_entry_and_processing_manager(media_id):
    """
    Get a MediaEntry and its processing manager in one go.

    Returns a tuple of: `(entry, manager)`

    Raises:
        MediaEntryNotFound: no MediaEntry with this id exists.
    """
    entry = MediaEntry.query.filter_by(id=media_id).first()
    if entry is None:
        raise MediaEntryNotFound("Can't find media with id '%s'" % media_id)

    manager = get_processing_manager_for_type(entry.media_type)

    return entry, manager
301
302
def mark_entry_failed(entry_id, exc):
    """
    Mark a media entry as having failed in its conversion.

    Uses the exception that was raised to mark more information.  If
    the exception is a derivative of BaseProcessingFail then we can
    store extra information that can be useful for users telling them
    why their media failed to process.

    Args:
     - entry_id: The id of the media entry
     - exc: The exception raised during processing; a BaseProcessingFail
       instance carries an exception_path and user-facing metadata
       worth recording.

    """
    # Was this a BaseProcessingFail?  In other words, was this a
    # type of error that we know how to handle?
    if isinstance(exc, BaseProcessingFail):
        # Looks like yes, so record information about that failure and any
        # metadata the user might have supplied.
        atomic_update(mgg.database.MediaEntry,
                      {'id': entry_id},
                      {u'state': u'failed',
                       u'fail_error': unicode(exc.exception_path),
                       u'fail_metadata': exc.metadata})
    else:
        _log.warn("No idea what happened here, but it failed: %r", exc)
        # Looks like no, so just mark it as failed and don't record a
        # failure_error (we'll assume it wasn't handled) and don't record
        # metadata (in fact overwrite it if somehow it had previous info
        # here)
        atomic_update(mgg.database.MediaEntry,
                      {'id': entry_id},
                      {u'state': u'failed',
                       u'fail_error': None,
                       u'fail_metadata': {}})
337
338
def get_process_filename(entry, workbench, acceptable_files):
    """
    Try and get the queued file if available, otherwise return the first file
    in the acceptable_files that we have.

    Args:
     - entry: MediaEntry to pull a file from
     - workbench: workbench used to localize the chosen file to disk
     - acceptable_files: media_files keynames to fall back on, checked
       in order

    Raises:
        ProcessFileNotFound: no queued or acceptable file is available,
        or the localized file does not exist on disk.
    """
    if entry.queued_media_file:
        filepath = entry.queued_media_file
        storage = mgg.queue_store
    else:
        # Bug fix: filepath/storage were previously left unbound when no
        # acceptable file matched, so the check below raised
        # UnboundLocalError instead of ProcessFileNotFound.
        filepath = None
        storage = None
        for keyname in acceptable_files:
            if entry.media_files.get(keyname):
                filepath = entry.media_files[keyname]
                storage = mgg.public_store
                break

    if not filepath:
        raise ProcessFileNotFound()

    filename = workbench.localized_file(
        storage, filepath,
        'source')

    if not os.path.exists(filename):
        raise ProcessFileNotFound()

    return filename
367
368
def store_public(entry, keyname, local_file, target_name=None,
                 delete_if_exists=True):
    """Copy ``local_file`` into public storage and record it on ``entry``.

    Args:
     - entry: MediaEntry whose media_files dict receives the new filepath
     - keyname: media_files key to store the filepath under
     - local_file: path of the local file to copy
     - target_name: public filename; defaults to local_file's basename
     - delete_if_exists: when keyname is already in use, delete the old
       public file before replacing the reference

    Raises:
        PublicStoreFail: the copy raised, or the file is missing from
        public storage afterward.
    """
    if target_name is None:
        target_name = os.path.basename(local_file)
    target_filepath = create_pub_filepath(entry, target_name)

    if keyname in entry.media_files:
        _log.warn("store_public: keyname %r already used for file %r, "
                  "replacing with %r", keyname,
                  entry.media_files[keyname], target_filepath)
        if delete_if_exists:
            mgg.public_store.delete_file(entry.media_files[keyname])

    try:
        mgg.public_store.copy_local_to_storage(local_file, target_filepath)
    except Exception:
        # Bug fix: was a bare ``except:`` that also caught SystemExit /
        # KeyboardInterrupt and silently discarded the real cause; log
        # the traceback before re-raising as a processing failure.
        _log.exception("store_public: copying %r to public storage failed",
                       local_file)
        raise PublicStoreFail(keyname=keyname)

    # raise an error if the file failed to copy
    if not mgg.public_store.file_exists(target_filepath):
        raise PublicStoreFail(keyname=keyname)

    entry.media_files[keyname] = target_filepath
392
393
def copy_original(entry, orig_filename, target_name, keyname=u"original"):
    """Store the original media file in public storage under ``keyname``."""
    store_public(entry, keyname, orig_filename, target_name=target_name)
396
397
class BaseProcessingFail(Exception):
    """
    Base exception that all other processing failure messages should
    subclass from.

    You shouldn't raise this itself; instead you should subclass it
    and provide the exception_path and general_message applicable to
    this error.
    """
    general_message = u''

    def __init__(self, **metadata):
        # Keep whatever context the raiser supplied; fall back to an
        # empty dict so .metadata is always a dict.
        self.metadata = metadata or {}

    @property
    def exception_path(self):
        """Importable path of this class, as ``module:ClassName``."""
        return u"{0}:{1}".format(
            self.__class__.__module__, self.__class__.__name__)
416
class BadMediaFail(BaseProcessingFail):
    """
    Error that should be raised when an inappropriate file was given
    for the media type specified.
    """
    # Translated, user-facing explanation of the failure.
    general_message = _(u'Invalid file given for media type.')
423
424
class PublicStoreFail(BaseProcessingFail):
    """
    Error that should be raised when copying to public store fails.
    """
    # Consistency fix: u-prefixed like the other BaseProcessingFail
    # subclasses' messages (on Python 2, '' would be bytes, not text).
    general_message = _(u'Copying to public storage failed.')
430
431
class ProcessFileNotFound(BaseProcessingFail):
    """
    Error that should be raised when an acceptable file for processing
    is not found.
    """
    # Translated, user-facing explanation of the failure.
    general_message = _(u'An acceptable processing file was not found')