Integrating AWS SNS to NestJS Microservices

68 Views Asked by At

I'm currently working on integrating AWS SNS into my NestJS application using AWS Lambda. To achieve this, I'm following those instructions to add a custom transport strategy.

let cachedSNSServer;
async function bootstrapSNSServer(event: SNSEvent, context: Context): Promise<INestMicroservice> {
  if (!cachedSNSServer) {
    cachedSNSServer = await NestFactory.createMicroservice<MicroserviceOptions>(
      AppModule,
      { strategy: new AWSSNSServer(event, context) }
    );
  }
  return cachedSNSServer;
}

// SNS Handler
export const handler: Handler = async (event: SNSEvent, context: Context) => {
  cachedSNSServer = await bootstrapSNSServer(event, context);
  await cachedSNSServer.listen();
};

The main issue I'm encountering is related to the caching mechanism. Specifically, the event object, which contains the initial SNS message, is being cached inside the AWSSNSServer instance.

This means that while the first message is processed correctly, subsequent messages are being ignored because the event object is not updated with new messages.

Here is the code of AWSSNSServer :

import { Logger } from '@nestjs/common'
import { CustomTransportStrategy, MessageHandler, Server } from '@nestjs/microservices'
import { Context, SNSEvent, SNSEventRecord } from 'aws-lambda'

export class AWSSNSServer extends Server implements CustomTransportStrategy {
  event: SNSEvent
  context: Context

  constructor(event: SNSEvent, context: Context) {
    super()

    this.event = event
    this.context = context
  }

  async listen(callback: () => void) {
    for await (const incomingRecord of this.getRecords()) {
      const messageAttributes = this.getMessageAttributes(incomingRecord)
      const msg = this.getMessage(incomingRecord)

      const handler = this.findMatchingHandler(messageAttributes)
      if (handler) {
        await handler(msg, this.context)
      } else {
        Logger.error(`No handler found.`)
      }
    }
    callback()
  }

  private findMatchingHandler(messageAttributes: Record<string, string>): MessageHandler<any, any, any> | undefined {
    for (const messageHandlerKeyInString of this.messageHandlers.keys()) {
      let eventPattern: Record<string, string>
      try {
        eventPattern = JSON.parse(messageHandlerKeyInString)
      } catch (_e) {
        continue
      }

      if (this.matchesEventPattern(eventPattern, messageAttributes)) {
        return this.messageHandlers.get(messageHandlerKeyInString)
      }
    }
    return undefined
  }

  private matchesEventPattern(
    eventPattern: Record<string, string>,
    messageAttributes: Record<string, string>
  ): boolean {
    for (const [key, value] of Object.entries(eventPattern)) {
      if (messageAttributes[key] !== value) {
        return false
      }
    }
    return true
  }

  close() {
    return
  }

  private getMessageAttributes(record: SNSEventRecord): Record<string, string> {
    const messageAttributeMap = record.Sns.MessageAttributes

    const returnObject: Record<string, string> = {}

    Object.keys(messageAttributeMap).forEach((key) => {
      returnObject[key] = messageAttributeMap[key].Value
    })

    return returnObject
  }

  private getMessage(record: SNSEventRecord): string {
    return record.Sns.Message
  }

  private getRecords(): SNSEventRecord[] {
    return this.event.Records
  }
}
0

There are 0 best solutions below