Skip to content

Commit

Permalink
♻️ fix
Browse files Browse the repository at this point in the history
Signed-off-by: vankichi <[email protected]>
  • Loading branch information
vankichi committed Jan 22, 2025
1 parent 0289138 commit 625c035
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 25 deletions.
57 changes: 55 additions & 2 deletions .github/helm/values/values-rollout-agent.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,75 @@
#

defaults:
image:
tag: pr-2798
logging:
level: debug
server_config:
metrics:
pprof:
enabled: true # enable Pyroscope
healths:
liveness:
livenessProbe:
initialDelaySeconds: 60
readiness:
readinessProbe:
initialDelaySeconds: 60
servers:
grpc:
server:
grpc:
interceptors:
- RecoverInterceptor
- TraceInterceptor
- MetricInterceptor
grpc:
client:
dial_option:
interceptors:
- TraceInterceptor
observability:
enabled: true
otlp:
collector_endpoint: "opentelemetry-collector-collector.default.svc.cluster.local:4317"
trace:
enabled: true
networkPolicy:
enabled: true
custom:
ingress:
- from:
- podSelector:
matchLabels:
app.kubernetes.io/name: pyroscope
egress:
- to:
- podSelector:
matchLabels:
app.kubernetes.io/name: opentelemetry-collector-collector
gateway:
lb:
enabled: true
maxReplicas: 1
minReplicas: 1
hpa:
enabled: false
resources:
requests:
cpu: 100m
memory: 50Mi
cpu: 200m
memory: 150Mi
gateway_config:
index_replica: 2
# ingress:
# # if enabled is true, vald-lb-gateway can be connected through Kubernetes ingress from the external network.
# enabled: true
# # TODO: Set your ingress host.
# host: localhost
# service:
# # NOTE: https://doc.traefik.io/traefik/routing/providers/kubernetes-ingress/#on-service
# annotations:
# traefik.ingress.kubernetes.io/service.serversscheme: h2c
agent:
minReplicas: 3
maxReplicas: 3
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/core/ngt/service/ngt.go
Original file line number Diff line number Diff line change
Expand Up @@ -704,7 +704,7 @@ func (n *ngt) initNGT(opts ...core.Option) (err error) {
var current uint64
if err != nil {
if !n.enableCopyOnWrite {
log.Debug("failed to load vald index from %s\t error: %v", n.path, err)
log.Debugf("failed to load vald index from %s\t error: %v", n.path, err)
if n.kvs == nil {
n.kvs = kvs.New(kvs.WithConcurrency(n.kvsdbConcurrency))
} else if n.kvs.Len() > 0 {
Expand Down
58 changes: 37 additions & 21 deletions tests/e2e/crud/crud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/vdaas/vald/internal/file"
"github.com/vdaas/vald/internal/net/grpc/codes"
"github.com/vdaas/vald/internal/net/grpc/status"
"github.com/vdaas/vald/internal/sync/errgroup"
"github.com/vdaas/vald/tests/e2e/hdf5"
"github.com/vdaas/vald/tests/e2e/kubernetes/client"
"github.com/vdaas/vald/tests/e2e/kubernetes/kubectl"
Expand All @@ -55,13 +56,14 @@ var (
upsertNum int
removeNum int

insertFrom int
searchFrom int
searchByIDFrom int
getObjectFrom int
updateFrom int
upsertFrom int
removeFrom int
insertFrom int
searchFrom int
searchByIDFrom int
searchConcurrency int
getObjectFrom int
updateFrom int
upsertFrom int
removeFrom int

waitAfterInsertDuration time.Duration
waitResourceReadyDuration time.Duration
Expand All @@ -83,6 +85,7 @@ func init() {
flag.IntVar(&correctionInsertNum, "correction-insert-num", 10000, "number of id-vector pairs used for insert")
flag.IntVar(&searchNum, "search-num", 10000, "number of id-vector pairs used for search")
flag.IntVar(&searchByIDNum, "search-by-id-num", 100, "number of id-vector pairs used for search-by-id")
flag.IntVar(&searchConcurrency, "search-conn", 100, "number of search concurrency")
flag.IntVar(&getObjectNum, "get-object-num", 100, "number of id-vector pairs used for get-object")
flag.IntVar(&updateNum, "update-num", 10000, "number of id-vector pairs used for update")
flag.IntVar(&upsertNum, "upsert-num", 10000, "number of id-vector pairs used for upsert")
Expand Down Expand Up @@ -1025,7 +1028,7 @@ func TestE2EAgentRolloutRestart(t *testing.T) {

sleep(t, waitAfterInsertDuration)

searchFunc := func() error {
searchFunc := func(ctx context.Context) error {
return op.Search(t, ctx, operation.Dataset{
Test: ds.Test[searchFrom : searchFrom+searchNum],
Neighbors: ds.Neighbors[searchFrom : searchFrom+searchNum],
Expand All @@ -1039,23 +1042,32 @@ func TestE2EAgentRolloutRestart(t *testing.T) {
done := make(chan struct{})
go func() {
defer wg.Done()
var ierr error

for {
select {
case <-done:
return
default:
ierr = searchFunc()
if ierr != nil {
st, ok := status.FromError(ierr)
if ok && st.Code() == codes.DeadlineExceeded {
_, _, rerr := status.ParseError(ierr, codes.DeadlineExceeded, "an error occurred")
mu.Lock()
serr = errors.Join(serr, rerr)
mu.Unlock()
}
eg, egctx := errgroup.New(ctx)
for i := 0; i < searchConcurrency; i++ {
eg.Go(func() (e error) {
ierr := searchFunc(egctx)
if ierr != nil {
st, ok := status.FromError(ierr)
if ok && st.Code() == codes.DeadlineExceeded {
_, _, rerr := status.ParseError(ierr, codes.DeadlineExceeded, "an error occurred")
mu.Lock()
e = errors.Join(e, rerr)
mu.Unlock()
}
}
return
})
}
egerr := eg.Wait()
mu.Lock()
serr = errors.Join(serr, egerr)
mu.Unlock()
time.Sleep(10 * time.Second)
}
}
Expand All @@ -1070,6 +1082,9 @@ func TestE2EAgentRolloutRestart(t *testing.T) {

cnt, err := op.IndexInfo(t, ctx)
if err != nil {
if cnt == nil {
t.Fatalf("an error occurred: err = %s", err)
}
t.Fatalf("an error occurred: count = %d, err = %s", cnt.Stored, err)
}

Expand All @@ -1093,13 +1108,14 @@ func TestE2EAgentRolloutRestart(t *testing.T) {
}

// Remove all vector data after the current - 1 hour.
// err = op.Flush(t, ctx)
err = op.RemoveByTimestamp(t, ctx, time.Now().Add(-time.Hour).UnixNano())
if err != nil {
t.Fatalf("an error occurred: %s", err)
}
close(done)
wg.Wait()
if serr != nil {
t.Fatalf("an error occurred: %s", serr)
}
if err != nil {
t.Fatalf("an error occurred: %s", err)
}
}
4 changes: 3 additions & 1 deletion tests/e2e/operation/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"reflect"
"strconv"
"testing"
"time"

"github.com/vdaas/vald/apis/grpc/v1/payload"
"github.com/vdaas/vald/internal/errors"
Expand Down Expand Up @@ -268,13 +269,14 @@ func (c *client) SearchWithParameters(
}

func (c *client) SearchByID(t *testing.T, ctx context.Context, ds Dataset) error {
to := time.Second * 3
return c.SearchByIDWithParameters(t,
ctx,
ds,
100,
-1.0,
0.1,
3000000000,
to.Nanoseconds(),
DefaultStatusValidator,
ParseAndLogError,
)
Expand Down

0 comments on commit 625c035

Please sign in to comment.