From 389952941b51ed3ef67a07e670faee6ba98566ff Mon Sep 17 00:00:00 2001 From: Sergei Kraev Date: Sat, 27 Jan 2024 21:50:20 +0400 Subject: [PATCH] Adapted for go mod and vendored some libs: - added Makefile for easy build - added future build build bin in container - added future bulld packages in container - adapted for go mod - vendored some library --- .gitignore | 3 + Dockerfile | 16 +- Makefile | 54 + README.md | 114 ++- go.mod | 8 + go.sum | 4 + main.go | 15 +- build.sh => scripts/build.sh | 0 scripts/build_package.sh | 36 + scripts/prepare_container.sh | 16 + vendor/github.com/outbrain/golib/LICENSE | 201 ++++ .../github.com/samuel/go-zookeeper/.gitignore | 1 - .../samuel/go-zookeeper/.travis.yml | 33 - .../github.com/samuel/go-zookeeper/README.md | 11 - .../samuel/go-zookeeper/examples/basic.go | 22 - .../samuel/go-zookeeper/zk/cluster_test.go | 314 ------ .../github.com/samuel/go-zookeeper/zk/conn.go | 98 +- .../samuel/go-zookeeper/zk/conn_test.go | 57 -- .../samuel/go-zookeeper/zk/constants.go | 25 +- .../samuel/go-zookeeper/zk/constants_test.go | 24 - .../go-zookeeper/zk/dnshostprovider_test.go | 224 ----- .../github.com/samuel/go-zookeeper/zk/flw.go | 8 +- .../samuel/go-zookeeper/zk/flw_test.go | 330 ------ .../samuel/go-zookeeper/zk/lock_test.go | 94 -- .../samuel/go-zookeeper/zk/server_help.go | 216 ---- .../samuel/go-zookeeper/zk/server_java.go | 136 --- .../samuel/go-zookeeper/zk/structs.go | 19 +- .../samuel/go-zookeeper/zk/structs_test.go | 83 -- .../samuel/go-zookeeper/zk/throttle_test.go | 136 --- .../samuel/go-zookeeper/zk/util_test.go | 53 - .../samuel/go-zookeeper/zk/zk_test.go | 939 ------------------ vendor/modules.txt | 6 + 32 files changed, 520 insertions(+), 2776 deletions(-) create mode 100644 Makefile create mode 100644 go.mod create mode 100644 go.sum rename build.sh => scripts/build.sh (100%) create mode 100755 scripts/build_package.sh create mode 100755 scripts/prepare_container.sh create mode 100644 vendor/github.com/outbrain/golib/LICENSE delete mode 100644 vendor/github.com/samuel/go-zookeeper/.gitignore delete mode 100644 vendor/github.com/samuel/go-zookeeper/.travis.yml delete mode 100644 vendor/github.com/samuel/go-zookeeper/README.md delete mode 100644 vendor/github.com/samuel/go-zookeeper/examples/basic.go delete mode 100644 vendor/github.com/samuel/go-zookeeper/zk/cluster_test.go delete mode 100644 vendor/github.com/samuel/go-zookeeper/zk/conn_test.go delete mode 100644 vendor/github.com/samuel/go-zookeeper/zk/constants_test.go delete mode 100644 vendor/github.com/samuel/go-zookeeper/zk/dnshostprovider_test.go delete mode 100644 vendor/github.com/samuel/go-zookeeper/zk/flw_test.go delete mode 100644 vendor/github.com/samuel/go-zookeeper/zk/lock_test.go delete mode 100644 vendor/github.com/samuel/go-zookeeper/zk/server_help.go delete mode 100644 vendor/github.com/samuel/go-zookeeper/zk/server_java.go delete mode 100644 vendor/github.com/samuel/go-zookeeper/zk/structs_test.go delete mode 100644 vendor/github.com/samuel/go-zookeeper/zk/throttle_test.go delete mode 100644 vendor/github.com/samuel/go-zookeeper/zk/util_test.go delete mode 100644 vendor/github.com/samuel/go-zookeeper/zk/zk_test.go create mode 100644 vendor/modules.txt diff --git a/.gitignore b/.gitignore index 5fff1d9..2aee3c2 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,4 @@ pkg +pkgs_out +bin +.prepared_cmd diff --git a/Dockerfile b/Dockerfile index ccdad95..eed9293 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,15 +1,5 @@ -FROM gliderlabs/alpine:3.2 - -MAINTAINER Ryan Eschinger - -COPY . /go/src/github.com/outbrain/zookeepercli/ - -RUN apk add --update go git \ - && cd /go/src/github.com/outbrain/zookeepercli/ \ - && export GOPATH=/go \ - && go get \ - && go build -o /bin/zookeepercli \ - && rm -rf /go \ - && apk del --purge go git +# syntax=docker/dockerfile:experimental +FROM scratch +COPY bin/zookeepercli /bin/zookeepercli ENTRYPOINT ["/bin/zookeepercli"] diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..e15babd --- /dev/null +++ b/Makefile @@ -0,0 +1,54 @@ +.PHONY: clean + +BINARY_NAME ?= zookeepercli +VERSION ?= $(shell git describe --long --tags --always --dirty --abbrev=10) +GOLANG_VERSION ?= 1.21-alpine + +CURRENT_DIR := $(shell pwd) + +DOCKER ?= docker + +.prepare_cmd: + printf "go build -o ./bin/%s -ldflags \"-X 'main.Version=%s'\"\n" "$(BINARY_NAME)" "$(VERSION)" > .prepared_cmd + +# build bin in docker, package in docker and build the docker image +all_in_docker: package_in_docker build_docker_image + @echo "All done" + +# build bin and package locally +all: package + @echo "All done" + +build: .prepare_cmd + sh .prepared_cmd + +package: build + @ /bin/sh ./scripts/build_package.sh + @ echo "=== Packaged files: ===" + @ echo "==== in ./pkgs_out ====" + @ ls -1 ./pkgs_out + @ echo "=======================" + +build_in_docker: .prepare_cmd + ${DOCKER} run --rm -t -v "$(CURRENT_DIR):/app" -w /app \ + -e CGO_ENABLED=0 \ + golang:$(GOLANG_VERSION) /bin/sh .prepared_cmd + +package_in_docker: build_in_docker + ${DOCKER} run --rm -t -v "$(CURRENT_DIR):/app" -w /app \ + -e CGO_ENABLED=0 \ + golang:$(GOLANG_VERSION) \ + /bin/sh -c \ + "/bin/sh ./scripts/prepare_container.sh && /bin/sh ./scripts/build_package.sh" + @ echo "=== Packaged files: ===" + @ echo "==== in ./pkgs_out ====" + @ ls -1 ./pkgs_out + @ echo "=======================" + +build_docker_image: build_in_docker + ${DOCKER} build -t zookeepercli:$(VERSION) -t zookeepercli:latest . + +clean: + rm -rf ./bin + rm -rf ./pkgs_out + rm -rf .prepared_cmd diff --git a/README.md b/README.md index 7a5b9b9..8763c9f 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # zookeepercli -[![downloads](https://img.shields.io/github/downloads/outbrain/zookeepercli/total.svg)](https://github.com/outbrain/zookeepercli/releases) +[![downloads](https://img.shields.io/github/downloads/outbrain/zookeepercli/total.svg)](https://github.com/outbrain/zookeepercli/releases) Simple, lightweight, dependable CLI for ZooKeeper @@ -9,109 +9,126 @@ Simple, lightweight, dependable CLI for ZooKeeper * Basic CRUD-like operations: `create`, `set`, `delete` (aka `rm`), `exists`, `get`, `ls` (aka `children`). * Extended operations: `lsr` (ls recursive), `creater` (create recursively), `deleter` (aka `rmr`, delete recursively) * Well formatted and controlled output: supporting either `txt` or `json` format - * Single, no-dependencies binary file, based on a native Go ZooKeeper library + * Single, no-dependencies binary file, based on a native Go ZooKeeper library by [github.com/samuel/go-zookeeper](http://github.com/samuel/go-zookeeper) ([LICENSE](https://github.com/outbrain/zookeepercli/blob/master/go-zookeeper-LICENSE)) -### Download & Install +### Build: +Clone the repository: +``` +git clone --depth 1 https://github.com/openark/zookeepercli.git && zookeepercli +``` -There are [pre built binaries](https://github.com/outbrain/zookeepercli/releases) for download. -You can find `RPM` and `deb` packages, as well as pre-compiled, dependency free `zookeepercli` executable binary. -In fact, the only file installed by the pre-built `RPM` and `deb` packages is said executable binary file. +Build : +``` +make build # build binary with local goland +make package # previous + build deb, rpm, apk, tar, zip packages +make all # all previous steps -Otherwise the source code is freely available; you will need `git` installed as well as `go`, and you're on your own. +make build_in_docker # build binary with using docker container (golang-alpine) +make package_in_docker # previous + build deb, rpm, apk, tar, zip packages with using docker container +make build_docker_image # build docker image with zookeepercli +make all_in_docker # all previous steps - -### Usage: +make clean # clean all build artifacts +``` + +For build packages locally you need requires fpm: https://github.com/jordansissel/fpm. + +You can find the binary in the `bin` directory. \ +Also you can find `deb`, `rpm`, `apk`, `tar` and `zip` packages in pkgs_out. +### Usage: +``` $ zookeepercli --help Usage of zookeepercli: - -acls="31": optional, csv list [1|,2|,4|,8|,16|,31] - -auth_pwd="": optional, digest scheme, pwd - -auth_usr="": optional, digest scheme, user - -c="": command (exists|get|ls|lsr|create|creater|set|delete|rm|deleter|rmr|getacl|setacl) - -debug=false: debug mode (very verbose) - -force=false: force operation - -format="txt": output format (txt|json) - -servers="": srv1[:port1][,srv2[:port2]...] - -stack=false: add stack trace upon error + -acls="31": optional, csv list [1|,2|,4|,8|,16|,31] + -auth_pwd="": optional, digest scheme, pwd + -auth_usr="": optional, digest scheme, user + -c="": command (exists|get|ls|lsr|create|creater|set|delete|rm|deleter|rmr|getacl|setacl) + -debug=false: debug mode (very verbose) + -force=false: force operation + -format="txt": output format (txt|json) + -servers="": srv1[:port1][,srv2[:port2]...] + -stack=false: add stack trace upon error -verbose=false: verbose - + -version: print version and exit +``` ### Examples: - - + + $ zookeepercli --servers srv-1,srv-2,srv-3 -c create /demo_only some_value - + # Default port is 2181. The above is equivalent to: $ zookeepercli --servers srv-1:2181,srv-2:2181,srv-3:2181 -c create /demo_only some_value - + $ zookeepercli --servers srv-1,srv-2,srv-3 --format=txt -c get /demo_only some_value - + # Same as above, JSON format output: $ zookeepercli --servers srv-1,srv-2,srv-3 --format=json -c get /demo_only "some_value" - - # exists exits with exit code 0 when path exists, 1 when path does not exist + + # exists exits with exit code 0 when path exists, 1 when path does not exist $ zookeepercli --servers srv-1,srv-2,srv-3 -c exists /demo_only true - + $ zookeepercli --servers srv-1,srv-2,srv-3 -c set /demo_only another_value - + $ zookeepercli --servers srv-1,srv-2,srv-3 --format=json -c get /demo_only "another_value" - + $ zookeepercli --servers srv-1,srv-2,srv-3 -c delete /demo_only - + $ zookeepercli --servers srv-1,srv-2,srv-3 -c get /demo_only 2014-09-15 04:07:16 FATAL zk: node does not exist - + $ zookeepercli --servers srv-1,srv-2,srv-3 -c create /demo_only "path placeholder" $ zookeepercli --servers srv-1,srv-2,srv-3 -c create /demo_only/key1 "value1" $ zookeepercli --servers srv-1,srv-2,srv-3 -c create /demo_only/key2 "value2" $ zookeepercli --servers srv-1,srv-2,srv-3 -c create /demo_only/key3 "value3" - + $ zookeepercli --servers srv-1,srv-2,srv-3 -c ls /demo_only key3 key2 key1 - + # Same as above, JSON format output: $ zookeepercli --servers srv-1,srv-2,srv-3 --format=json -c ls /demo_only ["key3","key2","key1"] - + $ zookeepercli --servers srv-1,srv-2,srv-3 -c delete /demo_only 2014-09-15 08:26:31 FATAL zk: node has children - + $ zookeepercli --servers srv-1,srv-2,srv-3 -c delete /demo_only/key1 $ zookeepercli --servers srv-1,srv-2,srv-3 -c delete /demo_only/key2 $ zookeepercli --servers srv-1,srv-2,srv-3 -c delete /demo_only/key3 $ zookeepercli --servers srv-1,srv-2,srv-3 -c delete /demo_only # /demo_only path now does not exist. - + # Create recursively a path: $ zookeepercli --servers=srv-1,srv-2,srv-3 -c creater "/demo_only/child/key1" "val1" $ zookeepercli --servers=srv-1,srv-2,srv-3 -c creater "/demo_only/child/key2" "val2" - + # "-c creater" is same as "-c create --force" $ zookeepercli --servers=srv-1,srv-2,srv-3 -c get "/demo_only/child/key1" val1 # This path was auto generated due to recursive create: - $ zookeepercli --servers=srv-1,srv-2,srv-3 -c get "/demo_only" + $ zookeepercli --servers=srv-1,srv-2,srv-3 -c get "/demo_only" zookeepercli auto-generated - + # ls recursively a path and all sub children: - $ zookeepercli --servers=srv-1,srv-2,srv-3 -c lsr "/demo_only" + $ zookeepercli --servers=srv-1,srv-2,srv-3 -c lsr "/demo_only" child child/key1 child/key2 # set value with read and write acl using digest authentication $ zookeepercli --servers 192.168.59.103 --auth_usr "someuser" --auth_pwd "pass" --acls 1,2 -c create /secret4 value4 - + # get value using digest authentication $ zookeepercli --servers 192.168.59.103 --auth_usr "someuser" --auth_pwd "pass" -c get /secret4 @@ -132,17 +149,17 @@ Otherwise the source code is freely available; you will need `git` installed as # set an acl with world and digest authentication creating the node if it doesn't exist $ zookeepercli --servers srv-1,srv-2,srv-3 -force -c setacl /demo_acl_create "world:anyone:rw,digest:someuser:hashedpw:crdwa" -The tool was built in order to allow with shell scripting seamless integration with ZooKeeper. -There is another, official command line tool for ZooKeeper that the author found inadequate -in terms of output format and output control, as well as large footprint. +The tool was built in order to allow with shell scripting seamless integration with ZooKeeper. +There is another, official command line tool for ZooKeeper that the author found inadequate +in terms of output format and output control, as well as large footprint. **zookeepercli** overcomes those limitations and provides with quick, well formatted output as well as -enhanced functionality. +enhanced functionality. ### Docker You can also build and run **zookeepercli** in a Docker container. To build the image: - $ docker build -t zookeepercli . + $ make build_docker_image Now, you can run **zookeepercli** from a container. Examples: @@ -158,9 +175,4 @@ Now, you can run **zookeepercli** from a container. Examples: Release under the [Apache 2.0 license](https://github.com/outbrain/zookeepercli/blob/master/LICENSE) Authored by [Shlomi Noach](https://github.com/shlomi-noach) at [Outbrain](https://github.com/outbrain) - - - - - diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..9326db5 --- /dev/null +++ b/go.mod @@ -0,0 +1,8 @@ +module github.com/openark/zookeepercli + +go 1.21.5 + +require ( + github.com/outbrain/golib v0.0.0-20200503083229-2531e5dbcc71 + github.com/samuel/go-zookeeper v0.0.0-20201211165307-7117e9ea2414 +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..1fed751 --- /dev/null +++ b/go.sum @@ -0,0 +1,4 @@ +github.com/outbrain/golib v0.0.0-20200503083229-2531e5dbcc71 h1:5FSwz/q8DhpkUsq8cqRN7gRVWWnfXfjeOeB8Bhj5ARc= +github.com/outbrain/golib v0.0.0-20200503083229-2531e5dbcc71/go.mod h1:JDhu//MMvcPVPH889Xr7DyamEbTLumgDBALGUyXrz1g= +github.com/samuel/go-zookeeper v0.0.0-20201211165307-7117e9ea2414 h1:AJNDS0kP60X8wwWFvbLPwDuojxubj9pbfK7pjHw0vKg= +github.com/samuel/go-zookeeper v0.0.0-20201211165307-7117e9ea2414/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E= diff --git a/main.go b/main.go index dbe2398..119913e 100644 --- a/main.go +++ b/main.go @@ -19,17 +19,20 @@ package main import ( "flag" "fmt" - "github.com/outbrain/golib/log" - "github.com/outbrain/zookeepercli/output" - "github.com/outbrain/zookeepercli/zk" "io/ioutil" "math/rand" "os" "sort" "strings" "time" + + "github.com/openark/zookeepercli/output" + "github.com/openark/zookeepercli/zk" + "github.com/outbrain/golib/log" ) +var Version = "undefined-dev-version" + // main is the application's entry point. func main() { servers := flag.String("servers", "", "srv1[:port1][,srv2[:port2]...]") @@ -43,8 +46,14 @@ func main() { authUser := flag.String("auth_usr", "", "optional, digest scheme, user") authPwd := flag.String("auth_pwd", "", "optional, digest scheme, pwd") acls := flag.String("acls", "31", "optional, csv list [1|,2|,4|,8|,16|,31]") + version := flag.Bool("version", false, "print version and exit") flag.Parse() + if *version { + fmt.Println("zookeepercli version:", Version) + os.Exit(0) + } + log.SetLevel(log.ERROR) if *verbose { log.SetLevel(log.INFO) diff --git a/build.sh b/scripts/build.sh similarity index 100% rename from build.sh rename to scripts/build.sh diff --git a/scripts/build_package.sh b/scripts/build_package.sh new file mode 100755 index 0000000..0035fc9 --- /dev/null +++ b/scripts/build_package.sh @@ -0,0 +1,36 @@ +#!/bin/sh -e +# Build deb package for zookeepercli +# Sergei Kraev +# + +echo "========= Preparing packing start ========="; +PKGNAME="zookeepercli" +VER=$(git describe --long --tags --always --abbrev=10 | sed 's/^[^0-9]//ig') +ARCH=$(arch | sed s/aarch64/arm64/ | sed s/x86_64/amd64/) +RDIR="$(pwd)/pkgs_out/" + +rm -rf "${RDIR}" +mkdir -p "$RDIR" + +OUTTYPES="rpm deb apk tar zip" + +for OUTTYPE in $OUTTYPES; do + echo "========= Packing ${OUTTYPE} start ========="; + fpm \ + --output-type "${OUTTYPE}" \ + --input-type dir \ + --force \ + \ + --name "${PKGNAME}" \ + --package "${RDIR}" \ + --version "${VER}" \ + --architecture "${ARCH}" \ + --maintainer 'Shlomi Noach ' \ + --url 'https://github.com/openark/zookeepercli' \ + --description 'Zookeeper client console' \ + --license 'Apache 2.0' \ + --category 'universe/net' \ + --no-depends --no-auto-depends \ + --prefix /usr/local/bin \ + --chdir "./bin" . +done diff --git a/scripts/prepare_container.sh b/scripts/prepare_container.sh new file mode 100755 index 0000000..1cf0427 --- /dev/null +++ b/scripts/prepare_container.sh @@ -0,0 +1,16 @@ +#!/bin/sh -e +# Build deb package for zookeepercli +# Sergei Kraev +# + +echo "========= Preparing packing start ========="; +apk update +apk add --no-cache \ + git \ + ruby \ + rpm-dev \ + tar \ + zip + +git config --global --add safe.directory "$(pwd)" +gem install fpm diff --git a/vendor/github.com/outbrain/golib/LICENSE b/vendor/github.com/outbrain/golib/LICENSE new file mode 100644 index 0000000..6875dca --- /dev/null +++ b/vendor/github.com/outbrain/golib/LICENSE @@ -0,0 +1,201 @@ +Apache License +Version 2.0, January 2004 +http://www.apache.org/licenses/ + +TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + +1. Definitions. + +"License" shall mean the terms and conditions for use, reproduction, +and distribution as defined by Sections 1 through 9 of this document. + +"Licensor" shall mean the copyright owner or entity authorized by +the copyright owner that is granting the License. + +"Legal Entity" shall mean the union of the acting entity and all +other entities that control, are controlled by, or are under common +control with that entity. For the purposes of this definition, +"control" means (i) the power, direct or indirect, to cause the +direction or management of such entity, whether by contract or +otherwise, or (ii) ownership of fifty percent (50%) or more of the +outstanding shares, or (iii) beneficial ownership of such entity. + +"You" (or "Your") shall mean an individual or Legal Entity +exercising permissions granted by this License. + +"Source" form shall mean the preferred form for making modifications, +including but not limited to software source code, documentation +source, and configuration files. + +"Object" form shall mean any form resulting from mechanical +transformation or translation of a Source form, including but +not limited to compiled object code, generated documentation, +and conversions to other media types. + +"Work" shall mean the work of authorship, whether in Source or +Object form, made available under the License, as indicated by a +copyright notice that is included in or attached to the work +(an example is provided in the Appendix below). + +"Derivative Works" shall mean any work, whether in Source or Object +form, that is based on (or derived from) the Work and for which the +editorial revisions, annotations, elaborations, or other modifications +represent, as a whole, an original work of authorship. For the purposes +of this License, Derivative Works shall not include works that remain +separable from, or merely link (or bind by name) to the interfaces of, +the Work and Derivative Works thereof. + +"Contribution" shall mean any work of authorship, including +the original version of the Work and any modifications or additions +to that Work or Derivative Works thereof, that is intentionally +submitted to Licensor for inclusion in the Work by the copyright owner +or by an individual or Legal Entity authorized to submit on behalf of +the copyright owner. For the purposes of this definition, "submitted" +means any form of electronic, verbal, or written communication sent +to the Licensor or its representatives, including but not limited to +communication on electronic mailing lists, source code control systems, +and issue tracking systems that are managed by, or on behalf of, the +Licensor for the purpose of discussing and improving the Work, but +excluding communication that is conspicuously marked or otherwise +designated in writing by the copyright owner as "Not a Contribution." + +"Contributor" shall mean Licensor and any individual or Legal Entity +on behalf of whom a Contribution has been received by Licensor and +subsequently incorporated within the Work. + +2. Grant of Copyright License. Subject to the terms and conditions of +this License, each Contributor hereby grants to You a perpetual, +worldwide, non-exclusive, no-charge, royalty-free, irrevocable +copyright license to reproduce, prepare Derivative Works of, +publicly display, publicly perform, sublicense, and distribute the +Work and such Derivative Works in Source or Object form. + +3. Grant of Patent License. Subject to the terms and conditions of +this License, each Contributor hereby grants to You a perpetual, +worldwide, non-exclusive, no-charge, royalty-free, irrevocable +(except as stated in this section) patent license to make, have made, +use, offer to sell, sell, import, and otherwise transfer the Work, +where such license applies only to those patent claims licensable +by such Contributor that are necessarily infringed by their +Contribution(s) alone or by combination of their Contribution(s) +with the Work to which such Contribution(s) was submitted. If You +institute patent litigation against any entity (including a +cross-claim or counterclaim in a lawsuit) alleging that the Work +or a Contribution incorporated within the Work constitutes direct +or contributory patent infringement, then any patent licenses +granted to You under this License for that Work shall terminate +as of the date such litigation is filed. + +4. Redistribution. You may reproduce and distribute copies of the +Work or Derivative Works thereof in any medium, with or without +modifications, and in Source or Object form, provided that You +meet the following conditions: + +(a) You must give any other recipients of the Work or +Derivative Works a copy of this License; and + +(b) You must cause any modified files to carry prominent notices +stating that You changed the files; and + +(c) You must retain, in the Source form of any Derivative Works +that You distribute, all copyright, patent, trademark, and +attribution notices from the Source form of the Work, +excluding those notices that do not pertain to any part of +the Derivative Works; and + +(d) If the Work includes a "NOTICE" text file as part of its +distribution, then any Derivative Works that You distribute must +include a readable copy of the attribution notices contained +within such NOTICE file, excluding those notices that do not +pertain to any part of the Derivative Works, in at least one +of the following places: within a NOTICE text file distributed +as part of the Derivative Works; within the Source form or +documentation, if provided along with the Derivative Works; or, +within a display generated by the Derivative Works, if and +wherever such third-party notices normally appear. The contents +of the NOTICE file are for informational purposes only and +do not modify the License. You may add Your own attribution +notices within Derivative Works that You distribute, alongside +or as an addendum to the NOTICE text from the Work, provided +that such additional attribution notices cannot be construed +as modifying the License. + +You may add Your own copyright statement to Your modifications and +may provide additional or different license terms and conditions +for use, reproduction, or distribution of Your modifications, or +for any such Derivative Works as a whole, provided Your use, +reproduction, and distribution of the Work otherwise complies with +the conditions stated in this License. + +5. Submission of Contributions. Unless You explicitly state otherwise, +any Contribution intentionally submitted for inclusion in the Work +by You to the Licensor shall be under the terms and conditions of +this License, without any additional terms or conditions. +Notwithstanding the above, nothing herein shall supersede or modify +the terms of any separate license agreement you may have executed +with Licensor regarding such Contributions. + +6. Trademarks. This License does not grant permission to use the trade +names, trademarks, service marks, or product names of the Licensor, +except as required for reasonable and customary use in describing the +origin of the Work and reproducing the content of the NOTICE file. + +7. Disclaimer of Warranty. Unless required by applicable law or +agreed to in writing, Licensor provides the Work (and each +Contributor provides its Contributions) on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +implied, including, without limitation, any warranties or conditions +of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A +PARTICULAR PURPOSE. You are solely responsible for determining the +appropriateness of using or redistributing the Work and assume any +risks associated with Your exercise of permissions under this License. + +8. Limitation of Liability. In no event and under no legal theory, +whether in tort (including negligence), contract, or otherwise, +unless required by applicable law (such as deliberate and grossly +negligent acts) or agreed to in writing, shall any Contributor be +liable to You for damages, including any direct, indirect, special, +incidental, or consequential damages of any character arising as a +result of this License or out of the use or inability to use the +Work (including but not limited to damages for loss of goodwill, +work stoppage, computer failure or malfunction, or any and all +other commercial damages or losses), even if such Contributor +has been advised of the possibility of such damages. + +9. Accepting Warranty or Additional Liability. While redistributing +the Work or Derivative Works thereof, You may choose to offer, +and charge a fee for, acceptance of support, warranty, indemnity, +or other liability obligations and/or rights consistent with this +License. However, in accepting such obligations, You may act only +on Your own behalf and on Your sole responsibility, not on behalf +of any other Contributor, and only if You agree to indemnify, +defend, and hold each Contributor harmless for any liability +incurred by, or claims asserted against, such Contributor by reason +of your accepting any such warranty or additional liability. + +END OF TERMS AND CONDITIONS + +APPENDIX: How to apply the Apache License to your work. + +To apply the Apache License to your work, attach the following +boilerplate notice, with the fields enclosed by brackets "{}" +replaced with your own identifying information. (Don't include +the brackets!) The text should be enclosed in the appropriate +comment syntax for the file format. We also recommend that a +file or class name and description of purpose be included on the +same "printed page" as the copyright notice for easier +identification within third-party archives. + +Copyright 2014 Outbrain Inc + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. diff --git a/vendor/github.com/samuel/go-zookeeper/.gitignore b/vendor/github.com/samuel/go-zookeeper/.gitignore deleted file mode 100644 index e43b0f9..0000000 --- a/vendor/github.com/samuel/go-zookeeper/.gitignore +++ /dev/null @@ -1 +0,0 @@ -.DS_Store diff --git a/vendor/github.com/samuel/go-zookeeper/.travis.yml b/vendor/github.com/samuel/go-zookeeper/.travis.yml deleted file mode 100644 index 65b27a8..0000000 --- a/vendor/github.com/samuel/go-zookeeper/.travis.yml +++ /dev/null @@ -1,33 +0,0 @@ -language: go -go: - - 1.9 - -jdk: - - oraclejdk9 - -sudo: false - -branches: - only: - - master - -before_install: - - wget http://apache.cs.utah.edu/zookeeper/zookeeper-${zk_version}/zookeeper-${zk_version}.tar.gz - - tar -zxvf zookeeper*tar.gz && zip -d zookeeper-${zk_version}/contrib/fatjar/zookeeper-${zk_version}-fatjar.jar 'META-INF/*.SF' 'META-INF/*.DSA' - - go get github.com/mattn/goveralls - - go get golang.org/x/tools/cmd/cover - -script: - - jdk_switcher use oraclejdk9 - - go build ./... - - go fmt ./... - - go vet ./... - - go test -i -race ./... - - go test -race -covermode atomic -coverprofile=profile.cov ./zk - - goveralls -coverprofile=profile.cov -service=travis-ci - -env: - global: - secure: Coha3DDcXmsekrHCZlKvRAc+pMBaQU1QS/3++3YCCUXVDBWgVsC1ZIc9df4RLdZ/ncGd86eoRq/S+zyn1XbnqK5+ePqwJoUnJ59BE8ZyHLWI9ajVn3fND1MTduu/ksGsS79+IYbdVI5wgjSgjD3Ktp6Y5uPl+BPosjYBGdNcHS4= - matrix: - - zk_version=3.4.10 diff --git a/vendor/github.com/samuel/go-zookeeper/README.md b/vendor/github.com/samuel/go-zookeeper/README.md deleted file mode 100644 index afc1d08..0000000 --- a/vendor/github.com/samuel/go-zookeeper/README.md +++ /dev/null @@ -1,11 +0,0 @@ -Native Go Zookeeper Client Library -=================================== - -[![GoDoc](https://godoc.org/github.com/samuel/go-zookeeper?status.svg)](https://godoc.org/github.com/samuel/go-zookeeper) -[![Build Status](https://travis-ci.org/samuel/go-zookeeper.png)](https://travis-ci.org/samuel/go-zookeeper) -[![Coverage Status](https://coveralls.io/repos/github/samuel/go-zookeeper/badge.svg?branch=master)](https://coveralls.io/github/samuel/go-zookeeper?branch=master) - -License -------- - -3-clause BSD. See LICENSE file. diff --git a/vendor/github.com/samuel/go-zookeeper/examples/basic.go b/vendor/github.com/samuel/go-zookeeper/examples/basic.go deleted file mode 100644 index 28dfa63..0000000 --- a/vendor/github.com/samuel/go-zookeeper/examples/basic.go +++ /dev/null @@ -1,22 +0,0 @@ -package main - -import ( - "fmt" - "time" - - "github.com/samuel/go-zookeeper/zk" -) - -func main() { - c, _, err := zk.Connect([]string{"127.0.0.1"}, time.Second) //*10) - if err != nil { - panic(err) - } - children, stat, ch, err := c.ChildrenW("/") - if err != nil { - panic(err) - } - fmt.Printf("%+v %+v\n", children, stat) - e := <-ch - fmt.Printf("%+v\n", e) -} diff --git a/vendor/github.com/samuel/go-zookeeper/zk/cluster_test.go b/vendor/github.com/samuel/go-zookeeper/zk/cluster_test.go deleted file mode 100644 index dcceaa4..0000000 --- a/vendor/github.com/samuel/go-zookeeper/zk/cluster_test.go +++ /dev/null @@ -1,314 +0,0 @@ -package zk - -import ( - "sync" - "testing" - "time" -) - -type logWriter struct { - t *testing.T - p string -} - -func (lw logWriter) Write(b []byte) (int, error) { - lw.t.Logf("%s%s", lw.p, string(b)) - return len(b), nil -} - -func TestBasicCluster(t *testing.T) { - ts, err := StartTestCluster(3, nil, logWriter{t: t, p: "[ZKERR] "}) - if err != nil { - t.Fatal(err) - } - defer ts.Stop() - zk1, err := ts.Connect(0) - if err != nil { - t.Fatalf("Connect returned error: %+v", err) - } - defer zk1.Close() - zk2, err := ts.Connect(1) - if err != nil { - t.Fatalf("Connect returned error: %+v", err) - } - defer zk2.Close() - - time.Sleep(time.Second * 5) - - if _, err := zk1.Create("/gozk-test", []byte("foo-cluster"), 0, WorldACL(PermAll)); err != nil { - t.Fatalf("Create failed on node 1: %+v", err) - } - if by, _, err := zk2.Get("/gozk-test"); err != nil { - t.Fatalf("Get failed on node 2: %+v", err) - } else if string(by) != "foo-cluster" { - t.Fatal("Wrong data for node 2") - } -} - -// If the current leader dies, then the session is reestablished with the new one. -func TestClientClusterFailover(t *testing.T) { - tc, err := StartTestCluster(3, nil, logWriter{t: t, p: "[ZKERR] "}) - if err != nil { - t.Fatal(err) - } - defer tc.Stop() - zk, evCh, err := tc.ConnectAll() - if err != nil { - t.Fatalf("Connect returned error: %+v", err) - } - defer zk.Close() - - sl := NewStateLogger(evCh) - - hasSessionEvent1 := sl.NewWatcher(sessionStateMatcher(StateHasSession)).Wait(8 * time.Second) - if hasSessionEvent1 == nil { - t.Fatalf("Failed to connect and get session") - } - - if _, err := zk.Create("/gozk-test", []byte("foo-cluster"), 0, WorldACL(PermAll)); err != nil { - t.Fatalf("Create failed on node 1: %+v", err) - } - - hasSessionWatcher2 := sl.NewWatcher(sessionStateMatcher(StateHasSession)) - - // Kill the current leader - tc.StopServer(hasSessionEvent1.Server) - - // Wait for the session to be reconnected with the new leader. - if hasSessionWatcher2.Wait(8*time.Second) == nil { - t.Fatalf("Failover failed") - } - - if by, _, err := zk.Get("/gozk-test"); err != nil { - t.Fatalf("Get failed on node 2: %+v", err) - } else if string(by) != "foo-cluster" { - t.Fatal("Wrong data for node 2") - } -} - -// If a ZooKeeper cluster looses quorum then a session is reconnected as soon -// as the quorum is restored. -func TestNoQuorum(t *testing.T) { - tc, err := StartTestCluster(3, nil, logWriter{t: t, p: "[ZKERR] "}) - if err != nil { - t.Fatal(err) - } - defer tc.Stop() - zk, evCh, err := tc.ConnectAllTimeout(4 * time.Second) - if err != nil { - t.Fatalf("Connect returned error: %+v", err) - } - defer zk.Close() - sl := NewStateLogger(evCh) - - // Wait for initial session to be established - hasSessionEvent1 := sl.NewWatcher(sessionStateMatcher(StateHasSession)).Wait(8 * time.Second) - if hasSessionEvent1 == nil { - t.Fatalf("Failed to connect and get session") - } - initialSessionID := zk.sessionID - DefaultLogger.Printf(" Session established: id=%d, timeout=%d", zk.sessionID, zk.sessionTimeoutMs) - - // Kill the ZooKeeper leader and wait for the session to reconnect. - DefaultLogger.Printf(" Kill the leader") - disconnectWatcher1 := sl.NewWatcher(sessionStateMatcher(StateDisconnected)) - hasSessionWatcher2 := sl.NewWatcher(sessionStateMatcher(StateHasSession)) - tc.StopServer(hasSessionEvent1.Server) - - disconnectedEvent1 := disconnectWatcher1.Wait(8 * time.Second) - if disconnectedEvent1 == nil { - t.Fatalf("Failover failed, missed StateDisconnected event") - } - if disconnectedEvent1.Server != hasSessionEvent1.Server { - t.Fatalf("Unexpected StateDisconnected event, expected=%s, actual=%s", - hasSessionEvent1.Server, disconnectedEvent1.Server) - } - - hasSessionEvent2 := hasSessionWatcher2.Wait(8 * time.Second) - if hasSessionEvent2 == nil { - t.Fatalf("Failover failed, missed StateHasSession event") - } - - // Kill the ZooKeeper leader leaving the cluster without quorum. - DefaultLogger.Printf(" Kill the leader") - disconnectWatcher2 := sl.NewWatcher(sessionStateMatcher(StateDisconnected)) - tc.StopServer(hasSessionEvent2.Server) - - disconnectedEvent2 := disconnectWatcher2.Wait(8 * time.Second) - if disconnectedEvent2 == nil { - t.Fatalf("Failover failed, missed StateDisconnected event") - } - if disconnectedEvent2.Server != hasSessionEvent2.Server { - t.Fatalf("Unexpected StateDisconnected event, expected=%s, actual=%s", - hasSessionEvent2.Server, disconnectedEvent2.Server) - } - - // Make sure that we keep retrying connecting to the only remaining - // ZooKeeper server, but the attempts are being dropped because there is - // no quorum. - DefaultLogger.Printf(" Retrying no luck...") - var firstDisconnect *Event - begin := time.Now() - for time.Now().Sub(begin) < 6*time.Second { - disconnectedEvent := sl.NewWatcher(sessionStateMatcher(StateDisconnected)).Wait(4 * time.Second) - if disconnectedEvent == nil { - t.Fatalf("Disconnected event expected") - } - if firstDisconnect == nil { - firstDisconnect = disconnectedEvent - continue - } - if disconnectedEvent.Server != firstDisconnect.Server { - t.Fatalf("Disconnect from wrong server: expected=%s, actual=%s", - firstDisconnect.Server, disconnectedEvent.Server) - } - } - - // Start a ZooKeeper node to restore quorum. - hasSessionWatcher3 := sl.NewWatcher(sessionStateMatcher(StateHasSession)) - tc.StartServer(hasSessionEvent1.Server) - - // Make sure that session is reconnected with the same ID. - hasSessionEvent3 := hasSessionWatcher3.Wait(8 * time.Second) - if hasSessionEvent3 == nil { - t.Fatalf("Session has not been reconnected") - } - if zk.sessionID != initialSessionID { - t.Fatalf("Wrong session ID: expected=%d, actual=%d", initialSessionID, zk.sessionID) - } - - // Make sure that the session is not dropped soon after reconnect - e := sl.NewWatcher(sessionStateMatcher(StateDisconnected)).Wait(6 * time.Second) - if e != nil { - t.Fatalf("Unexpected disconnect") - } -} - -func TestWaitForClose(t *testing.T) { - ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "}) - if err != nil { - t.Fatal(err) - } - defer ts.Stop() - zk, err := ts.Connect(0) - if err != nil { - t.Fatalf("Connect returned error: %+v", err) - } - timeout := time.After(30 * time.Second) -CONNECTED: - for { - select { - case ev := <-zk.eventChan: - if ev.State == StateConnected { - break CONNECTED - } - case <-timeout: - zk.Close() - t.Fatal("Timeout") - } - } - zk.Close() - for { - select { - case _, ok := <-zk.eventChan: - if !ok { - return - } - case <-timeout: - t.Fatal("Timeout") - } - } -} - -func TestBadSession(t *testing.T) { - ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "}) - if err != nil { - t.Fatal(err) - } - defer ts.Stop() - zk, _, err := ts.ConnectAll() - if err != nil { - t.Fatalf("Connect returned error: %+v", err) - } - defer zk.Close() - - if err := zk.Delete("/gozk-test", -1); err != nil && err != ErrNoNode { - t.Fatalf("Delete returned error: %+v", err) - } - - zk.conn.Close() - time.Sleep(time.Millisecond * 100) - - if err := zk.Delete("/gozk-test", -1); err != nil && err != ErrNoNode { - t.Fatalf("Delete returned error: %+v", err) - } -} - -type EventLogger struct { - events []Event - watchers []*EventWatcher - lock sync.Mutex - wg sync.WaitGroup -} - -func NewStateLogger(eventCh <-chan Event) *EventLogger { - el := &EventLogger{} - el.wg.Add(1) - go func() { - defer el.wg.Done() - for event := range eventCh { - el.lock.Lock() - for _, sw := range el.watchers { - if !sw.triggered && sw.matcher(event) { - sw.triggered = true - sw.matchCh <- event - } - } - DefaultLogger.Printf(" event received: %v\n", event) - el.events = append(el.events, event) - el.lock.Unlock() - } - }() - return el -} - -func (el *EventLogger) NewWatcher(matcher func(Event) bool) *EventWatcher { - ew := &EventWatcher{matcher: matcher, matchCh: make(chan Event, 1)} - el.lock.Lock() - el.watchers = append(el.watchers, ew) - el.lock.Unlock() - return ew -} - -func (el *EventLogger) Events() []Event { - el.lock.Lock() - transitions := make([]Event, len(el.events)) - copy(transitions, el.events) - el.lock.Unlock() - return transitions -} - -func (el *EventLogger) Wait4Stop() { - el.wg.Wait() -} - -type EventWatcher struct { - matcher func(Event) bool - matchCh chan Event - triggered bool -} - -func (ew *EventWatcher) Wait(timeout time.Duration) *Event { - select { - case event := <-ew.matchCh: - return &event - case <-time.After(timeout): - return nil - } -} - -func sessionStateMatcher(s State) func(Event) bool { - return func(e Event) bool { - return e.Type == EventSession && e.State == s - } -} diff --git a/vendor/github.com/samuel/go-zookeeper/zk/conn.go b/vendor/github.com/samuel/go-zookeeper/zk/conn.go index f79a51b..da9503a 100644 --- a/vendor/github.com/samuel/go-zookeeper/zk/conn.go +++ b/vendor/github.com/samuel/go-zookeeper/zk/conn.go @@ -409,13 +409,11 @@ func (c *Conn) resendZkAuth(reauthReadyChan chan struct{}) { defer close(reauthReadyChan) if c.logInfo { - c.logger.Printf("Re-submitting `%d` credentials after reconnect", - len(c.creds)) + c.logger.Printf("re-submitting `%d` credentials after reconnect", len(c.creds)) } for _, cred := range c.creds { if shouldCancel() { - c.logger.Printf("Cancel rer-submitting credentials") return } resChan, err := c.sendRequest( @@ -428,7 +426,7 @@ func (c *Conn) resendZkAuth(reauthReadyChan chan struct{}) { nil) if err != nil { - c.logger.Printf("Call to sendRequest failed during credential resubmit: %s", err) + c.logger.Printf("call to sendRequest failed during credential resubmit: %s", err) // FIXME(prozlach): lets ignore errors for now continue } @@ -437,14 +435,14 @@ func (c *Conn) resendZkAuth(reauthReadyChan chan struct{}) { select { case res = <-resChan: case <-c.closeChan: - c.logger.Printf("Recv closed, cancel re-submitting credentials") + c.logger.Printf("recv closed, cancel re-submitting credentials") return case <-c.shouldQuit: - c.logger.Printf("Should quit, cancel re-submitting credentials") + c.logger.Printf("should quit, cancel re-submitting credentials") return } if res.err != nil { - c.logger.Printf("Credential re-submit failed: %s", res.err) + c.logger.Printf("credential re-submit failed: %s", res.err) // FIXME(prozlach): lets ignore errors for now continue } @@ -486,14 +484,14 @@ func (c *Conn) loop() { err := c.authenticate() switch { case err == ErrSessionExpired: - c.logger.Printf("Authentication failed: %s", err) + c.logger.Printf("authentication failed: %s", err) c.invalidateWatches(err) case err != nil && c.conn != nil: - c.logger.Printf("Authentication failed: %s", err) + c.logger.Printf("authentication failed: %s", err) c.conn.Close() case err == nil: if c.logInfo { - c.logger.Printf("Authenticated: id=%d, timeout=%d", c.SessionID(), c.sessionTimeoutMs) + c.logger.Printf("authenticated: id=%d, timeout=%d", c.SessionID(), c.sessionTimeoutMs) } c.hostProvider.Connected() // mark success c.closeChan = make(chan struct{}) // channel to tell send loop stop @@ -508,7 +506,7 @@ func (c *Conn) loop() { } err := c.sendLoop() if err != nil || c.logInfo { - c.logger.Printf("Send loop terminated: err=%v", err) + c.logger.Printf("send loop terminated: err=%v", err) } c.conn.Close() // causes recv loop to EOF/exit wg.Done() @@ -523,7 +521,7 @@ func (c *Conn) loop() { err = c.recvLoop(c.conn) } if err != io.EOF || c.logInfo { - c.logger.Printf("Recv loop terminated: err=%v", err) + c.logger.Printf("recv loop terminated: err=%v", err) } if err == nil { panic("zk: recvLoop should never return nil error") @@ -697,20 +695,28 @@ func (c *Conn) authenticate() error { binary.BigEndian.PutUint32(buf[:4], uint32(n)) - c.conn.SetWriteDeadline(time.Now().Add(c.recvTimeout * 10)) + if err := c.conn.SetWriteDeadline(time.Now().Add(c.recvTimeout * 10)); err != nil { + return err + } _, err = c.conn.Write(buf[:n+4]) - c.conn.SetWriteDeadline(time.Time{}) if err != nil { return err } + if err := c.conn.SetWriteDeadline(time.Time{}); err != nil { + return err + } // Receive and decode a connect response. - c.conn.SetReadDeadline(time.Now().Add(c.recvTimeout * 10)) + if err := c.conn.SetReadDeadline(time.Now().Add(c.recvTimeout * 10)); err != nil { + return err + } _, err = io.ReadFull(c.conn, buf[:4]) - c.conn.SetReadDeadline(time.Time{}) if err != nil { return err } + if err := c.conn.SetReadDeadline(time.Time{}); err != nil { + return err + } blen := int(binary.BigEndian.Uint32(buf[:4])) if cap(buf) < blen { @@ -772,14 +778,18 @@ func (c *Conn) sendData(req *request) error { c.requests[req.xid] = req c.requestsLock.Unlock() - c.conn.SetWriteDeadline(time.Now().Add(c.recvTimeout)) + if err := c.conn.SetWriteDeadline(time.Now().Add(c.recvTimeout)); err != nil { + return err + } _, err = c.conn.Write(c.buf[:n+4]) - c.conn.SetWriteDeadline(time.Time{}) if err != nil { req.recvChan <- response{-1, err} c.conn.Close() return err } + if err := c.conn.SetWriteDeadline(time.Time{}); err != nil { + return err + } return nil } @@ -802,13 +812,17 @@ func (c *Conn) sendLoop() error { binary.BigEndian.PutUint32(c.buf[:4], uint32(n)) - c.conn.SetWriteDeadline(time.Now().Add(c.recvTimeout)) + if err := c.conn.SetWriteDeadline(time.Now().Add(c.recvTimeout)); err != nil { + return err + } _, err = c.conn.Write(c.buf[:n+4]) - c.conn.SetWriteDeadline(time.Time{}) if err != nil { c.conn.Close() return err } + if err := c.conn.SetWriteDeadline(time.Time{}); err != nil { + return err + } case <-c.closeChan: return nil } @@ -823,10 +837,12 @@ func (c *Conn) recvLoop(conn net.Conn) error { buf := make([]byte, sz) for { // package length - conn.SetReadDeadline(time.Now().Add(c.recvTimeout)) + if err := conn.SetReadDeadline(time.Now().Add(c.recvTimeout)); err != nil { + c.logger.Printf("failed to set connection deadline: %v", err) + } _, err := io.ReadFull(conn, buf[:4]) if err != nil { - return err + return fmt.Errorf("failed to read from connection: %v", err) } blen := int(binary.BigEndian.Uint32(buf[:4])) @@ -838,10 +854,12 @@ func (c *Conn) recvLoop(conn net.Conn) error { } _, err = io.ReadFull(conn, buf[:blen]) - conn.SetReadDeadline(time.Time{}) if err != nil { return err } + if err := conn.SetReadDeadline(time.Time{}); err != nil { + return err + } res := responseHeader{} _, err = decodePacket(buf[:16], &res) @@ -874,7 +892,7 @@ func (c *Conn) recvLoop(conn net.Conn) error { c.watchersLock.Lock() for _, t := range wTypes { wpt := watchPathType{res.Path, t} - if watchers := c.watchers[wpt]; watchers != nil && len(watchers) > 0 { + if watchers, ok := c.watchers[wpt]; ok { for _, ch := range watchers { ch <- ev close(ch) @@ -1220,6 +1238,38 @@ func (c *Conn) Multi(ops ...interface{}) ([]MultiResponse, error) { return mr, err } +// IncrementalReconfig is the zookeeper reconfiguration api that allows adding and removing servers +// by lists of members. +// Return the new configuration stats. +func (c *Conn) IncrementalReconfig(joining, leaving []string, version int64) (*Stat, error) { + // TODO: validate the shape of the member string to give early feedback. + request := &reconfigRequest{ + JoiningServers: []byte(strings.Join(joining, ",")), + LeavingServers: []byte(strings.Join(leaving, ",")), + CurConfigId: version, + } + + return c.internalReconfig(request) +} + +// Reconfig is the non-incremental update functionality for Zookeeper where the list preovided +// is the entire new member list. +// the optional version allows for conditional reconfigurations, -1 ignores the condition. +func (c *Conn) Reconfig(members []string, version int64) (*Stat, error) { + request := &reconfigRequest{ + NewMembers: []byte(strings.Join(members, ",")), + CurConfigId: version, + } + + return c.internalReconfig(request) +} + +func (c *Conn) internalReconfig(request *reconfigRequest) (*Stat, error) { + response := &reconfigReponse{} + _, err := c.request(opReconfig, request, response, nil) + return &response.Stat, err +} + // Server returns the current or last-connected server name. func (c *Conn) Server() string { c.serverMu.Lock() diff --git a/vendor/github.com/samuel/go-zookeeper/zk/conn_test.go b/vendor/github.com/samuel/go-zookeeper/zk/conn_test.go deleted file mode 100644 index 94206d9..0000000 --- a/vendor/github.com/samuel/go-zookeeper/zk/conn_test.go +++ /dev/null @@ -1,57 +0,0 @@ -package zk - -import ( - "io/ioutil" - "testing" - "time" -) - -func TestRecurringReAuthHang(t *testing.T) { - t.Skip("Race condition in test") - - sessionTimeout := 2 * time.Second - - finish := make(chan struct{}) - defer close(finish) - go func() { - select { - case <-finish: - return - case <-time.After(5 * sessionTimeout): - panic("expected not hang") - } - }() - - zkC, err := StartTestCluster(2, ioutil.Discard, ioutil.Discard) - if err != nil { - panic(err) - } - defer zkC.Stop() - - conn, evtC, err := zkC.ConnectAll() - if err != nil { - panic(err) - } - for conn.State() != StateHasSession { - time.Sleep(50 * time.Millisecond) - } - - go func() { - for range evtC { - } - }() - - // Add auth. - conn.AddAuth("digest", []byte("test:test")) - - currentServer := conn.Server() - conn.debugCloseRecvLoop = true - conn.debugReauthDone = make(chan struct{}) - zkC.StopServer(currentServer) - // wait connect to new zookeeper. - for conn.Server() == currentServer && conn.State() != StateHasSession { - time.Sleep(100 * time.Millisecond) - } - - <-conn.debugReauthDone -} diff --git a/vendor/github.com/samuel/go-zookeeper/zk/constants.go b/vendor/github.com/samuel/go-zookeeper/zk/constants.go index 33b5563..ccafcfc 100644 --- a/vendor/github.com/samuel/go-zookeeper/zk/constants.go +++ b/vendor/github.com/samuel/go-zookeeper/zk/constants.go @@ -2,6 +2,7 @@ package zk import ( "errors" + "fmt" ) const ( @@ -25,6 +26,7 @@ const ( opGetChildren2 = 12 opCheck = 13 opMulti = 14 + opReconfig = 16 opClose = -11 opSetAuth = 100 opSetWatches = 101 @@ -92,7 +94,7 @@ func (s State) String() string { if name := stateNames[s]; name != "" { return name } - return "Unknown" + return "unknown state" } type ErrCode int32 @@ -113,8 +115,10 @@ var ( ErrClosing = errors.New("zk: zookeeper is closing") ErrNothing = errors.New("zk: no server responsees to process") ErrSessionMoved = errors.New("zk: session moved to another server, so operation is ignored") - + ErrReconfigDisabled = errors.New("attempts to perform a reconfiguration operation when reconfiguration feature is disabled") + ErrBadArguments = errors.New("invalid arguments") // ErrInvalidCallback = errors.New("zk: invalid callback specified") + errCodeToError = map[ErrCode]error{ 0: nil, errAPIError: ErrAPIError, @@ -126,11 +130,13 @@ var ( errNotEmpty: ErrNotEmpty, errSessionExpired: ErrSessionExpired, // errInvalidCallback: ErrInvalidCallback, - errInvalidAcl: ErrInvalidACL, - errAuthFailed: ErrAuthFailed, - errClosing: ErrClosing, - errNothing: ErrNothing, - errSessionMoved: ErrSessionMoved, + errInvalidAcl: ErrInvalidACL, + errAuthFailed: ErrAuthFailed, + errClosing: ErrClosing, + errNothing: ErrNothing, + errSessionMoved: ErrSessionMoved, + errZReconfigDisabled: ErrReconfigDisabled, + errBadArguments: ErrBadArguments, } ) @@ -138,7 +144,7 @@ func (e ErrCode) toError() error { if err, ok := errCodeToError[e]; ok { return err } - return ErrUnknown + return fmt.Errorf("unknown error: %v", e) } const ( @@ -168,6 +174,8 @@ const ( errClosing ErrCode = -116 errNothing ErrCode = -117 errSessionMoved ErrCode = -118 + // Attempts to perform a reconfiguration operation when reconfiguration feature is disabled + errZReconfigDisabled ErrCode = -123 ) // Constants for ACL permissions @@ -197,6 +205,7 @@ var ( opGetChildren2: "getChildren2", opCheck: "check", opMulti: "multi", + opReconfig: "reconfig", opClose: "close", opSetAuth: "setAuth", opSetWatches: "setWatches", diff --git a/vendor/github.com/samuel/go-zookeeper/zk/constants_test.go b/vendor/github.com/samuel/go-zookeeper/zk/constants_test.go deleted file mode 100644 index 9fe6b04..0000000 --- a/vendor/github.com/samuel/go-zookeeper/zk/constants_test.go +++ /dev/null @@ -1,24 +0,0 @@ -package zk - -import ( - "fmt" - "testing" -) - -func TestModeString(t *testing.T) { - if fmt.Sprintf("%v", ModeUnknown) != "unknown" { - t.Errorf("unknown value should be 'unknown'") - } - - if fmt.Sprintf("%v", ModeLeader) != "leader" { - t.Errorf("leader value should be 'leader'") - } - - if fmt.Sprintf("%v", ModeFollower) != "follower" { - t.Errorf("follower value should be 'follower'") - } - - if fmt.Sprintf("%v", ModeStandalone) != "standalone" { - t.Errorf("standlone value should be 'standalone'") - } -} diff --git a/vendor/github.com/samuel/go-zookeeper/zk/dnshostprovider_test.go b/vendor/github.com/samuel/go-zookeeper/zk/dnshostprovider_test.go deleted file mode 100644 index 77a6065..0000000 --- a/vendor/github.com/samuel/go-zookeeper/zk/dnshostprovider_test.go +++ /dev/null @@ -1,224 +0,0 @@ -package zk - -import ( - "fmt" - "log" - "testing" - "time" -) - -// localhostLookupHost is a test replacement for net.LookupHost that -// always returns 127.0.0.1 -func localhostLookupHost(host string) ([]string, error) { - return []string{"127.0.0.1"}, nil -} - -// TestDNSHostProviderCreate is just like TestCreate, but with an -// overridden HostProvider that ignores the provided hostname. -func TestDNSHostProviderCreate(t *testing.T) { - ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "}) - if err != nil { - t.Fatal(err) - } - defer ts.Stop() - - port := ts.Servers[0].Port - server := fmt.Sprintf("foo.example.com:%d", port) - hostProvider := &DNSHostProvider{lookupHost: localhostLookupHost} - zk, _, err := Connect([]string{server}, time.Second*15, WithHostProvider(hostProvider)) - if err != nil { - t.Fatalf("Connect returned error: %+v", err) - } - defer zk.Close() - - path := "/gozk-test" - - if err := zk.Delete(path, -1); err != nil && err != ErrNoNode { - t.Fatalf("Delete returned error: %+v", err) - } - if p, err := zk.Create(path, []byte{1, 2, 3, 4}, 0, WorldACL(PermAll)); err != nil { - t.Fatalf("Create returned error: %+v", err) - } else if p != path { - t.Fatalf("Create returned different path '%s' != '%s'", p, path) - } - if data, stat, err := zk.Get(path); err != nil { - t.Fatalf("Get returned error: %+v", err) - } else if stat == nil { - t.Fatal("Get returned nil stat") - } else if len(data) < 4 { - t.Fatal("Get returned wrong size data") - } -} - -// localHostPortsFacade wraps a HostProvider, remapping the -// address/port combinations it returns to "localhost:$PORT" where -// $PORT is chosen from the provided ports. -type localHostPortsFacade struct { - inner HostProvider // The wrapped HostProvider - ports []int // The provided list of ports - nextPort int // The next port to use - mapped map[string]string // Already-mapped address/port combinations -} - -func newLocalHostPortsFacade(inner HostProvider, ports []int) *localHostPortsFacade { - return &localHostPortsFacade{ - inner: inner, - ports: ports, - mapped: make(map[string]string), - } -} - -func (lhpf *localHostPortsFacade) Len() int { return lhpf.inner.Len() } -func (lhpf *localHostPortsFacade) Connected() { lhpf.inner.Connected() } -func (lhpf *localHostPortsFacade) Init(servers []string) error { return lhpf.inner.Init(servers) } -func (lhpf *localHostPortsFacade) Next() (string, bool) { - server, retryStart := lhpf.inner.Next() - - // If we've already set up a mapping for that server, just return it. - if localMapping := lhpf.mapped[server]; localMapping != "" { - return localMapping, retryStart - } - - if lhpf.nextPort == len(lhpf.ports) { - log.Fatalf("localHostPortsFacade out of ports to assign to %q; current config: %q", server, lhpf.mapped) - } - - localMapping := fmt.Sprintf("localhost:%d", lhpf.ports[lhpf.nextPort]) - lhpf.mapped[server] = localMapping - lhpf.nextPort++ - return localMapping, retryStart -} - -var _ HostProvider = &localHostPortsFacade{} - -// TestDNSHostProviderReconnect tests that the zk.Conn correctly -// reconnects when the Zookeeper instance it's connected to -// restarts. It wraps the DNSHostProvider in a lightweight facade that -// remaps addresses to localhost:$PORT combinations corresponding to -// the test ZooKeeper instances. -func TestDNSHostProviderReconnect(t *testing.T) { - ts, err := StartTestCluster(3, nil, logWriter{t: t, p: "[ZKERR] "}) - if err != nil { - t.Fatal(err) - } - defer ts.Stop() - - innerHp := &DNSHostProvider{lookupHost: func(host string) ([]string, error) { - return []string{"192.0.2.1", "192.0.2.2", "192.0.2.3"}, nil - }} - ports := make([]int, 0, len(ts.Servers)) - for _, server := range ts.Servers { - ports = append(ports, server.Port) - } - hp := newLocalHostPortsFacade(innerHp, ports) - - zk, _, err := Connect([]string{"foo.example.com:12345"}, time.Second, WithHostProvider(hp)) - if err != nil { - t.Fatalf("Connect returned error: %+v", err) - } - defer zk.Close() - - path := "/gozk-test" - - // Initial operation to force connection. - if err := zk.Delete(path, -1); err != nil && err != ErrNoNode { - t.Fatalf("Delete returned error: %+v", err) - } - - // Figure out which server we're connected to. - currentServer := zk.Server() - t.Logf("Connected to %q. Finding test server index…", currentServer) - serverIndex := -1 - for i, server := range ts.Servers { - server := fmt.Sprintf("localhost:%d", server.Port) - t.Logf("…trying %q", server) - if currentServer == server { - serverIndex = i - t.Logf("…found at index %d", i) - break - } - } - if serverIndex == -1 { - t.Fatalf("Cannot determine test server index.") - } - - // Restart the connected server. - ts.Servers[serverIndex].Srv.Stop() - ts.Servers[serverIndex].Srv.Start() - - // Continue with the basic TestCreate tests. - if p, err := zk.Create(path, []byte{1, 2, 3, 4}, 0, WorldACL(PermAll)); err != nil { - t.Fatalf("Create returned error: %+v", err) - } else if p != path { - t.Fatalf("Create returned different path '%s' != '%s'", p, path) - } - if data, stat, err := zk.Get(path); err != nil { - t.Fatalf("Get returned error: %+v", err) - } else if stat == nil { - t.Fatal("Get returned nil stat") - } else if len(data) < 4 { - t.Fatal("Get returned wrong size data") - } - - if zk.Server() == currentServer { - t.Errorf("Still connected to %q after restart.", currentServer) - } -} - -// TestDNSHostProviderRetryStart tests the `retryStart` functionality -// of DNSHostProvider. -// It's also probably the clearest visual explanation of exactly how -// it works. -func TestDNSHostProviderRetryStart(t *testing.T) { - t.Parallel() - - hp := &DNSHostProvider{lookupHost: func(host string) ([]string, error) { - return []string{"192.0.2.1", "192.0.2.2", "192.0.2.3"}, nil - }} - - if err := hp.Init([]string{"foo.example.com:12345"}); err != nil { - t.Fatal(err) - } - - testdata := []struct { - retryStartWant bool - callConnected bool - }{ - // Repeated failures. - {false, false}, - {false, false}, - {false, false}, - {true, false}, - {false, false}, - {false, false}, - {true, true}, - - // One success offsets things. - {false, false}, - {false, true}, - {false, true}, - - // Repeated successes. - {false, true}, - {false, true}, - {false, true}, - {false, true}, - {false, true}, - - // And some more failures. - {false, false}, - {false, false}, - {true, false}, // Looped back to last known good server: all alternates failed. - {false, false}, - } - - for i, td := range testdata { - _, retryStartGot := hp.Next() - if retryStartGot != td.retryStartWant { - t.Errorf("%d: retryStart=%v; want %v", i, retryStartGot, td.retryStartWant) - } - if td.callConnected { - hp.Connected() - } - } -} diff --git a/vendor/github.com/samuel/go-zookeeper/zk/flw.go b/vendor/github.com/samuel/go-zookeeper/zk/flw.go index 3e97f96..1fb8b2a 100644 --- a/vendor/github.com/samuel/go-zookeeper/zk/flw.go +++ b/vendor/github.com/samuel/go-zookeeper/zk/flw.go @@ -255,12 +255,16 @@ func fourLetterWord(server, command string, timeout time.Duration) ([]byte, erro // once the command has been processed, but better safe than sorry defer conn.Close() - conn.SetWriteDeadline(time.Now().Add(timeout)) + if err := conn.SetWriteDeadline(time.Now().Add(timeout)); err != nil { + return nil, err + } _, err = conn.Write([]byte(command)) if err != nil { return nil, err } - conn.SetReadDeadline(time.Now().Add(timeout)) + if err := conn.SetReadDeadline(time.Now().Add(timeout)); err != nil { + return nil, err + } return ioutil.ReadAll(conn) } diff --git a/vendor/github.com/samuel/go-zookeeper/zk/flw_test.go b/vendor/github.com/samuel/go-zookeeper/zk/flw_test.go deleted file mode 100644 index 5bbabb9..0000000 --- a/vendor/github.com/samuel/go-zookeeper/zk/flw_test.go +++ /dev/null @@ -1,330 +0,0 @@ -package zk - -import ( - "net" - "testing" - "time" -) - -var ( - zkSrvrOut = `Zookeeper version: 3.4.6-1569965, built on 02/20/2014 09:09 GMT -Latency min/avg/max: 0/1/10 -Received: 4207 -Sent: 4220 -Connections: 81 -Outstanding: 1 -Zxid: 0x110a7a8f37 -Mode: leader -Node count: 306 -` - zkConsOut = ` /10.42.45.231:45361[1](queued=0,recved=9435,sent=9457,sid=0x94c2989e04716b5,lop=PING,est=1427238717217,to=20001,lcxid=0x55120915,lzxid=0xffffffffffffffff,lresp=1427259255908,llat=0,minlat=0,avglat=1,maxlat=17) - /10.55.33.98:34342[1](queued=0,recved=9338,sent=9350,sid=0x94c2989e0471731,lop=PING,est=1427238849319,to=20001,lcxid=0x55120944,lzxid=0xffffffffffffffff,lresp=1427259252294,llat=0,minlat=0,avglat=1,maxlat=18) - /10.44.145.114:46556[1](queued=0,recved=109253,sent=109617,sid=0x94c2989e0471709,lop=DELE,est=1427238791305,to=20001,lcxid=0x55139618,lzxid=0x110a7b187d,lresp=1427259257423,llat=2,minlat=0,avglat=1,maxlat=23) - -` -) - -func TestFLWRuok(t *testing.T) { - t.Parallel() - l, err := net.Listen("tcp", "127.0.0.1:0") - if err != nil { - t.Fatal(err) - } - defer l.Close() - - go tcpServer(l, "") - - oks := FLWRuok([]string{l.Addr().String()}, time.Second*10) - if len(oks) == 0 { - t.Errorf("no values returned") - } - if !oks[0] { - t.Errorf("instance should be marked as OK") - } - - // - // Confirm that it also returns false for dead instances - // - l, err = net.Listen("tcp", "127.0.0.1:0") - if err != nil { - t.Fatal(err) - } - defer l.Close() - - go tcpServer(l, "dead") - - oks = FLWRuok([]string{l.Addr().String()}, time.Second*10) - if len(oks) == 0 { - t.Errorf("no values returned") - } - if oks[0] { - t.Errorf("instance should be marked as not OK") - } -} - -func TestFLWSrvr(t *testing.T) { - t.Parallel() - l, err := net.Listen("tcp", "127.0.0.1:0") - if err != nil { - t.Fatal(err) - } - defer l.Close() - - go tcpServer(l, "") - - statsSlice, ok := FLWSrvr([]string{l.Addr().String()}, time.Second*10) - if !ok { - t.Errorf("failure indicated on 'srvr' parsing") - } - if len(statsSlice) == 0 { - t.Errorf("no *ServerStats instances returned") - } - - stats := statsSlice[0] - - if stats.Error != nil { - t.Fatalf("error seen in stats: %v", err.Error()) - } - - if stats.Sent != 4220 { - t.Errorf("Sent != 4220") - } - - if stats.Received != 4207 { - t.Errorf("Received != 4207") - } - - if stats.NodeCount != 306 { - t.Errorf("NodeCount != 306") - } - - if stats.MinLatency != 0 { - t.Errorf("MinLatency != 0") - } - - if stats.AvgLatency != 1 { - t.Errorf("AvgLatency != 1") - } - - if stats.MaxLatency != 10 { - t.Errorf("MaxLatency != 10") - } - - if stats.Connections != 81 { - t.Errorf("Connection != 81") - } - - if stats.Outstanding != 1 { - t.Errorf("Outstanding != 1") - } - - if stats.Epoch != 17 { - t.Errorf("Epoch != 17") - } - - if stats.Counter != 175804215 { - t.Errorf("Counter != 175804215") - } - - if stats.Mode != ModeLeader { - t.Errorf("Mode != ModeLeader") - } - - if stats.Version != "3.4.6-1569965" { - t.Errorf("Version expected: 3.4.6-1569965") - } -} - -func TestFLWCons(t *testing.T) { - t.Parallel() - l, err := net.Listen("tcp", "127.0.0.1:0") - if err != nil { - t.Fatal(err) - } - defer l.Close() - - go tcpServer(l, "") - - clients, ok := FLWCons([]string{l.Addr().String()}, time.Second*10) - if !ok { - t.Errorf("failure indicated on 'cons' parsing") - } - if len(clients) == 0 { - t.Errorf("no *ServerClients instances returned") - } - - results := []*ServerClient{ - { - Queued: 0, - Received: 9435, - Sent: 9457, - SessionID: 669956116721374901, - LastOperation: "PING", - Established: time.Unix(1427238717217, 0), - Timeout: 20001, - Lcxid: 1427245333, - Lzxid: -1, - LastResponse: time.Unix(1427259255908, 0), - LastLatency: 0, - MinLatency: 0, - AvgLatency: 1, - MaxLatency: 17, - Addr: "10.42.45.231:45361", - }, - { - Queued: 0, - Received: 9338, - Sent: 9350, - SessionID: 669956116721375025, - LastOperation: "PING", - Established: time.Unix(1427238849319, 0), - Timeout: 20001, - Lcxid: 1427245380, - Lzxid: -1, - LastResponse: time.Unix(1427259252294, 0), - LastLatency: 0, - MinLatency: 0, - AvgLatency: 1, - MaxLatency: 18, - Addr: "10.55.33.98:34342", - }, - { - Queued: 0, - Received: 109253, - Sent: 109617, - SessionID: 669956116721374985, - LastOperation: "DELE", - Established: time.Unix(1427238791305, 0), - Timeout: 20001, - Lcxid: 1427346968, - Lzxid: 73190283389, - LastResponse: time.Unix(1427259257423, 0), - LastLatency: 2, - MinLatency: 0, - AvgLatency: 1, - MaxLatency: 23, - Addr: "10.44.145.114:46556", - }, - } - - for _, z := range clients { - if z.Error != nil { - t.Errorf("error seen: %v", err.Error()) - } - - for i, v := range z.Clients { - c := results[i] - - if v.Error != nil { - t.Errorf("client error seen: %v", err.Error()) - } - - if v.Queued != c.Queued { - t.Errorf("Queued value mismatch (%d/%d)", v.Queued, c.Queued) - } - - if v.Received != c.Received { - t.Errorf("Received value mismatch (%d/%d)", v.Received, c.Received) - } - - if v.Sent != c.Sent { - t.Errorf("Sent value mismatch (%d/%d)", v.Sent, c.Sent) - } - - if v.SessionID != c.SessionID { - t.Errorf("SessionID value mismatch (%d/%d)", v.SessionID, c.SessionID) - } - - if v.LastOperation != c.LastOperation { - t.Errorf("LastOperation value mismatch ('%v'/'%v')", v.LastOperation, c.LastOperation) - } - - if v.Timeout != c.Timeout { - t.Errorf("Timeout value mismatch (%d/%d)", v.Timeout, c.Timeout) - } - - if v.Lcxid != c.Lcxid { - t.Errorf("Lcxid value mismatch (%d/%d)", v.Lcxid, c.Lcxid) - } - - if v.Lzxid != c.Lzxid { - t.Errorf("Lzxid value mismatch (%d/%d)", v.Lzxid, c.Lzxid) - } - - if v.LastLatency != c.LastLatency { - t.Errorf("LastLatency value mismatch (%d/%d)", v.LastLatency, c.LastLatency) - } - - if v.MinLatency != c.MinLatency { - t.Errorf("MinLatency value mismatch (%d/%d)", v.MinLatency, c.MinLatency) - } - - if v.AvgLatency != c.AvgLatency { - t.Errorf("AvgLatency value mismatch (%d/%d)", v.AvgLatency, c.AvgLatency) - } - - if v.MaxLatency != c.MaxLatency { - t.Errorf("MaxLatency value mismatch (%d/%d)", v.MaxLatency, c.MaxLatency) - } - - if v.Addr != c.Addr { - t.Errorf("Addr value mismatch ('%v'/'%v')", v.Addr, c.Addr) - } - - if !c.Established.Equal(v.Established) { - t.Errorf("Established value mismatch (%v/%v)", c.Established, v.Established) - } - - if !c.LastResponse.Equal(v.LastResponse) { - t.Errorf("Established value mismatch (%v/%v)", c.LastResponse, v.LastResponse) - } - } - } -} - -func tcpServer(listener net.Listener, thing string) { - for { - conn, err := listener.Accept() - if err != nil { - return - } - go connHandler(conn, thing) - } -} - -func connHandler(conn net.Conn, thing string) { - defer conn.Close() - - data := make([]byte, 4) - - _, err := conn.Read(data) - if err != nil { - return - } - - switch string(data) { - case "ruok": - switch thing { - case "dead": - return - default: - conn.Write([]byte("imok")) - } - case "srvr": - switch thing { - case "dead": - return - default: - conn.Write([]byte(zkSrvrOut)) - } - case "cons": - switch thing { - case "dead": - return - default: - conn.Write([]byte(zkConsOut)) - } - default: - conn.Write([]byte("This ZooKeeper instance is not currently serving requests.")) - } -} diff --git a/vendor/github.com/samuel/go-zookeeper/zk/lock_test.go b/vendor/github.com/samuel/go-zookeeper/zk/lock_test.go deleted file mode 100644 index 8a3478a..0000000 --- a/vendor/github.com/samuel/go-zookeeper/zk/lock_test.go +++ /dev/null @@ -1,94 +0,0 @@ -package zk - -import ( - "testing" - "time" -) - -func TestLock(t *testing.T) { - ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "}) - if err != nil { - t.Fatal(err) - } - defer ts.Stop() - zk, _, err := ts.ConnectAll() - if err != nil { - t.Fatalf("Connect returned error: %+v", err) - } - defer zk.Close() - - acls := WorldACL(PermAll) - - l := NewLock(zk, "/test", acls) - if err := l.Lock(); err != nil { - t.Fatal(err) - } - if err := l.Unlock(); err != nil { - t.Fatal(err) - } - - val := make(chan int, 3) - - if err := l.Lock(); err != nil { - t.Fatal(err) - } - - l2 := NewLock(zk, "/test", acls) - go func() { - if err := l2.Lock(); err != nil { - t.Fatal(err) - } - val <- 2 - if err := l2.Unlock(); err != nil { - t.Fatal(err) - } - val <- 3 - }() - time.Sleep(time.Millisecond * 100) - - val <- 1 - if err := l.Unlock(); err != nil { - t.Fatal(err) - } - if x := <-val; x != 1 { - t.Fatalf("Expected 1 instead of %d", x) - } - if x := <-val; x != 2 { - t.Fatalf("Expected 2 instead of %d", x) - } - if x := <-val; x != 3 { - t.Fatalf("Expected 3 instead of %d", x) - } -} - -// This tests creating a lock with a path that's more than 1 node deep (e.g. "/test-multi-level/lock"), -// when a part of that path already exists (i.e. "/test-multi-level" node already exists). -func TestMultiLevelLock(t *testing.T) { - ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "}) - if err != nil { - t.Fatal(err) - } - defer ts.Stop() - zk, _, err := ts.ConnectAll() - if err != nil { - t.Fatalf("Connect returned error: %+v", err) - } - defer zk.Close() - - acls := WorldACL(PermAll) - path := "/test-multi-level" - if p, err := zk.Create(path, []byte{1, 2, 3, 4}, 0, WorldACL(PermAll)); err != nil { - t.Fatalf("Create returned error: %+v", err) - } else if p != path { - t.Fatalf("Create returned different path '%s' != '%s'", p, path) - } - l := NewLock(zk, "/test-multi-level/lock", acls) - defer zk.Delete("/test-multi-level", -1) // Clean up what we've created for this test - defer zk.Delete("/test-multi-level/lock", -1) - if err := l.Lock(); err != nil { - t.Fatal(err) - } - if err := l.Unlock(); err != nil { - t.Fatal(err) - } -} diff --git a/vendor/github.com/samuel/go-zookeeper/zk/server_help.go b/vendor/github.com/samuel/go-zookeeper/zk/server_help.go deleted file mode 100644 index 3663064..0000000 --- a/vendor/github.com/samuel/go-zookeeper/zk/server_help.go +++ /dev/null @@ -1,216 +0,0 @@ -package zk - -import ( - "fmt" - "io" - "io/ioutil" - "math/rand" - "os" - "path/filepath" - "strings" - "time" -) - -func init() { - rand.Seed(time.Now().UnixNano()) -} - -type TestServer struct { - Port int - Path string - Srv *Server -} - -type TestCluster struct { - Path string - Servers []TestServer -} - -func StartTestCluster(size int, stdout, stderr io.Writer) (*TestCluster, error) { - tmpPath, err := ioutil.TempDir("", "gozk") - if err != nil { - return nil, err - } - success := false - startPort := int(rand.Int31n(6000) + 10000) - cluster := &TestCluster{Path: tmpPath} - defer func() { - if !success { - cluster.Stop() - } - }() - for serverN := 0; serverN < size; serverN++ { - srvPath := filepath.Join(tmpPath, fmt.Sprintf("srv%d", serverN)) - if err := os.Mkdir(srvPath, 0700); err != nil { - return nil, err - } - port := startPort + serverN*3 - cfg := ServerConfig{ - ClientPort: port, - DataDir: srvPath, - } - for i := 0; i < size; i++ { - cfg.Servers = append(cfg.Servers, ServerConfigServer{ - ID: i + 1, - Host: "127.0.0.1", - PeerPort: startPort + i*3 + 1, - LeaderElectionPort: startPort + i*3 + 2, - }) - } - cfgPath := filepath.Join(srvPath, "zoo.cfg") - fi, err := os.Create(cfgPath) - if err != nil { - return nil, err - } - err = cfg.Marshall(fi) - fi.Close() - if err != nil { - return nil, err - } - - fi, err = os.Create(filepath.Join(srvPath, "myid")) - if err != nil { - return nil, err - } - _, err = fmt.Fprintf(fi, "%d\n", serverN+1) - fi.Close() - if err != nil { - return nil, err - } - - srv := &Server{ - ConfigPath: cfgPath, - Stdout: stdout, - Stderr: stderr, - } - if err := srv.Start(); err != nil { - return nil, err - } - cluster.Servers = append(cluster.Servers, TestServer{ - Path: srvPath, - Port: cfg.ClientPort, - Srv: srv, - }) - } - if err := cluster.waitForStart(10, time.Second); err != nil { - return nil, err - } - success = true - return cluster, nil -} - -func (tc *TestCluster) Connect(idx int) (*Conn, error) { - zk, _, err := Connect([]string{fmt.Sprintf("127.0.0.1:%d", tc.Servers[idx].Port)}, time.Second*15) - return zk, err -} - -func (tc *TestCluster) ConnectAll() (*Conn, <-chan Event, error) { - return tc.ConnectAllTimeout(time.Second * 15) -} - -func (tc *TestCluster) ConnectAllTimeout(sessionTimeout time.Duration) (*Conn, <-chan Event, error) { - return tc.ConnectWithOptions(sessionTimeout) -} - -func (tc *TestCluster) ConnectWithOptions(sessionTimeout time.Duration, options ...connOption) (*Conn, <-chan Event, error) { - hosts := make([]string, len(tc.Servers)) - for i, srv := range tc.Servers { - hosts[i] = fmt.Sprintf("127.0.0.1:%d", srv.Port) - } - zk, ch, err := Connect(hosts, sessionTimeout, options...) - return zk, ch, err -} - -func (tc *TestCluster) Stop() error { - for _, srv := range tc.Servers { - srv.Srv.Stop() - } - defer os.RemoveAll(tc.Path) - return tc.waitForStop(5, time.Second) -} - -// waitForStart blocks until the cluster is up -func (tc *TestCluster) waitForStart(maxRetry int, interval time.Duration) error { - // verify that the servers are up with SRVR - serverAddrs := make([]string, len(tc.Servers)) - for i, s := range tc.Servers { - serverAddrs[i] = fmt.Sprintf("127.0.0.1:%d", s.Port) - } - - for i := 0; i < maxRetry; i++ { - _, ok := FLWSrvr(serverAddrs, time.Second) - if ok { - return nil - } - time.Sleep(interval) - } - return fmt.Errorf("unable to verify health of servers") -} - -// waitForStop blocks until the cluster is down -func (tc *TestCluster) waitForStop(maxRetry int, interval time.Duration) error { - // verify that the servers are up with RUOK - serverAddrs := make([]string, len(tc.Servers)) - for i, s := range tc.Servers { - serverAddrs[i] = fmt.Sprintf("127.0.0.1:%d", s.Port) - } - - var success bool - for i := 0; i < maxRetry && !success; i++ { - success = true - for _, ok := range FLWRuok(serverAddrs, time.Second) { - if ok { - success = false - } - } - if !success { - time.Sleep(interval) - } - } - if !success { - return fmt.Errorf("unable to verify servers are down") - } - return nil -} - -func (tc *TestCluster) StartServer(server string) { - for _, s := range tc.Servers { - if strings.HasSuffix(server, fmt.Sprintf(":%d", s.Port)) { - s.Srv.Start() - return - } - } - panic(fmt.Sprintf("Unknown server: %s", server)) -} - -func (tc *TestCluster) StopServer(server string) { - for _, s := range tc.Servers { - if strings.HasSuffix(server, fmt.Sprintf(":%d", s.Port)) { - s.Srv.Stop() - return - } - } - panic(fmt.Sprintf("Unknown server: %s", server)) -} - -func (tc *TestCluster) StartAllServers() error { - for _, s := range tc.Servers { - if err := s.Srv.Start(); err != nil { - return fmt.Errorf( - "Failed to start server listening on port `%d` : %+v", s.Port, err) - } - } - - return nil -} - -func (tc *TestCluster) StopAllServers() error { - for _, s := range tc.Servers { - if err := s.Srv.Stop(); err != nil { - return fmt.Errorf( - "Failed to stop server listening on port `%d` : %+v", s.Port, err) - } - } - - return nil -} diff --git a/vendor/github.com/samuel/go-zookeeper/zk/server_java.go b/vendor/github.com/samuel/go-zookeeper/zk/server_java.go deleted file mode 100644 index e553ec1..0000000 --- a/vendor/github.com/samuel/go-zookeeper/zk/server_java.go +++ /dev/null @@ -1,136 +0,0 @@ -package zk - -import ( - "fmt" - "io" - "os" - "os/exec" - "path/filepath" -) - -type ErrMissingServerConfigField string - -func (e ErrMissingServerConfigField) Error() string { - return fmt.Sprintf("zk: missing server config field '%s'", string(e)) -} - -const ( - DefaultServerTickTime = 2000 - DefaultServerInitLimit = 10 - DefaultServerSyncLimit = 5 - DefaultServerAutoPurgeSnapRetainCount = 3 - DefaultPeerPort = 2888 - DefaultLeaderElectionPort = 3888 -) - -type ServerConfigServer struct { - ID int - Host string - PeerPort int - LeaderElectionPort int -} - -type ServerConfig struct { - TickTime int // Number of milliseconds of each tick - InitLimit int // Number of ticks that the initial synchronization phase can take - SyncLimit int // Number of ticks that can pass between sending a request and getting an acknowledgement - DataDir string // Direcrory where the snapshot is stored - ClientPort int // Port at which clients will connect - AutoPurgeSnapRetainCount int // Number of snapshots to retain in dataDir - AutoPurgePurgeInterval int // Purge task internal in hours (0 to disable auto purge) - Servers []ServerConfigServer -} - -func (sc ServerConfig) Marshall(w io.Writer) error { - if sc.DataDir == "" { - return ErrMissingServerConfigField("dataDir") - } - fmt.Fprintf(w, "dataDir=%s\n", sc.DataDir) - if sc.TickTime <= 0 { - sc.TickTime = DefaultServerTickTime - } - fmt.Fprintf(w, "tickTime=%d\n", sc.TickTime) - if sc.InitLimit <= 0 { - sc.InitLimit = DefaultServerInitLimit - } - fmt.Fprintf(w, "initLimit=%d\n", sc.InitLimit) - if sc.SyncLimit <= 0 { - sc.SyncLimit = DefaultServerSyncLimit - } - fmt.Fprintf(w, "syncLimit=%d\n", sc.SyncLimit) - if sc.ClientPort <= 0 { - sc.ClientPort = DefaultPort - } - fmt.Fprintf(w, "clientPort=%d\n", sc.ClientPort) - if sc.AutoPurgePurgeInterval > 0 { - if sc.AutoPurgeSnapRetainCount <= 0 { - sc.AutoPurgeSnapRetainCount = DefaultServerAutoPurgeSnapRetainCount - } - fmt.Fprintf(w, "autopurge.snapRetainCount=%d\n", sc.AutoPurgeSnapRetainCount) - fmt.Fprintf(w, "autopurge.purgeInterval=%d\n", sc.AutoPurgePurgeInterval) - } - if len(sc.Servers) > 0 { - for _, srv := range sc.Servers { - if srv.PeerPort <= 0 { - srv.PeerPort = DefaultPeerPort - } - if srv.LeaderElectionPort <= 0 { - srv.LeaderElectionPort = DefaultLeaderElectionPort - } - fmt.Fprintf(w, "server.%d=%s:%d:%d\n", srv.ID, srv.Host, srv.PeerPort, srv.LeaderElectionPort) - } - } - return nil -} - -var jarSearchPaths = []string{ - "zookeeper-*/contrib/fatjar/zookeeper-*-fatjar.jar", - "../zookeeper-*/contrib/fatjar/zookeeper-*-fatjar.jar", - "/usr/share/java/zookeeper-*.jar", - "/usr/local/zookeeper-*/contrib/fatjar/zookeeper-*-fatjar.jar", - "/usr/local/Cellar/zookeeper/*/libexec/contrib/fatjar/zookeeper-*-fatjar.jar", -} - -func findZookeeperFatJar() string { - var paths []string - zkPath := os.Getenv("ZOOKEEPER_PATH") - if zkPath == "" { - paths = jarSearchPaths - } else { - paths = []string{filepath.Join(zkPath, "contrib/fatjar/zookeeper-*-fatjar.jar")} - } - for _, path := range paths { - matches, _ := filepath.Glob(path) - // TODO: could sort by version and pick latest - if len(matches) > 0 { - return matches[0] - } - } - return "" -} - -type Server struct { - JarPath string - ConfigPath string - Stdout, Stderr io.Writer - - cmd *exec.Cmd -} - -func (srv *Server) Start() error { - if srv.JarPath == "" { - srv.JarPath = findZookeeperFatJar() - if srv.JarPath == "" { - return fmt.Errorf("zk: unable to find server jar") - } - } - srv.cmd = exec.Command("java", "-jar", srv.JarPath, "server", srv.ConfigPath) - srv.cmd.Stdout = srv.Stdout - srv.cmd.Stderr = srv.Stderr - return srv.cmd.Start() -} - -func (srv *Server) Stop() error { - srv.cmd.Process.Signal(os.Kill) - return srv.cmd.Wait() -} diff --git a/vendor/github.com/samuel/go-zookeeper/zk/structs.go b/vendor/github.com/samuel/go-zookeeper/zk/structs.go index d4af27d..9400c3c 100644 --- a/vendor/github.com/samuel/go-zookeeper/zk/structs.go +++ b/vendor/github.com/samuel/go-zookeeper/zk/structs.go @@ -6,6 +6,7 @@ import ( "log" "reflect" "runtime" + "strings" "time" ) @@ -277,6 +278,18 @@ type multiResponse struct { DoneHeader multiHeader } +// zk version 3.5 reconfig API +type reconfigRequest struct { + JoiningServers []byte + LeavingServers []byte + NewMembers []byte + // curConfigId version of the current configuration + // optional - causes reconfiguration to return an error if configuration is no longer current + CurConfigId int64 +} + +type reconfigReponse getDataResponse + func (r *multiRequest) Encode(buf []byte) (int, error) { total := 0 for _, op := range r.Ops { @@ -392,7 +405,7 @@ type encoder interface { func decodePacket(buf []byte, st interface{}) (n int, err error) { defer func() { if r := recover(); r != nil { - if e, ok := r.(runtime.Error); ok && e.Error() == "runtime error: slice bounds out of range" { + if e, ok := r.(runtime.Error); ok && strings.HasPrefix(e.Error(), "runtime error: slice bounds out of range") { err = ErrShortBuffer } else { panic(r) @@ -483,7 +496,7 @@ func decodePacketValue(buf []byte, v reflect.Value) (int, error) { func encodePacket(buf []byte, st interface{}) (n int, err error) { defer func() { if r := recover(); r != nil { - if e, ok := r.(runtime.Error); ok && e.Error() == "runtime error: slice bounds out of range" { + if e, ok := r.(runtime.Error); ok && strings.HasPrefix(e.Error(), "runtime error: slice bounds out of range") { err = ErrShortBuffer } else { panic(r) @@ -604,6 +617,8 @@ func requestStructForOp(op int32) interface{} { return &CheckVersionRequest{} case opMulti: return &multiRequest{} + case opReconfig: + return &reconfigRequest{} } return nil } diff --git a/vendor/github.com/samuel/go-zookeeper/zk/structs_test.go b/vendor/github.com/samuel/go-zookeeper/zk/structs_test.go deleted file mode 100644 index a3f2797..0000000 --- a/vendor/github.com/samuel/go-zookeeper/zk/structs_test.go +++ /dev/null @@ -1,83 +0,0 @@ -package zk - -import ( - "reflect" - "testing" -) - -func TestEncodeDecodePacket(t *testing.T) { - t.Parallel() - encodeDecodeTest(t, &requestHeader{-2, 5}) - encodeDecodeTest(t, &connectResponse{1, 2, 3, nil}) - encodeDecodeTest(t, &connectResponse{1, 2, 3, []byte{4, 5, 6}}) - encodeDecodeTest(t, &getAclResponse{[]ACL{{12, "s", "anyone"}}, Stat{}}) - encodeDecodeTest(t, &getChildrenResponse{[]string{"foo", "bar"}}) - encodeDecodeTest(t, &pathWatchRequest{"path", true}) - encodeDecodeTest(t, &pathWatchRequest{"path", false}) - encodeDecodeTest(t, &CheckVersionRequest{"/", -1}) - encodeDecodeTest(t, &multiRequest{Ops: []multiRequestOp{{multiHeader{opCheck, false, -1}, &CheckVersionRequest{"/", -1}}}}) -} - -func TestRequestStructForOp(t *testing.T) { - for op, name := range opNames { - if op != opNotify && op != opWatcherEvent { - if s := requestStructForOp(op); s == nil { - t.Errorf("No struct for op %s", name) - } - } - } -} - -func encodeDecodeTest(t *testing.T, r interface{}) { - buf := make([]byte, 1024) - n, err := encodePacket(buf, r) - if err != nil { - t.Errorf("encodePacket returned non-nil error %+v\n", err) - return - } - t.Logf("%+v %x", r, buf[:n]) - r2 := reflect.New(reflect.ValueOf(r).Elem().Type()).Interface() - n2, err := decodePacket(buf[:n], r2) - if err != nil { - t.Errorf("decodePacket returned non-nil error %+v\n", err) - return - } - if n != n2 { - t.Errorf("sizes don't match: %d != %d", n, n2) - return - } - if !reflect.DeepEqual(r, r2) { - t.Errorf("results don't match: %+v != %+v", r, r2) - return - } -} - -func TestEncodeShortBuffer(t *testing.T) { - t.Parallel() - _, err := encodePacket([]byte{}, &requestHeader{1, 2}) - if err != ErrShortBuffer { - t.Errorf("encodePacket should return ErrShortBuffer on a short buffer instead of '%+v'", err) - return - } -} - -func TestDecodeShortBuffer(t *testing.T) { - t.Parallel() - _, err := decodePacket([]byte{}, &responseHeader{}) - if err != ErrShortBuffer { - t.Errorf("decodePacket should return ErrShortBuffer on a short buffer instead of '%+v'", err) - return - } -} - -func BenchmarkEncode(b *testing.B) { - buf := make([]byte, 4096) - st := &connectRequest{Passwd: []byte("1234567890")} - b.ReportAllocs() - b.ResetTimer() - for i := 0; i < b.N; i++ { - if _, err := encodePacket(buf, st); err != nil { - b.Fatal(err) - } - } -} diff --git a/vendor/github.com/samuel/go-zookeeper/zk/throttle_test.go b/vendor/github.com/samuel/go-zookeeper/zk/throttle_test.go deleted file mode 100644 index 633ce05..0000000 --- a/vendor/github.com/samuel/go-zookeeper/zk/throttle_test.go +++ /dev/null @@ -1,136 +0,0 @@ -/* -Copyright 2012 Google Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -// Vendored from go4.org/net/throttle - -package zk - -import ( - "fmt" - "net" - "sync" - "time" -) - -const unitSize = 1400 // read/write chunk size. ~MTU size. - -type Rate struct { - KBps int // or 0, to not rate-limit bandwidth - Latency time.Duration -} - -// byteTime returns the time required for n bytes. -func (r Rate) byteTime(n int) time.Duration { - if r.KBps == 0 { - return 0 - } - return time.Duration(float64(n)/1024/float64(r.KBps)) * time.Second -} - -type Listener struct { - net.Listener - Down Rate // server Writes to Client - Up Rate // server Reads from client -} - -func (ln *Listener) Accept() (net.Conn, error) { - c, err := ln.Listener.Accept() - time.Sleep(ln.Up.Latency) - if err != nil { - return nil, err - } - tc := &conn{Conn: c, Down: ln.Down, Up: ln.Up} - tc.start() - return tc, nil -} - -type nErr struct { - n int - err error -} - -type writeReq struct { - writeAt time.Time - p []byte - resc chan nErr -} - -type conn struct { - net.Conn - Down Rate // for reads - Up Rate // for writes - - wchan chan writeReq - closeOnce sync.Once - closeErr error -} - -func (c *conn) start() { - c.wchan = make(chan writeReq, 1024) - go c.writeLoop() -} - -func (c *conn) writeLoop() { - for req := range c.wchan { - time.Sleep(req.writeAt.Sub(time.Now())) - var res nErr - for len(req.p) > 0 && res.err == nil { - writep := req.p - if len(writep) > unitSize { - writep = writep[:unitSize] - } - n, err := c.Conn.Write(writep) - time.Sleep(c.Up.byteTime(len(writep))) - res.n += n - res.err = err - req.p = req.p[n:] - } - req.resc <- res - } -} - -func (c *conn) Close() error { - c.closeOnce.Do(func() { - err := c.Conn.Close() - close(c.wchan) - c.closeErr = err - }) - return c.closeErr -} - -func (c *conn) Write(p []byte) (n int, err error) { - defer func() { - if e := recover(); e != nil { - n = 0 - err = fmt.Errorf("%v", err) - return - } - }() - resc := make(chan nErr, 1) - c.wchan <- writeReq{time.Now().Add(c.Up.Latency), p, resc} - res := <-resc - return res.n, res.err -} - -func (c *conn) Read(p []byte) (n int, err error) { - const max = 1024 - if len(p) > max { - p = p[:max] - } - n, err = c.Conn.Read(p) - time.Sleep(c.Down.byteTime(n)) - return -} diff --git a/vendor/github.com/samuel/go-zookeeper/zk/util_test.go b/vendor/github.com/samuel/go-zookeeper/zk/util_test.go deleted file mode 100644 index 53a5950..0000000 --- a/vendor/github.com/samuel/go-zookeeper/zk/util_test.go +++ /dev/null @@ -1,53 +0,0 @@ -package zk - -import "testing" - -func TestFormatServers(t *testing.T) { - t.Parallel() - servers := []string{"127.0.0.1:2181", "127.0.0.42", "127.0.42.1:8811"} - r := []string{"127.0.0.1:2181", "127.0.0.42:2181", "127.0.42.1:8811"} - for i, s := range FormatServers(servers) { - if s != r[i] { - t.Errorf("%v should equal %v", s, r[i]) - } - } -} - -func TestValidatePath(t *testing.T) { - tt := []struct { - path string - seq bool - valid bool - }{ - {"/this is / a valid/path", false, true}, - {"/", false, true}, - {"", false, false}, - {"not/valid", false, false}, - {"/ends/with/slash/", false, false}, - {"/sequential/", true, true}, - {"/test\u0000", false, false}, - {"/double//slash", false, false}, - {"/single/./period", false, false}, - {"/double/../period", false, false}, - {"/double/..ok/period", false, true}, - {"/double/alsook../period", false, true}, - {"/double/period/at/end/..", false, false}, - {"/name/with.period", false, true}, - {"/test\u0001", false, false}, - {"/test\u001f", false, false}, - {"/test\u0020", false, true}, // first allowable - {"/test\u007e", false, true}, // last valid ascii - {"/test\u007f", false, false}, - {"/test\u009f", false, false}, - {"/test\uf8ff", false, false}, - {"/test\uffef", false, true}, - {"/test\ufff0", false, false}, - } - - for _, tc := range tt { - err := validatePath(tc.path, tc.seq) - if (err != nil) == tc.valid { - t.Errorf("failed to validate path %q", tc.path) - } - } -} diff --git a/vendor/github.com/samuel/go-zookeeper/zk/zk_test.go b/vendor/github.com/samuel/go-zookeeper/zk/zk_test.go deleted file mode 100644 index c81ef9f..0000000 --- a/vendor/github.com/samuel/go-zookeeper/zk/zk_test.go +++ /dev/null @@ -1,939 +0,0 @@ -package zk - -import ( - "crypto/rand" - "encoding/hex" - "fmt" - "io" - "net" - "reflect" - "regexp" - "sort" - "strings" - "sync" - "sync/atomic" - "testing" - "time" -) - -func TestStateChanges(t *testing.T) { - ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "}) - if err != nil { - t.Fatal(err) - } - defer ts.Stop() - - callbackChan := make(chan Event) - f := func(event Event) { - callbackChan <- event - } - - zk, eventChan, err := ts.ConnectWithOptions(15*time.Second, WithEventCallback(f)) - if err != nil { - t.Fatalf("Connect returned error: %+v", err) - } - - verifyEventOrder := func(c <-chan Event, expectedStates []State, source string) { - for _, state := range expectedStates { - for { - event, ok := <-c - if !ok { - t.Fatalf("unexpected channel close for %s", source) - } - - if event.Type != EventSession { - continue - } - - if event.State != state { - t.Fatalf("mismatched state order from %s, expected %v, received %v", source, state, event.State) - } - break - } - } - } - - states := []State{StateConnecting, StateConnected, StateHasSession} - verifyEventOrder(callbackChan, states, "callback") - verifyEventOrder(eventChan, states, "event channel") - - zk.Close() - verifyEventOrder(callbackChan, []State{StateDisconnected}, "callback") - verifyEventOrder(eventChan, []State{StateDisconnected}, "event channel") -} - -func TestCreate(t *testing.T) { - ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "}) - if err != nil { - t.Fatal(err) - } - defer ts.Stop() - zk, _, err := ts.ConnectAll() - if err != nil { - t.Fatalf("Connect returned error: %+v", err) - } - defer zk.Close() - - path := "/gozk-test" - - if err := zk.Delete(path, -1); err != nil && err != ErrNoNode { - t.Fatalf("Delete returned error: %+v", err) - } - if p, err := zk.Create(path, []byte{1, 2, 3, 4}, 0, WorldACL(PermAll)); err != nil { - t.Fatalf("Create returned error: %+v", err) - } else if p != path { - t.Fatalf("Create returned different path '%s' != '%s'", p, path) - } - if data, stat, err := zk.Get(path); err != nil { - t.Fatalf("Get returned error: %+v", err) - } else if stat == nil { - t.Fatal("Get returned nil stat") - } else if len(data) < 4 { - t.Fatal("Get returned wrong size data") - } -} - -func TestMulti(t *testing.T) { - ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "}) - if err != nil { - t.Fatal(err) - } - defer ts.Stop() - zk, _, err := ts.ConnectAll() - if err != nil { - t.Fatalf("Connect returned error: %+v", err) - } - defer zk.Close() - - path := "/gozk-test" - - if err := zk.Delete(path, -1); err != nil && err != ErrNoNode { - t.Fatalf("Delete returned error: %+v", err) - } - ops := []interface{}{ - &CreateRequest{Path: path, Data: []byte{1, 2, 3, 4}, Acl: WorldACL(PermAll)}, - &SetDataRequest{Path: path, Data: []byte{1, 2, 3, 4}, Version: -1}, - } - if res, err := zk.Multi(ops...); err != nil { - t.Fatalf("Multi returned error: %+v", err) - } else if len(res) != 2 { - t.Fatalf("Expected 2 responses got %d", len(res)) - } else { - t.Logf("%+v", res) - } - if data, stat, err := zk.Get(path); err != nil { - t.Fatalf("Get returned error: %+v", err) - } else if stat == nil { - t.Fatal("Get returned nil stat") - } else if len(data) < 4 { - t.Fatal("Get returned wrong size data") - } -} - -func TestIfAuthdataSurvivesReconnect(t *testing.T) { - // This test case ensures authentication data is being resubmited after - // reconnect. - testNode := "/auth-testnode" - - ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "}) - if err != nil { - t.Fatal(err) - } - - zk, _, err := ts.ConnectAll() - if err != nil { - t.Fatalf("Connect returned error: %+v", err) - } - defer zk.Close() - - acl := DigestACL(PermAll, "userfoo", "passbar") - - _, err = zk.Create(testNode, []byte("Some very secret content"), 0, acl) - if err != nil && err != ErrNodeExists { - t.Fatalf("Failed to create test node : %+v", err) - } - - _, _, err = zk.Get(testNode) - if err == nil || err != ErrNoAuth { - var msg string - - if err == nil { - msg = "Fetching data without auth should have resulted in an error" - } else { - msg = fmt.Sprintf("Expecting ErrNoAuth, got `%+v` instead", err) - } - t.Fatalf(msg) - } - - zk.AddAuth("digest", []byte("userfoo:passbar")) - - _, _, err = zk.Get(testNode) - if err != nil { - t.Fatalf("Fetching data with auth failed: %+v", err) - } - - ts.StopAllServers() - ts.StartAllServers() - - _, _, err = zk.Get(testNode) - if err != nil { - t.Fatalf("Fetching data after reconnect failed: %+v", err) - } -} - -func TestMultiFailures(t *testing.T) { - // This test case ensures that we return the errors associated with each - // opeThis in the event a call to Multi() fails. - const firstPath = "/gozk-test-first" - const secondPath = "/gozk-test-second" - - ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "}) - if err != nil { - t.Fatal(err) - } - defer ts.Stop() - zk, _, err := ts.ConnectAll() - if err != nil { - t.Fatalf("Connect returned error: %+v", err) - } - defer zk.Close() - - // Ensure firstPath doesn't exist and secondPath does. This will cause the - // 2nd operation in the Multi() to fail. - if err := zk.Delete(firstPath, -1); err != nil && err != ErrNoNode { - t.Fatalf("Delete returned error: %+v", err) - } - if _, err := zk.Create(secondPath, nil /* data */, 0, WorldACL(PermAll)); err != nil { - t.Fatalf("Create returned error: %+v", err) - } - - ops := []interface{}{ - &CreateRequest{Path: firstPath, Data: []byte{1, 2}, Acl: WorldACL(PermAll)}, - &CreateRequest{Path: secondPath, Data: []byte{3, 4}, Acl: WorldACL(PermAll)}, - } - res, err := zk.Multi(ops...) - if err != ErrNodeExists { - t.Fatalf("Multi() didn't return correct error: %+v", err) - } - if len(res) != 2 { - t.Fatalf("Expected 2 responses received %d", len(res)) - } - if res[0].Error != nil { - t.Fatalf("First operation returned an unexpected error %+v", res[0].Error) - } - if res[1].Error != ErrNodeExists { - t.Fatalf("Second operation returned incorrect error %+v", res[1].Error) - } - if _, _, err := zk.Get(firstPath); err != ErrNoNode { - t.Fatalf("Node %s was incorrectly created: %+v", firstPath, err) - } -} - -func TestGetSetACL(t *testing.T) { - ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "}) - if err != nil { - t.Fatal(err) - } - defer ts.Stop() - zk, _, err := ts.ConnectAll() - if err != nil { - t.Fatalf("Connect returned error: %+v", err) - } - defer zk.Close() - - if err := zk.AddAuth("digest", []byte("blah")); err != nil { - t.Fatalf("AddAuth returned error %+v", err) - } - - path := "/gozk-test" - - if err := zk.Delete(path, -1); err != nil && err != ErrNoNode { - t.Fatalf("Delete returned error: %+v", err) - } - if path, err := zk.Create(path, []byte{1, 2, 3, 4}, 0, WorldACL(PermAll)); err != nil { - t.Fatalf("Create returned error: %+v", err) - } else if path != "/gozk-test" { - t.Fatalf("Create returned different path '%s' != '/gozk-test'", path) - } - - expected := WorldACL(PermAll) - - if acl, stat, err := zk.GetACL(path); err != nil { - t.Fatalf("GetACL returned error %+v", err) - } else if stat == nil { - t.Fatalf("GetACL returned nil Stat") - } else if len(acl) != 1 || expected[0] != acl[0] { - t.Fatalf("GetACL mismatch expected %+v instead of %+v", expected, acl) - } - - expected = []ACL{{PermAll, "ip", "127.0.0.1"}} - - if stat, err := zk.SetACL(path, expected, -1); err != nil { - t.Fatalf("SetACL returned error %+v", err) - } else if stat == nil { - t.Fatalf("SetACL returned nil Stat") - } - - if acl, stat, err := zk.GetACL(path); err != nil { - t.Fatalf("GetACL returned error %+v", err) - } else if stat == nil { - t.Fatalf("GetACL returned nil Stat") - } else if len(acl) != 1 || expected[0] != acl[0] { - t.Fatalf("GetACL mismatch expected %+v instead of %+v", expected, acl) - } -} - -func TestAuth(t *testing.T) { - ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "}) - if err != nil { - t.Fatal(err) - } - defer ts.Stop() - zk, _, err := ts.ConnectAll() - if err != nil { - t.Fatalf("Connect returned error: %+v", err) - } - defer zk.Close() - - path := "/gozk-digest-test" - if err := zk.Delete(path, -1); err != nil && err != ErrNoNode { - t.Fatalf("Delete returned error: %+v", err) - } - - acl := DigestACL(PermAll, "user", "password") - - if p, err := zk.Create(path, []byte{1, 2, 3, 4}, 0, acl); err != nil { - t.Fatalf("Create returned error: %+v", err) - } else if p != path { - t.Fatalf("Create returned different path '%s' != '%s'", p, path) - } - - if a, stat, err := zk.GetACL(path); err != nil { - t.Fatalf("GetACL returned error %+v", err) - } else if stat == nil { - t.Fatalf("GetACL returned nil Stat") - } else if len(a) != 1 || acl[0] != a[0] { - t.Fatalf("GetACL mismatch expected %+v instead of %+v", acl, a) - } - - if _, _, err := zk.Get(path); err != ErrNoAuth { - t.Fatalf("Get returned error %+v instead of ErrNoAuth", err) - } - - if err := zk.AddAuth("digest", []byte("user:password")); err != nil { - t.Fatalf("AddAuth returned error %+v", err) - } - - if data, stat, err := zk.Get(path); err != nil { - t.Fatalf("Get returned error %+v", err) - } else if stat == nil { - t.Fatalf("Get returned nil Stat") - } else if len(data) != 4 { - t.Fatalf("Get returned wrong data length") - } -} - -func TestChildren(t *testing.T) { - ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "}) - if err != nil { - t.Fatal(err) - } - defer ts.Stop() - zk, _, err := ts.ConnectAll() - if err != nil { - t.Fatalf("Connect returned error: %+v", err) - } - defer zk.Close() - - deleteNode := func(node string) { - if err := zk.Delete(node, -1); err != nil && err != ErrNoNode { - t.Fatalf("Delete returned error: %+v", err) - } - } - - deleteNode("/gozk-test-big") - - if path, err := zk.Create("/gozk-test-big", []byte{1, 2, 3, 4}, 0, WorldACL(PermAll)); err != nil { - t.Fatalf("Create returned error: %+v", err) - } else if path != "/gozk-test-big" { - t.Fatalf("Create returned different path '%s' != '/gozk-test-big'", path) - } - - rb := make([]byte, 1000) - hb := make([]byte, 2000) - prefix := []byte("/gozk-test-big/") - for i := 0; i < 10000; i++ { - _, err := rand.Read(rb) - if err != nil { - t.Fatal("Cannot create random znode name") - } - hex.Encode(hb, rb) - - expect := string(append(prefix, hb...)) - if path, err := zk.Create(expect, []byte{1, 2, 3, 4}, 0, WorldACL(PermAll)); err != nil { - t.Fatalf("Create returned error: %+v", err) - } else if path != expect { - t.Fatalf("Create returned different path '%s' != '%s'", path, expect) - } - defer deleteNode(string(expect)) - } - - children, _, err := zk.Children("/gozk-test-big") - if err != nil { - t.Fatalf("Children returned error: %+v", err) - } else if len(children) != 10000 { - t.Fatal("Children returned wrong number of nodes") - } -} - -func TestChildWatch(t *testing.T) { - ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "}) - if err != nil { - t.Fatal(err) - } - defer ts.Stop() - zk, _, err := ts.ConnectAll() - if err != nil { - t.Fatalf("Connect returned error: %+v", err) - } - defer zk.Close() - - if err := zk.Delete("/gozk-test", -1); err != nil && err != ErrNoNode { - t.Fatalf("Delete returned error: %+v", err) - } - - children, stat, childCh, err := zk.ChildrenW("/") - if err != nil { - t.Fatalf("Children returned error: %+v", err) - } else if stat == nil { - t.Fatal("Children returned nil stat") - } else if len(children) < 1 { - t.Fatal("Children should return at least 1 child") - } - - if path, err := zk.Create("/gozk-test", []byte{1, 2, 3, 4}, 0, WorldACL(PermAll)); err != nil { - t.Fatalf("Create returned error: %+v", err) - } else if path != "/gozk-test" { - t.Fatalf("Create returned different path '%s' != '/gozk-test'", path) - } - - select { - case ev := <-childCh: - if ev.Err != nil { - t.Fatalf("Child watcher error %+v", ev.Err) - } - if ev.Path != "/" { - t.Fatalf("Child watcher wrong path %s instead of %s", ev.Path, "/") - } - case _ = <-time.After(time.Second * 2): - t.Fatal("Child watcher timed out") - } - - // Delete of the watched node should trigger the watch - - children, stat, childCh, err = zk.ChildrenW("/gozk-test") - if err != nil { - t.Fatalf("Children returned error: %+v", err) - } else if stat == nil { - t.Fatal("Children returned nil stat") - } else if len(children) != 0 { - t.Fatal("Children should return 0 children") - } - - if err := zk.Delete("/gozk-test", -1); err != nil && err != ErrNoNode { - t.Fatalf("Delete returned error: %+v", err) - } - - select { - case ev := <-childCh: - if ev.Err != nil { - t.Fatalf("Child watcher error %+v", ev.Err) - } - if ev.Path != "/gozk-test" { - t.Fatalf("Child watcher wrong path %s instead of %s", ev.Path, "/") - } - case _ = <-time.After(time.Second * 2): - t.Fatal("Child watcher timed out") - } -} - -func TestSetWatchers(t *testing.T) { - ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "}) - if err != nil { - t.Fatal(err) - } - defer ts.Stop() - zk, _, err := ts.ConnectAll() - if err != nil { - t.Fatalf("Connect returned error: %+v", err) - } - defer zk.Close() - - zk.reconnectLatch = make(chan struct{}) - zk.setWatchLimit = 1024 // break up set-watch step into 1k requests - var setWatchReqs atomic.Value - zk.setWatchCallback = func(reqs []*setWatchesRequest) { - setWatchReqs.Store(reqs) - } - - zk2, _, err := ts.ConnectAll() - if err != nil { - t.Fatalf("Connect returned error: %+v", err) - } - defer zk2.Close() - - if err := zk.Delete("/gozk-test", -1); err != nil && err != ErrNoNode { - t.Fatalf("Delete returned error: %+v", err) - } - - testPaths := map[string]<-chan Event{} - defer func() { - // clean up all of the test paths we create - for p := range testPaths { - zk2.Delete(p, -1) - } - }() - - // we create lots of paths to watch, to make sure a "set watches" request - // on re-create will be too big and be required to span multiple packets - for i := 0; i < 1000; i++ { - testPath, err := zk.Create(fmt.Sprintf("/gozk-test-%d", i), []byte{}, 0, WorldACL(PermAll)) - if err != nil { - t.Fatalf("Create returned: %+v", err) - } - testPaths[testPath] = nil - _, _, testEvCh, err := zk.GetW(testPath) - if err != nil { - t.Fatalf("GetW returned: %+v", err) - } - testPaths[testPath] = testEvCh - } - - children, stat, childCh, err := zk.ChildrenW("/") - if err != nil { - t.Fatalf("Children returned error: %+v", err) - } else if stat == nil { - t.Fatal("Children returned nil stat") - } else if len(children) < 1 { - t.Fatal("Children should return at least 1 child") - } - - // Simulate network error by brutally closing the network connection. - zk.conn.Close() - for p := range testPaths { - if err := zk2.Delete(p, -1); err != nil && err != ErrNoNode { - t.Fatalf("Delete returned error: %+v", err) - } - } - if path, err := zk2.Create("/gozk-test", []byte{1, 2, 3, 4}, 0, WorldACL(PermAll)); err != nil { - t.Fatalf("Create returned error: %+v", err) - } else if path != "/gozk-test" { - t.Fatalf("Create returned different path '%s' != '/gozk-test'", path) - } - - time.Sleep(100 * time.Millisecond) - - // zk should still be waiting to reconnect, so none of the watches should have been triggered - for p, ch := range testPaths { - select { - case <-ch: - t.Fatalf("GetW watcher for %q should not have triggered yet", p) - default: - } - } - select { - case <-childCh: - t.Fatalf("ChildrenW watcher should not have triggered yet") - default: - } - - // now we let the reconnect occur and make sure it resets watches - close(zk.reconnectLatch) - - for p, ch := range testPaths { - select { - case ev := <-ch: - if ev.Err != nil { - t.Fatalf("GetW watcher error %+v", ev.Err) - } - if ev.Path != p { - t.Fatalf("GetW watcher wrong path %s instead of %s", ev.Path, p) - } - case <-time.After(2 * time.Second): - t.Fatal("GetW watcher timed out") - } - } - - select { - case ev := <-childCh: - if ev.Err != nil { - t.Fatalf("Child watcher error %+v", ev.Err) - } - if ev.Path != "/" { - t.Fatalf("Child watcher wrong path %s instead of %s", ev.Path, "/") - } - case <-time.After(2 * time.Second): - t.Fatal("Child watcher timed out") - } - - // Yay! All watches fired correctly. Now we also inspect the actual set-watch request objects - // to ensure they didn't exceed the expected packet set. - buf := make([]byte, bufferSize) - totalWatches := 0 - actualReqs := setWatchReqs.Load().([]*setWatchesRequest) - if len(actualReqs) < 12 { - // sanity check: we should have generated *at least* 12 requests to reset watches - t.Fatalf("too few setWatchesRequest messages: %d", len(actualReqs)) - } - for _, r := range actualReqs { - totalWatches += len(r.ChildWatches) + len(r.DataWatches) + len(r.ExistWatches) - n, err := encodePacket(buf, r) - if err != nil { - t.Fatalf("encodePacket failed: %v! request:\n%+v", err, r) - } else if n > 1024 { - t.Fatalf("setWatchesRequest exceeded allowed size (%d > 1024)! request:\n%+v", n, r) - } - } - - if totalWatches != len(testPaths)+1 { - t.Fatalf("setWatchesRequests did not include all expected watches; expecting %d, got %d", len(testPaths)+1, totalWatches) - } -} - -func TestExpiringWatch(t *testing.T) { - ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "}) - if err != nil { - t.Fatal(err) - } - defer ts.Stop() - zk, _, err := ts.ConnectAll() - if err != nil { - t.Fatalf("Connect returned error: %+v", err) - } - defer zk.Close() - - if err := zk.Delete("/gozk-test", -1); err != nil && err != ErrNoNode { - t.Fatalf("Delete returned error: %+v", err) - } - - children, stat, childCh, err := zk.ChildrenW("/") - if err != nil { - t.Fatalf("Children returned error: %+v", err) - } else if stat == nil { - t.Fatal("Children returned nil stat") - } else if len(children) < 1 { - t.Fatal("Children should return at least 1 child") - } - - zk.sessionID = 99999 - zk.conn.Close() - - select { - case ev := <-childCh: - if ev.Err != ErrSessionExpired { - t.Fatalf("Child watcher error %+v instead of expected ErrSessionExpired", ev.Err) - } - if ev.Path != "/" { - t.Fatalf("Child watcher wrong path %s instead of %s", ev.Path, "/") - } - case <-time.After(2 * time.Second): - t.Fatal("Child watcher timed out") - } -} - -func TestRequestFail(t *testing.T) { - // If connecting fails to all servers in the list then pending requests - // should be errored out so they don't hang forever. - - zk, _, err := Connect([]string{"127.0.0.1:32444"}, time.Second*15) - if err != nil { - t.Fatal(err) - } - defer zk.Close() - - ch := make(chan error) - go func() { - _, _, err := zk.Get("/blah") - ch <- err - }() - select { - case err := <-ch: - if err == nil { - t.Fatal("Expected non-nil error on failed request due to connection failure") - } - case <-time.After(time.Second * 2): - t.Fatal("Get hung when connection could not be made") - } -} - -func TestSlowServer(t *testing.T) { - ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "}) - if err != nil { - t.Fatal(err) - } - defer ts.Stop() - - realAddr := fmt.Sprintf("127.0.0.1:%d", ts.Servers[0].Port) - proxyAddr, stopCh, err := startSlowProxy(t, - Rate{}, Rate{}, - realAddr, func(ln *Listener) { - if ln.Up.Latency == 0 { - ln.Up.Latency = time.Millisecond * 2000 - ln.Down.Latency = time.Millisecond * 2000 - } else { - ln.Up.Latency = 0 - ln.Down.Latency = 0 - } - }) - if err != nil { - t.Fatal(err) - } - defer close(stopCh) - - zk, _, err := Connect([]string{proxyAddr}, time.Millisecond*500) - if err != nil { - t.Fatal(err) - } - defer zk.Close() - - _, _, wch, err := zk.ChildrenW("/") - if err != nil { - t.Fatal(err) - } - - // Force a reconnect to get a throttled connection - zk.conn.Close() - - time.Sleep(time.Millisecond * 100) - - if err := zk.Delete("/gozk-test", -1); err == nil { - t.Fatal("Delete should have failed") - } - - // The previous request should have timed out causing the server to be disconnected and reconnected - - if _, err := zk.Create("/gozk-test", []byte{1, 2, 3, 4}, 0, WorldACL(PermAll)); err != nil { - t.Fatal(err) - } - - // Make sure event is still returned because the session should not have been affected - select { - case ev := <-wch: - t.Logf("Received event: %+v", ev) - case <-time.After(time.Second): - t.Fatal("Expected to receive a watch event") - } -} - -func startSlowProxy(t *testing.T, up, down Rate, upstream string, adj func(ln *Listener)) (string, chan bool, error) { - ln, err := net.Listen("tcp", "127.0.0.1:0") - if err != nil { - return "", nil, err - } - tln := &Listener{ - Listener: ln, - Up: up, - Down: down, - } - stopCh := make(chan bool) - go func() { - <-stopCh - tln.Close() - }() - go func() { - for { - cn, err := tln.Accept() - if err != nil { - if !strings.Contains(err.Error(), "use of closed network connection") { - t.Fatalf("Accept failed: %s", err.Error()) - } - return - } - if adj != nil { - adj(tln) - } - go func(cn net.Conn) { - defer cn.Close() - upcn, err := net.Dial("tcp", upstream) - if err != nil { - t.Log(err) - return - } - // This will leave hanging goroutines util stopCh is closed - // but it doesn't matter in the context of running tests. - go func() { - <-stopCh - upcn.Close() - }() - go func() { - if _, err := io.Copy(upcn, cn); err != nil { - if !strings.Contains(err.Error(), "use of closed network connection") { - // log.Printf("Upstream write failed: %s", err.Error()) - } - } - }() - if _, err := io.Copy(cn, upcn); err != nil { - if !strings.Contains(err.Error(), "use of closed network connection") { - // log.Printf("Upstream read failed: %s", err.Error()) - } - } - }(cn) - } - }() - return ln.Addr().String(), stopCh, nil -} - -func TestMaxBufferSize(t *testing.T) { - ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "}) - if err != nil { - t.Fatal(err) - } - defer ts.Stop() - // no buffer size - zk, _, err := ts.ConnectWithOptions(15 * time.Second) - var l testLogger - if err != nil { - t.Fatalf("Connect returned error: %+v", err) - } - defer zk.Close() - // 1k buffer size, logs to custom test logger - zkLimited, _, err := ts.ConnectWithOptions(15*time.Second, WithMaxBufferSize(1024), func(conn *Conn) { - conn.SetLogger(&l) - }) - if err != nil { - t.Fatalf("Connect returned error: %+v", err) - } - defer zkLimited.Close() - - // With small node with small number of children - data := []byte{101, 102, 103, 103} - _, err = zk.Create("/foo", data, 0, WorldACL(PermAll)) - if err != nil { - t.Fatalf("Create returned error: %+v", err) - } - var children []string - for i := 0; i < 4; i++ { - childName, err := zk.Create("/foo/child", nil, FlagEphemeral|FlagSequence, WorldACL(PermAll)) - if err != nil { - t.Fatalf("Create returned error: %+v", err) - } - children = append(children, childName[len("/foo/"):]) // strip parent prefix from name - } - sort.Strings(children) - - // Limited client works fine - resultData, _, err := zkLimited.Get("/foo") - if err != nil { - t.Fatalf("Get returned error: %+v", err) - } - if !reflect.DeepEqual(resultData, data) { - t.Fatalf("Get returned unexpected data; expecting %+v, got %+v", data, resultData) - } - resultChildren, _, err := zkLimited.Children("/foo") - if err != nil { - t.Fatalf("Children returned error: %+v", err) - } - sort.Strings(resultChildren) - if !reflect.DeepEqual(resultChildren, children) { - t.Fatalf("Children returned unexpected names; expecting %+v, got %+v", children, resultChildren) - } - - // With large node though... - data = make([]byte, 1024) - for i := 0; i < 1024; i++ { - data[i] = byte(i) - } - _, err = zk.Create("/bar", data, 0, WorldACL(PermAll)) - if err != nil { - t.Fatalf("Create returned error: %+v", err) - } - _, _, err = zkLimited.Get("/bar") - // NB: Sadly, without actually de-serializing the too-large response packet, we can't send the - // right error to the corresponding outstanding request. So the request just sees ErrConnectionClosed - // while the log will see the actual reason the connection was closed. - expectErr(t, err, ErrConnectionClosed) - expectLogMessage(t, &l, "received packet from server with length .*, which exceeds max buffer size 1024") - - // Or with large number of children... - totalLen := 0 - children = nil - for totalLen < 1024 { - childName, err := zk.Create("/bar/child", nil, FlagEphemeral|FlagSequence, WorldACL(PermAll)) - if err != nil { - t.Fatalf("Create returned error: %+v", err) - } - n := childName[len("/bar/"):] // strip parent prefix from name - children = append(children, n) - totalLen += len(n) - } - sort.Strings(children) - _, _, err = zkLimited.Children("/bar") - expectErr(t, err, ErrConnectionClosed) - expectLogMessage(t, &l, "received packet from server with length .*, which exceeds max buffer size 1024") - - // Other client (without buffer size limit) can successfully query the node and its children, of course - resultData, _, err = zk.Get("/bar") - if err != nil { - t.Fatalf("Get returned error: %+v", err) - } - if !reflect.DeepEqual(resultData, data) { - t.Fatalf("Get returned unexpected data; expecting %+v, got %+v", data, resultData) - } - resultChildren, _, err = zk.Children("/bar") - if err != nil { - t.Fatalf("Children returned error: %+v", err) - } - sort.Strings(resultChildren) - if !reflect.DeepEqual(resultChildren, children) { - t.Fatalf("Children returned unexpected names; expecting %+v, got %+v", children, resultChildren) - } -} - -func expectErr(t *testing.T, err error, expected error) { - if err == nil { - t.Fatalf("Get for node that is too large should have returned error!") - } - if err != expected { - t.Fatalf("Get returned wrong error; expecting ErrClosing, got %+v", err) - } -} - -func expectLogMessage(t *testing.T, logger *testLogger, pattern string) { - re := regexp.MustCompile(pattern) - events := logger.Reset() - if len(events) == 0 { - t.Fatalf("Failed to log error; expecting message that matches pattern: %s", pattern) - } - var found []string - for _, e := range events { - if re.Match([]byte(e)) { - found = append(found, e) - } - } - if len(found) == 0 { - t.Fatalf("Failed to log error; expecting message that matches pattern: %s", pattern) - } else if len(found) > 1 { - t.Fatalf("Logged error redundantly %d times:\n%+v", len(found), found) - } -} - -type testLogger struct { - mu sync.Mutex - events []string -} - -func (l *testLogger) Printf(msgFormat string, args ...interface{}) { - msg := fmt.Sprintf(msgFormat, args...) - fmt.Println(msg) - l.mu.Lock() - defer l.mu.Unlock() - l.events = append(l.events, msg) -} - -func (l *testLogger) Reset() []string { - l.mu.Lock() - defer l.mu.Unlock() - ret := l.events - l.events = nil - return ret -} diff --git a/vendor/modules.txt b/vendor/modules.txt new file mode 100644 index 0000000..7ca9339 --- /dev/null +++ b/vendor/modules.txt @@ -0,0 +1,6 @@ +# github.com/outbrain/golib v0.0.0-20200503083229-2531e5dbcc71 +## explicit +github.com/outbrain/golib/log +# github.com/samuel/go-zookeeper v0.0.0-20201211165307-7117e9ea2414 +## explicit +github.com/samuel/go-zookeeper/zk