Skip to content

Commit

Permalink
Connect to chain sync query port with traced client
Browse files Browse the repository at this point in the history
  • Loading branch information
jhbertra committed Jun 23, 2023
1 parent c3ecb4c commit cb8ee96
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 4 deletions.
1 change: 1 addition & 0 deletions marlowe-runtime/marlowe-runtime.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,7 @@ executable marlowe-sync
, hasql-pool ^>=0.8
, hs-opentelemetry-api ==0.0.3.6
, hs-opentelemetry-sdk ==0.0.3.4
, marlowe-chain-sync ==0.0.2
, marlowe-protocols ==0.1.1.0
, marlowe-runtime:{discovery-api, history-api, sync, sync-api} ==0.0.2
, mtl >=2.2 && <3
Expand Down
11 changes: 10 additions & 1 deletion marlowe-runtime/marlowe-sync/Logging.hs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,23 @@ import Data.ByteString (ByteString)
import Language.Marlowe.Protocol.HeaderSync.Types (MarloweHeaderSync)
import Language.Marlowe.Protocol.Query.Types (MarloweQuery)
import Language.Marlowe.Protocol.Sync.Types (MarloweSync)
import Language.Marlowe.Runtime.ChainSync.Api (ChainSyncQuery)
import Language.Marlowe.Runtime.Sync (renderDatabaseSelectorOTel)
import Language.Marlowe.Runtime.Sync.Database
import Network.Protocol.Driver.Trace (TcpServerSelector, renderTcpServerSelectorOTel)
import Network.Protocol.Driver.Trace (
TcpClientSelector,
TcpServerSelector,
renderTcpClientSelectorOTel,
renderTcpServerSelectorOTel,
)
import Network.Protocol.Handshake.Types (Handshake)
import Network.Protocol.Query.Types (Query)
import Observe.Event (idInjectSelector, injectSelector)
import Observe.Event.Render.OpenTelemetry
import Prelude hiding (filter)

data RootSelector f where
ChainSyncQueryClient :: TcpClientSelector (Handshake (Query ChainSyncQuery)) f -> RootSelector f
MarloweSyncServer :: TcpServerSelector (Handshake MarloweSync) f -> RootSelector f
MarloweHeaderSyncServer :: TcpServerSelector (Handshake MarloweHeaderSync) f -> RootSelector f
MarloweQueryServer :: TcpServerSelector (Handshake MarloweQuery) f -> RootSelector f
Expand All @@ -39,6 +47,7 @@ renderRootSelectorOTel
-> Maybe ByteString
-> RenderSelectorOTel RootSelector
renderRootSelectorOTel dbName dbUser host port = \case
ChainSyncQueryClient sel -> renderTcpClientSelectorOTel sel
MarloweSyncServer sel -> renderTcpServerSelectorOTel sel
MarloweHeaderSyncServer sel -> renderTcpServerSelectorOTel sel
MarloweQueryServer sel -> renderTcpServerSelectorOTel sel
Expand Down
7 changes: 4 additions & 3 deletions marlowe-runtime/marlowe-sync/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ import Language.Marlowe.Runtime.Sync (MarloweSync (..), SyncDependencies (..), s
import Language.Marlowe.Runtime.Sync.Database (hoistDatabaseQueries, logDatabaseQueries)
import qualified Language.Marlowe.Runtime.Sync.Database.PostgreSQL as Postgres
import Logging (RootSelector (..), renderRootSelectorOTel)
import Network.Protocol.Driver (TcpServerDependencies (..), tcpClient)
import Network.Protocol.Driver.Trace (tcpServerTraced)
import Network.Protocol.Driver (TcpServerDependencies (..))
import Network.Protocol.Driver.Trace (tcpClientTraced, tcpServerTraced)
import Network.Protocol.Query.Client (queryClientPeer)
import Network.Protocol.Query.Server (queryServerPeer)
import Network.Socket (HostName, PortNumber)
Expand Down Expand Up @@ -79,7 +79,8 @@ run Options{..} = bracket (Pool.acquire 100 (Just 5000000) (fromString databaseU
(either throwIO pure <=< liftIO . Pool.use pool)
Postgres.databaseQueries
, runtimeVersion = version
, chainSyncQueryConnector = tcpClient chainSyncHost chainQueryPort queryClientPeer
, chainSyncQueryConnector =
tcpClientTraced (injectSelector ChainSyncQueryClient) chainSyncHost chainQueryPort queryClientPeer
}

tcpServerTraced "marlowe-sync" (injectSelector MarloweSyncServer)
Expand Down

0 comments on commit cb8ee96

Please sign in to comment.