Resolving ElephantIO ServerConnectionFailureException: Error establishing connection to server

23 Views Asked by At

I need to add socket support in the backend for real-time chat. Initially, I implemented an automatic reply feature in the API for a mobile app. This feature automatically replies after a user sends a message to another person. Although the queue is working, it's not real-time. Now, I want to add a socket in the backend so the user can receive the automatic message in real time.

use ElephantIO\Client;
use ElephantIO\Engine\SocketIO\Version2X;
class UserChatbotAuto implements ShouldQueue
{

    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    protected $request;
    protected $user;
    protected $curentUser;
    protected $token;
    public function __construct(array $request, $token, $curentUser, $user)
    {
        //
        $this->request = $request;
        $this->user = $user;
        $this->curentUser = $curentUser;
        $this->token = $token;
    }

    public function handle()
    {
        $user = $this->user;
        $curentUser = $this->curentUser;
        $request = $this->request;
        $token = $this->token;
        try {
            Log::info('start queue' );
            $thread = $user->getThreadWithUser($curentUser);
            $option = [
                // 'handshake' => [
                    'auth' => [
                        'token' => 'Bearer ' . $token,
                        'threadWith' => $thread->id
                    ]
                // ]
            ];
            $yourApiKey = config('services.openai.secret');
            $client = OpenAI::client($yourApiKey);

            $result = $client->chat()->create([
                'model' => 'gpt-4',
                'messages' => [
                    [
                        "role" => "system",
                        "content" => "You are a mental health adviser, skilled in giving mental health related advice. Please answer in the language after the word question. No yapping"
                    ],
                    ['role' => 'user', 'content' => "give mental health advice for given question. my name is: " . $curentUser->name . ", only give the advice text don't write anything else. question: " . $request['message']],
                ],
            ]);

            $content_response = $result->choices[0]->message->content;
            Log::info('content_response: ' . $content_response);
            $message = Message::create([
                'thread_id' => $thread->id,
                'user_id' => $user->id,
                'body' => $content_response,
            ]);

            $client = new Client(new Version2X('http://19.......11:8001', $option));
            $client->initialize();
            $client->emit('sendMessageToUser', ['userId' => $user->id, 'message' => $content_response]);
            $client->close();
        } catch (\Exception $e) {
            Log::error($e);
        }
        Log::info('done queue' );
    }
}

This is the socket configuration file named server.js, in the mobile, it still working if a real user chat to a real user and the result is both user chat in real time

    require("dotenv").config();
const express = require("express");
const fetch = require("node-fetch");
const app = express();
const server = require("http").createServer(app);
const mysql = require("mysql2");

const baseUrl = "http://1........1:8001";

const io = require("socket.io")(server, {
  cors: {
    origin: "*",
  },
});

// Create a connection pool
const pool = mysql.createPool({
  host: process.env.DB_HOST,
  user: process.env.DB_USERNAME,
  password: process.env.DB_PASSWORD,
  database: process.env.DB_DATABASE,
  waitForConnections: true,
  connectionLimit: 10,
  queueLimit: 0,
});

const userConnectionDb = [];

const findConnectionIndexById = (socketId) => {
  const index = userConnectionDb.findIndex(
    (user) => user.socketId === socketId
  );
  if (index === -1) {
    return null;
  }

  return index;
};

const findByUserId = (userId) => {
  const index = userConnectionDb.findIndex((user) => user.userId === userId);
  if (index === -1) {
    return null;
  }

  return index;
};

const getSocketIdByUserId = (userId) => {
  const index = userConnectionDb.findIndex((user) => user.userId === userId);

  if (index !== -1) {
    return userConnectionDb[index].socketId;
  } else {
    return null;
  }
};

const validateUser = async (authToken) => {
  try {
    let user = null;
    //console.lo;
    const endPoint = baseUrl + "/api/profile/socket-profile";
    const options = {
      method: "GET",
      headers: {
        "Content-Type": "application/json",
        Accept: "application/json",
        Authorization: authToken,
      },
    };
    const response = await fetch(endPoint, options);
    if (!response.ok) {
      console.log({ status: response.status });
      throw new Error("Network response was not OK");
    }
    const responseData = await response.json();
    const userData = {
      userId: responseData.id,
    };
    user = userData;
    return { user, error: null };
  } catch (error) {
    console.log(error);
    return { user: null, error: error };
  }
};

const sendMessageToServer = async (senderToken, receiver, message) => {
  try {
    let user = null;
    const endPoint = baseUrl + "/api/message/send-socket-message";
    const options = {
      method: "POST",
      headers: {
        "Content-Type": "application/json",
        Accept: "application/json",
        Authorization: senderToken,
      },
      body: JSON.stringify({ user_id: receiver, message }),
    };
    const response = await fetch(endPoint, options);
    if (!response.ok) {
      console.log({ status: response.status });
      throw new Error("Network response was not OK");
    }
    const responseData = await response.json();
    // console.log("message sent", responseData);
    return { data: responseData, error: null };
  } catch (error) {
    return { data: null, error };
  }
};

// Middleware to handle authentication
io.use(async (socket, next) => {
  try {
    const token = socket.handshake.auth.token;
    const threadWith = socket.handshake.auth.threadWith;

    // Perform authentication logic (e.g., verify the token)
    const { user, error } = await validateUser(token);
    if (error) throw new Error(error);
    if (!user) {
      // Authentication failed, reject the connection
      return next(new Error({ message: "Authentication failed", code: 401 }));
    }
    const userIndex = findByUserId(user.userId);
    if (userIndex !== null) {
      userConnectionDb.splice(userIndex, 1);
    }
    userConnectionDb.push({
      userId: user.userId,
      socketId: socket.id,
      threadWith: threadWith || null,
      token,
    });
    return next();
  } catch (error) {
    console.log(error);
    return next(new Error({ message: "Server error", code: 500 }));
  }
});

io.on("connection", (socket) => {
  console.log("New client connected Total users:", userConnectionDb.length);

  socket.on("message", (message) => {
    console.log("Received message:", message);
  });

  socket.on("disconnect", () => {
    const userIndex = findConnectionIndexById(socket.id);
    if (userIndex !== null) {
      userConnectionDb.splice(userIndex, 1);
    }
    console.log("Client disconnected Total users:", userConnectionDb.length);
  });

  // Handling the client's custom event
  socket.on("sendMessageToUser", async (data, callback) => {
    const { userId, message } = data;
    let callbackData = {
      success: true,
      online: false,
      data: null,
    };
    const socketId = getSocketIdByUserId(userId);
    if (socketId) {
      const otherUserIndex = findByUserId(userId);
      const currentUserIndex = findConnectionIndexById(socket.id);
      if (otherUserIndex === null || currentUserIndex === null) {
        callbackData.success = false;
        callback(callbackData);
        return;
      }
      const currentUserId = userConnectionDb[currentUserIndex].userId;
      const senderToken = userConnectionDb[currentUserIndex].token;
      const threadWithId = userConnectionDb[otherUserIndex].threadWith;
      // console.log({ threadWithId, currentUserId });

      if (threadWithId === currentUserId) {
        const { data, error } = await sendMessageToServer(
          senderToken,
          userId,
          message
        );
        if (error) {
          console.log(error);
          return;
        }
        io.to(socketId).emit("customMessage", {
          sendBy: currentUserId,
          data: data,
        });
        callbackData.online = true;
        callbackData.success = true;
        callbackData.data = data;
      } else {
        // console.log("Use is not on same thread");
      }
    } else {
      // console.log(" no user online with this id ");
    }
    // Send acknowledgment back to the client
    callback(callbackData);
  });
});

const port = 8001;
server.listen(port, () => {
  console.log(`Server is running on port ${port}`);
});

I tried calling the API to call the function and execute the queue above, but it returned an error:

ElephantIO\Exception\ServerConnectionFailureException: An error occurred while trying to establish a connection to the server in C:\xampp\htdocs\Project\tongle_latest\vendor\wisembly\elephant.io\src\Engine\SocketIO\Version1X.php:187

What can I do? Does anyone have any solution?

0

There are 0 best solutions below