/*
 * Copyright (c) 2019 Snowplow Analytics Ltd. All rights reserved.
 *
 * This program is licensed to you under the Apache License Version 2.0,
 * and you may not use this file except in compliance with the Apache License Version 2.0.
 * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the Apache License Version 2.0 is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
 */
package com.snowplowanalytics.snowplow.storage.bigquery.repeater

import scala.concurrent.duration._

import cats.data.EitherT
import cats.effect._
import cats.effect.concurrent.Ref
import cats.syntax.all._

import com.google.cloud.bigquery._
import fs2.{Chunk, Stream}
import io.chrisdavenport.log4cats.Logger
import org.joda.time.DateTime

import com.snowplowanalytics.snowplow.badrows.BadRow
import com.snowplowanalytics.snowplow.storage.bigquery.common.metrics.Metrics
import com.snowplowanalytics.snowplow.storage.bigquery.repeater.RepeaterCli.GcsPath
import com.snowplowanalytics.snowplow.storage.bigquery.repeater.services.Storage

object Flow {
  sealed trait InsertStatus extends Product with Serializable
  case object Inserted extends InsertStatus // This event was inserted into BQ
  case object Retry extends InsertStatus    // This event was not inserted into BQ, but Repeater will retry
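
  // Note that failed inserts do not get a dedicated status value: `checkAndInsert` surfaces
  // them as `Left(BadRow)`, and `sink` routes those to the uninsertable queue.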

  /**
   * Main sink, processing data parsed from the `failedInserts` topic.
   * Attempts to insert each record into BigQuery. If the insertion fails, the record is turned
   * into an uninsertable event and forwarded to a dedicated queue, which is later sunk to GCS.
   * @param resources All application resources
   * @param events Stream of events from Pub/Sub
   */
  def sink[F[_]: Logger: Timer: Concurrent: ContextShift](
    resources: Resources[F]
  )(events: EventStream[F]): Stream[F, Unit] =
    events.parEvalMapUnordered(resources.concurrency) { event =>
      val insert = checkAndInsert[F](
        resources.bigQuery,
        resources.env.config.output.good.datasetId,
        resources.env.config.output.good.tableId,
        resources.backoffPeriod
      )(event)
      resources.insertBlocker.blockOn(insert).flatMap {
        case Right(Inserted) => resources.logInserted
        case Right(_)        => Sync[F].unit
        // format: off
        case Left(d) => resources.logAbandoned *> resources.uninsertable.enqueue1(d)
        // format: on
      }
    }
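
  /**
   * A minimal sketch of how the two sinks above could be wired together. The actual wiring
   * lives outside this file, and the name `run` is illustrative only. `merge` runs both
   * streams concurrently, so uninsertable events are drained to GCS while new events from
   * Pub/Sub are still being processed.
   */
  def run[F[_]: Logger: Timer: Concurrent: ContextShift](
    resources: Resources[F]
  )(events: EventStream[F]): Stream[F, Unit] =
    sink(resources)(events).merge(dequeueUninsertable(resources))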

  /** Dequeue uninsertable events and sink them to GCS */
  def dequeueUninsertable[F[_]: Timer: Concurrent: Logger](
    resources: Resources[F]
  ): Stream[F, Unit] =
    resources.uninsertable
      .dequeueChunk(resources.bufferSize)
      // Emit a batch as soon as bufferSize rows accumulate or timeout seconds elapse
      .groupWithin(resources.bufferSize, resources.timeout.seconds)
      .evalMap(sinkBadChunk(resources.counter, resources.bucket, resources.metrics))

  /**
   * Sink a whole chunk of uninsertable events to a file on GCS.
   * The filename is composed of the current time and a chunk counter.
   */
  def sinkBadChunk[F[_]: Timer: Sync: Logger](
    counter: Ref[F, Int],
    bucket: GcsPath,
    metrics: Metrics[F]
  )(chunk: Chunk[BadRow]): F[Unit] =
    for {
      time <- getTime
      _    <- Logger[F].debug(s"Preparing to sink chunk, $time")
      // Atomically increment and read the chunk counter (a separate update followed by a get
      // would race under concurrent use)
      i    <- counter.modify(n => (n + 1, n + 1))
      file = Storage.getFileName(bucket.path, i)
      _    <- Logger[F].debug(s"Filename will be $file")
      _    <- Storage.uploadChunk[F](bucket.bucket, file, chunk, metrics)
    } yield ()

  def checkAndInsert[F[_]: Sync: Logger](
    client: BigQuery,
    datasetId: String,
    tableId: String,
    backoffTime: Int
  )(event: EventRecord[F]): F[Either[BadRow, InsertStatus]] = {
    val res = for {
      ready <- EitherT.right(event.value.isReady(backoffTime.toLong))
      // We never actually generate a bad row here
      result <- EitherT[F, BadRow, InsertStatus] {
        if (ready) {
          event.ack >>
            EitherT(services.Database.insert[F](client, datasetId, tableId, event.value))
              .as(Inserted: InsertStatus)
              .value
        } else {
          Logger[F].debug(s"Event ${event.value.eventId}/${event.value.etlTstamp} is not ready yet. Nack") >>
            event.nack.as((Retry: InsertStatus).asRight)
        }
      }
    } yield result
    res.value
  }
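
  // Summary of `checkAndInsert` outcomes, for reference:
  //   Right(Inserted) - the event was ready and the BigQuery insert succeeded (the event is acked)
  //   Right(Retry)    - the backoff period has not elapsed yet (the event is nacked, so Pub/Sub redelivers it)
  //   Left(badRow)    - the insert failed; `sink` forwards the bad row to the uninsertable queue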

  private def getTime[F[_]: Sync]: F[DateTime] =
    Sync[F].delay(DateTime.now())
}