1 # GNU MediaGoblin -- federated, autonomous media hosting
2 # Copyright (C) 2011 MediaGoblin contributors. See AUTHORS.
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
20 from celery
.task
import Task
21 from celery
import registry
23 from mediagoblin
.db
.util
import ObjectId
24 from mediagoblin
import mg_globals
as mgg
26 from mediagoblin
.util
import lazy_pass_to_ugettext
as _
33 from arista
.transcoder
import TranscoderOptions
# Two-element tuple; not referenced by any code visible in this module.
# NOTE(review): presumably the pixel bounding box for "medium" renditions
# -- confirm.
MEDIUM_SIZE = 640, 640
# Key process_video() uses to pick a device out of arista.presets.get().
ARISTA_DEVICE_KEY = 'web'
def process_video(entry):
    """
    Transcode a queued video with arista and publish the results.

    A local filename for the queued file is obtained through a workbench,
    the file is transcoded with the default preset of the
    ``ARISTA_DEVICE_KEY`` device into a temporary file, and the
    ``entry_complete`` callback stores that result in the public store.
    Afterwards the unmodified upload is written to the public store as the
    'original' media file and removed from the queue store.

    The workbench is destroyed when this function exits, whether or not
    processing succeeded.

    Args:
     - entry: The media entry to process.  Its 'queued_media_file' value is
       read and then reset to [], and its 'media_files' dict is updated in
       place.  The entry is not saved here.
    """
    # State shared with the transcode queue callbacks (entry_start,
    # entry_error, entry_complete).
    info = {}
    workbench = mgg.workbench_manager.create_workbench()

    try:
        queued_filepath = entry['queued_media_file']
        # NOTE(review): the last argument of this call was not legible in
        # the reviewed copy; 'source' is assumed -- confirm.
        queued_filename = workbench.localized_file(
            mgg.queue_store, queued_filepath,
            'source')

        # NOTE(review): assumed initialisation step, not legible in the
        # reviewed copy -- confirm.
        arista.init()

        devices = arista.presets.get()
        device = devices[ARISTA_DEVICE_KEY]

        queue = arista.queue.TranscodeQueue()

        # The transcoder writes to this file by name; entry_complete()
        # reads it back through the same object and closes it.
        info['tmp_file'] = tmp_file = tempfile.NamedTemporaryFile()

        info['medium_filepath'] = create_pub_filepath(entry, 'video.webm')

        output = tmp_file.name

        uri = 'file://' + queued_filename

        preset = device.presets[device.default]

        opts = TranscoderOptions(uri, preset, output)

        queue.append(opts)

        # entry_complete() records the transcoded file on this entry.
        info['entry'] = entry

        queue.connect("entry-start", entry_start, info)
#        queue.connect("entry-pass-setup", entry_pass_setup, options)
        queue.connect("entry-error", entry_error, info)
        queue.connect("entry-complete", entry_complete, info)

        info['loop'] = loop = gobject.MainLoop()

        # Blocks until entry_complete() schedules loop.quit.
        loop.run()

        original_filepath = create_pub_filepath(entry, queued_filepath[-1])

        # we have to re-read because unlike PIL, not everything reads
        # things in string representation :)
        with open(queued_filename, 'rb') as queued_file:
            with mgg.public_store.get_file(
                    original_filepath, 'wb') as original_file:
                # Written in a single call on purpose: the storage
                # backend's file object is opaque from here.
                original_file.write(queued_file.read())

        mgg.queue_store.delete_file(queued_filepath)
        entry['queued_media_file'] = []
        media_files_dict = entry.setdefault('media_files', {})
        media_files_dict['original'] = original_filepath
    finally:
        # clean up workbench, also when processing failed part-way
        workbench.destroy_self()
def create_pub_filepath(entry, filename):
    """
    Ask the public store for a unique filepath for one of the entry's files.

    Args:
     - entry: The media entry; unicode(entry['_id']) is one path element.
     - filename: Name of the file to store.

    Returns whatever mgg.public_store.get_unique_filepath() returns.
    """
    # NOTE(review): only the unicode(entry['_id']) element was legible in
    # the reviewed copy; the 'media_entries' prefix and the trailing
    # filename element are assumed -- confirm.
    return mgg.public_store.get_unique_filepath(
            ['media_entries',
             unicode(entry['_id']),
             filename])
class BaseProcessingFail(Exception):
    """
    Common ancestor of every processing failure exception.

    Don't raise this class directly; derive a subclass from it and
    provide the exception_path and general_message that apply to that
    particular error.

    Keyword arguments given to the constructor are kept on the instance
    as ``metadata``.
    """
    general_message = u''

    def __init__(self, **metadata):
        self.metadata = metadata or {}

    @property
    def exception_path(self):
        # Identifies the concrete exception class by module and class name.
        # NOTE(review): the format string was not legible in the reviewed
        # copy; "<module>:<name>" is assumed -- confirm.
        cls = type(self)
        return u"%s:%s" % (cls.__module__, cls.__name__)
class BadMediaFail(BaseProcessingFail):
    """
    Error that should be raised when an inappropriate file was given
    for the media type specified.
    """
    # Wrapped in _ (lazy_pass_to_ugettext) so the message can be translated.
    general_message = _(u'Invalid file given for media type.')
141 ################################
142 # Media processing initial steps
143 ################################
class ProcessMedia(Task):
    """
    Celery task that passes a media entry off for video processing.
    """
    def run(self, media_id):
        """
        Load the media entry and pass it to process_video().

        A BaseProcessingFail raised while processing is an expected error:
        the entry is marked as failed and the task returns normally.  Any
        other exception propagates to the caller.  On success the entry's
        state is set to u'processed'.

        Args:
         - media_id: The id of the media entry; it is wrapped in ObjectId
           for the database lookup.
        """
        entry = mgg.database.MediaEntry.one(
            {'_id': ObjectId(media_id)})

        # Try to process, and handle expected errors.
        # NOTE(review): the try body, the early return and the final save
        # were not legible in the reviewed copy and are reconstructed --
        # confirm.
        try:
            process_video(entry)
        except BaseProcessingFail as exc:
            mark_entry_failed(entry[u'_id'], exc)
            return

        entry['state'] = u'processed'
        entry.save()

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """
        If the processing failed we should mark that in the database.

        Assuming that the exception raised is a subclass of BaseProcessingFail,
        we can use that to get more information about the failure and store that
        for conveying information to users about the failure, etc.

        Args:
         - exc: The exception that made the task fail.
         - task_id, einfo: Unused here.
         - args, kwargs: The arguments the task was called with; the media
           id is taken from the first positional argument or, failing
           that, from the 'media_id' keyword.
        """
        # The id may have been passed positionally or by keyword.
        entry_id = args[0] if args else kwargs['media_id']
        # NOTE(review): run() looks the entry up with ObjectId(media_id),
        # while the raw argument is passed through here.  If callers pass
        # string ids, the update filter in mark_entry_failed() may not
        # match -- confirm.
        mark_entry_failed(entry_id, exc)
# The ProcessMedia task as registered in celery's task registry, looked up
# by the task's name.
# NOTE(review): presumably the object callers use to queue processing --
# confirm against callers.
process_media = registry.tasks[ProcessMedia.name]
def mark_entry_failed(entry_id, exc):
    """
    Flag a media entry in the database as having failed its conversion.

    The exception decides how much detail is stored.  For a
    BaseProcessingFail (a kind of error we know how to handle) its
    exception_path and metadata are recorded so users can be told why
    their media failed.  For anything else no fail_error is recorded and
    fail_metadata is overwritten with an empty dict, in case it somehow
    held previous info.

    Args:
     - entry_id: The id of the media entry
     - exc: The exception that was raised
    """
    if isinstance(exc, BaseProcessingFail):
        fail_error = exc.exception_path
        fail_metadata = exc.metadata
    else:
        # Unhandled kind of error: record no failure path and wipe any
        # metadata left from before.
        # NOTE(review): the fail_error value of this branch was not legible
        # in the reviewed copy; None is assumed -- confirm.
        fail_error = None
        fail_metadata = {}

    mgg.database['media_entries'].update(
        {'_id': entry_id},
        {'$set': {u'state': u'failed',
                  u'fail_error': fail_error,
                  u'fail_metadata': fail_metadata}})
def entry_start(queue, entry, options):
    """
    "entry-start" callback of the transcode queue; only prints its arguments.

    process_video() connects this handler with its ``info`` dict as the
    extra argument, so ``options`` is that dict.
    """
    print(queue, entry, options)
def entry_complete(queue, entry, info):
    """
    "entry-complete" callback of the transcode queue.

    Stops the entry's transcoder, schedules the main loop to quit, writes
    the transcoded temporary file to the public store and records its
    filepath as the 'medium' media file of the media entry.

    Args:
     - queue: The transcode queue that emitted the signal.
     - entry: The queue entry that finished (not the media entry).
     - info: The dict process_video() connected this handler with; the
       keys 'loop', 'tmp_file', 'medium_filepath' and 'entry' are read.
    """
    entry.transcoder.stop()
    gobject.idle_add(info['loop'].quit)

    # Leaving this block closes the temporary file that process_video()
    # created for the transcoder output.
    with info['tmp_file'] as tmp_file:
        # Close the destination explicitly (as process_video() does for the
        # original file) instead of leaving the handle to the garbage
        # collector.
        with mgg.public_store.get_file(
                info['medium_filepath'], 'wb') as medium_file:
            medium_file.write(tmp_file.read())

        # This runs before process_video() gets to its own setdefault, so
        # 'media_files' may not exist on the media entry yet.
        media_files = info['entry'].setdefault('media_files', {})
        media_files['medium'] = info['medium_filepath']

    print('\n=== DONE! ===\n')

    print(queue, entry, info)
def entry_error(queue, entry, options):
    """
    "entry-error" callback of the transcode queue; only prints its arguments.

    process_video() connects this handler with its ``info`` dict as the
    extra argument, so ``options`` is that dict.
    """
    # NOTE(review): nothing here stops the main loop or marks the media
    # entry as failed, so process_video() presumably keeps waiting after a
    # transcode error -- confirm.
    print(queue, entry, options)
def signal_handler(signum, frame):
    """
        Handle Ctrl-C gracefully and shut down the transcoder.

        The default SIGINT handler is restored at the end, so a second
        Ctrl-C is no longer intercepted by this function.
    """
    # NOTE(review): the `interrupted` flag handling and the bare `print`
    # were not legible in the reviewed copy and are reconstructed; the
    # visible imports also do not include `signal` -- confirm.
    global interrupted
    print
    print _("Interrupt caught. Cleaning up... (Ctrl-C to force exit)")
    interrupted = True
    signal.signal(signal.SIGINT, signal.SIG_DFL)
246 def check_interrupted():
248 Check whether we have been interrupted by Ctrl-C and stop the
253 source
= transcoder
.pipe
.get_by_name("source")
254 source
.send_event(gst
.event_new_eos())
256 # Something pretty bad happened... just exit!
257 gobject
.idle_add(loop
.quit
)