Skip to content

Commit

Permalink
Merge pull request #480 from Half-Shot/hs/metrics
Browse files Browse the repository at this point in the history
Metrics
  • Loading branch information
Half-Shot authored Jun 13, 2019
2 parents c30b07f + c3eaaa8 commit b3a8e7a
Show file tree
Hide file tree
Showing 13 changed files with 236 additions and 11 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ docker build -t halfshot/matrix-appservice-discord .
# Run the container
docker run -v /matrix-appservice-discord:/data -p 9005:9005 halfshot/matrix-appservice-discord
```
#### Metrics

The bridge supports reporting metrics via Prometheus. You can configure metrics support in the config
file. The metrics will be reported under the URL provided in the registration file, on the `/metrics` endpoint.

#### 3PID Protocol Support

Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
"node-html-parser": "^1.1.11",
"p-queue": "^6.0.1",
"pg-promise": "^8.5.1",
"prom-client": "^11.3.0",
"tslint": "^5.11.0",
"typescript": "^3.1.3",
"winston": "^3.0.0",
Expand Down
23 changes: 20 additions & 3 deletions src/bot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import * as Discord from "discord.js";
import * as mime from "mime";
import { IMatrixEvent, IMatrixMediaInfo } from "./matrixtypes";
import { DiscordCommandHandler } from "./discordcommandhandler";
import { MetricPeg } from "./metrics";

const log = new Log("DiscordBot");

Expand Down Expand Up @@ -240,6 +241,7 @@ export class DiscordBot {
client.on("messageDelete", async (msg: Discord.Message) => {
try {
await this.waitUnlock(msg.channel);
this.clientFactory.bindMetricsToChannel(msg.channel as Discord.TextChannel);
this.discordMessageQueue[msg.channel.id] = (async () => {
await (this.discordMessageQueue[msg.channel.id] || Promise.resolve());
try {
Expand All @@ -260,6 +262,7 @@ export class DiscordBot {
promiseArr.push(async () => {
try {
await this.waitUnlock(msg.channel);
this.clientFactory.bindMetricsToChannel(msg.channel as Discord.TextChannel);
await this.DeleteDiscordMessage(msg);
} catch (err) {
log.error("Caught while handling 'messageDeleteBulk'", err);
Expand All @@ -274,6 +277,7 @@ export class DiscordBot {
client.on("messageUpdate", async (oldMessage: Discord.Message, newMessage: Discord.Message) => {
try {
await this.waitUnlock(newMessage.channel);
this.clientFactory.bindMetricsToChannel(newMessage.channel as Discord.TextChannel);
this.discordMessageQueue[newMessage.channel.id] = (async () => {
await (this.discordMessageQueue[newMessage.channel.id] || Promise.resolve());
try {
Expand All @@ -288,12 +292,15 @@ export class DiscordBot {
});
client.on("message", async (msg: Discord.Message) => {
try {
MetricPeg.get.registerRequest(msg.id);
await this.waitUnlock(msg.channel);
this.clientFactory.bindMetricsToChannel(msg.channel as Discord.TextChannel);
this.discordMessageQueue[msg.channel.id] = (async () => {
await (this.discordMessageQueue[msg.channel.id] || Promise.resolve());
try {
await this.OnMessage(msg);
} catch (err) {
MetricPeg.get.requestOutcome(msg.id, true, "fail");
log.error("Caught while handing 'message'", err);
}
})();
Expand Down Expand Up @@ -389,6 +396,7 @@ export class DiscordBot {
}
const channel = guild.channels.get(room);
if (channel && channel.type === "text") {
this.ClientFactory.bindMetricsToChannel(channel as Discord.TextChannel);
const lookupResult = new ChannelLookupResult();
lookupResult.channel = channel as Discord.TextChannel;
lookupResult.botUser = this.bot.user.id === client.user.id;
Expand Down Expand Up @@ -448,6 +456,7 @@ export class DiscordBot {
// NOTE: Don't send replies to discord if we are a puppet.
msg = await chan.send(embed.description, opts);
} else if (hook) {
MetricPeg.get.remoteCall("hook.send");
msg = await hook.send(embed.description, {
avatarURL: embed!.author!.icon_url,
embeds: embedSet.replyEmbed ? [embedSet.replyEmbed] : undefined,
Expand Down Expand Up @@ -539,6 +548,7 @@ export class DiscordBot {
if (guild) {
const channel = client.channels.get(entry.remote!.get("discord_channel") as string);
if (channel) {
this.ClientFactory.bindMetricsToChannel(channel as Discord.TextChannel);
return channel;
}
throw Error("Channel given in room entry not found");
Expand Down Expand Up @@ -738,11 +748,13 @@ export class DiscordBot {
if (indexOfMsg !== -1) {
log.verbose("Got repeated message, ignoring.");
delete this.sentMessages[indexOfMsg];
MetricPeg.get.requestOutcome(msg.id, true, "dropped");
return; // Skip *our* messages
}
const chan = msg.channel as Discord.TextChannel;
if (msg.author.id === this.bot.user.id) {
// We don't support double bridging.
MetricPeg.get.requestOutcome(msg.id, true, "dropped");
return;
}
// Test for webhooks
Expand All @@ -751,13 +763,15 @@ export class DiscordBot {
.filterArray((h) => h.name === "_matrix").pop();
if (webhook && msg.webhookID === webhook.id) {
// Filter out our own webhook messages.
MetricPeg.get.requestOutcome(msg.id, true, "dropped");
return;
}
}

// check if it is a command to process by the bot itself
if (msg.content.startsWith("!matrix")) {
await this.discordCommandHandler.Process(msg);
MetricPeg.get.requestOutcome(msg.id, true, "success");
return;
}

Expand All @@ -766,14 +780,15 @@ export class DiscordBot {
let rooms;
try {
rooms = await this.channelSync.GetRoomIdsFromChannel(msg.channel);
if (rooms === null) {
throw Error();
}
} catch (err) {
log.verbose("No bridged rooms to send message to. Oh well.");
MetricPeg.get.requestOutcome(msg.id, true, "dropped");
return null;
}
try {
if (rooms === null) {
return null;
}
const intent = this.GetIntentFromDiscordMember(msg.author, msg.webhookID);
// Check Attachements
await Util.AsyncForEach(msg.attachments.array(), async (attachment) => {
Expand Down Expand Up @@ -854,7 +869,9 @@ export class DiscordBot {
await afterSend(res);
}
});
MetricPeg.get.requestOutcome(msg.id, true, "success");
} catch (err) {
MetricPeg.get.requestOutcome(msg.id, true, "fail");
log.verbose("Failed to send message into room.", err);
}
}
Expand Down
21 changes: 17 additions & 4 deletions src/clientfactory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,12 @@ limitations under the License.

import { DiscordBridgeConfigAuth } from "./config";
import { DiscordStore } from "./store";
import { Client as DiscordClient } from "discord.js";
import { Client as DiscordClient, TextChannel } from "discord.js";
import { Log } from "./log";
import { Util } from "./util";
import { MetricPeg } from "./metrics";

const log = new Log("ClientFactory");

const READY_TIMEOUT = 30000;

export class DiscordClientFactory {
private config: DiscordBridgeConfigAuth;
private store: DiscordStore;
Expand Down Expand Up @@ -110,4 +108,19 @@ export class DiscordClientFactory {
return this.botClient;
}
}

public bindMetricsToChannel(channel: TextChannel) {
// tslint:disable-next-line:no-any
const flexChan = channel as any;
if (flexChan._xmet_send !== undefined) {
return;
}
// Prefix the real functions with _xmet_
flexChan._xmet_send = channel.send;
// tslint:disable-next-line:only-arrow-functions
channel.send = function() {
MetricPeg.get.remoteCall("channel.send");
return flexChan._xmet_send.apply(channel, arguments);
};
}
}
1 change: 1 addition & 0 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ class DiscordBridgeConfigBridge {
public disableEveryoneMention: boolean = false;
public disableHereMention: boolean = false;
public disableJoinLeaveNotifications: boolean = false;
public enableMetrics: boolean = false;
}

export class DiscordBridgeConfigDatabase {
Expand Down
9 changes: 9 additions & 0 deletions src/db/roomstore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import { Util } from "../util";

import * as uuid from "uuid/v4";
import { Postgres } from "./postgres";
import { MetricPeg } from "../metrics";

const log = new Log("DbRoomStore");

Expand Down Expand Up @@ -155,8 +156,10 @@ export class DbRoomStore {
public async getEntriesByMatrixId(matrixId: string): Promise<IRoomStoreEntry[]> {
const cached = this.entriesMatrixIdCache.get(matrixId);
if (cached && cached.ts + ENTRY_CACHE_LIMETIME > Date.now()) {
MetricPeg.get.storeCall("RoomStore.getEntriesByMatrixId", true);
return cached.e;
}
MetricPeg.get.storeCall("RoomStore.getEntriesByMatrixId", false);
const entries = await this.db.All(
"SELECT * FROM room_entries WHERE matrix_id = $id", {id: matrixId},
);
Expand Down Expand Up @@ -190,6 +193,7 @@ export class DbRoomStore {
}

public async getEntriesByMatrixIds(matrixIds: string[]): Promise<IRoomStoreEntry[]> {
MetricPeg.get.storeCall("RoomStore.getEntriesByMatrixIds", false);
const mxIdMap = { };
matrixIds.forEach((mxId, i) => mxIdMap[i] = mxId);
const sql = `SELECT * FROM room_entries WHERE matrix_id IN (${matrixIds.map((_, id) => `\$${id}`).join(", ")})`;
Expand Down Expand Up @@ -222,6 +226,7 @@ export class DbRoomStore {
}

public async linkRooms(matrixRoom: MatrixStoreRoom, remoteRoom: RemoteStoreRoom) {
MetricPeg.get.storeCall("RoomStore.linkRooms", false);
await this.upsertRoom(remoteRoom);

const values = {
Expand All @@ -244,6 +249,7 @@ export class DbRoomStore {
}

public async getEntriesByRemoteRoomData(data: IRemoteRoomDataLazy): Promise<IRoomStoreEntry[]> {
MetricPeg.get.storeCall("RoomStore.getEntriesByRemoteRoomData", false);
Object.keys(data).filter((k) => typeof(data[k]) === "boolean").forEach((k) => {
data[k] = Number(data[k]);
});
Expand All @@ -270,11 +276,13 @@ export class DbRoomStore {
}

public async removeEntriesByRemoteRoomId(remoteId: string) {
MetricPeg.get.storeCall("RoomStore.removeEntriesByRemoteRoomId", false);
await this.db.Run(`DELETE FROM room_entries WHERE remote_id = $remoteId`, {remoteId});
await this.db.Run(`DELETE FROM remote_room_data WHERE room_id = $remoteId`, {remoteId});
}

public async removeEntriesByMatrixRoomId(matrixId: string) {
MetricPeg.get.storeCall("RoomStore.removeEntriesByMatrixRoomId", false);
const entries = (await this.db.All(`SELECT * FROM room_entries WHERE matrix_id = $matrixId`, {matrixId})) || [];
await Util.AsyncForEach(entries, async (entry) => {
if (entry.remote_id) {
Expand All @@ -286,6 +294,7 @@ export class DbRoomStore {
}

private async upsertRoom(room: RemoteStoreRoom) {
MetricPeg.get.storeCall("RoomStore.upsertRoom", false);
if (!room.data) {
throw new Error("Tried to upsert a room with undefined data");
}
Expand Down
7 changes: 6 additions & 1 deletion src/db/userstore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ limitations under the License.
*/

import { IDatabaseConnector } from "./connector";
import * as uuid from "uuid/v4";
import { Log } from "../log";
import { MetricPeg } from "../metrics";

/**
* A UserStore compatible with
Expand Down Expand Up @@ -54,8 +54,11 @@ export class DbUserStore {
public async getRemoteUser(remoteId: string): Promise<RemoteUser|null> {
const cached = this.remoteUserCache.get(remoteId);
if (cached && cached.ts + ENTRY_CACHE_LIMETIME > Date.now()) {
MetricPeg.get.storeCall("UserStore.getRemoteUser", true);
return cached.e;
}
MetricPeg.get.storeCall("UserStore.getRemoteUser", false);

const row = await this.db.Get(
"SELECT * FROM user_entries WHERE remote_id = $id", {id: remoteId},
);
Expand Down Expand Up @@ -86,6 +89,7 @@ export class DbUserStore {
}

public async setRemoteUser(user: RemoteUser) {
MetricPeg.get.storeCall("UserStore.setRemoteUser", false);
this.remoteUserCache.delete(user.id);
const existingData = await this.db.Get(
"SELECT * FROM remote_user_data WHERE remote_id = $remoteId",
Expand Down Expand Up @@ -156,6 +160,7 @@ AND guild_id = $guild_id`,
}

public async linkUsers(matrixId: string, remoteId: string) {
MetricPeg.get.storeCall("UserStore.linkUsers", false);
// This is used ONCE in the bridge to link two IDs, so do not UPSURT data.
try {
await this.db.Run(`INSERT INTO user_entries VALUES ($matrixId, $remoteId)`, {
Expand Down
13 changes: 12 additions & 1 deletion src/discordas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import { DiscordBot } from "./bot";
import { DiscordStore } from "./store";
import { Log } from "./log";
import "source-map-support/register";
import { MetricPeg, PrometheusBridgeMetrics } from "./metrics";

const log = new Log("DiscordAS");

Expand Down Expand Up @@ -94,13 +95,16 @@ async function run(port: number, fileConfig: DiscordBridgeConfig) {
} catch (err) { log.error("Exception thrown while handling \"onAliasQuery\" event", err); }
},
onEvent: async (request) => {
const data = request.getData();
try {
MetricPeg.get.registerRequest(data.event_id);
// Build our own context.
if (!store.roomStore) {
log.warn("Discord store not ready yet, dropping message");
MetricPeg.get.requestOutcome(data.event_id, false, "dropped");
return;
}
const roomId = request.getData().room_id;
const roomId = data.room_id;

const context: BridgeContext = {
rooms: {},
Expand All @@ -112,7 +116,9 @@ async function run(port: number, fileConfig: DiscordBridgeConfig) {
}

await request.outcomeFrom(callbacks.onEvent(request, context));
MetricPeg.get.requestOutcome(data.event_id, false, "success");
} catch (err) {
MetricPeg.get.requestOutcome(data.event_id, false, "fail");
log.error("Exception thrown while handling \"onEvent\" event", err);
await request.outcomeFrom(Promise.reject("Failed to handle"));
}
Expand Down Expand Up @@ -160,6 +166,11 @@ async function run(port: number, fileConfig: DiscordBridgeConfig) {
await bridge.run(port, config);
log.info(`Started listening on port ${port}`);

if (config.bridge.enableMetrics) {
log.info("Enabled metrics");
MetricPeg.set(new PrometheusBridgeMetrics().init(bridge));
}

try {
await store.init(undefined, bridge.getRoomStore(), bridge.getUserStore());
} catch (ex) {
Expand Down
Loading

0 comments on commit b3a8e7a

Please sign in to comment.