Emulando una cola SQS en una aplicación NestJS

La complejidad y requisitos de las aplicaciones de hoy en día nos imponen nuevos retos tratando de testearlas en nuestros entornos locales. Los mayores desafíos nos los encontramos cuando queremos probar nuestro software de manera end to end, sobre todo si la infraestructura se compone de servicios en la nube. Sin embargo, la mayor parte de las veces docker entra a nuestro rescate, ofreciéndonos imágenes construidas que nos pueden ayudar a emular el comportamiento de estos servicios.

Específicamente, para trabajar con AWS podemos encontrar soluciones completas como Localstack ( https://localstack.cloud/). Este servicio emula gran parte de los servicios de AWS exponiendo endpoints contra los que podemos emular nuestra infraestructura, la automatización de esta con Terraform o poder realizar pruebas de nuestro software end to end en nuestro proceso de integración continua sin depender del proveedor cloud.

En este post, hablaremos de una solución más ligera para emular el comportamiento de una cola AWS SQS totalmente compatible con nuestro consumer construido en Typescript y dentro del contexto de una aplicación NestJS.

¿Qué es una cola SQS?

SQS es la solución que nos ofrece AWS, pero tenemos servicios de idéntico comportamiento en Azure con Azure Queue Storage o en Google Cloud Platform con Cloud PubSub.

Emulando la cola SQS

ElasticMQ sigue la semántica de AWS SQS, y la gran ventaja de esto es que podemos utilizar el SDK de AWS contra este servicio. Los mensajes en SQS se reciben sondeando la cola. Cuando se recibe un mensaje, se bloquea durante un período de tiempo específico (Si el mensaje no se elimina durante ese tiempo, volverá a estar disponible para su entrega. visibilityTimeout).

El enfoque en este tipo de colas es asegurarse de que los mensajes se entreguen a los destinatarios. Sin embargo, puede suceder que un mensaje se entregue dos veces (por ejemplo, sí un Es por eso que debemos cumplir con el principio de consumer muere después de recibir un mensaje y procesarlo, pero antes de eliminarlo). idempotencia, es decir, que no se vean afectadas negativamente el consumer procesa el mismo mensaje más de una vez.

A continuación, se muestra cómo quedaría nuestro docker-compose.yml donde se incluye el servicio SQS utilizando una imagen dockerizada (https://github.com/roribio/alpine-sqs) conectado con el servicio node que levanta nuestra api.

version: "3.4"
services:
api:
build:
context: .
dockerfile: Dockerfile.dev
environment:
AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID}
AWS_SECRET_ACCESS_KEY:${AWS_SECRET_ACCESS_KEY}
AWS_SQS_REGION: eu-west-2
AWS_SQS_ENDPOINT: "http://sqs:9324"
AWS_SQS_QUEUE_NAME: service-queue
ports:
- "8080:8080"
depends_on:
- sqs
sqs:
image: roribio16/alpine-sqs
ports:
- "9324:9324"
- "9325:9325"
volumes:
- ./infra/sqs/elasticmq.conf:/opt/config/elasticmq.conf

Como se puede ver, en la imagen alpine-sqs expone dos puertos 9324 y 9325 . El primero es usado como endpoint para consumirlo http://sqs:9324 y el segundo expone una interfaz web para depurar los mensajes que llegan a la cola.

Además, incluiremos un fichero de configuración mapeando en nuestro volumen el fichero /opt/config/elasticmq.conf donde configuraremos nuestras colas y su comportamiento:

include classpath("application.conf")                                            

node-address {
protocol = http
host = "*"
port = 9324
context-path = ""
}

rest-sqs {
enabled = true
bind-port = 9324
bind-hostname = "0.0.0.0"
// Possible values: relaxed, strict
sqs-limits = strict
}

queues {
default {
defaultVisibilityTimeout = 10 seconds
delay = 5 seconds
receiveMessageWait = 0 seconds
}
service-queue {
defaultVisibilityTimeout = 10 seconds
delay = 5 seconds
receiveMessageWait = 0 seconds
}
}

En este fichero definimos las colas y sus características como el tiempo de visibilidad, el delay o el tiempo de espera. Una vez levantado podemos ver a través de la interfaz 9325 los mensajes que llegan a la cola para poder depurar correctamente el formato de los mensajes y la cantidad.

Para mandar un mensaje a la cola podemos utilizar el CLI de AWS, indicando el endpoint, la cola y el cuerpo del mensaje:

aws --endpoint-url http://localhost:9324 sqs send-message --queue-url http://localhost:9324/queue/service-queue --message-body ""

Implementando en consumer en NestJS

Existen algunas soluciones no oficiales de nest como https://github.com/yannkaiser/nestjs-aws-sqs o https://github.com/aiandev/sqs-consumer-nestjs que podrían ayudarnos. Sin embargo, se decidió implementar una solución totalmente independiente. Este es el código de ejemplo de una clase abstracta con la funcionalidad necesaria para leer mensajes de una cola SQS:

import { Logger, OnApplicationBootstrap, OnApplicationShutdown } from '@nestjs/common'
import { AWSError, SQS } from 'aws-sdk'
import { PromiseResult } from 'aws-sdk/lib/request'
export default abstract class SQSQueue implements OnApplicationBootstrap, OnApplicationShutdown {
protected readonly logger = new Logger(SQSQueue.name, true)
protected queueUrl: string
protected polling = false
protected timeoutRef: NodeJS.Timeout = null
protected service: SQS
constructor(
protected readonly queueName: string,
protected readonly region: string,
protected readonly timeout = 100
) {
this.service = new SQS({ endpoint: process.env.AWS_SQS_ENDPOINT, region })
}
async onApplicationBootstrap(): Promise<void> {
this.logger.log(`Initiating queue consumer with name ${this.queueName}`)
this.polling = true
this.queueUrl = (
await this.service
.getQueueUrl({
QueueName: this.queueName,
})
.promise()
).QueueUrl
this.logger.log(`Reading messages from ${this.queueUrl}`)this.timeoutRef = setTimeout(async () => this.poll(), this.timeout)}onApplicationShutdown(): void {
this.polling = false
clearTimeout(this.timeoutRef)
}
public async poll(): Promise<void> {
const result: SQS.ReceiveMessageResult = await this.receiveMessage()
try {
await this.handleSQSResponse(result)
} catch (err) {
Logger.error(err)
}
if (this.polling) this.timeoutRef = setTimeout(() =>; this.poll(), this.timeout)
}
private async handleSQSResponse(result: SQS.ReceiveMessageResult): Promise<void>; {
if (!result.Messages || result.Messages.length === 0) return
await Promise.all(result.Messages.map(this.handleMessage.bind(this)))
}
private async receiveMessage(): Promise<PromiseResult<SQS.ReceiveMessageResult, AWSError>>; {
return this.service
.receiveMessage({
QueueUrl: this.queueUrl,
})
.promise()
}
protected async handleMessage(message: SQS.Message): Promise<void>; {
await this.handle(message)
await this.service
.deleteMessage({
QueueUrl: this.queueUrl,
ReceiptHandle: message.ReceiptHandle,
})
.promise()
}
protected abstract async handle(message: SQS.Message): Promis<void>;}

OnApplicationBootstrap y OnApplicationShutdown son dos interfaces que nos permiten engancharnos a los evento lanzados cuando la aplicación se termina de levantar y cuando se apaga el servicio. Estas interfaces las utilizamos para arrancar el polling cuando nuestra aplicación esté levantada. Estas interfaces nos obligan a implementar los métodos async onApplicationBootstrap():Promise<void>y OnApplicationShutdown():void donde arrancará y finalizará la lectura de nuestra cola. Tras recibir los mensajes almacenados en la cola se ejecutará el método handleMessage.

El método handleMessage se encarga de recibir el mensaje de la cola, llamar al manejador abstracto que se encarga de manejar el tipo de mensaje, ejecutar la lógica del manejador y eliminar el mensaje para no volver a ser recibido..

Para implementar nuestra lógica, crearemos una nueva clase que extienda de SQSQueue e implementaremos la lógica adecuada implementando el método handle . Un ejemplo de implementación concreta de un consumer puede ser el siguiente:

import { Inject, Logger } from '@nestjs/common'
import { SQS } from 'aws-sdk'
import SQSQueue from './sqs/sqs.queue'
export default class CallbackSQS extends SQSQueue {async handle(message: SQS.Message): Promise<void> {
this.logger.log(`Handling message ${message.MessageId} with body : ${message.Body}`)
// your code here
}
}

Dentro del contexto de una aplicación NestJS, esta clase CallbackSQS se establecería como un provider importado por el módulo correspondiente.

Conclusión

Recursos

https://medium.com/better-programming/how-to-emulate-aws-sqs-for-development-in-dockerized-ruby-on-rails-app-c0c16aadb84c

Originally published at https://adrianalonso.es on January 19, 2021.

Full Stack Web Developer — adrianalonso.es

Full Stack Web Developer — adrianalonso.es