1 # GNU MediaGoblin -- federated, autonomous media hosting
2 # Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
# Use an ordered dict if we can.  If not, we'll just use a normal dict
# later.
try:
    from collections import OrderedDict
except ImportError:
    OrderedDict = None

import logging
import os

import six

from mediagoblin import mg_globals as mgg
from mediagoblin.db.util import atomic_update
from mediagoblin.db.models import MediaEntry
from mediagoblin.tools.pluginapi import hook_handle
from mediagoblin.tools.translate import lazy_pass_to_ugettext as _
# Module-level logger used by the processing helpers in this file.
_log = logging.getLogger(__name__)
class ProgressCallback(object):
    """Callable that accumulates transcoding progress on a media entry."""

    def __init__(self, entry):
        # __call__ reads and writes self.entry, so the entry is kept here.
        self.entry = entry

    def __call__(self, progress, default_quality_progress=None):
        """Add ``progress`` to the entry's ``transcoding_progress``.

        The running total is snapped to exactly 100 once the new total is
        within 0.01 of (or beyond) 100; otherwise the increment is rounded
        to two decimals before being added.  When a truthy
        ``default_quality_progress`` is given it is stored as the entry's
        ``main_transcoding_progress``.
        """
        # Nothing to record for a missing or zero increment; this also keeps
        # round() from being handed None.
        if not progress:
            return

        if 100 - (self.entry.transcoding_progress + progress) < 0.01:
            self.entry.transcoding_progress = 100
        else:
            self.entry.transcoding_progress += round(progress, 2)

        if default_quality_progress:
            self.entry.main_transcoding_progress = default_quality_progress

        # NOTE(review): persisting the entry here follows upstream
        # MediaGoblin; the call is not visible in the reviewed excerpt --
        # confirm against the real file.
        self.entry.save()
def create_pub_filepath(entry, filename):
    """Return a unique filepath in the public store for ``filename``.

    The path is namespaced by the text form of ``entry.id`` and made unique
    by ``mgg.public_store.get_unique_filepath``.
    """
    # NOTE(review): the leading 'media_entries' component and the trailing
    # ``filename`` element are restored from upstream MediaGoblin; only the
    # ``six.text_type(entry.id)`` element is visible in the reviewed excerpt.
    return mgg.public_store.get_unique_filepath(
        ['media_entries',
         six.text_type(entry.id),
         filename])
class FilenameBuilder(object):
    """Easily slice and dice filenames.

    Initialize this class with an original file path, then use the fill()
    method to create new filenames based on the original.

    """
    MAX_FILENAME_LENGTH = 255  # VFAT's maximum filename length

    def __init__(self, path):
        """Initialize a builder from an original file path."""
        self.dirpath, self.basename = os.path.split(path)
        self.basename, self.ext = os.path.splitext(self.basename)
        self.ext = self.ext.lower()

    def fill(self, fmtstr):
        """Build a new filename based on the original.

        The fmtstr argument can include the following:
        {basename} -- the original basename, with the extension removed
        {ext} -- the original extension, always lowercase

        If necessary, {basename} will be truncated so the filename does not
        exceed this class' MAX_FILENAME_LENGTH in length.  When the rest of
        the format string already uses up the whole budget, {basename} is
        dropped entirely.

        """
        # Space left for the basename once everything else is rendered.
        overhead = len(fmtstr.format(basename='', ext=self.ext))
        # Clamp at zero: a negative slice bound would cut characters off the
        # *end* of the basename instead of truncating it to nothing.
        basename_len = max(0, self.MAX_FILENAME_LENGTH - overhead)
        return fmtstr.format(basename=self.basename[:basename_len],
                             ext=self.ext)
class MediaProcessor(object):
    """A particular processor for this media type.

    While the ProcessingManager handles all types of MediaProcessing
    possible for a particular media type, a MediaProcessor can be
    thought of as a *particular* processing action for a media type.
    For example, you may have separate MediaProcessors for:

    - initial_processing: the initial processing of a media
    - gen_thumb: generate a thumbnail
    - resize: resize an image
    - transcode: transcode a video

    Some information on producing a new MediaProcessor for your media type:

    - You *must* supply a name attribute.  This must be a class level
      attribute, and a string.  This will be used to determine the
      subcommand of your process
    - It's recommended that you supply a class level description
      attribute.
    - Supply a media_is_eligible classmethod.  This will be used to
      determine whether or not a media entry is eligible to use this
      processor type.  See the method documentation for details.
    - To give "./bin/gmg reprocess run" abilities to this media type,
      supply both generate_parser and args_to_request classmethods.
    - The process method will be what actually processes your media.
    """
    # You MUST override this in the child MediaProcessor!
    name = None

    # Optional, but will be used in various places to describe the
    # action this MediaProcessor provides
    description = None

    def __init__(self, manager, entry):
        """Remember the owning manager, the entry and the entry's state."""
        self.manager = manager
        self.entry = entry
        # Snapshot of the state the entry was in when the processor was
        # constructed.
        self.entry_orig_state = entry.state

        # Should be initialized at time of processing, at least
        self.workbench = None

    def __enter__(self):
        """Create a workbench for the duration of the ``with`` block."""
        self.workbench = mgg.workbench_manager.create()
        return self

    def __exit__(self, *args):
        """Destroy the workbench created by ``__enter__``."""
        self.workbench.destroy()
        self.workbench = None

    def process(self, **kwargs):
        """
        Actually process this media entry.

        Subclasses must override this; the base class raises
        NotImplementedError.
        """
        raise NotImplementedError

    @classmethod
    def media_is_eligible(cls, entry=None, state=None):
        """Tell whether an entry (or a bare state) may use this processor.

        Subclasses must override this; the base class raises
        NotImplementedError.
        """
        raise NotImplementedError

    ###############################
    # Command line interface things
    ###############################

    @classmethod
    def generate_parser(cls):
        """Build the command line parser for this processor (override)."""
        raise NotImplementedError

    @classmethod
    def args_to_request(cls, args):
        """Turn parsed command line args into a request (override)."""
        raise NotImplementedError

    ##########################################
    # THE FUTURE: web interface things here :)
    ##########################################

    #####################
    # Some common "steps"
    #####################

    def delete_queue_file(self):
        """Remove the entry's queued file and forget it on the entry."""
        # Remove queued media file from storage and database.
        # queued_filepath is in the task_id directory which should
        # be removed too, but fail if the directory is not empty to be on
        # the super-safe side.
        queued_filepath = self.entry.queued_media_file
        # Nothing queued means nothing to delete; this also avoids slicing
        # an empty path for the directory removal below.
        if queued_filepath:
            mgg.queue_store.delete_file(queued_filepath)      # rm file
            mgg.queue_store.delete_dir(queued_filepath[:-1])  # rm dir
            self.entry.queued_media_file = []
class ProcessingKeyError(Exception):
    """Base class for the processor/manager lookup failures below."""


class ProcessorDoesNotExist(ProcessingKeyError):
    """No processor is registered under the requested key."""


class ProcessorNotEligible(ProcessingKeyError):
    """The requested processor cannot be used for the given entry."""


class ProcessingManagerDoesNotExist(ProcessingKeyError):
    """No processing manager is available for the requested media type."""
class ProcessingManager(object):
    """Manages all the processing actions available for a media type

    Specific processing actions, MediaProcessor subclasses, are added
    to the ProcessingManager.
    """
    def __init__(self):
        # Dict of all MediaProcessors of this media type, keyed by processor
        # name.  An ordered dict is used when one is available.
        if OrderedDict is not None:
            self.processors = OrderedDict()
        else:
            self.processors = {}

    def add_processor(self, processor):
        """
        Add a processor class to this media type

        Raises AttributeError when the class did not set its ``name``.
        """
        name = processor.name
        if name is None:
            raise AttributeError("Processor class's .name attribute not set")

        self.processors[name] = processor

    def list_eligible_processors(self, entry):
        """
        List all processors that this media entry is eligible to be processed
        for.
        """
        return [
            processor
            for processor in self.processors.values()
            if processor.media_is_eligible(entry=entry)]

    def list_all_processors_by_state(self, state):
        """
        List all processors that this media state is eligible to be processed
        for.
        """
        return [
            processor
            for processor in self.processors.values()
            if processor.media_is_eligible(state=state)]

    def list_all_processors(self):
        """Return every registered processor class."""
        return self.processors.values()

    def gen_process_request_via_cli(self, subparser):
        # Got to figure out what actually goes here before I can write this properly
        pass

    def get_processor(self, key, entry=None):
        """
        Get the processor with this key.

        If entry supplied, make sure this entry is actually compatible;
        otherwise raise error.

        Raises ProcessorDoesNotExist for an unknown key and
        ProcessorNotEligible when the entry cannot use the processor.
        """
        try:
            processor = self.processors[key]
        except KeyError:
            raise ProcessorDoesNotExist(
                "'%s' processor does not exist for this media type" % key)

        if entry and not processor.media_is_eligible(entry):
            raise ProcessorNotEligible(
                "This entry is not eligible for processor with name '%s'" % key)

        return processor

    def workflow(self, entry, feed_url, reprocess_action, reprocess_info=None):
        """
        Returns the Celery command needed to proceed with media processing
        """
        # NOTE(review): the body is not visible in the reviewed excerpt;
        # upstream's base implementation returns None and leaves the real
        # work to the media types -- confirm against the real file.
        return None
def request_from_args(args, which_args):
    """
    Generate a request from the values of some argparse parsed args

    :param args: object carrying the parsed values as attributes
    :param which_args: iterable of attribute names to copy into the request
    :returns: dict mapping each name in ``which_args`` to its value on
        ``args``
    """
    return {arg: getattr(args, arg) for arg in which_args}
class MediaEntryNotFound(Exception):
    """Raised when no media entry matches the requested id."""
def get_processing_manager_for_type(media_type):
    """
    Get the appropriate media manager for this type

    The manager class is looked up through the ``('reprocess_manager',
    media_type)`` hook and instantiated without arguments.

    Raises ProcessingManagerDoesNotExist when the hook yields nothing.
    """
    manager_class = hook_handle(('reprocess_manager', media_type))
    if not manager_class:
        raise ProcessingManagerDoesNotExist(
            "A processing manager does not exist for {0}".format(media_type))

    return manager_class()
def get_entry_and_processing_manager(media_id):
    """
    Get a MediaEntry and its processing manager all in one go.

    Returns a tuple of: `(entry, manager)`

    Raises MediaEntryNotFound when no entry has the given id.
    """
    entry = MediaEntry.query.filter_by(id=media_id).first()
    # .first() yields None when the query matches nothing.
    if entry is None:
        raise MediaEntryNotFound("Can't find media with id '%s'" % media_id)

    manager = get_processing_manager_for_type(entry.media_type)

    return entry, manager
def mark_entry_failed(entry_id, exc):
    """
    Mark a media entry as having failed in its conversion.

    Uses the exception that was raised to mark more information.  If
    the exception is a derivative of BaseProcessingFail then we can
    store extra information that can be useful for users telling them
    why their media failed to process.

    :param entry_id: The id of the media entry
    :param exc: The exception that was raised; BaseProcessingFail
        instances contribute their exception_path and metadata, anything
        else is recorded as an unhandled exception.

    """
    # NOTE(review): the ``{'id': entry_id}`` query argument and the
    # ``six.text_type(exc)`` format argument are restored from upstream
    # MediaGoblin; they are not visible in the reviewed excerpt.

    # Was this a BaseProcessingFail?  In other words, was this a
    # type of error that we know how to handle?
    if isinstance(exc, BaseProcessingFail):
        # Looks like yes, so record information about that failure and any
        # metadata the user might have supplied.
        atomic_update(mgg.database.MediaEntry,
            {'id': entry_id},
            {u'state': u'failed',
             u'fail_error': six.text_type(exc.exception_path),
             u'fail_metadata': exc.metadata})
    else:
        # Logger.warn is a deprecated alias; warning() is the supported name.
        _log.warning("No idea what happened here, but it failed: %r", exc)
        # Looks like no, let's record it so that admin could ask us about the
        # reason
        atomic_update(mgg.database.MediaEntry,
            {'id': entry_id},
            {u'state': u'failed',
             u'fail_error': u'Unhandled exception: {0}'.format(
                 six.text_type(exc)),
             u'fail_metadata': {}})
def get_process_filename(entry, workbench, acceptable_files):
    """
    Try and get the queued file if available, otherwise return the first file
    in the acceptable_files that we have.

    If no acceptable_files, raise ProcessFileNotFound

    ProcessFileNotFound is also raised when the localized copy does not
    exist on disk.
    """
    # Start unbound-safe: when nothing is queued and no acceptable key has a
    # file, the check below must raise ProcessFileNotFound rather than trip
    # over an undefined local.
    filepath = None
    storage = None

    if entry.queued_media_file:
        filepath = entry.queued_media_file
        storage = mgg.queue_store
    else:
        for keyname in acceptable_files:
            if entry.media_files.get(keyname):
                filepath = entry.media_files[keyname]
                storage = mgg.public_store
                # First match wins.
                break

    if not filepath:
        raise ProcessFileNotFound()

    # NOTE(review): the arguments to localized_file are restored from
    # upstream MediaGoblin (storage, filepath, 'source'); they are not
    # visible in the reviewed excerpt.
    filename = workbench.localized_file(
        storage, filepath,
        'source')

    if not os.path.exists(filename):
        raise ProcessFileNotFound()

    return filename
def store_public(entry, keyname, local_file, target_name=None,
                 delete_if_exists=True):
    """Copy ``local_file`` into the public store and record it on ``entry``.

    :param entry: media entry whose ``media_files`` mapping is updated
    :param keyname: key under which the stored filepath is recorded
    :param local_file: path of the local file to copy
    :param target_name: filename to use in the store; defaults to the
        basename of ``local_file``
    :param delete_if_exists: when ``keyname`` is already in use, delete the
        previously stored file before copying the new one

    Raises PublicStoreFail when the copy raises or the file is not present
    in the store afterwards.
    """
    if target_name is None:
        target_name = os.path.basename(local_file)
    target_filepath = create_pub_filepath(entry, target_name)

    if keyname in entry.media_files:
        # Logger.warn is a deprecated alias; warning() is the supported name.
        _log.warning("store_public: keyname %r already used for file %r, "
                     "replacing with %r", keyname,
                     entry.media_files[keyname], target_filepath)
        if delete_if_exists:
            mgg.public_store.delete_file(entry.media_files[keyname])

    try:
        mgg.public_store.copy_local_to_storage(local_file, target_filepath)
    except Exception as e:
        # Any storage failure is reported to callers as a PublicStoreFail.
        _log.error(u'Exception happened: {0}'.format(e))
        raise PublicStoreFail(keyname=keyname)

    # raise an error if the file failed to copy
    if not mgg.public_store.file_exists(target_filepath):
        raise PublicStoreFail(keyname=keyname)

    entry.media_files[keyname] = target_filepath
def copy_original(entry, orig_filename, target_name, keyname=u"original"):
    """Store ``orig_filename`` publicly as ``target_name`` under ``keyname``.

    Thin wrapper around store_public with the key defaulting to
    ``u"original"``.
    """
    store_public(entry, keyname, orig_filename, target_name)
class BaseProcessingFail(Exception):
    """
    Base exception that all other processing failure messages should
    subclass from.

    You shouldn't call this itself; instead you should subclass it
    and provide the exception_path and general_message applicable to
    this error.
    """
    general_message = u''

    @property
    def exception_path(self):
        """Module and class name of this exception, joined by a colon."""
        # NOTE(review): the u"%s:%s" format is restored from upstream
        # MediaGoblin; only the two format arguments are visible in the
        # reviewed excerpt.
        return u"%s:%s" % (
            self.__class__.__module__, self.__class__.__name__)

    def __init__(self, message=None, **metadata):
        """Keep ``metadata`` (plus ``message``, when given) on the instance."""
        if message is not None:
            super(BaseProcessingFail, self).__init__(message)
            metadata['message'] = message
        self.metadata = metadata
class BadMediaFail(BaseProcessingFail):
    """
    Error that should be raised when an inappropriate file was given
    for the media type specified.
    """
    general_message = _(u'Invalid file given for media type.')
class PublicStoreFail(BaseProcessingFail):
    """
    Error that should be raised when copying to public store fails
    """
    general_message = _('Copying to public storage failed.')
class ProcessFileNotFound(BaseProcessingFail):
    """
    Error that should be raised when an acceptable file for processing
    is not found.
    """
    general_message = _(u'An acceptable processing file was not found')