Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merc 6304 view function ea telem support #14467

Merged
merged 8 commits into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/twenty-boxes-thank.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"chainlink": patch
---
#added
* Adds support for "tags" to Tasks that can be used generically.
* Adds a descendent task search method
* Added support in Mercury EA telemetry to utilize tags for telemetry extraction
161 changes: 126 additions & 35 deletions core/services/ocrcommon/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type EATelemetry struct {
BridgeTaskRunStartedTimestamp int64
BridgeTaskRunEndedTimestamp int64
AssetSymbol string
BridgeRequestData string
}

type EnhancedTelemetryData struct {
Expand Down Expand Up @@ -168,8 +169,8 @@ func ParseMercuryEATelemetry(lggr logger.Logger, trrs pipeline.TaskRunResults, f
if err != nil {
lggr.Warnw(fmt.Sprintf("cannot parse EA telemetry, id=%s, name=%q", trr.Task.DotID(), bridgeName), "err", err, "dotID", trr.Task.DotID(), "bridgeName", bridgeName)
}

eaTelem.DpBenchmarkPrice, eaTelem.DpBid, eaTelem.DpAsk = getPricesFromResults(lggr, trr, trrs, feedVersion)
eaTelem.BridgeRequestData = bridgeTask.RequestData
eaTelem.DpBenchmarkPrice, eaTelem.DpBid, eaTelem.DpAsk = getPricesFromBridgeTask(lggr, trr, trrs, feedVersion)

eaTelem.BridgeTaskRunStartedTimestamp = trr.CreatedAt.UnixMilli()
eaTelem.BridgeTaskRunEndedTimestamp = trr.FinishedAt.Time.UnixMilli()
Expand Down Expand Up @@ -448,10 +449,11 @@ func (e *EnhancedTelemetryService[T]) collectMercuryEnhancedTelemetry(d Enhanced
ConfigDigest: d.RepTimestamp.ConfigDigest.Hex(),
Round: int64(d.RepTimestamp.Round),
Epoch: int64(d.RepTimestamp.Epoch),
BridgeRequestData: eaTelem.BridgeRequestData,
AssetSymbol: eaTelem.AssetSymbol,
Version: uint32(d.FeedVersion),
}

e.lggr.Debugw(fmt.Sprintf("EA Telemetry = %+v", t), "feedID", e.job.OCR2OracleSpec.FeedID.Hex(), "jobID", e.job.ID, "datasource", eaTelem.DataSource)
bytes, err := proto.Marshal(t)
if err != nil {
e.lggr.Warnf("protobuf marshal failed %v", err.Error())
Expand All @@ -462,11 +464,25 @@ func (e *EnhancedTelemetryService[T]) collectMercuryEnhancedTelemetry(d Enhanced
}
}

type telemetryAttributes struct {
PriceType *string `json:"priceType"`
}

func parseTelemetryAttributes(a string) (telemetryAttributes, error) {
attrs := &telemetryAttributes{}
err := json.Unmarshal([]byte(a), attrs)
if err != nil {
return telemetryAttributes{}, err
}
return *attrs, nil
}

// getAssetSymbolFromRequestData parses the requestData of the bridge to generate an asset symbol pair
func getAssetSymbolFromRequestData(requestData string) string {
type reqDataPayload struct {
To string `json:"to"`
From string `json:"from"`
To *string `json:"to"`
From *string `json:"from"`
Address *string `json:"address"` // used for view function ea only
}
type reqData struct {
Data reqDataPayload `json:"data"`
Expand All @@ -478,7 +494,15 @@ func getAssetSymbolFromRequestData(requestData string) string {
return ""
}

return rd.Data.From + "/" + rd.Data.To
if rd.Data.From != nil && rd.Data.To != nil {
return *rd.Data.From + "/" + *rd.Data.To
}

if rd.Data.Address != nil {
return *rd.Data.Address
}

return ""
}

// ShouldCollectEnhancedTelemetryMercury checks if enhanced telemetry should be collected and sent
Expand All @@ -489,26 +513,107 @@ func ShouldCollectEnhancedTelemetryMercury(jb job.Job) bool {
return false
}

// getPricesFromResults parses the pipeline.TaskRunResults for pipeline.TaskTypeJSONParse and gets the benchmarkPrice,
const (
bid = "bid"
ask = "ask"
benchmark = "benchmark"
exchangeRate = "exchangeRate"
)

func getPricesFromBridgeTask(lggr logger.Logger, bridgeTask pipeline.TaskRunResult, allTasks pipeline.TaskRunResults, mercuryVersion mercuryutils.FeedVersion) (float64, float64, float64) {
var benchmarkPrice, bidPrice, askPrice float64

// This will assume that all fields we care about are tagged with the correct priceType
benchmarkPrice, bidPrice, askPrice = getPricesFromBridgeTaskByTelemetryField(lggr, bridgeTask, allTasks)

// If prices weren't parsed by telemetry fields - attempt to get prices using the legacy method
// This is for backwards compatibility with job specs that don't have the telemetry attributes set
if benchmarkPrice == 0 && bidPrice == 0 && askPrice == 0 {
benchmarkP, bidP, askP := getPricesFromResultsByOrder(lggr, bridgeTask, allTasks, mercuryVersion)
bidPrice = bidP
askPrice = askP
benchmarkPrice = benchmarkP
}

return benchmarkPrice, bidPrice, askPrice
}

// CollectTaskRunResultsWithTags collects TaskRunResults for descendent tasks with non-empty TaskTags.
func collectTaskRunResultsWithTags(bridgeTask pipeline.TaskRunResult, allTasks pipeline.TaskRunResults) []pipeline.TaskRunResult {
startTask := bridgeTask.Task
descendants := startTask.GetDescendantTasks()
var taskRunResultsWithTags []pipeline.TaskRunResult
for _, task := range descendants {
trr := allTasks.GetTaskRunResultOf(task)
if trr != nil {
if trr.Task.TaskTags() != "" {
taskRunResultsWithTags = append(taskRunResultsWithTags, *trr)
}
}
}
return taskRunResultsWithTags
}

// getPricesFromBridgeTaskByTelemetryField attempts to parse prices from via telemetry fields in the TaskTags
func getPricesFromBridgeTaskByTelemetryField(lggr logger.Logger, bridgeTask pipeline.TaskRunResult, allTasks pipeline.TaskRunResults) (float64, float64, float64) {
var benchmarkPrice, bidPrice, askPrice float64

// Outputs are the mapped tasks from this task.
var tasksWithTags = collectTaskRunResultsWithTags(bridgeTask, allTasks)

for _, trr := range tasksWithTags {
attributes, err := parseTelemetryAttributes(trr.Task.TaskTags())
if err != nil {
lggr.Warnw(fmt.Sprintf("found telemetry attributes but cannot them, taskTags=%s", trr.Task.TaskTags()), "err", err)
continue
}

if attributes.PriceType != nil {
switch *attributes.PriceType {
case bid:
bidPrice = parsePriceFromTask(lggr, trr)
case ask:
askPrice = parsePriceFromTask(lggr, trr)
case benchmark:
benchmarkPrice = parsePriceFromTask(lggr, trr)
case exchangeRate:
price := parsePriceFromTask(lggr, trr)
benchmarkPrice, bidPrice, askPrice = price, price, price
case "":
lggr.Warnw(fmt.Sprintf("no priceType found in attributes, parsedAttributes=%+v, id %s", attributes, trr.Task.DotID()))
}
}
}

return benchmarkPrice, bidPrice, askPrice
}

func parsePriceFromTask(lggr logger.Logger, trr pipeline.TaskRunResult) float64 {
var val float64
if trr.Result.Error != nil {
lggr.Warnw(fmt.Sprintf("got error on EA telemetry price task, id %s: %s", trr.Task.DotID(), trr.Result.Error), "err", trr.Result.Error)
return 0
}
val, err := getResultFloat64(&trr)
if err != nil {
lggr.Warnw(fmt.Sprintf("cannot parse EA telemetry price to float64, DOT id %s", trr.Task.DotID()), "task_type", trr.Task.Type(), "task_tags", trr.Task.TaskTags(), "err", err)
}
return val
}

// getPricesFromResultsByOrder parses the pipeline.TaskRunResults for pipeline.TaskTypeJSONParse and gets the benchmarkPrice,
// bid and ask. This functions expects the pipeline.TaskRunResults to be correctly ordered
func getPricesFromResults(lggr logger.Logger, startTask pipeline.TaskRunResult, allTasks pipeline.TaskRunResults, mercuryVersion mercuryutils.FeedVersion) (float64, float64, float64) {
func getPricesFromResultsByOrder(lggr logger.Logger, startTask pipeline.TaskRunResult, allTasks pipeline.TaskRunResults, mercuryVersion mercuryutils.FeedVersion) (float64, float64, float64) {
var benchmarkPrice, askPrice, bidPrice float64
var err error

// We rely on task results to be sorted in the correct order
benchmarkPriceTask := allTasks.GetNextTaskOf(startTask)
if benchmarkPriceTask == nil {
lggr.Warn("cannot parse enhanced EA telemetry benchmark price, task is nil")
return 0, 0, 0
}
if benchmarkPriceTask.Task.Type() == pipeline.TaskTypeJSONParse {
if benchmarkPriceTask.Result.Error != nil {
lggr.Warnw(fmt.Sprintf("got error for enhanced EA telemetry benchmark price, id %s: %s", benchmarkPriceTask.Task.DotID(), benchmarkPriceTask.Result.Error), "err", benchmarkPriceTask.Result.Error)
} else {
benchmarkPrice, err = getResultFloat64(benchmarkPriceTask)
if err != nil {
lggr.Warnw(fmt.Sprintf("cannot parse enhanced EA telemetry benchmark price, id %s", benchmarkPriceTask.Task.DotID()), "err", err)
}
}
benchmarkPrice = parsePriceFromTask(lggr, *benchmarkPriceTask)
}

// mercury version 2 only supports benchmarkPrice
Expand All @@ -522,31 +627,17 @@ func getPricesFromResults(lggr logger.Logger, startTask pipeline.TaskRunResult,
return benchmarkPrice, 0, 0
}

if bidTask != nil && bidTask.Task.Type() == pipeline.TaskTypeJSONParse {
if bidTask.Result.Error != nil {
lggr.Warnw(fmt.Sprintf("got error for enhanced EA telemetry bid price, id %s: %s", bidTask.Task.DotID(), bidTask.Result.Error), "err", bidTask.Result.Error)
} else {
bidPrice, err = getResultFloat64(bidTask)
if err != nil {
lggr.Warnw(fmt.Sprintf("cannot parse enhanced EA telemetry bid price, id %s", bidTask.Task.DotID()), "err", err)
}
}
if bidTask.Task.Type() == pipeline.TaskTypeJSONParse {
bidPrice = parsePriceFromTask(lggr, *bidTask)
}

askTask := allTasks.GetNextTaskOf(*bidTask)
if askTask == nil {
lggr.Warnf("cannot parse enhanced EA telemetry ask price, task is nil, id %s", benchmarkPriceTask.Task.DotID())
return benchmarkPrice, bidPrice, 0
}
if askTask != nil && askTask.Task.Type() == pipeline.TaskTypeJSONParse {
if bidTask.Result.Error != nil {
lggr.Warnw(fmt.Sprintf("got error for enhanced EA telemetry ask price, id %s: %s", askTask.Task.DotID(), askTask.Result.Error), "err", askTask.Result.Error)
} else {
askPrice, err = getResultFloat64(askTask)
if err != nil {
lggr.Warnw(fmt.Sprintf("cannot parse enhanced EA telemetry ask price, id %s", askTask.Task.DotID()), "err", err)
}
}
if askTask.Task.Type() == pipeline.TaskTypeJSONParse {
askPrice = parsePriceFromTask(lggr, *askTask)
}

return benchmarkPrice, bidPrice, askPrice
Expand Down
Loading
Loading