diff --git a/etcdserver/api/v3rpc/rpctypes/error.go b/etcdserver/api/v3rpc/rpctypes/error.go index e6a281460d52..d008908508a1 100644 --- a/etcdserver/api/v3rpc/rpctypes/error.go +++ b/etcdserver/api/v3rpc/rpctypes/error.go @@ -35,6 +35,8 @@ var ( ErrGRPCLeaseExist = status.New(codes.FailedPrecondition, "etcdserver: lease already exists").Err() ErrGRPCLeaseTTLTooLarge = status.New(codes.OutOfRange, "etcdserver: too large lease TTL").Err() + ErrGRPCWatchCanceled = status.New(codes.Canceled, "etcdserver: watch canceled").Err() + ErrGRPCMemberExist = status.New(codes.FailedPrecondition, "etcdserver: member ID already exist").Err() ErrGRPCPeerURLExist = status.New(codes.FailedPrecondition, "etcdserver: Peer URLs already exists").Err() ErrGRPCMemberNotEnoughStarted = status.New(codes.FailedPrecondition, "etcdserver: re-configuration failed due to not enough started members").Err() diff --git a/etcdserver/api/v3rpc/watch.go b/etcdserver/api/v3rpc/watch.go index f41cb6c05695..1857e5498f86 100644 --- a/etcdserver/api/v3rpc/watch.go +++ b/etcdserver/api/v3rpc/watch.go @@ -29,6 +29,7 @@ import ( "go.etcd.io/etcd/mvcc/mvccpb" "go.uber.org/zap" + "google.golang.org/grpc/metadata" ) type watchServer struct { @@ -189,19 +190,23 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) { } }() + defer sws.close() select { case err = <-errc: close(sws.ctrlStream) case <-stream.Context().Done(): err = stream.Context().Err() - // the only server-side cancellation is noleader for now. if err == context.Canceled { - err = rpctypes.ErrGRPCNoLeader + if md, ok := metadata.FromIncomingContext(stream.Context()); ok { + rl := md.Get(rpctypes.MetadataRequireLeaderKey) + if len(rl) > 0 && rl[0] == "true" { + return rpctypes.ErrGRPCNoLeader + } + } + return rpctypes.ErrGRPCWatchCanceled } } - - sws.close() return err } diff --git a/tests/e2e/metrics_test.go b/tests/e2e/metrics_test.go index b515ac5b2bfc..2de2230eb1ed 100644 --- a/tests/e2e/metrics_test.go +++ b/tests/e2e/metrics_test.go @@ -49,6 +49,7 @@ func metricsTest(cx ctlCtx) { {"/metrics", fmt.Sprintf("etcd_mvcc_delete_total 3")}, {"/metrics", fmt.Sprintf(`etcd_server_version{server_version="%s"} 1`, version.Version)}, {"/metrics", fmt.Sprintf(`etcd_cluster_version{cluster_version="%s"} 1`, version.Cluster(version.Version))}, + {"/metrics", fmt.Sprintf(`grpc_server_handled_total{grpc_code="Canceled",grpc_method="Watch",grpc_service="etcdserverpb.Watch",grpc_type="bidi_stream"} 6`)}, {"/health", `{"health":"true"}`}, } { i++ @@ -59,6 +60,9 @@ func metricsTest(cx ctlCtx) { cx.t.Fatal(err) } + if err := ctlV3Watch(cx, []string{"k", "--rev", "1"}, []kvExec{{key: "k", val: "v"}}...); err != nil { + cx.t.Fatal(err) + } if err := cURLGet(cx.epc, cURLReq{endpoint: test.endpoint, expected: test.expected, metricsURLScheme: cx.cfg.metricsURLScheme}); err != nil { cx.t.Fatalf("failed get with curl (%v)", err) }