Emulando una cola SQS en una aplicación NestJS

¿Qué es una cola SQS?

Emulando la cola SQS

# Local development stack: the NestJS API plus an SQS-compatible emulator.
version: "3.4"
services:
  # The NestJS application that consumes the queue.
  api:
    build:
      context: .
      dockerfile: Dockerfile.dev
    environment:
      # Credentials are taken from the host environment / .env file.
      AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID}
      AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY}
      AWS_SQS_REGION: eu-west-2
      # Points the AWS SDK at the `sqs` service below instead of real AWS.
      AWS_SQS_ENDPOINT: "http://sqs:9324"
      # Must match a queue declared in infra/sqs/elasticmq.conf.
      AWS_SQS_QUEUE_NAME: service-queue
    ports:
      - "8080:8080"
    depends_on:
      - sqs
  # SQS emulator container.
  sqs:
    image: roribio16/alpine-sqs
    ports:
      - "9324:9324"
      - "9325:9325"
    volumes:
      # Override the image's queue configuration with the project's own file.
      - ./infra/sqs/elasticmq.conf:/opt/config/elasticmq.conf
# ElasticMQ configuration mounted into the SQS emulator container.
include classpath("application.conf")

# Node address settings.
node-address {
  protocol = http
  host = "*"
  port = 9324
  context-path = ""
}

# SQS REST interface settings; the port matches the one published by the container.
rest-sqs {
  enabled = true
  bind-port = 9324
  bind-hostname = "0.0.0.0"
  // Possible values: relaxed, strict
  sqs-limits = strict
}

# Queues declared in this file: `default` and `service-queue`,
# both configured with the same timings.
queues {
  default {
    defaultVisibilityTimeout = 10 seconds
    delay = 5 seconds
    receiveMessageWait = 0 seconds
  }

  service-queue {
    defaultVisibilityTimeout = 10 seconds
    delay = 5 seconds
    receiveMessageWait = 0 seconds
  }
}
# Send a test message to the emulated queue through the AWS CLI.
# SQS requires a message body of at least one character, so a sample body is used.
aws --endpoint-url http://localhost:9324 sqs send-message \
  --queue-url http://localhost:9324/queue/service-queue \
  --message-body "Hello from the local SQS queue"

Implementando el consumer en NestJS

import { Logger, OnApplicationBootstrap, OnApplicationShutdown } from '@nestjs/common'
import { AWSError, SQS } from 'aws-sdk'
import { PromiseResult } from 'aws-sdk/lib/request'
/**
 * Base class for a polling consumer of a single SQS queue.
 *
 * On application bootstrap it resolves the queue URL from the queue name and
 * schedules the first poll; on shutdown it stops the loop. Every received
 * message is passed to {@link SQSQueue.handle} and deleted from the queue once
 * `handle` resolves. Subclasses only need to implement `handle`.
 */
export default abstract class SQSQueue implements OnApplicationBootstrap, OnApplicationShutdown {
  protected readonly logger = new Logger(SQSQueue.name, true)
  /** URL of the queue; empty until resolved in `onApplicationBootstrap`. */
  protected queueUrl = ''
  /** Whether `poll` should schedule another run after the current one. */
  protected polling = false
  /** Handle of the pending poll timer, or `null` when none is scheduled. */
  protected timeoutRef: ReturnType<typeof setTimeout> | null = null
  protected service: SQS

  /**
   * @param queueName Name of the queue whose URL is looked up at bootstrap.
   * @param region AWS region passed to the SQS client.
   * @param timeout Delay in milliseconds between two consecutive polls.
   */
  constructor(
    protected readonly queueName: string,
    protected readonly region: string,
    protected readonly timeout = 100
  ) {
    this.service = new SQS({ endpoint: process.env.AWS_SQS_ENDPOINT, region })
  }

  /**
   * Resolves the queue URL and schedules the first poll.
   *
   * @throws Error when SQS does not return a URL for `queueName`.
   */
  async onApplicationBootstrap(): Promise<void> {
    this.logger.log(`Initiating queue consumer with name ${this.queueName}`)
    const { QueueUrl } = await this.service
      .getQueueUrl({
        QueueName: this.queueName,
      })
      .promise()
    if (!QueueUrl) {
      throw new Error(`Could not resolve the URL of queue ${this.queueName}`)
    }
    this.queueUrl = QueueUrl
    this.polling = true
    this.logger.log(`Reading messages from ${this.queueUrl}`)
    this.scheduleNextPoll()
  }

  /** Stops the polling loop and cancels the pending timer, if any. */
  onApplicationShutdown(): void {
    this.polling = false
    if (this.timeoutRef !== null) {
      clearTimeout(this.timeoutRef)
      this.timeoutRef = null
    }
  }

  /**
   * Runs one receive/handle cycle and, while polling is enabled, schedules
   * the next one. Errors are logged and never propagated to the caller.
   */
  public async poll(): Promise<void> {
    try {
      // The receive call is inside the try so that a transient SQS error is
      // logged instead of ending the polling loop with an unhandled rejection.
      const result = await this.receiveMessage()
      await this.handleSQSResponse(result)
    } catch (err: unknown) {
      if (err instanceof Error) {
        this.logger.error(err.message, err.stack)
      } else {
        this.logger.error(err)
      }
    }
    if (this.polling) this.scheduleNextPoll()
  }

  /** Arms the timer that triggers the next `poll` after `timeout` milliseconds. */
  private scheduleNextPoll(): void {
    this.timeoutRef = setTimeout(() => void this.poll(), this.timeout)
  }

  /** Dispatches every message of a receive result to `handleMessage` concurrently. */
  private async handleSQSResponse(result: SQS.ReceiveMessageResult): Promise<void> {
    if (!result.Messages || result.Messages.length === 0) return
    await Promise.all(result.Messages.map((message) => this.handleMessage(message)))
  }

  /** Performs a single `receiveMessage` request against the resolved queue URL. */
  private async receiveMessage(): Promise<PromiseResult<SQS.ReceiveMessageResult, AWSError>> {
    return this.service
      .receiveMessage({
        QueueUrl: this.queueUrl,
      })
      .promise()
  }

  /**
   * Processes one message and deletes it from the queue afterwards.
   * If `handle` rejects, the message is not deleted.
   */
  protected async handleMessage(message: SQS.Message): Promise<void> {
    await this.handle(message)
    if (!message.ReceiptHandle) {
      // Without a receipt handle the delete request cannot be built.
      this.logger.warn(`Message ${message.MessageId} has no receipt handle; skipping delete`)
      return
    }
    await this.service
      .deleteMessage({
        QueueUrl: this.queueUrl,
        ReceiptHandle: message.ReceiptHandle,
      })
      .promise()
  }

  /**
   * Business logic for a single message, supplied by the subclass.
   *
   * @param message The message received from the queue.
   */
  protected abstract handle(message: SQS.Message): Promise<void>
}
import { Inject, Logger } from '@nestjs/common'
import { SQS } from 'aws-sdk'
import SQSQueue from './sqs/sqs.queue'
/**
 * Example queue consumer: logs the id and body of every message it is given.
 */
export default class CallbackSQS extends SQSQueue {
  /**
   * Handles a single message received from the queue.
   *
   * @param message The message to process.
   */
  async handle(message: SQS.Message): Promise<void> {
    const { MessageId: messageId, Body: body } = message
    this.logger.log(`Handling message ${messageId} with body : ${body}`)
    // your code here
  }
}

Conclusión

Recursos

--

--

Full Stack Web Developer — adrianalonso.es

Love podcasts or audiobooks? Learn on the go with our new app.

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store