aiortc peer to peer connection (browser user - python user) is not closed properly

110 Views Asked by At

With the following code i want to established an audio connection between a python user (pyqt5) (server) and i browser user (html5) (client).

File: index.html

<!doctype html>
<html>
    <head>
        <title>Ip calls</title>
        
        <!-- jquery library -->
        <script src="https://ajax.googleapis.com/ajax/libs/jquery/3.7.1/jquery.min.js"></script>
        
        <!-- own javascript -->
        <script src="telephone_calls.js"></script>
        
    </head>
    <body>
        <h1>Ip calls</h1>
        
        <table>
        <tr>
            <td><label for="name">Όνομα:</label></td>
            <td><input type="text" id="name" /></td>
        </tr>
        <tr>
            <td><label for="surname">Επώνυμο:</label></td>
            <td><input type="text" id="surname" /></td>
        </tr>
        </table>
        </br>
        <input id="control_call_button" type="button" value="Έναρξη κλήσης">
        <input id="stop_call_button" type="button" value="Διακοπή κλήσης" style="visibility:hidden">
        
        <audio style="width:100%;margin-top:5mm;background:rgb(241,243,244)" id="audio" autoplay="true" controls="true"></audio>
    </body>
</html>

File: telephone_calls.py

// peer connection
var pc = null;

function createPeerConnection() {
    var config = {
        sdpSemantics: 'unified-plan',
        iceServers: [{urls: ['stun:stun.l.google.com:19302']}]
    };
    

    pc = new RTCPeerConnection(config);
    //pc = new RTCPeerConnection();

    // connect audio
    pc.addEventListener('track', function(evt) {
        if (evt.track.kind == 'audio'){
            document.getElementById('audio').srcObject = evt.streams[0];
            $("#control_call_button").css("visibility","hidden");
            $("#stop_call_button").css("visibility","visible");
        };
    });
    


    return pc;
}

function negotiate() {
    return pc.createOffer({offerToReceiveAudio:true}).then(function(offer) {
        return pc.setLocalDescription(offer);
    }).then(function() {
        // wait for ICE gathering to complete
        return new Promise(function(resolve) {
            console.log(pc.iceGatheringState);
            if (pc.iceGatheringState === 'complete') {
                resolve();
            } else {
                function checkState() {
                    console.log(pc.iceGatheringState);
                    if (pc.iceGatheringState === 'complete') {
                        pc.removeEventListener('icegatheringstatechange', checkState);
                        resolve();
                    }
                }
                pc.addEventListener('icegatheringstatechange', checkState);

            }
        });
    }).then(function() {
        var offer = pc.localDescription;
        
        return fetch('/offer', {
            body: JSON.stringify({
                sdp: offer.sdp,
                type: offer.type,
                "name":name,
                "surname":surname
            }),
            headers: {
                'Content-Type': 'application/json'
            },
            method: 'POST'
        });
    }).then(function(response) {
        return response.json();
    }).then(function(answer) {
            return pc.setRemoteDescription(answer);
    }).catch(function(e) {
        alert(e);
        console.log(e);
    });
    
}

function stop_peer_connection() {
    // send disconnect message because iceconnectionstate slow to go in failed or in closed state
    dc.send("disconnected");

    // close data channel
    if (dc) {
        dc.close();
    }

    // close local audio / video
    pc.getSenders().forEach(function(sender) {
        sender.track.stop();
    });

    // close transceivers
    if (pc.getTransceivers) {
        pc.getTransceivers().forEach(function(transceiver) {
            if (transceiver.stop) {
                transceiver.stop();
            }
        });
    }

    

    // close peer connection
    setTimeout(function() {
        pc.close();
        pc = null;
    }, 500);

    $("#control_call_button").css("visibility","visible");
    $("#stop_call_button").css("visibility","hidden");
}

function start(name,surname) {
    pc = createPeerConnection();
    
    dc = pc.createDataChannel('radio_metadata', {"ordered": true});
    dc.onclose = function() {
        
    };
    dc.onopen = function() {
        
    };
    dc.onmessage = function(evt) {
        console.log(evt.data);
    };
    
    
    
    //negotiate();
    
    constraints = {audio:true,video:false};
    
    navigator.mediaDevices.getUserMedia(constraints).then(function(stream) {
        stream.getTracks().forEach(function(track) {
            pc.addTrack(track, stream);
        });
        return negotiate();
        }, function(err) {
            alert('Could not acquire media: ' + err);
    });
    
    
}

$(document).ready(function(){
    $("#control_call_button").on( "click", function() {
        name = $("#name").val();
        surname = $("#surname").val();
        start(name,surname)
    });
    
    $("#stop_call_button").on( "click", function() {
        stop_peer_connection();
    });
})

File: calls.py

# -*- coding: utf-8 -*-

from PyQt5 import QtCore, QtGui, QtWidgets


class Ui_Dialog(object):
    def setupUi(self, Dialog):
        Dialog.setObjectName("Dialog")
        Dialog.resize(492, 60)
        self.gridLayout = QtWidgets.QGridLayout(Dialog)
        self.gridLayout.setObjectName("gridLayout")
        self.label = QtWidgets.QLabel(Dialog)
        self.label.setObjectName("label")
        self.gridLayout.addWidget(self.label, 0, 0, 1, 2)
        self.pushButton = QtWidgets.QPushButton(Dialog)
        self.pushButton.setObjectName("pushButton")
        self.gridLayout.addWidget(self.pushButton, 1, 0, 1, 1)
        self.pushButton_2 = QtWidgets.QPushButton(Dialog)
        self.pushButton_2.setObjectName("pushButton_2")
        self.gridLayout.addWidget(self.pushButton_2, 1, 1, 1, 1)
        self.pushButton_3 = QtWidgets.QPushButton(Dialog)
        self.pushButton_3.setObjectName("pushButton_3")
        self.gridLayout.addWidget(self.pushButton_3, 1, 2, 1, 1)

        self.retranslateUi(Dialog)
        QtCore.QMetaObject.connectSlotsByName(Dialog)

    def retranslateUi(self, Dialog):
        _translate = QtCore.QCoreApplication.translate
        Dialog.setWindowTitle(_translate("Dialog", "Τηλεφωνικές κλήσεις"))
        self.label.setText(_translate("Dialog", "Τηλεφωνική κλήση από: ... ..."))
        self.pushButton.setText(_translate("Dialog", "Απάντηση κλήσης"))
        self.pushButton_2.setText(_translate("Dialog", "Απόρριψη κλήσης"))
        self.pushButton_3.setText(_translate("Dialog", "Τερματισμός κλήσης"))


if __name__ == "__main__":
    import sys
    app = QtWidgets.QApplication(sys.argv)
    Dialog = QtWidgets.QDialog()
    ui = Ui_Dialog()
    ui.setupUi(Dialog)
    Dialog.show()
    sys.exit(app.exec_())

File: server_new.py

from PyQt5 import QtCore, QtGui, QtWidgets
from calls import Ui_Dialog
from aiohttp import web
from aiohttp.web_runner import GracefulExit
from aiortc.mediastreams import MediaStreamTrack
from aiortc import RTCPeerConnection, RTCSessionDescription
from aiortc.contrib.media import MediaPlayer
from aiortc.contrib.media import MediaBlackhole, MediaPlayer, MediaRecorder, MediaRelay
import av
import pyaudio
from pydub import AudioSegment
import asyncio
import json
import os
from multiprocessing import Process, Queue, Pipe, freeze_support
from queue import Queue as Simple_Queue
import sys
import threading
from datetime import datetime, timedelta
from time import sleep
import fractions
import time
import requests

stream_offer = None
pc = None
micTrack = None
blackHole = None

class Main:
    def __init__(self):
        self.app = QtWidgets.QApplication(sys.argv)
        self.Dialog = QtWidgets.QDialog()
        self.ui = Ui_Dialog()
        self.ui.setupUi(self.Dialog)
        
        self.Dialog.show()
        
        self.ui.label.hide()
        self.ui.pushButton.hide()
        self.ui.pushButton_2.hide()
        self.ui.pushButton_3.hide()
        
        self.aiohttp_server = WebRtcServer()
        self.aiohttp_server.start()
        
        self.Dialog.closeEvent = lambda event:self.closeEvent(event)
        
        sys.exit(self.app.exec_())
                
    def closeEvent(self,event):
        #try:
        #   response = requests.post('http://192.168.1.188:8080/shutdown', timeout=30)
        #except:
        #   pass
        self.aiohttp_server.terminate()
        event.accept()


class WebRtcServer(Process):
    def __init__(self):
        super().__init__()
        self.ROOT = os.path.dirname(__file__)
        self.pc = None
        self.stream_offer = None
        self.micTrack = None
        self.blackHole = None
    
    def run(self):
        self.app = web.Application()
        self.app.on_shutdown.append(self.on_shutdown)
        self.app.router.add_get("/", self.index)
        self.app.router.add_get("/telephone_calls.js", self.javascript)
        self.app.router.add_post("/offer", self.offer)
        self.app.router.add_post("/shutdown", self.shutdown_aiohttp)
        web.run_app(self.app, access_log=None, host="192.168.1.188", port=8080, ssl_context=None)

    async def index(self,request):
        content = open(os.path.join(self.ROOT, "index.html"), encoding="utf8").read()
        return web.Response(content_type="text/html", text=content)

    async def javascript(self,request):
        content = open(os.path.join(self.ROOT, "telephone_calls.js"), encoding="utf8").read()
        return web.Response(content_type="application/javascript", text=content)

    async def offer(self,request):  
        params = await request.json()
        offer = RTCSessionDescription(sdp=params["sdp"], type=params["type"])
        name = params["name"]
        surname = params["surname"]
        #print(name+" "+surname)
        pc = RTCPeerConnection()
        self.pc = pc
        self.stream_offer = Server_Stream_Offer()
        self.pc.addTrack(self.stream_offer)

        @pc.on("track")
        async def on_track(track):  
            self.micTrack = ClientTrack(track)
            self.blackHole = MediaBlackhole()
            self.blackHole.addTrack(self.micTrack)
            await self.blackHole.start()

            @track.on("ended")
            async def on_ended():
                pass

        @pc.on("connectionstatechange")
        async def on_connectionstatechange():
            if self.pc.connectionState == "closed" or self.pc.connectionState == "failed":
                pass


                    
        @pc.on("datachannel")
        async def on_datachannel(channel):
            @channel.on("message")
            async def on_message(message):
                if message == "disconnected":
                    try:
                        print("1")
                        if self.stream_offer is not None:
                            self.stream_offer.stop()
                            self.stream_offer.stop_offering()
                        del self.stream_offer
                        self.stream_offer = None
                        print("self.stream_offer = None")
                        
                        if self.blackHole is not None:
                            await self.blackHole.stop()
                            self.blackHole = None
                        print("Black hole terminated")
                        
                        if self.micTrack is not None:
                            self.micTrack.close_full()
                            del self.micTrack
                            self.micTrack = None
                        print("Microphone track terminated")
                        
                        print("Closing pc")
                        if pc is not None:
                            await pc.close()
                        print("Pc closed")
                    except Exception as e:
                        print(e)

        # handle offer
        await self.pc.setRemoteDescription(offer)

        # send answer
        answer = await self.pc.createAnswer()
        await self.pc.setLocalDescription(answer)
                    
        return web.Response(content_type="application/json",text=json.dumps({"sdp": self.pc.localDescription.sdp, "type": self.pc.localDescription.type}))

    async def on_shutdown(self,app):
        try:
            if self.stream_offer is not None:
                self.stream_offer.stop_offering()
                self.stream_offer.stop()
            del self.stream_offer
            self.stream_offer = None
            if self.pc is not None:
                coros = [self.pc.close()]
                await asyncio.gather(*coros)
            if self.micTrack is not None:
                self.micTrack.close_full()
            if self.blackHole is not None:
                await self.blackHole.stop()
            raise GracefulExit()
        except Exception as e:
            print(e)

    async def shutdown_aiohttp(self,request):
        await self.on_shutdown(self.app)
        return web.Response(content_type="text/html", text="")


class Server_Stream_Offer(MediaStreamTrack):
    kind = "audio"

    def __init__(self):
        super().__init__()  # don't forget this!
        
        self.q = Simple_Queue()
        
        self.codec = av.CodecContext.create('pcm_s16le', 'r')
        self.codec.sample_rate = 8000
        self.codec.channels = 2

        self.audio_samples = 0

        self.p = pyaudio.PyAudio()
        self.input_stream = self.p.open(format=pyaudio.paInt16,channels=2,rate=8000,input=True,frames_per_buffer=int(8000*0.020))
        self.input_stream.start_stream()
        
        self.run = True
        
        self.read_from_microphone_thread = threading.Thread(target=self.read_from_microphone)
        self.read_from_microphone_thread.start()
        
    async def recv(self):
        packet = av.Packet(self.q.get())
        frame = self.codec.decode(packet)[0]
        frame.pts = self.audio_samples
        frame.time_base = fractions.Fraction(1, self.codec.sample_rate)
        self.audio_samples += frame.samples
        return frame

    def read_from_microphone(self):
        while(self.run):
            in_data = self.input_stream.read(int(8000*0.020),exception_on_overflow = False)            
            slice = AudioSegment(in_data, sample_width=2, frame_rate=8000, channels=2)
            self.q.put(slice.raw_data)

    def stop_offering(self):
        try:
            self.run = False
            self.input_stream.stop_stream()
            self.input_stream.close()
            self.read_from_microphone_thread.join()
            print("Read from microphone thread terminated")
        except Exception as e:
            print(e)
            
class ClientTrack(MediaStreamTrack):
    kind = "audio"

    def __init__(self, track):
        super().__init__()
        self.track = track
        self.q = Simple_Queue()
        self.p = pyaudio.PyAudio()
        self.output_stream = self.p.open(format=pyaudio.paInt16,channels=2,rate=44800,output=True,frames_per_buffer=int(16384/4))
        self.output_stream.start_stream()
        
        self.run = True
        self.hear_client_thread = threading.Thread(target=self.hear_client)
        self.hear_client_thread.start()
        
    async def recv(self):
        # Get a new PyAV frame

        try:
            frame = await self.track.recv()
        except:
            print("Exception")
            self.track = None
            if self.run:
                self.close_full()
                return None
            else:
                return None
            
        self.q.put(frame)
        
        
    def hear_client(self):
        while(self.run):
            frame = self.q.get()
            packet_bytes = frame.to_ndarray().tobytes()
            self.output_stream.write(packet_bytes)
        
    def close_full(self):
        self.run = False
        self.hear_client_thread.join()
        self.output_stream.stop_stream()
        self.output_stream.close()
        print("output stream closed")

if __name__ == "__main__":
    program = Main()

to run this example:

  • run ngrok http 192.168.1.188:8080 from the terminal
  • run python server_new.py from the terminal
  • view ngrok url page, then hit the button to start the audio call.

Tip: change lan ip address (192.168.1.188) to match yours ip address. Change both in server_new.py file and ngrok command.

The connections are ready for use.

The problem is when i try to disconnect (i try from client hitting the other button appear). The self.micTrack run recv() method continuously. How can i stop this?

How to termiate this connection properly?

Thanks in advance,

Chris Pappas

0

There are 0 best solutions below