Skip to content

Commit

Permalink
Write average latency to the output file
Browse files Browse the repository at this point in the history
  • Loading branch information
aminst committed Feb 28, 2024
1 parent 270ee78 commit 21ad4ee
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 11 deletions.
31 changes: 21 additions & 10 deletions pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,16 @@ import (
)

type ReadResponse struct {
block string
value string
err error
block string
value string
latency time.Duration
err error
}

type WriteResponse struct {
block string
success bool
latency time.Duration
err error
}

Expand Down Expand Up @@ -63,30 +65,34 @@ func (c *client) WaitForStorageToBeReady(redisEndpoints []config.RedisEndpoint,
func (c *client) asyncRead(block string, routerRPCClient RouterRPCClient, readResponseChannel chan ReadResponse) {
c.rateLimit.Acquire()
ctx, span := c.tracer.Start(context.Background(), "client read request")
startTime := time.Now()
value, err := routerRPCClient.Read(ctx, block)
latency := time.Since(startTime)
log.Debug().Msgf("Got value %s for block %s", value, block)
span.End()
c.rateLimit.Release()
if err != nil {
readResponseChannel <- ReadResponse{block: block, value: "", err: fmt.Errorf("failed to call Read block %s on router; %v", block, err)}
} else if value == "" {
readResponseChannel <- ReadResponse{block: block, value: "", err: nil}
readResponseChannel <- ReadResponse{block: block, value: "", latency: latency, err: nil}
} else {
readResponseChannel <- ReadResponse{block: block, value: value, err: nil}
readResponseChannel <- ReadResponse{block: block, value: value, latency: latency, err: nil}
}
}

func (c *client) asyncWrite(block string, newValue string, routerRPCClient RouterRPCClient, writeResponseChannel chan WriteResponse) {
c.rateLimit.Acquire()
ctx, span := c.tracer.Start(context.Background(), "client write request")
startTime := time.Now()
value, err := routerRPCClient.Write(ctx, block, newValue)
latency := time.Since(startTime)
log.Debug().Msgf("Got success %v for block %s", value, block)
span.End()
c.rateLimit.Release()
if err != nil {
writeResponseChannel <- WriteResponse{block: block, success: false, err: fmt.Errorf("failed to call Write block %s on router; %v", block, err)}
} else {
writeResponseChannel <- WriteResponse{block: block, success: value, err: nil}
writeResponseChannel <- WriteResponse{block: block, success: value, latency: latency, err: nil}
}
}

Expand All @@ -109,24 +115,27 @@ func (c *client) SendRequestsForever(ctx context.Context, readResponseChannel ch
}
}

type ResponseCount struct {
type ResponseStatus struct {
readOperations int
writeOperations int
latencies []time.Duration
}

// getResponsesForever cancels remaining operations and returns when the context is cancelled
// returns the number of read and write operations over fixed intervals in the duration
func (c *client) GetResponsesForever(ctx context.Context, readResponseChannel chan ReadResponse, writeResponseChannel chan WriteResponse) []ResponseCount {
func (c *client) GetResponsesForever(ctx context.Context, readResponseChannel chan ReadResponse, writeResponseChannel chan WriteResponse) []ResponseStatus {
readOperations, writeOperations := 0, 0
var responseCounts []ResponseCount
var latencies []time.Duration
var responseCounts []ResponseStatus
timout := time.After(1 * time.Second)
for {
select {
case <-ctx.Done():
return responseCounts
case <-timout:
responseCounts = append(responseCounts, ResponseCount{readOperations, writeOperations})
responseCounts = append(responseCounts, ResponseStatus{readOperations, writeOperations, latencies})
readOperations, writeOperations = 0, 0
latencies = nil
timout = time.After(1 * time.Second)
default:
}
Expand All @@ -138,6 +147,7 @@ func (c *client) GetResponsesForever(ctx context.Context, readResponseChannel ch
} else {
log.Debug().Msgf("Sucess in Read of block %s. Got value: %v\n", readResponse.block, readResponse.value)
readOperations++
latencies = append(latencies, readResponse.latency)
}
case writeResponse := <-writeResponseChannel:
if writeResponse.err != nil {
Expand All @@ -146,6 +156,7 @@ func (c *client) GetResponsesForever(ctx context.Context, readResponseChannel ch
} else {
log.Debug().Msgf("Finished writing block %s. Success: %v\n", writeResponse.block, writeResponse.success)
writeOperations++
latencies = append(latencies, writeResponse.latency)
}
default:
}
Expand Down
12 changes: 11 additions & 1 deletion pkg/client/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,28 @@ import (
"os"
)

func WriteOutputToFile(outputFilePath string, responseCount []ResponseCount) error {
func WriteOutputToFile(outputFilePath string, responseCount []ResponseStatus) error {
file, err := os.Create(outputFilePath)
if err != nil {
return err
}
defer file.Close()
sum := 0.0
experimentAverageLatency := 0.0
// averageLatency := 0.0
for _, count := range responseCount {
throughput := float64(count.readOperations + count.writeOperations)
averageLatency := 0.0
for _, latency := range count.latencies {
averageLatency += float64(latency.Milliseconds())
}
averageLatency = averageLatency / float64(len(count.latencies))
sum += throughput
experimentAverageLatency += averageLatency
file.WriteString(fmt.Sprintf("Throughput: %f\n", throughput))
file.WriteString(fmt.Sprintf("Average Latency: %f\n", averageLatency))
}
file.WriteString(fmt.Sprintf("Average Throughput: %f\n", sum/float64(len(responseCount))))
file.WriteString(fmt.Sprintf("Experiment Average Latency: %f\n", experimentAverageLatency/float64(len(responseCount))))
return nil
}

0 comments on commit 21ad4ee

Please sign in to comment.