Skip to content

Commit

Permalink
Merge pull request #10 from lvchkn/refactoring-1.6
Browse files Browse the repository at this point in the history
Refactoring - v1.6
  • Loading branch information
lvchkn authored Oct 1, 2023
2 parents 8fda5de + 771ae2e commit 2dcadc4
Show file tree
Hide file tree
Showing 21 changed files with 1,190 additions and 1,834 deletions.
18 changes: 10 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,20 @@

### How to run locally

Create .env file in the app's root directory (use .env.example as a reference).
1. Create .env file in the app's root directory (use .env.example as a reference).

Start the compose stack:
1. Start the compose stack:

```bash
docker compose up
```
```bash
docker compose up
```

Then run `npm i` and either of the following options:
1. Run the following commands:

- `npm run local` - run with ts-node
- `npm run local:watch` - run in watch mode with nodemon and ts-node.
```bash
npm i
npm run start
```

#### Run tests:

Expand Down
18 changes: 18 additions & 0 deletions db/mongo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,30 @@ export async function connectToMongoDb() {
try {
await client.connect();
console.log("Successfully connected to Mongo!");
await createIndexes();
} catch (error) {
console.error("Error occurred while connecting to Mongo!", error);
await client.close();
}
}

async function createIndexes(): Promise<void> {
const collectionName = "releases";
const db = client.db("rymdata");

const options = {
collation: {
locale: "en",
strength: 2,
},
};

await db.collection(collectionName).createIndex({ artist: 1 }, options);
await db.collection(collectionName).createIndex({ album: 1 }, options);
await db.collection(collectionName).createIndex({ genres: 1 }, options);
await db.collection(collectionName).createIndex({ year: 1 }, options);
}

export function getClient() {
return client;
}
35 changes: 19 additions & 16 deletions db/releasesReadRepo.ts
Original file line number Diff line number Diff line change
@@ -1,50 +1,53 @@
import { Filter, WithId } from "mongodb";
import { Release } from "../parser.js";
import { FilterOptions } from "../routers/releasesRouter.js";
import { getClient } from "./mongo.js";
import { Release } from "../models/release.js";
import { Query } from "../services/queryProcessor.js";

const client = getClient();
const db = client.db("rymdata");
const releasesCollectionName = "releases";

function parseFilters(filterOptions: FilterOptions): Filter<Release> {
function parseFilters(query: Query): Filter<Release> {
const { artists, albums, genres, years } = query;
const filter: Filter<Release> = {};

if (filterOptions.artists) {
if (artists.length > 0) {
filter.artist = {
$in: filterOptions.artists,
$in: artists,
};
}

if (filterOptions.albums) {
if (albums.length > 0) {
filter.album = {
$in: filterOptions.albums,
$in: albums,
};
}

if (filterOptions.genres) {
if (genres.length > 0) {
filter.genres = {
$all: filterOptions.genres,
$all: genres,
};
}

if (filterOptions.years) {
if (years.length > 0) {
filter.year = {
$in: filterOptions.years,
$in: years,
};
}

return filter;
}

export async function getAllReleases(
filterOptions: FilterOptions
): Promise<WithId<Release>[]> {
const filters = parseFilters(filterOptions);
export async function getAllReleases(query: Query): Promise<WithId<Release>[]> {
const filters = parseFilters(query);

const releasesCursor = db
.collection<Release>(releasesCollectionName)
.find(filters);
.find(filters)
.collation({
locale: "en",
strength: 2,
});
const releases = releasesCursor.toArray();

return releases;
Expand Down
2 changes: 1 addition & 1 deletion db/releasesWriteRepo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ import {
BulkWriteResult,
MongoBulkWriteError,
} from "mongodb";
import { Release } from "../parser.js";
import { getClient } from "./mongo.js";
import { Release } from "../models/release.js";

const client = getClient();
const db = client.db("rymdata");
Expand Down
6 changes: 1 addition & 5 deletions db/tasksRepo.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
import { WithId } from "mongodb";
import { getClient } from "./mongo.js";
import { Task } from "../models/task.js";

const client = getClient();
const db = client.db("rymdata");
const tasksCollectionName = "tasks";

export interface Task {
id: string;
status: string;
}

export async function getTask(taskId: string): Promise<WithId<Task>> {
const task = await db
.collection<Task>(tasksCollectionName)
Expand Down
7 changes: 7 additions & 0 deletions models/parseRequest.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
export interface ParseRequest {
profile: string;
tag: string;
fromPage: number;
toPage: number;
isTest?: boolean;
}
6 changes: 6 additions & 0 deletions models/release.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
export interface Release {
artist: string;
album: string;
genres: string[];
year: number;
}
6 changes: 6 additions & 0 deletions models/task.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
export type TaskStatus = "Pending" | "Completed";

export interface Task {
id: string;
status: TaskStatus;
}
79 changes: 39 additions & 40 deletions mq/consumer.ts
Original file line number Diff line number Diff line change
@@ -1,51 +1,50 @@
import { ConsumeMessage } from "amqplib";
import { Task, upsertTask } from "../db/tasksRepo.js";
import { parseAndSave } from "../parser.js";
import { Channel, Connection, ConsumeMessage } from "amqplib";
import { upsertTask } from "../db/tasksRepo.js";
import { connectToRabbitMq } from "./amqp.js";
import { save } from "../services/dataSaver.js";
import { Task } from "../models/task.js";

export async function startConsumer(): Promise<void> {
const connection = await connectToRabbitMq();
const channel = await connection.createChannel();
let channel: Channel;
let connection: Connection;

process.once("SIGINT", async () => {
await channel.close();
await connection.close();
});
process.once("SIGINT", async () => {
await channel.close();
await connection.close();
});

export async function startConsumer(): Promise<void> {
connection = await connectToRabbitMq();
channel = await connection.createChannel();

const { queue } = await channel.assertQueue("parse-tasks-queue", {
durable: true,
});

await channel.prefetch(2);
await channel.consume(queue, onMessageHandler, { noAck: false });
}

async function onMessageHandler(message: ConsumeMessage | null): Promise<void> {
if (message === null) {
console.log("Error while receiving the message");
return;
}

const json = message.content.toString();
console.log("Message received!");

const parseRequest = JSON.parse(json);
const result = await save(parseRequest);

console.log(JSON.stringify(result));

const task: Task = {
id: message.properties.messageId,
status: "Completed",
};

const upsertTaskResult = await upsertTask(task);
console.log("upsertTaskResult completed:", JSON.stringify(upsertTaskResult));

await channel.consume(
queue,
async (message: ConsumeMessage | null) => {
if (message !== null) {
const json = message.content.toString();
console.log("Message received!");

const parseRequest = JSON.parse(json);
const result = await parseAndSave(parseRequest);

console.log(JSON.stringify(result));

const task: Task = {
id: message.properties.messageId,
status: "Completed",
};

const upsertTaskResult = await upsertTask(task);
console.log(
"upsertTaskResult completed:",
JSON.stringify(upsertTaskResult)
);

channel.ack(message);
} else {
console.log("Error while receiving the message");
}
},
{ noAck: false }
);
channel.ack(message);
}
5 changes: 3 additions & 2 deletions mq/publisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ import { Connection } from "amqplib";
import { v4 as uuidv4 } from "uuid";
import { exit } from "process";
import { connectToRabbitMq } from "./amqp.js";
import { Task, upsertTask } from "../db/tasksRepo.js";
import { upsertTask } from "../db/tasksRepo.js";
import { Task } from "../models/task.js";

let connection: Connection | null = null;

Expand Down Expand Up @@ -32,7 +33,7 @@ export async function publishMessage(message) {

const task: Task = {
id: uuid,
status: "pending",
status: "Pending",
};

const upsertTaskResult = await upsertTask(task);
Expand Down
Loading

0 comments on commit 2dcadc4

Please sign in to comment.