81 lines
2.6 KiB
TypeScript
81 lines
2.6 KiB
TypeScript
import { Injectable, OnModuleInit } from '@nestjs/common';
|
|
import { ConfigService } from '@nestjs/config';
|
|
import { connect, Connection, Channel } from 'amqplib';
|
|
|
|
@Injectable()
|
|
export class RabbitMQService implements OnModuleInit {
|
|
private connection!: Connection;
|
|
private channel!: Channel;
|
|
private maxRetry: number;
|
|
private delay: number;
|
|
|
|
constructor(private configService: ConfigService) {
|
|
this.maxRetry = 5;
|
|
this.delay = 500;
|
|
}
|
|
|
|
async onModuleInit() {
|
|
await this.connectWithRetry();
|
|
}
|
|
|
|
async connect(): Promise<void> {
|
|
const user = this.configService.get<string>('appConfig.user');
|
|
const pass = this.configService.get<string>('appConfig.pass');
|
|
const host = this.configService.get<string>('appConfig.host');
|
|
const port = this.configService.get<string>('appConfig.port');
|
|
|
|
this.connection = await connect(`amqp://${user}:${pass}@${host}:${port}`);
|
|
this.channel = await this.connection.createChannel();
|
|
console.log('Connected to RabbitMQ');
|
|
}
|
|
|
|
async connectWithRetry(retries = 5, delayMs = 3000): Promise<void> {
|
|
for (let i = 0; i < retries; i++) {
|
|
try {
|
|
await this.connect();
|
|
return;
|
|
} catch (err) {
|
|
console.error(
|
|
`RabbitMQ connection failed, retrying in ${delayMs}ms... (${i + 1}/${retries})`,
|
|
);
|
|
await new Promise((res) => setTimeout(res, delayMs));
|
|
}
|
|
}
|
|
throw new Error('Could not connect to RabbitMQ after multiple attempts');
|
|
}
|
|
|
|
//this method send the message to rabbitMQ queue
|
|
async sendToQueue(queue: string, message: any) {
|
|
if (!this.channel) throw new Error('RabbitMQ channel not initialized');
|
|
|
|
// //check if the queue exist and create it if not
|
|
// await this.channel.assertQueue(queue, { durable: true });
|
|
|
|
for (let attempt = 1; attempt <= this.maxRetry; attempt++) {
|
|
try {
|
|
const sent = await this.channel.sendToQueue(
|
|
queue,
|
|
Buffer.from(JSON.stringify(message)),
|
|
);
|
|
if (!sent) {
|
|
throw new Error('Message not sent');
|
|
}
|
|
console.log(`Message sent to queue ${queue} attempt ${attempt}`);
|
|
return;
|
|
} catch (error) {
|
|
if (attempt === this.maxRetry) {
|
|
console.log('All attempts failed');
|
|
throw error;
|
|
}
|
|
|
|
const maxDelay = Math.pow(2, attempt) * this.delay;
|
|
const delay = maxDelay / 2 + Math.random() * (maxDelay / 2);
|
|
console.warn(
|
|
`Attempt ${attempt} failed: ${error.message}. Retrying in ${Math.round(delay)}ms`,
|
|
);
|
|
await new Promise((res) => setTimeout(res, delay));
|
|
}
|
|
}
|
|
}
|
|
}
|