added davidt
[libre-streamer.git] / gstconf.py
1 #!/usr/bin/env python3.4
2 # -*- coding: utf-8 -*-
3
4 # This file is part of ABYSS.
5 # ABYSS Broadcast Your Streaming Successfully
6 #
7 # ABYSS is free software: you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation, either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # ABYSS is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with ABYSS. If not, see <http://www.gnu.org/licenses/>.
19 #
20 # Copyright (c) 2016 David Testé
21
22 from os import rename
23 from os import listdir
24 from os import path
25 from os import mkdir
26 from time import localtime, strftime
27 import configparser
28
29 import gi
30 from gi.repository import Gst
31 from gi.repository import GstVideo
32
33 DIR_NAME = 'FILES_RECORDED'
34 AUDIO_DEFAULT = './' + DIR_NAME + '/' + 'AUDIO_DEFAULT'
35 RAWVIDEO_DEFAULT = './' + DIR_NAME + '/' + 'RAWVIDEO_DEFAULT'
36 STREAM_DEFAULT = './' + DIR_NAME + '/' + 'STREAM_DEFAULT'
37 BACKUP_SUFFIX = '_BACKUP'
38 FAILED_SUFFIX = '_FAILED_'
39 fail_counter = 1
40 AUDIO_BACKUP = AUDIO_DEFAULT + BACKUP_SUFFIX
41 RAWVIDEO_BACKUP = RAWVIDEO_DEFAULT + BACKUP_SUFFIX
42 STREAM_BACKUP = STREAM_DEFAULT + BACKUP_SUFFIX
43 ERROR = '[ERROR] '
44 INFO = '[INFO] '
45 WARN = '[WARN] '
46 CONFIG = '.abyss'
47
48 sources = {'RTSP_IP' : None,
49 'AUDIO_INPUT' : None,}
50 sinks = {'AUDIO_OUTPUT' : None,
51 'DIR': None,
52 'STREAM_SERVER_IP' : None,
53 'SERVER_PORT' : None,
54 'PASSWORD' : None,
55 'AUDIO_MOUNT' : None,
56 'VIDEO_MOUNT' : None,}
57
58 ##AUDIO_INPUT = 'alsa_input.usb-Burr-Brown_from_TI_USB_Audio_CODEC-00-CODEC.analog-stereo'
59 ##AUDIO_OUTPUT = 'alsa_output.pci-0000_00_1b.0.analog-stereo'
60
61 config = configparser.RawConfigParser()
62 if path.exists(CONFIG):
63 config.read(CONFIG)
64 try:
65 sources = {key : config.get('sources', key) for key in sources}
66 sinks = {key : config.get('sinks', key) for key in sinks}
67 except:
68 print(ERROR, gettime(), 'Failed to parse config file.')
69 else:
70 print(ERROR, gettime(), '".abyss" config file doesn\'t exist.')
71
72 if not path.isdir(DIR_NAME):
73 mkdir(DIR_NAME)
74
75 class New_user_pipeline():
76
77
78 def __init__(self, feed='main'):
79 self.rtsp_address = 'rtsp://' + sources['RTSP_IP']
80 self.feed = feed
81 self.user_pipeline = self.create_gstreamer_pipeline()
82
83 def create_video_sources(self):
84 """Create video inputs from various sources."""
85 self.videosrc = Gst.ElementFactory.make('rtspsrc', 'videosrc')
86 self.videosrc.set_property('location', self.rtsp_address)
87 self.videosrc.set_property('latency', 0)
88 ## self.videosrc.set_property('debug', True)
89 if self.feed == 'backup':
90 self.videosrc_backup = Gst.ElementFactory.make('v4l2src',
91 'videosrc_backup')
92 device_location = self.find_webcam_device()
93 self.videosrc_backup.set_property('device', device_location)
94
95 def find_webcam_device(self):
96 """Look out for the USB webcam device."""
97 devices = [dev for dev in listdir('/dev/') if 'video' in dev]
98 for item in devices:
99 # In case of computer having a built-in webcam
100 if item != 'video0' and len(devices) > 1:
101 return '/dev/' + item
102 # Without built-in webcam
103 elif len(devices) == 1:
104 return '/dev/video0'
105 print(ERROR, gettime(), 'No webcam device found.')
106
107 def find_mixingdesk_device(self):
108 """Look out for the USB mixing desk device.
109 Product used here: Behringer XENYX Q1002USB.
110 """
111 # shell cmd : 'pactl list | grep alsa_input'
112 # AUDIO_INPUT --> const used currently
113 pass
114
115 def create_pipeline_callbacks(self):
116 """Callbacks to connect dynamically created pads."""
117 self.videosrc.connect('pad-added', self.on_pad_added_to_rtspsrc)
118
119 def on_pad_added_to_rtspsrc(self, rtspsrc, pad):
120 """Connect the dynamic 'src'pad of an RTSP source."""
121 sinkpad = self.queuev_1.get_static_pad('sink')
122 pad.link(sinkpad)
123
124 def create_audio_sources(self):
125 """Create audio inputs from various sources."""
126 self.audiosrc = Gst.ElementFactory.make('pulsesrc', 'audiosrc')
127 self.audiosrc.set_property('device', sources['AUDIO_INPUT'])
128
129 def create_audiolevel_plugin(self):
130 """Create audio level plugin to feed a vu-meter."""
131 self.audiolevel = Gst.ElementFactory.make('level', 'audiolevel')
132 self.audiolevel.set_property('interval', 200000000)
133
134 def create_filesink(self):
135 """Create storable output elements."""
136 self.disksink_rawvideo = Gst.ElementFactory.make('filesink')
137 self.disksink_rawvideo.set_property('location', RAWVIDEO_DEFAULT)
138 self.disksink_audio = Gst.ElementFactory.make('filesink')
139 self.disksink_audio.set_property('location', AUDIO_DEFAULT)
140 self.disksink_stream = Gst.ElementFactory.make('filesink')
141 self.disksink_stream.set_property('location', STREAM_DEFAULT)
142 if self.feed == 'backup':
143 self.disksink_rawvideo.set_property('location', RAWVIDEO_BACKUP)
144 self.disksink_audio.set_property('location', AUDIO_BACKUP)
145 self.disksink_stream.set_property('location', STREAM_BACKUP)
146
147 def create_streamsink(self):
148 """Create streamable output elements."""
149 # To local screen:
150 self.screensink = Gst.ElementFactory.make('xvimagesink', 'screensink')
151 self.screensink.set_property('sync', False)
152 # To local audio output (headphones):
153 self.audiosink = Gst.ElementFactory.make('pulsesink', 'audiosink')
154 self.audiosink.set_property('device', sinks['AUDIO_OUTPUT'])
155 self.audiosink.set_property('sync', False)
156 # To icecast server:
157 self.icecastsink_audio = Gst.ElementFactory.make('shout2send', 'icecastsink_audio')
158 self.icecastsink_audio.set_property('sync', False)
159 self.icecastsink_audio.set_property('ip', sinks['STREAM_SERVER_IP'])
160 self.icecastsink_audio.set_property('port', int(sinks['SERVER_PORT']))
161 self.icecastsink_audio.set_property('mount', sinks['AUDIO_MOUNT'])
162 self.icecastsink_audio.set_property('password', sinks['PASSWORD'])
163 self.icecastsink_stream = Gst.ElementFactory.make('shout2send', 'icecastsink_stream')
164 self.icecastsink_stream.set_property('sync', False)
165 self.icecastsink_stream.set_property('ip', sinks['STREAM_SERVER_IP'])
166 self.icecastsink_stream.set_property('port', int(sinks['SERVER_PORT']))
167 self.icecastsink_stream.set_property('mount', sinks['VIDEO_MOUNT'])
168 self.icecastsink_stream.set_property('password', sinks['PASSWORD'])
169
170 def create_payloader_elements(self):
171 pass
172
173 def create_depayloader_elements(self):
174 self.rtpjpegdepay = Gst.ElementFactory.make('rtpjpegdepay', 'rtpjpegdepay')
175
176 def create_encoder_elements(self):
177 # Audio encoders:
178 self.vorbisenc = Gst.ElementFactory.make('vorbisenc', 'vorbisenc')
179 # Video encoders:
180 self.vp8enc = Gst.ElementFactory.make('vp8enc', 'vp8enc')
181 self.vp8enc.set_property('min_quantizer', 1)
182 self.vp8enc.set_property('max_quantizer', 13)
183 self.vp8enc.set_property('cpu-used', 5)
184 self.vp8enc.set_property('deadline', 42000)
185 self.vp8enc.set_property('threads', 2)
186 self.vp8enc.set_property('sharpness', 7)
187
188 def create_decoder_elements(self):
189 self.jpegdec = Gst.ElementFactory.make('jpegdec', 'jpegdec')
190 self.jpegdec.set_property('max-errors', -1)
191
192 def create_muxer_elements(self):
193 self.oggmux = Gst.ElementFactory.make('oggmux', 'oggmux')
194 self.mkvmux = Gst.ElementFactory.make('matroskamux', 'mkvmux')
195 self.webmmux = Gst.ElementFactory.make('webmmux', 'webmmux')
196 self.webmmux.set_property('streamable', True)
197
198 def create_demuxer_elements(self):
199 pass
200
201 def create_filtering_elements(self):
202 self.scaling = Gst.ElementFactory.make('videoscale', 'scaling')
203 caps = Gst.caps_from_string('video/x-raw, width=(int)640, height=(int)360')
204 self.capsfilter = Gst.ElementFactory.make('capsfilter', 'capsfilter')
205 self.capsfilter.set_property('caps', caps)
206
207 caps_backup = Gst.caps_from_string('video/x-raw, width=(int)640, height=(int)360')
208 self.capsfilter_backup = Gst.ElementFactory.make('capsfilter', 'capsfilter_backup')
209 self.capsfilter_backup.set_property('caps', caps_backup)
210
211 def create_tee_elements(self):
212 """Create tee elements to divide feeds."""
213 self.tee_rawvideo = Gst.ElementFactory.make('tee', 'tee_rawvideo')
214 self.tee_videodecoded = Gst.ElementFactory.make('tee', 'tee_videodecoded')
215 self.tee_streamfull = Gst.ElementFactory.make('tee', 'tee_streamfull')
216 self.tee_rawaudio = Gst.ElementFactory.make('tee', 'tee_rawaudio')
217 self.tee_streamaudio = Gst.ElementFactory.make('tee', 'tee_streamaudio')
218
219 def connect_tee(self,
220 tee_element,
221 input_element,
222 output_element_1,
223 output_element_2,
224 output_element_3=None,):
225 """Links input and outputs of a given tee element."""
226 # Find a way to check if the element given are in the pipeline
227 # then pass the result to the 'if' statement.
228 ## argcheck = [True for arg in locals() if arg in 'the_list_of_elements_added']
229 ## print('[DEBUG] ArgList check: ', argcheck)
230 ## if False not in argcheck
231 if True:
232 input_element.link(tee_element)
233 tee_element.link(output_element_1)
234 tee_element.link(output_element_2)
235 if output_element_3:
236 tee_element.link(output_element_3)
237 else:
238 print(ERROR,
239 gettime(),
240 'Couldn\'t link the tee. Element(s) probably not in the pipeline ')
241
242 def create_queues(self):
243 # For video feed:
244 self.queuev_1 = Gst.ElementFactory.make('queue', 'queuev_1')
245 self.queuev_2 = Gst.ElementFactory.make('queue', 'queuev_2')
246 self.queuev_3 = Gst.ElementFactory.make('queue', 'queuev_3')
247 self.queuev_4 = Gst.ElementFactory.make('queue', 'queuev_4')
248 self.queuev_5 = Gst.ElementFactory.make('queue', 'queuev_5')
249 # For audio feed:
250 self.queuea_1 = Gst.ElementFactory.make('queue', 'queuea_1')
251 self.queuea_2 = Gst.ElementFactory.make('queue', 'queuea_2')
252 self.queuea_3 = Gst.ElementFactory.make('queue', 'queuea_3')
253 self.queuea_4 = Gst.ElementFactory.make('queue', 'queuea_4')
254 self.queuea_4.set_property('leaky', 2)
255 self.queuea_5 = Gst.ElementFactory.make('queue', 'queuea_5')
256 # For audio+video muxer:
257 self.queuem_1 = Gst.ElementFactory.make('queue', 'queuem_1')
258 self.queuem_2 = Gst.ElementFactory.make('queue', 'queuem_2')
259 self.queuem_2.set_property('leaky', 2)
260
261 def create_pipeline_elements(self):
262 print(INFO, gettime(), 'Pipeline creation state: creating elements... ', end='')
263 # Inputs elements:
264 self.create_video_sources()
265 self.create_audio_sources()
266 # Middle elements:
267 self.create_audiolevel_plugin()
268 self.create_payloader_elements()
269 self.create_depayloader_elements()
270 self.create_encoder_elements()
271 self.create_decoder_elements()
272 self.create_muxer_elements()
273 self.create_filtering_elements()
274 self.create_tee_elements()
275 self.create_queues()
276 # Output elements:
277 self.create_filesink()
278 self.create_streamsink()
279 if self.feed == 'test':
280 print('TEST OK...', end='')
281 print('created')
282 if self.feed == 'backup':
283 print (INFO,
284 gettime(),
285 'Webcam device location: ',
286 self.videosrc_backup.get_property('device'))
287
288
289 def add_elements_to_pipeline(self):
290 print(INFO, gettime(), 'Pipeline creation state: adding elements... ', end='')
291 cond = self.feed != 'test'
292
293 # Inputs elements:
294 self.streampipe.add(self.audiosrc)
295 # Middle elements:
296 self.streampipe.add(self.audiolevel)
297 self.streampipe.add(self.queuea_1)
298 self.streampipe.add(self.queuev_3)
299 if cond:
300 self.streampipe.add(self.vorbisenc)
301 self.streampipe.add(self.oggmux)
302 self.streampipe.add(self.queuea_2)
303 self.streampipe.add(self.queuea_3)
304 self.streampipe.add(self.vp8enc)
305 self.streampipe.add(self.mkvmux)
306 self.streampipe.add(self.webmmux)
307 self.streampipe.add(self.tee_rawaudio)
308 self.streampipe.add(self.tee_rawvideo)
309 self.streampipe.add(self.tee_streamaudio)
310 self.streampipe.add(self.tee_streamfull)
311 self.streampipe.add(self.queuev_2)
312 self.streampipe.add(self.queuev_4)
313 self.streampipe.add(self.queuev_5)
314 self.streampipe.add(self.queuea_4)
315 self.streampipe.add(self.queuea_5)
316 self.streampipe.add(self.queuem_1)
317 self.streampipe.add(self.queuem_2)
318 # Outputs elements:
319 self.streampipe.add(self.screensink)
320 if cond:
321 self.streampipe.add(self.disksink_rawvideo)
322 self.streampipe.add(self.disksink_audio)
323 self.streampipe.add(self.disksink_stream)
324 self.streampipe.add(self.icecastsink_audio)
325 self.streampipe.add(self.icecastsink_stream)
326 else:
327 self.streampipe.add(self.audiosink)
328
329 if self.feed == 'main' or self.feed == 'test':
330 # Inputs elements:
331 self.streampipe.add(self.videosrc)
332 # Middle elements:
333 self.streampipe.add(self.rtpjpegdepay)
334 self.streampipe.add(self.jpegdec)
335 self.streampipe.add(self.scaling)
336 self.streampipe.add(self.capsfilter)
337 self.streampipe.add(self.tee_videodecoded)
338 self.streampipe.add(self.queuev_1)
339 if self.feed == 'test':
340 print ('TEST OK...', end='')
341 elif self.feed == 'backup':
342 # Inputs elements:
343 self.streampipe.add(self.videosrc_backup)
344 # Middle elements:
345 self.streampipe.add(self.capsfilter_backup)
346 print ('BACKUP OK...', end='')
347 print('added')
348
349 def link_pipeline_elements(self):
350 """Link all elements with static pads."""
351 print(INFO, gettime(), 'Pipeline creation state: linking elements... ', end='')
352 cond = self.feed != 'test'
353
354 # Audio feed:
355 self.audiosrc.link(self.audiolevel)
356 self.audiolevel.link(self.queuea_1)
357 if cond:
358 self.queuea_1.link(self.vorbisenc)
359 self.connect_tee(self.tee_rawaudio,
360 self.vorbisenc,
361 self.queuea_2,
362 self.queuea_5,)
363 self.queuea_2.link(self.oggmux)
364 self.connect_tee(self.tee_streamaudio,
365 self.oggmux,
366 self.queuea_3,
367 self.queuea_4,)
368 self.queuea_3.link(self.disksink_audio)
369 self.queuea_4.link(self.icecastsink_audio)
370 self.queuea_5.link(self.webmmux)
371 else:
372 self.queuea_1.link(self.audiosink)
373
374 # Video feed:
375 if cond:
376 self.queuev_2.link(self.mkvmux)
377 self.mkvmux.link(self.queuev_4)
378 self.queuev_4.link(self.disksink_rawvideo)
379 else:
380 self.queuev_1.link(self.rtpjpegdepay)
381 self.rtpjpegdepay.link(self.jpegdec)
382 self.jpegdec.link(self.queuev_3)
383 self.queuev_3.link(self.screensink)
384
385 # Stream (audio+video) feed:
386 if cond:
387 self.vp8enc.link(self.queuev_5)
388 self.queuev_5.link(self.webmmux)
389 self.connect_tee(self.tee_streamfull,
390 self.webmmux,
391 self.queuem_1,
392 self.queuem_2,)
393 self.queuem_1.link(self.disksink_stream)
394 self.queuem_2.link(self.icecastsink_stream)
395 if self.feed == 'main':
396 # linking here RTSP feed
397 self.queuev_1.link(self.rtpjpegdepay)
398 self.connect_tee(self.tee_rawvideo,
399 self.rtpjpegdepay,
400 self.queuev_2,
401 self.jpegdec,)
402 self.connect_tee(self.tee_videodecoded,
403 self.jpegdec,
404 self.queuev_3,
405 self.scaling,)
406 # Stream (video) feed:
407 self.scaling.link(self.capsfilter)
408 self.capsfilter.link(self.vp8enc)
409 elif self.feed == 'backup':
410 # linking here backup feed (WEBCAM)
411 self.videosrc_backup.link(self.capsfilter_backup)
412 self.connect_tee(self.tee_rawvideo,
413 self.capsfilter_backup,
414 self.queuev_2,
415 self.queuev_3,
416 output_element_3=self.vp8enc)
417 ## self.capsfilter_backup.link(self.queuev_3)
418 print('BACKUP OK...', end='')
419 if not cond:
420 print('TEST OK...', end='')
421 print('linked')
422
423 def create_gstreamer_pipeline(self):
424 # New empty pipeline:
425 self.streampipe = Gst.Pipeline()
426 self.create_pipeline_elements()
427 # Setting-up:
428 self.add_elements_to_pipeline()
429 self.link_pipeline_elements()
430 if self.feed == 'main' or self.feed == 'test':
431 self.create_pipeline_callbacks()
432
433 global bus
434 bus = self.streampipe.get_bus()
435 bus.add_signal_watch()
436 bus.enable_sync_message_emission()
437 # Used to get messages that GStreamer emits.
438 bus.connect("message", self.on_message)
439
440 print(INFO, gettime(), 'Pipeline creation state: successfully done.')
441 return self.streampipe
442
443 def on_message(self, bus, message):
444 t = message.type
445 if t == Gst.MessageType.EOS:
446 self.streampipe.set_state(Gst.State.NULL)
447 elif t == Gst.MessageType.ERROR:
448 err, debug = message.parse_error()
449 print (ERROR, '%s' % err, debug)
450
451 def stream_play(self):
452 self.streampipe.set_state(Gst.State.PLAYING)
453 if self.feed == 'backup':
454 print(WARN, gettime(), 'Backup pipeline started.')
455 print(INFO, gettime(), 'PLAYING State resquested.')
456
457 def stream_stop(self):
458 self.streampipe.set_state(Gst.State.NULL)
459 print(INFO, gettime(), 'STOPPED State resquested.')
460
461 def set_filenames(self, string, streamfailed=False):
462 """Sets filename and location for each sink."""
463 global fail_counter
464 filename = string
465 audio = './' + DIR_NAME + '/' + filename + '_AUDIO'
466 rawvideo = './' + DIR_NAME + '/' + filename + '_RAWVIDEO'
467 stream = './' + DIR_NAME + '/' + filename + '_STREAM'
468 if self.feed == 'main':
469 if streamfailed and filename:
470 audio = audio + FAILED_SUFFIX + str(fail_counter)
471 rawvideo = rawvideo + FAILED_SUFFIX + str(fail_counter)
472 stream = stream + FAILED_SUFFIX + str(fail_counter)
473 self.rename_files(audio, rawvideo, stream)
474 fail_counter += 1
475 elif streamfailed:
476 audio = AUDIO_DEFAULT + FAILED_SUFFIX + str(fail_counter)
477 rawvideo = RAWVIDEO_DEFAULT + FAILED_SUFFIX + str(fail_counter)
478 stream = STREAM_DEFAULT + FAILED_SUFFIX + str(fail_counter)
479 self.rename_files(audio, rawvideo, stream)
480 fail_counter += 1
481 else:
482 self.rename_files(audio, rawvideo, stream)
483 elif self.feed == 'backup':
484 ## print('INSIDE BACKUP RENAMING')
485 rename(AUDIO_BACKUP, audio)
486 rename(RAWVIDEO_BACKUP, rawvideo)
487 rename(STREAM_BACKUP, stream)
488
489 print(INFO, gettime(), 'Audio file written on disk.')
490 print(INFO, gettime(), 'Raw video file written on disk.')
491 print(INFO, gettime(), 'Streamed file written on disk.')
492
493 def rename_files(self, audio_name, rawvideo_name, stream_name):
494 rename(AUDIO_DEFAULT, audio_name)
495 rename(RAWVIDEO_DEFAULT, rawvideo_name)
496 rename(STREAM_DEFAULT, stream_name)
497
498 def get_gstreamer_bus():
499 return bus
500
501 def gettime():
502 return strftime('%y-%m-%d_%H:%M:%S ', localtime())