Autogen and Api

304 Views Asked by At

I am fairly new to programming and need some guidance. Hoping to find some help here. I have an Autogen Group chat I am trying to connect to a react frontend via FastAPI and Websockets. I can get it to work and return the responses to the front end with just a User_proxy and one assistant. Like so:

autogen_chat.py: 

import autogen
from user_proxy_webagent import UserProxyWebAgent
import asyncio

config_list = [
    {
        "model": "gpt-3.5-turbo",
    }
]

llm_config = {
    "model":"gpt-3.5-turbo-0613",
    "temperature": 0,
    "config_list": config_list,
}

class AutogenChat():
    def __init__(self, chat_id=None, websocket=None):
        self.websocket = websocket
        self.chat_id = chat_id
        self.client_sent_queue = asyncio.Queue()
        self.client_receive_queue = asyncio.Queue()

        self.assistant = autogen.AssistantAgent(
            name="assistant",
            llm_config=llm_config,
            system_message="""You are a helpful assistant"""
        )
        self.user_proxy = UserProxyWebAgent(  
            name="user_proxy",
            human_input_mode="ALWAYS", 
            max_consecutive_auto_reply=10,
            is_termination_msg=lambda x: x.get("content", "") and x.get("content", "").rstrip().endswith("TERMINATE"),
            code_execution_config=False,
        )

        # add the queues to communicate 
        self.user_proxy.set_queues(self.client_sent_queue, self.client_receive_queue)

    async def start(self, message):
        await self.user_proxy.a_initiate_chat(
            self.assistant,
            clear_history=True,
            message=message
        )


user_proxy_webagent.py:

import autogen
from autogen import Agent, ConversableAgent
from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union
try:
    from termcolor import colored
except ImportError:
    def colored(x, *args, **kwargs):
        return x

class UserProxyWebAgent(autogen.UserProxyAgent):
    def __init__(self, *args, **kwargs):
        super(UserProxyWebAgent, self).__init__(*args, **kwargs)
        self._reply_func_list = []
        self.register_reply([Agent, None], ConversableAgent.generate_oai_reply)
        self.register_reply([Agent, None], ConversableAgent.generate_code_execution_reply)
        self.register_reply([Agent, None], ConversableAgent.generate_function_call_reply)
        self.register_reply([Agent, None], UserProxyWebAgent.a_check_termination_and_human_reply)

    async def a_check_termination_and_human_reply(
        self,
        messages: Optional[List[Dict]] = None,
        sender: Optional[Agent] = None,
        config: Optional[Any] = None,
    ) -> Tuple[bool, Union[str, Dict, None]]:
        """Check if the conversation should be terminated, and if human reply is provided."""
        if config is None:
            config = self
        if messages is None:
            messages = self._oai_messages[sender]
        message = messages[-1]
        reply = ""
        no_human_input_msg = ""
        if self.human_input_mode == "ALWAYS":
            reply = await self.a_get_human_input(
                f"Provide feedback to {sender.name}. Press enter to skip and use auto-reply, or type 'exit' to end the conversation: "
            )
            no_human_input_msg = "NO HUMAN INPUT RECEIVED." if not reply else ""
            # if the human input is empty, and the message is a termination message, then we will terminate the conversation
            reply = reply if reply or not self._is_termination_msg(message) else "exit"
        else:
            if self._consecutive_auto_reply_counter[sender] >= self._max_consecutive_auto_reply_dict[sender]:
                if self.human_input_mode == "NEVER":
                    reply = "exit"
                else:
                    # self.human_input_mode == "TERMINATE":
                    terminate = self._is_termination_msg(message)
                    reply = await self.a_get_human_input(
                        f"Please give feedback to {sender.name}. Press enter or type 'exit' to stop the conversation: "
                        if terminate
                        else f"Please give feedback to {sender.name}. Press enter to skip and use auto-reply, or type 'exit' to stop the conversation: "
                    )
                    no_human_input_msg = "NO HUMAN INPUT RECEIVED." if not reply else ""
                    # if the human input is empty, and the message is a termination message, then we will terminate the conversation
                    reply = reply if reply or not terminate else "exit"
            elif self._is_termination_msg(message):
                if self.human_input_mode == "NEVER":
                    reply = "exit"
                else:
                    # self.human_input_mode == "TERMINATE":
                    reply = await self.a_get_human_input(
                        f"Please give feedback to {sender.name}. Press enter or type 'exit' to stop the conversation: "
                    )
                    no_human_input_msg = "NO HUMAN INPUT RECEIVED." if not reply else ""
                    # if the human input is empty, and the message is a termination message, then we will terminate the conversation
                    reply = reply or "exit"

        # print the no_human_input_msg
        if no_human_input_msg:
            print(colored(f"\n>>>>>>>> {no_human_input_msg}", "red"), flush=True)

        # stop the conversation
        if reply == "exit":
            # reset the consecutive_auto_reply_counter
            self._consecutive_auto_reply_counter[sender] = 0
            return True, None

        # send the human reply
        if reply or self._max_consecutive_auto_reply_dict[sender] == 0:
            # reset the consecutive_auto_reply_counter
            self._consecutive_auto_reply_counter[sender] = 0
            return True, reply

        # increment the consecutive_auto_reply_counter
        self._consecutive_auto_reply_counter[sender] += 1
        if self.human_input_mode != "NEVER":
            print(colored("\n>>>>>>>> USING AUTO REPLY...", "red"), flush=True)

        return False, None

    def set_queues(self, client_sent_queue, client_receive_queue):
        self.client_sent_queue = client_sent_queue
        self.client_receive_queue = client_receive_queue

    async def a_get_human_input(self, prompt: str) -> str:
        last_message = self.last_message()
        if last_message["content"]:
            await self.client_receive_queue.put(last_message["content"])
            reply = await self.client_sent_queue.get()
            if reply and reply == "DO_FINISH":
                return "exit"
            return reply
        else:
            return 

main.py:

from fastapi import FastAPI, WebSocket, Request
from fastapi.responses import HTMLResponse
import uuid
from autogen_group_chat import AutogenChat
import asyncio
import uvicorn
from dotenv import load_dotenv, find_dotenv
import openai
import os

_ = load_dotenv(find_dotenv()) # read local .env file
openai.api_key = os.environ['OPENAI_API_KEY']
# openai.log='debug'

app = FastAPI()
app.autogen_chat = {}


class ConnectionManager:
    def __init__(self):
        self.active_connections: list[AutogenChat] = []

    async def connect(self, autogen_chat: AutogenChat):
        await autogen_chat.websocket.accept()
        self.active_connections.append(autogen_chat)

    async def disconnect(self, autogen_chat: AutogenChat):
        autogen_chat.client_receive_queue.put_nowait("DO_FINISH")
        print(f"autogen_chat {autogen_chat.chat_id} disconnected")
        self.active_connections.remove(autogen_chat)


manager = ConnectionManager()


async def send_to_client(autogen_chat: AutogenChat):
    while True:
        reply = await autogen_chat.client_receive_queue.get()
        if reply and reply == "DO_FINISH":
            autogen_chat.client_receive_queue.task_done()
            break
        await autogen_chat.websocket.send_text(reply)
        autogen_chat.client_receive_queue.task_done()
        await asyncio.sleep(0.05)

async def receive_from_client(autogen_chat: AutogenChat):
    while True:
        data = await autogen_chat.websocket.receive_text()
        if data and data == "DO_FINISH":
            await autogen_chat.client_receive_queue.put("DO_FINISH")
            await autogen_chat.client_sent_queue.put("DO_FINISH")
            break
        await autogen_chat.client_sent_queue.put(data)
        await asyncio.sleep(0.05)

@app.websocket("/ws/{chat_id}")
async def websocket_endpoint(websocket: WebSocket, chat_id: str):
    try:
        autogen_chat = AutogenChat(chat_id=chat_id, websocket=websocket)
        await manager.connect(autogen_chat)
        data = await autogen_chat.websocket.receive_text()
        future_calls = asyncio.gather(send_to_client(autogen_chat), receive_from_client(autogen_chat))
        await autogen_chat.start(data)
        print("DO_FINISHED")
    except Exception as e:
        print("ERROR", str(e))
    finally:
        try:
            await manager.disconnect(autogen_chat)
        except:
            pass

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)

But when I try to add the group chat the interface disconnects. I want it to return each response of all the Assisants one at a time providing all of them the ability to ask questions back to the user. I use the same files as above and just replace the autogen_chat.py with autogen_groupchat in the main.py file.

autogen_groupchat.py:

import autogen
from user_proxy_webagent import UserProxyWebAgent
import asyncio

config_list = [
    {
        "model": "gpt-3.5-turbo",
    }
]


llm_config_assistant = {
    "model":"gpt-3.5-turbo",
    "temperature": 0,
    "config_list": config_list,
}
llm_config_proxy = {
    "model":"gpt-3.5-turbo-0613",
    "temperature": 0,
    "config_list": config_list,
}



class AutogenChat():
    def __init__(self, chat_id=None, websocket=None):
        self.websocket = websocket
        self.chat_id = chat_id
        self.client_sent_queue = asyncio.Queue()
        self.client_receive_queue = asyncio.Queue()

        self.diagnosis = autogen.AssistantAgent(
            name="specialist",
            llm_config=llm_config_assistant,
            max_consecutive_auto_reply=0,
        )
        self.instructions = autogen.AssistantAgent(
            name="instruction_writer",
            llm_config=llm_config_assistant,
            max_consecutive_auto_reply=0,
        )
        self.sourcing = autogen.AssistantAgent(
            name="sourcing_agent",
            llm_config=llm_config_assistant,
            max_consecutive_auto_reply=0,
        )

        self.user_proxy = UserProxyWebAgent(  
            name="user_proxy",
            human_input_mode="NEVER", 
            max_consecutive_auto_reply=10,
            is_termination_msg=lambda x: x.get("content", "") and x.get("content", "").rstrip().endswith("TERMINATE"),
            code_execution_config=False,
        )

        # add the queues to communicate 
        self.user_proxy.set_queues(self.client_sent_queue, self.client_receive_queue)
        

        self.groupchat = autogen.GroupChat(agents=[self.diagnosis, self.instructions, self.sourcing], messages=[], max_round=20)
        self.manager = autogen.GroupChatManager(groupchat=self.groupchat, 
            llm_config=llm_config_assistant,
            human_input_mode="NEVER" )     

    async def start(self, message):
        await self.user_proxy.a_initiate_chat(
            self.manager,
            clear_history=True,
            message=message
        )

Is this the best approach to accomplish this? If so can someone provide me some guidance on how to get this operating properly. Thank you!

1

There are 1 best solutions below

0
Sebastian S. On

For an alternative approach, consider monkey patching the _print_received_message in conversable_agent.py. For an in-depth tutorial on implementing this solution, refer to the video at this link: https://www.youtube.com/watch?v=dW-qr_ntOgc&t=173s