I am trying to established a connection between a pyqt5 python user and a html5 browser user.
From the server side i run:
from PyQt5 import QtCore, QtGui, QtWidgets
from PyQt5.QtCore import pyqtSignal, QThread, Qt
from calls import Ui_Dialog
from aiohttp import web
from aiohttp.web_runner import GracefulExit
from aiortc.mediastreams import MediaStreamTrack,MediaStreamError
from aiortc import RTCPeerConnection, RTCSessionDescription
from aiortc.contrib.media import MediaPlayer
from aiortc.contrib.media import MediaBlackhole, MediaPlayer, MediaRecorder, MediaRelay
import av
import pyaudio
from pydub import AudioSegment
import asyncio
import json
import os
from multiprocessing import Process, Queue, Pipe, freeze_support
from queue import Queue as Simple_Queue
import sys
import threading
from datetime import datetime, timedelta
from time import sleep
import fractions
import time
import requests
stream_offer = None
pc = None
micTrack = None
blackHole = None
class Main:
def __init__(self):
self.app = QtWidgets.QApplication(sys.argv)
self.Dialog = QtWidgets.QDialog()
self.ui = Ui_Dialog()
self.ui.setupUi(self.Dialog)
self.Dialog.show()
self.ui.label.hide()
self.ui.pushButton.hide()
self.ui.pushButton_2.hide()
self.ui.pushButton_3.hide()
self.mother_pipe, self.child_pipe = Pipe()
self.queue = Queue()
self.emitter = Emitter(self.mother_pipe)
self.emitter.call_offering.connect(lambda name,surname:self.new_call(name,surname))
self.emitter.call_status.connect(lambda status:self.call_status(status))
self.emitter.start()
self.aiohttp_server = WebRtcServer(self.child_pipe,self.queue)
self.aiohttp_server.start()
self.Dialog.closeEvent = lambda event:self.closeEvent(event)
sys.exit(self.app.exec_())
def new_call(self,name,surname):
self.ui.label.show()
self.ui.label.setText("Τηλεφωνική κλήση από: "+str(name)+" "+str(surname))
self.ui.pushButton.show()
self.ui.pushButton.clicked.connect(lambda state:self.answer_call(state))
self.ui.pushButton_2.show()
self.ui.pushButton_2.clicked.connect(lambda state:self.reject_call(state))
def answer_call(self,state):
self.ui.pushButton.hide()
self.ui.pushButton_2.hide()
self.ui.pushButton_3.show()
self.ui.pushButton_3.clicked.connect(lambda state:self.end_call(state))
self.queue.put({"type":"call","call":"answer"})
def reject_call(self,state):
self.ui.label.hide()
self.ui.pushButton.hide()
self.ui.pushButton_2.hide()
self.queue.put({"type":"call","call":"reject"})
def end_call(self,state):
self.ui.label.hide()
self.ui.pushButton.hide()
self.ui.pushButton_2.hide()
self.ui.pushButton_3.hide()
self.aiohttp_server.send_channel_message("closing")
def call_status(self,status):
if status == "closed-by-client":
self.ui.label.hide()
self.ui.pushButton.hide()
self.ui.pushButton_2.hide()
self.ui.pushButton_3.hide()
def closeEvent(self,event):
#try:
# response = requests.post('http://192.168.1.188:8080/shutdown', timeout=30)
#except:
# pass
self.aiohttp_server.terminate()
event.accept()
class Emitter(QThread):
call_offering = pyqtSignal(str,str)
call_status = pyqtSignal(str)
def __init__(self, from_process: Pipe):
super().__init__()
self.data_from_process = from_process
def run(self):
while True:
data = self.data_from_process.recv()
if data["type"]=="call_offering":
self.call_offering.emit(data["name"],data["surname"])
elif data["type"] == "call-status":
self.call_status.emit(data["status"])
class WebRtcServer(Process):
def __init__(self, to_emitter, from_mother):
super().__init__()
self.ROOT = os.path.dirname(__file__)
self.pc = None
self.channels = []
self.stream_offer = None
self.micTrack = None
self.blackHole = None
self.to_emitter = to_emitter
self.data_from_mother = from_mother
def run(self):
self.app = web.Application()
self.app.on_shutdown.append(self.on_shutdown)
self.app.router.add_get("/", self.index)
self.app.router.add_get("/telephone_calls.js", self.javascript)
self.app.router.add_post("/offer", self.offer)
self.app.router.add_post("/shutdown", self.shutdown_aiohttp)
web.run_app(self.app, access_log=None, host="192.168.1.188", port=8080, ssl_context=None)
async def index(self,request):
content = open(os.path.join(self.ROOT, "index.html"), encoding="utf8").read()
return web.Response(content_type="text/html", text=content)
async def javascript(self,request):
content = open(os.path.join(self.ROOT, "telephone_calls.js"), encoding="utf8").read()
return web.Response(content_type="application/javascript", text=content)
async def offer(self,request):
params = await request.json()
offer = RTCSessionDescription(sdp=params["sdp"], type=params["type"])
name = params["name"]
surname = params["surname"]
self.offer_in_progress = True
self.to_emitter.send({"type":"call_offering","name":name,"surname":surname})
timer = 0
while(timer<30 or self.data_from_mother.qsize()==0):
timer+=0.1
await asyncio.sleep(0.1)
if self.data_from_mother.qsize() == 0:
#reject offer
return web.Response(content_type="application/json",text=json.dumps({"sdp": "", "type": ""}))
else:
data = self.data_from_mother.get()
if data["type"] == "call" and data["call"] == "answer":
pc = RTCPeerConnection()
self.pc = pc
self.stream_offer = Server_Stream_Offer()
self.pc.addTrack(self.stream_offer)
@pc.on("track")
async def on_track(track):
self.micTrack = ClientTrack(track)
self.blackHole = MediaBlackhole()
self.blackHole.addTrack(self.micTrack)
await self.blackHole.start()
@track.on("ended")
async def on_ended():
pass
@pc.on("connectionstatechange")
async def on_connectionstatechange():
if self.pc.connectionState == "closed" or self.pc.connectionState == "failed":
try:
print("1")
if self.stream_offer is not None:
self.stream_offer.stop()
self.stream_offer.stop_offering()
del self.stream_offer
self.stream_offer = None
print("self.stream_offer = None")
if self.blackHole is not None:
await self.blackHole.stop()
self.blackHole = None
print("Black hole terminated")
if self.micTrack is not None:
self.micTrack.close_full()
del self.micTrack
self.micTrack = None
print("Microphone track terminated")
print("Closing pc")
if pc is not None:
await pc.close()
print("Pc closed")
self.to_emitter.send({"type":"call-status","status":"closed-by-client"})
self.channel = None
#self.offer_in_progress = False
#self.manage_call_end_thread.join()
except Exception as e:
print(e)
@pc.on("datachannel")
async def on_datachannel(channel):
self.channel = channel
@channel.on("message")
async def on_message(message):
if message == "disconnected":
try:
print("1")
if self.stream_offer is not None:
self.stream_offer.stop()
self.stream_offer.stop_offering()
del self.stream_offer
self.stream_offer = None
print("self.stream_offer = None")
if self.blackHole is not None:
await self.blackHole.stop()
self.blackHole = None
print("Black hole terminated")
if self.micTrack is not None:
self.micTrack.close_full()
del self.micTrack
self.micTrack = None
print("Microphone track terminated")
print("Closing pc")
if pc is not None:
await pc.close()
print("Pc closed")
self.to_emitter.send({"type":"call-status","status":"closed-by-client"})
self.channel = None
#self.offer_in_progress = False
#self.manage_call_end_thread.join()
except Exception as e:
print(e)
# handle offer
await self.pc.setRemoteDescription(offer)
# send answer
answer = await self.pc.createAnswer()
await self.pc.setLocalDescription(answer)
#self.manage_call_end_thread = threading.Thread(target=self.manage_call_end)
#self.manage_call_end_thread.start()
return web.Response(content_type="application/json",text=json.dumps({"sdp": self.pc.localDescription.sdp, "type": self.pc.localDescription.type}))
else:
#reject call
return web.Response(content_type="application/json",text=json.dumps({"sdp": "", "type": ""}))
def send_channel_message(self,message):
self.channel.send(message)
async def on_shutdown(self,app):
try:
if self.stream_offer is not None:
self.stream_offer.stop_offering()
self.stream_offer.stop()
del self.stream_offer
self.stream_offer = None
if self.pc is not None:
coros = [self.pc.close()]
await asyncio.gather(*coros)
if self.micTrack is not None:
self.micTrack.close_full()
if self.blackHole is not None:
await self.blackHole.stop()
raise GracefulExit()
except Exception as e:
print(e)
async def shutdown_aiohttp(self,request):
await self.on_shutdown(self.app)
return web.Response(content_type="text/html", text="")
class Server_Stream_Offer(MediaStreamTrack):
kind = "audio"
def __init__(self):
super().__init__() # don't forget this!
self.q = Simple_Queue()
self.codec = av.CodecContext.create('pcm_s16le', 'r')
self.codec.sample_rate = 8000
self.codec.channels = 2
self.audio_samples = 0
self.p = pyaudio.PyAudio()
self.input_stream = self.p.open(format=pyaudio.paInt16,channels=2,rate=8000,input=True,frames_per_buffer=int(8000*0.020))
self.input_stream.start_stream()
self.run = True
self.read_from_microphone_thread = threading.Thread(target=self.read_from_microphone)
self.read_from_microphone_thread.start()
async def recv(self):
packet = av.Packet(self.q.get())
frame = self.codec.decode(packet)[0]
frame.pts = self.audio_samples
frame.time_base = fractions.Fraction(1, self.codec.sample_rate)
self.audio_samples += frame.samples
return frame
def read_from_microphone(self):
while(self.run):
in_data = self.input_stream.read(int(8000*0.020),exception_on_overflow = False)
slice = AudioSegment(in_data, sample_width=2, frame_rate=8000, channels=2)
self.q.put(slice.raw_data)
def stop_offering(self):
try:
self.run = False
self.input_stream.stop_stream()
self.input_stream.close()
self.read_from_microphone_thread.join()
print("Read from microphone thread terminated")
except Exception as e:
print(e)
class ClientTrack(MediaStreamTrack):
kind = "audio"
def __init__(self, track):
super().__init__()
self.track = track
self.q = Simple_Queue()
self.p = pyaudio.PyAudio()
self.output_stream = self.p.open(format=pyaudio.paInt16,channels=2,rate=44800,output=True,frames_per_buffer=int(16384/4))
self.output_stream.start_stream()
self.run = True
self.hear_client_thread = threading.Thread(target=self.hear_client)
self.hear_client_thread.start()
async def recv(self):
# Get a new PyAV frame
try:
frame = await self.track.recv()
except:
self.track = None
if self.run:
self.close_full()
raise MediaStreamError
return None
else:
raise MediaStreamError
return None
self.q.put(frame)
def hear_client(self):
while(self.run):
frame = self.q.get()
packet_bytes = frame.to_ndarray().tobytes()
self.output_stream.write(packet_bytes)
def close_full(self):
self.run = False
self.hear_client_thread.join()
self.output_stream.stop_stream()
self.output_stream.close()
print("output stream closed")
if __name__ == "__main__":
program = Main()
But when I want to terminate the call from server using: self.channel.send(message) in server console I see: AttributeError: 'WebRtcServer' object has no attribute 'channel'. Did you mean: 'channels'? but the channel variable was set in
@pc.on("datachannel")
async def on_datachannel(channel):
self.channel = channel
print(self.channel) #prints: <aiortc.rtcdatachannel.RTCDataChannel object at 0x0000019F5F4AD590>
...
What's wrong?