Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add observability to anchoring worker #1199

Open
wants to merge 4 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/__tests__/ceramic_integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import { teeDbConnection } from './tee-db-connection.util.js'
import { CARFactory, type CAR } from 'cartonne'

process.env.NODE_ENV = 'test'
process.env['CAS_USE_IPFS_STORAGE'] = 'true'

const randomNumber = Math.floor(Math.random() * 10000)
const TOPIC = `/ceramic/local/${randomNumber}`
Expand Down
1 change: 1 addition & 0 deletions src/services/__tests__/anchor-service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import { MockEventProducerService } from './mock-event-producer-service.util.js'
import { type IMerkleCarService, makeMerkleCarService } from '../merkle-car-service.js'

process.env['NODE_ENV'] = 'test'
process.env['CAS_USE_IPFS_STORAGE'] = 'true';

async function anchorCandidates(
candidates: Candidate[],
Expand Down
44 changes: 33 additions & 11 deletions src/services/anchor-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -302,45 +302,65 @@ export class AnchorService {
private async _anchorCandidates(candidates: Candidate[]): Promise<Partial<AnchorSummary>> {
logger.imp(`Creating Merkle tree from ${candidates.length} selected streams`)
const span = Metrics.startSpan('anchor_candidates')

const buildMerkleTreeSpan = Metrics.startSpan('build_merkle_tree')

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does startSpan create timing metrics automatically? As in, how long a given span takes on avg, etc.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A span represents a single operation within a specific trace. Traces are used to monitor the execution paths in a system. startSpan() will handle timing by reporting start and endTime.
It does not allow us to do aggregations, like counts or gauges. So the answer to your question is : no https://opentelemetry.io/docs/concepts/signals/traces/#spans

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In that case, can we please add metrics for the time spent in each of those spans?

Counter for how many time a span is taken, a summary for how long each took, etc.

Copy link
Contributor Author

@samika98 samika98 May 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am wondering if this is necessary after we data-dog profiling enabled on CAS @christianlavoie @smrz2001

const merkleTree = await this._buildMerkleTree(candidates)
buildMerkleTreeSpan.end()

// create and send ETH transaction
const createAndSendTransactionSpan = Metrics.startSpan('create_and_send_transaction')
const tx: Transaction = await this.transactionRepository.withTransactionMutex(() => {
logger.debug('Preparing to send transaction to put merkle root on blockchain')
return this.blockchainService.sendTransaction(merkleTree.root.data.cid)
})
createAndSendTransactionSpan.end()

// Create proof
const createIPFSPan = Metrics.startSpan('create_ipfs_proof')
logger.debug('Creating IPFS anchor proof')
const ipfsProofCid = this._createIPFSProof(merkleTree.car, tx, merkleTree.root.data.cid)
createIPFSPan.end()

// Create anchor records on IPFS
const createAnchorCommitsSpan = Metrics.startSpan('create_anchor_commits')
logger.debug('Creating anchor commits')
const anchors = await this._createAnchorCommits(ipfsProofCid, merkleTree)

logger.debug('Importing Merkle CAR to IPFS')
try {
await this.ipfsService.importCAR(merkleTree.car)
} catch (e) {
Metrics.count(METRIC_NAMES.MERKLE_CAR_STORAGE_FAILURE_IPFS, 1)
const message = `Can not store Merkle CAR to IPFS. Batch failed: ${e}`
logger.err(message)
throw e
createAnchorCommitsSpan.end()

// Do not store CAR file in IPFS by default
const importCARSpan = Metrics.startSpan('import_car')
if (process.env['CAS_USE_IPFS_STORAGE']) {
logger.debug('Importing Merkle CAR to IPFS')
try {
await this.ipfsService.importCAR(merkleTree.car)
Metrics.count(METRIC_NAMES.MERKLE_CAR_STORAGE_SUCCESS_IPFS, 1)
} catch (e) {
Metrics.count(METRIC_NAMES.MERKLE_CAR_STORAGE_FAILURE_IPFS, 1)
const message = `Can not store Merkle CAR to IPFS. Batch failed: ${e}`
logger.err(message)
throw e
}
}
importCARSpan.end()

const storeCARSpan = Metrics.startSpan('store_car')
logger.debug('Storing Merkle CAR file')
try {
await this.merkleCarService.storeCarFile(ipfsProofCid, merkleTree.car)
Metrics.count(METRIC_NAMES.MERKLE_CAR_STORAGE_SUCCESS_S3, 1)
} catch (e) {
Metrics.count(METRIC_NAMES.MERKLE_CAR_STORAGE_FAILURE_S3, 1)
const message = `Can not store Merkle CAR to S3. Batch failed: ${e}`
logger.err(message)
throw e
}
storeCARSpan.end()

// Update the database to record the successful anchors
logger.debug('Persisting results to local database')
const persistAnchorResultSpan = Metrics.startSpan('persist_anchor_result')
const persistedAnchorsCount = await this._persistAnchorResult(anchors, candidates)
persistAnchorResultSpan.end()

logger.imp(`Service successfully anchored ${anchors.length} CIDs.`)
Metrics.count(METRIC_NAMES.ANCHOR_SUCCESS, anchors.length)
Expand Down Expand Up @@ -504,8 +524,10 @@ export class AnchorService {
}

try {
await this.ipfsService.storeRecord(ipfsAnchorCommit)

// Do not store in IPFS by default
if (process.env['CAS_USE_IPFS_STORAGE']) {
await this.ipfsService.storeRecord(ipfsAnchorCommit)
}
// Do not publish to pubsub by default
if (process.env['CAS_PUBSUB_PUBLISH']) {
// TODO: Remove this case entirely after js-ceramic no longer supports pubsub
Expand Down
2 changes: 2 additions & 0 deletions src/settings.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ export enum METRIC_NAMES {
NO_ANCHOR_FOR_REQUEST = 'no_anchor_for_request',
MERKLE_CAR_STORAGE_FAILURE_IPFS = 'merkle_car_storage_failure_ipfs',
MERKLE_CAR_STORAGE_FAILURE_S3 = 'merkle_car_storage_failure_s3',
MERKLE_CAR_STORAGE_SUCCESS_IPFS = 'merkle_car_storage_success_ipfs',
MERKLE_CAR_STORAGE_SUCCESS_S3 = 'merkle_car_storage_success_s3',

// Transaction repository
MANY_ATTEMPTS_TO_ACQUIRE_MUTEX = 'many_attempts_to_acquire_mutex',
Expand Down
Loading