#!/usr/bin/env python3.4
# -*- coding: utf-8 -*-

# This file is part of ABYSS.
# ABYSS Broadcast Your Streaming Successfully
#
# ABYSS is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# ABYSS is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with ABYSS. If not, see http://www.gnu.org/licenses/.
#
# Copyright (c) 2016 David Testé

"""GStreamer pipeline construction for ABYSS.

Importing this module reads the '.abyss' configuration file from the
current directory into the ``sources`` and ``sinks`` dictionaries and
creates the recording directory if it is missing.  The
``New_user_pipeline`` class then builds one of three pipelines depending
on its ``feed`` argument ('main', 'backup' or 'test').
"""

from os import rename
from os import listdir
from os import path
from os import mkdir
from time import localtime, strftime
import configparser

import gi
from gi.repository import Gst
from gi.repository import GstVideo

DIR_NAME = 'FILES_RECORDED'
AUDIO_DEFAULT = './' + DIR_NAME + '/' + 'AUDIO_DEFAULT'
RAWVIDEO_DEFAULT = './' + DIR_NAME + '/' + 'RAWVIDEO_DEFAULT'
STREAM_DEFAULT = './' + DIR_NAME + '/' + 'STREAM_DEFAULT'
BACKUP_SUFFIX = '_BACKUP'
FAILED_SUFFIX = '_FAILED_'
# Number appended to the names of files saved after a failed stream.
fail_counter = 1
AUDIO_BACKUP = AUDIO_DEFAULT + BACKUP_SUFFIX
RAWVIDEO_BACKUP = RAWVIDEO_DEFAULT + BACKUP_SUFFIX
STREAM_BACKUP = STREAM_DEFAULT + BACKUP_SUFFIX
ERROR = '[ERROR] '
INFO = '[INFO] '
WARN = '[WARN] '
CONFIG = '.abyss'

# Bus of the most recently created pipeline; None until a pipeline has
# been built (see New_user_pipeline.create_gstreamer_pipeline).
bus = None

sources = {'RTSP_IP': None,
           'AUDIO_INPUT': None,
           }
sinks = {'AUDIO_OUTPUT': None,
         'DIR': None,
         'STREAM_SERVER_IP': None,
         'SERVER_PORT': None,
         'PASSWORD': None,
         'AUDIO_MOUNT': None,
         'VIDEO_MOUNT': None,
         }

# Examples of PulseAudio device names usable in the config file:
# AUDIO_INPUT = 'alsa_input.usb-Burr-Brown_from_TI_USB_Audio_CODEC-00-CODEC.analog-stereo'
# AUDIO_OUTPUT = 'alsa_output.pci-0000_00_1b.0.analog-stereo'


def gettime():
    """Return the local time formatted as 'yy-mm-dd_HH:MM:SS '.

    Defined before the configuration is loaded because the error
    messages printed while loading it already need a timestamp.
    """
    return strftime('%y-%m-%d_%H:%M:%S ', localtime())


def get_gstreamer_bus():
    """Return the bus of the last pipeline created, or None if none was."""
    return bus


config = configparser.RawConfigParser()
if path.exists(CONFIG):
    try:
        config.read(CONFIG)
        sources = {key: config.get('sources', key) for key in sources}
        sinks = {key: config.get('sinks', key) for key in sinks}
    except configparser.Error:
        # Malformed file, missing section or missing option.
        print(ERROR, gettime(), 'Failed to parse config file.')
else:
    print(ERROR, gettime(), '".abyss" config file doesn\'t exist.')

if not path.isdir(DIR_NAME):
    mkdir(DIR_NAME)


class New_user_pipeline():
    """Build and control the GStreamer pipeline for one feed.

    ``feed`` selects the topology:

    * 'main'   -- RTSP camera + audio input, recorded to disk and sent
                  to the icecast server, with a local video preview.
    * 'backup' -- same outputs, but video comes from a local V4L2
                  webcam and files use the '_BACKUP' names.
    * 'test'   -- RTSP video to the screen and audio to the local
                  audio output only; nothing is recorded or streamed.
    """

    def __init__(self, feed='main'):
        """Create the pipeline for ``feed``.

        ``sources['RTSP_IP']`` must have been loaded from the config
        file; concatenating it while still None raises TypeError.
        """
        self.rtsp_address = 'rtsp://' + sources['RTSP_IP']
        self.feed = feed
        self.user_pipeline = self.create_gstreamer_pipeline()

    def create_video_sources(self):
        """Create video inputs from various sources."""
        self.videosrc = Gst.ElementFactory.make('rtspsrc', 'videosrc')
        self.videosrc.set_property('location', self.rtsp_address)
        self.videosrc.set_property('latency', 0)
        if self.feed == 'backup':
            self.videosrc_backup = Gst.ElementFactory.make(
                'v4l2src', 'videosrc_backup')
            device_location = self.find_webcam_device()
            # Leave the element's own default device untouched when no
            # webcam could be located instead of overwriting it with None.
            if device_location is not None:
                self.videosrc_backup.set_property('device', device_location)

    def find_webcam_device(self):
        """Look out for the USB webcam device.

        Returns the path of the video node to use, or None (after
        printing an error) when '/dev/' holds no 'video*' entry.  With
        several nodes, 'video0' is taken to be a built-in webcam and the
        first other node, in sorted order, is returned.
        """
        devices = sorted(dev for dev in listdir('/dev/') if 'video' in dev)
        # Without built-in webcam: the only node present is the one to use,
        # whatever its number.
        if len(devices) == 1:
            return '/dev/' + devices[0]
        # In case of computer having a built-in webcam
        for item in devices:
            if item != 'video0':
                return '/dev/' + item
        print(ERROR, gettime(), 'No webcam device found.')
        return None

    def find_mixingdesk_device(self):
        """Look out for the USB mixing desk device (not implemented).

        Product used here: Behringer XENYX Q1002USB.
        The device name currently comes from AUDIO_INPUT in the config
        file; it can be found with 'pactl list | grep alsa_input'.
        """
        pass

    def create_pipeline_callbacks(self):
        """Callbacks to connect dynamically created pads."""
        self.videosrc.connect('pad-added', self.on_pad_added_to_rtspsrc)

    def on_pad_added_to_rtspsrc(self, rtspsrc, pad):
        """Connect the dynamic 'src' pad of an RTSP source."""
        sinkpad = self.queuev_1.get_static_pad('sink')
        # Only one pad can feed the queue; further pads announced by the
        # source are ignored rather than producing a failed link attempt.
        if sinkpad.is_linked():
            return
        if pad.link(sinkpad) != Gst.PadLinkReturn.OK:
            print(ERROR, gettime(), 'Couldn\'t link the RTSP source pad.')

    def create_audio_sources(self):
        """Create audio inputs from various sources."""
        self.audiosrc = Gst.ElementFactory.make('pulsesrc', 'audiosrc')
        self.audiosrc.set_property('device', sources['AUDIO_INPUT'])

    def create_audiolevel_plugin(self):
        """Create audio level plugin to feed a vu-meter."""
        self.audiolevel = Gst.ElementFactory.make('level', 'audiolevel')
        # Interval between level messages, in nanoseconds (0.2 s).
        self.audiolevel.set_property('interval', 200000000)

    def create_filesink(self):
        """Create storable output elements.

        The backup feed writes to the '_BACKUP' file names so that it
        never overwrites what the main feed already recorded.
        """
        if self.feed == 'backup':
            rawvideo_path = RAWVIDEO_BACKUP
            audio_path = AUDIO_BACKUP
            stream_path = STREAM_BACKUP
        else:
            rawvideo_path = RAWVIDEO_DEFAULT
            audio_path = AUDIO_DEFAULT
            stream_path = STREAM_DEFAULT
        self.disksink_rawvideo = Gst.ElementFactory.make('filesink')
        self.disksink_rawvideo.set_property('location', rawvideo_path)
        self.disksink_audio = Gst.ElementFactory.make('filesink')
        self.disksink_audio.set_property('location', audio_path)
        self.disksink_stream = Gst.ElementFactory.make('filesink')
        self.disksink_stream.set_property('location', stream_path)

    def create_streamsink(self):
        """Create streamable output elements."""
        # To local screen:
        self.screensink = Gst.ElementFactory.make('xvimagesink', 'screensink')
        self.screensink.set_property('sync', False)
        # To local audio output (headphones):
        self.audiosink = Gst.ElementFactory.make('pulsesink', 'audiosink')
        self.audiosink.set_property('device', sinks['AUDIO_OUTPUT'])
        self.audiosink.set_property('sync', False)
        # To icecast server:
        self.icecastsink_audio = Gst.ElementFactory.make(
            'shout2send', 'icecastsink_audio')
        self.icecastsink_audio.set_property('sync', False)
        self.icecastsink_audio.set_property('ip', sinks['STREAM_SERVER_IP'])
        self.icecastsink_audio.set_property('port', int(sinks['SERVER_PORT']))
        self.icecastsink_audio.set_property('mount', sinks['AUDIO_MOUNT'])
        self.icecastsink_audio.set_property('password', sinks['PASSWORD'])
        self.icecastsink_stream = Gst.ElementFactory.make(
            'shout2send', 'icecastsink_stream')
        self.icecastsink_stream.set_property('sync', False)
        self.icecastsink_stream.set_property('ip', sinks['STREAM_SERVER_IP'])
        self.icecastsink_stream.set_property('port', int(sinks['SERVER_PORT']))
        self.icecastsink_stream.set_property('mount', sinks['VIDEO_MOUNT'])
        self.icecastsink_stream.set_property('password', sinks['PASSWORD'])

    def create_payloader_elements(self):
        """Placeholder: no payloader is used at the moment."""
        pass

    def create_depayloader_elements(self):
        """Create the element extracting JPEG frames from RTP packets."""
        self.rtpjpegdepay = Gst.ElementFactory.make(
            'rtpjpegdepay', 'rtpjpegdepay')

    def create_encoder_elements(self):
        """Create the audio (Vorbis) and video (VP8) encoders."""
        # Audio encoders:
        self.vorbisenc = Gst.ElementFactory.make('vorbisenc', 'vorbisenc')
        # Video encoders:
        self.vp8enc = Gst.ElementFactory.make('vp8enc', 'vp8enc')
        self.vp8enc.set_property('min_quantizer', 1)
        self.vp8enc.set_property('max_quantizer', 13)
        self.vp8enc.set_property('cpu-used', 5)
        self.vp8enc.set_property('deadline', 42000)
        self.vp8enc.set_property('threads', 2)
        self.vp8enc.set_property('sharpness', 7)

    def create_decoder_elements(self):
        """Create the JPEG decoder used on the RTSP video feed."""
        self.jpegdec = Gst.ElementFactory.make('jpegdec', 'jpegdec')
        self.jpegdec.set_property('max-errors', -1)

    def create_muxer_elements(self):
        """Create the Ogg, Matroska and WebM muxers."""
        self.oggmux = Gst.ElementFactory.make('oggmux', 'oggmux')
        self.mkvmux = Gst.ElementFactory.make('matroskamux', 'mkvmux')
        self.webmmux = Gst.ElementFactory.make('webmmux', 'webmmux')
        self.webmmux.set_property('streamable', True)

    def create_demuxer_elements(self):
        """Placeholder: no demuxer is used at the moment."""
        pass

    def create_filtering_elements(self):
        """Create the scaler and the 640x360 caps filters."""
        self.scaling = Gst.ElementFactory.make('videoscale', 'scaling')
        caps = Gst.caps_from_string(
            'video/x-raw, width=(int)640, height=(int)360')
        self.capsfilter = Gst.ElementFactory.make('capsfilter', 'capsfilter')
        self.capsfilter.set_property('caps', caps)
        caps_backup = Gst.caps_from_string(
            'video/x-raw, width=(int)640, height=(int)360')
        self.capsfilter_backup = Gst.ElementFactory.make(
            'capsfilter', 'capsfilter_backup')
        self.capsfilter_backup.set_property('caps', caps_backup)

    def create_tee_elements(self):
        """Create tee elements to divide feeds."""
        self.tee_rawvideo = Gst.ElementFactory.make('tee', 'tee_rawvideo')
        self.tee_videodecoded = Gst.ElementFactory.make(
            'tee', 'tee_videodecoded')
        self.tee_streamfull = Gst.ElementFactory.make('tee', 'tee_streamfull')
        self.tee_rawaudio = Gst.ElementFactory.make('tee', 'tee_rawaudio')
        self.tee_streamaudio = Gst.ElementFactory.make(
            'tee', 'tee_streamaudio')

    def connect_tee(self,
                    tee_element,
                    input_element,
                    output_element_1,
                    output_element_2,
                    output_element_3=None,):
        """Links input and outputs of a given tee element.

        Every link is attempted, in the order input -> tee, then
        tee -> each output; an error is printed if any of them fails
        (typically because an element was not added to the pipeline).
        """
        outputs = [output_element_1, output_element_2]
        if output_element_3 is not None:
            outputs.append(output_element_3)
        # Build the full list first so that one failed link does not
        # prevent the remaining ones from being attempted.
        results = [input_element.link(tee_element)]
        results.extend([tee_element.link(output) for output in outputs])
        if not all(results):
            print(ERROR,
                  gettime(),
                  'Couldn\'t link the tee. Element(s) probably not in the pipeline ')

    def create_queues(self):
        """Create the queues decoupling the branches of the pipeline."""
        # For video feed:
        self.queuev_1 = Gst.ElementFactory.make('queue', 'queuev_1')
        self.queuev_2 = Gst.ElementFactory.make('queue', 'queuev_2')
        self.queuev_3 = Gst.ElementFactory.make('queue', 'queuev_3')
        self.queuev_4 = Gst.ElementFactory.make('queue', 'queuev_4')
        self.queuev_5 = Gst.ElementFactory.make('queue', 'queuev_5')
        # For audio feed:
        self.queuea_1 = Gst.ElementFactory.make('queue', 'queuea_1')
        self.queuea_2 = Gst.ElementFactory.make('queue', 'queuea_2')
        self.queuea_3 = Gst.ElementFactory.make('queue', 'queuea_3')
        self.queuea_4 = Gst.ElementFactory.make('queue', 'queuea_4')
        # leaky=2 ("downstream"): drop old buffers when the queue is full.
        # This queue feeds the icecast audio sink.
        self.queuea_4.set_property('leaky', 2)
        self.queuea_5 = Gst.ElementFactory.make('queue', 'queuea_5')
        # For audio+video muxer:
        self.queuem_1 = Gst.ElementFactory.make('queue', 'queuem_1')
        self.queuem_2 = Gst.ElementFactory.make('queue', 'queuem_2')
        # Same leaky setting for the queue feeding the icecast stream sink.
        self.queuem_2.set_property('leaky', 2)

    def create_pipeline_elements(self):
        """Instantiate every element the pipeline may need."""
        print(INFO,
              gettime(),
              'Pipeline creation state: creating elements... ',
              end='')
        # Inputs elements:
        self.create_video_sources()
        self.create_audio_sources()
        # Middle elements:
        self.create_audiolevel_plugin()
        self.create_payloader_elements()
        self.create_depayloader_elements()
        self.create_encoder_elements()
        self.create_decoder_elements()
        self.create_muxer_elements()
        self.create_filtering_elements()
        self.create_tee_elements()
        self.create_queues()
        # Output elements:
        self.create_filesink()
        self.create_streamsink()
        if self.feed == 'test':
            print('TEST OK...', end='')
        print('created')
        if self.feed == 'backup':
            print(INFO,
                  gettime(),
                  'Webcam device location: ',
                  self.videosrc_backup.get_property('device'))

    def add_elements_to_pipeline(self):
        """Add to the pipeline the elements required by the current feed."""
        print(INFO,
              gettime(),
              'Pipeline creation state: adding elements... ',
              end='')
        cond = self.feed != 'test'

        # Elements common to every feed (audio input, level meter and
        # the queue feeding the local screen):
        elements = [self.audiosrc,
                    self.audiolevel,
                    self.queuea_1,
                    self.queuev_3]
        if cond:
            # Encoding, muxing and fan-out, unused by the test feed:
            elements += [self.vorbisenc,
                         self.oggmux,
                         self.queuea_2,
                         self.queuea_3,
                         self.vp8enc,
                         self.mkvmux,
                         self.webmmux,
                         self.tee_rawaudio,
                         self.tee_rawvideo,
                         self.tee_streamaudio,
                         self.tee_streamfull,
                         self.queuev_2,
                         self.queuev_4,
                         self.queuev_5,
                         self.queuea_4,
                         self.queuea_5,
                         self.queuem_1,
                         self.queuem_2]
        # Outputs elements:
        elements.append(self.screensink)
        if cond:
            elements += [self.disksink_rawvideo,
                         self.disksink_audio,
                         self.disksink_stream,
                         self.icecastsink_audio,
                         self.icecastsink_stream]
        else:
            elements.append(self.audiosink)

        if self.feed == 'main' or self.feed == 'test':
            # RTSP input and its decoding chain:
            elements += [self.videosrc,
                         self.rtpjpegdepay,
                         self.jpegdec,
                         self.scaling,
                         self.capsfilter,
                         self.tee_videodecoded,
                         self.queuev_1]
        elif self.feed == 'backup':
            # Webcam input:
            elements += [self.videosrc_backup,
                         self.capsfilter_backup]

        for element in elements:
            self.streampipe.add(element)

        if self.feed == 'test':
            print('TEST OK...', end='')
        elif self.feed == 'backup':
            print('BACKUP OK...', end='')
        print('added')

    def link_pipeline_elements(self):
        """Link all elements with static pads."""
        print(INFO,
              gettime(),
              'Pipeline creation state: linking elements... ',
              end='')
        cond = self.feed != 'test'

        # Audio feed:
        self.audiosrc.link(self.audiolevel)
        self.audiolevel.link(self.queuea_1)
        if cond:
            self.queuea_1.link(self.vorbisenc)
            self.connect_tee(self.tee_rawaudio,
                             self.vorbisenc,
                             self.queuea_2,
                             self.queuea_5,)
            self.queuea_2.link(self.oggmux)
            self.connect_tee(self.tee_streamaudio,
                             self.oggmux,
                             self.queuea_3,
                             self.queuea_4,)
            self.queuea_3.link(self.disksink_audio)
            self.queuea_4.link(self.icecastsink_audio)
            self.queuea_5.link(self.webmmux)
        else:
            self.queuea_1.link(self.audiosink)

        # Video feed:
        if cond:
            self.queuev_2.link(self.mkvmux)
            self.mkvmux.link(self.queuev_4)
            self.queuev_4.link(self.disksink_rawvideo)
        else:
            self.queuev_1.link(self.rtpjpegdepay)
            self.rtpjpegdepay.link(self.jpegdec)
            self.jpegdec.link(self.queuev_3)
        # Every feed shows its video on the local screen.
        self.queuev_3.link(self.screensink)

        # Stream (audio+video) feed:
        if cond:
            self.vp8enc.link(self.queuev_5)
            self.queuev_5.link(self.webmmux)
            self.connect_tee(self.tee_streamfull,
                             self.webmmux,
                             self.queuem_1,
                             self.queuem_2,)
            self.queuem_1.link(self.disksink_stream)
            self.queuem_2.link(self.icecastsink_stream)

        if self.feed == 'main':
            # linking here RTSP feed
            self.queuev_1.link(self.rtpjpegdepay)
            self.connect_tee(self.tee_rawvideo,
                             self.rtpjpegdepay,
                             self.queuev_2,
                             self.jpegdec,)
            self.connect_tee(self.tee_videodecoded,
                             self.jpegdec,
                             self.queuev_3,
                             self.scaling,)
            # Stream (video) feed:
            self.scaling.link(self.capsfilter)
            self.capsfilter.link(self.vp8enc)
        elif self.feed == 'backup':
            # linking here backup feed (WEBCAM)
            self.videosrc_backup.link(self.capsfilter_backup)
            self.connect_tee(self.tee_rawvideo,
                             self.capsfilter_backup,
                             self.queuev_2,
                             self.queuev_3,
                             output_element_3=self.vp8enc)
            print('BACKUP OK...', end='')

        if not cond:
            print('TEST OK...', end='')
        print('linked')

    def create_gstreamer_pipeline(self):
        """Build the whole pipeline and return it.

        Also stores the pipeline's bus in the module-level ``bus`` so
        that get_gstreamer_bus() can hand it to other modules.
        """
        global bus
        # New empty pipeline:
        self.streampipe = Gst.Pipeline()
        self.create_pipeline_elements()
        # Setting-up:
        self.add_elements_to_pipeline()
        self.link_pipeline_elements()
        if self.feed == 'main' or self.feed == 'test':
            # Only the RTSP source exposes its pads dynamically.
            self.create_pipeline_callbacks()

        bus = self.streampipe.get_bus()
        bus.add_signal_watch()
        bus.enable_sync_message_emission()
        # Used to get messages that GStreamer emits.
        bus.connect("message", self.on_message)

        print(INFO, gettime(), 'Pipeline creation state: successfully done.')
        return self.streampipe

    def on_message(self, bus, message):
        """Handle bus messages: stop on end-of-stream, print errors."""
        t = message.type
        if t == Gst.MessageType.EOS:
            self.streampipe.set_state(Gst.State.NULL)
        elif t == Gst.MessageType.ERROR:
            err, debug = message.parse_error()
            print(ERROR, '%s' % err, debug)

    def stream_play(self):
        """Request the PLAYING state on the pipeline."""
        self.streampipe.set_state(Gst.State.PLAYING)
        if self.feed == 'backup':
            print(WARN, gettime(), 'Backup pipeline started.')
        print(INFO, gettime(), 'PLAYING State resquested.')

    def stream_stop(self):
        """Request the NULL state on the pipeline, which stops it."""
        self.streampipe.set_state(Gst.State.NULL)
        print(INFO, gettime(), 'STOPPED State resquested.')

    def set_filenames(self, string, streamfailed=False):
        """Sets filename and location for each sink.

        Renames the recorded files using ``string`` as base name.  For
        the main feed with ``streamfailed`` set, the names get the
        '_FAILED_<n>' suffix (falling back to the default base names
        when ``string`` is empty) and the module-level ``fail_counter``
        is incremented.  Raises OSError if a recorded file is missing.
        """
        global fail_counter
        filename = string
        audio = './' + DIR_NAME + '/' + filename + '_AUDIO'
        rawvideo = './' + DIR_NAME + '/' + filename + '_RAWVIDEO'
        stream = './' + DIR_NAME + '/' + filename + '_STREAM'
        if self.feed == 'main':
            if streamfailed:
                if not filename:
                    # No name was given: keep the default names as base.
                    audio = AUDIO_DEFAULT
                    rawvideo = RAWVIDEO_DEFAULT
                    stream = STREAM_DEFAULT
                suffix = FAILED_SUFFIX + str(fail_counter)
                self.rename_files(audio + suffix,
                                  rawvideo + suffix,
                                  stream + suffix)
                # Only count the failure once the files have been renamed.
                fail_counter += 1
            else:
                self.rename_files(audio, rawvideo, stream)
        elif self.feed == 'backup':
            rename(AUDIO_BACKUP, audio)
            rename(RAWVIDEO_BACKUP, rawvideo)
            rename(STREAM_BACKUP, stream)
        print(INFO, gettime(), 'Audio file written on disk.')
        print(INFO, gettime(), 'Raw video file written on disk.')
        print(INFO, gettime(), 'Streamed file written on disk.')

    def rename_files(self, audio_name, rawvideo_name, stream_name):
        """Rename the three default-named recordings to the given names."""
        rename(AUDIO_DEFAULT, audio_name)
        rename(RAWVIDEO_DEFAULT, rawvideo_name)
        rename(STREAM_DEFAULT, stream_name)