aiortc channel.send ERROR: AttributeError: 'WebRtcServer' object has no attribute 'channel'. Did you mean: 'channels'?

44 Views Asked by At

I am trying to established a connection between a pyqt5 python user and a html5 browser user.

From the server side i run:

from PyQt5 import QtCore, QtGui, QtWidgets
from PyQt5.QtCore import pyqtSignal, QThread, Qt
from calls import Ui_Dialog
from aiohttp import web
from aiohttp.web_runner import GracefulExit
from aiortc.mediastreams import MediaStreamTrack,MediaStreamError
from aiortc import RTCPeerConnection, RTCSessionDescription
from aiortc.contrib.media import MediaPlayer
from aiortc.contrib.media import MediaBlackhole, MediaPlayer, MediaRecorder, MediaRelay
import av
import pyaudio
from pydub import AudioSegment
import asyncio
import json
import os
from multiprocessing import Process, Queue, Pipe, freeze_support
from queue import Queue as Simple_Queue
import sys
import threading
from datetime import datetime, timedelta
from time import sleep
import fractions
import time
import requests

stream_offer = None
pc = None
micTrack = None
blackHole = None

class Main:
    def __init__(self):
        self.app = QtWidgets.QApplication(sys.argv)
        self.Dialog = QtWidgets.QDialog()
        self.ui = Ui_Dialog()
        self.ui.setupUi(self.Dialog)
        
        self.Dialog.show()
        
        self.ui.label.hide()
        self.ui.pushButton.hide()
        self.ui.pushButton_2.hide()
        self.ui.pushButton_3.hide()

        self.mother_pipe, self.child_pipe = Pipe()
        self.queue = Queue()
        self.emitter = Emitter(self.mother_pipe)
        self.emitter.call_offering.connect(lambda name,surname:self.new_call(name,surname))
        self.emitter.call_status.connect(lambda status:self.call_status(status))
        self.emitter.start()        
        
        self.aiohttp_server = WebRtcServer(self.child_pipe,self.queue)
        self.aiohttp_server.start()
        
        self.Dialog.closeEvent = lambda event:self.closeEvent(event)
        
        sys.exit(self.app.exec_())

    def new_call(self,name,surname):
        self.ui.label.show()
        self.ui.label.setText("Τηλεφωνική κλήση από: "+str(name)+" "+str(surname))
        
        self.ui.pushButton.show()
        self.ui.pushButton.clicked.connect(lambda state:self.answer_call(state))
        
        self.ui.pushButton_2.show()
        self.ui.pushButton_2.clicked.connect(lambda state:self.reject_call(state))
    
    def answer_call(self,state):
        self.ui.pushButton.hide()
        self.ui.pushButton_2.hide()
        self.ui.pushButton_3.show()
        self.ui.pushButton_3.clicked.connect(lambda state:self.end_call(state))
        self.queue.put({"type":"call","call":"answer"})
        
    def reject_call(self,state):
        self.ui.label.hide()
        self.ui.pushButton.hide()
        self.ui.pushButton_2.hide()
        self.queue.put({"type":"call","call":"reject"})
        
    def end_call(self,state):
        self.ui.label.hide()
        self.ui.pushButton.hide()
        self.ui.pushButton_2.hide()
        self.ui.pushButton_3.hide()
        self.aiohttp_server.send_channel_message("closing")
    
    def call_status(self,status):
        if status == "closed-by-client":
            self.ui.label.hide()
            self.ui.pushButton.hide()
            self.ui.pushButton_2.hide()
            self.ui.pushButton_3.hide()
            
    def closeEvent(self,event):
        #try:
        #   response = requests.post('http://192.168.1.188:8080/shutdown', timeout=30)
        #except:
        #   pass
        self.aiohttp_server.terminate()
        event.accept()


class Emitter(QThread):
    call_offering = pyqtSignal(str,str)
    call_status = pyqtSignal(str)
    
    def __init__(self, from_process: Pipe):
        super().__init__()
        self.data_from_process = from_process
        
    def run(self):
        while True:
            data = self.data_from_process.recv()
            if data["type"]=="call_offering":
                self.call_offering.emit(data["name"],data["surname"])
            elif data["type"] == "call-status":
                self.call_status.emit(data["status"])


class WebRtcServer(Process):
    def __init__(self, to_emitter, from_mother):
        super().__init__()
        self.ROOT = os.path.dirname(__file__)
        self.pc = None
        self.channels = []
        self.stream_offer = None
        self.micTrack = None
        self.blackHole = None
        self.to_emitter = to_emitter
        self.data_from_mother = from_mother
    
    def run(self):
        self.app = web.Application()
        self.app.on_shutdown.append(self.on_shutdown)
        self.app.router.add_get("/", self.index)
        self.app.router.add_get("/telephone_calls.js", self.javascript)
        self.app.router.add_post("/offer", self.offer)
        self.app.router.add_post("/shutdown", self.shutdown_aiohttp)
        web.run_app(self.app, access_log=None, host="192.168.1.188", port=8080, ssl_context=None)

    async def index(self,request):
        content = open(os.path.join(self.ROOT, "index.html"), encoding="utf8").read()
        return web.Response(content_type="text/html", text=content)

    async def javascript(self,request):
        content = open(os.path.join(self.ROOT, "telephone_calls.js"), encoding="utf8").read()
        return web.Response(content_type="application/javascript", text=content)

    async def offer(self,request):  
        params = await request.json()
        offer = RTCSessionDescription(sdp=params["sdp"], type=params["type"])
        name = params["name"]
        surname = params["surname"]
        self.offer_in_progress = True
        
        self.to_emitter.send({"type":"call_offering","name":name,"surname":surname})
        timer = 0
        while(timer<30 or self.data_from_mother.qsize()==0):
            timer+=0.1
            await asyncio.sleep(0.1)
            
        if self.data_from_mother.qsize() == 0:
            #reject offer
            return web.Response(content_type="application/json",text=json.dumps({"sdp": "", "type": ""}))
        else:
            data = self.data_from_mother.get()
            if data["type"] == "call" and data["call"] == "answer":
                pc = RTCPeerConnection()
                self.pc = pc
                self.stream_offer = Server_Stream_Offer()
                self.pc.addTrack(self.stream_offer)

                @pc.on("track")
                async def on_track(track):  
                    self.micTrack = ClientTrack(track)
                    self.blackHole = MediaBlackhole()
                    self.blackHole.addTrack(self.micTrack)
                    await self.blackHole.start()

                    @track.on("ended")
                    async def on_ended():
                        pass

                @pc.on("connectionstatechange")
                async def on_connectionstatechange():
                    if self.pc.connectionState == "closed" or self.pc.connectionState == "failed":
                        try:
                            print("1")
                            if self.stream_offer is not None:
                                self.stream_offer.stop()
                                self.stream_offer.stop_offering()
                            del self.stream_offer
                            self.stream_offer = None
                            print("self.stream_offer = None")
                            
                            if self.blackHole is not None:
                                await self.blackHole.stop()
                                self.blackHole = None
                            print("Black hole terminated")
                            
                            if self.micTrack is not None:
                                self.micTrack.close_full()
                                del self.micTrack
                                self.micTrack = None
                            print("Microphone track terminated")
                            
                            print("Closing pc")
                            if pc is not None:
                                await pc.close()
                            print("Pc closed")
                            self.to_emitter.send({"type":"call-status","status":"closed-by-client"})
                            self.channel = None
                            #self.offer_in_progress = False
                            #self.manage_call_end_thread.join()
                        except Exception as e:
                            print(e)


                            
                @pc.on("datachannel")
                async def on_datachannel(channel):
                    self.channel = channel
                    @channel.on("message")
                    async def on_message(message):
                        if message == "disconnected":
                            try:
                                print("1")
                                if self.stream_offer is not None:
                                    self.stream_offer.stop()
                                    self.stream_offer.stop_offering()
                                del self.stream_offer
                                self.stream_offer = None
                                print("self.stream_offer = None")
                                
                                if self.blackHole is not None:
                                    await self.blackHole.stop()
                                    self.blackHole = None
                                print("Black hole terminated")
                                
                                if self.micTrack is not None:
                                    self.micTrack.close_full()
                                    del self.micTrack
                                    self.micTrack = None
                                print("Microphone track terminated")
                                
                                print("Closing pc")
                                if pc is not None:
                                    await pc.close()
                                print("Pc closed")
                                self.to_emitter.send({"type":"call-status","status":"closed-by-client"})
                                self.channel = None
                                #self.offer_in_progress = False
                                #self.manage_call_end_thread.join()
                            except Exception as e:
                                print(e)

                # handle offer
                await self.pc.setRemoteDescription(offer)

                # send answer
                answer = await self.pc.createAnswer()
                await self.pc.setLocalDescription(answer)

                #self.manage_call_end_thread = threading.Thread(target=self.manage_call_end)
                #self.manage_call_end_thread.start()
                            
                return web.Response(content_type="application/json",text=json.dumps({"sdp": self.pc.localDescription.sdp, "type": self.pc.localDescription.type}))
            else:
                #reject call
                return web.Response(content_type="application/json",text=json.dumps({"sdp": "", "type": ""}))

    def send_channel_message(self,message):
        self.channel.send(message)
    
    
    async def on_shutdown(self,app):
        try:
            if self.stream_offer is not None:
                self.stream_offer.stop_offering()
                self.stream_offer.stop()
            del self.stream_offer
            self.stream_offer = None
            if self.pc is not None:
                coros = [self.pc.close()]
                await asyncio.gather(*coros)
            if self.micTrack is not None:
                self.micTrack.close_full()
            if self.blackHole is not None:
                await self.blackHole.stop()
            raise GracefulExit()
        except Exception as e:
            print(e)

    async def shutdown_aiohttp(self,request):
        await self.on_shutdown(self.app)
        return web.Response(content_type="text/html", text="")


class Server_Stream_Offer(MediaStreamTrack):
    kind = "audio"

    def __init__(self):
        super().__init__()  # don't forget this!
        
        self.q = Simple_Queue()
        
        self.codec = av.CodecContext.create('pcm_s16le', 'r')
        self.codec.sample_rate = 8000
        self.codec.channels = 2

        self.audio_samples = 0

        self.p = pyaudio.PyAudio()
        self.input_stream = self.p.open(format=pyaudio.paInt16,channels=2,rate=8000,input=True,frames_per_buffer=int(8000*0.020))
        self.input_stream.start_stream()
        
        self.run = True
        
        self.read_from_microphone_thread = threading.Thread(target=self.read_from_microphone)
        self.read_from_microphone_thread.start()
        
    async def recv(self):
        packet = av.Packet(self.q.get())
        frame = self.codec.decode(packet)[0]
        frame.pts = self.audio_samples
        frame.time_base = fractions.Fraction(1, self.codec.sample_rate)
        self.audio_samples += frame.samples
        return frame

    def read_from_microphone(self):
        while(self.run):
            in_data = self.input_stream.read(int(8000*0.020),exception_on_overflow = False)            
            slice = AudioSegment(in_data, sample_width=2, frame_rate=8000, channels=2)
            self.q.put(slice.raw_data)

    def stop_offering(self):
        try:
            self.run = False
            self.input_stream.stop_stream()
            self.input_stream.close()
            self.read_from_microphone_thread.join()
            print("Read from microphone thread terminated")
        except Exception as e:
            print(e)
            
class ClientTrack(MediaStreamTrack):
    kind = "audio"

    def __init__(self, track):
        super().__init__()
        self.track = track
        self.q = Simple_Queue()
        self.p = pyaudio.PyAudio()
        self.output_stream = self.p.open(format=pyaudio.paInt16,channels=2,rate=44800,output=True,frames_per_buffer=int(16384/4))
        self.output_stream.start_stream()
        
        self.run = True
        self.hear_client_thread = threading.Thread(target=self.hear_client)
        self.hear_client_thread.start()
        
    async def recv(self):
        # Get a new PyAV frame

        try:
            frame = await self.track.recv()
        except:
            self.track = None
            if self.run:
                self.close_full()
                raise MediaStreamError
                return None
            else:
                raise MediaStreamError
                return None
            
        self.q.put(frame)
        
        
    def hear_client(self):
        while(self.run):
            frame = self.q.get()
            packet_bytes = frame.to_ndarray().tobytes()
            self.output_stream.write(packet_bytes)
        
    def close_full(self):
        self.run = False
        self.hear_client_thread.join()
        self.output_stream.stop_stream()
        self.output_stream.close()
        print("output stream closed")

if __name__ == "__main__":
    program = Main()

But when I want to terminate the call from server using: self.channel.send(message) in server console I see: AttributeError: 'WebRtcServer' object has no attribute 'channel'. Did you mean: 'channels'? but the channel variable was set in

@pc.on("datachannel")
    async def on_datachannel(channel):
        self.channel = channel
        print(self.channel) #prints: <aiortc.rtcdatachannel.RTCDataChannel object at 0x0000019F5F4AD590>
        ...

What's wrong?

0

There are 0 best solutions below