115c2d0ed53afcf1a5a878ead391a14a69b681d1
1 #!/usr/bin/env python3.4
2 # -*- coding: utf-8 -*-
4 # This file is part of ABYSS.
5 # ABYSS Broadcast Your Streaming Successfully
7 # ABYSS is free software: you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation, either version 3 of the License, or
10 # (at your option) any later version.
12 # ABYSS is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License
18 # along with ABYSS. If not, see <http://www.gnu.org/licenses/>.
20 # Copyright (c) 2016 David Testé
23 from os
import listdir
26 from time
import localtime
, strftime
30 from gi
.repository
import Gst
31 from gi
.repository
import GstVideo
# Directory (relative to the working directory) that receives every recording.
DIR_NAME = 'FILES_RECORDED'

# Paths the file sinks write to while a recording is in progress.
AUDIO_DEFAULT = '/'.join(('.', DIR_NAME, 'AUDIO_DEFAULT'))
RAWVIDEO_DEFAULT = '/'.join(('.', DIR_NAME, 'RAWVIDEO_DEFAULT'))
STREAM_DEFAULT = '/'.join(('.', DIR_NAME, 'STREAM_DEFAULT'))

# Suffixes appended to the default paths.
BACKUP_SUFFIX = '_BACKUP'
FAILED_SUFFIX = '_FAILED_'

# Paths used instead of the defaults when the 'backup' feed is recording.
AUDIO_BACKUP, RAWVIDEO_BACKUP, STREAM_BACKUP = (
    default_path + BACKUP_SUFFIX
    for default_path in (AUDIO_DEFAULT, RAWVIDEO_DEFAULT, STREAM_DEFAULT)
)
48 sources
= {'RTSP_IP' : None,
49 'AUDIO_INPUT' : None,}
50 sinks
= {'AUDIO_OUTPUT' : None,
52 'STREAM_SERVER_IP' : None,
56 'VIDEO_MOUNT' : None,}
58 ##AUDIO_INPUT = 'alsa_input.usb-Burr-Brown_from_TI_USB_Audio_CODEC-00-CODEC.analog-stereo'
59 ##AUDIO_OUTPUT = 'alsa_output.pci-0000_00_1b.0.analog-stereo'
61 config
= configparser
.RawConfigParser()
62 if path
.exists(CONFIG
):
65 sources
= {key
: config
.get('sources', key
) for key
in sources
}
66 sinks
= {key
: config
.get('sinks', key
) for key
in sinks
}
68 print(ERROR
, gettime(), 'Failed to parse config file.')
70 print(ERROR
, gettime(), '".abyss" config file doesn\'t exist.')
72 if not path
.isdir(DIR_NAME
):
75 class New_user_pipeline():
def __init__(self, feed='main'):
    """Build the GStreamer pipeline for the requested feed.

    :param feed: pipeline variant; the other methods of this class compare
        it against 'main', 'backup' and 'test'.
    """
    # Every builder method branches on self.feed, so it must be stored
    # before create_gstreamer_pipeline() runs.
    self.feed = feed
    self.rtsp_address = 'rtsp://' + sources['RTSP_IP']
    self.user_pipeline = self.create_gstreamer_pipeline()
def create_video_sources(self):
    """Create video inputs from various sources.

    Always creates the 'rtspsrc' element reading from ``self.rtsp_address``.
    When the feed is 'backup', additionally creates a 'v4l2src' element
    pointing at the device returned by ``find_webcam_device()``.
    """
    self.videosrc = Gst.ElementFactory.make('rtspsrc', 'videosrc')
    self.videosrc.set_property('location', self.rtsp_address)
    self.videosrc.set_property('latency', 0)
    if self.feed == 'backup':
        # NOTE(review): the element name was not visible in the reviewed
        # excerpt; it follows the file's convention of naming each element
        # after the attribute that holds it -- confirm.
        self.videosrc_backup = Gst.ElementFactory.make('v4l2src',
                                                       'videosrc_backup')
        device_location = self.find_webcam_device()
        self.videosrc_backup.set_property('device', device_location)
def find_webcam_device(self, dev_dir='/dev/'):
    """Look out for the USB webcam device.

    :param dev_dir: directory scanned for entries whose name contains
        'video' (defaults to '/dev/').
    :return: path of the selected device, or None when none is found
        (an error line is printed in that case).
    """
    # listdir() returns entries in arbitrary order; sort them so the same
    # device is picked on every run.
    devices = sorted(dev for dev in listdir(dev_dir) if 'video' in dev)
    prefix = dev_dir.rstrip('/') + '/'
    for item in devices:
        # In case of computer having a built-in webcam
        if item != 'video0' and len(devices) > 1:
            return prefix + item
        # Without built-in webcam
        # NOTE(review): the returned value for this branch was not visible
        # in the reviewed excerpt; returning the only device is assumed.
        elif len(devices) == 1:
            return prefix + item
    print(ERROR, gettime(), 'No webcam device found.')
    return None
def find_mixingdesk_device(self):
    """Look out for the USB mixing desk device.

    Product used here: Behringer XENYX Q1002USB.

    Not implemented: no detection code is visible for this method, so it
    returns None.
    """
    # shell cmd : 'pactl list | grep alsa_input'
    # AUDIO_INPUT --> const used currently
    return None
def create_pipeline_callbacks(self):
    """Register the handler that deals with dynamically created pads."""
    pad_handler = self.on_pad_added_to_rtspsrc
    rtsp_source = self.videosrc
    rtsp_source.connect('pad-added', pad_handler)
def on_pad_added_to_rtspsrc(self, rtspsrc, pad):
    """Connect the dynamic 'src' pad of an RTSP source.

    :param rtspsrc: element that emitted 'pad-added' (unused).
    :param pad: the newly created source pad.
    """
    sinkpad = self.queuev_1.get_static_pad('sink')
    # The signal fires once per pad; only the first pad can be attached to
    # the queue, so skip later ones instead of attempting a second link.
    if not sinkpad.is_linked():
        pad.link(sinkpad)
def create_audio_sources(self):
    """Create the 'pulsesrc' audio input configured from ``sources``."""
    pulse_input = Gst.ElementFactory.make('pulsesrc', 'audiosrc')
    self.audiosrc = pulse_input
    pulse_input.set_property('device', sources['AUDIO_INPUT'])
def create_audiolevel_plugin(self):
    """Create the 'level' element whose messages feed a vu-meter."""
    # NOTE(review): presumably nanoseconds (0.2 s between level messages);
    # confirm against the GStreamer 'level' element documentation.
    message_interval = 200000000
    level = Gst.ElementFactory.make('level', 'audiolevel')
    self.audiolevel = level
    level.set_property('interval', message_interval)
def create_filesink(self):
    """Create the three unnamed 'filesink' elements written to disk.

    Each sink first receives its *_DEFAULT location; for the 'backup' feed
    the locations are then replaced with the *_BACKUP ones.
    """
    default_targets = (
        ('disksink_rawvideo', RAWVIDEO_DEFAULT),
        ('disksink_audio', AUDIO_DEFAULT),
        ('disksink_stream', STREAM_DEFAULT),
    )
    for attribute, location in default_targets:
        sink = Gst.ElementFactory.make('filesink')
        setattr(self, attribute, sink)
        sink.set_property('location', location)

    if self.feed != 'backup':
        return
    self.disksink_rawvideo.set_property('location', RAWVIDEO_BACKUP)
    self.disksink_audio.set_property('location', AUDIO_BACKUP)
    self.disksink_stream.set_property('location', STREAM_BACKUP)
def create_streamsink(self):
    """Create streamable output elements.

    Builds an 'xvimagesink', a 'pulsesink' and two 'shout2send' sinks
    configured from the module-level ``sinks`` mapping.
    """
    # On-screen video sink.
    screen = Gst.ElementFactory.make('xvimagesink', 'screensink')
    self.screensink = screen
    screen.set_property('sync', False)

    # To local audio output (headphones):
    headphones = Gst.ElementFactory.make('pulsesink', 'audiosink')
    self.audiosink = headphones
    headphones.set_property('device', sinks['AUDIO_OUTPUT'])
    headphones.set_property('sync', False)

    # The two shout2send sinks only differ by their mount point.
    icecast_mounts = (
        ('icecastsink_audio', 'AUDIO_MOUNT'),
        ('icecastsink_stream', 'VIDEO_MOUNT'),
    )
    for element_name, mount_key in icecast_mounts:
        icecast = Gst.ElementFactory.make('shout2send', element_name)
        setattr(self, element_name, icecast)
        icecast.set_property('sync', False)
        icecast.set_property('ip', sinks['STREAM_SERVER_IP'])
        icecast.set_property('port', int(sinks['SERVER_PORT']))
        icecast.set_property('mount', sinks[mount_key])
        icecast.set_property('password', sinks['PASSWORD'])
def create_payloader_elements(self):
    """Placeholder for payloader elements; creates nothing at the moment."""
    # NOTE(review): no body was visible for this method in the reviewed
    # excerpt; it is kept as an explicit no-op so callers keep working.
    return None
def create_depayloader_elements(self):
    """Create the 'rtpjpegdepay' element."""
    factory_name = 'rtpjpegdepay'
    self.rtpjpegdepay = Gst.ElementFactory.make(factory_name, factory_name)
def create_encoder_elements(self):
    """Create the 'vorbisenc' and 'vp8enc' elements and tune the latter."""
    self.vorbisenc = Gst.ElementFactory.make('vorbisenc', 'vorbisenc')

    self.vp8enc = Gst.ElementFactory.make('vp8enc', 'vp8enc')
    vp8_settings = (
        ('min_quantizer', 1),
        ('max_quantizer', 13),
        ('cpu-used', 5),
        ('deadline', 42000),
        ('threads', 2),
        ('sharpness', 7),
    )
    for property_name, value in vp8_settings:
        self.vp8enc.set_property(property_name, value)
def create_decoder_elements(self):
    """Create the 'jpegdec' element with 'max-errors' set to -1."""
    decoder = Gst.ElementFactory.make('jpegdec', 'jpegdec')
    self.jpegdec = decoder
    decoder.set_property('max-errors', -1)
def create_muxer_elements(self):
    """Create the 'oggmux', 'matroskamux' and 'webmmux' elements."""
    make = Gst.ElementFactory.make
    self.oggmux = make('oggmux', 'oggmux')
    self.mkvmux = make('matroskamux', 'mkvmux')
    webm = make('webmmux', 'webmmux')
    self.webmmux = webm
    webm.set_property('streamable', True)
def create_demuxer_elements(self):
    """Placeholder for demuxer elements; creates nothing at the moment."""
    # NOTE(review): no body was visible for this method in the reviewed
    # excerpt; it is kept as an explicit no-op.
    return None
def create_filtering_elements(self):
    """Create the 'videoscale' element and the two 640x360 caps filters."""
    frame_format = 'video/x-raw, width=(int)640, height=(int)360'
    make = Gst.ElementFactory.make

    self.scaling = make('videoscale', 'scaling')

    main_caps = Gst.caps_from_string(frame_format)
    self.capsfilter = make('capsfilter', 'capsfilter')
    self.capsfilter.set_property('caps', main_caps)

    backup_caps = Gst.caps_from_string(frame_format)
    self.capsfilter_backup = make('capsfilter', 'capsfilter_backup')
    self.capsfilter_backup.set_property('caps', backup_caps)
def create_tee_elements(self):
    """Create the 'tee' elements used to split feeds.

    Each tee is stored on ``self`` under the same name it is given in the
    pipeline.
    """
    tee_names = (
        'tee_rawvideo',
        'tee_videodecoded',
        'tee_streamfull',
        'tee_rawaudio',
        'tee_streamaudio',
    )
    for tee_name in tee_names:
        setattr(self, tee_name, Gst.ElementFactory.make('tee', tee_name))
def connect_tee(self,
                tee_element,
                input_element,
                output_element_1,
                output_element_2,
                output_element_3=None,):
    """Links input and outputs of a given tee element.

    NOTE(review): the parameter names before ``output_element_3`` were not
    visible in the reviewed excerpt; they are taken from the names used in
    the body and from the argument order at the call sites -- confirm.

    :param tee_element: the tee to wire up.
    :param input_element: element linked to the tee's input.
    :param output_element_1: first element fed by the tee.
    :param output_element_2: second element fed by the tee.
    :param output_element_3: optional third element fed by the tee.
    """
    # Find a way to check if the element given are in the pipeline
    # then pass the result to the 'if' statement.
    links = [
        (input_element, tee_element),
        (tee_element, output_element_1),
        (tee_element, output_element_2),
    ]
    if output_element_3 is not None:
        links.append((tee_element, output_element_3))

    try:
        results = [upstream.link(downstream)
                   for upstream, downstream in links]
    except (AttributeError, TypeError):
        # An element that was never created (None) ends up here.
        results = [False]

    # link() reports failure through its return value, so a False result
    # is reported the same way as an exception.
    if any(result is False for result in results):
        print(ERROR,
              gettime(),
              'Couldn\'t link the tee. Element(s) probably not in the pipeline ')
def create_queues(self):
    """Create every 'queue' element of the pipeline.

    Queues are named queuev_N (five), queuea_N (five) and queuem_N (two,
    for the audio+video muxer); each is stored on ``self`` under its name.
    """
    queue_names = ['queuev_%d' % index for index in range(1, 6)]
    queue_names += ['queuea_%d' % index for index in range(1, 6)]
    # For audio+video muxer:
    queue_names += ['queuem_1', 'queuem_2']

    # These two queues get leaky=2, the value GStreamer's queue element
    # defines as "leak downstream".
    leaky_queues = ('queuea_4', 'queuem_2')

    for queue_name in queue_names:
        queue = Gst.ElementFactory.make('queue', queue_name)
        setattr(self, queue_name, queue)
        if queue_name in leaky_queues:
            queue.set_property('leaky', 2)
261 def create_pipeline_elements(self
):
262 print(INFO
, gettime(), 'Pipeline creation state: creating elements... ', end
='')
264 self
.create_video_sources()
265 self
.create_audio_sources()
267 self
.create_audiolevel_plugin()
268 self
.create_payloader_elements()
269 self
.create_depayloader_elements()
270 self
.create_encoder_elements()
271 self
.create_decoder_elements()
272 self
.create_muxer_elements()
273 self
.create_filtering_elements()
274 self
.create_tee_elements()
277 self
.create_filesink()
278 self
.create_streamsink()
279 if self
.feed
== 'test':
280 print('TEST OK...', end
='')
282 if self
.feed
== 'backup':
285 'Webcam device location: ',
286 self
.videosrc_backup
.get_property('device'))
289 def add_elements_to_pipeline(self
):
290 print(INFO
, gettime(), 'Pipeline creation state: adding elements... ', end
='')
291 cond
= self
.feed
!= 'test'
294 self
.streampipe
.add(self
.audiosrc
)
296 self
.streampipe
.add(self
.audiolevel
)
297 self
.streampipe
.add(self
.queuea_1
)
298 self
.streampipe
.add(self
.queuev_3
)
300 self
.streampipe
.add(self
.vorbisenc
)
301 self
.streampipe
.add(self
.oggmux
)
302 self
.streampipe
.add(self
.queuea_2
)
303 self
.streampipe
.add(self
.queuea_3
)
304 self
.streampipe
.add(self
.vp8enc
)
305 self
.streampipe
.add(self
.mkvmux
)
306 self
.streampipe
.add(self
.webmmux
)
307 self
.streampipe
.add(self
.tee_rawaudio
)
308 self
.streampipe
.add(self
.tee_rawvideo
)
309 self
.streampipe
.add(self
.tee_streamaudio
)
310 self
.streampipe
.add(self
.tee_streamfull
)
311 self
.streampipe
.add(self
.queuev_2
)
312 self
.streampipe
.add(self
.queuev_4
)
313 self
.streampipe
.add(self
.queuev_5
)
314 self
.streampipe
.add(self
.queuea_4
)
315 self
.streampipe
.add(self
.queuea_5
)
316 self
.streampipe
.add(self
.queuem_1
)
317 self
.streampipe
.add(self
.queuem_2
)
319 self
.streampipe
.add(self
.screensink
)
321 self
.streampipe
.add(self
.disksink_rawvideo
)
322 self
.streampipe
.add(self
.disksink_audio
)
323 self
.streampipe
.add(self
.disksink_stream
)
324 self
.streampipe
.add(self
.icecastsink_audio
)
325 self
.streampipe
.add(self
.icecastsink_stream
)
327 self
.streampipe
.add(self
.audiosink
)
329 if self
.feed
== 'main' or self
.feed
== 'test':
331 self
.streampipe
.add(self
.videosrc
)
333 self
.streampipe
.add(self
.rtpjpegdepay
)
334 self
.streampipe
.add(self
.jpegdec
)
335 self
.streampipe
.add(self
.scaling
)
336 self
.streampipe
.add(self
.capsfilter
)
337 self
.streampipe
.add(self
.tee_videodecoded
)
338 self
.streampipe
.add(self
.queuev_1
)
339 if self
.feed
== 'test':
340 print ('TEST OK...', end
='')
341 elif self
.feed
== 'backup':
343 self
.streampipe
.add(self
.videosrc_backup
)
345 self
.streampipe
.add(self
.capsfilter_backup
)
346 print ('BACKUP OK...', end
='')
349 def link_pipeline_elements(self
):
350 """Link all elements with static pads."""
351 print(INFO
, gettime(), 'Pipeline creation state: linking elements... ', end
='')
352 cond
= self
.feed
!= 'test'
355 self
.audiosrc
.link(self
.audiolevel
)
356 self
.audiolevel
.link(self
.queuea_1
)
358 self
.queuea_1
.link(self
.vorbisenc
)
359 self
.connect_tee(self
.tee_rawaudio
,
363 self
.queuea_2
.link(self
.oggmux
)
364 self
.connect_tee(self
.tee_streamaudio
,
368 self
.queuea_3
.link(self
.disksink_audio
)
369 self
.queuea_4
.link(self
.icecastsink_audio
)
370 self
.queuea_5
.link(self
.webmmux
)
372 self
.queuea_1
.link(self
.audiosink
)
376 self
.queuev_2
.link(self
.mkvmux
)
377 self
.mkvmux
.link(self
.queuev_4
)
378 self
.queuev_4
.link(self
.disksink_rawvideo
)
380 self
.queuev_1
.link(self
.rtpjpegdepay
)
381 self
.rtpjpegdepay
.link(self
.jpegdec
)
382 self
.jpegdec
.link(self
.queuev_3
)
383 self
.queuev_3
.link(self
.screensink
)
385 # Stream (audio+video) feed:
387 self
.vp8enc
.link(self
.queuev_5
)
388 self
.queuev_5
.link(self
.webmmux
)
389 self
.connect_tee(self
.tee_streamfull
,
393 self
.queuem_1
.link(self
.disksink_stream
)
394 self
.queuem_2
.link(self
.icecastsink_stream
)
395 if self
.feed
== 'main':
396 # linking here RTSP feed
397 self
.queuev_1
.link(self
.rtpjpegdepay
)
398 self
.connect_tee(self
.tee_rawvideo
,
402 self
.connect_tee(self
.tee_videodecoded
,
406 # Stream (video) feed:
407 self
.scaling
.link(self
.capsfilter
)
408 self
.capsfilter
.link(self
.vp8enc
)
409 elif self
.feed
== 'backup':
410 # linking here backup feed (WEBCAM)
411 self
.videosrc_backup
.link(self
.capsfilter_backup
)
412 self
.connect_tee(self
.tee_rawvideo
,
413 self
.capsfilter_backup
,
416 output_element_3
=self
.vp8enc
)
417 ## self.capsfilter_backup.link(self.queuev_3)
418 print('BACKUP OK...', end
='')
420 print('TEST OK...', end
='')
def create_gstreamer_pipeline(self):
    """Assemble the pipeline and return it.

    Creates an empty ``Gst.Pipeline``, builds, adds and links the elements,
    registers the pad callbacks for the 'main' and 'test' feeds, then
    connects ``on_message`` to the pipeline bus.

    :return: the pipeline, also stored as ``self.streampipe``.
    """
    # New empty pipeline:
    self.streampipe = Gst.Pipeline()

    build_steps = (
        self.create_pipeline_elements,
        self.add_elements_to_pipeline,
        self.link_pipeline_elements,
    )
    for step in build_steps:
        step()

    if self.feed in ('main', 'test'):
        self.create_pipeline_callbacks()

    message_bus = self.streampipe.get_bus()
    message_bus.add_signal_watch()
    message_bus.enable_sync_message_emission()
    # Used to get messages that GStreamer emits.
    message_bus.connect("message", self.on_message)

    print(INFO, gettime(), 'Pipeline creation state: successfully done.')
    return self.streampipe
def on_message(self, bus, message):
    """Handle a message received from the pipeline bus.

    On end-of-stream the pipeline is set to the NULL state; on error the
    parsed error and its debug string are printed. Other message types are
    ignored.

    :param bus: the bus that emitted the message (unused).
    :param message: the ``Gst.Message`` to inspect.
    """
    # The message type has to be read from the message itself before it
    # can be compared.
    message_type = message.type
    if message_type == Gst.MessageType.EOS:
        self.streampipe.set_state(Gst.State.NULL)
    elif message_type == Gst.MessageType.ERROR:
        err, debug = message.parse_error()
        print(ERROR, '%s' % err, debug)
def stream_play(self):
    """Request the PLAYING state on the pipeline and log the request."""
    pipeline = self.streampipe
    pipeline.set_state(Gst.State.PLAYING)
    is_backup = self.feed == 'backup'
    if is_backup:
        print(WARN, gettime(), 'Backup pipeline started.')
    print(INFO, gettime(), 'PLAYING State resquested.')
def stream_stop(self):
    """Request the NULL state on the pipeline and log the request."""
    pipeline = self.streampipe
    pipeline.set_state(Gst.State.NULL)
    print(INFO, gettime(), 'STOPPED State resquested.')
461 def set_filenames(self
, string
, streamfailed
=False):
462 """Sets filename and location for each sink."""
465 audio
= './' + DIR_NAME
+ '/' + filename
+ '_AUDIO'
466 rawvideo
= './' + DIR_NAME
+ '/' + filename
+ '_RAWVIDEO'
467 stream
= './' + DIR_NAME
+ '/' + filename
+ '_STREAM'
468 if self
.feed
== 'main':
469 if streamfailed
and filename
:
470 audio
= audio
+ FAILED_SUFFIX
+ str(fail_counter
)
471 rawvideo
= rawvideo
+ FAILED_SUFFIX
+ str(fail_counter
)
472 stream
= stream
+ FAILED_SUFFIX
+ str(fail_counter
)
473 self
.rename_files(audio
, rawvideo
, stream
)
476 audio
= AUDIO_DEFAULT
+ FAILED_SUFFIX
+ str(fail_counter
)
477 rawvideo
= RAWVIDEO_DEFAULT
+ FAILED_SUFFIX
+ str(fail_counter
)
478 stream
= STREAM_DEFAULT
+ FAILED_SUFFIX
+ str(fail_counter
)
479 self
.rename_files(audio
, rawvideo
, stream
)
482 self
.rename_files(audio
, rawvideo
, stream
)
483 elif self
.feed
== 'backup':
484 ## print('INSIDE BACKUP RENAMING')
485 rename(AUDIO_BACKUP
, audio
)
486 rename(RAWVIDEO_BACKUP
, rawvideo
)
487 rename(STREAM_BACKUP
, stream
)
489 print(INFO
, gettime(), 'Audio file written on disk.')
490 print(INFO
, gettime(), 'Raw video file written on disk.')
491 print(INFO
, gettime(), 'Streamed file written on disk.')
def rename_files(self, audio_name, rawvideo_name, stream_name):
    """Rename the three default recording files to the given names.

    :param audio_name: new path for ``AUDIO_DEFAULT``.
    :param rawvideo_name: new path for ``RAWVIDEO_DEFAULT``.
    :param stream_name: new path for ``STREAM_DEFAULT``.
    """
    moves = (
        (AUDIO_DEFAULT, audio_name),
        (RAWVIDEO_DEFAULT, rawvideo_name),
        (STREAM_DEFAULT, stream_name),
    )
    for current_path, final_path in moves:
        rename(current_path, final_path)
def get_gstreamer_bus():
    """Placeholder; no behaviour is visible for this function."""
    # NOTE(review): the body of this function was not visible in the
    # reviewed excerpt -- confirm that it is meant to be a no-op.
    return None


def gettime():
    """Return the local time as 'YY-MM-DD_HH:MM:SS ' (with trailing space).

    NOTE(review): the ``def`` line of this helper was not visible in the
    reviewed excerpt; the name is the one every log call in this file uses.
    """
    return strftime('%y-%m-%d_%H:%M:%S ', localtime())