diff --git a/.github/actions/setup-e2e/action.yaml b/.github/actions/setup-e2e/action.yaml index 343b091867..659bc5ff35 100644 --- a/.github/actions/setup-e2e/action.yaml +++ b/.github/actions/setup-e2e/action.yaml @@ -32,6 +32,10 @@ inputs: description: "If k3d is not required, set this to false" required: false default: "true" + require_k3d_storage: + description: "If local-path-storage is not required, set this to false" + required: false + default: "false" require_minikube: description: "If minikube is not required, set this to true and set require_k3d to false" required: false @@ -85,6 +89,7 @@ runs: with: agents: 3 ingress_port: ${{ inputs.ingress_port }} + storage: ${{ inputs.require_k3d_storage }} - name: Setup Minikube environment if: ${{ inputs.require_minikube == 'true' }} shell: bash diff --git a/.github/actions/setup-k3d/action.yaml b/.github/actions/setup-k3d/action.yaml index 19f0d0cf87..777bf95f11 100644 --- a/.github/actions/setup-k3d/action.yaml +++ b/.github/actions/setup-k3d/action.yaml @@ -36,6 +36,10 @@ inputs: description: "Number of agents" required: false default: "3" + storage: + description: "If true, the local-path-storage will be deployed" + required: false + default: "false" options: description: "Options for k3d cluster create command" required: false @@ -124,6 +128,11 @@ runs: echo $KUBECONFIG cat $KUBECONFIG cat /etc/hosts + - name: Set local path storage + if: ${{ inputs.storage == 'true' }} + shell: bash + run: | + make k3d/storage - name: Show Kubernetes Cluster Info shell: bash run: | diff --git a/.github/helm/values/values-rollout-agent.yaml b/.github/helm/values/values-rollout-agent.yaml new file mode 100644 index 0000000000..4d782d3388 --- /dev/null +++ b/.github/helm/values/values-rollout-agent.yaml @@ -0,0 +1,138 @@ +# +# Copyright (C) 2019-2025 vdaas.org vald team +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# You may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +defaults: + image: + tag: v1.7.16 + logging: + level: debug + server_config: + metrics: + pprof: + enabled: true # enable Pyroscope + healths: + liveness: + livenessProbe: + initialDelaySeconds: 60 + readiness: + readinessProbe: + initialDelaySeconds: 60 + servers: + grpc: + server: + grpc: + interceptors: + - RecoverInterceptor + - TraceInterceptor + - MetricInterceptor + grpc: + client: + dial_option: + interceptors: + - TraceInterceptor + observability: + enabled: true + otlp: + collector_endpoint: "opentelemetry-collector-collector.default.svc.cluster.local:4317" + trace: + enabled: true + networkPolicy: + enabled: true + custom: + ingress: + - from: + - podSelector: + matchLabels: + app.kubernetes.io/name: pyroscope + egress: + - to: + - podSelector: + matchLabels: + app.kubernetes.io/name: opentelemetry-collector-collector +gateway: + lb: + enabled: true + maxReplicas: 1 + minReplicas: 1 + hpa: + enabled: false + resources: + requests: + cpu: 200m + memory: 150Mi + gateway_config: + index_replica: 2 + # ingress: + # # if enabled is true, vald-lb-gateway can be connected through Kubernetes ingress from the external network. + # enabled: true + # # TODO: Set your ingress host. 
+ # host: localhost + # service: + # # NOTE: https://doc.traefik.io/traefik/routing/providers/kubernetes-ingress/#on-service + # annotations: + # traefik.ingress.kubernetes.io/service.serversscheme: h2c +agent: + minReplicas: 3 + maxReplicas: 3 + podManagementPolicy: Parallel + hpa: + enabled: false + resources: + requests: + cpu: 100m + memory: 50Mi + # We recommend to set this value long enough to ensure the backup speed of PV, since the Index is backed up at the end of the pod. + terminationGracePeriodSeconds: 600 + # This is the persistent volume settings. + # Please change it according to your environment. + persistentVolume: + enabled: true + accessMode: ReadWriteOncePod + storageClass: local-path + size: 500Mi + ngt: + auto_index_duration_limit: 2m + auto_index_check_duration: 30s + auto_index_length: 500 + dimension: 784 + enable_in_memory_mode: false + index_path: "/var/ngt/index" +discoverer: + minReplicas: 1 + hpa: + enabled: false + resources: + requests: + cpu: 100m + memory: 50Mi +manager: + index: + enabled: true + replicas: 1 + resources: + requests: + cpu: 100m + memory: 30Mi + indexer: + auto_index_duration_limit: 2m + auto_index_check_duration: 30s + auto_index_length: 1000 + corrector: + enabled: false + # suspend because you do not want corrector to start automatically in CI + # instead run it manually + suspend: true + schedule: "1 2 3 4 5" diff --git a/.github/workflows/dockers-agent-faiss-image.yaml b/.github/workflows/dockers-agent-faiss-image.yaml index 153ee05a14..d691b4f0fa 100644 --- a/.github/workflows/dockers-agent-faiss-image.yaml +++ b/.github/workflows/dockers-agent-faiss-image.yaml @@ -265,4 +265,5 @@ jobs: uses: ./.github/workflows/_docker-image.yaml with: target: agent-faiss + platforms: linux/amd64,linux/arm64 secrets: inherit diff --git a/.github/workflows/dockers-agent-image.yaml b/.github/workflows/dockers-agent-image.yaml index f1311bf7fb..960768a228 100644 --- a/.github/workflows/dockers-agent-image.yaml +++ 
b/.github/workflows/dockers-agent-image.yaml @@ -71,4 +71,5 @@ jobs: uses: ./.github/workflows/_docker-image.yaml with: target: agent + platforms: linux/amd64,linux/arm64 secrets: inherit diff --git a/.github/workflows/dockers-agent-ngt-image.yaml b/.github/workflows/dockers-agent-ngt-image.yaml index 4610cb8399..13d2764da2 100644 --- a/.github/workflows/dockers-agent-ngt-image.yaml +++ b/.github/workflows/dockers-agent-ngt-image.yaml @@ -269,4 +269,5 @@ jobs: uses: ./.github/workflows/_docker-image.yaml with: target: agent-ngt + platforms: linux/amd64,linux/arm64 secrets: inherit diff --git a/.github/workflows/dockers-agent-sidecar-image.yaml b/.github/workflows/dockers-agent-sidecar-image.yaml index a3a0f6a68e..1870d99d64 100644 --- a/.github/workflows/dockers-agent-sidecar-image.yaml +++ b/.github/workflows/dockers-agent-sidecar-image.yaml @@ -295,4 +295,5 @@ jobs: uses: ./.github/workflows/_docker-image.yaml with: target: agent-sidecar + platforms: linux/amd64,linux/arm64 secrets: inherit diff --git a/.github/workflows/dockers-benchmark-job-image.yaml b/.github/workflows/dockers-benchmark-job-image.yaml index 9202808fe5..ad9d86e0c6 100644 --- a/.github/workflows/dockers-benchmark-job-image.yaml +++ b/.github/workflows/dockers-benchmark-job-image.yaml @@ -261,4 +261,5 @@ jobs: uses: ./.github/workflows/_docker-image.yaml with: target: benchmark-job + platforms: linux/amd64,linux/arm64 secrets: inherit diff --git a/.github/workflows/dockers-benchmark-operator-image.yaml b/.github/workflows/dockers-benchmark-operator-image.yaml index 403d2bb82a..68b26b13cb 100644 --- a/.github/workflows/dockers-benchmark-operator-image.yaml +++ b/.github/workflows/dockers-benchmark-operator-image.yaml @@ -255,4 +255,5 @@ jobs: uses: ./.github/workflows/_docker-image.yaml with: target: benchmark-operator + platforms: linux/amd64,linux/arm64 secrets: inherit diff --git a/.github/workflows/dockers-dev-container-image.yaml b/.github/workflows/dockers-dev-container-image.yaml index 
2a55c83da2..85a4e76788 100644 --- a/.github/workflows/dockers-dev-container-image.yaml +++ b/.github/workflows/dockers-dev-container-image.yaml @@ -55,4 +55,5 @@ jobs: uses: ./.github/workflows/_docker-image.yaml with: target: dev-container + platforms: linux/amd64,linux/arm64 secrets: inherit diff --git a/.github/workflows/dockers-discoverer-k8s-image.yaml b/.github/workflows/dockers-discoverer-k8s-image.yaml index 048bebc47d..e594e1be8f 100644 --- a/.github/workflows/dockers-discoverer-k8s-image.yaml +++ b/.github/workflows/dockers-discoverer-k8s-image.yaml @@ -259,4 +259,5 @@ jobs: uses: ./.github/workflows/_docker-image.yaml with: target: discoverer-k8s + platforms: linux/amd64,linux/arm64 secrets: inherit diff --git a/.github/workflows/dockers-example-client-image.yaml b/.github/workflows/dockers-example-client-image.yaml index b529a4bc25..f3db763d4f 100644 --- a/.github/workflows/dockers-example-client-image.yaml +++ b/.github/workflows/dockers-example-client-image.yaml @@ -65,4 +65,5 @@ jobs: uses: ./.github/workflows/_docker-image.yaml with: target: example-client + platforms: linux/amd64,linux/arm64 secrets: inherit diff --git a/.github/workflows/dockers-gateway-filter-image.yaml b/.github/workflows/dockers-gateway-filter-image.yaml index cc361b2435..f092f303aa 100644 --- a/.github/workflows/dockers-gateway-filter-image.yaml +++ b/.github/workflows/dockers-gateway-filter-image.yaml @@ -259,4 +259,5 @@ jobs: uses: ./.github/workflows/_docker-image.yaml with: target: gateway-filter + platforms: linux/amd64,linux/arm64 secrets: inherit diff --git a/.github/workflows/dockers-gateway-lb-image.yaml b/.github/workflows/dockers-gateway-lb-image.yaml index 9a28c196e0..c13ab8971c 100644 --- a/.github/workflows/dockers-gateway-lb-image.yaml +++ b/.github/workflows/dockers-gateway-lb-image.yaml @@ -257,4 +257,5 @@ jobs: uses: ./.github/workflows/_docker-image.yaml with: target: gateway-lb + platforms: linux/amd64,linux/arm64 secrets: inherit diff --git 
a/.github/workflows/dockers-gateway-mirror-image.yaml b/.github/workflows/dockers-gateway-mirror-image.yaml index 4f6e0c485c..1c68964e8c 100644 --- a/.github/workflows/dockers-gateway-mirror-image.yaml +++ b/.github/workflows/dockers-gateway-mirror-image.yaml @@ -261,4 +261,5 @@ jobs: uses: ./.github/workflows/_docker-image.yaml with: target: gateway-mirror + platforms: linux/amd64,linux/arm64 secrets: inherit diff --git a/.github/workflows/dockers-helm-operator-image.yaml b/.github/workflows/dockers-helm-operator-image.yaml index 9b62139dbe..f2818cfa3d 100644 --- a/.github/workflows/dockers-helm-operator-image.yaml +++ b/.github/workflows/dockers-helm-operator-image.yaml @@ -65,4 +65,5 @@ jobs: uses: ./.github/workflows/_docker-image.yaml with: target: helm-operator + platforms: linux/amd64,linux/arm64 secrets: inherit diff --git a/.github/workflows/dockers-index-correction-image.yaml b/.github/workflows/dockers-index-correction-image.yaml index f8f65b969f..a98b0a26c0 100644 --- a/.github/workflows/dockers-index-correction-image.yaml +++ b/.github/workflows/dockers-index-correction-image.yaml @@ -249,4 +249,5 @@ jobs: uses: ./.github/workflows/_docker-image.yaml with: target: index-correction + platforms: linux/amd64,linux/arm64 secrets: inherit diff --git a/.github/workflows/dockers-index-creation-image.yaml b/.github/workflows/dockers-index-creation-image.yaml index 0436b46a0d..d750782597 100644 --- a/.github/workflows/dockers-index-creation-image.yaml +++ b/.github/workflows/dockers-index-creation-image.yaml @@ -243,4 +243,5 @@ jobs: uses: ./.github/workflows/_docker-image.yaml with: target: index-creation + platforms: linux/amd64,linux/arm64 secrets: inherit diff --git a/.github/workflows/dockers-index-deletion-image.yaml b/.github/workflows/dockers-index-deletion-image.yaml index 653763aad7..6cb1b50bc6 100644 --- a/.github/workflows/dockers-index-deletion-image.yaml +++ b/.github/workflows/dockers-index-deletion-image.yaml @@ -243,4 +243,5 @@ jobs: uses: 
./.github/workflows/_docker-image.yaml with: target: index-deletion + platforms: linux/amd64,linux/arm64 secrets: inherit diff --git a/.github/workflows/dockers-index-operator-image.yaml b/.github/workflows/dockers-index-operator-image.yaml index 6e33ad364a..a7c62e7737 100644 --- a/.github/workflows/dockers-index-operator-image.yaml +++ b/.github/workflows/dockers-index-operator-image.yaml @@ -241,4 +241,5 @@ jobs: uses: ./.github/workflows/_docker-image.yaml with: target: index-operator + platforms: linux/amd64,linux/arm64 secrets: inherit diff --git a/.github/workflows/dockers-index-save-image.yaml b/.github/workflows/dockers-index-save-image.yaml index 4506c14dc3..9883165aa9 100644 --- a/.github/workflows/dockers-index-save-image.yaml +++ b/.github/workflows/dockers-index-save-image.yaml @@ -243,4 +243,5 @@ jobs: uses: ./.github/workflows/_docker-image.yaml with: target: index-save + platforms: linux/amd64,linux/arm64 secrets: inherit diff --git a/.github/workflows/dockers-manager-index-image.yaml b/.github/workflows/dockers-manager-index-image.yaml index 580fbfbbcd..17dec7f2fa 100644 --- a/.github/workflows/dockers-manager-index-image.yaml +++ b/.github/workflows/dockers-manager-index-image.yaml @@ -263,4 +263,5 @@ jobs: uses: ./.github/workflows/_docker-image.yaml with: target: manager-index + platforms: linux/amd64,linux/arm64 secrets: inherit diff --git a/.github/workflows/dockers-readreplica-rotate-image.yaml b/.github/workflows/dockers-readreplica-rotate-image.yaml index ab1bc9b67e..c542eee184 100644 --- a/.github/workflows/dockers-readreplica-rotate-image.yaml +++ b/.github/workflows/dockers-readreplica-rotate-image.yaml @@ -239,4 +239,5 @@ jobs: uses: ./.github/workflows/_docker-image.yaml with: target: readreplica-rotate + platforms: linux/amd64,linux/arm64 secrets: inherit diff --git a/.github/workflows/e2e.yaml b/.github/workflows/e2e.yaml index cd0b01df30..d70db8fa11 100644 --- a/.github/workflows/e2e.yaml +++ b/.github/workflows/e2e.yaml @@ -348,6 
+348,49 @@ jobs: e2e/readreplica env: POD_NAME: ${{ steps.deploy_vald_readreplica.outputs.POD_NAME }} + e2e-stream-crud-with-rollout-restart-agent: + name: "E2E test (Stream CRUD) with rollout restart agent" + needs: [detect-ci-container] + runs-on: ubuntu-latest + timeout-minutes: 60 + container: + image: ghcr.io/vdaas/vald/vald-ci-container:${{ needs.detect-ci-container.outputs.TAG }} + options: "--add-host host.docker.internal:host-gateway" + steps: + - uses: actions/checkout@v4 + - name: Set Git config + run: | + git config --global --add safe.directory ${GITHUB_WORKSPACE} + - name: Setup E2E environment + id: setup_e2e + uses: ./.github/actions/setup-e2e + with: + require_k3d_storage: true + - name: Deploy Vald + id: deploy_vald + uses: ./.github/actions/e2e-deploy-vald + with: + helm_extra_options: ${{ steps.setup_e2e.outputs.HELM_EXTRA_OPTIONS }} + values: .github/helm/values/values-rollout-agent.yaml + wait_for_selector: app=vald-lb-gateway + - name: Run E2E CRUD with rollout restart agent + run: | + make hack/benchmark/assets/dataset/${{ env.DATASET }} + make E2E_BIND_PORT=8081 \ + E2E_DATASET_NAME=${{ env.DATASET }} \ + E2E_INSERT_COUNT=1000 \ + E2E_SEARCH_COUNT=1000 \ + E2E_SEARCH_BY_ID_COUNT=1000 \ + E2E_GET_OBJECT_COUNT=100 \ + E2E_UPDATE_COUNT=100 \ + E2E_UPSERT_COUNT=100 \ + E2E_REMOVE_COUNT=100 \ + E2E_WAIT_FOR_CREATE_INDEX_DURATION=3m \ + E2E_TARGET_POD_NAME=${POD_NAME} \ + E2E_TARGET_NAMESPACE=default \ + e2e/stream/rollout/restart/agent + env: + POD_NAME: ${{ steps.deploy_vald.outputs.POD_NAME }} e2e-stream-crud-with-mirror: name: "E2E test (Stream CRUD) with mirror" needs: [detect-ci-container] @@ -417,6 +460,7 @@ jobs: - e2e-stream-crud-under-index-management-jobs - e2e-stream-crud-with-mirror - e2e-stream-crud-with-readreplica + - e2e-stream-crud-with-rollout-restart-agent runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 diff --git a/Makefile b/Makefile index d0ae3db8f5..79418bec3e 100644 --- a/Makefile +++ b/Makefile @@ -359,6
E2E_PORTFORWARD_ENABLED ?= true E2E_REMOVE_COUNT ?= 3 E2E_SEARCH_BY_ID_COUNT ?= 100 E2E_SEARCH_COUNT ?= 1000 +E2E_SEARCH_CONCURRENCY ?= 10 E2E_TARGET_NAME ?= vald-lb-gateway E2E_TARGET_NAMESPACE ?= default E2E_TARGET_POD_NAME ?= $(eval E2E_TARGET_POD_NAME := $(shell kubectl get pods --selector=app=$(E2E_TARGET_NAME) -n $(E2E_TARGET_NAMESPACE) | tail -1 | cut -f1 -d " "))$(E2E_TARGET_POD_NAME) @@ -368,6 +369,7 @@ E2E_UPDATE_COUNT ?= 10 E2E_UPSERT_COUNT ?= 10 E2E_WAIT_FOR_CREATE_INDEX_DURATION ?= 8m E2E_WAIT_FOR_START_TIMEOUT ?= 10m +E2E_WAIT_FOR_RESOURCE_READY ?= 3m E2E_SEARCH_FROM ?= 0 E2E_SEARCH_BY_ID_FROM ?= 0 E2E_INSERT_FROM ?= 0 diff --git a/Makefile.d/e2e.mk b/Makefile.d/e2e.mk index 6ebdc3ad35..bcce7cca06 100644 --- a/Makefile.d/e2e.mk +++ b/Makefile.d/e2e.mk @@ -89,6 +89,21 @@ e2e/index/job/correction: e2e/readreplica: $(call run-e2e-crud-test,-run TestE2EReadReplica) +.PHONY: e2e/standard/rollout/restart/agent +## run rollout-restart agent e2e +e2e/standard/rollout/restart/agent: + $(call run-e2e-crud-test,-run TestE2EStandardCRUDWithRolloutRestart) + +.PHONY: e2e/stream/rollout/restart/agent +## run rollout-restart agent e2e +e2e/stream/rollout/restart/agent: + $(call run-e2e-crud-test,-run TestE2EAgentRolloutRestart) + +.PHONY: e2e/large/search +## run big multisearch agent e2e +e2e/large/search: + $(call run-e2e-crud-test,-run TestE2EHighConcurrencyMultiSearch) + .PHONY: e2e/maxdim ## run e2e/maxdim e2e/maxdim: diff --git a/Makefile.d/functions.mk b/Makefile.d/functions.mk index 58d8ca8f17..99b61cec7c 100644 --- a/Makefile.d/functions.mk +++ b/Makefile.d/functions.mk @@ -150,6 +150,7 @@ define run-e2e-crud-test -correction-insert-num=$(E2E_INSERT_COUNT) \ -insert-num=$(E2E_INSERT_COUNT) \ -search-num=$(E2E_SEARCH_COUNT) \ + -search-conn=$(E2E_SEARCH_CONCURRENCY) \ -search-by-id-num=$(E2E_SEARCH_BY_ID_COUNT) \ -get-object-num=$(E2E_GET_OBJECT_COUNT) \ -update-num=$(E2E_UPDATE_COUNT) \ @@ -163,6 +164,7 @@ define run-e2e-crud-test
-search-by-id-from=$(E2E_SEARCH_BY_ID_FROM) \ -get-object-from=$(E2E_GET_OBJECT_FROM) \ -wait-after-insert=$(E2E_WAIT_FOR_CREATE_INDEX_DURATION) \ + -wait-resource-ready=$(E2E_WAIT_FOR_RESOURCE_READY) \ -portforward=$(E2E_PORTFORWARD_ENABLED) \ -portforward-pod-name=$(E2E_TARGET_POD_NAME) \ -portforward-pod-port=$(E2E_TARGET_PORT) \ diff --git a/Makefile.d/k3d.mk b/Makefile.d/k3d.mk index 9971904a32..aec7a3475e 100644 --- a/Makefile.d/k3d.mk +++ b/Makefile.d/k3d.mk @@ -40,6 +40,7 @@ k3d/start: --image docker.io/rancher/k3s:$(K3S_VERSION) \ --host-pid-mode=$(K3D_HOST_PID_MODE) \ --api-port $(K3D_HOST):$(K3D_PORT) \ + --registry-create local-registry:0.0.0.0:5000 \ -v "/lib/modules:/lib/modules" \ $(K3D_OPTIONS) @make k3d/config diff --git a/dockers/agent/core/agent/Dockerfile b/dockers/agent/core/agent/Dockerfile index caf56e6e0e..c77f010b4a 100644 --- a/dockers/agent/core/agent/Dockerfile +++ b/dockers/agent/core/agent/Dockerfile @@ -94,4 +94,4 @@ LABEL maintainer="vdaas.org vald team " COPY --from=builder /usr/bin/agent /usr/bin/agent # skipcq: DOK-DL3002 USER nonroot:nonroot -ENTRYPOINT ["/usr/bin/agent"] +ENTRYPOINT ["/usr/bin/agent"] \ No newline at end of file diff --git a/dockers/agent/core/faiss/Dockerfile b/dockers/agent/core/faiss/Dockerfile index 12d0a9ecf8..49288c2c95 100644 --- a/dockers/agent/core/faiss/Dockerfile +++ b/dockers/agent/core/faiss/Dockerfile @@ -96,4 +96,4 @@ COPY --from=builder /usr/bin/faiss /usr/bin/faiss COPY cmd/agent/core/faiss/sample.yaml /etc/server/config.yaml # skipcq: DOK-DL3002 USER nonroot:nonroot -ENTRYPOINT ["/usr/bin/faiss"] +ENTRYPOINT ["/usr/bin/faiss"] \ No newline at end of file diff --git a/dockers/agent/core/ngt/Dockerfile b/dockers/agent/core/ngt/Dockerfile index 2acf8fc761..c806de1ea6 100644 --- a/dockers/agent/core/ngt/Dockerfile +++ b/dockers/agent/core/ngt/Dockerfile @@ -95,4 +95,4 @@ COPY --from=builder /usr/bin/ngt /usr/bin/ngt COPY cmd/agent/core/ngt/sample.yaml /etc/server/config.yaml # skipcq: DOK-DL3002 
USER nonroot:nonroot -ENTRYPOINT ["/usr/bin/ngt"] +ENTRYPOINT ["/usr/bin/ngt"] \ No newline at end of file diff --git a/dockers/agent/sidecar/Dockerfile b/dockers/agent/sidecar/Dockerfile index ede4d8c618..d5cd04ea94 100644 --- a/dockers/agent/sidecar/Dockerfile +++ b/dockers/agent/sidecar/Dockerfile @@ -85,4 +85,4 @@ LABEL maintainer="vdaas.org vald team " COPY --from=builder /usr/bin/sidecar /usr/bin/sidecar # skipcq: DOK-DL3002 USER nonroot:nonroot -ENTRYPOINT ["/usr/bin/sidecar"] +ENTRYPOINT ["/usr/bin/sidecar"] \ No newline at end of file diff --git a/dockers/binfmt/Dockerfile b/dockers/binfmt/Dockerfile index 0be93ba3d9..f63da8d40a 100644 --- a/dockers/binfmt/Dockerfile +++ b/dockers/binfmt/Dockerfile @@ -17,4 +17,4 @@ # # DO_NOT_EDIT this Dockerfile is generated by https://github.com/vdaas/vald/blob/main/hack/docker/gen/main.go -FROM tonistiigi/binfmt:master AS builder +FROM tonistiigi/binfmt:master AS builder \ No newline at end of file diff --git a/dockers/buildbase/Dockerfile b/dockers/buildbase/Dockerfile index 5dde4958c7..85d2385659 100644 --- a/dockers/buildbase/Dockerfile +++ b/dockers/buildbase/Dockerfile @@ -17,4 +17,4 @@ # # DO_NOT_EDIT this Dockerfile is generated by https://github.com/vdaas/vald/blob/main/hack/docker/gen/main.go -FROM ubuntu:devel AS builder +FROM ubuntu:devel AS builder \ No newline at end of file diff --git a/dockers/buildkit/Dockerfile b/dockers/buildkit/Dockerfile index 43a5a5b0b3..e63c6f5ebc 100644 --- a/dockers/buildkit/Dockerfile +++ b/dockers/buildkit/Dockerfile @@ -17,4 +17,4 @@ # # DO_NOT_EDIT this Dockerfile is generated by https://github.com/vdaas/vald/blob/main/hack/docker/gen/main.go -FROM moby/buildkit:master AS builder +FROM moby/buildkit:master AS builder \ No newline at end of file diff --git a/dockers/buildkit/syft/scanner/Dockerfile b/dockers/buildkit/syft/scanner/Dockerfile index 87be4558e9..b840429bf4 100644 --- a/dockers/buildkit/syft/scanner/Dockerfile +++ b/dockers/buildkit/syft/scanner/Dockerfile @@ 
-17,4 +17,4 @@ # # DO_NOT_EDIT this Dockerfile is generated by https://github.com/vdaas/vald/blob/main/hack/docker/gen/main.go -FROM docker/buildkit-syft-scanner:edge AS scanner +FROM docker/buildkit-syft-scanner:edge AS scanner \ No newline at end of file diff --git a/dockers/ci/base/Dockerfile b/dockers/ci/base/Dockerfile index 12572f742a..540c43eb2b 100644 --- a/dockers/ci/base/Dockerfile +++ b/dockers/ci/base/Dockerfile @@ -127,4 +127,4 @@ RUN --mount=type=bind,target=.,rw \ && rm -rf ${GOPATH}/src/github.com/${ORG}/${REPO}/* # skipcq: DOK-DL3002 USER root:root -ENTRYPOINT ["/bin/bash"] +ENTRYPOINT ["/bin/bash"] \ No newline at end of file diff --git a/dockers/dev/Dockerfile b/dockers/dev/Dockerfile index f176b94d28..03c4eeaa6f 100644 --- a/dockers/dev/Dockerfile +++ b/dockers/dev/Dockerfile @@ -140,4 +140,4 @@ RUN --mount=type=bind,target=.,rw \ && make faiss/install \ && rm -rf ${GOPATH}/src/github.com/${ORG}/${REPO}/* # skipcq: DOK-DL3002 -USER root:root +USER root:root \ No newline at end of file diff --git a/dockers/discoverer/k8s/Dockerfile b/dockers/discoverer/k8s/Dockerfile index 91555000dd..b62f0b6483 100644 --- a/dockers/discoverer/k8s/Dockerfile +++ b/dockers/discoverer/k8s/Dockerfile @@ -86,4 +86,4 @@ COPY --from=builder /usr/bin/discoverer /usr/bin/discoverer COPY cmd/discoverer/k8s/sample.yaml /etc/server/config.yaml # skipcq: DOK-DL3002 USER nonroot:nonroot -ENTRYPOINT ["/usr/bin/discoverer"] +ENTRYPOINT ["/usr/bin/discoverer"] \ No newline at end of file diff --git a/dockers/example/client/Dockerfile b/dockers/example/client/Dockerfile index 5ec5cdc9a8..6d4b012261 100644 --- a/dockers/example/client/Dockerfile +++ b/dockers/example/client/Dockerfile @@ -92,4 +92,4 @@ LABEL maintainer="vdaas.org vald team " COPY --from=builder /usr/bin/client /usr/bin/client # skipcq: DOK-DL3002 USER nonroot:nonroot -ENTRYPOINT ["/usr/bin/client"] +ENTRYPOINT ["/usr/bin/client"] \ No newline at end of file diff --git a/dockers/gateway/filter/Dockerfile 
b/dockers/gateway/filter/Dockerfile index 81538ee43d..1764f27578 100644 --- a/dockers/gateway/filter/Dockerfile +++ b/dockers/gateway/filter/Dockerfile @@ -86,4 +86,4 @@ COPY --from=builder /usr/bin/filter /usr/bin/filter COPY cmd/gateway/filter/sample.yaml /etc/server/config.yaml # skipcq: DOK-DL3002 USER nonroot:nonroot -ENTRYPOINT ["/usr/bin/filter"] +ENTRYPOINT ["/usr/bin/filter"] \ No newline at end of file diff --git a/dockers/gateway/lb/Dockerfile b/dockers/gateway/lb/Dockerfile index e31433c4b6..eba440c8fc 100644 --- a/dockers/gateway/lb/Dockerfile +++ b/dockers/gateway/lb/Dockerfile @@ -86,4 +86,4 @@ COPY --from=builder /usr/bin/lb /usr/bin/lb COPY cmd/gateway/lb/sample.yaml /etc/server/config.yaml # skipcq: DOK-DL3002 USER nonroot:nonroot -ENTRYPOINT ["/usr/bin/lb"] +ENTRYPOINT ["/usr/bin/lb"] \ No newline at end of file diff --git a/dockers/gateway/mirror/Dockerfile b/dockers/gateway/mirror/Dockerfile index 2ae86669a2..3010c06b95 100644 --- a/dockers/gateway/mirror/Dockerfile +++ b/dockers/gateway/mirror/Dockerfile @@ -86,4 +86,4 @@ COPY --from=builder /usr/bin/mirror /usr/bin/mirror COPY cmd/gateway/mirror/sample.yaml /etc/server/config.yaml # skipcq: DOK-DL3002 USER nonroot:nonroot -ENTRYPOINT ["/usr/bin/mirror"] +ENTRYPOINT ["/usr/bin/mirror"] \ No newline at end of file diff --git a/dockers/index/job/correction/Dockerfile b/dockers/index/job/correction/Dockerfile index 058abbf82d..8ae06c4216 100644 --- a/dockers/index/job/correction/Dockerfile +++ b/dockers/index/job/correction/Dockerfile @@ -86,4 +86,4 @@ COPY --from=builder /usr/bin/index-correction /usr/bin/index-correction COPY cmd/index/job/correction/sample.yaml /etc/server/config.yaml # skipcq: DOK-DL3002 USER nonroot:nonroot -ENTRYPOINT ["/usr/bin/index-correction"] +ENTRYPOINT ["/usr/bin/index-correction"] \ No newline at end of file diff --git a/dockers/index/job/creation/Dockerfile b/dockers/index/job/creation/Dockerfile index 96262c44a0..f04848ba16 100644 --- 
a/dockers/index/job/creation/Dockerfile +++ b/dockers/index/job/creation/Dockerfile @@ -86,4 +86,4 @@ COPY --from=builder /usr/bin/index-creation /usr/bin/index-creation COPY cmd/index/job/creation/sample.yaml /etc/server/config.yaml # skipcq: DOK-DL3002 USER nonroot:nonroot -ENTRYPOINT ["/usr/bin/index-creation"] +ENTRYPOINT ["/usr/bin/index-creation"] \ No newline at end of file diff --git a/dockers/index/job/deletion/Dockerfile b/dockers/index/job/deletion/Dockerfile index d4226f8896..a8e30aafb3 100644 --- a/dockers/index/job/deletion/Dockerfile +++ b/dockers/index/job/deletion/Dockerfile @@ -86,4 +86,4 @@ COPY --from=builder /usr/bin/index-deletion /usr/bin/index-deletion COPY cmd/index/job/deletion/sample.yaml /etc/server/config.yaml # skipcq: DOK-DL3002 USER nonroot:nonroot -ENTRYPOINT ["/usr/bin/index-deletion"] +ENTRYPOINT ["/usr/bin/index-deletion"] \ No newline at end of file diff --git a/dockers/index/job/readreplica/rotate/Dockerfile b/dockers/index/job/readreplica/rotate/Dockerfile index 330e68b21f..757abce176 100644 --- a/dockers/index/job/readreplica/rotate/Dockerfile +++ b/dockers/index/job/readreplica/rotate/Dockerfile @@ -86,4 +86,4 @@ COPY --from=builder /usr/bin/readreplica-rotate /usr/bin/readreplica-rotate COPY cmd/index/job/readreplica/rotate/sample.yaml /etc/server/config.yaml # skipcq: DOK-DL3002 USER nonroot:nonroot -ENTRYPOINT ["/usr/bin/readreplica-rotate"] +ENTRYPOINT ["/usr/bin/readreplica-rotate"] \ No newline at end of file diff --git a/dockers/index/job/save/Dockerfile b/dockers/index/job/save/Dockerfile index 903988ea48..88eaca0c07 100644 --- a/dockers/index/job/save/Dockerfile +++ b/dockers/index/job/save/Dockerfile @@ -86,4 +86,4 @@ COPY --from=builder /usr/bin/index-save /usr/bin/index-save COPY cmd/index/job/save/sample.yaml /etc/server/config.yaml # skipcq: DOK-DL3002 USER nonroot:nonroot -ENTRYPOINT ["/usr/bin/index-save"] +ENTRYPOINT ["/usr/bin/index-save"] \ No newline at end of file diff --git 
a/dockers/index/operator/Dockerfile b/dockers/index/operator/Dockerfile index fdfa113d5e..c31cf644bc 100644 --- a/dockers/index/operator/Dockerfile +++ b/dockers/index/operator/Dockerfile @@ -86,4 +86,4 @@ COPY --from=builder /usr/bin/index-operator /usr/bin/index-operator COPY cmd/index/operator/sample.yaml /etc/server/config.yaml # skipcq: DOK-DL3002 USER nonroot:nonroot -ENTRYPOINT ["/usr/bin/index-operator"] +ENTRYPOINT ["/usr/bin/index-operator"] \ No newline at end of file diff --git a/dockers/manager/index/Dockerfile b/dockers/manager/index/Dockerfile index 80426e7c6d..c3f00056e2 100644 --- a/dockers/manager/index/Dockerfile +++ b/dockers/manager/index/Dockerfile @@ -86,4 +86,4 @@ COPY --from=builder /usr/bin/index /usr/bin/index COPY cmd/manager/index/sample.yaml /etc/server/config.yaml # skipcq: DOK-DL3002 USER nonroot:nonroot -ENTRYPOINT ["/usr/bin/index"] +ENTRYPOINT ["/usr/bin/index"] \ No newline at end of file diff --git a/dockers/operator/helm/Dockerfile b/dockers/operator/helm/Dockerfile index b40530fc31..0bdbd4ae13 100644 --- a/dockers/operator/helm/Dockerfile +++ b/dockers/operator/helm/Dockerfile @@ -107,4 +107,4 @@ COPY --from=builder /opt/helm/charts/vald /opt/helm/charts/vald COPY --from=builder /opt/helm/charts/vald-helm-operator /opt/helm/charts/vald-helm-operator # skipcq: DOK-DL3002 USER nonroot:nonroot -ENTRYPOINT ["/usr/bin/helm-operator", "run", "--watches-file=/opt/helm/watches.yaml"] +ENTRYPOINT ["/usr/bin/helm-operator", "run", "--watches-file=/opt/helm/watches.yaml"] \ No newline at end of file diff --git a/dockers/tools/benchmark/job/Dockerfile b/dockers/tools/benchmark/job/Dockerfile index fc778ef054..69fc24185b 100644 --- a/dockers/tools/benchmark/job/Dockerfile +++ b/dockers/tools/benchmark/job/Dockerfile @@ -93,4 +93,4 @@ COPY --from=builder /usr/bin/job /usr/bin/job COPY cmd/tools/benchmark/job/sample.yaml /etc/server/config.yaml # skipcq: DOK-DL3002 USER nonroot:nonroot -ENTRYPOINT ["/usr/bin/job"] +ENTRYPOINT 
["/usr/bin/job"] \ No newline at end of file diff --git a/dockers/tools/benchmark/operator/Dockerfile b/dockers/tools/benchmark/operator/Dockerfile index c225fbaac6..b352196472 100644 --- a/dockers/tools/benchmark/operator/Dockerfile +++ b/dockers/tools/benchmark/operator/Dockerfile @@ -86,4 +86,4 @@ COPY --from=builder /usr/bin/operator /usr/bin/operator COPY cmd/tools/benchmark/operator/sample.yaml /etc/server/config.yaml # skipcq: DOK-DL3002 USER nonroot:nonroot -ENTRYPOINT ["/usr/bin/operator"] +ENTRYPOINT ["/usr/bin/operator"] \ No newline at end of file diff --git a/dockers/tools/cli/loadtest/Dockerfile b/dockers/tools/cli/loadtest/Dockerfile index 9ac5f7dd14..0b80e452bf 100644 --- a/dockers/tools/cli/loadtest/Dockerfile +++ b/dockers/tools/cli/loadtest/Dockerfile @@ -93,4 +93,4 @@ COPY --from=builder /usr/bin/loadtest /usr/bin/loadtest COPY cmd/tools/cli/loadtest/sample.yaml /etc/server/config.yaml # skipcq: DOK-DL3002 USER nonroot:nonroot -ENTRYPOINT ["/usr/bin/loadtest"] +ENTRYPOINT ["/usr/bin/loadtest"] \ No newline at end of file diff --git a/hack/docker/gen/main.go b/hack/docker/gen/main.go index 49cd257124..3caa5cbea1 100644 --- a/hack/docker/gen/main.go +++ b/hack/docker/gen/main.go @@ -1023,6 +1023,9 @@ jobs: workflow.On.PullRequest.Paths = slices.Compact(workflow.On.PullRequest.Paths) workflow.On.PullRequestTarget.Paths = workflow.On.PullRequest.Paths + if data.BuildPlatforms == "" { + data.BuildPlatforms = multiPlatforms + } workflow.Jobs.Build.With.Platforms = data.BuildPlatforms workflowYamlTmp, err := yaml.Marshal(workflow) diff --git a/pkg/agent/core/ngt/service/ngt.go b/pkg/agent/core/ngt/service/ngt.go index 66fc3fbf2f..2c60359b2a 100644 --- a/pkg/agent/core/ngt/service/ngt.go +++ b/pkg/agent/core/ngt/service/ngt.go @@ -704,7 +704,7 @@ func (n *ngt) initNGT(opts ...core.Option) (err error) { var current uint64 if err != nil { if !n.enableCopyOnWrite { - log.Debug("failed to load vald index from %s\t error: %v", n.path, err) + 
log.Debugf("failed to load vald index from %s\t error: %v", n.path, err) if n.kvs == nil { n.kvs = kvs.New(kvs.WithConcurrency(n.kvsdbConcurrency)) } else if n.kvs.Len() > 0 { diff --git a/tests/e2e/crud/crud_test.go b/tests/e2e/crud/crud_test.go index bfda87cdd9..a297e320e3 100644 --- a/tests/e2e/crud/crud_test.go +++ b/tests/e2e/crud/crud_test.go @@ -26,6 +26,7 @@ import ( "os" "os/exec" "strings" + "sync" "testing" "time" @@ -33,6 +34,7 @@ import ( "github.com/vdaas/vald/internal/file" "github.com/vdaas/vald/internal/net/grpc/codes" "github.com/vdaas/vald/internal/net/grpc/status" + "github.com/vdaas/vald/internal/sync/errgroup" "github.com/vdaas/vald/tests/e2e/hdf5" "github.com/vdaas/vald/tests/e2e/kubernetes/client" "github.com/vdaas/vald/tests/e2e/kubernetes/kubectl" @@ -54,15 +56,17 @@ var ( upsertNum int removeNum int - insertFrom int - searchFrom int - searchByIDFrom int - getObjectFrom int - updateFrom int - upsertFrom int - removeFrom int + insertFrom int + searchFrom int + searchByIDFrom int + searchConcurrency int + getObjectFrom int + updateFrom int + upsertFrom int + removeFrom int - waitAfterInsertDuration time.Duration + waitAfterInsertDuration time.Duration + waitResourceReadyDuration time.Duration kubeClient client.Client namespace string @@ -80,7 +84,8 @@ func init() { flag.IntVar(&insertNum, "insert-num", 10000, "number of id-vector pairs used for insert") flag.IntVar(&correctionInsertNum, "correction-insert-num", 10000, "number of id-vector pairs used for insert") flag.IntVar(&searchNum, "search-num", 10000, "number of id-vector pairs used for search") - flag.IntVar(&searchByIDNum, "search-by-id-num", 100, "number of id-vector pairs used for search-by-id") + flag.IntVar(&searchByIDNum, "search-by-id-num", 3, "number of id-vector pairs used for search-by-id") + flag.IntVar(&searchConcurrency, "search-conn", 100, "number of search concurrency") flag.IntVar(&getObjectNum, "get-object-num", 100, "number of id-vector pairs used for get-object") 
flag.IntVar(&updateNum, "update-num", 10000, "number of id-vector pairs used for update") flag.IntVar(&upsertNum, "upsert-num", 10000, "number of id-vector pairs used for upsert") @@ -96,6 +101,7 @@ func init() { datasetName := flag.String("dataset", "fashion-mnist-784-euclidean.hdf5", "dataset") waitAfterInsert := flag.String("wait-after-insert", "3m", "wait duration after inserting vectors") + waitResourceReady := flag.String("wait-resource-ready", "3m", "wait duration for resource ready") pf := flag.Bool("portforward", false, "enable port forwarding") pfPodName := flag.String("portforward-pod-name", "vald-gateway-0", "pod name (only for port forward)") @@ -131,6 +137,11 @@ func init() { if err != nil { panic(err) } + + waitResourceReadyDuration, err = time.ParseDuration(*waitResourceReady) + if err != nil { + panic(err) + } } func teardown() { @@ -171,7 +182,7 @@ func TestE2ESearchOnly(t *testing.T) { t.Fatalf("an error occurred: %s", err) } - err = op.Search(t, ctx, operation.Dataset{ + err = op.StreamSearch(t, ctx, operation.Dataset{ Test: ds.Test[searchFrom : searchFrom+searchNum], Neighbors: ds.Neighbors[searchFrom : searchFrom+searchNum], }) @@ -189,7 +200,7 @@ func TestE2ELinearSearchOnly(t *testing.T) { t.Fatalf("an error occurred: %s", err) } - err = op.LinearSearch(t, ctx, operation.Dataset{ + err = op.StreamLinearSearch(t, ctx, operation.Dataset{ Test: ds.Test[searchFrom : searchFrom+searchNum], Neighbors: ds.Neighbors[searchFrom : searchFrom+searchNum], }) @@ -284,7 +295,7 @@ func TestE2EInsertAndSearch(t *testing.T) { sleep(t, waitAfterInsertDuration) - err = op.Search(t, ctx, operation.Dataset{ + err = op.StreamSearch(t, ctx, operation.Dataset{ Test: ds.Test[searchFrom : searchFrom+searchNum], Neighbors: ds.Neighbors[searchFrom : searchFrom+searchNum], }) @@ -311,7 +322,7 @@ func TestE2EInsertAndLinearSearch(t *testing.T) { sleep(t, waitAfterInsertDuration) - err = op.LinearSearch(t, ctx, operation.Dataset{ + err = op.StreamLinearSearch(t, ctx, 
operation.Dataset{ Test: ds.Test[searchFrom : searchFrom+searchNum], Neighbors: ds.Neighbors[searchFrom : searchFrom+searchNum], }) @@ -338,7 +349,7 @@ func TestE2EStandardCRUD(t *testing.T) { sleep(t, waitAfterInsertDuration) - err = op.Search(t, ctx, operation.Dataset{ + err = op.StreamSearch(t, ctx, operation.Dataset{ Test: ds.Test[searchFrom : searchFrom+searchNum], Neighbors: ds.Neighbors[searchFrom : searchFrom+searchNum], }) @@ -346,14 +357,14 @@ t.Fatalf("an error occurred: %s", err) } - err = op.SearchByID(t, ctx, operation.Dataset{ + err = op.StreamSearchByID(t, ctx, operation.Dataset{ Train: ds.Train[searchByIDFrom : searchByIDFrom+searchByIDNum], }) if err != nil { t.Fatalf("an error occurred: %s", err) } - err = op.LinearSearch(t, ctx, operation.Dataset{ + err = op.StreamLinearSearch(t, ctx, operation.Dataset{ Test: ds.Test[searchFrom : searchFrom+searchNum], Neighbors: ds.Neighbors[searchFrom : searchFrom+searchNum], }) @@ -361,7 +372,7 @@ t.Fatalf("an error occurred: %s", err) } - err = op.LinearSearchByID(t, ctx, operation.Dataset{ + err = op.StreamLinearSearchByID(t, ctx, operation.Dataset{ Train: ds.Train[searchByIDFrom : searchByIDFrom+searchByIDNum], }) if err != nil { @@ -911,7 +922,7 @@ func TestE2EReadReplica(t *testing.T) { t.Fatalf("failed to wait for read replica rotator jobs to complete: %s", err) } - err = op.Search(t, ctx, operation.Dataset{ + err = op.StreamSearch(t, ctx, operation.Dataset{ Test: ds.Test[searchFrom : searchFrom+searchNum], Neighbors: ds.Neighbors[searchFrom : searchFrom+searchNum], }) @@ -919,14 +930,14 @@ t.Fatalf("an error occurred: %s", err) } - err = op.SearchByID(t, ctx, operation.Dataset{ + err = op.StreamSearchByID(t, ctx, operation.Dataset{ Train: ds.Train[searchByIDFrom : searchByIDFrom+searchByIDNum], }) if err != nil { t.Fatalf("an error occurred: %s", err) } - err = op.LinearSearch(t, ctx, 
operation.Dataset{ + err = op.StreamLinearSearch(t, ctx, operation.Dataset{ Test: ds.Test[searchFrom : searchFrom+searchNum], Neighbors: ds.Neighbors[searchFrom : searchFrom+searchNum], }) @@ -934,7 +945,7 @@ func TestE2EReadReplica(t *testing.T) { t.Fatalf("an error occurred: %s", err) } - err = op.LinearSearchByID(t, ctx, operation.Dataset{ + err = op.StreamLinearSearchByID(t, ctx, operation.Dataset{ Train: ds.Train[searchByIDFrom : searchByIDFrom+searchByIDNum], }) if err != nil { @@ -987,3 +998,371 @@ func TestE2EReadReplica(t *testing.T) { t.Fatalf("an error occurred: %s", err) } } + +// TestE2EAgentRolloutRestart tests that search requests succeed with rollout restart vald-agent. +func TestE2EAgentRolloutRestart(t *testing.T) { + t.Cleanup(teardown) + + if kubeClient == nil { + var err error + kubeClient, err = client.New(kubeConfig) + if err != nil { + t.Skipf("TestE2EReadReplica needs kubernetes client but failed to create one: %s", err) + } + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + op, err := operation.New(host, port) + if err != nil { + t.Fatalf("an error occurred: %s", err) + } + + err = op.Upsert(t, ctx, operation.Dataset{ + Train: ds.Train[insertFrom : insertFrom+insertNum], + }) + if err != nil { + t.Fatalf("an error occurred: %s", err) + } + + sleep(t, waitAfterInsertDuration) + + searchFunc := func(ctx context.Context) error { + return op.StreamSearch(t, ctx, operation.Dataset{ + Test: ds.Test[searchFrom : searchFrom+searchNum], + Neighbors: ds.Neighbors[searchFrom : searchFrom+searchNum], + }) + } + + wg := sync.WaitGroup{} + mu := sync.Mutex{} + done := make(chan struct{}) + var serr error + + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-done: + return + default: + eg, egctx := errgroup.New(ctx) + for i := 0; i < searchConcurrency; i++ { + eg.Go(func() (e error) { + ierr := searchFunc(egctx) + if ierr != nil { + st, ok := status.FromError(ierr) + if ok && st.Code() == 
codes.DeadlineExceeded { + _, _, rerr := status.ParseError(ierr, codes.DeadlineExceeded, "an error occurred") + mu.Lock() + e = errors.Join(e, rerr) + mu.Unlock() + } + } + return + }) + } + egerr := eg.Wait() + mu.Lock() + serr = errors.Join(serr, egerr) + mu.Unlock() + time.Sleep(10 * time.Second) + } + } + }() + + // Wait for StatefulSet to be ready + t.Log("rollout restart agent and waiting for agent pods ready...") + err = kubectl.RolloutResourceName(ctx, t, "statefulset", "vald-agent", waitResourceReadyDuration.String()) + if err != nil { + t.Fatalf("an error occurred: %s", err) + } + + cnt, err := op.IndexInfo(t, ctx) + if err != nil { + if cnt == nil { + t.Fatalf("an error occurred: err = %s", err) + } + t.Fatalf("an error occurred: count = %d, err = %s", cnt.Stored, err) + } + + err = op.Exists(t, ctx, "0") + if err != nil { + t.Fatalf("an error occurred: %s", err) + } + + err = op.GetObject(t, ctx, operation.Dataset{ + Train: ds.Train[getObjectFrom : getObjectFrom+getObjectNum], + }) + if err != nil { + t.Fatalf("an error occurred: %s", err) + } + + err = op.Remove(t, ctx, operation.Dataset{ + Train: ds.Train[removeFrom : removeFrom+removeNum], + }) + if err != nil { + t.Fatalf("an error occurred: %s", err) + } + + // Remove all vector data after the current - 1 hour. + err = op.RemoveByTimestamp(t, ctx, time.Now().Add(-time.Hour).UnixNano()) + if err != nil { + t.Fatalf("an error occurred: %s", err) + } + + close(done) + wg.Wait() + if serr != nil { + t.Fatalf("an error occurred: %s", serr) + } +} + +// TestE2EHighConcurrencyMultiSearch tests that high concurrency search requests succeed. 
+func TestE2EHighConcurrencyMultiSearch(t *testing.T) { + t.Cleanup(teardown) + + if kubeClient == nil { + var err error + kubeClient, err = client.New(kubeConfig) + if err != nil { + t.Skipf("TestE2EReadReplica needs kubernetes client but failed to create one: %s", err) + } + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + op, err := operation.New(host, port) + if err != nil { + t.Fatalf("an error occurred: %s", err) + } + + err = op.Upsert(t, ctx, operation.Dataset{ + Train: ds.Train[insertFrom : insertFrom+insertNum], + }) + if err != nil { + t.Fatalf("an error occurred: %s", err) + } + + sleep(t, waitAfterInsertDuration) + + searchFunc := func(ctx context.Context) error { + return op.MultiSearch(t, ctx, operation.Dataset{ + Test: ds.Test[searchFrom : searchFrom+searchNum], + Neighbors: ds.Neighbors[searchFrom : searchFrom+searchNum], + }) + } + + wg := sync.WaitGroup{} + mu := sync.Mutex{} + var serr error + wg.Add(1) + done := make(chan struct{}) + go func() { + defer wg.Done() + for { + select { + case <-done: + return + default: + eg, egctx := errgroup.New(ctx) + for i := 0; i < searchConcurrency; i++ { + eg.Go(func() (e error) { + ierr := searchFunc(egctx) + if ierr != nil { + st, ok := status.FromError(ierr) + if ok && st.Code() == codes.DeadlineExceeded { + _, _, rerr := status.ParseError(ierr, codes.DeadlineExceeded, "an error occurred") + mu.Lock() + e = errors.Join(e, rerr) + mu.Unlock() + } + } + return + }) + } + egerr := eg.Wait() + mu.Lock() + serr = errors.Join(serr, egerr) + mu.Unlock() + time.Sleep(5 * time.Second) + } + } + }() + + // Wait for StatefulSet to be ready + t.Log("rollout restart agent and waiting for agent pods ready...") + err = kubectl.RolloutResourceName(ctx, t, "statefulset", "vald-agent", waitResourceReadyDuration.String()) + if err != nil { + t.Fatalf("an error occurred: %s", err) + } + + cnt, err := op.IndexInfo(t, ctx) + if err != nil { + if cnt == nil { + t.Fatalf("an error occurred: err 
= %s", err) + } + t.Fatalf("an error occurred: count = %d, err = %s", cnt.Stored, err) + } + + err = op.Exists(t, ctx, "0") + if err != nil { + t.Fatalf("an error occurred: %s", err) + } + + err = op.GetObject(t, ctx, operation.Dataset{ + Train: ds.Train[getObjectFrom : getObjectFrom+getObjectNum], + }) + if err != nil { + t.Fatalf("an error occurred: %s", err) + } + + err = op.Remove(t, ctx, operation.Dataset{ + Train: ds.Train[removeFrom : removeFrom+removeNum], + }) + if err != nil { + t.Fatalf("an error occurred: %s", err) + } + + // Remove all vector data after the current - 1 hour. + err = op.RemoveByTimestamp(t, ctx, time.Now().Add(-time.Hour).UnixNano()) + if err != nil { + t.Fatalf("an error occurred: %s", err) + } + + close(done) + wg.Wait() + if serr != nil { + t.Fatalf("an error occurred: %s", serr.Error()) + } +} + +// TestE2EStandardCRUDWithRolloutRestart tests that standard CRUD test with rollout restart agent +func TestE2EStandardCRUDWithRolloutRestart(t *testing.T) { + t.Cleanup(teardown) + ctx := context.Background() + + op, err := operation.New(host, port) + if err != nil { + t.Fatalf("an error occurred: %s", err) + } + + err = op.Insert(t, ctx, operation.Dataset{ + Train: ds.Train[insertFrom : insertFrom+insertNum], + }) + if err != nil { + t.Fatalf("an error occurred: %s", err) + } + + sleep(t, waitAfterInsertDuration) + + searchFunc := func(ctx context.Context) error { + return op.Search(t, ctx, operation.Dataset{ + Test: ds.Test[searchFrom : searchFrom+searchNum], + Neighbors: ds.Neighbors[searchFrom : searchFrom+searchNum], + }) + } + + wg := sync.WaitGroup{} + wg.Add(1) + mu := sync.Mutex{} + var serr error + wg.Add(1) + done := make(chan struct{}) + go func() { + defer wg.Done() + for { + select { + case <-done: + return + default: + eg, egctx := errgroup.New(ctx) + for i := 0; i < searchConcurrency; i++ { + eg.Go(func() (e error) { + ierr := searchFunc(egctx) + if ierr != nil { + t.Log(ierr) + st, ok := status.FromError(ierr) + if ok && 
st.Code() == codes.DeadlineExceeded { + _, _, rerr := status.ParseError(ierr, codes.DeadlineExceeded, "an error occurred") + mu.Lock() + e = errors.Join(e, rerr) + mu.Unlock() + } + } + return + }) + } + egerr := eg.Wait() + mu.Lock() + serr = errors.Join(serr, egerr) + mu.Unlock() + time.Sleep(5 * time.Second) + } + } + }() + + // Wait for StatefulSet to be ready + t.Log("rollout restart agent and waiting for agent pods ready...") + err = kubectl.RolloutResourceName(ctx, t, "statefulset", "vald-agent", waitResourceReadyDuration.String()) + if err != nil { + t.Fatalf("an error occurred: %s", err) + } + + err = op.Exists(t, ctx, "0") + if err != nil { + t.Fatalf("an error occurred: %s", err) + } + + err = op.GetObject(t, ctx, operation.Dataset{ + Train: ds.Train[getObjectFrom : getObjectFrom+getObjectNum], + }) + if err != nil { + t.Fatalf("an error occurred: %s", err) + } + + err = op.StreamListObject(t, ctx, operation.Dataset{ + Train: ds.Train[insertFrom : insertFrom+insertNum], + }) + if err != nil { + t.Fatalf("an error occurred: %s", err) + } + + err = op.Update(t, ctx, operation.Dataset{ + Train: ds.Train[updateFrom : updateFrom+updateNum], + }) + if err != nil { + t.Fatalf("an error occurred: %s", err) + } + + err = op.Upsert(t, ctx, operation.Dataset{ + Train: ds.Train[upsertFrom : upsertFrom+upsertNum], + }) + if err != nil { + t.Fatalf("an error occurred: %s", err) + } + + err = op.Remove(t, ctx, operation.Dataset{ + Train: ds.Train[removeFrom : removeFrom+removeNum], + }) + if err != nil { + t.Fatalf("an error occurred: %s", err) + } + + // Remove all vector data after the current - 1 hour. 
+ err = op.RemoveByTimestamp(t, ctx, time.Now().Add(-time.Hour).UnixNano()) + if err != nil { + t.Fatalf("an error occurred: %s", err) + } + + close(done) + wg.Done(); wg.Wait() // balance the extra wg.Add(1) above, then wait for the search goroutine to finish writing serr + mu.Lock() + if serr != nil { + t.Fatalf("an error occurred: %s", serr.Error()) + } + mu.Unlock() +} diff --git a/tests/e2e/kubernetes/client/client.go b/tests/e2e/kubernetes/client/client.go index cf41f5f515..71809efa16 100644 --- a/tests/e2e/kubernetes/client/client.go +++ b/tests/e2e/kubernetes/client/client.go @@ -68,6 +68,11 @@ type Client interface { name, namespace string, cronJob *v1.CronJob, ) error + WaitForStatefulSetReady( + ctx context.Context, + namespace, name string, + timeout time.Duration, + ) (ok bool, err error) } type client struct { @@ -201,3 +206,28 @@ func (cli *client) CreateJobFromCronJob( _, err := cli.clientset.BatchV1().Jobs(namespace).Create(ctx, job, metav1.CreateOptions{}) return err } + +func (cli *client) WaitForStatefulSetReady( + ctx context.Context, namespace, name string, timeout time.Duration, +) (ok bool, err error) { + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + tick := time.NewTicker(time.Second) + defer tick.Stop() + + for { + ss, err := cli.clientset.AppsV1().StatefulSets(namespace).Get(ctx, name, metav1.GetOptions{}) + if err != nil { + return false, err + } + if ss.Status.UpdatedReplicas == ss.Status.Replicas && ss.Status.ReadyReplicas == ss.Status.Replicas { + return true, nil + } + select { + case <-ctx.Done(): + return false, ctx.Err() + case <-tick.C: + } + } +} diff --git a/tests/e2e/kubernetes/kubectl/kubectl.go b/tests/e2e/kubernetes/kubectl/kubectl.go index 9e13d37482..92f917737a 100644 --- a/tests/e2e/kubernetes/kubectl/kubectl.go +++ b/tests/e2e/kubernetes/kubectl/kubectl.go @@ -19,9 +19,11 @@ package kubectl import ( + "bufio" "context" "fmt" "os/exec" + "strings" "testing" "github.com/vdaas/vald/internal/errors" @@ -40,6 +42,49 @@ func RolloutResource(ctx context.Context, t *testing.T, resource string) error { return 
runCmd(t, cmd) } +func RolloutResourceName( + ctx context.Context, t *testing.T, resource string, name string, timeout string, +) error { + t.Helper() + cmd := exec.CommandContext(ctx, "kubectl", "rollout", "restart", resource, name) + if err := runCmd(t, cmd); err != nil { + return err + } + + r := strings.Join([]string{resource, name}, "/") + to := strings.Join([]string{"--timeout", timeout}, "=") + cmd = exec.CommandContext(ctx, "kubectl", "rollout", "status", r, "--watch", to) + stdout, err := cmd.StdoutPipe() + if err != nil { + return err + } + defer stdout.Close() + + stderr, err := cmd.StderrPipe() + if err != nil { + return err + } + defer stderr.Close() + + if err := cmd.Start(); err != nil { + return err + } + go func() { + scanner := bufio.NewScanner(stdout) + for scanner.Scan() { + fmt.Println(scanner.Text()) + } + }() + go func() { + scanner := bufio.NewScanner(stderr) + for scanner.Scan() { + fmt.Println("Error:", scanner.Text()) + } + }() + + return cmd.Wait() +} + // WaitResources waits for multiple resources to be ready. 
func WaitResources( ctx context.Context, t *testing.T, resource, labelSelector, condition, timeout string, diff --git a/tests/e2e/operation/multi.go b/tests/e2e/operation/multi.go index 5ca5784921..7b655eef64 100644 --- a/tests/e2e/operation/multi.go +++ b/tests/e2e/operation/multi.go @@ -19,6 +19,7 @@ import ( "context" "strconv" "testing" + "time" "github.com/vdaas/vald/apis/grpc/v1/payload" ) @@ -29,10 +30,12 @@ func (c *client) MultiSearch(t *testing.T, ctx context.Context, ds Dataset) erro return err } + to := time.Second * 3 cfg := &payload.Search_Config{ - Num: 3, + Num: 10, Radius: -1.0, Epsilon: 0.1, + Timeout: to.Nanoseconds(), } reqs := make([]*payload.Search_Request, 0, len(ds.Test)) @@ -55,6 +58,7 @@ func (c *client) MultiSearch(t *testing.T, ctx context.Context, ds Dataset) erro if len(res.GetResponses()) != len(ds.Test) { t.Error("number of responses does not match with sent requests") } + t.Logf("res: %d", len(res.Responses)) return nil } diff --git a/tests/e2e/operation/operation.go b/tests/e2e/operation/operation.go index f8d8ae2027..6b7992d070 100644 --- a/tests/e2e/operation/operation.go +++ b/tests/e2e/operation/operation.go @@ -36,7 +36,9 @@ type Dataset struct { type Client interface { Search(t *testing.T, ctx context.Context, ds Dataset) error + StreamSearch(t *testing.T, ctx context.Context, ds Dataset) error SearchByID(t *testing.T, ctx context.Context, ds Dataset) error + StreamSearchByID(t *testing.T, ctx context.Context, ds Dataset) error SearchWithParameters( t *testing.T, ctx context.Context, @@ -60,7 +62,9 @@ type Client interface { errorValidator ErrorValidator, ) error LinearSearch(t *testing.T, ctx context.Context, ds Dataset) error + StreamLinearSearch(t *testing.T, ctx context.Context, ds Dataset) error LinearSearchByID(t *testing.T, ctx context.Context, ds Dataset) error + StreamLinearSearchByID(t *testing.T, ctx context.Context, ds Dataset) error LinearSearchWithParameters( t *testing.T, ctx context.Context, @@ -200,8 +204,8 
@@ func (c *client) getGRPCConn() (*grpc.ClientConn, error) { grpc.WithInsecure(), grpc.WithKeepaliveParams( keepalive.ClientParameters{ - Time: time.Second, - Timeout: 5 * time.Second, + Time: 10 * time.Minute, + Timeout: 60 * time.Second, PermitWithoutStream: true, }, ), diff --git a/tests/e2e/operation/stream.go b/tests/e2e/operation/stream.go index 757f8e8e61..fa6b2a4937 100644 --- a/tests/e2e/operation/stream.go +++ b/tests/e2e/operation/stream.go @@ -21,6 +21,7 @@ import ( "reflect" "strconv" "testing" + "time" "github.com/vdaas/vald/apis/grpc/v1/payload" "github.com/vdaas/vald/internal/errors" @@ -66,7 +67,8 @@ func ParseAndLogError(t *testing.T, err error) error { return parsed } -func (c *client) Search(t *testing.T, ctx context.Context, ds Dataset) error { +func (c *client) StreamSearch(t *testing.T, ctx context.Context, ds Dataset) error { + to := time.Second * 1 return c.SearchWithParameters( t, ctx, @@ -74,7 +76,7 @@ func (c *client) Search(t *testing.T, ctx context.Context, ds Dataset) error { 100, -1.0, 0.1, - 3000000000, + to.Nanoseconds(), DefaultStatusValidator, ParseAndLogError, ) @@ -162,7 +164,7 @@ func (c *client) SearchWithParameters( left, right, ok := strings.Cut(resp.GetRequestId(), "-") if !ok { sid := strings.SplitN(resp.GetRequestId(), "-", 2) - left, right = sid[0], sid[1] + left = sid[0] } idx, err := strconv.Atoi(left) @@ -267,14 +269,15 @@ func (c *client) SearchWithParameters( return rerr } -func (c *client) SearchByID(t *testing.T, ctx context.Context, ds Dataset) error { +func (c *client) StreamSearchByID(t *testing.T, ctx context.Context, ds Dataset) error { + to := time.Second * 3 return c.SearchByIDWithParameters(t, ctx, ds, 100, -1.0, 0.1, - 3000000000, + to.Nanoseconds(), DefaultStatusValidator, ParseAndLogError, ) @@ -390,7 +393,7 @@ func (c *client) SearchByIDWithParameters( return rerr } -func (c *client) LinearSearch(t *testing.T, ctx context.Context, ds Dataset) error { +func (c *client) StreamLinearSearch(t *testing.T, 
ctx context.Context, ds Dataset) error { return c.LinearSearchWithParameters( t, ctx, @@ -509,7 +512,7 @@ func (c *client) LinearSearchWithParameters( return rerr } -func (c *client) LinearSearchByID(t *testing.T, ctx context.Context, ds Dataset) error { +func (c *client) StreamLinearSearchByID(t *testing.T, ctx context.Context, ds Dataset) error { return c.LinearSearchByIDWithParameters(t, ctx, ds, @@ -647,27 +650,35 @@ func (c *client) InsertWithParameters( return err } + mu := sync.Mutex{} wg := sync.WaitGroup{} wg.Add(1) go func() { defer wg.Done() + var ierr error for { res, err := sc.Recv() if err == io.EOF { + mu.Lock() + rerr = ierr + mu.Unlock() return } if err != nil { if err := evalidator(t, err); err != nil { - rerr = errors.Join( - rerr, + ierr = errors.Join( + ierr, errors.Errorf( "stream finished by an error: %s", err.Error(), ), ) } + mu.Lock() + rerr = ierr + mu.Unlock() return } @@ -680,7 +691,7 @@ func (c *client) InsertWithParameters( status.GetCode(), status.GetMessage(), errdetails.Serialize(status.GetDetails())) - rerr = errors.Join(rerr, e) + ierr = errors.Join(ierr, e) } continue } @@ -705,7 +716,10 @@ func (c *client) InsertWithParameters( }, }) if err != nil { - return err + mu.Lock() + rerr = errors.Join(rerr, err) + mu.Unlock() + return } } @@ -750,27 +764,35 @@ func (c *client) UpdateWithParameters( return err } + mu := sync.Mutex{} wg := sync.WaitGroup{} wg.Add(1) go func() { defer wg.Done() + var ierr error for { res, err := sc.Recv() if err == io.EOF { + mu.Lock() + rerr = ierr + mu.Unlock() return } if err != nil { if err := evalidator(t, err); err != nil { - rerr = errors.Join( - rerr, + ierr = errors.Join( + ierr, errors.Errorf( "stream finished by an error: %s", err.Error(), ), ) } + mu.Lock() + rerr = ierr + mu.Unlock() return } @@ -783,7 +805,7 @@ func (c *client) UpdateWithParameters( st.GetCode(), st.GetMessage(), errdetails.Serialize(st.GetDetails())) - rerr = errors.Join(rerr, e) + ierr = errors.Join(ierr, e) } continue } 
@@ -809,7 +831,10 @@ func (c *client) UpdateWithParameters( }, }) if err != nil { - return err + mu.Lock() + rerr = errors.Join(rerr, err) + mu.Unlock() + return } } @@ -855,26 +880,34 @@ func (c *client) UpsertWithParameters( } wg := sync.WaitGroup{} + mu := sync.Mutex{} wg.Add(1) go func() { defer wg.Done() + var ierr error for { res, err := sc.Recv() if err == io.EOF { + mu.Lock() + rerr = ierr + mu.Unlock() return } if err != nil { if err := evalidator(t, err); err != nil { - rerr = errors.Join( - rerr, + ierr = errors.Join( + ierr, errors.Errorf( "stream finished by an error: %s", err.Error(), ), ) } + mu.Lock() + rerr = ierr + mu.Unlock() return } @@ -887,7 +920,7 @@ func (c *client) UpsertWithParameters( status.GetCode(), status.GetMessage(), errdetails.Serialize(status.GetDetails())) - rerr = errors.Join(rerr, e) + ierr = errors.Join(ierr, e) } continue } @@ -913,7 +946,10 @@ func (c *client) UpsertWithParameters( }, }) if err != nil { - return err + mu.Lock() + rerr = errors.Join(rerr, err) + mu.Unlock() + return } } @@ -956,27 +992,35 @@ func (c *client) RemoveWithParameters( return err } + mu := sync.Mutex{} wg := sync.WaitGroup{} wg.Add(1) go func() { defer wg.Done() + var ierr error for { res, err := sc.Recv() if err == io.EOF { + mu.Lock() + rerr = ierr + mu.Unlock() return } if err != nil { if err := evalidator(t, err); err != nil { - rerr = errors.Join( - rerr, + ierr = errors.Join( + ierr, errors.Errorf( "stream finished by an error: %s", err.Error(), ), ) } + mu.Lock() + rerr = ierr + mu.Unlock() return } @@ -989,7 +1033,7 @@ func (c *client) RemoveWithParameters( status.GetCode(), status.GetMessage(), errdetails.Serialize(status.GetDetails())) - rerr = errors.Join(rerr, e) + ierr = errors.Join(ierr, e) } continue } @@ -1013,7 +1057,10 @@ func (c *client) RemoveWithParameters( }, }) if err != nil { - return err + mu.Lock() + rerr = errors.Join(rerr, err) + mu.Unlock() + return } } @@ -1106,26 +1153,34 @@ func (c *client) GetObject(t *testing.T, 
ctx context.Context, ds Dataset) (rerr return err } + mu := sync.Mutex{} wg := sync.WaitGroup{} wg.Add(1) go func() { defer wg.Done() + var ierr error for { res, err := sc.Recv() if err == io.EOF { + mu.Lock() + rerr = ierr + mu.Unlock() return } if err != nil { err = ParseAndLogError(t, err) - rerr = errors.Join( - rerr, + ierr = errors.Join( + ierr, errors.Errorf( "stream finished by an error: %s", err.Error(), ), ) + mu.Lock() + rerr = ierr + mu.Unlock() return } @@ -1134,7 +1189,7 @@ func (c *client) GetObject(t *testing.T, ctx context.Context, ds Dataset) (rerr err := res.GetStatus() if err != nil { t.Errorf("an error returned:\tcode: %d\tmessage: %s\tdetails: %s", err.GetCode(), err.GetMessage(), errdetails.Serialize(err.GetDetails())) - rerr = errors.Wrap(rerr, err.String()) + ierr = errors.Wrap(ierr, err.String()) continue } @@ -1170,7 +1225,10 @@ func (c *client) GetObject(t *testing.T, ctx context.Context, ds Dataset) (rerr }, }) if err != nil { - return err + mu.Lock() + rerr = errors.Join(rerr, err) + mu.Unlock() + return } } diff --git a/tests/e2e/operation/unaly.go b/tests/e2e/operation/unaly.go new file mode 100644 index 0000000000..ecc1cce96c --- /dev/null +++ b/tests/e2e/operation/unaly.go @@ -0,0 +1,113 @@ +//go:build e2e + +// Copyright (C) 2019-2025 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// You may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+package operation + +import ( + "context" + "strconv" + "sync" + "testing" + "time" + + "github.com/vdaas/vald/apis/grpc/v1/payload" + "github.com/vdaas/vald/internal/errors" + "github.com/vdaas/vald/internal/sync/errgroup" +) + +func (c *client) Search(t *testing.T, ctx context.Context, ds Dataset) (rerr error) { + t.Log("Search operation started") + + client, err := c.getClient() + if err != nil { + return err + } + + var ( + num = uint32(100) + radius = -1 + epsilon = 0.01 + timeout = time.Second * 3 + ) + + eg, egctx := errgroup.New(ctx) + eg.SetLimit(100) + mu := sync.Mutex{} + for i := 0; i < len(ds.Test); i++ { + eg.Go(func() error { + id := strconv.Itoa(i) + res, err := client.Search(egctx, &payload.Search_Request{ + Vector: ds.Test[i], + Config: &payload.Search_Config{ + RequestId: id, + Num: num, + Radius: float32(radius), + Epsilon: float32(epsilon), + Timeout: timeout.Nanoseconds(), + }, + }) + if err != nil { + select { + case <-egctx.Done(): + return nil + default: + return err + } + } + resp := res.GetResults() + topKIDs := make([]string, 0, len(resp)) + for _, d := range resp { + topKIDs = append(topKIDs, d.Id) + } + if len(topKIDs) == 0 { + t.Errorf("empty result is returned for test ID %s: %#v", res.GetRequestId(), topKIDs) + } + t.Logf("id: %s, results: %d, recall: %f", id, len(topKIDs), c.recall(topKIDs, ds.Neighbors[i][:len(topKIDs)])) + // t.Logf("algo: %s, id: %d, results: %d, recall: %f", right, idx, len(topKIDs), c.recall(topKIDs, ds.Neighbors[idx][:len(topKIDs)])) + return nil + }) + } + if err := eg.Wait(); err != nil { + if err = ParseAndLogError(t, err); err != nil { + mu.Lock() + rerr = errors.Join( + rerr, + errors.Errorf( + "stream finished by an error: %s", + err.Error(), + ), + ) + mu.Unlock() + } + } + if rerr != nil { + t.Fatalf("an error occured: %s", rerr.Error()) + } + + t.Log("search operation finished") + return +} + +func (c *client) SearchByID(t *testing.T, ctx context.Context, ds Dataset) error { + return nil +} + +func 
(c *client) LinearSearch(t *testing.T, ctx context.Context, ds Dataset) error { + return nil +} + +func (c *client) LinearSearchByID(t *testing.T, ctx context.Context, ds Dataset) error { + return nil +}