With the following code i want to established an audio connection between a python user (pyqt5) (server) and i browser user (html5) (client).
File: index.html
<!doctype html>
<html>
<head>
<title>Ip calls</title>
<!-- jquery library -->
<script src="https://ajax.googleapis.com/ajax/libs/jquery/3.7.1/jquery.min.js"></script>
<!-- own javascript -->
<script src="telephone_calls.js"></script>
</head>
<body>
<h1>Ip calls</h1>
<table>
<tr>
<td><label for="name">Όνομα:</label></td>
<td><input type="text" id="name" /></td>
</tr>
<tr>
<td><label for="surname">Επώνυμο:</label></td>
<td><input type="text" id="surname" /></td>
</tr>
</table>
</br>
<input id="control_call_button" type="button" value="Έναρξη κλήσης">
<input id="stop_call_button" type="button" value="Διακοπή κλήσης" style="visibility:hidden">
<audio style="width:100%;margin-top:5mm;background:rgb(241,243,244)" id="audio" autoplay="true" controls="true"></audio>
</body>
</html>
File: telephone_calls.py
// peer connection
var pc = null;
function createPeerConnection() {
var config = {
sdpSemantics: 'unified-plan',
iceServers: [{urls: ['stun:stun.l.google.com:19302']}]
};
pc = new RTCPeerConnection(config);
//pc = new RTCPeerConnection();
// connect audio
pc.addEventListener('track', function(evt) {
if (evt.track.kind == 'audio'){
document.getElementById('audio').srcObject = evt.streams[0];
$("#control_call_button").css("visibility","hidden");
$("#stop_call_button").css("visibility","visible");
};
});
return pc;
}
function negotiate() {
return pc.createOffer({offerToReceiveAudio:true}).then(function(offer) {
return pc.setLocalDescription(offer);
}).then(function() {
// wait for ICE gathering to complete
return new Promise(function(resolve) {
console.log(pc.iceGatheringState);
if (pc.iceGatheringState === 'complete') {
resolve();
} else {
function checkState() {
console.log(pc.iceGatheringState);
if (pc.iceGatheringState === 'complete') {
pc.removeEventListener('icegatheringstatechange', checkState);
resolve();
}
}
pc.addEventListener('icegatheringstatechange', checkState);
}
});
}).then(function() {
var offer = pc.localDescription;
return fetch('/offer', {
body: JSON.stringify({
sdp: offer.sdp,
type: offer.type,
"name":name,
"surname":surname
}),
headers: {
'Content-Type': 'application/json'
},
method: 'POST'
});
}).then(function(response) {
return response.json();
}).then(function(answer) {
return pc.setRemoteDescription(answer);
}).catch(function(e) {
alert(e);
console.log(e);
});
}
function stop_peer_connection() {
// send disconnect message because iceconnectionstate slow to go in failed or in closed state
dc.send("disconnected");
// close data channel
if (dc) {
dc.close();
}
// close local audio / video
pc.getSenders().forEach(function(sender) {
sender.track.stop();
});
// close transceivers
if (pc.getTransceivers) {
pc.getTransceivers().forEach(function(transceiver) {
if (transceiver.stop) {
transceiver.stop();
}
});
}
// close peer connection
setTimeout(function() {
pc.close();
pc = null;
}, 500);
$("#control_call_button").css("visibility","visible");
$("#stop_call_button").css("visibility","hidden");
}
function start(name,surname) {
pc = createPeerConnection();
dc = pc.createDataChannel('radio_metadata', {"ordered": true});
dc.onclose = function() {
};
dc.onopen = function() {
};
dc.onmessage = function(evt) {
console.log(evt.data);
};
//negotiate();
constraints = {audio:true,video:false};
navigator.mediaDevices.getUserMedia(constraints).then(function(stream) {
stream.getTracks().forEach(function(track) {
pc.addTrack(track, stream);
});
return negotiate();
}, function(err) {
alert('Could not acquire media: ' + err);
});
}
$(document).ready(function(){
$("#control_call_button").on( "click", function() {
name = $("#name").val();
surname = $("#surname").val();
start(name,surname)
});
$("#stop_call_button").on( "click", function() {
stop_peer_connection();
});
})
File: calls.py
# -*- coding: utf-8 -*-
from PyQt5 import QtCore, QtGui, QtWidgets
class Ui_Dialog(object):
def setupUi(self, Dialog):
Dialog.setObjectName("Dialog")
Dialog.resize(492, 60)
self.gridLayout = QtWidgets.QGridLayout(Dialog)
self.gridLayout.setObjectName("gridLayout")
self.label = QtWidgets.QLabel(Dialog)
self.label.setObjectName("label")
self.gridLayout.addWidget(self.label, 0, 0, 1, 2)
self.pushButton = QtWidgets.QPushButton(Dialog)
self.pushButton.setObjectName("pushButton")
self.gridLayout.addWidget(self.pushButton, 1, 0, 1, 1)
self.pushButton_2 = QtWidgets.QPushButton(Dialog)
self.pushButton_2.setObjectName("pushButton_2")
self.gridLayout.addWidget(self.pushButton_2, 1, 1, 1, 1)
self.pushButton_3 = QtWidgets.QPushButton(Dialog)
self.pushButton_3.setObjectName("pushButton_3")
self.gridLayout.addWidget(self.pushButton_3, 1, 2, 1, 1)
self.retranslateUi(Dialog)
QtCore.QMetaObject.connectSlotsByName(Dialog)
def retranslateUi(self, Dialog):
_translate = QtCore.QCoreApplication.translate
Dialog.setWindowTitle(_translate("Dialog", "Τηλεφωνικές κλήσεις"))
self.label.setText(_translate("Dialog", "Τηλεφωνική κλήση από: ... ..."))
self.pushButton.setText(_translate("Dialog", "Απάντηση κλήσης"))
self.pushButton_2.setText(_translate("Dialog", "Απόρριψη κλήσης"))
self.pushButton_3.setText(_translate("Dialog", "Τερματισμός κλήσης"))
if __name__ == "__main__":
import sys
app = QtWidgets.QApplication(sys.argv)
Dialog = QtWidgets.QDialog()
ui = Ui_Dialog()
ui.setupUi(Dialog)
Dialog.show()
sys.exit(app.exec_())
File: server_new.py
from PyQt5 import QtCore, QtGui, QtWidgets
from calls import Ui_Dialog
from aiohttp import web
from aiohttp.web_runner import GracefulExit
from aiortc.mediastreams import MediaStreamTrack
from aiortc import RTCPeerConnection, RTCSessionDescription
from aiortc.contrib.media import MediaPlayer
from aiortc.contrib.media import MediaBlackhole, MediaPlayer, MediaRecorder, MediaRelay
import av
import pyaudio
from pydub import AudioSegment
import asyncio
import json
import os
from multiprocessing import Process, Queue, Pipe, freeze_support
from queue import Queue as Simple_Queue
import sys
import threading
from datetime import datetime, timedelta
from time import sleep
import fractions
import time
import requests
stream_offer = None
pc = None
micTrack = None
blackHole = None
class Main:
def __init__(self):
self.app = QtWidgets.QApplication(sys.argv)
self.Dialog = QtWidgets.QDialog()
self.ui = Ui_Dialog()
self.ui.setupUi(self.Dialog)
self.Dialog.show()
self.ui.label.hide()
self.ui.pushButton.hide()
self.ui.pushButton_2.hide()
self.ui.pushButton_3.hide()
self.aiohttp_server = WebRtcServer()
self.aiohttp_server.start()
self.Dialog.closeEvent = lambda event:self.closeEvent(event)
sys.exit(self.app.exec_())
def closeEvent(self,event):
#try:
# response = requests.post('http://192.168.1.188:8080/shutdown', timeout=30)
#except:
# pass
self.aiohttp_server.terminate()
event.accept()
class WebRtcServer(Process):
def __init__(self):
super().__init__()
self.ROOT = os.path.dirname(__file__)
self.pc = None
self.stream_offer = None
self.micTrack = None
self.blackHole = None
def run(self):
self.app = web.Application()
self.app.on_shutdown.append(self.on_shutdown)
self.app.router.add_get("/", self.index)
self.app.router.add_get("/telephone_calls.js", self.javascript)
self.app.router.add_post("/offer", self.offer)
self.app.router.add_post("/shutdown", self.shutdown_aiohttp)
web.run_app(self.app, access_log=None, host="192.168.1.188", port=8080, ssl_context=None)
async def index(self,request):
content = open(os.path.join(self.ROOT, "index.html"), encoding="utf8").read()
return web.Response(content_type="text/html", text=content)
async def javascript(self,request):
content = open(os.path.join(self.ROOT, "telephone_calls.js"), encoding="utf8").read()
return web.Response(content_type="application/javascript", text=content)
async def offer(self,request):
params = await request.json()
offer = RTCSessionDescription(sdp=params["sdp"], type=params["type"])
name = params["name"]
surname = params["surname"]
#print(name+" "+surname)
pc = RTCPeerConnection()
self.pc = pc
self.stream_offer = Server_Stream_Offer()
self.pc.addTrack(self.stream_offer)
@pc.on("track")
async def on_track(track):
self.micTrack = ClientTrack(track)
self.blackHole = MediaBlackhole()
self.blackHole.addTrack(self.micTrack)
await self.blackHole.start()
@track.on("ended")
async def on_ended():
pass
@pc.on("connectionstatechange")
async def on_connectionstatechange():
if self.pc.connectionState == "closed" or self.pc.connectionState == "failed":
pass
@pc.on("datachannel")
async def on_datachannel(channel):
@channel.on("message")
async def on_message(message):
if message == "disconnected":
try:
print("1")
if self.stream_offer is not None:
self.stream_offer.stop()
self.stream_offer.stop_offering()
del self.stream_offer
self.stream_offer = None
print("self.stream_offer = None")
if self.blackHole is not None:
await self.blackHole.stop()
self.blackHole = None
print("Black hole terminated")
if self.micTrack is not None:
self.micTrack.close_full()
del self.micTrack
self.micTrack = None
print("Microphone track terminated")
print("Closing pc")
if pc is not None:
await pc.close()
print("Pc closed")
except Exception as e:
print(e)
# handle offer
await self.pc.setRemoteDescription(offer)
# send answer
answer = await self.pc.createAnswer()
await self.pc.setLocalDescription(answer)
return web.Response(content_type="application/json",text=json.dumps({"sdp": self.pc.localDescription.sdp, "type": self.pc.localDescription.type}))
async def on_shutdown(self,app):
try:
if self.stream_offer is not None:
self.stream_offer.stop_offering()
self.stream_offer.stop()
del self.stream_offer
self.stream_offer = None
if self.pc is not None:
coros = [self.pc.close()]
await asyncio.gather(*coros)
if self.micTrack is not None:
self.micTrack.close_full()
if self.blackHole is not None:
await self.blackHole.stop()
raise GracefulExit()
except Exception as e:
print(e)
async def shutdown_aiohttp(self,request):
await self.on_shutdown(self.app)
return web.Response(content_type="text/html", text="")
class Server_Stream_Offer(MediaStreamTrack):
kind = "audio"
def __init__(self):
super().__init__() # don't forget this!
self.q = Simple_Queue()
self.codec = av.CodecContext.create('pcm_s16le', 'r')
self.codec.sample_rate = 8000
self.codec.channels = 2
self.audio_samples = 0
self.p = pyaudio.PyAudio()
self.input_stream = self.p.open(format=pyaudio.paInt16,channels=2,rate=8000,input=True,frames_per_buffer=int(8000*0.020))
self.input_stream.start_stream()
self.run = True
self.read_from_microphone_thread = threading.Thread(target=self.read_from_microphone)
self.read_from_microphone_thread.start()
async def recv(self):
packet = av.Packet(self.q.get())
frame = self.codec.decode(packet)[0]
frame.pts = self.audio_samples
frame.time_base = fractions.Fraction(1, self.codec.sample_rate)
self.audio_samples += frame.samples
return frame
def read_from_microphone(self):
while(self.run):
in_data = self.input_stream.read(int(8000*0.020),exception_on_overflow = False)
slice = AudioSegment(in_data, sample_width=2, frame_rate=8000, channels=2)
self.q.put(slice.raw_data)
def stop_offering(self):
try:
self.run = False
self.input_stream.stop_stream()
self.input_stream.close()
self.read_from_microphone_thread.join()
print("Read from microphone thread terminated")
except Exception as e:
print(e)
class ClientTrack(MediaStreamTrack):
kind = "audio"
def __init__(self, track):
super().__init__()
self.track = track
self.q = Simple_Queue()
self.p = pyaudio.PyAudio()
self.output_stream = self.p.open(format=pyaudio.paInt16,channels=2,rate=44800,output=True,frames_per_buffer=int(16384/4))
self.output_stream.start_stream()
self.run = True
self.hear_client_thread = threading.Thread(target=self.hear_client)
self.hear_client_thread.start()
async def recv(self):
# Get a new PyAV frame
try:
frame = await self.track.recv()
except:
print("Exception")
self.track = None
if self.run:
self.close_full()
return None
else:
return None
self.q.put(frame)
def hear_client(self):
while(self.run):
frame = self.q.get()
packet_bytes = frame.to_ndarray().tobytes()
self.output_stream.write(packet_bytes)
def close_full(self):
self.run = False
self.hear_client_thread.join()
self.output_stream.stop_stream()
self.output_stream.close()
print("output stream closed")
if __name__ == "__main__":
program = Main()
to run this example:
- run
ngrok http 192.168.1.188:8080from the terminal - run
python server_new.pyfrom the terminal - view ngrok url page, then hit the button to start the audio call.
Tip: change lan ip address (192.168.1.188) to match yours ip address. Change both in server_new.py file and ngrok command.
The connections are ready for use.
The problem is when i try to disconnect (i try from client hitting the other button appear). The self.micTrack run recv() method continuously. How can i stop this?
How to termiate this connection properly?
Thanks in advance,
Chris Pappas