adding retry system plus jitter for randomisation and resilience to lost, and other keycloak config
This commit is contained in:
parent
120990f89f
commit
859580db45
712
package-lock.json
generated
712
package-lock.json
generated
File diff suppressed because it is too large
Load Diff
@ -29,6 +29,8 @@
|
|||||||
"amqplib": "^0.10.9",
|
"amqplib": "^0.10.9",
|
||||||
"class-transformer": "^0.5.1",
|
"class-transformer": "^0.5.1",
|
||||||
"class-validator": "^0.14.2",
|
"class-validator": "^0.14.2",
|
||||||
|
"keycloak-connect": "^26.1.1",
|
||||||
|
"nest-keycloak-connect": "^1.10.1",
|
||||||
"reflect-metadata": "^0.2.2",
|
"reflect-metadata": "^0.2.2",
|
||||||
"rxjs": "^7.8.1",
|
"rxjs": "^7.8.1",
|
||||||
"swagger-ui-express": "^5.0.1"
|
"swagger-ui-express": "^5.0.1"
|
||||||
|
|||||||
@ -4,14 +4,55 @@ import { AppService } from './app.service';
|
|||||||
import { WebhookController } from './controllers/webhook.controller';
|
import { WebhookController } from './controllers/webhook.controller';
|
||||||
import { WebhookService } from './services/webhook.service';
|
import { WebhookService } from './services/webhook.service';
|
||||||
import { RabbitMQService } from './services/rabbit.service';
|
import { RabbitMQService } from './services/rabbit.service';
|
||||||
import { ConfigModule } from '@nestjs/config';
|
import { ConfigModule, ConfigService } from '@nestjs/config';
|
||||||
import rabbitmqConfig from './config/rabbitmq.config';
|
import appConfig from './config/app.config';
|
||||||
|
import {
|
||||||
|
AuthGuard,
|
||||||
|
KeycloakConnectModule,
|
||||||
|
PolicyEnforcementMode,
|
||||||
|
ResourceGuard,
|
||||||
|
RoleGuard,
|
||||||
|
TokenValidation,
|
||||||
|
} from 'nest-keycloak-connect';
|
||||||
|
import { APP_GUARD } from '@nestjs/core';
|
||||||
|
|
||||||
@Module({
|
@Module({
|
||||||
imports: [
|
imports: [
|
||||||
ConfigModule.forRoot({ isGlobal: true,load: [rabbitmqConfig] }),
|
KeycloakConnectModule.registerAsync({
|
||||||
|
imports: [ConfigModule],
|
||||||
|
useFactory: (configService: ConfigService) => {
|
||||||
|
const keycloakConfig = configService.get('appConfig.keycloak');
|
||||||
|
return {
|
||||||
|
authServerUrl: keycloakConfig.authServerUrl,
|
||||||
|
realm: keycloakConfig.realm,
|
||||||
|
clientId: keycloakConfig.clientId,
|
||||||
|
secret: keycloakConfig.clientSecret,
|
||||||
|
policyEnforcement: PolicyEnforcementMode.PERMISSIVE,
|
||||||
|
tokenValidation: TokenValidation.ONLINE,
|
||||||
|
};
|
||||||
|
},
|
||||||
|
inject: [ConfigService],
|
||||||
|
}),
|
||||||
|
ConfigModule.forRoot({ isGlobal: true, load: [appConfig] }),
|
||||||
],
|
],
|
||||||
|
|
||||||
controllers: [AppController, WebhookController],
|
controllers: [AppController, WebhookController],
|
||||||
providers: [AppService, WebhookService, RabbitMQService],
|
providers: [
|
||||||
|
AppService,
|
||||||
|
WebhookService,
|
||||||
|
RabbitMQService,
|
||||||
|
{
|
||||||
|
provide: APP_GUARD,
|
||||||
|
useClass: AuthGuard,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
provide: APP_GUARD,
|
||||||
|
useClass: ResourceGuard,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
provide: APP_GUARD,
|
||||||
|
useClass: RoleGuard,
|
||||||
|
},
|
||||||
|
],
|
||||||
})
|
})
|
||||||
export class AppModule {}
|
export class AppModule {}
|
||||||
|
|||||||
20
src/config/app.config.ts
Normal file
20
src/config/app.config.ts
Normal file
@ -0,0 +1,20 @@
|
|||||||
|
import { registerAs } from '@nestjs/config';
|
||||||
|
|
||||||
|
export default registerAs('appConfig', () => ({
|
||||||
|
user: process.env.RABBITMQ_USER ,
|
||||||
|
pass: process.env.RABBITMQ_PASS ,
|
||||||
|
host: process.env.RABBITMQ_HOST ,
|
||||||
|
port: process.env.RABBITMQ_PORT ,
|
||||||
|
apiUrl: process.env.RABBITMQ_API_URL || 'https://rabbitmq.dcb.pixpay.sn/api',
|
||||||
|
queues: {
|
||||||
|
smsmo: process.env.RABBITMQ_QUEUE_WEBHOOK || 'smsmo_queue',
|
||||||
|
subscription: process.env.RABBITMQ_QUEUE_PAYMENT || 'subscription_queue',
|
||||||
|
he: process.env.RABBITMQ_QUEUE_NOTIFICATION || 'he_queue',
|
||||||
|
},
|
||||||
|
keycloak: {
|
||||||
|
authServerUrl: process.env.KEYCLOAK_AUTH_SERVER_URL ,
|
||||||
|
clientId: process.env.KEYCLOAK_CLIENT_ID ,
|
||||||
|
clientSecret: process.env.KEYCLOAK_CLIENT_SECRET ,
|
||||||
|
realm: process.env.KEYCLOAK_REALM ,
|
||||||
|
},
|
||||||
|
}));
|
||||||
@ -1,14 +0,0 @@
|
|||||||
import { registerAs } from '@nestjs/config';
|
|
||||||
|
|
||||||
export default registerAs('rabbitmq', () => ({
|
|
||||||
user: process.env.RABBITMQ_USER,
|
|
||||||
pass: process.env.RABBITMQ_PASS,
|
|
||||||
host: process.env.RABBITMQ_HOST,
|
|
||||||
port: process.env.RABBITMQ_PORT,
|
|
||||||
apiUrl: process.env.RABBITMQ_API_URL || 'https://rabbitmq.dcb.pixpay.sn/api',
|
|
||||||
queues: {
|
|
||||||
smsmo: process.env.RABBITMQ_QUEUE_WEBHOOK || 'smsmo_queue',
|
|
||||||
subscription: process.env.RABBITMQ_QUEUE_PAYMENT || 'subscription_queue',
|
|
||||||
he: process.env.RABBITMQ_QUEUE_NOTIFICATION || 'he_queue',
|
|
||||||
},
|
|
||||||
}));
|
|
||||||
@ -11,6 +11,7 @@ import {
|
|||||||
} from '@nestjs/common';
|
} from '@nestjs/common';
|
||||||
|
|
||||||
import { ApiTags } from '@nestjs/swagger';
|
import { ApiTags } from '@nestjs/swagger';
|
||||||
|
import { Resource, Roles } from 'nest-keycloak-connect';
|
||||||
import { InboundSMSMessageNotificationWrapperDto } from 'src/dtos/sms.mo.dto';
|
import { InboundSMSMessageNotificationWrapperDto } from 'src/dtos/sms.mo.dto';
|
||||||
import { SubscriptionDto } from 'src/dtos/subscription.dto';
|
import { SubscriptionDto } from 'src/dtos/subscription.dto';
|
||||||
import { WebhookService } from 'src/services/webhook.service';
|
import { WebhookService } from 'src/services/webhook.service';
|
||||||
@ -22,6 +23,7 @@ export class WebhookController {
|
|||||||
|
|
||||||
@Post('sms-mo/:operator/:country')
|
@Post('sms-mo/:operator/:country')
|
||||||
@HttpCode(HttpStatus.CREATED)
|
@HttpCode(HttpStatus.CREATED)
|
||||||
|
@Roles({ roles: ['admin_webhook'] })
|
||||||
async smsMoNotification(
|
async smsMoNotification(
|
||||||
@Param('country') country: string,
|
@Param('country') country: string,
|
||||||
@Param('operator') operator: string,
|
@Param('operator') operator: string,
|
||||||
@ -38,6 +40,7 @@ export class WebhookController {
|
|||||||
|
|
||||||
@Post('subscription/:operator/:country')
|
@Post('subscription/:operator/:country')
|
||||||
@HttpCode(HttpStatus.CREATED)
|
@HttpCode(HttpStatus.CREATED)
|
||||||
|
@Roles({ roles: ['admin_webhook'] })
|
||||||
async manageSubscription(
|
async manageSubscription(
|
||||||
@Param('country') country: string,
|
@Param('country') country: string,
|
||||||
@Param('operator') operator: string,
|
@Param('operator') operator: string,
|
||||||
@ -53,6 +56,7 @@ export class WebhookController {
|
|||||||
|
|
||||||
@Get('he/:operator/:country')
|
@Get('he/:operator/:country')
|
||||||
@HttpCode(HttpStatus.OK)
|
@HttpCode(HttpStatus.OK)
|
||||||
|
@Roles({ roles: ['admin_webhook'] })
|
||||||
async heNotification(
|
async heNotification(
|
||||||
@Param('country') country: string,
|
@Param('country') country: string,
|
||||||
@Param('operator') operator: string,
|
@Param('operator') operator: string,
|
||||||
|
|||||||
@ -6,8 +6,13 @@ import { connect, Connection, Channel } from 'amqplib';
|
|||||||
export class RabbitMQService implements OnModuleInit {
|
export class RabbitMQService implements OnModuleInit {
|
||||||
private connection!: Connection;
|
private connection!: Connection;
|
||||||
private channel!: Channel;
|
private channel!: Channel;
|
||||||
|
private maxRetry: number;
|
||||||
|
private delay: number;
|
||||||
|
|
||||||
constructor(private configService: ConfigService) {}
|
constructor(private configService: ConfigService) {
|
||||||
|
this.maxRetry = 5;
|
||||||
|
this.delay = 500;
|
||||||
|
}
|
||||||
|
|
||||||
async onModuleInit() {
|
async onModuleInit() {
|
||||||
await this.connectWithRetry();
|
await this.connectWithRetry();
|
||||||
@ -39,10 +44,37 @@ export class RabbitMQService implements OnModuleInit {
|
|||||||
throw new Error('Could not connect to RabbitMQ after multiple attempts');
|
throw new Error('Could not connect to RabbitMQ after multiple attempts');
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//this method send the message to rabbitMQ queue
|
||||||
async sendToQueue(queue: string, message: any) {
|
async sendToQueue(queue: string, message: any) {
|
||||||
if (!this.channel) throw new Error('RabbitMQ channel not initialized');
|
if (!this.channel) throw new Error('RabbitMQ channel not initialized');
|
||||||
|
|
||||||
|
//check if the queue exist and create it if not
|
||||||
await this.channel.assertQueue(queue, { durable: true });
|
await this.channel.assertQueue(queue, { durable: true });
|
||||||
this.channel.sendToQueue(queue, Buffer.from(JSON.stringify(message)));
|
|
||||||
console.log(`Sent message to queue "${queue}"`);
|
for (let attempt = 1; attempt <= this.maxRetry; attempt++) {
|
||||||
|
try {
|
||||||
|
const sent = await this.channel.sendToQueue(
|
||||||
|
queue,
|
||||||
|
Buffer.from(JSON.stringify(message)),
|
||||||
|
);
|
||||||
|
if (!sent) {
|
||||||
|
throw new Error('Message not sent');
|
||||||
|
}
|
||||||
|
console.log(`Message sent to queue ${queue} attempt ${attempt}`);
|
||||||
|
return;
|
||||||
|
} catch (error) {
|
||||||
|
if (attempt === this.maxRetry) {
|
||||||
|
console.log('All attempts failed');
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
|
||||||
|
const maxDelay = Math.pow(2, attempt) * this.delay;
|
||||||
|
const delay = maxDelay / 2 + Math.random() * (maxDelay / 2);
|
||||||
|
console.warn(
|
||||||
|
`Attempt ${attempt} failed: ${error.message}. Retrying in ${Math.round(delay)}ms`,
|
||||||
|
);
|
||||||
|
await new Promise((res) => setTimeout(res, delay));
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user