Testing pipeline completed.
[libre-streamer.git] / stream_2016 / gstconf.py
1 #!/usr/bin/env python3.4
2 # -*- coding: utf-8 -*-
3
4 # This file is part of ABYSS.
5 # ABYSS Broadcast Your Streaming Successfully
6 #
7 # ABYSS is free software: you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation, either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # ABYSS is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with ABYSS. If not, see <http://www.gnu.org/licenses/>.
19 #
20 # Copyright (c) 2016 David Testé
21
22 from os import rename
23 from os import listdir
24 from time import localtime, strftime
25
26 import gi
27 from gi.repository import Gst
28 from gi.repository import GstVideo
29
# Directory prefix prepended to every recording filename.
# Pathname has to be defined (empty means the current working directory).
PATHNAME = ''

# Suffixes appended to recording filenames.
BACKUP_SUFFIX = '_BACKUP'
FAILED_SUFFIX = '_FAILED_'

# Temporary filenames written by the main feed while it is running.
AUDIO_DEFAULT = PATHNAME + 'AUDIO_DEFAULT'
RAWVIDEO_DEFAULT = PATHNAME + 'RAWVIDEO_DEFAULT'
STREAM_DEFAULT = PATHNAME + 'STREAM_DEFAULT'

# Temporary filenames written by the backup feed while it is running.
AUDIO_BACKUP = AUDIO_DEFAULT + BACKUP_SUFFIX
RAWVIDEO_BACKUP = RAWVIDEO_DEFAULT + BACKUP_SUFFIX
STREAM_BACKUP = STREAM_DEFAULT + BACKUP_SUFFIX

# Number appended after FAILED_SUFFIX; incremented each time the
# recordings of a failed main stream are renamed.
fail_counter = 1

# Prefixes for console messages.
ERROR = '[ERROR] '
INFO = '[INFO] '
WARN = '[WARN] '

# PulseAudio device names used for capture and for local monitoring.
AUDIO_INPUT = ('alsa_input.usb-Burr-Brown_from_TI_USB_Audio_CODEC-00-CODEC'
               '.analog-stereo')
AUDIO_OUTPUT = 'alsa_output.pci-0000_00_1b.0.analog-stereo'
47
48
class New_user_pipeline():
    """GStreamer pipeline for one ABYSS feed ('main', 'backup' or 'test').

    Creating an instance immediately builds the whole pipeline: elements
    are created, added to a ``Gst.Pipeline``, linked, and a message
    handler is attached to the pipeline bus.

    The ``feed`` value selects the topology:

    * ``'main'``   -- RTSP video plus PulseAudio input; recorded to disk,
      sent to the icecast server and displayed on the local screen.
    * ``'backup'`` -- same outputs, but the video comes from a v4l2
      webcam instead of the RTSP source.
    * ``'test'``   -- RTSP video to the local screen and audio to the
      local audio output only; nothing is recorded or streamed.
    """

    def __init__(self, rtsp_address, feed='main'):
        """Store the settings and build the pipeline.

        rtsp_address -- address of the RTSP source WITHOUT the
                        'rtsp://' scheme, which is prepended here.
        feed         -- 'main' (default), 'backup' or 'test'.
        """
        self.rtsp_address = 'rtsp://' + rtsp_address
        self.feed = feed
        self.user_pipeline = self.create_gstreamer_pipeline()

    def create_video_sources(self):
        """Create video inputs from various sources.

        The RTSP source is always created; the webcam source is only
        created for the 'backup' feed.
        """
        self.videosrc = Gst.ElementFactory.make('rtspsrc', 'videosrc')
        self.videosrc.set_property('location', self.rtsp_address)
        self.videosrc.set_property('latency', 0)
        if self.feed == 'backup':
            self.videosrc_backup = Gst.ElementFactory.make('v4l2src',
                                                           'videosrc_backup')
            device_location = self.find_webcam_device()
            # When no webcam was found, keep the element's own default
            # device instead of overwriting it with None.
            if device_location is not None:
                self.videosrc_backup.set_property('device', device_location)

    def find_webcam_device(self):
        """Look out for the USB webcam device.

        Returns the path of the video device to use, or None (after
        printing an error) when /dev/ contains no video device.

        * One device only: that device is returned, whatever its name.
        * Several devices: 'video0' is assumed to be a built-in webcam
          and the first other device (in sorted order) is returned.
        """
        # Sorted so the choice does not depend on directory listing order.
        devices = sorted(dev for dev in listdir('/dev/') if 'video' in dev)
        if not devices:
            print(ERROR, gettime(), 'No webcam device found.')
            return None
        if len(devices) == 1:
            # Without built-in webcam
            return '/dev/' + devices[0]
        # In case of computer having a built-in webcam
        external = [dev for dev in devices if dev != 'video0']
        return '/dev/' + external[0]

    def find_mixingdesk_device(self):
        """Look out for the USB mixing desk device.

        Product used here: Behringer XENYX Q1002USB.
        Not implemented: the AUDIO_INPUT constant is used instead.
        """
        # shell cmd : 'pactl list | grep alsa_input'
        # AUDIO_INPUT --> const used currently
        pass

    def create_pipeline_callbacks(self):
        """Callbacks to connect dynamically created pads."""
        self.videosrc.connect('pad-added', self.on_pad_added_to_rtspsrc)

    def on_pad_added_to_rtspsrc(self, rtspsrc, pad):
        """Connect the dynamic 'src' pad of an RTSP source.

        rtspsrc -- the element that emitted 'pad-added' (unused).
        pad     -- the newly created pad; linked to the sink pad of
                   ``queuev_1``.
        """
        sinkpad = self.queuev_1.get_static_pad('sink')
        pad.link(sinkpad)

    def create_audio_sources(self):
        """Create audio inputs from various sources."""
        self.audiosrc = Gst.ElementFactory.make('pulsesrc', 'audiosrc')
        self.audiosrc.set_property('device', AUDIO_INPUT)

    def create_audiolevel_plugin(self):
        """Create audio level plugin to feed a vu-meter."""
        self.audiolevel = Gst.ElementFactory.make('level', 'audiolevel')
        # NOTE(review): per the GStreamer 'level' documentation the
        # interval is expressed in nanoseconds, i.e. 0.2 s here.
        self.audiolevel.set_property('interval', 200000000)

    def create_filesink(self):
        """Create storable output elements.

        Three file sinks are created (raw video, audio, muxed stream).
        They write to the *_BACKUP filenames for the 'backup' feed and
        to the *_DEFAULT filenames otherwise.
        """
        #[TO DO]: File location has to be defined
        if self.feed == 'backup':
            locations = (RAWVIDEO_BACKUP, AUDIO_BACKUP, STREAM_BACKUP)
        else:
            locations = (RAWVIDEO_DEFAULT, AUDIO_DEFAULT, STREAM_DEFAULT)
        rawvideo_location, audio_location, stream_location = locations

        self.disksink_rawvideo = Gst.ElementFactory.make('filesink')
        self.disksink_rawvideo.set_property('location', rawvideo_location)
        self.disksink_audio = Gst.ElementFactory.make('filesink')
        self.disksink_audio.set_property('location', audio_location)
        self.disksink_stream = Gst.ElementFactory.make('filesink')
        self.disksink_stream.set_property('location', stream_location)

    def _make_icecast_sink(self, name, mount):
        """Return a 'shout2send' element named ``name`` for ``mount``."""
        sink = Gst.ElementFactory.make('shout2send', name)
        sink.set_property('sync', False)
        # NOTE(review): the server address and the password are
        # hard-coded in the source. Configuration should be written on a
        # file locally to keep safe private addresses and credentials.
        sink.set_property('ip', 'live2.fsf.org')
        sink.set_property('port', 80)
        sink.set_property('mount', mount)
        sink.set_property('password', 'thahw3Wiez')
        return sink

    def create_streamsink(self):
        """Create streamable output elements."""
        # To local screen:
        self.screensink = Gst.ElementFactory.make('xvimagesink', 'screensink')
        self.screensink.set_property('sync', False)
        # To local audio output (headphones):
        self.audiosink = Gst.ElementFactory.make('pulsesink', 'audiosink')
        self.audiosink.set_property('device', AUDIO_OUTPUT)
        self.audiosink.set_property('sync', False)
        # To icecast server:
        self.icecastsink_audio = self._make_icecast_sink('icecastsink_audio',
                                                         'testaudio.ogg')
        self.icecastsink_stream = self._make_icecast_sink('icecastsink_stream',
                                                          'teststream.webm')

    def create_payloader_elements(self):
        """Placeholder: no payloader element is used."""
        pass

    def create_depayloader_elements(self):
        """Create the RTP JPEG depayloader used for the RTSP video."""
        self.rtpjpegdepay = Gst.ElementFactory.make('rtpjpegdepay',
                                                    'rtpjpegdepay')

    def create_encoder_elements(self):
        """Create the Vorbis (audio) and VP8 (video) encoders."""
        # Audio encoders:
        self.vorbisenc = Gst.ElementFactory.make('vorbisenc', 'vorbisenc')
        # Video encoders:
        self.vp8enc = Gst.ElementFactory.make('vp8enc', 'vp8enc')
        self.vp8enc.set_property('min_quantizer', 1)
        self.vp8enc.set_property('max_quantizer', 13)
        self.vp8enc.set_property('cpu-used', 5)
        self.vp8enc.set_property('deadline', 42000)
        self.vp8enc.set_property('threads', 2)
        self.vp8enc.set_property('sharpness', 7)

    def create_decoder_elements(self):
        """Create the JPEG decoder used for the RTSP video."""
        self.jpegdec = Gst.ElementFactory.make('jpegdec', 'jpegdec')
        # NOTE(review): -1 presumably means "never give up on decoding
        # errors" -- confirm against the jpegdec documentation.
        self.jpegdec.set_property('max-errors', -1)

    def create_muxer_elements(self):
        """Create the Ogg, Matroska and WebM muxers."""
        self.oggmux = Gst.ElementFactory.make('oggmux', 'oggmux')
        self.mkvmux = Gst.ElementFactory.make('matroskamux', 'mkvmux')
        self.webmmux = Gst.ElementFactory.make('webmmux', 'webmmux')
        self.webmmux.set_property('streamable', True)

    def create_demuxer_elements(self):
        """Placeholder: no demuxer element is used."""
        pass

    def create_filtering_elements(self):
        """Create the video scaler and the 640x360 caps filters."""
        self.scaling = Gst.ElementFactory.make('videoscale', 'scaling')
        caps = Gst.caps_from_string(
            'video/x-raw, width=(int)640, height=(int)360')
        self.capsfilter = Gst.ElementFactory.make('capsfilter', 'capsfilter')
        self.capsfilter.set_property('caps', caps)

        caps_backup = Gst.caps_from_string(
            'video/x-raw, width=(int)640, height=(int)360')
        self.capsfilter_backup = Gst.ElementFactory.make('capsfilter',
                                                         'capsfilter_backup')
        self.capsfilter_backup.set_property('caps', caps_backup)

    def create_tee_elements(self):
        """Create tee elements to divide feeds."""
        self.tee_rawvideo = Gst.ElementFactory.make('tee', 'tee_rawvideo')
        self.tee_videodecoded = Gst.ElementFactory.make('tee',
                                                        'tee_videodecoded')
        self.tee_streamfull = Gst.ElementFactory.make('tee', 'tee_streamfull')
        self.tee_rawaudio = Gst.ElementFactory.make('tee', 'tee_rawaudio')
        self.tee_streamaudio = Gst.ElementFactory.make('tee',
                                                       'tee_streamaudio')

    def connect_tee(self,
                    tee_element,
                    input_element,
                    output_element_1,
                    output_element_2,
                    output_element_3=None,):
        """Links input and outputs of a given tee element.

        ``input_element`` is linked to ``tee_element``, which is then
        linked to each output in order. ``output_element_3`` is optional.

        Every link is attempted even if an earlier one fails; if any of
        them reports failure an error message is printed.
        """
        outputs = [output_element_1, output_element_2]
        if output_element_3 is not None:
            outputs.append(output_element_3)

        # Gst.Element.link() returns False when the link could not be
        # made (e.g. an element was never added to the pipeline).
        results = [input_element.link(tee_element)]
        results.extend([tee_element.link(output) for output in outputs])
        if not all(results):
            print(ERROR,
                  gettime(),
                  'Couldn\'t link the tee. Element(s) probably not in the pipeline ')

    def create_queues(self):
        """Create every queue element used between pipeline stages."""
        # For video feed:
        self.queuev_1 = Gst.ElementFactory.make('queue', 'queuev_1')
        self.queuev_2 = Gst.ElementFactory.make('queue', 'queuev_2')
        self.queuev_3 = Gst.ElementFactory.make('queue', 'queuev_3')
        self.queuev_4 = Gst.ElementFactory.make('queue', 'queuev_4')
        self.queuev_5 = Gst.ElementFactory.make('queue', 'queuev_5')
        self.queuev_6 = Gst.ElementFactory.make('queue', 'queuev_6')
        # For audio feed:
        self.queuea_1 = Gst.ElementFactory.make('queue', 'queuea_1')
        self.queuea_2 = Gst.ElementFactory.make('queue', 'queuea_2')
        self.queuea_3 = Gst.ElementFactory.make('queue', 'queuea_3')
        self.queuea_4 = Gst.ElementFactory.make('queue', 'queuea_4')
        # NOTE(review): leaky=2 is 'downstream' in the GStreamer queue
        # documentation (old buffers are dropped when full). These two
        # leaky queues are the ones feeding the icecast sinks.
        self.queuea_4.set_property('leaky', 2)
        self.queuea_5 = Gst.ElementFactory.make('queue', 'queuea_5')
        # For audio+video muxer:
        self.queuem_1 = Gst.ElementFactory.make('queue', 'queuem_1')
        self.queuem_2 = Gst.ElementFactory.make('queue', 'queuem_2')
        self.queuem_2.set_property('leaky', 2)

    def create_pipeline_elements(self):
        """Create every element, whatever the feed, and report progress."""
        print(INFO, gettime(), 'Pipeline creation state: creating elements... ', end='')
        # Inputs elements:
        self.create_video_sources()
        self.create_audio_sources()
        # Middle elements:
        self.create_audiolevel_plugin()
        self.create_payloader_elements()
        self.create_depayloader_elements()
        self.create_encoder_elements()
        self.create_decoder_elements()
        self.create_muxer_elements()
        self.create_filtering_elements()
        self.create_tee_elements()
        self.create_queues()
        # Output elements:
        self.create_filesink()
        self.create_streamsink()
        if self.feed == 'test':
            print('TEST OK...', end='')
        print('created')
        if self.feed == 'backup':
            print(INFO,
                  gettime(),
                  'Webcam device location: ',
                  self.videosrc_backup.get_property('device'))

    def _add_all(self, *elements):
        """Add each of ``elements`` to the pipeline, in the given order."""
        for element in elements:
            self.streampipe.add(element)

    def add_elements_to_pipeline(self):
        """Add to the pipeline the elements required by the current feed."""
        print(INFO, gettime(), 'Pipeline creation state: adding elements... ', end='')
        # The 'test' feed neither records nor streams.
        full_outputs = self.feed != 'test'

        # Inputs and middle elements shared by every feed:
        self._add_all(self.audiosrc,
                      self.audiolevel,
                      self.queuea_1,
                      self.queuev_3)
        if full_outputs:
            self._add_all(self.vorbisenc,
                          self.oggmux,
                          self.queuea_2,
                          self.queuea_3,
                          self.vp8enc,
                          self.mkvmux,
                          self.webmmux,
                          self.tee_rawaudio,
                          self.tee_rawvideo,
                          self.tee_streamaudio,
                          self.tee_streamfull,
                          self.queuev_2,
                          self.queuev_4,
                          self.queuev_5,
                          self.queuea_4,
                          self.queuea_5,
                          self.queuem_1,
                          self.queuem_2)
        # Outputs elements:
        self._add_all(self.screensink)
        if full_outputs:
            self._add_all(self.disksink_rawvideo,
                          self.disksink_audio,
                          self.disksink_stream,
                          self.icecastsink_audio,
                          self.icecastsink_stream)
        else:
            self._add_all(self.audiosink)

        if self.feed == 'main' or self.feed == 'test':
            # RTSP input and the elements decoding it:
            self._add_all(self.videosrc,
                          self.rtpjpegdepay,
                          self.jpegdec,
                          self.scaling,
                          self.capsfilter,
                          self.tee_videodecoded,
                          self.queuev_1)
            if self.feed == 'test':
                print('TEST OK...', end='')
        elif self.feed == 'backup':
            # Webcam input and its caps filter:
            self._add_all(self.videosrc_backup,
                          self.capsfilter_backup)
            print('BACKUP OK...', end='')
        print('added')

    def link_pipeline_elements(self):
        """Link all elements with static pads.

        The RTSP source itself is not linked here: its pad is created
        dynamically and handled by ``on_pad_added_to_rtspsrc``.
        """
        print(INFO, gettime(), 'Pipeline creation state: linking elements... ', end='')
        # The 'test' feed neither records nor streams.
        full_outputs = self.feed != 'test'

        # Audio feed:
        self.audiosrc.link(self.audiolevel)
        self.audiolevel.link(self.queuea_1)
        if full_outputs:
            self.queuea_1.link(self.vorbisenc)
            self.connect_tee(self.tee_rawaudio,
                             self.vorbisenc,
                             self.queuea_2,
                             self.queuea_5,)
            self.queuea_2.link(self.oggmux)
            self.connect_tee(self.tee_streamaudio,
                             self.oggmux,
                             self.queuea_3,
                             self.queuea_4,)
            self.queuea_3.link(self.disksink_audio)
            self.queuea_4.link(self.icecastsink_audio)
            self.queuea_5.link(self.webmmux)
        else:
            self.queuea_1.link(self.audiosink)

        # Video feed:
        if full_outputs:
            self.queuev_2.link(self.mkvmux)
            self.mkvmux.link(self.queuev_4)
            self.queuev_4.link(self.disksink_rawvideo)
        else:
            self.queuev_1.link(self.rtpjpegdepay)
            self.rtpjpegdepay.link(self.jpegdec)
            self.jpegdec.link(self.queuev_3)
        # Local display, for every feed:
        self.queuev_3.link(self.screensink)

        # Stream (audio+video) feed:
        if full_outputs:
            self.vp8enc.link(self.queuev_5)
            self.queuev_5.link(self.webmmux)
            self.connect_tee(self.tee_streamfull,
                             self.webmmux,
                             self.queuem_1,
                             self.queuem_2,)
            self.queuem_1.link(self.disksink_stream)
            self.queuem_2.link(self.icecastsink_stream)
            if self.feed == 'main':
                # linking here RTSP feed
                self.queuev_1.link(self.rtpjpegdepay)
                self.connect_tee(self.tee_rawvideo,
                                 self.rtpjpegdepay,
                                 self.queuev_2,
                                 self.jpegdec,)
                self.connect_tee(self.tee_videodecoded,
                                 self.jpegdec,
                                 self.queuev_3,
                                 self.scaling,)
                # Stream (video) feed:
                self.scaling.link(self.capsfilter)
                self.capsfilter.link(self.vp8enc)
            elif self.feed == 'backup':
                # linking here backup feed (WEBCAM)
                self.videosrc_backup.link(self.capsfilter_backup)
                self.connect_tee(self.tee_rawvideo,
                                 self.capsfilter_backup,
                                 self.queuev_2,
                                 self.queuev_3,
                                 output_element_3=self.vp8enc)
                print('BACKUP OK...', end='')
        if not full_outputs:
            print('TEST OK...', end='')
        print('linked')

    def create_gstreamer_pipeline(self):
        """Build the complete pipeline and return it.

        Also publishes the pipeline bus in the module-level ``bus``
        variable (read by ``get_gstreamer_bus``) and connects
        ``on_message`` to it.
        """
        # New empty pipeline:
        self.streampipe = Gst.Pipeline()
        self.create_pipeline_elements()
        # Setting-up:
        self.add_elements_to_pipeline()
        self.link_pipeline_elements()
        if self.feed == 'main' or self.feed == 'test':
            # Only these feeds use the RTSP source and its dynamic pad.
            self.create_pipeline_callbacks()

        global bus
        bus = self.streampipe.get_bus()
        bus.add_signal_watch()
        bus.enable_sync_message_emission()
        # Used to get messages that GStreamer emits.
        bus.connect("message", self.on_message)

        print(INFO, gettime(), 'Pipeline creation state: successfully done.')
        return self.streampipe

    def on_message(self, bus, message):
        """Handle a bus message.

        End-of-stream sets the pipeline to the NULL state; an error
        message is parsed and printed. Other message types are ignored.
        """
        t = message.type
        if t == Gst.MessageType.EOS:
            self.streampipe.set_state(Gst.State.NULL)
        elif t == Gst.MessageType.ERROR:
            err, debug = message.parse_error()
            print(ERROR, '%s' % err, debug)

    def stream_play(self):
        """Request the PLAYING state for the pipeline."""
        self.streampipe.set_state(Gst.State.PLAYING)
        if self.feed == 'backup':
            print(WARN, gettime(), 'Backup pipeline started.')
        print(INFO, gettime(), 'PLAYING State resquested')

    def stream_stop(self):
        """Request the NULL (stopped) state for the pipeline."""
        self.streampipe.set_state(Gst.State.NULL)
        print(INFO, gettime(), 'STOPPED State resquested')

    def _rename_recordings(self, sources, targets):
        """Rename each file of ``sources`` to the matching ``targets`` name."""
        for source, target in zip(sources, targets):
            rename(source, target)

    def set_filenames(self, string, streamfailed=False):
        """Sets filename and location for each sink.

        Renames the temporary recordings (audio, raw video, stream) to
        names built from ``string``.

        string       -- base name; '_AUDIO', '_RAWVIDEO' and '_STREAM'
                        are appended to it.
        streamfailed -- 'main' feed only: mark the files as coming from
                        a failed stream by appending FAILED_SUFFIX and
                        the module-level ``fail_counter``, which is then
                        incremented. With an empty ``string`` the default
                        filenames are used as the base name.

        Nothing is renamed for feeds other than 'main' and 'backup'.
        Errors raised by ``os.rename`` are not caught.
        """
        global fail_counter
        named = (PATHNAME + string + '_AUDIO',
                 PATHNAME + string + '_RAWVIDEO',
                 PATHNAME + string + '_STREAM')
        print('FEED STATE: ', self.feed)
        if self.feed == 'main':
            sources = (AUDIO_DEFAULT, RAWVIDEO_DEFAULT, STREAM_DEFAULT)
            if not streamfailed:
                self._rename_recordings(sources, named)
                return
            # Failed stream: fall back on the default names when no
            # base name was given.
            bases = named if string else sources
            suffix = FAILED_SUFFIX + str(fail_counter)
            targets = [base + suffix for base in bases]
            self._rename_recordings(sources, targets)
            # Incremented only once all the renames have succeeded.
            fail_counter += 1
        elif self.feed == 'backup':
            print('INSIDE BACKUP RENAMING')
            sources = (AUDIO_BACKUP, RAWVIDEO_BACKUP, STREAM_BACKUP)
            self._rename_recordings(sources, named)
473
def rename_files():
    """Placeholder, not implemented: does nothing and returns None."""
476
def get_gstreamer_bus():
    """Return the bus published by the last pipeline that was built.

    The module-level ``bus`` variable is only assigned by
    ``New_user_pipeline.create_gstreamer_pipeline``. When no pipeline
    has been created yet, None is returned instead of failing with a
    NameError.
    """
    return globals().get('bus')
479
def gettime():
    """Return the local time as 'YY-MM-DD_HH:MM:SS ' (with trailing space)."""
    timestamp_format = '%y-%m-%d_%H:%M:%S '
    now = localtime()
    return strftime(timestamp_format, now)