Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement the execution engine #25

Open
wants to merge 32 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
fda83e0
erase deploy artifact file
v9n Aug 14, 2024
03e1890
erc4337 client in go
v9n Aug 13, 2024
146f092
refactor standalone package into pkg
v9n Aug 14, 2024
edfe993
add simpleaccountabi
v9n Aug 14, 2024
7a3435c
move simpleaccount
v9n Aug 14, 2024
2d4bdb3
wip
v9n Aug 14, 2024
4e8e84b
add queue engine to process job
v9n Aug 17, 2024
2882305
add example
v9n Aug 19, 2024
8e7a787
implement check and expression engine
v9n Aug 19, 2024
1da2096
simplify code, move env to config, improve workflow engine
v9n Sep 4, 2024
fdc68fc
implement more example
v9n Sep 4, 2024
cc61a60
implement cancel
v9n Sep 4, 2024
c654185
add another example that never matched
v9n Sep 4, 2024
fcffaf1
impelement function to query any contract
v9n Sep 4, 2024
086352d
add more demo
v9n Sep 4, 2024
8cf40b8
ignore cred files
v9n Sep 4, 2024
7fc4b59
handle task deletion
v9n Sep 4, 2024
d545901
add task deletion example and time transfer
v9n Sep 4, 2024
23f2cfe
add t ime schedule
v9n Sep 4, 2024
5255b19
use seq for task id, rename action, improve failing reason
v9n Sep 6, 2024
49903b3
update example body -> action
v9n Sep 6, 2024
bf15738
fix crash when task syncing run before engine is ready
v9n Sep 6, 2024
2715f3c
improve reliability
v9n Sep 8, 2024
d05055f
set entrypoint and factory adress from config
v9n Sep 8, 2024
35b49f4
add log
v9n Sep 8, 2024
d630907
simply example to run and test
v9n Sep 8, 2024
b23bc84
add more logging, simplify buil-in chainlink
v9n Sep 9, 2024
17dac59
update example
v9n Sep 9, 2024
f9cbcf6
improve logging
v9n Sep 9, 2024
1ad3d4b
udpate example
v9n Sep 9, 2024
559efbf
remove hardcode
v9n Sep 10, 2024
fc183da
add signature
v9n Sep 14, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ out/
# Dotenv file
.env
*.env
.env*

# binary
ap-avs
Expand All @@ -24,4 +25,7 @@ metadata.json
# Ignores yaml configuration files in the config directory
config/*.yaml

contracts/script/output/
contracts/script/output*

contracts/broadcast/
alias-ecdsa.key.json
5 changes: 5 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,8 @@ build-docker:
## up: bring up docker compose stack
up:
docker compose up

## unstable-build: generate an unstable for internal test
unstable-build:
docker build --platform=linux/amd64 --build-arg RELEASE_TAG=unstable -t avaprotocol/ap-avs:unstable -f dockerfiles/operator.Dockerfile .
docker push avaprotocol/ap-avs:unstable
60 changes: 50 additions & 10 deletions aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@ import (

"github.com/AvaProtocol/ap-avs/aggregator/types"
"github.com/AvaProtocol/ap-avs/core"
"github.com/AvaProtocol/ap-avs/core/apqueue"
"github.com/AvaProtocol/ap-avs/core/chainio"
"github.com/AvaProtocol/ap-avs/core/chainio/aa"
"github.com/AvaProtocol/ap-avs/core/config"
"github.com/AvaProtocol/ap-avs/core/taskengine"
"github.com/Layr-Labs/eigensdk-go/chainio/clients"
sdkclients "github.com/Layr-Labs/eigensdk-go/chainio/clients"
blsagg "github.com/Layr-Labs/eigensdk-go/services/bls_aggregation"
Expand All @@ -36,12 +39,19 @@ const (
avsName = "oak-avs"
)

type AggregatorStatus string

const (
initStatus AggregatorStatus = "init"
runningStatus AggregatorStatus = "running"
shutdownStatus AggregatorStatus = "shutdown"
)

func RunWithConfig(configPath string) error {
nodeConfig, err := config.NewConfig(configPath)
if err != nil {
panic(fmt.Errorf("failed to parse config file: %w\nMake sure %s is exist and a valid yaml file %w.", configPath, err))
}
fmt.Printf("loaded config: %v\n", nodeConfig)

aggregator, err := NewAggregator(nodeConfig)
if err != nil {
Expand All @@ -55,6 +65,7 @@ func RunWithConfig(configPath string) error {
type Aggregator struct {
logger logging.Logger
avsWriter chainio.AvsWriterer

// aggregation related fields
blsAggregationService blsagg.BlsAggregationService
tasks map[types.TaskIndex]cstaskmanager.IAutomationTaskManagerTask
Expand All @@ -69,6 +80,16 @@ type Aggregator struct {
chainID *big.Int

operatorPool *OperatorPool

// task engines handles trigger scheduling and send distribute checks to
// operator to checks
engine *taskengine.Engine
// upon a task condition is met, taskengine will schedule it in our queue to
// be executed.
queue *apqueue.Queue
worker *apqueue.Worker

status AggregatorStatus
}

// NewAggregator creates a new Aggregator with the provided config.
Expand Down Expand Up @@ -100,9 +121,7 @@ func NewAggregator(c *config.Config) (*Aggregator, error) {
panic(err)
//return nil, err
}

fmt.Println("avsReader", avsReader, "clients", clients)

c.Logger.Info("create avsrrader and client", "avsReader", avsReader, "clients", clients)
}()

// TODO: These are erroring out and we don't need them now yet
Expand All @@ -122,6 +141,7 @@ func NewAggregator(c *config.Config) (*Aggregator, error) {
config: c,

operatorPool: &OperatorPool{},
status: initStatus,
}, nil
}

Expand Down Expand Up @@ -160,22 +180,36 @@ func (agg *Aggregator) init() {
} else {
config.CurrentChainEnv = config.HoleskyEnv
}

// Setup account abstraction config
aa.SetFactoryAddress(agg.config.SmartWallet.FactoryAddress)
aa.SetEntrypointAddress(agg.config.SmartWallet.EntrypointAddress)
}

func (agg *Aggregator) Start(ctx context.Context) error {
agg.status = runningStatus

agg.logger.Infof("Starting aggregator")

agg.init()

agg.logger.Infof("Initialize Storagre")
agg.initDB(ctx)
if err := agg.initDB(ctx); err != nil {
agg.logger.Fatalf("failed to initialize storage", "error", err)
}

agg.logger.Infof("Starting rpc server.")
go agg.startRpcServer(ctx)
agg.logger.Infof("Starting Task engine")
go agg.startTaskEngine(ctx)

agg.logger.Info("Starting repl")
go agg.startRepl()

agg.logger.Infof("Starting http server.")
agg.logger.Infof("Starting http server")
go agg.startHttpServer(ctx)

agg.logger.Infof("Starting rpc server")
go agg.startRpcServer(ctx)

// Setup wait signal
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
Expand All @@ -186,10 +220,12 @@ func (agg *Aggregator) Start(ctx context.Context) error {
}()

<-done
agg.logger.Infof("Shutting down.")
agg.logger.Infof("Shutting down...")

// Shutdown the db
// TODO: handle ongoing client and fanout closing
agg.status = shutdownStatus
agg.stopRepl()
agg.stopTaskEngine()
agg.db.Close()

return nil
Expand Down Expand Up @@ -235,3 +271,7 @@ func (agg *Aggregator) sendAggregatedResponseToContract(blsAggServiceResp blsagg
agg.logger.Error("Aggregator failed to respond to task", "err", err)
}
}

func (agg *Aggregator) IsShutdown() bool {
return agg.status == shutdownStatus
}
2 changes: 2 additions & 0 deletions aggregator/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ func (o *OperatorPool) Checkin(payload *avsproto.Checkin) error {
Version: payload.Version,
}

fmt.Println("signature", payload.Signature)

data, err := json.Marshal(status)

if err != nil {
Expand Down
132 changes: 132 additions & 0 deletions aggregator/repl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package aggregator

import (
"bufio"
"fmt"
"log"
"net"
"os"
"strings"
)

var (
repListener net.Listener
)

func (agg *Aggregator) stopRepl() {
if repListener != nil {
repListener.Close()
}

}
func (agg *Aggregator) startRepl() {
var err error
repListener, err = net.Listen("unix", agg.config.SocketPath)
if err != nil {
return
}

for {
if agg.IsShutdown() {
return
}
conn, err := repListener.Accept()
if err != nil {
log.Println("Failed to accept connection:", err)
continue
}

go handleConnection(agg, conn)
}
}

func handleConnection(agg *Aggregator, conn net.Conn) {
defer func() {
agg.logger.Info("Close repl connection", "remote", conn.RemoteAddr().String())
conn.Close()
}()

reader := bufio.NewReader(conn)
fmt.Fprintln(conn, "AP CLI REPL")
fmt.Fprintln(conn, "-------------------------")

for {
fmt.Fprint(conn, "> ")
input, err := reader.ReadString('\n')

// Check for EOF (Ctrl+D)
if err != nil {
if err.Error() == "EOF" {
fmt.Fprintln(conn, "\nExiting...")
break
}
log.Println("Error reading input:", err)
continue
}

input = strings.TrimSpace(input)

if input == "" {
continue
}

parts := strings.SplitN(input, " ", 2)
command := strings.ToLower(parts[0])

switch command {
case "list":
if len(parts) == 2 {
if keys, err := agg.db.ListKeys(parts[1]); err == nil {
for _, k := range keys {
fmt.Fprintln(conn, k)
}
}

} else {
fmt.Fprintln(conn, "Usage: list <prefix>* or list *")
}
case "get":
if len(parts) == 2 {
if key, err := agg.db.GetKey([]byte(parts[1])); err == nil {
fmt.Fprintln(conn, string(key))
}
} else {
fmt.Fprintln(conn, "Usage: get <key>")
}
case "set":
parts = strings.SplitN(input, " ", 3)
if len(parts) >= 3 {
if parts[2][0] == '@' {
if content, err := os.ReadFile(parts[2][1:]); err == nil {
if err = agg.db.Set([]byte(parts[1]), content); err == nil {
fmt.Fprintln(conn, "written "+string(parts[1]))
}
} else {
fmt.Fprintln(conn, "invalid file "+parts[2][1:])
}
} else {
if err = agg.db.Set([]byte(parts[1]), []byte(parts[2])); err == nil {
fmt.Fprintln(conn, "written "+parts[1])
}

}
} else {
fmt.Fprintln(conn, "Usage: set <key> @/path-to-file")
}
case "gc":
fmt.Fprintln(conn, "start gc with 0.7")
err := agg.db.Vacuum()
if err == nil {
fmt.Fprintln(conn, "gc success. still have more to run")
} else {
fmt.Fprintln(conn, "gc is done. no more log file to be removed")
}

case "exit":
fmt.Fprintln(conn, "Exiting...")
return
default:
fmt.Fprintln(conn, "Unknown command:", command)
}
}
}
Loading