NestJS : "@socket.io/redis-adapter": "^8.2.1",

77 Views Asked by At

I'm trying to set up a Redis adapter with NestJS and socket.io for WebSockets, but I'm having problems between my adapter and my gateway.

redis.adapter.ts

import { IoAdapter } from '@nestjs/platform-socket.io';
import { ServerOptions } from 'socket.io';
import { createAdapter } from '@socket.io/redis-adapter';
import { createClient } from 'redis';
import { ConfigService } from '@nestjs/config';
import { Logger } from '@nestjs/common';

/**
 * Socket.io adapter for Nest that installs the `@socket.io/redis-adapter`
 * on every socket.io server it creates.
 *
 * Usage: construct it, `await connectToRedis()`, then pass the instance to
 * `app.useWebSocketAdapter()`.
 *
 * NOTE(review): the Redis adapter presumably only relays socket.io traffic
 * between server instances; a raw `PUBLISH` issued by another application is
 * not expected to trigger `@SubscribeMessage` handlers — confirm against the
 * adapter documentation.
 */
export class RedisIoAdapter extends IoAdapter {
    private adapterConstructor?: ReturnType<typeof createAdapter>;
    private readonly logger = new Logger(RedisIoAdapter.name);

    /**
     * @param config Source of REDIS_HOST, REDIS_PORT and REDIS_PASS.
     * @param app Optional Nest application (or HTTP server) forwarded to
     *   `IoAdapter`. Without it the base adapter has no HTTP server to attach
     *   to, so gateways that do not declare their own port will not share the
     *   application's HTTP port. Omitting it keeps the previous behaviour.
     */
    constructor(
        private readonly config: ConfigService,
        app?: ConstructorParameters<typeof IoAdapter>[0],
    ) {
        super(app);
    }

    /**
     * Opens the publish and subscribe connections and prepares the adapter
     * factory used by {@link createIOServer}.
     *
     * @returns A promise that resolves once both clients are connected.
     */
    async connectToRedis(): Promise<void> {
        const pubClient = createClient({
            socket: {
                host: this.config.get('REDIS_HOST'),
                port: this.config.get<number>('REDIS_PORT'),
            },
            password: this.config.get('REDIS_PASS'),
        });
        const subClient = pubClient.duplicate();

        // node-redis requires an 'error' listener on every client: an error
        // emitted without one is thrown and takes the process down.
        pubClient.on('error', (err: unknown) => this.logRedisError('pubClient', err));
        subClient.on('error', (err: unknown) => this.logRedisError('subClient', err));

        await Promise.all([pubClient.connect(), subClient.connect()]);
        this.adapterConstructor = createAdapter(pubClient, subClient);
        if (pubClient.isReady && subClient.isReady) {
            this.logger.log('Connected to Redis client.');
        }
    }

    /**
     * Creates the socket.io server through the base adapter and installs the
     * Redis adapter on it.
     *
     * @throws Error when called before `connectToRedis()` has completed.
     */
    createIOServer(port: number, options?: ServerOptions) {
        if (!this.adapterConstructor) {
            // Installing `undefined` as the adapter would leave the server broken
            // in a much less obvious way later on.
            throw new Error(
                'RedisIoAdapter.connectToRedis() must complete before the socket.io server is created.',
            );
        }
        const server = super.createIOServer(port, options);
        server.adapter(this.adapterConstructor);
        return server;
    }

    /** Logs an error emitted by one of the Redis clients. */
    private logRedisError(clientName: string, err: unknown): void {
        const message = err instanceof Error ? err.message : String(err);
        const stack = err instanceof Error ? err.stack : undefined;
        this.logger.error(`Redis ${clientName} error: ${message}`, stack);
    }
}

main.ts

import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { RedisIoAdapter } from './adapters/redis.adapter';
import { ConfigService } from '@nestjs/config';

/**
 * Boots the Nest application: creates it from `AppModule`, connects the
 * Redis-backed socket.io adapter, registers it as the WebSocket adapter and
 * starts listening for HTTP connections.
 */
async function bootstrap(): Promise<void> {
    const app = await NestFactory.create(AppModule);
    const config = app.get(ConfigService);

    const redisIoAdapter = new RedisIoAdapter(config);
    // The adapter must be connected before Nest creates the socket.io server.
    await redisIoAdapter.connectToRedis();
    app.useWebSocketAdapter(redisIoAdapter);

    // Fall back to a fixed port when PORT is not configured instead of
    // passing `undefined` to listen().
    await app.listen(config.get<number>('PORT') ?? 3000);
}

// Surface startup failures (e.g. Redis unreachable) instead of leaving an
// unhandled promise rejection.
bootstrap().catch((err: unknown) => {
    console.error('Application failed to start', err);
    process.exit(1);
});

event.gateway.ts


import {
    SubscribeMessage,
    WebSocketGateway,
    WebSocketServer,
    OnGatewayConnection,
    OnGatewayDisconnect,
} from '@nestjs/websockets';
import { Socket, Server } from 'socket.io';
import { Logger } from '@nestjs/common';

/**
 * Socket.io gateway that accepts connections from any origin, logs
 * connection lifecycle events and prints incoming `test` messages.
 */
@WebSocketGateway({ cors: { origin: '*' } })
export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect {
    private logger = new Logger('TestGateway');

    @WebSocketServer()
    server: Server;

    /**
     * Logs 'Init'. Presumably invoked by Nest once the gateway is initialised
     * (the class does not declare `OnGatewayInit`) — confirm.
     */
    afterInit() {
        this.logger.log('Init');
    }

    /** Logs the id of a socket that has just connected. */
    public handleConnection(client: Socket): void {
        const { id } = client;
        this.logger.log(`Client connected: ${id}`);
    }

    /** Logs the id of a socket that has just disconnected. */
    public handleDisconnect(client: Socket): void {
        const { id } = client;
        this.logger.log(`Client disconnected: ${id}`);
    }

    /** Handler for the `test` socket.io event: dumps the client and the payload to stdout. */
    @SubscribeMessage('test')
    handleConnectUser(client: any, data: any): void {
        console.log(client);
        console.log('Received WebSocket message: ', data);
    }
}

From another NestJS app, I do:

// Publishes the string 'Test Message' on the Redis channel 'test' via this.redisClient.
await this.redisClient.publish('test', 'Test Message');

My message is successfully published to Redis:

1695261647.971334 [.....] "PUBLISH" "test" "0"

But my Gateway event is not triggered. I reread the NestJS docs and looked at other posts, but nothing worked.

package.json

  "dependencies": {
    "@nestjs/common": "^10.2.5",
    "@nestjs/config": "^3.1.1",
    "@nestjs/core": "^10.2.5",
    "@nestjs/platform-express": "^10.2.5",
    "@nestjs/platform-socket.io": "^10.2.5",
    "@nestjs/websockets": "^10.2.5",
    "@prisma/client": "5.3.1",
    "@socket.io/redis-adapter": "^8.2.1",
    "redis": "^4.6.9",
    "reflect-metadata": "^0.1.13",
    "rxjs": "^7.8.1",
    "socket.io": "^4.7.2",
    "socket.io-redis": "^5.4.0",
    "ws": "^8.14.2"
  },
  "devDependencies": {
    "@nestjs/cli": "^10.1.17",
    "@nestjs/schematics": "^10.0.2",
    "@nestjs/testing": "^10.2.5",
    "@types/express": "^4.17.17",
    "@types/jest": "^29.5.5",
    "@types/node": "^20.6.3",
    "@types/supertest": "^2.0.12",
    "@typescript-eslint/eslint-plugin": "^6.7.2",
    "@typescript-eslint/parser": "^6.7.2",
    "eslint": "^8.49.0",
    "eslint-config-prettier": "^9.0.0",
    "eslint-plugin-prettier": "^5.0.0",
    "jest": "^29.7.0",
    "prettier": "^3.0.3",
    "prisma": "5.3.1",
    "source-map-support": "^0.5.21",
    "supertest": "^6.3.3",
    "ts-jest": "^29.1.1",
    "ts-loader": "^9.4.4",
    "ts-node": "^10.9.1",
    "tsconfig-paths": "^4.2.0",
    "typescript": "^5.2.2"
  },

If I add these lines to connectToRedis() in the RedisIoAdapter class:

        // Second client obtained from pubClient via duplicate().
        const subClient = pubClient.duplicate();
        // Subscribe to the 'test' channel and print every message received on it.
        subClient.subscribe('test', (message) => {
            console.log(message);
        });

I receive the published data from Redis, but how can I make sure that the event handler in my Gateway is the one that gets triggered?

1

There is 1 best solution below

0
TheZerbibi On

OK, it's fixed.

The changes:

  • ioredis instead of redis for the connection
  • I subscribe to the channels
  • I redirect the messages to a method in my Gateway

redis.adapter.ts

import { IoAdapter } from '@nestjs/platform-socket.io';
import { ServerOptions } from 'socket.io';
import { Redis } from 'ioredis';
import { ConfigService } from '@nestjs/config';
import { INestApplicationContext, Logger } from '@nestjs/common';
import { GameGateway } from 'src/gateway/game.gateway';
import { createAdapter } from '@socket.io/redis-adapter';

/**
 * Socket.io adapter for Nest backed by ioredis.
 *
 * Besides installing the `@socket.io/redis-adapter` on the socket.io server,
 * it subscribes to the plain Redis channel `new-connection` and forwards
 * every message received on it to `GameGateway.handleRedisMessage`.
 */
export class RedisIoAdapter extends IoAdapter {
    private readonly logger = new Logger(RedisIoAdapter.name);
    private adapterConstructor?: ReturnType<typeof createAdapter>;

    /**
     * @param app Nest application; passed to `IoAdapter` and used to resolve
     *   the gateway instance when a Redis message arrives.
     * @param config Source of REDIS_HOST, REDIS_PORT and REDIS_PASS.
     */
    constructor(
        private readonly app: INestApplicationContext,
        private readonly config: ConfigService,
    ) {
        super(app);
    }

    /**
     * Creates the publish/subscribe clients, wires the `new-connection`
     * channel to the gateway and prepares the adapter factory used by
     * {@link createIOServer}.
     *
     * The adapter factory is assigned before this method returns; the channel
     * subscription itself completes in the background and failures are logged.
     */
    async connectToRedis(): Promise<void> {
        const pubClient = new Redis({
            host: this.config.get('REDIS_HOST'),
            port: this.config.get<number>('REDIS_PORT'),
            password: this.config.get('REDIS_PASS'),
        });
        const subClient = pubClient.duplicate();

        // Log connection-level errors from both clients explicitly.
        pubClient.on('error', (err: unknown) => this.logRedisError('pubClient', err));
        subClient.on('error', (err: unknown) => this.logRedisError('subClient', err));

        subClient.on('connect', () => {
            this.logger.log('Redis subClient connected');
        });

        subClient.on('message', (channel: string, message: string) => {
            // The gateway is resolved lazily because it only exists once the
            // application has been initialised.
            if (channel === 'new-connection') this.app.get(GameGateway).handleRedisMessage(channel, message);
        });

        // subscribe() returns a promise; handle its rejection instead of
        // leaving it floating.
        void subClient.subscribe('new-connection').catch((err: unknown) => {
            this.logRedisError('subClient subscribe', err);
        });

        this.adapterConstructor = createAdapter(pubClient, subClient);
    }

    /**
     * Creates the socket.io server through the base adapter and installs the
     * Redis adapter on it.
     *
     * @throws Error when called before `connectToRedis()` has been invoked.
     */
    createIOServer(port: number, options?: ServerOptions) {
        if (!this.adapterConstructor) {
            throw new Error(
                'RedisIoAdapter.connectToRedis() must be called before the socket.io server is created.',
            );
        }
        const server = super.createIOServer(port, options);
        server.adapter(this.adapterConstructor);
        return server;
    }

    /** Logs an error reported by one of the Redis clients. */
    private logRedisError(source: string, err: unknown): void {
        const message = err instanceof Error ? err.message : String(err);
        const stack = err instanceof Error ? err.stack : undefined;
        this.logger.error(`Redis ${source} error: ${message}`, stack);
    }
}

game.gateway.ts

import {
    SubscribeMessage,
    WebSocketGateway,
    WebSocketServer,
    OnGatewayConnection,
    OnGatewayDisconnect,
} from '@nestjs/websockets';
import { Socket, Server } from 'socket.io';
import { Logger } from '@nestjs/common';

/**
 * Socket.io gateway that accepts connections from any origin and funnels
 * `new-connection` notifications into a single handler, whether they arrive
 * as a socket.io event or are handed over by another component through
 * {@link GameGateway.handleRedisMessage}.
 */
@WebSocketGateway({
    cors: {
        origin: '*',
    },
})
export class GameGateway implements OnGatewayConnection, OnGatewayDisconnect {
    private logger: Logger = new Logger('TestGateway');
    @WebSocketServer() server: Server;

    /**
     * Handler for the `new-connection` socket.io event.
     *
     * @param client Socket that emitted the event (not used).
     * @param data Payload sent with the event.
     */
    @SubscribeMessage('new-connection')
    newConnection(client: Socket, data: unknown): void {
        // Pass the event name as the channel; previously the socket object
        // itself was passed where a channel name is expected.
        this.handleRedisMessage('new-connection', data);
    }

    /**
     * Prints a received message together with the channel it came from.
     *
     * @param channel Name of the channel/event the message arrived on.
     * @param message Message payload.
     */
    handleRedisMessage(channel: string, message: unknown): void {
        console.log('redis-message', { channel, message });
    }

    /** Logs 'Init'. */
    public afterInit() {
        return this.logger.log('Init');
    }

    /** Logs the id of a socket that has just disconnected. */
    public handleDisconnect(client: Socket): void {
        return this.logger.log(`Client disconnected: ${client.id}`);
    }

    /** Prints a fixed notice whenever a socket connects. */
    public handleConnection(client: Socket): void {
        console.log("New users");
    }
}

It's not perfect, and I will probably improve certain parts (also, don't hesitate to share any advice you have!).