Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Push Publisher #27

Open
wants to merge 11 commits into
base: develop
Choose a base branch
from
1 change: 1 addition & 0 deletions .eslintrc.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
"rules": {
"@typescript-eslint/explicit-function-return-type": "warn",
"@typescript-eslint/strict-boolean-expressions": "warn",
"@typescript-eslint/no-non-null-assertion": "off",
"simple-import-sort/sort": "error"
}
}
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
"cleaners": "^0.3.12",
"compression": "^1.7.4",
"cors": "^2.8.5",
"edge-server-tools": "^0.2.11",
"edge-server-tools": "^0.2.13",
"express": "^4.17.1",
"firebase-admin": "^8.12.1",
"morgan": "^1.10.0",
Expand Down
64 changes: 64 additions & 0 deletions src/NotificationManager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import io from '@pm2/io'
import admin from 'firebase-admin'

import { ApiKey } from './models'

import BatchResponse = admin.messaging.BatchResponse

const successCounter = io.counter({
id: 'notifications:success:total',
name: 'Total Successful Notifications'
})
const failureCounter = io.counter({
id: 'notifications:failure:total',
name: 'Total Failed Notifications'
})

export const createNotificationManager = async (
apiKey: ApiKey | string
): Promise<admin.app.App> => {
if (typeof apiKey === 'string') apiKey = await ApiKey.fetch(apiKey)

const name = `app:${apiKey.appId}`
let app: admin.app.App
try {
app = admin.app(name)
} catch (err) {
app = admin.initializeApp(
{
credential: admin.credential.cert(apiKey.adminsdk)
},
name
)
}
return app
}

export const sendNotification = async (
app: admin.app.App,
title: string,
body: string,
tokens: string[],
data = {}
): Promise<BatchResponse> => {
const message: admin.messaging.MulticastMessage = {
notification: {
title,
body
},
data,
tokens
}

try {
const response = await app.messaging().sendMulticast(message)

successCounter.inc(response.successCount)
failureCounter.inc(response.failureCount)

return response
} catch (err) {
console.error(JSON.stringify(err, null, 2))
throw err
}
}
14 changes: 0 additions & 14 deletions src/api/index.ts

This file was deleted.

40 changes: 24 additions & 16 deletions src/api/router.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { asArray, asObject, asString, asUnknown } from 'cleaners'
import { asArray, asObject, asString } from 'cleaners'
import {
type HttpRequest,
type HttpResponse,
Expand All @@ -10,10 +10,13 @@ import {
jsonResponse,
statusCodes,
statusResponse
} from '../types/response-types'
} from '../types/http/response-types'
import { asAction } from '../types/task/Action'
import { asActionEffect } from '../types/task/ActionEffect'
import {
DbDoc,
asTaskDoc,
logger,
TaskDoc,
wrappedDeleteFromDb,
wrappedGetFromDb,
wrappedSaveToDb
Expand Down Expand Up @@ -51,31 +54,34 @@ const getTaskRoute = async (request: HttpRequest): Promise<HttpResponse> => {
}

// Construct a body and returns it as an HttpResponse.
// The body should have triggers, action, and taskId.
// The body should have actionEffects, action, userId, _id and taskId.
const createTaskRoute = async (request: HttpRequest): Promise<HttpResponse> => {
try {
const asBody = asObject({
taskId: asString,
triggers: asArray(asUnknown),
action: asUnknown
actionEffects: asArray(asActionEffect),
action: asAction
})

const queryObject = getQueryParamObject(
['taskId', 'triggers', 'action'],
['taskId', 'actionEffects', 'action'],
request.path
)
const triggersAsString = queryObject.triggers
const triggersAsArray = convertStringToArray(triggersAsString)
queryObject.triggers = triggersAsArray ?? []
const { taskId, triggers, action } = asBody(queryObject)
const actionEffectsAsString = queryObject.actionEffects
const actionEffectsAsArray = convertStringToArray(actionEffectsAsString)
queryObject.actionEffects = actionEffectsAsArray ?? []
const { taskId, actionEffects, action } = asBody(queryObject)
const cleanedAction = asAction(action)

const doc: DbDoc = {
taskId,
const doc: TaskDoc = asTaskDoc({
taskId: taskId,
userId: request.headers.userId,
triggers,
action,
actionEffects: actionEffects.map(actionEffect =>
asActionEffect(actionEffect)
),
cleanedAction,
_id: `${request.headers.userId}:${taskId}` // To help with partitioning
}
})

await wrappedSaveToDb([doc])
return statusResponse(statusCodes.SUCCESS, 'Successfully created the task')
Expand All @@ -85,6 +91,8 @@ const createTaskRoute = async (request: HttpRequest): Promise<HttpResponse> => {
}
}

// Remove tasks from the database. If the taskIds array is empty, it
// will delete all tasks under the userId.
const deleteTaskRoute = async (request: HttpRequest): Promise<HttpResponse> => {
try {
const asQuery = asObject({
Expand Down
17 changes: 0 additions & 17 deletions src/config.ts

This file was deleted.

37 changes: 17 additions & 20 deletions src/couchSetup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
} from 'edge-server-tools'
import { ServerScope } from 'nano'

import { tasksListening, tasksPublishing } from './database/views/couch-tasks'
import { serverConfig } from './serverConfig'

// ---------------------------------------------------------------------------
Expand Down Expand Up @@ -48,21 +49,18 @@ export const settingsSetup: DatabaseSetup = {

const apiKeysSetup: DatabaseSetup = { name: 'db_api_keys' }

const thresholdsSetup: DatabaseSetup = { name: 'db_currency_thresholds' }

const devicesSetup: DatabaseSetup = { name: 'db_devices' }

const usersSetup: DatabaseSetup = {
name: 'db_user_settings'
// documents: {
// '_design/filter': makeJsDesign('by-currency', ?),
// '_design/map': makeJsDesign('currency-codes', ?)
// }
}

const defaultsSetup: DatabaseSetup = {
name: 'defaults'
// syncedDocuments: ['thresholds']
const tasksSetup: DatabaseSetup = {
name: 'db_tasks',
// Turn on partition by userId for performance and security reasons.
// https://docs.couchdb.org/en/3.2.2/partitioned-dbs/index.html
options: {
partitioned: true
},
// Set up the views
documents: {
'_design/tasks_listening': tasksListening,
'_design/tasks_publishing': tasksPublishing
}
}

// ---------------------------------------------------------------------------
Expand All @@ -79,13 +77,12 @@ export async function setupDatabases(
replicatorSetup: syncedReplicators,
disableWatching
}

// @ts-expect-error
await setupDatabase(connection, settingsSetup, options)
await Promise.all([
// @ts-expect-error
setupDatabase(connection, apiKeysSetup, options),
setupDatabase(connection, thresholdsSetup, options),
setupDatabase(connection, devicesSetup, options),
setupDatabase(connection, usersSetup, options),
setupDatabase(connection, defaultsSetup, options)
// @ts-expect-error
setupDatabase(connection, tasksSetup, options)
])
}
81 changes: 81 additions & 0 deletions src/database/views/couch-tasks.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/**
* Configures couchDB views that are used to model message queues.
* Associated helper functions are also provided.
*
* Publishers listen to these views to perform actions on update. One
* way of doing this is to use the {@link viewToStream} function from
* `edge-server-tools`.
*
* A key advantage of using views is that documents are programmatically
* indexed and serverd to views based on certain conditions, thereby
* elimitating the need to build seqarate listeners that subscribe to db
* documents and perform actions on update.
*
* Views can be named as a string, just like a normal database. They can
* be called by using `db.view(name, params)` method. The response will
* be of type `nano.DocumentViewResponse<T>` where `T` is the shape of the
* documents defined elsewhere. This type has a `rows` property that is
* consistent with many other getter methods in nano.
*/

// Certain import lines have lintings disabled because they are
// referenced only by documentation comments.
import {
JsDesignDocument,
makeJsDesign,
// eslint-disable-next-line @typescript-eslint/no-unused-vars
viewToStream
} from 'edge-server-tools'

// eslint-disable-next-line @typescript-eslint/no-unused-vars
import { ActionEffect } from '../../types/task/ActionEffect'
import { Task } from '../../types/task/Task'
import { dbTasks, logger, packChange, TaskDoc } from '../../utils/dbUtils'

/**
* A view that indexes to all tasks that contain at least one incomplete
* {@link ActionEffect}.
*
* @remarks
* This view is not intended to be subscribed by any publishers. Think
* of this as a staging area for ongoing tasks.
*/
export const tasksListening: JsDesignDocument = makeJsDesign(
'tasks_listening',
() => ({
filter: function (taskDoc: TaskDoc) {
return taskDoc.doc.actionEffects.some(e => e.completed === false)
}
})
)

/**
* A view that indexes to all tasks with all {@link ActionEffect}
* completed.
*/
export const tasksPublishing: JsDesignDocument = makeJsDesign(
'tasks_publishing',
() => ({
filter: function (taskDoc: TaskDoc) {
return taskDoc.doc.actionEffects.every(Boolean)
}
})
)

/**
* Updates the a task document in the `db_tasks` database. The function
* receives a {@link Task} object and updates the relavent document based on
* the content of this task.
* @param {Task} updatedTask - The task that has its `action.inProgress`
* flag updated.
*/
export const updateInProgress = async (
updatedTask: Task,
id: string
): Promise<void> => {
try {
await dbTasks.insert(packChange(updatedTask, id))
} catch (e) {
logger(`Failed to make ${updatedTask.taskId}'s action as inprogress: `, e)
}
}
33 changes: 33 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import nano from 'nano'
import { makeExpressRoute } from 'serverlet/express'

import { pushNotificationRouterV2 } from './api/router'
import { setupDatabases } from './couchSetup'
import { createServer } from './server'
import { serverConfig } from './serverConfig'

async function main(): Promise<void> {
// Set up databases:
const connection = nano(serverConfig.couchUri)
await setupDatabases(connection)

// Create server
const server = createServer(
makeExpressRoute(pushNotificationRouterV2),
serverConfig
)

// Start Server
server.listen(server.get('httpPort'), server.get('httpHost'), () => {
console.log(
`Express server listening on port ${JSON.stringify(
server.get('httpPort')
)}`
)
})
}

main().catch(error => {
console.error(error)
process.exit(1)
})
2 changes: 2 additions & 0 deletions src/models/CurrencyThreshold.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,14 @@ export class CurrencyThreshold extends Base implements ICurrencyThreshold {
price: number
): Promise<CurrencyThreshold> {
const threshold = this.thresholds[hours] ?? {
custom: undefined,
lastUpdated: 0,
price: 0
}
threshold.lastUpdated = timestamp
threshold.price = price
this.thresholds[hours] = threshold

return (await this.save()) as CurrencyThreshold
}
}
Loading