49 lines
1.7 KiB
TypeScript
49 lines
1.7 KiB
TypeScript
import { Injectable, OnModuleInit } from '@nestjs/common';
|
|
import { ConfigService } from '@nestjs/config';
|
|
import { connect, Connection, Channel } from 'amqplib';
|
|
|
|
@Injectable()
|
|
export class RabbitMQService implements OnModuleInit {
|
|
private connection!: Connection;
|
|
private channel!: Channel;
|
|
|
|
constructor(private configService: ConfigService) {}
|
|
|
|
async onModuleInit() {
|
|
await this.connectWithRetry();
|
|
}
|
|
|
|
async connect(): Promise<void> {
|
|
const user = this.configService.get<string>('rabbitmq.user');
|
|
const pass = this.configService.get<string>('rabbitmq.pass');
|
|
const host = this.configService.get<string>('rabbitmq.host');
|
|
const port = this.configService.get<string>('rabbitmq.port');
|
|
|
|
this.connection = await connect(`amqp://${user}:${pass}@${host}:${port}`);
|
|
this.channel = await this.connection.createChannel();
|
|
console.log('Connected to RabbitMQ');
|
|
}
|
|
|
|
async connectWithRetry(retries = 5, delayMs = 3000): Promise<void> {
|
|
for (let i = 0; i < retries; i++) {
|
|
try {
|
|
await this.connect();
|
|
return;
|
|
} catch (err) {
|
|
console.error(
|
|
`RabbitMQ connection failed, retrying in ${delayMs}ms... (${i + 1}/${retries})`,
|
|
);
|
|
await new Promise((res) => setTimeout(res, delayMs));
|
|
}
|
|
}
|
|
throw new Error('Could not connect to RabbitMQ after multiple attempts');
|
|
}
|
|
|
|
async sendToQueue(queue: string, message: any) {
|
|
if (!this.channel) throw new Error('RabbitMQ channel not initialized');
|
|
await this.channel.assertQueue(queue, { durable: true });
|
|
this.channel.sendToQueue(queue, Buffer.from(JSON.stringify(message)));
|
|
console.log(`Sent message to queue "${queue}"`);
|
|
}
|
|
}
|