Fragment Watch Response Messages #8371

Closed

Conversation

mangoslicer
Contributor

Addresses #8188

Still a WIP. 💀 Do not merge 💀

@mangoslicer
Contributor Author

I left the log statements in for clarity and added a couple of "tests" for demonstration purposes. The server and watch proxy fragment a watch response by limiting the number of events per message, then the client glues the fragments back together.
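For readers following along, here is a minimal sketch of the splitting step, assuming the policy is simply "at most maxEventsPerMsg events per message"; the names (event, splitEvents) are illustrative stand-ins, not the PR's FragmentWatchResponse:

package main

import "fmt"

// event is a stand-in for mvccpb.Event; the real code splits watch events.
type event struct{ key string }

// splitEvents breaks one logical watch response's events into chunks of at
// most maxEventsPerMsg events; each chunk would become its own message.
func splitEvents(maxEventsPerMsg int, evs []event) [][]event {
	if maxEventsPerMsg <= 0 || len(evs) <= maxEventsPerMsg {
		return [][]event{evs}
	}
	var chunks [][]event
	for len(evs) > maxEventsPerMsg {
		chunks = append(chunks, evs[:maxEventsPerMsg])
		evs = evs[maxEventsPerMsg:]
	}
	return append(chunks, evs)
}

func main() {
	evs := []event{{"a"}, {"b"}, {"c"}, {"d"}, {"e"}}
	for i, chunk := range splitEvents(2, evs) {
		fmt.Printf("fragment %d carries %d events\n", i, len(chunk))
	}
}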


int64 fragment_count = 7;

int64 curr_fragment = 8;
Contributor

can these be bool fragment = 7? it's simpler to have the flag set until the last message so the client only needs to know it should keep accumulating responses
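A rough sketch of the accumulation loop this suggestion implies, reusing the event placeholder from the sketch above; the watchResponse type and its Fragment field are hypothetical, not the actual protobuf:

// watchResponse is a hypothetical stand-in for the wire message.
type watchResponse struct {
	Fragment bool    // set on every fragment except the last
	Events   []event // the slice of events carried by this fragment
}

// reassemble keeps accumulating events until it sees a response with the
// Fragment flag cleared, then returns the glued-together event list.
func reassemble(ch <-chan watchResponse) []event {
	var evs []event
	for wr := range ch {
		evs = append(evs, wr.Events...)
		if !wr.Fragment {
			break
		}
	}
	return evs
}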

if maxEventsPerMsg == 0 {
return
}
for _, fragment := range v3rpc.FragmentWatchResponse(maxEventsPerMsg, wresp) {
Contributor

this duplicates the functionality from the server; the proxy can probably passively forward fragments received from the server instead of refragmenting

Contributor Author

Since the proxy receives responses through a watch client, by the time the proxy gets the watch responses it is supposed to relay to its watchers, the fragments have already been glued back together. If I am not mistaken, the data flow is:
mvcc -> server -> client watcher started by proxy -> proxy -> user's client watcher.

Contributor

The watch client could have a WithFragments() option to skip reassembly. An alternative is to share the fragmenting code between the proxy and server, but it's not clear to me how that would fit together cleanly. Duplicating the fragmentation policy is not good; it's too easy for the code to drift.
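To make the suggestion concrete, a hypothetical sketch of such an opt-out in the functional-option style clientv3 already uses; the option name and fields below are illustrative only (the PR's test later refers to a clientv3.WithFragmentedResponse option):

// watchOptions / watchOption mimic the functional-option pattern; they are
// not real clientv3 types.
type watchOptions struct {
	// rawFragments, when true, tells the watch client to hand fragments to
	// the caller as-is so a proxy can relay them without reassembling.
	rawFragments bool
}

type watchOption func(*watchOptions)

// WithFragments skips client-side reassembly of fragmented responses.
func WithFragments() watchOption {
	return func(o *watchOptions) { o.rawFragments = true }
}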

if maxEventsPerMsg == 0 {
return
}
for _, fragment := range FragmentWatchResponse(maxEventsPerMsg, wr) {
Contributor

the WatchCreateRequest protobuf should have bool fragment; field to optionally enable this feature; if it's not opt-in then older clients will have trouble with revisions being split across responses

Contributor Author

Ok. I'll have to think about how to implement this. Do you think a map from watchId to a fragment boolean, stored in the serverWatchStream, would be a good idea? It seems like once the WatchCreateRequest reaches the serverWatchStream's recvLoop, the fragment boolean has to be stored somewhere rather than passed along to the watch request on the mvcc backend.

Contributor

Yes, that sounds OK
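A minimal sketch of that bookkeeping, assuming the opt-in flag from the WatchCreateRequest is recorded when the watcher is created and consulted in the send loop; the field and method names are illustrative, not the PR's final code:

import "sync"

// Only the fields relevant to this discussion are shown.
type serverWatchStream struct {
	mu       sync.Mutex
	fragment map[int64]bool // watch ID -> client opted in to fragmented responses
}

func (sws *serverWatchStream) setFragment(watchID int64, enabled bool) {
	sws.mu.Lock()
	sws.fragment[watchID] = enabled
	sws.mu.Unlock()
}

func (sws *serverWatchStream) fragmentEnabled(watchID int64) bool {
	sws.mu.Lock()
	defer sws.mu.Unlock()
	return sws.fragment[watchID]
}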

@codecov-io

Codecov Report

Merging #8371 into master will decrease coverage by 2.6%.
The diff coverage is 70.65%.

Impacted file tree graph

@@            Coverage Diff             @@
##           master    #8371      +/-   ##
==========================================
- Coverage   77.23%   74.62%   -2.61%     
==========================================
  Files         353      354       +1     
  Lines       28019    28410     +391     
==========================================
- Hits        21640    21202     -438     
- Misses       4863     5766     +903     
+ Partials     1516     1442      -74
Impacted Files Coverage Δ
etcdserver/config.go 81.81% <ø> (ø) ⬆️
etcdserver/errors.go 0% <ø> (ø) ⬆️
embed/serve.go 68.37% <ø> (ø) ⬆️
etcdserver/api/v3rpc/util.go 100% <ø> (ø) ⬆️
etcdserver/api/v3rpc/rpctypes/error.go 100% <ø> (ø) ⬆️
etcdmain/grpc_proxy.go 18.27% <0%> (-44.17%) ⬇️
etcdctl/ctlv3/command/printer_fields.go 0% <0%> (ø) ⬆️
auth/store.go 80.18% <0%> (-0.19%) ⬇️
etcdctl/ctlv3/command/printer_json.go 58.33% <0%> (-5.31%) ⬇️
etcdctl/ctlv3/command/printer_table.go 0% <0%> (ø) ⬆️
... and 104 more

Continue to review full report at Codecov.

Legend
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update b39891e...d63c431. Read the comment docs.

Contributor

@heyitsanthony left a comment

mostly remarks about naming and default behavior

@@ -0,0 +1,161 @@
// Copyright 2017 The etcd Authors
Contributor

this should be part of clientv3/integration/watch_test.go

}

// Create and register watch proxy.
wp, _ := grpcproxy.NewWatchProxy(clus.Client(0))
Contributor

no need to create a proxy here; the tag cluster_proxy will automatically have the integration package put a proxy between the server and the client

clus.Members[2].GRPCAddr(),
},
}
cli, err := clientv3.New(cfg)
Contributor

don't make a special client if it's not needed; just use clus.Client(0)


// Does not include the clientv3.WithFragmentedResponse option.
wChannel := w.Watch(context.TODO(), "foo", clientv3.WithRange("z"))
_, err = clus.Client(0).Revoke(context.Background(), firstLease.ID)
Contributor

instead of using a lease to bulk delete, `Delete(context.TODO(), "foo", clientv3.WithPrefix())` would be clearer / less code

defer testutil.AfterTest(t)
// MaxResponseBytes will overflow to 1000 once the grpcOverheadBytes,
// which has a value of 512 * 1024, is added to MaxResponseBytes.
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, MaxResponseBytes: ^uint(0) - (512*1024 - 1 - 1000)})
Contributor

Size: 1 since this isn't testing any clustering features
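As an aside on the MaxResponseBytes value in the hunk above, the wrap-around works out as follows, assuming a 64-bit uint and the 512*1024-byte gRPC overhead mentioned in the comment:

const (
	grpcOverheadBytes      = 512 * 1024
	maxResponseBytes  uint = ^uint(0) - (grpcOverheadBytes - 1 - 1000)
	// maxResponseBytes + grpcOverheadBytes == ^uint(0) + 1001, which wraps
	// around to an effective limit of 1000 bytes.
)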

WatchId: int64(id),
Created: true,
Canceled: id == -1,
MoreFragments: false,
Contributor

no need to set this; it's false by default

filters []mvcc.FilterFunc
progress bool
prevKV bool
fragmentResponse bool
Contributor

s/fragmentResponse/fragment

if err != nil {
return err
}
continue
Contributor

not needed

continue
}
var sendFragments func(maxEventsPerMsg int, wr *pb.WatchResponse) error
sendFragments = func(maxEventsPerMsg int, wr *pb.WatchResponse) error {
Contributor

maybe this should be func (sws *serverWatchStream) sendFragments(maxEvents int, wr *pb.WatchResponse) error
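A sketch of what that method shape could look like, reusing the "at most maxEvents per message" idea; the Fragment field on pb.WatchResponse is the hypothetical opt-in flag from this thread and sws.gRPCStream.Send is assumed to be the send path, so treat it as illustrative rather than the PR's final code:

func (sws *serverWatchStream) sendFragments(maxEvents int, wr *pb.WatchResponse) error {
	// Small responses go out unfragmented.
	if maxEvents <= 0 || len(wr.Events) <= maxEvents {
		return sws.gRPCStream.Send(wr)
	}
	for start := 0; start < len(wr.Events); start += maxEvents {
		end := start + maxEvents
		if end > len(wr.Events) {
			end = len(wr.Events)
		}
		frag := *wr                          // copy header, watch ID, flags
		frag.Events = wr.Events[start:end]   // this fragment's share of events
		frag.Fragment = end < len(wr.Events) // hypothetical: more fragments follow
		if err := sws.gRPCStream.Send(&frag); err != nil {
			return err
		}
	}
	return nil
}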


// MoreFragments indicates that more fragments composing one large
// watch response are expected.
MoreFragments bool
Contributor

just Fragment?

@mangoslicer
Contributor Author

@heyitsanthony
Hey, when you get the time, could you clear up some of the questions I have on your feedback?

@allwmh

allwmh commented Jan 17, 2018

Has this been merged?

@mangoslicer
Contributor Author

@allwmh Not yet, sorry for the delay. I'll continue working on this shortly.

@gyuho
Contributor

gyuho commented Feb 7, 2018

Closing in favor of #9291.

We will cherry-pick this patch.

Thanks.

@gyuho closed this Feb 7, 2018