Add priority to the celery tasks
[mediagoblin.git] / mediagoblin / processing / __init__.py
CommitLineData
41f446f4 1# GNU MediaGoblin -- federated, autonomous media hosting
cf29e8a8 2# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
41f446f4
CAW
3#
4# This program is free software: you can redistribute it and/or modify
5# it under the terms of the GNU Affero General Public License as published by
6# the Free Software Foundation, either version 3 of the License, or
7# (at your option) any later version.
8#
9# This program is distributed in the hope that it will be useful,
10# but WITHOUT ANY WARRANTY; without even the implied warranty of
11# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12# GNU Affero General Public License for more details.
13#
14# You should have received a copy of the GNU Affero General Public License
15# along with this program. If not, see <http://www.gnu.org/licenses/>.
16
66cafc3b
CAW
17# Use an ordered dict if we can. If not, we'll just use a normal dict
18# later.
19try:
20 from collections import OrderedDict
21except:
22 OrderedDict = None
23
64da09e8 24import logging
095fbdaf 25import os
64da09e8 26
e49b7e02
BP
27import six
28
4a477e24 29from mediagoblin import mg_globals as mgg
77ea4c9b
CAW
30from mediagoblin.db.util import atomic_update
31from mediagoblin.db.models import MediaEntry
32from mediagoblin.tools.pluginapi import hook_handle
6506b1e2 33from mediagoblin.tools.translate import lazy_pass_to_ugettext as _
8e5f9746 34
64da09e8 35_log = logging.getLogger(__name__)
41f446f4 36
41f446f4 37
64712915
JW
38class ProgressCallback(object):
39 def __init__(self, entry):
40 self.entry = entry
41
42 def __call__(self, progress):
43 if progress:
44 self.entry.transcoding_progress = progress
45 self.entry.save()
46
47
180bdbde 48def create_pub_filepath(entry, filename):
48a7ba1e 49 return mgg.public_store.get_unique_filepath(
180bdbde 50 ['media_entries',
e49b7e02 51 six.text_type(entry.id),
180bdbde
E
52 filename])
53
64712915 54
28f364bd 55class FilenameBuilder(object):
4774cfa3
BS
56 """Easily slice and dice filenames.
57
28f364bd 58 Initialize this class with an original file path, then use the fill()
4774cfa3
BS
59 method to create new filenames based on the original.
60
61 """
62 MAX_FILENAME_LENGTH = 255 # VFAT's maximum filename length
095fbdaf
BS
63
64 def __init__(self, path):
28f364bd 65 """Initialize a builder from an original file path."""
095fbdaf
BS
66 self.dirpath, self.basename = os.path.split(path)
67 self.basename, self.ext = os.path.splitext(self.basename)
68 self.ext = self.ext.lower()
69
28f364bd
BS
70 def fill(self, fmtstr):
71 """Build a new filename based on the original.
4774cfa3 72
28f364bd
BS
73 The fmtstr argument can include the following:
74 {basename} -- the original basename, with the extension removed
75 {ext} -- the original extension, always lowercase
76
77 If necessary, {basename} will be truncated so the filename does not
78 exceed this class' MAX_FILENAME_LENGTH in length.
4774cfa3
BS
79
80 """
095fbdaf
BS
81 basename_len = (self.MAX_FILENAME_LENGTH -
82 len(fmtstr.format(basename='', ext=self.ext)))
83 return fmtstr.format(basename=self.basename[:basename_len],
84 ext=self.ext)
85
86
e6bd03d4 87
14565fb7 88class MediaProcessor(object):
274a0f67
CAW
89 """A particular processor for this media type.
90
91 While the ProcessingManager handles all types of MediaProcessing
92 possible for a particular media type, a MediaProcessor can be
93 thought of as a *particular* processing action for a media type.
94 For example, you may have separate MediaProcessors for:
95
96 - initial_processing: the intial processing of a media
97 - gen_thumb: generate a thumbnail
98 - resize: resize an image
99 - transcode: transcode a video
100
58350141 101 ... etc.
274a0f67
CAW
102
103 Some information on producing a new MediaProcessor for your media type:
104
105 - You *must* supply a name attribute. This must be a class level
106 attribute, and a string. This will be used to determine the
107 subcommand of your process
108 - It's recommended that you supply a class level description
109 attribute.
110 - Supply a media_is_eligible classmethod. This will be used to
111 determine whether or not a media entry is eligible to use this
112 processor type. See the method documentation for details.
113 - To give "./bin/gmg reprocess run" abilities to this media type,
114 supply both gnerate_parser and parser_to_request classmethods.
115 - The process method will be what actually processes your media.
e6bd03d4 116 """
14565fb7
CAW
117 # You MUST override this in the child MediaProcessor!
118 name = None
119
274a0f67
CAW
120 # Optional, but will be used in various places to describe the
121 # action this MediaProcessor provides
122 description = None
123
5fd239fa 124 def __init__(self, manager, entry):
14565fb7 125 self.manager = manager
93b14fc3 126 self.entry = entry
5fd239fa 127 self.entry_orig_state = entry.state
14565fb7 128
e4bdc909 129 # Should be initialized at time of processing, at least
93b14fc3 130 self.workbench = None
93b14fc3 131
55cfa340
CAW
132 def __enter__(self):
133 self.workbench = mgg.workbench_manager.create()
7a85bf98 134 return self
55cfa340
CAW
135
136 def __exit__(self, *args):
137 self.workbench.destroy()
138 self.workbench = None
93b14fc3 139
e4bdc909 140 # @with_workbench
274a0f67 141 def process(self, **kwargs):
93b14fc3 142 """
274a0f67 143 Actually process this media entry.
93b14fc3 144 """
14565fb7
CAW
145 raise NotImplementedError
146
274a0f67 147 @classmethod
7584080b 148 def media_is_eligible(cls, entry=None, state=None):
14565fb7
CAW
149 raise NotImplementedError
150
274a0f67
CAW
151 ###############################
152 # Command line interface things
153 ###############################
154
155 @classmethod
55a10fef 156 def generate_parser(cls):
14565fb7
CAW
157 raise NotImplementedError
158
274a0f67 159 @classmethod
d1e9913b 160 def args_to_request(cls, args):
274a0f67
CAW
161 raise NotImplementedError
162
163 ##########################################
164 # THE FUTURE: web interface things here :)
165 ##########################################
166
5fd239fa
CAW
167 #####################
168 # Some common "steps"
169 #####################
715ea495 170
93b14fc3 171 def delete_queue_file(self):
8ec87dc3
E
172 # Remove queued media file from storage and database.
173 # queued_filepath is in the task_id directory which should
174 # be removed too, but fail if the directory is not empty to be on
175 # the super-safe side.
93b14fc3 176 queued_filepath = self.entry.queued_media_file
882779f5
RE
177 if queued_filepath:
178 mgg.queue_store.delete_file(queued_filepath) # rm file
179 mgg.queue_store.delete_dir(queued_filepath[:-1]) # rm dir
180 self.entry.queued_media_file = []
5fd239fa 181
14565fb7 182
4ba5bdd9
CAW
183class ProcessingKeyError(Exception): pass
184class ProcessorDoesNotExist(ProcessingKeyError): pass
185class ProcessorNotEligible(ProcessingKeyError): pass
4e601368
RE
186class ProcessingManagerDoesNotExist(ProcessingKeyError): pass
187
4ba5bdd9
CAW
188
189
14565fb7 190class ProcessingManager(object):
e4bdc909 191 """Manages all the processing actions available for a media type
14565fb7 192
e4bdc909
CAW
193 Specific processing actions, MediaProcessor subclasses, are added
194 to the ProcessingManager.
195 """
196 def __init__(self):
14565fb7 197 # Dict of all MediaProcessors of this media type
66cafc3b
CAW
198 if OrderedDict is not None:
199 self.processors = OrderedDict()
200 else:
201 self.processors = {}
14565fb7
CAW
202
203 def add_processor(self, processor):
204 """
205 Add a processor class to this media type
206 """
207 name = processor.name
208 if name is None:
209 raise AttributeError("Processor class's .name attribute not set")
58350141 210
14565fb7
CAW
211 self.processors[name] = processor
212
e4bdc909 213 def list_eligible_processors(self, entry):
14565fb7
CAW
214 """
215 List all processors that this media entry is eligible to be processed
216 for.
217 """
218 return [
219 processor
85ead8ac 220 for processor in self.processors.values()
7584080b
RE
221 if processor.media_is_eligible(entry=entry)]
222
223 def list_all_processors_by_state(self, state):
224 """
225 List all processors that this media state is eligible to be processed
226 for.
227 """
228 return [
229 processor
230 for processor in self.processors.values()
231 if processor.media_is_eligible(state=state)]
232
e4bdc909 233
85ead8ac
CAW
234 def list_all_processors(self):
235 return self.processors.values()
236
e4bdc909
CAW
237 def gen_process_request_via_cli(self, subparser):
238 # Got to figure out what actually goes here before I can write this properly
239 pass
14565fb7 240
4ba5bdd9
CAW
241 def get_processor(self, key, entry=None):
242 """
243 Get the processor with this key.
244
245 If entry supplied, make sure this entry is actually compatible;
246 otherwise raise error.
247 """
248 try:
249 processor = self.processors[key]
250 except KeyError:
251 raise ProcessorDoesNotExist(
252 "'%s' processor does not exist for this media type" % key)
253
254 if entry and not processor.media_is_eligible(entry):
255 raise ProcessorNotEligible(
256 "This entry is not eligible for processor with name '%s'" % key)
257
258 return processor
259
25ecdec9 260 def workflow(self, entry, manager, feed_url, reprocess_action,
261 reprocess_info=None):
d3390c43 262 """
263 Returns the Celery command needed to proceed with media processing
264 *This method has to be implemented in all media types*
265 """
266 raise NotImplementedError
267
14565fb7 268
d1e9913b
CAW
269def request_from_args(args, which_args):
270 """
271 Generate a request from the values of some argparse parsed args
272 """
273 request = {}
274 for arg in which_args:
275 request[arg] = getattr(args, arg)
276
277 return request
278
279
77ea4c9b
CAW
280class MediaEntryNotFound(Exception): pass
281
282
55cfa340 283def get_processing_manager_for_type(media_type):
77ea4c9b
CAW
284 """
285 Get the appropriate media manager for this type
286 """
287 manager_class = hook_handle(('reprocess_manager', media_type))
4e601368
RE
288 if not manager_class:
289 raise ProcessingManagerDoesNotExist(
290 "A processing manager does not exist for {0}".format(media_type))
77ea4c9b
CAW
291 manager = manager_class()
292
293 return manager
294
295
55cfa340 296def get_entry_and_processing_manager(media_id):
77ea4c9b
CAW
297 """
298 Get a MediaEntry, its media type, and its manager all in one go.
299
300 Returns a tuple of: `(entry, media_type, media_manager)`
301 """
302 entry = MediaEntry.query.filter_by(id=media_id).first()
303 if entry is None:
304 raise MediaEntryNotFound("Can't find media with id '%s'" % media_id)
305
55cfa340 306 manager = get_processing_manager_for_type(entry.media_type)
77ea4c9b
CAW
307
308 return entry, manager
93b14fc3
E
309
310
6788b412 311def mark_entry_failed(entry_id, exc):
2e5ea6b9
CAW
312 """
313 Mark a media entry as having failed in its conversion.
314
243c3843
NY
315 Uses the exception that was raised to mark more information. If
316 the exception is a derivative of BaseProcessingFail then we can
317 store extra information that can be useful for users telling them
318 why their media failed to process.
2e5ea6b9 319
945a1c56
BB
320 :param entry_id: The id of the media entry
321 :param exc: An instance of BaseProcessingFail
2e5ea6b9
CAW
322
323 """
6788b412
CAW
324 # Was this a BaseProcessingFail? In other words, was this a
325 # type of error that we know how to handle?
326 if isinstance(exc, BaseProcessingFail):
327 # Looks like yes, so record information about that failure and any
328 # metadata the user might have supplied.
82cd9683 329 atomic_update(mgg.database.MediaEntry,
5c2b8486 330 {'id': entry_id},
82cd9683 331 {u'state': u'failed',
e49b7e02 332 u'fail_error': six.text_type(exc.exception_path),
82cd9683 333 u'fail_metadata': exc.metadata})
6788b412 334 else:
baae1578 335 _log.warn("No idea what happened here, but it failed: %r", exc)
e3663c7b
BB
336 # Looks like no, let's record it so that admin could ask us about the
337 # reason
82cd9683 338 atomic_update(mgg.database.MediaEntry,
5c2b8486 339 {'id': entry_id},
82cd9683 340 {u'state': u'failed',
41076dc9
BB
341 u'fail_error': u'Unhandled exception: {0}'.format(
342 six.text_type(exc)),
82cd9683 343 u'fail_metadata': {}})
4a477e24
CAW
344
345
1cefccc7 346def get_process_filename(entry, workbench, acceptable_files):
eb372949 347 """
1cefccc7
RE
348 Try and get the queued file if available, otherwise return the first file
349 in the acceptable_files that we have.
eb372949 350
1cefccc7 351 If no acceptable_files, raise ProcessFileNotFound
eb372949
CAW
352 """
353 if entry.queued_media_file:
1cefccc7 354 filepath = entry.queued_media_file
eb372949
CAW
355 storage = mgg.queue_store
356 else:
1cefccc7
RE
357 for keyname in acceptable_files:
358 if entry.media_files.get(keyname):
359 filepath = entry.media_files[keyname]
360 storage = mgg.public_store
361 break
362
363 if not filepath:
364 raise ProcessFileNotFound()
eb372949 365
1cefccc7
RE
366 filename = workbench.localized_file(
367 storage, filepath,
eb372949
CAW
368 'source')
369
1cefccc7
RE
370 if not os.path.exists(filename):
371 raise ProcessFileNotFound()
372
373 return filename
eb372949
CAW
374
375
fb56676b
RE
376def store_public(entry, keyname, local_file, target_name=None,
377 delete_if_exists=True):
5fd239fa
CAW
378 if target_name is None:
379 target_name = os.path.basename(local_file)
380 target_filepath = create_pub_filepath(entry, target_name)
79f84d7e 381
5fd239fa
CAW
382 if keyname in entry.media_files:
383 _log.warn("store_public: keyname %r already used for file %r, "
384 "replacing with %r", keyname,
385 entry.media_files[keyname], target_filepath)
fb56676b
RE
386 if delete_if_exists:
387 mgg.public_store.delete_file(entry.media_files[keyname])
79f84d7e
RE
388 try:
389 mgg.public_store.copy_local_to_storage(local_file, target_filepath)
91f5f5e7
BB
390 except Exception as e:
391 _log.error(u'Exception happened: {0}'.format(e))
79f84d7e 392 raise PublicStoreFail(keyname=keyname)
79f84d7e 393 # raise an error if the file failed to copy
6375cf73 394 if not mgg.public_store.file_exists(target_filepath):
79f84d7e
RE
395 raise PublicStoreFail(keyname=keyname)
396
5fd239fa
CAW
397 entry.media_files[keyname] = target_filepath
398
399
400def copy_original(entry, orig_filename, target_name, keyname=u"original"):
401 store_public(entry, keyname, orig_filename, target_name)
402
403
8e5f9746
JW
404class BaseProcessingFail(Exception):
405 """
406 Base exception that all other processing failure messages should
407 subclass from.
408
409 You shouldn't call this itself; instead you should subclass it
2392fbc0 410 and provide the exception_path and general_message applicable to
8e5f9746
JW
411 this error.
412 """
413 general_message = u''
414
415 @property
416 def exception_path(self):
417 return u"%s:%s" % (
418 self.__class__.__module__, self.__class__.__name__)
419
a2608d6b
BB
420 def __init__(self, message=None, **metadata):
421 if message is not None:
422 super(BaseProcessingFail, self).__init__(message)
423 metadata['message'] = message
424 self.metadata = metadata
8e5f9746
JW
425
426class BadMediaFail(BaseProcessingFail):
4a477e24 427 """
8e5f9746
JW
428 Error that should be raised when an inappropriate file was given
429 for the media type specified.
4a477e24 430 """
8e5f9746 431 general_message = _(u'Invalid file given for media type.')
79f84d7e
RE
432
433
434class PublicStoreFail(BaseProcessingFail):
435 """
436 Error that should be raised when copying to public store fails
437 """
438 general_message = _('Copying to public storage failed.')
1cefccc7
RE
439
440
441class ProcessFileNotFound(BaseProcessingFail):
442 """
443 Error that should be raised when an acceptable file for processing
444 is not found.
445 """
446 general_message = _(u'An acceptable processing file was not found')