Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add rollout restart agent e2e test #2799

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
5 changes: 5 additions & 0 deletions .github/actions/setup-e2e/action.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ inputs:
description: "If k3d is not required, set this to false"
required: false
default: "true"
require_k3d_storage:
description: "If local-path-storage is not required, set this to false"
required: false
default: "false"
require_minikube:
description: "If minikube is not required, set this to true and set require_k3d to false"
required: false
Expand Down Expand Up @@ -85,6 +89,7 @@ runs:
with:
agents: 3
ingress_port: ${{ inputs.ingress_port }}
storage: ${{ inputs.require_k3d_storage }}
- name: Setup Minikube environment
if: ${{ inputs.require_minikube == 'true' }}
shell: bash
Expand Down
9 changes: 9 additions & 0 deletions .github/actions/setup-k3d/action.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ inputs:
description: "Number of agents"
required: false
default: "3"
storage:
description: "If true, the local-path-storage will be deployed"
required: false
default: "false"
options:
description: "Options for k3d cluster create command"
required: false
Expand Down Expand Up @@ -124,6 +128,11 @@ runs:
echo $KUBECONFIG
cat $KUBECONFIG
cat /etc/hosts
- name: Set local path storage
if: ${{ inputs.storage == 'true' }}
shell: bash
run: |
make k3d/storage
- name: Show Kubernetes Cluster Info
shell: bash
run: |
Expand Down
84 changes: 84 additions & 0 deletions .github/helm/values/values-rollout-agent.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
#
# Copyright (C) 2019-2025 vdaas.org vald team <[email protected]>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# You may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

defaults:
logging:
level: debug
networkPolicy:
enabled: true
gateway:
lb:
enabled: true
minReplicas: 1
hpa:
enabled: false
resources:
requests:
cpu: 100m
memory: 50Mi
gateway_config:
index_replica: 2
agent:
minReplicas: 3
maxReplicas: 3
podManagementPolicy: Parallel
hpa:
enabled: false
resources:
requests:
cpu: 100m
memory: 50Mi
# We recommend to set this value long enough to ensure the backup speed of PV, since the Index is backed up at the end of the pod.
terminationGracePeriodSeconds: 600
# This is the persistent volume settings.
# Please change it according to your environment.
persistentVolume:
enabled: true
accessMode: ReadWriteOncePod
storageClass: local-path
size: 500Mi
ngt:
auto_index_duration_limit: 2m
auto_index_check_duration: 30s
auto_index_length: 500
dimension: 784
enable_in_memory_mode: false
index_path: "/var/ngt/index"
discoverer:
minReplicas: 1
hpa:
enabled: false
resources:
requests:
cpu: 100m
memory: 50Mi
manager:
index:
replicas: 1
resources:
requests:
cpu: 100m
memory: 30Mi
indexer:
auto_index_duration_limit: 2m
auto_index_check_duration: 30s
auto_index_length: 1000
corrector:
enabled: true
# suspend because you do not want corrector to start automatically in CI
# instead run it manually
suspend: true
schedule: "1 2 3 4 5"
44 changes: 44 additions & 0 deletions .github/workflows/e2e.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,49 @@ jobs:
e2e/readreplica
env:
POD_NAME: ${{ steps.deploy_vald_readreplica.outputs.POD_NAME }}
e2e-stream-crud-with-rollout-restart-agent:
name: "E2E test (Stream CRUD) with rollout restart agent"
needs: [detect-ci-container]
runs-on: ubuntu-latest
timeout-minutes: 60
container:
image: ghcr.io/vdaas/vald/vald-ci-container:${{ needs.detect-ci-container.outputs.TAG }}
options: "--add-host host.docker.internal:host-gateway"
steps:
- uses: actions/checkout@v4
- name: Set Git config
run: |
git config --global --add safe.directory ${GITHUB_WORKSPACE}
- name: Setup E2E environment
id: setup_e2e
uses: ./.github/actions/setup-e2e
with:
require_k3d_storage: true
- name: Deploy Vald
id: deploy_vald
uses: ./.github/actions/e2e-deploy-vald
with:
helm_extra_options: ${{ steps.setup_e2e.outputs.HELM_EXTRA_OPTIONS }}
values: .github/helm/values/values-rollout-agent.yaml
wait_for_selector: app=vald-lb-gateway
- name: Run E2E CRUD with rollout restart agent
run: |
make hack/benchmark/assets/dataset/${{ env.DATASET }}
make E2E_BIND_PORT=8081 \
E2E_DATASET_NAME=${{ env.DATASET }} \
E2E_INSERT_COUNT=1000 \
E2E_SEARCH_COUNT=1000 \
E2E_SEARCH_BY_ID_COUNT=1000 \
E2E_GET_OBJECT_COUNT=100 \
E2E_UPDATE_COUNT=100 \
E2E_UPSERT_COUNT=100 \
E2E_REMOVE_COUNT=100 \
E2E_WAIT_FOR_CREATE_INDEX_DURATION=3m \
E2E_TARGET_POD_NAME=${POD_NAME} \
E2E_TARGET_NAMESPACE=default \
e2e/rollout/restart/agent
env:
POD_NAME: ${{ steps.deploy_vald.outputs.POD_NAME }}
e2e-stream-crud-with-mirror:
name: "E2E test (Stream CRUD) with mirror"
needs: [detect-ci-container]
Expand Down Expand Up @@ -417,6 +460,7 @@ jobs:
- e2e-stream-crud-under-index-management-jobs
- e2e-stream-crud-with-mirror
- e2e-stream-crud-with-readreplica
- e2e-stream-crud-with-rollout-restart-agent
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
Expand Down
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,7 @@ E2E_UPDATE_COUNT ?= 10
E2E_UPSERT_COUNT ?= 10
E2E_WAIT_FOR_CREATE_INDEX_DURATION ?= 8m
E2E_WAIT_FOR_START_TIMEOUT ?= 10m
E2E_WAIT_FOR_RESOURCE_READY ?= 3m
E2E_SEARCH_FROM ?= 0
E2E_SEARCH_BY_ID_FROM ?= 0
E2E_INSERT_FROM ?= 0
Expand Down
5 changes: 5 additions & 0 deletions Makefile.d/e2e.mk
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ e2e/index/job/correction:
e2e/readreplica:
$(call run-e2e-crud-test,-run TestE2EReadReplica)

.PHONY: e2e/rollaout/restart/agent
## run rollout-restart agent e2e
e2e/rollout/restart/agent:
$(call run-e2e-crud-test,-run TestE2EAgentRolloutRestart)

.PHONY: e2e/maxdim
## run e2e/maxdim
e2e/maxdim:
Expand Down
1 change: 1 addition & 0 deletions Makefile.d/functions.mk
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ define run-e2e-crud-test
-search-by-id-from=$(E2E_SEARCH_BY_ID_FROM) \
-get-object-from=$(E2E_GET_OBJECT_FROM) \
-wait-after-insert=$(E2E_WAIT_FOR_CREATE_INDEX_DURATION) \
-wait-resource-ready=$(E2E_WAIT_FOR_RESOURCE_READY) \
-portforward=$(E2E_PORTFORWARD_ENABLED) \
-portforward-pod-name=$(E2E_TARGET_POD_NAME) \
-portforward-pod-port=$(E2E_TARGET_PORT) \
Expand Down
116 changes: 115 additions & 1 deletion tests/e2e/crud/crud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"os"
"os/exec"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -62,7 +63,8 @@ var (
upsertFrom int
removeFrom int

waitAfterInsertDuration time.Duration
waitAfterInsertDuration time.Duration
waitResourceReadyDuration time.Duration

kubeClient client.Client
namespace string
Expand Down Expand Up @@ -96,6 +98,7 @@ func init() {

datasetName := flag.String("dataset", "fashion-mnist-784-euclidean.hdf5", "dataset")
waitAfterInsert := flag.String("wait-after-insert", "3m", "wait duration after inserting vectors")
waitResourceReady := flag.String("wait-resource-ready", "3m", "wait duration for resource ready")

pf := flag.Bool("portforward", false, "enable port forwarding")
pfPodName := flag.String("portforward-pod-name", "vald-gateway-0", "pod name (only for port forward)")
Expand Down Expand Up @@ -131,6 +134,11 @@ func init() {
if err != nil {
panic(err)
}

waitResourceReadyDuration, err = time.ParseDuration(*waitResourceReady)
if err != nil {
panic(err)
}
}

func teardown() {
Expand Down Expand Up @@ -987,3 +995,109 @@ func TestE2EReadReplica(t *testing.T) {
t.Fatalf("an error occurred: %s", err)
}
}

// TestE2EReadReplica tests that search requests succeed with read replica resources.
func TestE2EAgentRolloutRestart(t *testing.T) {
t.Cleanup(teardown)

if kubeClient == nil {
var err error
kubeClient, err = client.New(kubeConfig)
if err != nil {
t.Skipf("TestE2EReadReplica needs kubernetes client but failed to create one: %s", err)
}
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

op, err := operation.New(host, port)
if err != nil {
t.Fatalf("an error occurred: %s", err)
}

err = op.Upsert(t, ctx, operation.Dataset{
Train: ds.Train[insertFrom : insertFrom+insertNum],
})
if err != nil {
t.Fatalf("an error occurred: %s", err)
}

sleep(t, waitAfterInsertDuration)

searchFunc := func() error {
return op.Search(t, ctx, operation.Dataset{
Test: ds.Test[searchFrom : searchFrom+searchNum],
Neighbors: ds.Neighbors[searchFrom : searchFrom+searchNum],
})
}

wg := sync.WaitGroup{}
mu := sync.Mutex{}
var serr error
wg.Add(1)
done := make(chan struct{})
go func() {
defer wg.Done()
for {
select {
case <-done:
return
default:
err = searchFunc()
if err != nil {
st, ok := status.FromError(err)
if ok && st.Code() == codes.DeadlineExceeded {
_, _, rerr := status.ParseError(err, codes.DeadlineExceeded, "an error occurred")
mu.Lock()
serr = errors.Join(serr, rerr)
mu.Unlock()
}
}
time.Sleep(10 * time.Second)
}
}
}()

// Wait for StatefulSet to be ready
t.Log("rollout restart agent and waiting for agent pods ready...")
err = kubectl.RolloutResourceName(ctx, t, "statefulset", "vald-agent", waitResourceReadyDuration.String())
if err != nil {
t.Fatalf("an error occurred: %s", err)
}

cnt, err := op.IndexInfo(t, ctx)
if err != nil {
t.Fatalf("an error occurred: count = %d, err = %s", cnt.Stored, err)
}

err = op.Exists(t, ctx, "0")
if err != nil {
t.Fatalf("an error occurred: %s", err)
}

err = op.GetObject(t, ctx, operation.Dataset{
Train: ds.Train[getObjectFrom : getObjectFrom+getObjectNum],
})
if err != nil {
t.Fatalf("an error occurred: %s", err)
}

err = op.Remove(t, ctx, operation.Dataset{
Train: ds.Train[removeFrom : removeFrom+removeNum],
})
if err != nil {
t.Fatalf("an error occurred: %s", err)
}

// Remove all vector data after the current - 1 hour.
err = op.RemoveByTimestamp(t, ctx, time.Now().Add(-time.Hour).UnixNano())
if err != nil {
t.Fatalf("an error occurred: %s", err)
}
close(done)
wg.Wait()
if serr != nil {
t.Fatalf("an error occurred: %s", serr)
}
}
Loading
Loading