#!/usr/bin/env python3.4
# -*- coding: utf-8 -*-
#
# This file is part of ABYSS.
# ABYSS Broadcast Your Streaming Successfully
#
# ABYSS is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# ABYSS is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with ABYSS. If not, see <http://www.gnu.org/licenses/>.
#
# Copyright (c) 2016 David Testé
from os import listdir
from time import localtime, strftime

from gi.repository import Gst
from gi.repository import GstVideo
# Directory (relative to the working directory) that holds the recordings.
DIR_NAME = 'FILES_RECORDED'

# Locations the file sinks are pointed at by create_filesink(); the files
# are renamed afterwards by rename_files() / set_filenames().
AUDIO_DEFAULT = './' + DIR_NAME + '/' + 'AUDIO_DEFAULT'
RAWVIDEO_DEFAULT = './' + DIR_NAME + '/' + 'RAWVIDEO_DEFAULT'
STREAM_DEFAULT = './' + DIR_NAME + '/' + 'STREAM_DEFAULT'

# Suffixes appended to the paths above (backup feed / failed stream).
BACKUP_SUFFIX = '_BACKUP'
FAILED_SUFFIX = '_FAILED_'

# Locations used by the file sinks when the 'backup' feed is selected.
AUDIO_BACKUP = AUDIO_DEFAULT + BACKUP_SUFFIX
RAWVIDEO_BACKUP = RAWVIDEO_DEFAULT + BACKUP_SUFFIX
STREAM_BACKUP = STREAM_DEFAULT + BACKUP_SUFFIX
# Settings for the pipeline inputs; every value starts as None and is
# expected to be replaced by the value read from the config file.
sources = {
    'RTSP_IP': None,
    'AUDIO_INPUT': None,
}

# Settings for the pipeline outputs. Every key that create_streamsink()
# reads must be listed here, because the config loader only fetches the keys
# already present in this dict.
# NOTE(review): 'SERVER_PORT', 'AUDIO_MOUNT' and 'PASSWORD' were missing from
# the reviewed copy although create_streamsink() indexes them -- confirm the
# option names against the '.abyss' config file.
sinks = {
    'AUDIO_OUTPUT': None,
    'STREAM_SERVER_IP': None,
    'SERVER_PORT': None,
    'AUDIO_MOUNT': None,
    'PASSWORD': None,
    'VIDEO_MOUNT': None,
}
## AUDIO_INPUT = 'alsa_input.usb-Burr-Brown_from_TI_USB_Audio_CODEC-00-CODEC.analog-stereo'
## AUDIO_OUTPUT = 'alsa_output.pci-0000_00_1b.0.analog-stereo'
61 config
= configparser
.RawConfigParser()
62 if path
.exists(CONFIG
):
65 sources
= {key
: config
.get('sources', key
) for key
in sources
}
66 sinks
= {key
: config
.get('sinks', key
) for key
in sinks
}
68 print(ERROR
, gettime(), 'Failed to parse config file.')
70 print(ERROR
, gettime(), '".abyss" config file doesn\'t exist.')
72 if not path
.isdir(DIR_NAME
):
75 class New_user_pipeline():
def __init__(self, feed='main'):
    """Build the GStreamer pipeline for the requested feed.

    Args:
        feed: Which variant of the pipeline to build. The other methods
            compare it against 'main', 'backup' and 'test'.
    """
    # Must be set before the pipeline is created: every create_*/add/link
    # step branches on self.feed.
    self.feed = feed
    self.rtsp_address = 'rtsp://' + sources['RTSP_IP']
    self.user_pipeline = self.create_gstreamer_pipeline()
def create_video_sources(self):
    """Create video inputs from various sources.

    Always creates the RTSP source (``self.videosrc``). When the feed is
    'backup', also creates a V4L2 webcam source (``self.videosrc_backup``)
    bound to the device returned by find_webcam_device().
    """
    rtsp_source = Gst.ElementFactory.make('rtspsrc', 'videosrc')
    rtsp_source.set_property('location', self.rtsp_address)
    rtsp_source.set_property('latency', 0)
    ## rtsp_source.set_property('debug', True)
    self.videosrc = rtsp_source

    if self.feed == 'backup':
        # NOTE(review): the element name argument was missing from the
        # reviewed copy; 'videosrc_backup' follows the file's convention of
        # naming an element after its attribute -- confirm.
        self.videosrc_backup = Gst.ElementFactory.make('v4l2src',
                                                       'videosrc_backup')
        device_location = self.find_webcam_device()
        self.videosrc_backup.set_property('device', device_location)
def find_webcam_device(self):
    """Look out for the USB webcam device.

    Scans ``/dev/`` for entries whose name contains 'video'.

    Returns:
        '/dev/<name>' for the first entry that is not 'video0' when several
        entries exist, or for the only entry when exactly one exists.
        None (after printing an error) when no entry qualifies.
    """
    devices = [dev for dev in listdir('/dev/') if 'video' in dev]
    # NOTE(review): the loop header and the body of the single-device branch
    # were missing from the reviewed copy and are reconstructed here --
    # confirm against the project history.
    for item in devices:
        # In case of computer having a built-in webcam
        if item != 'video0' and len(devices) > 1:
            return '/dev/' + item
        # Without built-in webcam
        elif len(devices) == 1:
            return '/dev/' + item
    print(ERROR, gettime(), 'No webcam device found.')
    return None
def find_mixingdesk_device(self):
    """Look out for the USB mixing desk device.

    Product used here: Behringer XENYX Q1002USB.

    Not implemented: no detection logic is visible for this method, so it
    returns None.
    NOTE(review): the original body line was missing from the reviewed
    copy -- confirm it was only a placeholder.
    """
    # shell cmd : 'pactl list | grep alsa_input'
    # AUDIO_INPUT --> const used currently
    return None
def create_pipeline_callbacks(self):
    """Callbacks to connect dynamically created pads.

    Registers on_pad_added_to_rtspsrc() as the handler of the 'pad-added'
    signal of the RTSP source (``self.videosrc``).
    """
    self.videosrc.connect('pad-added', self.on_pad_added_to_rtspsrc)
def on_pad_added_to_rtspsrc(self, rtspsrc, pad):
    """Connect the dynamic 'src' pad of an RTSP source.

    Args:
        rtspsrc: The element that emitted 'pad-added' (unused).
        pad: The newly created source pad.
    """
    sinkpad = self.queuev_1.get_static_pad('sink')
    # NOTE(review): the reviewed copy fetched the sink pad but never used it,
    # so the RTSP source was never attached to the first video queue. Linking
    # here matches the docstring -- confirm against the project history.
    pad.link(sinkpad)
def create_audio_sources(self):
    """Create audio inputs from various sources.

    Creates a PulseAudio source (``self.audiosrc``) reading from the device
    configured under ``sources['AUDIO_INPUT']``.
    """
    audio_source = Gst.ElementFactory.make('pulsesrc', 'audiosrc')
    audio_source.set_property('device', sources['AUDIO_INPUT'])
    self.audiosrc = audio_source
def create_audiolevel_plugin(self):
    """Create audio level plugin to feed a vu-meter."""
    level = Gst.ElementFactory.make('level', 'audiolevel')
    # Presumably nanoseconds, i.e. one level message every 200 ms -- confirm
    # against the GStreamer 'level' element documentation.
    level.set_property('interval', 200000000)
    self.audiolevel = level
def create_filesink(self):
    """Create storable output elements.

    Creates one unnamed 'filesink' each for the raw video, the audio and the
    muxed stream. The sinks write to the *_BACKUP locations when the feed is
    'backup' and to the *_DEFAULT locations otherwise.
    """
    # Pick the target paths once instead of setting the defaults and then
    # overriding them for the backup feed.
    if self.feed == 'backup':
        rawvideo_path = RAWVIDEO_BACKUP
        audio_path = AUDIO_BACKUP
        stream_path = STREAM_BACKUP
    else:
        rawvideo_path = RAWVIDEO_DEFAULT
        audio_path = AUDIO_DEFAULT
        stream_path = STREAM_DEFAULT

    self.disksink_rawvideo = Gst.ElementFactory.make('filesink')
    self.disksink_rawvideo.set_property('location', rawvideo_path)
    self.disksink_audio = Gst.ElementFactory.make('filesink')
    self.disksink_audio.set_property('location', audio_path)
    self.disksink_stream = Gst.ElementFactory.make('filesink')
    self.disksink_stream.set_property('location', stream_path)
def create_streamsink(self):
    """Create streamable output elements.

    Creates the local video sink (``self.screensink``), the local audio sink
    (``self.audiosink``) and two 'shout2send' sinks (``self.icecastsink_audio``
    and ``self.icecastsink_stream``) configured from the ``sinks`` settings.
    All of them run with 'sync' disabled.
    """
    def make_icecast_sink(name, mount):
        # Both shout2send sinks share server, port and password; only the
        # element name and the mount point differ.
        sink = Gst.ElementFactory.make('shout2send', name)
        sink.set_property('sync', False)
        sink.set_property('ip', sinks['STREAM_SERVER_IP'])
        sink.set_property('port', int(sinks['SERVER_PORT']))
        sink.set_property('mount', mount)
        sink.set_property('password', sinks['PASSWORD'])
        return sink

    # To local screen:
    self.screensink = Gst.ElementFactory.make('xvimagesink', 'screensink')
    self.screensink.set_property('sync', False)
    # To local audio output (headphones):
    self.audiosink = Gst.ElementFactory.make('pulsesink', 'audiosink')
    self.audiosink.set_property('device', sinks['AUDIO_OUTPUT'])
    self.audiosink.set_property('sync', False)
    # To the streaming server:
    self.icecastsink_audio = make_icecast_sink('icecastsink_audio',
                                               sinks['AUDIO_MOUNT'])
    self.icecastsink_stream = make_icecast_sink('icecastsink_stream',
                                                sinks['VIDEO_MOUNT'])
def create_payloader_elements(self):
    """Placeholder: no payloader element is created.

    NOTE(review): the body was missing from the reviewed copy; it is assumed
    to have been an empty stub -- confirm.
    """
    return None
def create_depayloader_elements(self):
    """Create the RTP JPEG depayloader (``self.rtpjpegdepay``)."""
    self.rtpjpegdepay = Gst.ElementFactory.make('rtpjpegdepay', 'rtpjpegdepay')
def create_encoder_elements(self):
    """Create the audio (Vorbis) and video (VP8) encoders."""
    # Audio encoder:
    self.vorbisenc = Gst.ElementFactory.make('vorbisenc', 'vorbisenc')

    # Video encoder; the tuning values are applied in the listed order.
    vp8_settings = (
        ('min_quantizer', 1),
        ('max_quantizer', 13),
        ('cpu-used', 5),
        ('deadline', 1),
        ('threads', 2),
        ('sharpness', 7),
        ('keyframe-max-dist', 25),
        ('target-bitrate', 2000000),
    )
    self.vp8enc = Gst.ElementFactory.make('vp8enc', 'vp8enc')
    for prop_name, prop_value in vp8_settings:
        self.vp8enc.set_property(prop_name, prop_value)
def create_decoder_elements(self):
    """Create the JPEG decoder (``self.jpegdec``)."""
    decoder = Gst.ElementFactory.make('jpegdec', 'jpegdec')
    # Presumably -1 lets the decoder tolerate any number of corrupted
    # frames -- confirm against the jpegdec documentation.
    decoder.set_property('max-errors', -1)
    self.jpegdec = decoder
def create_muxer_elements(self):
    """Create the Ogg, Matroska and WebM muxers.

    The WebM muxer is created with its 'streamable' property enabled.
    """
    self.oggmux = Gst.ElementFactory.make('oggmux', 'oggmux')
    self.mkvmux = Gst.ElementFactory.make('matroskamux', 'mkvmux')
    self.webmmux = Gst.ElementFactory.make('webmmux', 'webmmux')
    self.webmmux.set_property('streamable', True)
def create_demuxer_elements(self):
    """Placeholder: no demuxer element is created.

    NOTE(review): the body was missing from the reviewed copy; it is assumed
    to have been an empty stub -- confirm.
    """
    return None
def create_filtering_elements(self):
    """Create the video scaler and the caps filters.

    ``self.capsfilter`` constrains size and framerate; the second filter,
    ``self.capsfilter_backup``, constrains the size only.
    """
    self.scaling = Gst.ElementFactory.make('videoscale', 'scaling')

    # NOTE(review): GStreamer framerates are usually typed as fractions;
    # '(float)25/1' may not be parsed as intended -- confirm before changing.
    caps = Gst.caps_from_string(
        'video/x-raw, width=(int)640, height=(int)360, framerate=(float)25/1')
    self.capsfilter = Gst.ElementFactory.make('capsfilter', 'capsfilter')
    self.capsfilter.set_property('caps', caps)

    caps_backup = Gst.caps_from_string(
        'video/x-raw, width=(int)640, height=(int)360')
    self.capsfilter_backup = Gst.ElementFactory.make('capsfilter',
                                                     'capsfilter_backup')
    self.capsfilter_backup.set_property('caps', caps_backup)
def create_tee_elements(self):
    """Create tee elements to divide feeds.

    Each tee is stored on ``self`` under an attribute that is also used as
    the GStreamer element name.
    """
    tee_names = (
        'tee_rawvideo',
        'tee_videodecoded',
        'tee_streamfull',
        'tee_rawaudio',
        'tee_streamaudio',
    )
    for tee_name in tee_names:
        setattr(self, tee_name, Gst.ElementFactory.make('tee', tee_name))
222 def connect_tee(self
,
227 output_element_3
=None,):
228 """Links input and outputs of a given tee element."""
229 # Find a way to check if the element given are in the pipeline
230 # then pass the result to the 'if' statement.
231 ## argcheck = [True for arg in locals() if arg in 'the_list_of_elements_added']
232 ## print('[DEBUG] ArgList check: ', argcheck)
233 ## if False not in argcheck
235 input_element
.link(tee_element
)
236 tee_element
.link(output_element_1
)
237 tee_element
.link(output_element_2
)
239 tee_element
.link(output_element_3
)
243 'Couldn\'t link the tee. Element(s) probably not in the pipeline ')
def create_queues(self):
    """Create every queue element used by the pipeline.

    Each queue is stored on ``self`` under an attribute that is also used as
    the GStreamer element name. ``queuea_4`` and ``queuem_2`` get their
    'leaky' property set to 2.
    """
    queue_names = (
        # For video feed:
        'queuev_1', 'queuev_2', 'queuev_3', 'queuev_4', 'queuev_5',
        # For audio feed:
        'queuea_1', 'queuea_2', 'queuea_3', 'queuea_4', 'queuea_5',
        # For audio+video muxer:
        'queuem_1', 'queuem_2',
    )
    for queue_name in queue_names:
        setattr(self, queue_name, Gst.ElementFactory.make('queue', queue_name))

    # Presumably 2 selects the queue's "downstream" leak mode -- confirm
    # against the GStreamer 'queue' documentation. These are the two queues
    # that link_pipeline_elements() connects to the shout2send sinks.
    for leaky_queue in (self.queuea_4, self.queuem_2):
        leaky_queue.set_property('leaky', 2)
264 def create_pipeline_elements(self
):
265 print(INFO
, gettime(), 'Pipeline creation state: creating elements... ', end
='')
267 self
.create_video_sources()
268 self
.create_audio_sources()
270 self
.create_audiolevel_plugin()
271 self
.create_payloader_elements()
272 self
.create_depayloader_elements()
273 self
.create_encoder_elements()
274 self
.create_decoder_elements()
275 self
.create_muxer_elements()
276 self
.create_filtering_elements()
277 self
.create_tee_elements()
280 self
.create_filesink()
281 self
.create_streamsink()
282 if self
.feed
== 'test':
283 print('TEST OK...', end
='')
285 if self
.feed
== 'backup':
288 'Webcam device location: ',
289 self
.videosrc_backup
.get_property('device'))
292 def add_elements_to_pipeline(self
):
293 print(INFO
, gettime(), 'Pipeline creation state: adding elements... ', end
='')
294 cond
= self
.feed
!= 'test'
297 self
.streampipe
.add(self
.audiosrc
)
299 self
.streampipe
.add(self
.audiolevel
)
300 self
.streampipe
.add(self
.queuea_1
)
301 self
.streampipe
.add(self
.queuev_3
)
303 self
.streampipe
.add(self
.vorbisenc
)
304 self
.streampipe
.add(self
.oggmux
)
305 self
.streampipe
.add(self
.queuea_2
)
306 self
.streampipe
.add(self
.queuea_3
)
307 self
.streampipe
.add(self
.vp8enc
)
308 self
.streampipe
.add(self
.mkvmux
)
309 self
.streampipe
.add(self
.webmmux
)
310 self
.streampipe
.add(self
.tee_rawaudio
)
311 self
.streampipe
.add(self
.tee_rawvideo
)
312 self
.streampipe
.add(self
.tee_streamaudio
)
313 self
.streampipe
.add(self
.tee_streamfull
)
314 self
.streampipe
.add(self
.queuev_2
)
315 self
.streampipe
.add(self
.queuev_4
)
316 self
.streampipe
.add(self
.queuev_5
)
317 self
.streampipe
.add(self
.queuea_4
)
318 self
.streampipe
.add(self
.queuea_5
)
319 self
.streampipe
.add(self
.queuem_1
)
320 self
.streampipe
.add(self
.queuem_2
)
322 self
.streampipe
.add(self
.screensink
)
324 self
.streampipe
.add(self
.disksink_rawvideo
)
325 self
.streampipe
.add(self
.disksink_audio
)
326 self
.streampipe
.add(self
.disksink_stream
)
327 self
.streampipe
.add(self
.icecastsink_audio
)
328 self
.streampipe
.add(self
.icecastsink_stream
)
330 self
.streampipe
.add(self
.audiosink
)
332 if self
.feed
== 'main' or self
.feed
== 'test':
334 self
.streampipe
.add(self
.videosrc
)
336 self
.streampipe
.add(self
.rtpjpegdepay
)
337 self
.streampipe
.add(self
.jpegdec
)
338 self
.streampipe
.add(self
.scaling
)
339 self
.streampipe
.add(self
.capsfilter
)
340 self
.streampipe
.add(self
.tee_videodecoded
)
341 self
.streampipe
.add(self
.queuev_1
)
342 if self
.feed
== 'test':
343 print ('TEST OK...', end
='')
344 elif self
.feed
== 'backup':
346 self
.streampipe
.add(self
.videosrc_backup
)
348 self
.streampipe
.add(self
.capsfilter_backup
)
349 print ('BACKUP OK...', end
='')
352 def link_pipeline_elements(self
):
353 """Link all elements with static pads."""
354 print(INFO
, gettime(), 'Pipeline creation state: linking elements... ', end
='')
355 cond
= self
.feed
!= 'test'
358 self
.audiosrc
.link(self
.audiolevel
)
359 self
.audiolevel
.link(self
.queuea_1
)
361 self
.queuea_1
.link(self
.vorbisenc
)
362 self
.connect_tee(self
.tee_rawaudio
,
366 self
.queuea_2
.link(self
.oggmux
)
367 self
.connect_tee(self
.tee_streamaudio
,
371 self
.queuea_3
.link(self
.disksink_audio
)
372 self
.queuea_4
.link(self
.icecastsink_audio
)
373 self
.queuea_5
.link(self
.webmmux
)
375 self
.queuea_1
.link(self
.audiosink
)
379 self
.queuev_2
.link(self
.mkvmux
)
380 self
.mkvmux
.link(self
.queuev_4
)
381 self
.queuev_4
.link(self
.disksink_rawvideo
)
383 self
.queuev_1
.link(self
.rtpjpegdepay
)
384 self
.rtpjpegdepay
.link(self
.jpegdec
)
385 self
.jpegdec
.link(self
.queuev_3
)
386 self
.queuev_3
.link(self
.screensink
)
388 # Stream (audio+video) feed:
390 self
.vp8enc
.link(self
.queuev_5
)
391 self
.queuev_5
.link(self
.webmmux
)
392 self
.connect_tee(self
.tee_streamfull
,
396 self
.queuem_1
.link(self
.disksink_stream
)
397 self
.queuem_2
.link(self
.icecastsink_stream
)
398 if self
.feed
== 'main':
399 # linking here RTSP feed
400 self
.queuev_1
.link(self
.rtpjpegdepay
)
401 self
.connect_tee(self
.tee_rawvideo
,
405 self
.connect_tee(self
.tee_videodecoded
,
409 # Stream (video) feed:
410 self
.scaling
.link(self
.capsfilter
)
411 self
.capsfilter
.link(self
.vp8enc
)
412 elif self
.feed
== 'backup':
413 # linking here backup feed (WEBCAM)
414 self
.videosrc_backup
.link(self
.capsfilter_backup
)
415 self
.connect_tee(self
.tee_rawvideo
,
416 self
.capsfilter_backup
,
419 output_element_3
=self
.vp8enc
)
420 print('BACKUP OK...', end
='')
422 print('TEST OK...', end
='')
def create_gstreamer_pipeline(self):
    """Create, populate and wire the GStreamer pipeline.

    Creates the elements, adds them to a new pipeline, links them, registers
    the dynamic-pad callback for the 'main' and 'test' feeds and connects
    on_message() to the pipeline bus.

    Returns:
        The new pipeline, which is also stored as ``self.streampipe``.
    """
    # New empty pipeline:
    self.streampipe = Gst.Pipeline()
    self.create_pipeline_elements()
    self.add_elements_to_pipeline()
    self.link_pipeline_elements()
    # Only these feeds use the RTSP source, whose pads appear dynamically.
    if self.feed in ('main', 'test'):
        self.create_pipeline_callbacks()

    bus = self.streampipe.get_bus()
    bus.add_signal_watch()
    bus.enable_sync_message_emission()
    # Used to get messages that GStreamer emits.
    bus.connect("message", self.on_message)

    print(INFO, gettime(), 'Pipeline creation state: successfully done.')
    return self.streampipe
def on_message(self, bus, message):
    """Handle a message posted on the pipeline bus.

    On end-of-stream the pipeline is set to the NULL state; on error the
    error and its debug information are printed. Other messages are ignored.

    Args:
        bus: The bus that delivered the message (unused).
        message: The GStreamer message to handle.
    """
    # NOTE(review): the binding of `t` was missing from the reviewed copy;
    # `message.type` is the attribute compared against Gst.MessageType.
    t = message.type
    if t == Gst.MessageType.EOS:
        self.streampipe.set_state(Gst.State.NULL)
    elif t == Gst.MessageType.ERROR:
        err, debug = message.parse_error()
        print(ERROR, '%s' % err, debug)
def stream_play(self):
    """Request the PLAYING state on the pipeline and log the request."""
    self.streampipe.set_state(Gst.State.PLAYING)
    if self.feed == 'backup':
        print(WARN, gettime(), 'Backup pipeline started.')
    print(INFO, gettime(), 'PLAYING State resquested.')
def stream_stop(self):
    """Request the NULL state on the pipeline and log the request."""
    self.streampipe.set_state(Gst.State.NULL)
    print(INFO, gettime(), 'STOPPED State resquested.')
463 def set_filenames(self
, string
, streamfailed
=False):
464 """Sets filename and location for each sink."""
467 audio
= './' + DIR_NAME
+ '/' + filename
+ '_AUDIO'
468 rawvideo
= './' + DIR_NAME
+ '/' + filename
+ '_RAWVIDEO'
469 stream
= './' + DIR_NAME
+ '/' + filename
+ '_STREAM'
470 if self
.feed
== 'main':
471 if streamfailed
and filename
:
472 audio
= audio
+ FAILED_SUFFIX
+ str(fail_counter
)
473 rawvideo
= rawvideo
+ FAILED_SUFFIX
+ str(fail_counter
)
474 stream
= stream
+ FAILED_SUFFIX
+ str(fail_counter
)
475 self
.rename_files(audio
, rawvideo
, stream
)
478 audio
= AUDIO_DEFAULT
+ FAILED_SUFFIX
+ str(fail_counter
)
479 rawvideo
= RAWVIDEO_DEFAULT
+ FAILED_SUFFIX
+ str(fail_counter
)
480 stream
= STREAM_DEFAULT
+ FAILED_SUFFIX
+ str(fail_counter
)
481 self
.rename_files(audio
, rawvideo
, stream
)
484 self
.rename_files(audio
, rawvideo
, stream
)
485 elif self
.feed
== 'backup':
486 ## print('INSIDE BACKUP RENAMING')
487 rename(AUDIO_BACKUP
, audio
)
488 rename(RAWVIDEO_BACKUP
, rawvideo
)
489 rename(STREAM_BACKUP
, stream
)
491 print(INFO
, gettime(), 'Audio file written on disk.')
492 print(INFO
, gettime(), 'Raw video file written on disk.')
493 print(INFO
, gettime(), 'Streamed file written on disk.')
def rename_files(self, audio_name, rawvideo_name, stream_name):
    """Rename the three default recordings to their final names.

    Args:
        audio_name: New path for the file at AUDIO_DEFAULT.
        rawvideo_name: New path for the file at RAWVIDEO_DEFAULT.
        stream_name: New path for the file at STREAM_DEFAULT.
    """
    renames = (
        (AUDIO_DEFAULT, audio_name),
        (RAWVIDEO_DEFAULT, rawvideo_name),
        (STREAM_DEFAULT, stream_name),
    )
    for current_path, new_path in renames:
        rename(current_path, new_path)
def get_gstreamer_bus():
    """Placeholder: does nothing and returns None.

    NOTE(review): the body was missing from the reviewed copy; it is assumed
    to have been an empty stub -- confirm.
    """
    return None
def gettime():
    """Return the current local time formatted for log lines.

    NOTE(review): the ``def`` line was missing from the reviewed copy; the
    name is taken from the ``gettime()`` calls made throughout the file.

    Returns:
        A string such as '16-03-27_14:05:09 ' (note the trailing space).
    """
    return strftime('%y-%m-%d_%H:%M:%S ', localtime())