Skip to content

Commit

Permalink
Seperate bind and advertise ip
Browse files Browse the repository at this point in the history
  • Loading branch information
aminst committed Feb 20, 2024
1 parent 58479cf commit a628021
Show file tree
Hide file tree
Showing 13 changed files with 39 additions and 33 deletions.
3 changes: 2 additions & 1 deletion ansible/templates/oblishard-oramnode.service.j2
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ After=syslog.target network.target
Type=simple
ExecStart=/root/oblishard/oramnode/oramnode \
-oramnodeid {{ item.id }} \
-ip {{ item.local_bind_ip }} \
-bindip {{ item.local_bind_ip }} \
-advip {{ item.exposed_ip }} \
-rpcport {{ item.port }} \
-replicaid {{ item.replicaid }} \
-raftport {{ item.raftport }} \
Expand Down
3 changes: 2 additions & 1 deletion ansible/templates/oblishard-shardnode.service.j2
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ After=syslog.target network.target
Type=simple
ExecStart=/root/oblishard/shardnode/shardnode \
-shardnodeid {{ item.id }} \
-ip {{ item.local_bind_ip }} \
-bindip {{ item.local_bind_ip }} \
-advip {{ item.exposed_ip }} \
-rpcport {{ item.port }} \
-replicaid {{ item.replicaid }} \
-raftport {{ item.raftport }} \
Expand Down
2 changes: 1 addition & 1 deletion cmd/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"go.opentelemetry.io/otel"
)

// Usage: go run . -duration=<duration in seconds> -logpath=<log path> -conf=<configs path>
// Usage: ./client -h
func main() {
logPath := flag.String("logpath", "", "path to write logs")
configsPath := flag.String("conf", "../../configs/default", "configs directory path")
Expand Down
7 changes: 4 additions & 3 deletions cmd/oramnode/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@ import (
"github.com/rs/zerolog/log"
)

// Usage: ./oramnode -oramnodeid=<oramnodeid> -ip=<ip> -rpcport=<rpcport> -replicaid=<replicaid> -raftport=<raftport> -joinaddr=<ip:port> -conf=<configs path> -logpath=<log path>
// Usage: ./oramnode -h
func main() {
oramNodeID := flag.Int("oramnodeid", 0, "oramnode id, starting consecutively from zero")
ip := flag.String("ip", "127.0.0.1", "ip of this replica")
bindIP := flag.String("bindip", "127.0.0.1", "ip of this replica to bind to")
advIP := flag.String("advip", "127.0.0.1", "ip of this replica to advertise")
replicaID := flag.Int("replicaid", 0, "replica id, starting consecutively from zero")
rpcPort := flag.Int("rpcport", 0, "node rpc port")
raftPort := flag.Int("raftport", 0, "node raft port")
Expand Down Expand Up @@ -67,5 +68,5 @@ func main() {
defer cpuProfile.Stop()
}

oramnode.StartServer(*oramNodeID, *ip, *rpcPort, *replicaID, *raftPort, *joinAddr, rpcClients, redisEndpoints, parameters)
oramnode.StartServer(*oramNodeID, *bindIP, *advIP, *rpcPort, *replicaID, *raftPort, *joinAddr, rpcClients, redisEndpoints, parameters)
}
2 changes: 1 addition & 1 deletion cmd/router/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"github.com/rs/zerolog/log"
)

// Usage: ./router -routerid=<routerid> -ip=<ip> -port=<port> -conf=<configs path> -logpath=<log path>
// Usage: ./router -h
func main() {

routerID := flag.Int("routerid", 0, "router id, starting consecutively from zero")
Expand Down
7 changes: 4 additions & 3 deletions cmd/shardnode/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@ import (
"github.com/rs/zerolog/log"
)

// Usage: ./shardnode -shardnodeid=<shardnodeid> -ip=<ip> -rpcport=<rpcport> -replicaid=<replicaid> -raftport=<raftport> -joinaddr=<ip:port> -conf=<configs path> -logpath=<log path>
// Usage: ./shardnode -h
func main() {
shardNodeID := flag.Int("shardnodeid", 0, "shardnode id, starting consecutively from zero")
ip := flag.String("ip", "127.0.0.1", "ip of this replica")
bindIP := flag.String("bindip", "127.0.0.1", "bind ip of this replica")
advIP := flag.String("advip", "127.0.0.1", "advertise ip of this replica")
replicaID := flag.Int("replicaid", 0, "replica id, starting consecutively from zero")
rpcPort := flag.Int("rpcport", 0, "node rpc port")
raftPort := flag.Int("raftport", 0, "node raft port")
Expand Down Expand Up @@ -69,5 +70,5 @@ func main() {
defer cpuProfile.Stop()
}

shardnode.StartServer(*shardNodeID, *ip, *rpcPort, *replicaID, *raftPort, *joinAddr, rpcClients, parameters, redisEndpoints, *configsPath)
shardnode.StartServer(*shardNodeID, *bindIP, *advIP, *rpcPort, *replicaID, *raftPort, *joinAddr, rpcClients, parameters, redisEndpoints, *configsPath)
}
4 changes: 2 additions & 2 deletions pkg/e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func startShardNode(replicaID int, rpcPort int, raftPort int, joinAddr string) {
log.Fatal().Msgf("Failed to read parameters from yaml file; %v", err)
}
redisEndpoints := []config.RedisEndpoint{{ID: 0, IP: "localhost", Port: 6379}}
shardnode.StartServer(0, "localhost", rpcPort, replicaID, raftPort, joinAddr, rpcClients, parameters, redisEndpoints, "../../configs")
shardnode.StartServer(0, "localhost", "localhost", rpcPort, replicaID, raftPort, joinAddr, rpcClients, parameters, redisEndpoints, "../../configs")
}

func startOramNode(replicaID int, rpcPort int, raftPort int, joinAddr string) {
Expand All @@ -55,7 +55,7 @@ func startOramNode(replicaID int, rpcPort int, raftPort int, joinAddr string) {
if err != nil {
log.Fatal().Msgf("Failed to read parameters from yaml file; %v", err)
}
oramnode.StartServer(0, "localhost", rpcPort, replicaID, raftPort, joinAddr, rpcClients, []config.RedisEndpoint{{ID: 0, IP: "localhost", Port: 6379}}, parameters)
oramnode.StartServer(0, "localhost", "localhost", rpcPort, replicaID, raftPort, joinAddr, rpcClients, []config.RedisEndpoint{{ID: 0, IP: "localhost", Port: 6379}}, parameters)
}

// It assumes that the redis service is running on the default port (6379)
Expand Down
9 changes: 5 additions & 4 deletions pkg/oramnode/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func (fsm *oramNodeFSM) Restore(rc io.ReadCloser) error {
// TODO: the logic for startRaftServer is the same for both shardNode and OramNode.
// TOOD: it can be moved to a new raft-utils package to reduce code duplication

func startRaftServer(isFirst bool, ip string, replicaID int, raftPort int, oramNodeFSM *oramNodeFSM) (*raft.Raft, error) {
func startRaftServer(isFirst bool, bindip string, advip string, replicaID int, raftPort int, oramNodeFSM *oramNodeFSM) (*raft.Raft, error) {
raftConfig := raft.DefaultConfig()
raftConfig.Logger = hclog.New(&hclog.LoggerOptions{Output: log.Logger})
raftConfig.LocalID = raft.ServerID(strconv.Itoa(replicaID))
Expand All @@ -170,13 +170,14 @@ func startRaftServer(isFirst bool, ip string, replicaID int, raftPort int, oramN

snapshots := raft.NewInmemSnapshotStore()

raftAddr := fmt.Sprintf("%s:%d", ip, raftPort)
tcpAddr, err := net.ResolveTCPAddr("tcp", raftAddr)
bindAddr := fmt.Sprintf("%s:%d", bindip, raftPort)
advAddr := fmt.Sprintf("%s:%d", advip, raftPort)
tcpAdvertiseAddr, err := net.ResolveTCPAddr("tcp", advAddr)
if err != nil {
return nil, fmt.Errorf("could not resolve tcp addr; %s", err)
}

transport, err := raft.NewTCPTransport(raftAddr, tcpAddr, 10, time.Second*10, os.Stderr)
transport, err := raft.NewTCPTransport(bindAddr, tcpAdvertiseAddr, 10, time.Second*10, os.Stderr)
if err != nil {
return nil, fmt.Errorf("could not create tcp transport; %s", err)
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/oramnode/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,10 +448,10 @@ func (o *oramNodeServer) JoinRaftVoter(ctx context.Context, joinRaftVoterRequest
return &pb.JoinRaftVoterReply{Success: true}, nil
}

func StartServer(oramNodeServerID int, ip string, rpcPort int, replicaID int, raftPort int, joinAddr string, shardNodeRPCClients map[int]ReplicaRPCClientMap, redisEndpoints []config.RedisEndpoint, parameters config.Parameters) {
func StartServer(oramNodeServerID int, bindIP string, advIP string, rpcPort int, replicaID int, raftPort int, joinAddr string, shardNodeRPCClients map[int]ReplicaRPCClientMap, redisEndpoints []config.RedisEndpoint, parameters config.Parameters) {
isFirst := joinAddr == ""
oramNodeFSM := newOramNodeFSM()
r, err := startRaftServer(isFirst, ip, replicaID, raftPort, oramNodeFSM)
r, err := startRaftServer(isFirst, bindIP, advIP, replicaID, raftPort, oramNodeFSM)
if err != nil {
log.Fatal().Msgf("The raft node creation did not succeed; %s", err)
}
Expand All @@ -466,15 +466,15 @@ func StartServer(oramNodeServerID int, ip string, rpcPort int, replicaID int, ra
context.Background(),
&pb.JoinRaftVoterRequest{
NodeId: int32(replicaID),
NodeAddr: fmt.Sprintf("%s:%d", ip, raftPort),
NodeAddr: fmt.Sprintf("%s:%d", advIP, raftPort),
},
)
if err != nil || !joinRaftVoterReply.Success {
log.Error().Msgf("The raft node could not connect to the leader as a new voter; %s", err)
}
}

lis, err := net.Listen("tcp", fmt.Sprintf("%s:%d", ip, rpcPort))
lis, err := net.Listen("tcp", fmt.Sprintf("%s:%d", bindIP, rpcPort))
if err != nil {
log.Fatal().Msgf("failed to listen: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/oramnode/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func startLeaderRaftNodeServer(t *testing.T, storageHandler storage) *oramNodeSe
if err != nil {
t.Errorf("unable to get free port")
}
r, err := startRaftServer(true, "localhost", 0, raftPort, fsm)
r, err := startRaftServer(true, "localhost", "localhost", 0, raftPort, fsm)
if err != nil {
t.Errorf("unable to start raft server")
}
Expand Down
15 changes: 8 additions & 7 deletions pkg/shardnode/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,26 +273,27 @@ func (fsm *shardNodeFSM) Restore(rc io.ReadCloser) error {
return fmt.Errorf("not implemented yet")
}

func startRaftServer(isFirst bool, ip string, replicaID int, raftPort int, shardshardNodeFSM *shardNodeFSM) (*raft.Raft, error) {
func startRaftServer(isFirst bool, bindIP string, advertiseIP string, replicaID int, raftPort int, shardshardNodeFSM *shardNodeFSM) (*raft.Raft, error) {

raftConfig := raft.DefaultConfig()
raftConfig.ElectionTimeout = 50 * time.Millisecond
raftConfig.HeartbeatTimeout = 50 * time.Millisecond
raftConfig.LeaderLeaseTimeout = 50 * time.Millisecond
raftConfig.ElectionTimeout = 150 * time.Millisecond
raftConfig.HeartbeatTimeout = 150 * time.Millisecond
raftConfig.LeaderLeaseTimeout = 150 * time.Millisecond
raftConfig.Logger = hclog.New(&hclog.LoggerOptions{Output: log.Logger})
raftConfig.LocalID = raft.ServerID(strconv.Itoa(replicaID))

store := raft.NewInmemStore()

snapshots := raft.NewInmemSnapshotStore()

raftAddr := fmt.Sprintf("%s:%d", ip, raftPort)
tcpAddr, err := net.ResolveTCPAddr("tcp", raftAddr)
bindAddr := fmt.Sprintf("%s:%d", bindIP, raftPort)
advertiseAddr := fmt.Sprintf("%s:%d", advertiseIP, raftPort)
tcpAdvertiseAddr, err := net.ResolveTCPAddr("tcp", advertiseAddr)
if err != nil {
return nil, fmt.Errorf("could not resolve tcp addr; %s", err)
}

transport, err := raft.NewTCPTransport(raftAddr, tcpAddr, 10, time.Second*10, os.Stderr)
transport, err := raft.NewTCPTransport(bindAddr, tcpAdvertiseAddr, 10, time.Second*10, os.Stderr)
if err != nil {
return nil, fmt.Errorf("could not create tcp transport; %s", err)
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/shardnode/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,10 +372,10 @@ func (s *shardNodeServer) JoinRaftVoter(ctx context.Context, joinRaftVoterReques
return &pb.JoinRaftVoterReply{Success: true}, nil
}

func StartServer(shardNodeServerID int, ip string, rpcPort int, replicaID int, raftPort int, joinAddr string, oramNodeRPCClients map[int]ReplicaRPCClientMap, parameters config.Parameters, storages []config.RedisEndpoint, configsPath string) {
func StartServer(shardNodeServerID int, bindIp string, advertiseIp string, rpcPort int, replicaID int, raftPort int, joinAddr string, oramNodeRPCClients map[int]ReplicaRPCClientMap, parameters config.Parameters, storages []config.RedisEndpoint, configsPath string) {
isFirst := joinAddr == ""
shardNodeFSM := newShardNodeFSM(replicaID)
r, err := startRaftServer(isFirst, ip, replicaID, raftPort, shardNodeFSM)
r, err := startRaftServer(isFirst, bindIp, advertiseIp, replicaID, raftPort, shardNodeFSM)
if err != nil {
log.Fatal().Msgf("The raft node creation did not succeed; %s", err)
}
Expand All @@ -390,15 +390,15 @@ func StartServer(shardNodeServerID int, ip string, rpcPort int, replicaID int, r
context.Background(),
&pb.JoinRaftVoterRequest{
NodeId: int32(replicaID),
NodeAddr: fmt.Sprintf("%s:%d", ip, raftPort),
NodeAddr: fmt.Sprintf("%s:%d", advertiseIp, raftPort),
},
)
if err != nil || !joinRaftVoterReply.Success {
log.Error().Msgf("The raft node could not connect to the leader as a new voter; %s", err)
}
}

lis, err := net.Listen("tcp", fmt.Sprintf("%s:%d", ip, rpcPort))
lis, err := net.Listen("tcp", fmt.Sprintf("%s:%d", bindIp, rpcPort))
if err != nil {
log.Fatal().Msgf("failed to listen: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/shardnode/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func startLeaderRaftNodeServer(t *testing.T, batchSize int, withBatchReponses bo
if err != nil {
t.Errorf("unable to get free port")
}
r, err := startRaftServer(true, "localhost", 0, raftPort, fsm)
r, err := startRaftServer(true, "localhost", "localhost", 0, raftPort, fsm)
if err != nil {
t.Errorf("unable to start raft server; %v", err)
}
Expand Down

0 comments on commit a628021

Please sign in to comment.