Skip to content

Commit

Permalink
Fix the message publishing issue
Browse files Browse the repository at this point in the history
  • Loading branch information
rusiruavb committed Jan 5, 2022
1 parent 645b5b3 commit 3125e3b
Show file tree
Hide file tree
Showing 9 changed files with 37 additions and 57 deletions.
File renamed without changes.
6 changes: 3 additions & 3 deletions src/api/controllers/Application.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import ApplicationService from "../services";
* @returns {IApplication} - New application document
*/
export const addApplication = async (request: Request, response: Response, next: NextFunction) => {
await ApplicationService.addApplication(request.body)
await ApplicationService.addApplication(request, request.body)
.then(async (data) => {
request.handleResponse.successRespond(response)(data);
next();
Expand Down Expand Up @@ -99,7 +99,7 @@ export const changeApplicationStatusIntoInterview = async (
) => {
const { applicationId } = request.params;
if (applicationId) {
await ApplicationService.changeApplicationStatusIntoInterview(applicationId, request.body)
await ApplicationService.changeApplicationStatusIntoInterview(request, applicationId, request.body)
.then((data) => {
request.handleResponse.successRespond(response)(data);
next();
Expand All @@ -125,7 +125,7 @@ export const changeApplicationStatusIntoInterview = async (
export const changeApplicationStatusIntoSelected = async (request: Request, response: Response, next: NextFunction) => {
const { applicationId } = request.params;
if (applicationId) {
await ApplicationService.changeApplicationStatusIntoSelected(applicationId)
await ApplicationService.changeApplicationStatusIntoSelected(request, applicationId)
.then((data) => {
request.handleResponse.successRespond(response)(data);
next();
Expand Down
14 changes: 7 additions & 7 deletions src/api/services/Application.service.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
import { DocumentDefinition } from "mongoose";
import { IApplication, IInterview } from "../../interfaces";
import ApplicationModel from "../models/Application.model";
import { configs } from "../../config";
import { request } from "express";
import { Request } from "express";

/**
* Application Service
* @param {IApplication} application
* @returns {Promise<IApplication>}
*/
export const addApplication = async (applicationData: DocumentDefinition<IApplication>) => {
export const addApplication = async (request: Request, applicationData: DocumentDefinition<IApplication>) => {
return await ApplicationModel.create(applicationData)
.then(async (application) => {
// Send email
Expand All @@ -36,7 +35,7 @@ export const addApplication = async (applicationData: DocumentDefinition<IApplic

// Send email data to message queue
const channel = request.channel;
request.queue.publishMessage(channel, configs.queue.emailService, JSON.stringify(email));
request.queue.publishMessage(channel, JSON.stringify(email));
return application;
})
.catch((error) => {
Expand Down Expand Up @@ -98,6 +97,7 @@ export const archiveApplication = async (applicationId: string) => {
* @param applicationId @type string
*/
export const changeApplicationStatusIntoInterview = async (
request: Request,
applicationId: string,
interviewData: DocumentDefinition<IInterview>
) => {
Expand Down Expand Up @@ -126,7 +126,7 @@ export const changeApplicationStatusIntoInterview = async (

// Send email data to message queue
const channel = request.channel;
request.queue.publishMessage(channel, configs.queue.emailService, JSON.stringify(email));
request.queue.publishMessage(channel, JSON.stringify(email));
return application;
} else {
return null;
Expand All @@ -141,7 +141,7 @@ export const changeApplicationStatusIntoInterview = async (
* @function changeApplicationStatusIntoSelected to update the status into SELECTED of an application in the system
* @param applicationId @type string
*/
export const changeApplicationStatusIntoSelected = async (applicationId: string) => {
export const changeApplicationStatusIntoSelected = async (request: Request, applicationId: string) => {
return await ApplicationModel.findById(applicationId)
.then(async (application) => {
if (application) {
Expand All @@ -162,7 +162,7 @@ export const changeApplicationStatusIntoSelected = async (applicationId: string)

// Send email data to message queue
const channel = request.channel;
request.queue.publishMessage(channel, configs.queue.emailService, JSON.stringify(email));
request.queue.publishMessage(channel, JSON.stringify(email));
return application;
} else {
return null;
Expand Down
3 changes: 1 addition & 2 deletions src/api/services/Contact.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import { DocumentDefinition } from "mongoose";
import { IContact } from "../../interfaces";
import ContactModel from "../models/Contact.model";
import moment from "moment";
import { configs } from "../../config";
import { request } from "express";

/**
Expand Down Expand Up @@ -32,7 +31,7 @@ export const insertContact = async (contactData: DocumentDefinition<IContact>) =

// Send email data to message queue
const channel = request.channel;
request.queue.publishMessage(channel, configs.queue.emailService, JSON.stringify(email));
request.queue.publishMessage(channel, JSON.stringify(email));
return data;
})
.catch((error) => {
Expand Down
42 changes: 12 additions & 30 deletions src/app.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,25 @@
import dotenv from "dotenv";
dotenv.config();
import express, { Express, Request, Response, NextFunction } from "express";
import cors from "cors";
import logger from "./util/logger";
import responseHandler from "./util/response.handler";
import routes from "./api/routes";
import { configs } from "./config";
import connect from "./util/database.connection";
import amqp from "amqplib";
import messageQueue from "./util/queue.config";
import { Channel } from "amqplib";
import EmailService from "./util/email.handler";

dotenv.config();
export const app: Express = express();
const PORT: string = configs.port;
const ENVIRONMENT = configs.environment;
const MONGO_URI = configs.mongodb.uri;
let channel: Channel;

messageQueue.createChannel().then((channelData) => {
channel = channelData;
});

// Register Middleware Chain
app.use(cors());
Expand All @@ -22,37 +29,12 @@ app.use(express.urlencoded({ extended: true }));
// Inject Response Handler
app.use((req: Request, res: Response, next: NextFunction) => {
req.handleResponse = responseHandler;
req.channel = channel;
req.queue = messageQueue;
new EmailService(channel);
next();
});

// Create and Inject the message queue
app.use((req: Request, res: Response, next: NextFunction) => {
try {
// Create the channel
amqp
.connect(configs.queue.messageBrokerURL)
.then((connection) => {
connection
.createChannel()
.then((channel) => {
channel.assertExchange(configs.queue.exchangeName, "direct", { durable: false });

// Add channel as request property
req.channel = channel;
next();
})
.catch((channelError) => {
logger.error(`Channel Error: ${channelError.message}`);
});
})
.catch((connectionError: any) => {
logger.error(`Connection Error: ${connectionError.message}`);
});
} catch (error: any) {
logger.error(error.message);
}
});

// Root API Call
app.get("/", (req: Request, res: Response, next: NextFunction) => {
res.send("<h2>MS CLUB SLIIT Web API</h2>");
Expand Down
14 changes: 7 additions & 7 deletions src/util/email.handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { configs } from "../config";
import moment from "moment";
import fetch from "cross-fetch";
import { Channel } from "amqplib";
import { subscribeMessages } from "./queue.config";
import messageQueue from "./queue.config";
import sgMail from "@sendgrid/mail";

// HTML Configuration
Expand All @@ -21,14 +21,14 @@ class EmailService {

constructor(channel: Channel) {
this.channel = channel;
subscribeMessages(this.channel, this);
messageQueue.subscribeMessages(this.channel, this);
}

sendEmailWithTemplate(data: any) {
const fileName = data.email.template;
const to = data.email.to;
const subject = data.email.subject;
const emailBodyData = data.email.body;
const fileName = data.template;
const to = data.to;
const subject = data.subject;
const emailBodyData = data.body;

return new Promise((resolve, reject) => {
EmailService.getEmailTemplatePath(fileName)
Expand Down Expand Up @@ -81,7 +81,7 @@ class EmailService {
sgMail
.send(msg)
.then((responseData: any) => {
logger.info(`Email sent ${responseData}`);
logger.info(`Email sent to ${to}`);
return resolve(responseData);
})
.catch((error: any) => {
Expand Down
11 changes: 5 additions & 6 deletions src/util/queue.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,18 @@ import { configs } from "../config";
const createChannel = async () => {
const connection = await amqp.connect(configs.queue.messageBrokerURL);
const channel = await connection.createChannel();
await channel.assertExchange(configs.queue.exchangeName, "direct", { durable: false });
await channel.assertQueue(configs.queue.emailQueue, { durable: false });
return channel;
};

// Publish the messages
const publishMessage = async (channel: Channel, bindingKey: string, message: any) => {
await channel.publish(configs.queue.exchangeName, bindingKey, Buffer.from(JSON.stringify(message)));
const publishMessage = (channel: Channel, message: any) => {
channel.sendToQueue(configs.queue.emailQueue, Buffer.from(JSON.stringify(message)));
};

// Subscribe to messages
const subscribeMessages = async (channel: Channel, service: any) => {
const serviceQueue = await channel.assertQueue(configs.queue.emailQueue);
channel.bindQueue(serviceQueue.queue, configs.queue.exchangeName, configs.queue.emailService);
const serviceQueue = await channel.assertQueue(configs.queue.emailQueue, { durable: false });
channel.consume(serviceQueue.queue, (data) => {
if (data) {
const queueItem = JSON.parse(JSON.parse(data.content.toString()));
Expand All @@ -27,4 +26,4 @@ const subscribeMessages = async (channel: Channel, service: any) => {
});
};

export { createChannel, publishMessage, subscribeMessages };
export default { createChannel, publishMessage, subscribeMessages };
2 changes: 1 addition & 1 deletion tsconfig.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"compilerOptions": {
"target": "es2016",
"target": "es2017",
"module": "commonjs",
"rootDir": "./src",
"moduleResolution": "node",
Expand Down
2 changes: 1 addition & 1 deletion typings/express/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ declare global {
*
* request.queue.publishMessage(channel, bindingKey, message);
*/
publishMessage(channel: Channel, bindingKey: string, message: any): Promise<void>;
publishMessage(channel: Channel, message: any): void;

/**
* Consume the messages that are published to the message queue. The function will take 3 parameters.
Expand Down

0 comments on commit 3125e3b

Please sign in to comment.