From 4c3baeb5710e7e04f761c5e841642b1de15ff837 Mon Sep 17 00:00:00 2001 From: kostas Date: Wed, 16 Oct 2024 14:58:57 +0300 Subject: [PATCH 1/4] fix: separate Heartbeat and ShardHandler to fibers Signed-off-by: kostas --- .github/workflows/ci.yml | 7 +- .github/workflows/test-fakeredis.yml | 11 +- contrib/charts/dragonfly/Chart.yaml | 4 +- .../dragonfly/ci/affinity-values.golden.yaml | 8 +- .../ci/command_extraargs-values.golden.yaml | 8 +- .../extracontainer-string-values.golden.yaml | 8 +- .../ci/extracontainer-tpl-values.golden.yaml | 8 +- .../ci/extravolumes-values.golden.yaml | 8 +- .../initcontainer-string-values.golden.yaml | 8 +- .../ci/initcontainer-tpl-values.golden.yaml | 8 +- .../ci/password-old-env-values.golden.yaml | 6 +- .../ci/passwordsecret-values.golden.yaml | 8 +- ...ersistence-and-existing-secret.golden.yaml | 8 +- .../ci/persistent-values.golden.yaml | 8 +- .../ci/priorityclassname-values.golden.yaml | 8 +- .../ci/prometheusrules-values.golden.yaml | 12 +- .../dragonfly/ci/resources-values.golden.yaml | 8 +- .../ci/securitycontext-values.golden.yaml | 8 +- .../ci/service-loadbalancer-ip.golden.yaml | 8 +- .../ci/service-monitor-values.golden.yaml | 12 +- .../ci/taints-tolerations-values.golden.yaml | 8 +- .../dragonfly/ci/tls-values.golden.yaml | 12 +- .../ci/tolerations-values.golden.yaml | 8 +- helio | 2 +- src/core/allocation_tracker.cc | 2 +- src/core/search/CMakeLists.txt | 5 +- src/core/search/ast_expr.cc | 6 +- src/core/search/ast_expr.h | 12 +- src/core/search/indices.cc | 9 + src/core/search/indices.h | 30 +--- src/core/search/lexer.lex | 1 + src/core/search/parser.y | 3 +- src/core/search/rax_tree.h | 158 ++++++++++++++++++ src/core/search/rax_tree_test.cc | 107 ++++++++++++ src/core/search/search.cc | 23 +++ src/core/search/search_parser_test.cc | 40 ++++- src/core/search/search_test.cc | 29 ++++ src/facade/dragonfly_connection.cc | 93 ++++++----- src/facade/dragonfly_connection.h | 17 +- src/server/bitops_family.cc | 26 +-- src/server/bitops_family_test.cc | 11 +- src/server/cluster/cluster_config.cc | 4 +- src/server/cluster/cluster_config_test.cc | 136 ++++++++++++--- src/server/cluster/cluster_defs.cc | 20 +++ src/server/cluster/cluster_defs.h | 49 +++++- src/server/cluster/cluster_family.cc | 2 + src/server/detail/save_stages_controller.cc | 2 + src/server/detail/snapshot_storage.cc | 7 +- src/server/dfly_main.cc | 41 +++++ src/server/engine_shard.cc | 95 ++++++++--- src/server/engine_shard.h | 22 ++- src/server/engine_shard_set.cc | 3 +- src/server/engine_shard_set.h | 1 - src/server/generic_family_test.cc | 3 + src/server/list_family_test.cc | 9 + src/server/main_service.cc | 3 + src/server/rdb_load.cc | 7 +- src/server/rdb_save.cc | 69 +++++--- src/server/rdb_save.h | 3 + src/server/replica.cc | 26 +-- src/server/server_family.cc | 6 +- src/server/transaction.cc | 119 +++++++++++-- src/server/transaction.h | 6 + tests/dragonfly/instance.py | 4 +- tests/dragonfly/pymemcached_test.py | 3 +- tests/dragonfly/replication_test.py | 54 +++++- tests/dragonfly/snapshot_test.py | 11 +- tools/requirements.txt | 2 +- 68 files changed, 1163 insertions(+), 310 deletions(-) create mode 100644 src/core/search/rax_tree.h create mode 100644 src/core/search/rax_tree_test.cc diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 06f537456c20..74d2ac8b81cb 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -21,8 +21,9 @@ jobs: - uses: actions/setup-python@v5 - name: Install dependencies run: | + python -m venv venv + source 
venv/bin/activate python -m pip install pre-commit - python -m pip freeze --local lsblk -l echo "sda rotational = $(cat /sys/block/sda/queue/rotational)" echo "sdb rotational = $(cat /sys/block/sdb/queue/rotational)" @@ -31,7 +32,9 @@ jobs: path: ~/.cache/pre-commit key: pre-commit|${{ env.pythonLocation }}|${{ hashFiles('.pre-commit-config.yaml') }} - name: Run pre-commit checks - run: pre-commit run --show-diff-on-failure --color=always --from-ref HEAD^ --to-ref HEAD + run: | + source venv/bin/activate + pre-commit run --show-diff-on-failure --color=always --from-ref HEAD^ --to-ref HEAD shell: bash build: strategy: diff --git a/.github/workflows/test-fakeredis.yml b/.github/workflows/test-fakeredis.yml index 29f87ece47ca..261def70b397 100644 --- a/.github/workflows/test-fakeredis.yml +++ b/.github/workflows/test-fakeredis.yml @@ -17,9 +17,6 @@ jobs: options: --security-opt seccomp=unconfined --sysctl "net.ipv6.conf.all.disable_ipv6=0" strategy: fail-fast: false - matrix: - PYTHON_VERSION: - - "3.10" name: "Run tests: " permissions: pull-requests: write @@ -39,10 +36,6 @@ jobs: - uses: actions/checkout@v4 with: submodules: true - - uses: actions/setup-python@v5 - with: - cache-dependency-path: tests/fakeredis/poetry.lock - python-version: ${{ matrix.PYTHON_VERSION }} - name: Install dependencies env: @@ -50,7 +43,7 @@ jobs: shell: bash working-directory: tests/fakeredis run: | - python -m pip --quiet install poetry + pip install poetry echo "$HOME/.poetry/bin" >> $GITHUB_PATH poetry install - name: Configure CMake @@ -135,7 +128,7 @@ jobs: - uses: actions/setup-python@v5 with: cache-dependency-path: tests/fakeredis/poetry.lock - python-version: "3.12" + python-version: "3.10" - name: Merge html results run: | diff --git a/contrib/charts/dragonfly/Chart.yaml b/contrib/charts/dragonfly/Chart.yaml index 21e1ae5030fa..ea54c60f9959 100644 --- a/contrib/charts/dragonfly/Chart.yaml +++ b/contrib/charts/dragonfly/Chart.yaml @@ -15,13 +15,13 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. # Versions are expected to follow Semantic Versioning (https://semver.org/) -version: v1.23.2 +version: v1.24.0 # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. Versions are not expected to # follow Semantic Versioning. They should reflect the version the application is using. # It is recommended to use it with quotes.
-appVersion: "v1.23.2" +appVersion: "v1.24.0" home: https://dragonflydb.io/ diff --git a/contrib/charts/dragonfly/ci/affinity-values.golden.yaml b/contrib/charts/dragonfly/ci/affinity-values.golden.yaml index e709d9f2e0b5..3917fa2d3aed 100644 --- a/contrib/charts/dragonfly/ci/affinity-values.golden.yaml +++ b/contrib/charts/dragonfly/ci/affinity-values.golden.yaml @@ -8,7 +8,7 @@ metadata: labels: app.kubernetes.io/name: dragonfly app.kubernetes.io/instance: test - app.kubernetes.io/version: "v1.23.2" + app.kubernetes.io/version: "v1.24.0" app.kubernetes.io/managed-by: Helm --- # Source: dragonfly/templates/service.yaml @@ -20,7 +20,7 @@ metadata: labels: app.kubernetes.io/name: dragonfly app.kubernetes.io/instance: test - app.kubernetes.io/version: "v1.23.2" + app.kubernetes.io/version: "v1.24.0" app.kubernetes.io/managed-by: Helm spec: type: ClusterIP @@ -42,7 +42,7 @@ metadata: labels: app.kubernetes.io/name: dragonfly app.kubernetes.io/instance: test - app.kubernetes.io/version: "v1.23.2" + app.kubernetes.io/version: "v1.24.0" app.kubernetes.io/managed-by: Helm spec: replicas: 1 @@ -72,7 +72,7 @@ spec: serviceAccountName: test-dragonfly containers: - name: dragonfly - image: "docker.dragonflydb.io/dragonflydb/dragonfly:v1.23.2" + image: "docker.dragonflydb.io/dragonflydb/dragonfly:v1.24.0" imagePullPolicy: IfNotPresent ports: - name: dragonfly diff --git a/contrib/charts/dragonfly/ci/command_extraargs-values.golden.yaml b/contrib/charts/dragonfly/ci/command_extraargs-values.golden.yaml index 9779e32bdd10..3fd1b7701f44 100644 --- a/contrib/charts/dragonfly/ci/command_extraargs-values.golden.yaml +++ b/contrib/charts/dragonfly/ci/command_extraargs-values.golden.yaml @@ -8,7 +8,7 @@ metadata: labels: app.kubernetes.io/name: dragonfly app.kubernetes.io/instance: test - app.kubernetes.io/version: "v1.23.2" + app.kubernetes.io/version: "v1.24.0" app.kubernetes.io/managed-by: Helm --- # Source: dragonfly/templates/service.yaml @@ -20,7 +20,7 @@ metadata: labels: app.kubernetes.io/name: dragonfly app.kubernetes.io/instance: test - app.kubernetes.io/version: "v1.23.2" + app.kubernetes.io/version: "v1.24.0" app.kubernetes.io/managed-by: Helm spec: type: ClusterIP @@ -42,7 +42,7 @@ metadata: labels: app.kubernetes.io/name: dragonfly app.kubernetes.io/instance: test - app.kubernetes.io/version: "v1.23.2" + app.kubernetes.io/version: "v1.24.0" app.kubernetes.io/managed-by: Helm spec: replicas: 1 @@ -60,7 +60,7 @@ spec: serviceAccountName: test-dragonfly containers: - name: dragonfly - image: "docker.dragonflydb.io/dragonflydb/dragonfly:v1.23.2" + image: "docker.dragonflydb.io/dragonflydb/dragonfly:v1.24.0" imagePullPolicy: IfNotPresent ports: - name: dragonfly diff --git a/contrib/charts/dragonfly/ci/extracontainer-string-values.golden.yaml b/contrib/charts/dragonfly/ci/extracontainer-string-values.golden.yaml index 52c8cfa484cd..bea7d6304c11 100644 --- a/contrib/charts/dragonfly/ci/extracontainer-string-values.golden.yaml +++ b/contrib/charts/dragonfly/ci/extracontainer-string-values.golden.yaml @@ -8,7 +8,7 @@ metadata: labels: app.kubernetes.io/name: dragonfly app.kubernetes.io/instance: test - app.kubernetes.io/version: "v1.23.2" + app.kubernetes.io/version: "v1.24.0" app.kubernetes.io/managed-by: Helm --- # Source: dragonfly/templates/service.yaml @@ -20,7 +20,7 @@ metadata: labels: app.kubernetes.io/name: dragonfly app.kubernetes.io/instance: test - app.kubernetes.io/version: "v1.23.2" + app.kubernetes.io/version: "v1.24.0" app.kubernetes.io/managed-by: Helm spec: type: ClusterIP @@ -42,7 
+42,7 @@ metadata: labels: app.kubernetes.io/name: dragonfly app.kubernetes.io/instance: test - app.kubernetes.io/version: "v1.23.2" + app.kubernetes.io/version: "v1.24.0" app.kubernetes.io/managed-by: Helm spec: replicas: 1 @@ -67,7 +67,7 @@ spec: image: busybox:latest name: sidecar-string - name: dragonfly - image: "docker.dragonflydb.io/dragonflydb/dragonfly:v1.23.2" + image: "docker.dragonflydb.io/dragonflydb/dragonfly:v1.24.0" imagePullPolicy: IfNotPresent ports: - name: dragonfly diff --git a/contrib/charts/dragonfly/ci/extracontainer-tpl-values.golden.yaml b/contrib/charts/dragonfly/ci/extracontainer-tpl-values.golden.yaml index 7e0841e5d1bd..6fef9c9bacd5 100644 --- a/contrib/charts/dragonfly/ci/extracontainer-tpl-values.golden.yaml +++ b/contrib/charts/dragonfly/ci/extracontainer-tpl-values.golden.yaml @@ -8,7 +8,7 @@ metadata: labels: app.kubernetes.io/name: dragonfly app.kubernetes.io/instance: test - app.kubernetes.io/version: "v1.23.2" + app.kubernetes.io/version: "v1.24.0" app.kubernetes.io/managed-by: Helm --- # Source: dragonfly/templates/service.yaml @@ -20,7 +20,7 @@ metadata: labels: app.kubernetes.io/name: dragonfly app.kubernetes.io/instance: test - app.kubernetes.io/version: "v1.23.2" + app.kubernetes.io/version: "v1.24.0" app.kubernetes.io/managed-by: Helm spec: type: ClusterIP @@ -42,7 +42,7 @@ metadata: labels: app.kubernetes.io/name: dragonfly app.kubernetes.io/instance: test - app.kubernetes.io/version: "v1.23.2" + app.kubernetes.io/version: "v1.24.0" app.kubernetes.io/managed-by: Helm spec: replicas: 1 @@ -64,7 +64,7 @@ spec: command: ["/bin/sh"] args: ["-c", "date; sleep 3600;"] - name: dragonfly - image: "docker.dragonflydb.io/dragonflydb/dragonfly:v1.23.2" + image: "docker.dragonflydb.io/dragonflydb/dragonfly:v1.24.0" imagePullPolicy: IfNotPresent ports: - name: dragonfly diff --git a/contrib/charts/dragonfly/ci/extravolumes-values.golden.yaml b/contrib/charts/dragonfly/ci/extravolumes-values.golden.yaml index 8c8ab7b00f23..cd300b2a9315 100644 --- a/contrib/charts/dragonfly/ci/extravolumes-values.golden.yaml +++ b/contrib/charts/dragonfly/ci/extravolumes-values.golden.yaml @@ -8,7 +8,7 @@ metadata: labels: app.kubernetes.io/name: dragonfly app.kubernetes.io/instance: test - app.kubernetes.io/version: "v1.23.2" + app.kubernetes.io/version: "v1.24.0" app.kubernetes.io/managed-by: Helm --- # Source: dragonfly/templates/service.yaml @@ -20,7 +20,7 @@ metadata: labels: app.kubernetes.io/name: dragonfly app.kubernetes.io/instance: test - app.kubernetes.io/version: "v1.23.2" + app.kubernetes.io/version: "v1.24.0" app.kubernetes.io/managed-by: Helm spec: type: ClusterIP @@ -42,7 +42,7 @@ metadata: labels: app.kubernetes.io/name: dragonfly app.kubernetes.io/instance: test - app.kubernetes.io/version: "v1.23.2" + app.kubernetes.io/version: "v1.24.0" app.kubernetes.io/managed-by: Helm spec: replicas: 1 @@ -60,7 +60,7 @@ spec: serviceAccountName: test-dragonfly containers: - name: dragonfly - image: "docker.dragonflydb.io/dragonflydb/dragonfly:v1.23.2" + image: "docker.dragonflydb.io/dragonflydb/dragonfly:v1.24.0" imagePullPolicy: IfNotPresent ports: - name: dragonfly diff --git a/contrib/charts/dragonfly/ci/initcontainer-string-values.golden.yaml b/contrib/charts/dragonfly/ci/initcontainer-string-values.golden.yaml index 299208cc0ffb..c74cd79cf2dd 100644 --- a/contrib/charts/dragonfly/ci/initcontainer-string-values.golden.yaml +++ b/contrib/charts/dragonfly/ci/initcontainer-string-values.golden.yaml @@ -8,7 +8,7 @@ metadata: labels: app.kubernetes.io/name: dragonfly 
app.kubernetes.io/instance: test - app.kubernetes.io/version: "v1.23.2" + app.kubernetes.io/version: "v1.24.0" app.kubernetes.io/managed-by: Helm --- # Source: dragonfly/templates/service.yaml @@ -20,7 +20,7 @@ metadata: labels: app.kubernetes.io/name: dragonfly app.kubernetes.io/instance: test - app.kubernetes.io/version: "v1.23.2" + app.kubernetes.io/version: "v1.24.0" app.kubernetes.io/managed-by: Helm spec: type: ClusterIP @@ -42,7 +42,7 @@ metadata: labels: app.kubernetes.io/name: dragonfly app.kubernetes.io/instance: test - app.kubernetes.io/version: "v1.23.2" + app.kubernetes.io/version: "v1.24.0" app.kubernetes.io/managed-by: Helm spec: replicas: 1 @@ -68,7 +68,7 @@ spec: name: initcontainer-string containers: - name: dragonfly - image: "docker.dragonflydb.io/dragonflydb/dragonfly:v1.23.2" + image: "docker.dragonflydb.io/dragonflydb/dragonfly:v1.24.0" imagePullPolicy: IfNotPresent ports: - name: dragonfly diff --git a/contrib/charts/dragonfly/ci/initcontainer-tpl-values.golden.yaml b/contrib/charts/dragonfly/ci/initcontainer-tpl-values.golden.yaml index fd9e9a6125d9..c7e12edc92e1 100644 --- a/contrib/charts/dragonfly/ci/initcontainer-tpl-values.golden.yaml +++ b/contrib/charts/dragonfly/ci/initcontainer-tpl-values.golden.yaml @@ -8,7 +8,7 @@ metadata: labels: app.kubernetes.io/name: dragonfly app.kubernetes.io/instance: test - app.kubernetes.io/version: "v1.23.2" + app.kubernetes.io/version: "v1.24.0" app.kubernetes.io/managed-by: Helm --- # Source: dragonfly/templates/service.yaml @@ -20,7 +20,7 @@ metadata: labels: app.kubernetes.io/name: dragonfly app.kubernetes.io/instance: test - app.kubernetes.io/version: "v1.23.2" + app.kubernetes.io/version: "v1.24.0" app.kubernetes.io/managed-by: Helm spec: type: ClusterIP @@ -42,7 +42,7 @@ metadata: labels: app.kubernetes.io/name: dragonfly app.kubernetes.io/instance: test - app.kubernetes.io/version: "v1.23.2" + app.kubernetes.io/version: "v1.24.0" app.kubernetes.io/managed-by: Helm spec: replicas: 1 @@ -65,7 +65,7 @@ spec: args: ["-c", "date; sleep 1;"] containers: - name: dragonfly - image: "docker.dragonflydb.io/dragonflydb/dragonfly:v1.23.2" + image: "docker.dragonflydb.io/dragonflydb/dragonfly:v1.24.0" imagePullPolicy: IfNotPresent ports: - name: dragonfly diff --git a/contrib/charts/dragonfly/ci/password-old-env-values.golden.yaml b/contrib/charts/dragonfly/ci/password-old-env-values.golden.yaml index 1c77fa02cd30..ead648632015 100644 --- a/contrib/charts/dragonfly/ci/password-old-env-values.golden.yaml +++ b/contrib/charts/dragonfly/ci/password-old-env-values.golden.yaml @@ -8,7 +8,7 @@ metadata: labels: app.kubernetes.io/name: dragonfly app.kubernetes.io/instance: test - app.kubernetes.io/version: "v1.23.2" + app.kubernetes.io/version: "v1.24.0" app.kubernetes.io/managed-by: Helm --- # Source: dragonfly/templates/extra-manifests.yaml @@ -28,7 +28,7 @@ metadata: labels: app.kubernetes.io/name: dragonfly app.kubernetes.io/instance: test - app.kubernetes.io/version: "v1.23.2" + app.kubernetes.io/version: "v1.24.0" app.kubernetes.io/managed-by: Helm spec: type: ClusterIP @@ -50,7 +50,7 @@ metadata: labels: app.kubernetes.io/name: dragonfly app.kubernetes.io/instance: test - app.kubernetes.io/version: "v1.23.2" + app.kubernetes.io/version: "v1.24.0" app.kubernetes.io/managed-by: Helm spec: replicas: 1 diff --git a/contrib/charts/dragonfly/ci/passwordsecret-values.golden.yaml b/contrib/charts/dragonfly/ci/passwordsecret-values.golden.yaml index 6f2c015e43c4..862186a1d840 100644 --- 
a/contrib/charts/dragonfly/ci/passwordsecret-values.golden.yaml +++ b/contrib/charts/dragonfly/ci/passwordsecret-values.golden.yaml @@ -8,7 +8,7 @@ metadata: labels: app.kubernetes.io/name: dragonfly app.kubernetes.io/instance: test - app.kubernetes.io/version: "v1.23.2" + app.kubernetes.io/version: "v1.24.0" app.kubernetes.io/managed-by: Helm --- # Source: dragonfly/templates/extra-manifests.yaml @@ -28,7 +28,7 @@ metadata: labels: app.kubernetes.io/name: dragonfly app.kubernetes.io/instance: test - app.kubernetes.io/version: "v1.23.2" + app.kubernetes.io/version: "v1.24.0" app.kubernetes.io/managed-by: Helm spec: type: ClusterIP @@ -50,7 +50,7 @@ metadata: labels: app.kubernetes.io/name: dragonfly app.kubernetes.io/instance: test - app.kubernetes.io/version: "v1.23.2" + app.kubernetes.io/version: "v1.24.0" app.kubernetes.io/managed-by: Helm spec: replicas: 1 @@ -68,7 +68,7 @@ spec: serviceAccountName: test-dragonfly containers: - name: dragonfly - image: "docker.dragonflydb.io/dragonflydb/dragonfly:v1.23.2" + image: "docker.dragonflydb.io/dragonflydb/dragonfly:v1.24.0" imagePullPolicy: IfNotPresent ports: - name: dragonfly diff --git a/contrib/charts/dragonfly/ci/persistence-and-existing-secret.golden.yaml b/contrib/charts/dragonfly/ci/persistence-and-existing-secret.golden.yaml index b047da727d40..879bcf24e7f8 100644 --- a/contrib/charts/dragonfly/ci/persistence-and-existing-secret.golden.yaml +++ b/contrib/charts/dragonfly/ci/persistence-and-existing-secret.golden.yaml @@ -8,7 +8,7 @@ metadata: labels: app.kubernetes.io/name: dragonfly app.kubernetes.io/instance: test - app.kubernetes.io/version: "v1.23.2" + app.kubernetes.io/version: "v1.24.0" app.kubernetes.io/managed-by: Helm --- # Source: dragonfly/templates/extra-manifests.yaml @@ -28,7 +28,7 @@ metadata: labels: app.kubernetes.io/name: dragonfly app.kubernetes.io/instance: test - app.kubernetes.io/version: "v1.23.2" + app.kubernetes.io/version: "v1.24.0" app.kubernetes.io/managed-by: Helm spec: type: ClusterIP @@ -50,7 +50,7 @@ metadata: labels: app.kubernetes.io/name: dragonfly app.kubernetes.io/instance: test - app.kubernetes.io/version: "v1.23.2" + app.kubernetes.io/version: "v1.24.0" app.kubernetes.io/managed-by: Helm spec: serviceName: test @@ -69,7 +69,7 @@ spec: serviceAccountName: test-dragonfly containers: - name: dragonfly - image: "docker.dragonflydb.io/dragonflydb/dragonfly:v1.23.2" + image: "docker.dragonflydb.io/dragonflydb/dragonfly:v1.24.0" imagePullPolicy: IfNotPresent ports: - name: dragonfly diff --git a/contrib/charts/dragonfly/ci/persistent-values.golden.yaml b/contrib/charts/dragonfly/ci/persistent-values.golden.yaml index e2a5396231e6..f2da8339656f 100644 --- a/contrib/charts/dragonfly/ci/persistent-values.golden.yaml +++ b/contrib/charts/dragonfly/ci/persistent-values.golden.yaml @@ -8,7 +8,7 @@ metadata: labels: app.kubernetes.io/name: dragonfly app.kubernetes.io/instance: test - app.kubernetes.io/version: "v1.23.2" + app.kubernetes.io/version: "v1.24.0" app.kubernetes.io/managed-by: Helm --- # Source: dragonfly/templates/service.yaml @@ -20,7 +20,7 @@ metadata: labels: app.kubernetes.io/name: dragonfly app.kubernetes.io/instance: test - app.kubernetes.io/version: "v1.23.2" + app.kubernetes.io/version: "v1.24.0" app.kubernetes.io/managed-by: Helm spec: type: ClusterIP @@ -42,7 +42,7 @@ metadata: labels: app.kubernetes.io/name: dragonfly app.kubernetes.io/instance: test - app.kubernetes.io/version: "v1.23.2" + app.kubernetes.io/version: "v1.24.0" app.kubernetes.io/managed-by: Helm spec: serviceName: test 
@@ -61,7 +61,7 @@ spec: serviceAccountName: test-dragonfly containers: - name: dragonfly - image: "docker.dragonflydb.io/dragonflydb/dragonfly:v1.23.2" + image: "docker.dragonflydb.io/dragonflydb/dragonfly:v1.24.0" imagePullPolicy: IfNotPresent ports: - name: dragonfly diff --git a/contrib/charts/dragonfly/ci/priorityclassname-values.golden.yaml b/contrib/charts/dragonfly/ci/priorityclassname-values.golden.yaml index 5a76d7d98c12..00e226a17ab2 100644 --- a/contrib/charts/dragonfly/ci/priorityclassname-values.golden.yaml +++ b/contrib/charts/dragonfly/ci/priorityclassname-values.golden.yaml @@ -17,7 +17,7 @@ metadata: labels: app.kubernetes.io/name: dragonfly app.kubernetes.io/instance: test - app.kubernetes.io/version: "v1.23.2" + app.kubernetes.io/version: "v1.24.0" app.kubernetes.io/managed-by: Helm --- # Source: dragonfly/templates/service.yaml @@ -29,7 +29,7 @@ metadata: labels: app.kubernetes.io/name: dragonfly app.kubernetes.io/instance: test - app.kubernetes.io/version: "v1.23.2" + app.kubernetes.io/version: "v1.24.0" app.kubernetes.io/managed-by: Helm spec: type: ClusterIP @@ -51,7 +51,7 @@ metadata: labels: app.kubernetes.io/name: dragonfly app.kubernetes.io/instance: test - app.kubernetes.io/version: "v1.23.2" + app.kubernetes.io/version: "v1.24.0" app.kubernetes.io/managed-by: Helm spec: replicas: 1 @@ -70,7 +70,7 @@ spec: serviceAccountName: test-dragonfly containers: - name: dragonfly - image: "docker.dragonflydb.io/dragonflydb/dragonfly:v1.23.2" + image: "docker.dragonflydb.io/dragonflydb/dragonfly:v1.24.0" imagePullPolicy: IfNotPresent ports: - name: dragonfly diff --git a/contrib/charts/dragonfly/ci/prometheusrules-values.golden.yaml b/contrib/charts/dragonfly/ci/prometheusrules-values.golden.yaml index 7d6cd0598b23..4f5a26a7ee96 100644 --- a/contrib/charts/dragonfly/ci/prometheusrules-values.golden.yaml +++ b/contrib/charts/dragonfly/ci/prometheusrules-values.golden.yaml @@ -8,7 +8,7 @@ metadata: labels: app.kubernetes.io/name: dragonfly app.kubernetes.io/instance: test - app.kubernetes.io/version: "v1.23.2" + app.kubernetes.io/version: "v1.24.0" app.kubernetes.io/managed-by: Helm --- # Source: dragonfly/templates/metrics-service.yaml @@ -20,7 +20,7 @@ metadata: labels: app.kubernetes.io/name: dragonfly app.kubernetes.io/instance: test - app.kubernetes.io/version: "v1.23.2" + app.kubernetes.io/version: "v1.24.0" app.kubernetes.io/managed-by: Helm type: metrics spec: @@ -43,7 +43,7 @@ metadata: labels: app.kubernetes.io/name: dragonfly app.kubernetes.io/instance: test - app.kubernetes.io/version: "v1.23.2" + app.kubernetes.io/version: "v1.24.0" app.kubernetes.io/managed-by: Helm spec: type: ClusterIP @@ -65,7 +65,7 @@ metadata: labels: app.kubernetes.io/name: dragonfly app.kubernetes.io/instance: test - app.kubernetes.io/version: "v1.23.2" + app.kubernetes.io/version: "v1.24.0" app.kubernetes.io/managed-by: Helm spec: replicas: 1 @@ -83,7 +83,7 @@ spec: serviceAccountName: test-dragonfly containers: - name: dragonfly - image: "docker.dragonflydb.io/dragonflydb/dragonfly:v1.23.2" + image: "docker.dragonflydb.io/dragonflydb/dragonfly:v1.24.0" imagePullPolicy: IfNotPresent ports: - name: dragonfly @@ -124,7 +124,7 @@ metadata: labels: app.kubernetes.io/name: dragonfly app.kubernetes.io/instance: test - app.kubernetes.io/version: "v1.23.2" + app.kubernetes.io/version: "v1.24.0" app.kubernetes.io/managed-by: Helm spec: endpoints: diff --git a/contrib/charts/dragonfly/ci/resources-values.golden.yaml b/contrib/charts/dragonfly/ci/resources-values.golden.yaml index 
1131eedb8d46..2367f71a6c71 100644 --- a/contrib/charts/dragonfly/ci/resources-values.golden.yaml +++ b/contrib/charts/dragonfly/ci/resources-values.golden.yaml @@ -8,7 +8,7 @@ metadata: labels: app.kubernetes.io/name: dragonfly app.kubernetes.io/instance: test - app.kubernetes.io/version: "v1.23.2" + app.kubernetes.io/version: "v1.24.0" app.kubernetes.io/managed-by: Helm --- # Source: dragonfly/templates/service.yaml @@ -20,7 +20,7 @@ metadata: labels: app.kubernetes.io/name: dragonfly app.kubernetes.io/instance: test - app.kubernetes.io/version: "v1.23.2" + app.kubernetes.io/version: "v1.24.0" app.kubernetes.io/managed-by: Helm spec: type: ClusterIP @@ -42,7 +42,7 @@ metadata: labels: app.kubernetes.io/name: dragonfly app.kubernetes.io/instance: test - app.kubernetes.io/version: "v1.23.2" + app.kubernetes.io/version: "v1.24.0" app.kubernetes.io/managed-by: Helm spec: replicas: 1 @@ -60,7 +60,7 @@ spec: serviceAccountName: test-dragonfly containers: - name: dragonfly - image: "docker.dragonflydb.io/dragonflydb/dragonfly:v1.23.2" + image: "docker.dragonflydb.io/dragonflydb/dragonfly:v1.24.0" imagePullPolicy: IfNotPresent ports: - name: dragonfly diff --git a/contrib/charts/dragonfly/ci/securitycontext-values.golden.yaml b/contrib/charts/dragonfly/ci/securitycontext-values.golden.yaml index 3ec450df42a1..5edaf55e5595 100644 --- a/contrib/charts/dragonfly/ci/securitycontext-values.golden.yaml +++ b/contrib/charts/dragonfly/ci/securitycontext-values.golden.yaml @@ -8,7 +8,7 @@ metadata: labels: app.kubernetes.io/name: dragonfly app.kubernetes.io/instance: test - app.kubernetes.io/version: "v1.23.2" + app.kubernetes.io/version: "v1.24.0" app.kubernetes.io/managed-by: Helm --- # Source: dragonfly/templates/service.yaml @@ -20,7 +20,7 @@ metadata: labels: app.kubernetes.io/name: dragonfly app.kubernetes.io/instance: test - app.kubernetes.io/version: "v1.23.2" + app.kubernetes.io/version: "v1.24.0" app.kubernetes.io/managed-by: Helm spec: type: ClusterIP @@ -42,7 +42,7 @@ metadata: labels: app.kubernetes.io/name: dragonfly app.kubernetes.io/instance: test - app.kubernetes.io/version: "v1.23.2" + app.kubernetes.io/version: "v1.24.0" app.kubernetes.io/managed-by: Helm spec: replicas: 1 @@ -64,7 +64,7 @@ spec: - name: dragonfly securityContext: readOnlyRootFilesystem: true - image: "docker.dragonflydb.io/dragonflydb/dragonfly:v1.23.2" + image: "docker.dragonflydb.io/dragonflydb/dragonfly:v1.24.0" imagePullPolicy: IfNotPresent ports: - name: dragonfly diff --git a/contrib/charts/dragonfly/ci/service-loadbalancer-ip.golden.yaml b/contrib/charts/dragonfly/ci/service-loadbalancer-ip.golden.yaml index 7a596915682a..e62e8c324014 100644 --- a/contrib/charts/dragonfly/ci/service-loadbalancer-ip.golden.yaml +++ b/contrib/charts/dragonfly/ci/service-loadbalancer-ip.golden.yaml @@ -8,7 +8,7 @@ metadata: labels: app.kubernetes.io/name: dragonfly app.kubernetes.io/instance: test - app.kubernetes.io/version: "v1.23.2" + app.kubernetes.io/version: "v1.24.0" app.kubernetes.io/managed-by: Helm --- # Source: dragonfly/templates/service.yaml @@ -20,7 +20,7 @@ metadata: labels: app.kubernetes.io/name: dragonfly app.kubernetes.io/instance: test - app.kubernetes.io/version: "v1.23.2" + app.kubernetes.io/version: "v1.24.0" app.kubernetes.io/managed-by: Helm spec: type: LoadBalancer @@ -43,7 +43,7 @@ metadata: labels: app.kubernetes.io/name: dragonfly app.kubernetes.io/instance: test - app.kubernetes.io/version: "v1.23.2" + app.kubernetes.io/version: "v1.24.0" app.kubernetes.io/managed-by: Helm spec: replicas: 1 @@ -61,7 
+61,7 @@ spec: serviceAccountName: test-dragonfly containers: - name: dragonfly - image: "docker.dragonflydb.io/dragonflydb/dragonfly:v1.23.2" + image: "docker.dragonflydb.io/dragonflydb/dragonfly:v1.24.0" imagePullPolicy: IfNotPresent ports: - name: dragonfly diff --git a/contrib/charts/dragonfly/ci/service-monitor-values.golden.yaml b/contrib/charts/dragonfly/ci/service-monitor-values.golden.yaml index 3027b413bd83..aa637f102fe7 100644 --- a/contrib/charts/dragonfly/ci/service-monitor-values.golden.yaml +++ b/contrib/charts/dragonfly/ci/service-monitor-values.golden.yaml @@ -8,7 +8,7 @@ metadata: labels: app.kubernetes.io/name: dragonfly app.kubernetes.io/instance: test - app.kubernetes.io/version: "v1.23.2" + app.kubernetes.io/version: "v1.24.0" app.kubernetes.io/managed-by: Helm --- # Source: dragonfly/templates/metrics-service.yaml @@ -20,7 +20,7 @@ metadata: labels: app.kubernetes.io/name: dragonfly app.kubernetes.io/instance: test - app.kubernetes.io/version: "v1.23.2" + app.kubernetes.io/version: "v1.24.0" app.kubernetes.io/managed-by: Helm type: metrics spec: @@ -43,7 +43,7 @@ metadata: labels: app.kubernetes.io/name: dragonfly app.kubernetes.io/instance: test - app.kubernetes.io/version: "v1.23.2" + app.kubernetes.io/version: "v1.24.0" app.kubernetes.io/managed-by: Helm spec: type: ClusterIP @@ -65,7 +65,7 @@ metadata: labels: app.kubernetes.io/name: dragonfly app.kubernetes.io/instance: test - app.kubernetes.io/version: "v1.23.2" + app.kubernetes.io/version: "v1.24.0" app.kubernetes.io/managed-by: Helm spec: replicas: 1 @@ -83,7 +83,7 @@ spec: serviceAccountName: test-dragonfly containers: - name: dragonfly - image: "docker.dragonflydb.io/dragonflydb/dragonfly:v1.23.2" + image: "docker.dragonflydb.io/dragonflydb/dragonfly:v1.24.0" imagePullPolicy: IfNotPresent ports: - name: dragonfly @@ -125,7 +125,7 @@ metadata: release: prometheus-stack app.kubernetes.io/name: dragonfly app.kubernetes.io/instance: test - app.kubernetes.io/version: "v1.23.2" + app.kubernetes.io/version: "v1.24.0" app.kubernetes.io/managed-by: Helm spec: endpoints: diff --git a/contrib/charts/dragonfly/ci/taints-tolerations-values.golden.yaml b/contrib/charts/dragonfly/ci/taints-tolerations-values.golden.yaml index 4b49a4786d84..55152b0fd75a 100644 --- a/contrib/charts/dragonfly/ci/taints-tolerations-values.golden.yaml +++ b/contrib/charts/dragonfly/ci/taints-tolerations-values.golden.yaml @@ -8,7 +8,7 @@ metadata: labels: app.kubernetes.io/name: dragonfly app.kubernetes.io/instance: test - app.kubernetes.io/version: "v1.23.2" + app.kubernetes.io/version: "v1.24.0" app.kubernetes.io/managed-by: Helm --- # Source: dragonfly/templates/service.yaml @@ -20,7 +20,7 @@ metadata: labels: app.kubernetes.io/name: dragonfly app.kubernetes.io/instance: test - app.kubernetes.io/version: "v1.23.2" + app.kubernetes.io/version: "v1.24.0" app.kubernetes.io/managed-by: Helm spec: type: ClusterIP @@ -42,7 +42,7 @@ metadata: labels: app.kubernetes.io/name: dragonfly app.kubernetes.io/instance: test - app.kubernetes.io/version: "v1.23.2" + app.kubernetes.io/version: "v1.24.0" app.kubernetes.io/managed-by: Helm spec: replicas: 1 @@ -78,7 +78,7 @@ spec: serviceAccountName: test-dragonfly containers: - name: dragonfly - image: "docker.dragonflydb.io/dragonflydb/dragonfly:v1.23.2" + image: "docker.dragonflydb.io/dragonflydb/dragonfly:v1.24.0" imagePullPolicy: IfNotPresent ports: - name: dragonfly diff --git a/contrib/charts/dragonfly/ci/tls-values.golden.yaml b/contrib/charts/dragonfly/ci/tls-values.golden.yaml index 
9dbfa4a8eb88..a43ac5dd7d3c 100644 --- a/contrib/charts/dragonfly/ci/tls-values.golden.yaml +++ b/contrib/charts/dragonfly/ci/tls-values.golden.yaml @@ -8,7 +8,7 @@ metadata: labels: app.kubernetes.io/name: dragonfly app.kubernetes.io/instance: test - app.kubernetes.io/version: "v1.23.2" + app.kubernetes.io/version: "v1.24.0" app.kubernetes.io/managed-by: Helm --- # Source: dragonfly/templates/extra-manifests.yaml @@ -28,7 +28,7 @@ metadata: labels: app.kubernetes.io/name: dragonfly app.kubernetes.io/instance: test - app.kubernetes.io/version: "v1.23.2" + app.kubernetes.io/version: "v1.24.0" app.kubernetes.io/managed-by: Helm type: kubernetes.io/tls data: @@ -44,7 +44,7 @@ metadata: labels: app.kubernetes.io/name: dragonfly app.kubernetes.io/instance: test - app.kubernetes.io/version: "v1.23.2" + app.kubernetes.io/version: "v1.24.0" app.kubernetes.io/managed-by: Helm spec: type: ClusterIP @@ -66,7 +66,7 @@ metadata: labels: app.kubernetes.io/name: dragonfly app.kubernetes.io/instance: test - app.kubernetes.io/version: "v1.23.2" + app.kubernetes.io/version: "v1.24.0" app.kubernetes.io/managed-by: Helm spec: replicas: 1 @@ -77,7 +77,7 @@ spec: template: metadata: annotations: - checksum/tls-secret: 775878f21cd952370e315f82c5dae9168b722577b41a1bd65ba866a13e480efe + checksum/tls-secret: 9640bcf6cb420c9ce8d883c49b83be32311540f5f550cb664184e5cfda641eb7 labels: app.kubernetes.io/name: dragonfly app.kubernetes.io/instance: test @@ -85,7 +85,7 @@ spec: serviceAccountName: test-dragonfly containers: - name: dragonfly - image: "docker.dragonflydb.io/dragonflydb/dragonfly:v1.23.2" + image: "docker.dragonflydb.io/dragonflydb/dragonfly:v1.24.0" imagePullPolicy: IfNotPresent ports: - name: dragonfly diff --git a/contrib/charts/dragonfly/ci/tolerations-values.golden.yaml b/contrib/charts/dragonfly/ci/tolerations-values.golden.yaml index b43d04b5a278..47f1e02efb75 100644 --- a/contrib/charts/dragonfly/ci/tolerations-values.golden.yaml +++ b/contrib/charts/dragonfly/ci/tolerations-values.golden.yaml @@ -8,7 +8,7 @@ metadata: labels: app.kubernetes.io/name: dragonfly app.kubernetes.io/instance: test - app.kubernetes.io/version: "v1.23.2" + app.kubernetes.io/version: "v1.24.0" app.kubernetes.io/managed-by: Helm --- # Source: dragonfly/templates/service.yaml @@ -20,7 +20,7 @@ metadata: labels: app.kubernetes.io/name: dragonfly app.kubernetes.io/instance: test - app.kubernetes.io/version: "v1.23.2" + app.kubernetes.io/version: "v1.24.0" app.kubernetes.io/managed-by: Helm spec: type: ClusterIP @@ -42,7 +42,7 @@ metadata: labels: app.kubernetes.io/name: dragonfly app.kubernetes.io/instance: test - app.kubernetes.io/version: "v1.23.2" + app.kubernetes.io/version: "v1.24.0" app.kubernetes.io/managed-by: Helm spec: replicas: 1 @@ -63,7 +63,7 @@ spec: serviceAccountName: test-dragonfly containers: - name: dragonfly - image: "docker.dragonflydb.io/dragonflydb/dragonfly:v1.23.2" + image: "docker.dragonflydb.io/dragonflydb/dragonfly:v1.24.0" imagePullPolicy: IfNotPresent ports: - name: dragonfly diff --git a/helio b/helio index fe723d3a8863..9dd56595b6b4 160000 --- a/helio +++ b/helio @@ -1 +1 @@ -Subproject commit fe723d3a88637ba96690f41e2160b2bb7f9a87ef +Subproject commit 9dd56595b6b4cff71338b8c728eb12a8017c6b97 diff --git a/src/core/allocation_tracker.cc b/src/core/allocation_tracker.cc index e5a7d392c04b..e00afe2907bb 100644 --- a/src/core/allocation_tracker.cc +++ b/src/core/allocation_tracker.cc @@ -19,7 +19,7 @@ AllocationTracker& AllocationTracker::Get() { } bool AllocationTracker::Add(const TrackingInfo& info) { 
- if (tracking_.size() >= tracking_.max_size()) { + if (tracking_.size() >= tracking_.capacity()) { return false; } diff --git a/src/core/search/CMakeLists.txt b/src/core/search/CMakeLists.txt index 83a9af1a3e52..7c4b03d1b406 100644 --- a/src/core/search/CMakeLists.txt +++ b/src/core/search/CMakeLists.txt @@ -7,9 +7,10 @@ add_library(query_parser base.cc ast_expr.cc query_driver.cc search.cc indices.c sort_indices.cc vector_utils.cc compressed_sorted_set.cc block_list.cc ${gen_dir}/parser.cc ${gen_dir}/lexer.cc) -target_link_libraries(query_parser base absl::strings TRDP::reflex TRDP::uni-algo TRDP::hnswlib) +target_link_libraries(query_parser base absl::strings TRDP::reflex TRDP::uni-algo TRDP::hnswlib redis_lib) cxx_test(compressed_sorted_set_test query_parser LABELS DFLY) cxx_test(block_list_test query_parser LABELS DFLY) +cxx_test(rax_tree_test redis_test_lib LABELS DFLY) cxx_test(search_parser_test query_parser LABELS DFLY) -cxx_test(search_test query_parser LABELS DFLY) +cxx_test(search_test redis_test_lib query_parser LABELS DFLY) diff --git a/src/core/search/ast_expr.cc b/src/core/search/ast_expr.cc index 767e8797877f..86c200c3e24f 100644 --- a/src/core/search/ast_expr.cc +++ b/src/core/search/ast_expr.cc @@ -16,7 +16,11 @@ using namespace std; namespace dfly::search { -AstTermNode::AstTermNode(string term) : term{term} { +AstTermNode::AstTermNode(string term) : term{std::move(term)} { +} + +AstPrefixNode::AstPrefixNode(string prefix) : prefix{std::move(prefix)} { + this->prefix.pop_back(); } AstRangeNode::AstRangeNode(double lo, bool lo_excl, double hi, bool hi_excl) diff --git a/src/core/search/ast_expr.h b/src/core/search/ast_expr.h index d302e599cc6e..ef1663375bae 100644 --- a/src/core/search/ast_expr.h +++ b/src/core/search/ast_expr.h @@ -24,11 +24,17 @@ struct AstStarNode {}; // Matches terms in text fields struct AstTermNode { - AstTermNode(std::string term); + explicit AstTermNode(std::string term); std::string term; }; +struct AstPrefixNode { + explicit AstPrefixNode(std::string prefix); + + std::string prefix; +}; + // Matches numeric range struct AstRangeNode { AstRangeNode(double lo, bool lo_excl, double hi, bool hi_excl); @@ -97,8 +103,8 @@ struct AstSortNode { }; using NodeVariants = - std::variant; + std::variant; struct AstNode : public NodeVariants { using variant::variant; diff --git a/src/core/search/indices.cc b/src/core/search/indices.cc index 01121d552eff..3f7939d8f649 100644 --- a/src/core/search/indices.cc +++ b/src/core/search/indices.cc @@ -123,6 +123,15 @@ const typename BaseStringIndex::Container* BaseStringIndex::Matching(strin return (it != entries_.end()) ? 
&it->second : nullptr; } +template +void BaseStringIndex::MatchingPrefix(std::string_view prefix, + absl::FunctionRef cb) const { + for (auto it = entries_.lower_bound(prefix); + it != entries_.end() && (*it).first.rfind(prefix, 0) == 0; ++it) { + cb(&(*it).second); + } +} + template typename BaseStringIndex::Container* BaseStringIndex::GetOrCreate(string_view word) { auto* mr = entries_.get_allocator().resource(); diff --git a/src/core/search/indices.h b/src/core/search/indices.h index 84bedd8eb3a1..61c8a6a01853 100644 --- a/src/core/search/indices.h +++ b/src/core/search/indices.h @@ -11,10 +11,12 @@ #include #include +#include "absl/functional/function_ref.h" #include "base/pmr/memory_resource.h" #include "core/search/base.h" #include "core/search/block_list.h" #include "core/search/compressed_sorted_set.h" +#include "core/search/rax_tree.h" // TODO: move core field definitions out of big header #include "core/search/search.h" @@ -51,37 +53,17 @@ template struct BaseStringIndex : public BaseIndex { // Pointer is valid as long as index is not mutated. Nullptr if not found const Container* Matching(std::string_view str) const; + // Iterate over all entries that match the given prefix. + void MatchingPrefix(std::string_view prefix, absl::FunctionRef cb) const; + // Returns all the terms that appear as keys in the reverse index. std::vector GetTerms() const; protected: Container* GetOrCreate(std::string_view word); - struct PmrEqual { - using is_transparent = void; - bool operator()(const PMR_NS::string& lhs, const PMR_NS::string& rhs) const { - return lhs == rhs; - } - bool operator()(const PMR_NS::string& lhs, const std::string_view& rhs) const { - return lhs == rhs; - } - }; - - struct PmrHash { - using is_transparent = void; - size_t operator()(const std::string_view& sv) const { - return absl::Hash()(sv); - } - size_t operator()(const PMR_NS::string& pmrs) const { - return operator()(std::string_view{pmrs.data(), pmrs.size()}); - } - }; - bool case_sensitive_ = false; - - absl::flat_hash_map>> - entries_; + search::RaxTreeMap entries_; }; // Index for text fields.
diff --git a/src/core/search/lexer.lex b/src/core/search/lexer.lex index 3fa6d1713992..4dd9936f6877 100644 --- a/src/core/search/lexer.lex +++ b/src/core/search/lexer.lex @@ -74,6 +74,7 @@ tag_val_char {term_char}|\\[,.<>{}\[\]\\\"\':;!@#$%^&*()\-+=~\/ ] "$"{term_char}+ return ParseParam(str(), loc()); "@"{term_char}+ return Parser::make_FIELD(str(), loc()); +{term_char}+"*" return Parser::make_PREFIX(str(), loc()); {term_char}+ return Parser::make_TERM(str(), loc()); {tag_val_char}+ return make_TagVal(str(), loc()); diff --git a/src/core/search/parser.y b/src/core/search/parser.y index 42e5113b74e5..b831d85e2c0e 100644 --- a/src/core/search/parser.y +++ b/src/core/search/parser.y @@ -69,7 +69,7 @@ double toDouble(string_view src); // Needed 0 at the end to satisfy bison 3.5.1 %token YYEOF 0 -%token TERM "term" TAG_VAL "tag_val" PARAM "param" FIELD "field" +%token TERM "term" TAG_VAL "tag_val" PARAM "param" FIELD "field" PREFIX "prefix" %precedence TERM TAG_VAL %left OR_OP @@ -132,6 +132,7 @@ search_unary_expr: LPAREN search_expr RPAREN { $$ = std::move($2); } | NOT_OP search_unary_expr { $$ = AstNegateNode(std::move($2)); } | TERM { $$ = AstTermNode(std::move($1)); } + | PREFIX { $$ = AstPrefixNode(std::move($1)); } | UINT32 { $$ = AstTermNode(std::move($1)); } | FIELD COLON field_cond { $$ = AstFieldNode(std::move($1), std::move($3)); } diff --git a/src/core/search/rax_tree.h b/src/core/search/rax_tree.h new file mode 100644 index 000000000000..743b910170b4 --- /dev/null +++ b/src/core/search/rax_tree.h @@ -0,0 +1,158 @@ +#pragma once + +#include +#include +#include +#include +#include + +#include "base/pmr/memory_resource.h" + +extern "C" { +#include "redis/rax.h" +} + +namespace dfly::search { + +// absl::flat_hash_map/std::unordered_map compatible tree map based on rax tree. +// Allocates all objects on heap (with custom memory resource) as rax tree operates fully on +// pointers. +// TODO: Add full support for polymorphic allocators, including rax trie node allocations +template struct RaxTreeMap { + struct FindIterator; + + // Simple seeking iterator + struct SeekIterator { + friend struct FindIterator; + + SeekIterator() { + raxStart(&it_, nullptr); + it_.node = nullptr; + } + + ~SeekIterator() { + raxStop(&it_); + } + + SeekIterator(SeekIterator&&) = delete; // self-referential + SeekIterator(const SeekIterator&) = delete; // self-referential + + SeekIterator(rax* tree, const char* op, std::string_view key) { + raxStart(&it_, tree); + raxSeek(&it_, op, to_key_ptr(key), key.size()); + operator++(); + } + + explicit SeekIterator(rax* tree) : SeekIterator(tree, "^", std::string_view{nullptr, 0}) { + } + + bool operator==(const SeekIterator& rhs) const { + return it_.node == rhs.it_.node; + } + + bool operator!=(const SeekIterator& rhs) const { + return !operator==(rhs); + } + + SeekIterator& operator++() { + if (!raxNext(&it_)) { + raxStop(&it_); + it_.node = nullptr; + } + return *this; + } + + std::pair operator*() const { + return {std::string_view{reinterpret_cast(it_.key), it_.key_len}, + *reinterpret_cast(it_.data)}; + } + + private: + raxIterator it_; + }; + + // Result of find() call. Inherits from pair to mimic iterator interface, not incrementable. 
+ struct FindIterator : public std::optional> { + bool operator==(const SeekIterator& rhs) const { + if (this->has_value() != !bool(rhs.it_.flags & RAX_ITER_EOF)) + return false; + if (!this->has_value()) + return true; + return (*this)->first == + std::string_view{reinterpret_cast(rhs.it_.key), rhs.it_.key_len}; + } + + bool operator!=(const SeekIterator& rhs) const { + return !operator==(rhs); + } + }; + + public: + explicit RaxTreeMap(PMR_NS::memory_resource* mr) : tree_(raxNew()), alloc_(mr) { + } + + size_t size() const { + return raxSize(tree_); + } + + auto begin() const { + return SeekIterator{tree_}; + } + + auto end() const { + return SeekIterator{}; + } + + auto lower_bound(std::string_view key) const { + return SeekIterator{tree_, ">=", key}; + } + + FindIterator find(std::string_view key) const { + if (void* ptr = raxFind(tree_, to_key_ptr(key), key.size()); ptr != raxNotFound) + return FindIterator{std::pair(key, *reinterpret_cast(ptr))}; + return FindIterator{std::nullopt}; + } + + template + std::pair try_emplace(std::string_view key, Args&&... args); + + void erase(FindIterator it) { + V* old = nullptr; + raxRemove(tree_, to_key_ptr(it->first.data()), it->first.size(), + reinterpret_cast(&old)); + std::allocator_traits::destroy(alloc_, old); + alloc_.deallocate(old, 1); + } + + auto& get_allocator() const { + return alloc_; + } + + private: + static unsigned char* to_key_ptr(std::string_view key) { + return reinterpret_cast(const_cast(key.data())); + } + + rax* tree_; + PMR_NS::polymorphic_allocator alloc_; +}; + +template +template +std::pair::FindIterator, bool> RaxTreeMap::try_emplace( + std::string_view key, Args&&... args) { + if (auto it = find(key); it) + return {it, false}; + + V* ptr = alloc_.allocate(1); + std::allocator_traits::construct(alloc_, ptr, std::forward(args)...); + + V* old = nullptr; + raxInsert(tree_, to_key_ptr(key), key.size(), ptr, reinterpret_cast(&old)); + assert(old == nullptr); + + auto it = std::make_optional(std::pair(key, *ptr)); + return std::make_pair(FindIterator{it}, true); +} + +} // namespace dfly::search diff --git a/src/core/search/rax_tree_test.cc b/src/core/search/rax_tree_test.cc new file mode 100644 index 000000000000..69179e705ea3 --- /dev/null +++ b/src/core/search/rax_tree_test.cc @@ -0,0 +1,107 @@ +// Copyright 2023, DragonflyDB authors. All rights reserved. +// See LICENSE for licensing terms. 
+// + +#include "core/search/rax_tree.h" + +#include +#include +#include +#include + +#include +#include + +#include "base/gtest.h" +#include "base/iterator.h" +#include "base/logging.h" + +extern "C" { +#include "redis/zmalloc.h" +} + +namespace dfly::search { + +using namespace std; + +struct RaxTreeTest : public ::testing::Test { + static void SetUpTestSuite() { + auto* tlh = mi_heap_get_backing(); + init_zmalloc_threadlocal(tlh); + } +}; + +TEST_F(RaxTreeTest, EmplaceAndIterate) { + RaxTreeMap map(pmr::get_default_resource()); + + vector> elements(90); + for (int i = 10; i < 100; i++) + elements[i - 10] = make_pair(absl::StrCat("key-", i), absl::StrCat("value-", i)); + + for (auto& [key, value] : elements) { + auto [it, inserted] = map.try_emplace(key, value); + EXPECT_TRUE(inserted); + EXPECT_EQ(it->first, key); + EXPECT_EQ(it->second, value); + } + + size_t i = 0; + for (auto [key, value] : map) { + EXPECT_EQ(elements[i].first, key); + EXPECT_EQ(elements[i].second, value); + i++; + } +} + +TEST_F(RaxTreeTest, LowerBound) { + RaxTreeMap map(pmr::get_default_resource()); + vector keys; + + for (unsigned i = 0; i < 5; i++) { + for (unsigned j = 0; j < 5; j++) { + keys.emplace_back(absl::StrCat("key-", string(1, 'a' + i), "-", j)); + map.try_emplace(keys.back(), 0); + } + } + + auto it1 = map.lower_bound("key-c-3"); + auto it2 = lower_bound(keys.begin(), keys.end(), "key-c-3"); + + while (it1 != map.end()) { + EXPECT_EQ((*it1).first, *it2); + ++it1; + ++it2; + } + + EXPECT_TRUE(it1 == map.end()); + EXPECT_TRUE(it2 == keys.end()); + + // Test lower bound empty string + vector keys2; + for (auto it = map.lower_bound(string_view{}); it != map.end(); ++it) + keys2.emplace_back((*it).first); + EXPECT_EQ(keys, keys2); +} + +TEST_F(RaxTreeTest, Find) { + RaxTreeMap map(pmr::get_default_resource()); + for (unsigned i = 100; i < 999; i += 2) + map.try_emplace(absl::StrCat("value-", i), i); + + auto it = map.begin(); + for (unsigned i = 100; i < 999; i++) { + auto fit = map.find(absl::StrCat("value-", i)); + if (i % 2 == 0) { + EXPECT_TRUE(fit == it); + EXPECT_EQ(fit->second, i); + ++it; + } else { + EXPECT_TRUE(fit == map.end()); + } + } + + // Test find with empty string + EXPECT_TRUE(map.find(string_view{}) == map.end()); +} + +} // namespace dfly::search diff --git a/src/core/search/search.cc b/src/core/search/search.cc index 2ce14b97f2f2..a378cb80dc86 100644 --- a/src/core/search/search.cc +++ b/src/core/search/search.cc @@ -119,6 +119,7 @@ struct ProfileBuilder { Overloaded node_info{ [](monostate) -> string { return ""s; }, [](const AstTermNode& n) { return absl::StrCat("Term{", n.term, "}"); }, + [](const AstPrefixNode& n) { return absl::StrCat("Prefix{", n.prefix, "}"); }, [](const AstRangeNode& n) { return absl::StrCat("Range{", n.lo, "<>", n.hi, "}"); }, [](const AstLogicalNode& n) { auto op = n.op == AstLogicalNode::AND ? 
"and" : "or"; @@ -270,6 +271,28 @@ struct BasicSearch { return UnifyResults(GetSubResults(selected_indices, mapping), LogicOp::OR); } + IndexResult Search(const AstPrefixNode& node, string_view active_field) { + vector indices; + if (!active_field.empty()) { + if (auto* index = GetIndex(active_field); index) + indices = {index}; + else + return IndexResult{}; + } else { + indices = indices_->GetAllTextIndices(); + } + + auto mapping = [&node, this](TextIndex* index) { + IndexResult result{}; + index->MatchingPrefix(node.prefix, [&result, this](const auto* c) { + Merge(IndexResult{c}, &result, LogicOp::OR); + }); + return result; + }; + + return UnifyResults(GetSubResults(indices, mapping), LogicOp::OR); + } + // [range]: access field's numeric index IndexResult Search(const AstRangeNode& node, string_view active_field) { DCHECK(!active_field.empty()); diff --git a/src/core/search/search_parser_test.cc b/src/core/search/search_parser_test.cc index e32fc3fa3ef9..93baef9341ae 100644 --- a/src/core/search/search_parser_test.cc +++ b/src/core/search/search_parser_test.cc @@ -73,8 +73,10 @@ TEST_F(SearchParserTest, Scanner) { NEXT_EQ(TOK_TERM, string, "cd"); NEXT_TOK(TOK_YYEOF); - SetInput("(5a 6) "); + SetInput("*"); + NEXT_TOK(TOK_STAR); + SetInput("(5a 6) "); NEXT_TOK(TOK_LPAREN); NEXT_EQ(TOK_TERM, string, "5a"); NEXT_EQ(TOK_UINT32, string, "6"); @@ -151,6 +153,36 @@ TEST_F(SearchParserTest, Scanner) { NEXT_EQ(TOK_TAG_VAL, string, "blue]1#-"); NEXT_TOK(TOK_RCURLBR); + // Prefix simple + SetInput("pre*"); + NEXT_EQ(TOK_PREFIX, string, "pre*"); + + // TODO: uncomment when we support escaped terms + // Prefix escaped (redis doesn't support quoted prefix matches) + // SetInput("pre\\**"); + // NEXT_EQ(TOK_PREFIX, string, "pre*"); + + // Prefix in tag + SetInput("@color:{prefix*}"); + NEXT_EQ(TOK_FIELD, string, "@color"); + NEXT_TOK(TOK_COLON); + NEXT_TOK(TOK_LCURLBR); + NEXT_EQ(TOK_PREFIX, string, "prefix*"); + NEXT_TOK(TOK_RCURLBR); + + // Prefix escaped star + SetInput("@color:{\"prefix*\"}"); + NEXT_EQ(TOK_FIELD, string, "@color"); + NEXT_TOK(TOK_COLON); + NEXT_TOK(TOK_LCURLBR); + NEXT_EQ(TOK_TERM, string, "prefix*"); + NEXT_TOK(TOK_RCURLBR); + + // Prefix spaced with star + SetInput("pre *"); + NEXT_EQ(TOK_TERM, string, "pre"); + NEXT_TOK(TOK_STAR); + SetInput("почтальон Печкин"); NEXT_EQ(TOK_TERM, string, "почтальон"); NEXT_EQ(TOK_TERM, string, "Печкин"); @@ -172,6 +204,12 @@ TEST_F(SearchParserTest, Parse) { EXPECT_EQ(1, Parse(" foo:bar ")); EXPECT_EQ(1, Parse(" @foo:@bar ")); EXPECT_EQ(1, Parse(" @foo: ")); + + // We don't support suffix/any other position for now + EXPECT_EQ(1, Parse("*pre")); + EXPECT_EQ(1, Parse("*pre*")); + + EXPECT_EQ(1, Parse("pre***")); } TEST_F(SearchParserTest, ParseParams) { diff --git a/src/core/search/search_test.cc b/src/core/search/search_test.cc index a5bc5495acc9..f61eb37e7661 100644 --- a/src/core/search/search_test.cc +++ b/src/core/search/search_test.cc @@ -11,6 +11,7 @@ #include #include #include +#include #include #include @@ -22,6 +23,10 @@ #include "core/search/query_driver.h" #include "core/search/vector_utils.h" +extern "C" { +#include "redis/zmalloc.h" +} + namespace dfly { namespace search { @@ -80,6 +85,11 @@ Schema MakeSimpleSchema(initializer_listWrite( + socket_->Write( io::Buffer("-ERR Bad TLS header, double check " "if you enabled TLS for your client.\r\n")); } @@ -697,7 +696,6 @@ void Connection::HandleRequests() { LOG(WARNING) << "Error handshaking " << aresult.error().message(); return; } - peer = socket_.get(); VLOG(1) << "TLS handshake 
succeeded"; } } @@ -705,7 +703,7 @@ void Connection::HandleRequests() { io::Result http_res{false}; - http_res = CheckForHttpProto(peer); + http_res = CheckForHttpProto(); // We need to check if the socket is open because the server might be // shutting down. During the shutdown process, the server iterates over @@ -717,12 +715,14 @@ void Connection::HandleRequests() { // because both Write and Recv internally check if the socket was shut // down and return with an error accordingly. if (http_res && socket_->IsOpen()) { - cc_.reset(service_->CreateContext(peer, this)); + cc_.reset(service_->CreateContext(socket_.get(), this)); + reply_builder_ = cc_->reply_builder(); + if (*http_res) { VLOG(1) << "HTTP1.1 identified"; is_http_ = true; HttpConnection http_conn{http_listener_}; - http_conn.SetSocket(peer); + http_conn.SetSocket(socket_.get()); http_conn.set_user_data(cc_.get()); // We validate the http request using basic-auth inside HttpConnection::HandleSingleRequest. @@ -741,13 +741,14 @@ void Connection::HandleRequests() { socket_->RegisterOnErrorCb([this](int32_t mask) { this->OnBreakCb(mask); }); } - ConnectionFlow(peer); + ConnectionFlow(); socket_->CancelOnErrorCb(); // noop if nothing is registered. } VLOG(1) << "Closed connection for peer " << GetClientInfo(fb2::ProactorBase::me()->GetPoolIndex()); cc_.reset(); + reply_builder_ = nullptr; } } @@ -801,13 +802,14 @@ std::pair Connection::GetClientInfoBeforeAfterTid() co string_view phase_name = PHASE_NAMES[phase_]; if (cc_) { - DCHECK(cc_->reply_builder()); + DCHECK(cc_->reply_builder() && reply_builder_); string cc_info = service_->GetContextInfo(cc_.get()).Format(); - if (cc_->reply_builder()->IsSendActive()) + if (reply_builder_->IsSendActive()) phase_name = "send"; absl::StrAppend(&after, " ", cc_info); } absl::StrAppend(&after, " phase=", phase_name); + return {std::move(before), std::move(after)}; } @@ -872,7 +874,7 @@ const absl::flat_hash_map& Connection::GetLibStatsTL() { return g_libname_ver_map; } -io::Result Connection::CheckForHttpProto(FiberSocketBase* peer) { +io::Result Connection::CheckForHttpProto() { if (!IsPrivileged() && !IsMain()) { return false; } @@ -883,6 +885,7 @@ io::Result Connection::CheckForHttpProto(FiberSocketBase* peer) { } size_t last_len = 0; + auto* peer = socket_.get(); do { auto buf = io_buf_.AppendBuffer(); DCHECK(!buf.empty()); @@ -916,27 +919,26 @@ io::Result Connection::CheckForHttpProto(FiberSocketBase* peer) { return false; } -void Connection::ConnectionFlow(FiberSocketBase* peer) { +void Connection::ConnectionFlow() { ++stats_->num_conns; ++stats_->conn_received_cnt; stats_->read_buf_capacity += io_buf_.Capacity(); ParserStatus parse_status = OK; - SinkReplyBuilder* orig_builder = cc_->reply_builder(); // At the start we read from the socket to determine the HTTP/Memstore protocol. // Therefore we may already have some data in the buffer. if (io_buf_.InputLen() > 0) { phase_ = PROCESS; if (redis_parser_) { - parse_status = ParseRedis(orig_builder); + parse_status = ParseRedis(); } else { DCHECK(memcache_parser_); parse_status = ParseMemcache(); } } - error_code ec = orig_builder->GetError(); + error_code ec = reply_builder_->GetError(); // Main loop. 
if (parse_status != ERROR && !ec) { @@ -944,7 +946,7 @@ void Connection::ConnectionFlow(FiberSocketBase* peer) { UpdateIoBufCapacity(io_buf_, stats_, [&]() { io_buf_.EnsureCapacity(io_buf_.Capacity() * 2); }); } - auto res = IoLoop(peer, orig_builder); + auto res = IoLoop(); if (holds_alternative(res)) { ec = get(res); @@ -975,10 +977,10 @@ void Connection::ConnectionFlow(FiberSocketBase* peer) { VLOG(1) << "Error parser status " << parser_error_; if (redis_parser_) { - SendProtocolError(RedisParser::Result(parser_error_), orig_builder); + SendProtocolError(RedisParser::Result(parser_error_), reply_builder_); } else { DCHECK(memcache_parser_); - orig_builder->SendProtocolError("bad command line format"); + reply_builder_->SendProtocolError("bad command line format"); } // Shut down the servers side of the socket to send a FIN to the client @@ -988,15 +990,15 @@ void Connection::ConnectionFlow(FiberSocketBase* peer) { // Otherwise the clients write could fail (or block), so they would never // read the above protocol error (see issue #1327). // TODO: we have a bug that can potentially deadlock the code below. - // If a peer does not close the socket on the other side, the while loop will never finish. + // If the peer on the other side does not close the socket, the while loop will never finish. // to reproduce: nc localhost 6379 and then run invalid sequence: *1 *1 - error_code ec2 = peer->Shutdown(SHUT_WR); + error_code ec2 = socket_->Shutdown(SHUT_WR); LOG_IF(WARNING, ec2) << "Could not shutdown socket " << ec2; if (!ec2) { while (true) { // Discard any received data. io_buf_.Clear(); - if (!peer->Recv(io_buf_.AppendBuffer())) { + if (!socket_->Recv(io_buf_.AppendBuffer())) { break; } } @@ -1064,7 +1066,7 @@ void Connection::DispatchSingle(bool has_more, absl::FunctionRef invoke_ } } -Connection::ParserStatus Connection::ParseRedis(SinkReplyBuilder* orig_builder) { +Connection::ParserStatus Connection::ParseRedis() { uint32_t consumed = 0; RedisParser::Result result = RedisParser::OK; @@ -1096,7 +1098,7 @@ Connection::ParserStatus Connection::ParseRedis(SinkReplyBuilder* orig_builder) DispatchSingle(has_more, dispatch_sync, dispatch_async); } io_buf_.ConsumeInput(consumed); - } while (RedisParser::OK == result && !orig_builder->GetError()); + } while (RedisParser::OK == result && !reply_builder_->GetError()); parser_error_ = result; if (result == RedisParser::OK) @@ -1120,7 +1122,7 @@ auto Connection::ParseMemcache() -> ParserStatus { return {make_unique(std::move(cmd), value)}; }; - MCReplyBuilder* builder = static_cast(cc_->reply_builder()); + MCReplyBuilder* builder = static_cast(reply_builder_); do { string_view str = ToSV(io_buf_.InputBuffer()); @@ -1179,10 +1181,10 @@ void Connection::OnBreakCb(int32_t mask) { return; } - DCHECK(cc_->reply_builder()) << "[" << id_ << "] " << phase_ << " " << migration_in_process_; + DCHECK(reply_builder_) << "[" << id_ << "] " << phase_ << " " << migration_in_process_; VLOG(1) << "[" << id_ << "] Got event " << mask << " " << phase_ << " " - << cc_->reply_builder()->IsSendActive() << " " << cc_->reply_builder()->GetError(); + << reply_builder_->IsSendActive() << " " << reply_builder_->GetError(); cc_->conn_closing = true; BreakOnce(mask); @@ -1212,6 +1214,7 @@ void Connection::HandleMigrateRequest() { // We need to return early as the socket is closing and IoLoop will clean up.
// The reason that this is true is because of the following DCHECK DCHECK(!dispatch_fb_.IsJoinable()); + // which can never trigger since we Joined on the dispatch_fb_ above and we are // atomic in respect to our proactor meaning that no other fiber will // launch the DispatchFiber. @@ -1219,17 +1222,14 @@ void Connection::HandleMigrateRequest() { return; } } - - // In case we Yield()ed in Migrate() above, dispatch_fb_ might have been started. - LaunchDispatchFiberIfNeeded(); } -auto Connection::IoLoop(util::FiberSocketBase* peer, SinkReplyBuilder* orig_builder) - -> variant { +auto Connection::IoLoop() -> variant { error_code ec; ParserStatus parse_status = OK; size_t max_iobfuf_len = GetFlag(FLAGS_max_client_iobuf_len); + auto* peer = socket_.get(); do { HandleMigrateRequest(); @@ -1256,7 +1256,7 @@ auto Connection::IoLoop(util::FiberSocketBase* peer, SinkReplyBuilder* orig_buil bool is_iobuf_full = io_buf_.AppendLen() == 0; if (redis_parser_) { - parse_status = ParseRedis(orig_builder); + parse_status = ParseRedis(); } else { DCHECK(memcache_parser_); parse_status = ParseMemcache(); @@ -1303,7 +1303,7 @@ auto Connection::IoLoop(util::FiberSocketBase* peer, SinkReplyBuilder* orig_buil } else if (parse_status != OK) { break; } - ec = orig_builder->GetError(); + ec = reply_builder_->GetError(); } while (peer->IsOpen() && !ec); if (ec) @@ -1337,7 +1337,7 @@ bool Connection::ShouldEndDispatchFiber(const MessageHandle& msg) { return false; } -void Connection::SquashPipeline(facade::SinkReplyBuilder* builder) { +void Connection::SquashPipeline() { DCHECK_EQ(dispatch_q_.size(), pending_pipeline_cmd_cnt_); vector squash_cmds; @@ -1356,8 +1356,8 @@ void Connection::SquashPipeline(facade::SinkReplyBuilder* builder) { size_t dispatched = service_->DispatchManyCommands(absl::MakeSpan(squash_cmds), cc_.get()); if (pending_pipeline_cmd_cnt_ == squash_cmds.size()) { // Flush if no new commands appeared - builder->FlushBatch(); - builder->SetBatchMode(false); // in case the next dispatch is sync + reply_builder_->FlushBatch(); + reply_builder_->SetBatchMode(false); // in case the next dispatch is sync } cc_->async_dispatch = false; @@ -1376,7 +1376,7 @@ void Connection::SquashPipeline(facade::SinkReplyBuilder* builder) { } void Connection::ClearPipelinedMessages() { - DispatchOperations dispatch_op{cc_->reply_builder(), this}; + DispatchOperations dispatch_op{reply_builder_, this}; // Recycle messages even from disconnecting client to keep properly track of memory stats // As well as to avoid pubsub backpressure leakege. @@ -1421,17 +1421,17 @@ std::string Connection::DebugInfo() const { // into the dispatch queue and DispatchFiber will run those commands asynchronously with // InputLoop. Note: in some cases, InputLoop may decide to dispatch directly and bypass the // DispatchFiber. 
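// The comment above describes a producer/consumer hand-off: the input loop enqueues parsed
// commands and the dispatch fiber drains them, switching the reply builder to batch mode while
// more than one message is queued. A simplified analogy with plain std threads (the real code
// uses util::fb2 fibers and a NoOpLock; names and types below are illustrative only):
#include <condition_variable>
#include <deque>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>

int main() {
  std::mutex mu;
  std::condition_variable cnd;
  std::deque<std::string> dispatch_q;
  bool closing = false;

  std::thread consumer([&] {
    std::unique_lock lk(mu);
    while (true) {
      cnd.wait(lk, [&] { return closing || !dispatch_q.empty(); });
      if (dispatch_q.empty() && closing) break;
      bool batch = dispatch_q.size() > 1;          // analogue of SetBatchMode(dispatch_q_.size() > 1)
      std::string msg = std::move(dispatch_q.front());
      dispatch_q.pop_front();
      lk.unlock();
      std::cout << (batch ? "[batched] " : "") << msg << "\n";  // "execute" the command
      lk.lock();
    }
  });

  {
    std::lock_guard lk(mu);
    dispatch_q.push_back("PING");
    dispatch_q.push_back("GET k");
  }
  cnd.notify_one();
  {
    std::lock_guard lk(mu);
    closing = true;
  }
  cnd.notify_one();
  consumer.join();
}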
-void Connection::ExecutionFiber(util::FiberSocketBase* peer) { +void Connection::ExecutionFiber() { ThisFiber::SetName("ExecutionFiber"); - SinkReplyBuilder* builder = cc_->reply_builder(); - DispatchOperations dispatch_op{builder, this}; + + DispatchOperations dispatch_op{reply_builder_, this}; size_t squashing_threshold = GetFlag(FLAGS_pipeline_squash); uint64_t prev_epoch = fb2::FiberSwitchEpoch(); fb2::NoOpLock noop_lk; - while (!builder->GetError()) { + while (!reply_builder_->GetError()) { DCHECK_EQ(socket()->proactor(), ProactorBase::me()); cnd_.wait(noop_lk, [this] { return cc_->conn_closing || (!dispatch_q_.empty() && !cc_->sync_dispatch); @@ -1455,7 +1455,7 @@ void Connection::ExecutionFiber(util::FiberSocketBase* peer) { } prev_epoch = cur_epoch; - builder->SetBatchMode(dispatch_q_.size() > 1); + reply_builder_->SetBatchMode(dispatch_q_.size() > 1); bool subscriber_over_limit = stats_->dispatch_queue_subscriber_bytes >= queue_backpressure_->publish_buffer_limit; @@ -1468,7 +1468,7 @@ void Connection::ExecutionFiber(util::FiberSocketBase* peer) { bool threshold_reached = pending_pipeline_cmd_cnt_ > squashing_threshold; bool are_all_plain_cmds = pending_pipeline_cmd_cnt_ == dispatch_q_.size(); if (squashing_enabled && threshold_reached && are_all_plain_cmds && !skip_next_squashing_) { - SquashPipeline(builder); + SquashPipeline(); } else { MessageHandle msg = std::move(dispatch_q_.front()); dispatch_q_.pop_front(); @@ -1477,7 +1477,7 @@ void Connection::ExecutionFiber(util::FiberSocketBase* peer) { // last command to reply and flush. If it doesn't reply (i.e. is a control message like // migrate), we have to flush manually. if (dispatch_q_.empty() && !msg.IsReplying()) { - builder->FlushBatch(); + reply_builder_->FlushBatch(); } if (ShouldEndDispatchFiber(msg)) { @@ -1505,7 +1505,7 @@ void Connection::ExecutionFiber(util::FiberSocketBase* peer) { queue_backpressure_->pubsub_ec.notify(); } - DCHECK(cc_->conn_closing || builder->GetError()); + DCHECK(cc_->conn_closing || reply_builder_->GetError()); cc_->conn_closing = true; queue_backpressure_->pipeline_cnd.notify_all(); } @@ -1582,6 +1582,7 @@ bool Connection::Migrate(util::fb2::ProactorBase* dest) { } listener()->Migrate(this, dest); + // After we migrate, it could be the case the connection was shut down. We should // act accordingly. if (!socket()->IsOpen()) { @@ -1646,8 +1647,8 @@ void Connection::SendInvalidationMessageAsync(InvalidationMessage msg) { void Connection::LaunchDispatchFiberIfNeeded() { if (!dispatch_fb_.IsJoinable() && !migration_in_process_) { VLOG(1) << "[" << id_ << "] LaunchDispatchFiberIfNeeded "; - dispatch_fb_ = fb2::Fiber(fb2::Launch::post, "connection_dispatch", - [this, peer = socket_.get()]() { ExecutionFiber(peer); }); + dispatch_fb_ = + fb2::Fiber(fb2::Launch::post, "connection_dispatch", [this]() { ExecutionFiber(); }); } } diff --git a/src/facade/dragonfly_connection.h b/src/facade/dragonfly_connection.h index bb9613ff1844..2ff58198e4e5 100644 --- a/src/facade/dragonfly_connection.h +++ b/src/facade/dragonfly_connection.h @@ -331,14 +331,13 @@ class Connection : public util::Connection { void HandleRequests() final; // Start dispatch fiber and run IoLoop. - void ConnectionFlow(util::FiberSocketBase* peer); + void ConnectionFlow(); // Main loop reading client messages and passing requests to dispatch queue. - std::variant IoLoop(util::FiberSocketBase* peer, - SinkReplyBuilder* orig_builder); + std::variant IoLoop(); // Returns true if HTTP header is detected. 
- io::Result CheckForHttpProto(util::FiberSocketBase* peer); + io::Result CheckForHttpProto(); // Dispatches a single (Redis or MC) command. // `has_more` should indicate whether the io buffer has more commands @@ -348,7 +347,7 @@ class Connection : public util::Connection { absl::FunctionRef cmd_msg_cb); // Handles events from dispatch queue. - void ExecutionFiber(util::FiberSocketBase* peer); + void ExecutionFiber(); void SendAsync(MessageHandle msg); @@ -358,7 +357,7 @@ class Connection : public util::Connection { // Create new pipeline request, re-use from pool when possible. PipelineMessagePtr FromArgs(RespVec args, mi_heap_t* heap); - ParserStatus ParseRedis(SinkReplyBuilder* orig_builder); + ParserStatus ParseRedis(); ParserStatus ParseMemcache(); void OnBreakCb(int32_t mask); @@ -373,8 +372,9 @@ class Connection : public util::Connection { bool ShouldEndDispatchFiber(const MessageHandle& msg); void LaunchDispatchFiberIfNeeded(); // Dispatch fiber is started lazily + // Squashes pipelined commands from the dispatch queue to spread load over all threads - void SquashPipeline(facade::SinkReplyBuilder*); + void SquashPipeline(); // Clear pipelined messages, disaptching only intrusive ones. void ClearPipelinedMessages(); @@ -398,6 +398,9 @@ class Connection : public util::Connection { Protocol protocol_; ConnectionStats* stats_ = nullptr; + // cc_->reply_builder may change during the lifetime of the connection, due to injections. + // This is a pointer to the original, socket based reply builder that never changes. + SinkReplyBuilder* reply_builder_ = nullptr; util::HttpListenerBase* http_listener_; SSL_CTX* ssl_ctx_; diff --git a/src/server/bitops_family.cc b/src/server/bitops_family.cc index 043b7a49eb1d..c14152a6a4c6 100644 --- a/src/server/bitops_family.cc +++ b/src/server/bitops_family.cc @@ -548,6 +548,8 @@ void BitPos(CmdArgList args, ConnectionContext* cntx) { if (!absl::SimpleAtoi(ArgS(args, 1), &value)) { return cntx->SendError(kInvalidIntErr); + } else if (value != 0 && value != 1) { + return cntx->SendError("The bit argument must be 1 or 0"); } if (args.size() >= 3) { @@ -1011,20 +1013,20 @@ nonstd::expected ParseCommonAttr(CmdArgParser* pa // Returns the CommandList if the parsing completed succefully or std::string // to indicate an error nonstd::expected ParseToCommandList(CmdArgList args, bool read_only) { - enum class Cmds { OVERFLOW, GET, SET, INCRBY }; + enum class Cmds { OVERFLOW_OPT, GET_OPT, SET_OPT, INCRBY_OPT }; CommandList result; using nonstd::make_unexpected; CmdArgParser parser(args); while (parser.HasNext()) { - auto cmd = parser.MapNext("OVERFLOW", Cmds::OVERFLOW, "GET", Cmds::GET, "SET", Cmds::SET, - "INCRBY", Cmds::INCRBY); + auto cmd = parser.MapNext("OVERFLOW", Cmds::OVERFLOW_OPT, "GET", Cmds::GET_OPT, "SET", + Cmds::SET_OPT, "INCRBY", Cmds::INCRBY_OPT); if (parser.Error()) { return make_unexpected(kSyntaxErr); } - if (cmd == Cmds::OVERFLOW) { + if (cmd == Cmds::OVERFLOW_OPT) { if (read_only) { make_unexpected("BITFIELD_RO only supports the GET subcommand"); } @@ -1045,7 +1047,7 @@ nonstd::expected ParseToCommandList(CmdArgList args, b } auto attr = maybe_attr.value(); - if (cmd == Cmds::GET) { + if (cmd == Cmds::GET_OPT) { result.push_back(Command(Get(attr))); continue; } @@ -1058,12 +1060,12 @@ nonstd::expected ParseToCommandList(CmdArgList args, b if (parser.Error()) { return make_unexpected(kSyntaxErr); } - if (cmd == Cmds::SET) { + if (cmd == Cmds::SET_OPT) { result.push_back(Command(Set(attr, value))); continue; } - if (cmd == Cmds::INCRBY) { + if 
(cmd == Cmds::INCRBY_OPT) { result.push_back(Command(IncrBy(attr, value))); continue; } @@ -1340,11 +1342,15 @@ OpResult FindFirstBitWithValue(const OpArgs& op_args, std::string_view int64_t start, int64_t end, bool as_bit) { OpResult value = ReadValue(op_args.db_cntx, key, op_args.shard); - std::string_view value_str; - if (value) { // non-existent keys are treated as empty strings, per Redis - value_str = value.value(); + // non-existent keys are handled exactly as in Redis's implementation, + // even though it contradicts its docs: + // If a clear bit isn't found in the specified range, the function returns -1 + // as the user specified a clear range and there are no 0 bits in that range + if (!value) { + return bit_value ? -1 : 0; } + std::string_view value_str = value.value(); int64_t size = value_str.size(); if (as_bit) { size *= OFFSET_FACTOR; diff --git a/src/server/bitops_family_test.cc b/src/server/bitops_family_test.cc index 8eaf15dc97b0..17bcca6a0161 100644 --- a/src/server/bitops_family_test.cc +++ b/src/server/bitops_family_test.cc @@ -555,8 +555,15 @@ TEST_F(BitOpsFamilyTest, BitPos) { EXPECT_EQ(-1, CheckedInt({"bitpos", "empty", "0"})); EXPECT_EQ(-1, CheckedInt({"bitpos", "empty", "0", "1"})); - // Non-existent key should be treated like an empty string. - EXPECT_EQ(-1, CheckedInt({"bitpos", "d", "0"})); + // Non-existent key should be treated like padded with zeros string. + EXPECT_EQ(-1, CheckedInt({"bitpos", "d", "1"})); + EXPECT_EQ(0, CheckedInt({"bitpos", "d", "0"})); + + // Make sure we accept only 0 and 1 for the bit mode arguement. + const auto argument_must_be_0_or_1_error = ErrArg("ERR The bit argument must be 1 or 0"); + ASSERT_THAT(Run({"bitpos", "d", "2"}), argument_must_be_0_or_1_error); + ASSERT_THAT(Run({"bitpos", "d", "42"}), argument_must_be_0_or_1_error); + ASSERT_THAT(Run({"bitpos", "d", "-1"}), argument_must_be_0_or_1_error); } TEST_F(BitOpsFamilyTest, BitFieldParsing) { diff --git a/src/server/cluster/cluster_config.cc b/src/server/cluster/cluster_config.cc index 9f76be02e936..675b52e42241 100644 --- a/src/server/cluster/cluster_config.cc +++ b/src/server/cluster/cluster_config.cc @@ -221,7 +221,7 @@ optional> ParseMigrations(const JsonType& json) { } optional BuildClusterConfigFromJson(const JsonType& json) { - ClusterShardInfos config; + std::vector config; if (!json.is_array()) { LOG(WARNING) << kInvalidConfigPrefix << "not an array " << json; @@ -271,7 +271,7 @@ optional BuildClusterConfigFromJson(const JsonType& json) { config.push_back(std::move(shard)); } - return config; + return ClusterShardInfos(config); } } // namespace diff --git a/src/server/cluster/cluster_config_test.cc b/src/server/cluster/cluster_config_test.cc index ed6c65bd949a..ad5b0484149a 100644 --- a/src/server/cluster/cluster_config_test.cc +++ b/src/server/cluster/cluster_config_test.cc @@ -105,14 +105,15 @@ TEST_F(ClusterConfigTest, ConfigSetInvalidMissingSlots) { TEST_F(ClusterConfigTest, ConfigSetInvalidDoubleBookedSlot) { EXPECT_EQ(ClusterConfig::CreateFromConfig( - kMyId, {{.slot_ranges = SlotRanges({{.start = 0, .end = 0x3FFF}}), - .master = {.id = "other", .ip = "192.168.0.100", .port = 7000}, - .replicas = {}, - .migrations = {}}, - {.slot_ranges = SlotRanges({{.start = 0, .end = 0}}), - .master = {.id = "other2", .ip = "192.168.0.101", .port = 7001}, - .replicas = {}, - .migrations = {}}}), + kMyId, + ClusterShardInfos({{.slot_ranges = SlotRanges({{.start = 0, .end = 0x3FFF}}), + .master = {.id = "other", .ip = "192.168.0.100", .port = 7000}, + .replicas = {}, + 
.migrations = {}}, + {.slot_ranges = SlotRanges({{.start = 0, .end = 0}}), + .master = {.id = "other2", .ip = "192.168.0.101", .port = 7001}, + .replicas = {}, + .migrations = {}}})), nullptr); } @@ -150,18 +151,19 @@ TEST_F(ClusterConfigTest, ConfigSetOkWithReplica) { TEST_F(ClusterConfigTest, ConfigSetMultipleInstances) { auto config = ClusterConfig::CreateFromConfig( - kMyId, {{.slot_ranges = SlotRanges({{.start = 0, .end = 5'000}}), - .master = {.id = "other-master", .ip = "192.168.0.100", .port = 7000}, - .replicas = {{.id = "other-replica", .ip = "192.168.0.101", .port = 7001}}, - .migrations = {}}, - {.slot_ranges = SlotRanges({{.start = 5'001, .end = 10'000}}), - .master = {.id = kMyId, .ip = "192.168.0.102", .port = 7002}, - .replicas = {{.id = "other-replica2", .ip = "192.168.0.103", .port = 7003}}, - .migrations = {}}, - {.slot_ranges = SlotRanges({{.start = 10'001, .end = 0x3FFF}}), - .master = {.id = "other-master3", .ip = "192.168.0.104", .port = 7004}, - .replicas = {{.id = "other-replica3", .ip = "192.168.0.105", .port = 7005}}, - .migrations = {}}}); + kMyId, ClusterShardInfos( + {{.slot_ranges = SlotRanges({{.start = 0, .end = 5'000}}), + .master = {.id = "other-master", .ip = "192.168.0.100", .port = 7000}, + .replicas = {{.id = "other-replica", .ip = "192.168.0.101", .port = 7001}}, + .migrations = {}}, + {.slot_ranges = SlotRanges({{.start = 5'001, .end = 10'000}}), + .master = {.id = kMyId, .ip = "192.168.0.102", .port = 7002}, + .replicas = {{.id = "other-replica2", .ip = "192.168.0.103", .port = 7003}}, + .migrations = {}}, + {.slot_ranges = SlotRanges({{.start = 10'001, .end = 0x3FFF}}), + .master = {.id = "other-master3", .ip = "192.168.0.104", .port = 7004}, + .replicas = {{.id = "other-replica3", .ip = "192.168.0.105", .port = 7005}}, + .migrations = {}}})); EXPECT_NE(config, nullptr); SlotSet owned_slots = config->GetOwnedSlots(); EXPECT_EQ(owned_slots.ToSlotRanges().Size(), 1); @@ -609,4 +611,98 @@ TEST_F(ClusterConfigTest, SlotSetAPI) { } } +TEST_F(ClusterConfigTest, ConfigComparison) { + auto config1 = ClusterConfig::CreateFromConfig("id0", R"json( + [ + { + "slot_ranges": [ { "start": 0, "end": 8000 } ], + "master": { "id": "id0", "ip": "localhost", "port": 3000 }, + "replicas": [], + "migrations": [{ "slot_ranges": [ { "start": 7000, "end": 8000 } ] + , "ip": "127.0.0.1", "port" : 9001, "node_id": "id1" }] + }, + { + "slot_ranges": [ { "start": 8001, "end": 16383 } ], + "master": { "id": "id1", "ip": "localhost", "port": 3001 }, + "replicas": [] + } + ])json"); + + EXPECT_EQ(config1->GetConfig(), config1->GetConfig()); + + auto config2 = ClusterConfig::CreateFromConfig("id0", R"json( + [ + { + "slot_ranges": [ { "start": 0, "end": 16383 } ], + "master": { "id": "id0", "ip": "localhost", "port": 3000 }, + "replicas": [], + "migrations": [{ "slot_ranges": [ { "start": 7000, "end": 8000 } ] + , "ip": "127.0.0.1", "port" : 9001, "node_id": "id1" }] + } + ])json"); + EXPECT_NE(config1->GetConfig(), config2->GetConfig()); + EXPECT_EQ(config2->GetConfig(), config2->GetConfig()); + + auto config3 = ClusterConfig::CreateFromConfig("id0", R"json( + [ + { + "slot_ranges": [ { "start": 0, "end": 8000 } ], + "master": { "id": "id0", "ip": "localhost", "port": 3000 }, + "replicas": [], + "migrations": [{ "slot_ranges": [ { "start": 7000, "end": 8000 } ] + , "ip": "127.0.0.1", "port" : 9002, "node_id": "id1" }] + }, + { + "slot_ranges": [ { "start": 8001, "end": 16383 } ], + "master": { "id": "id1", "ip": "localhost", "port": 3001 }, + "replicas": [] + } + ])json"); + 
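  // These equality checks rely on the comparison being order-insensitive: ClusterShardInfos sorts
  // its shards on construction and ClusterShardInfo::operator== compares sorted copies of the
  // replica and migration vectors. The idiom, reduced to plain ints as a self-contained sketch:
  #include <algorithm>
  #include <cassert>
  #include <vector>

  bool EqualUnordered(std::vector<int> a, std::vector<int> b) {  // take copies on purpose
    std::sort(a.begin(), a.end());
    std::sort(b.begin(), b.end());
    return a == b;
  }

  int main() {
    assert(EqualUnordered({3, 1, 2}, {2, 3, 1}));   // same members, different order -> equal
    assert(!EqualUnordered({1, 2}, {1, 2, 2}));     // different multiset -> not equal
  }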
EXPECT_NE(config1->GetConfig(), config3->GetConfig()); + EXPECT_NE(config2->GetConfig(), config3->GetConfig()); + EXPECT_EQ(config3->GetConfig(), config3->GetConfig()); + + auto config4 = ClusterConfig::CreateFromConfig("id0", R"json( + [ + { + "slot_ranges": [ { "start": 0, "end": 8000 } ], + "master": { "id": "id0", "ip": "localhost", "port": 3000 }, + "replicas": [], + "migrations": [{ "slot_ranges": [ { "start": 7000, "end": 8000 } ] + , "ip": "127.0.0.1", "port" : 9001, "node_id": "id2" }] + }, + { + "slot_ranges": [ { "start": 8001, "end": 16383 } ], + "master": { "id": "id1", "ip": "localhost", "port": 3001 }, + "replicas": [] + } + ])json"); + + EXPECT_NE(config1->GetConfig(), config4->GetConfig()); + EXPECT_NE(config2->GetConfig(), config4->GetConfig()); + EXPECT_NE(config3->GetConfig(), config4->GetConfig()); + EXPECT_EQ(config4->GetConfig(), config4->GetConfig()); + + auto config5 = ClusterConfig::CreateFromConfig("id0", R"json( + [ + { + "slot_ranges": [ { "start": 0, "end": 8000 } ], + "master": { "id": "id2", "ip": "localhost", "port": 3000 }, + "replicas": [], + "migrations": [{ "slot_ranges": [ { "start": 7000, "end": 8000 } ] + , "ip": "127.0.0.1", "port" : 9001, "node_id": "id1" }] + }, + { + "slot_ranges": [ { "start": 8001, "end": 16383 } ], + "master": { "id": "id1", "ip": "localhost", "port": 3001 }, + "replicas": [] + } + ])json"); + EXPECT_NE(config1->GetConfig(), config5->GetConfig()); + EXPECT_NE(config2->GetConfig(), config5->GetConfig()); + EXPECT_NE(config3->GetConfig(), config5->GetConfig()); + EXPECT_NE(config4->GetConfig(), config5->GetConfig()); + EXPECT_EQ(config5->GetConfig(), config5->GetConfig()); +} + } // namespace dfly::cluster diff --git a/src/server/cluster/cluster_defs.cc b/src/server/cluster/cluster_defs.cc index 0ff86872abd1..d5ae508fa91c 100644 --- a/src/server/cluster/cluster_defs.cc +++ b/src/server/cluster/cluster_defs.cc @@ -49,6 +49,26 @@ std::string MigrationInfo::ToString() const { slot_ranges.ToString(), ")"); } +bool ClusterShardInfo::operator==(const ClusterShardInfo& r) const { + if (slot_ranges == r.slot_ranges && master == r.master) { + auto lreplicas = replicas; + auto lmigrations = migrations; + auto rreplicas = r.replicas; + auto rmigrations = r.migrations; + std::sort(lreplicas.begin(), lreplicas.end()); + std::sort(lmigrations.begin(), lmigrations.end()); + std::sort(rreplicas.begin(), rreplicas.end()); + std::sort(rmigrations.begin(), rmigrations.end()); + return lreplicas == rreplicas && lmigrations == rmigrations; + } + return false; +} + +ClusterShardInfos::ClusterShardInfos(std::vector infos) + : infos_(std::move(infos)) { + std::sort(infos_.begin(), infos_.end()); +} + namespace { enum class ClusterMode { kUninitialized, diff --git a/src/server/cluster/cluster_defs.h b/src/server/cluster/cluster_defs.h index da89ec5c1a47..1e0c820fe344 100644 --- a/src/server/cluster/cluster_defs.h +++ b/src/server/cluster/cluster_defs.h @@ -90,6 +90,10 @@ struct ClusterNodeInfo { bool operator==(const ClusterNodeInfo& r) const noexcept { return port == r.port && ip == r.ip && id == r.id; } + + bool operator<(const ClusterNodeInfo& r) const noexcept { + return id < r.id; + } }; struct MigrationInfo { @@ -100,6 +104,10 @@ struct MigrationInfo { return node_info == r.node_info && slot_ranges == r.slot_ranges; } + bool operator<(const MigrationInfo& r) const noexcept { + return node_info < r.node_info; + } + std::string ToString() const; }; @@ -108,9 +116,48 @@ struct ClusterShardInfo { ClusterNodeInfo master; std::vector replicas; 
std::vector migrations; + + bool operator==(const ClusterShardInfo& r) const; + + bool operator<(const ClusterShardInfo& r) const noexcept { + return master < r.master; + } }; -using ClusterShardInfos = std::vector; +class ClusterShardInfos { + public: + ClusterShardInfos() = default; + ClusterShardInfos(std::vector infos); + ClusterShardInfos(ClusterShardInfo info) : infos_({info}) { + } + + auto begin() const noexcept { + return infos_.cbegin(); + } + + auto end() const noexcept { + return infos_.cend(); + } + + auto size() const noexcept { + return infos_.size(); + } + + bool empty() const noexcept { + return infos_.empty(); + } + + bool operator==(const ClusterShardInfos& r) const noexcept { + return infos_ == r.infos_; + } + + bool operator!=(const ClusterShardInfos& r) const noexcept { + return infos_ != r.infos_; + } + + private: + std::vector infos_; +}; // MigrationState constants are ordered in state changing order enum class MigrationState : uint8_t { diff --git a/src/server/cluster/cluster_family.cc b/src/server/cluster/cluster_family.cc index 0337818df346..4f188fd87a45 100644 --- a/src/server/cluster/cluster_family.cc +++ b/src/server/cluster/cluster_family.cc @@ -530,6 +530,8 @@ void ClusterFamily::DflyClusterConfig(CmdArgList args, ConnectionContext* cntx) if (new_config == nullptr) { LOG(WARNING) << "Can't set cluster config"; return cntx->SendError("Invalid cluster configuration."); + } else if (tl_cluster_config && tl_cluster_config->GetConfig() == new_config->GetConfig()) { + return cntx->SendOk(); } PreparedToRemoveOutgoingMigrations outgoing_migrations; // should be removed without mutex lock diff --git a/src/server/detail/save_stages_controller.cc b/src/server/detail/save_stages_controller.cc index 5280348b186d..924aa427d97b 100644 --- a/src/server/detail/save_stages_controller.cc +++ b/src/server/detail/save_stages_controller.cc @@ -344,6 +344,7 @@ void SaveStagesController::InitResources() { GenericError SaveStagesController::FinalizeFileMovement() { if (is_cloud_) return {}; + DVLOG(1) << "FinalizeFileMovement start"; // If the shared_err is set, the snapshot saving failed bool has_error = bool(shared_err_); @@ -358,6 +359,7 @@ GenericError SaveStagesController::FinalizeFileMovement() { if (ec) break; } + DVLOG(1) << "FinalizeFileMovement end"; return GenericError(ec); } diff --git a/src/server/detail/snapshot_storage.cc b/src/server/detail/snapshot_storage.cc index cead0bcb4edd..c10280d87cda 100644 --- a/src/server/detail/snapshot_storage.cc +++ b/src/server/detail/snapshot_storage.cc @@ -55,12 +55,15 @@ FileSnapshotStorage::FileSnapshotStorage(fb2::FiberQueueThreadPool* fq_threadpoo io::Result, GenericError> FileSnapshotStorage::OpenWriteFile( const std::string& path) { if (fq_threadpool_) { // EPOLL - auto res = OpenFiberWriteFile(path, fq_threadpool_); + FiberWriteOptions opts; + opts.direct = true; + + auto res = OpenFiberWriteFile(path, fq_threadpool_, opts); if (!res) { return nonstd::make_unexpected(GenericError(res.error(), "Couldn't open file for writing")); } - return std::pair(*res, FileType::FILE); + return std::pair(*res, FileType::FILE | FileType::DIRECT); } else { #ifdef __linux__ auto res = fb2::OpenLinux(path, kRdbWriteFlags, 0666); diff --git a/src/server/dfly_main.cc b/src/server/dfly_main.cc index 19f6318c9ea3..5caa7cdc1bf9 100644 --- a/src/server/dfly_main.cc +++ b/src/server/dfly_main.cc @@ -73,6 +73,9 @@ ABSL_FLAG(string, unixsocketperm, "", "Set permissions for unixsocket, in octal ABSL_FLAG(bool, force_epoll, false, "If true - uses linux 
epoll engine underneath. " "Can fit for kernels older than 5.10."); +ABSL_FLAG( + string, allocation_tracker, "", + "Logs stack trace of memory allocation within these ranges. Format is min:max,min:max,...."); ABSL_FLAG(bool, version_check, true, "If true, Will monitor for new releases on Dragonfly servers once a day."); @@ -561,6 +564,42 @@ bool UpdateResourceLimitsIfInsideContainer(io::MemInfoData* mdata, size_t* max_t #endif +void SetupAllocationTracker(ProactorPool* pool) { +#ifdef DFLY_ENABLE_MEMORY_TRACKING + string flag = absl::GetFlag(FLAGS_allocation_tracker); + vector> track_ranges; + for (string_view entry : absl::StrSplit(flag, ",", absl::SkipEmpty())) { + auto separator = entry.find(":"); + if (separator == entry.npos) { + LOG(ERROR) << "Can't find ':' in element"; + exit(-1); + } + + pair p; + if (!absl::SimpleAtoi(entry.substr(0, separator), &p.first)) { + LOG(ERROR) << "Can't parse first number in pair"; + exit(-1); + } + if (!absl::SimpleAtoi(entry.substr(separator + 1), &p.second)) { + LOG(ERROR) << "Can't parse second number in pair"; + exit(-1); + } + + track_ranges.push_back(p); + } + + pool->AwaitBrief([&](unsigned, ProactorBase*) { + for (auto range : track_ranges) { + if (!AllocationTracker::Get().Add( + {.lower_bound = range.first, .upper_bound = range.second, .sample_odds = 1.0})) { + LOG(ERROR) << "Unable to track allocation range"; + exit(-1); + } + } + }); +#endif +} + } // namespace } // namespace dfly @@ -747,6 +786,8 @@ Usage: dragonfly [FLAGS] pool->Run(); + SetupAllocationTracker(pool.get()); + AcceptServer acceptor(pool.get(), &fb2::std_malloc_resource, true); acceptor.set_back_log(absl::GetFlag(FLAGS_tcp_backlog)); diff --git a/src/server/engine_shard.cc b/src/server/engine_shard.cc index 7353cacd9e12..1366c8239e68 100644 --- a/src/server/engine_shard.cc +++ b/src/server/engine_shard.cc @@ -210,15 +210,20 @@ ShardId Shard(string_view v, ShardId shard_num) { } EngineShard::Stats& EngineShard::Stats::operator+=(const EngineShard::Stats& o) { - static_assert(sizeof(Stats) == 48); + static_assert(sizeof(Stats) == 64); - defrag_attempt_total += o.defrag_attempt_total; - defrag_realloc_total += o.defrag_realloc_total; - defrag_task_invocation_total += o.defrag_task_invocation_total; - poll_execution_total += o.poll_execution_total; - tx_ooo_total += o.tx_ooo_total; - tx_optimistic_total += o.tx_optimistic_total; +#define ADD(x) x += o.x + ADD(defrag_attempt_total); + ADD(defrag_realloc_total); + ADD(defrag_task_invocation_total); + ADD(poll_execution_total); + ADD(tx_ooo_total); + ADD(tx_optimistic_total); + ADD(tx_batch_schedule_calls_total); + ADD(tx_batch_scheduled_items_total); + +#undef ADD return *this; } @@ -386,28 +391,53 @@ void EngineShard::Shutdown() { queue_.Shutdown(); queue2_.Shutdown(); - DCHECK(!fiber_periodic_.IsJoinable()); + DCHECK(!fiber_heartbeat_periodic_.IsJoinable()); + DCHECK(!fiber_shard_handler_periodic_.IsJoinable()); ProactorBase::me()->RemoveOnIdleTask(defrag_task_); } void EngineShard::StopPeriodicFiber() { - fiber_periodic_done_.Notify(); - if (fiber_periodic_.IsJoinable()) { - fiber_periodic_.Join(); + fiber_heartbeat_periodic_done_.Notify(); + if (fiber_heartbeat_periodic_.IsJoinable()) { + fiber_heartbeat_periodic_.Join(); + } + fiber_shard_handler_periodic_done_.Notify(); + if (fiber_shard_handler_periodic_.IsJoinable()) { + fiber_shard_handler_periodic_.Join(); } } -void EngineShard::StartPeriodicFiber(util::ProactorBase* pb, std::function global_handler) { +void EngineShard::StartPeriodicFiberImpl(util::ProactorBase* pb, + 
std::function shard_handler, bool heartbeat) { uint32_t clock_cycle_ms = 1000 / std::max(1, GetFlag(FLAGS_hz)); if (clock_cycle_ms == 0) clock_cycle_ms = 1; - fiber_periodic_ = MakeFiber([this, index = pb->GetPoolIndex(), period_ms = clock_cycle_ms, - handler = std::move(global_handler)] { - ThisFiber::SetName(absl::StrCat("shard_periodic", index)); - RunPeriodic(std::chrono::milliseconds(period_ms), std::move(handler)); - }); + if (heartbeat) { + fiber_heartbeat_periodic_ = + MakeFiber([this, index = pb->GetPoolIndex(), period_ms = clock_cycle_ms, + handler = std::move(shard_handler)]() mutable { + ThisFiber::SetName(absl::StrCat("heartbeat_periodic", index)); + RunHeartbeatPeriodic(std::chrono::milliseconds(period_ms), std::move(handler)); + }); + } else { + fiber_shard_handler_periodic_ = + MakeFiber([this, index = pb->GetPoolIndex(), period_ms = clock_cycle_ms, + handler = std::move(shard_handler)]() mutable { + ThisFiber::SetName(absl::StrCat("shard_handler_periodic", index)); + RunShardHandlerPeriodic(std::chrono::milliseconds(period_ms), std::move(handler)); + }); + } +} + +void EngineShard::StartPeriodicFiber(util::ProactorBase* pb, std::function shard_handler) { + StartPeriodicFiberImpl(pb, std::move(shard_handler), true); +} + +void EngineShard::StartPeriodicFiberWithoutHeartbeat(util::ProactorBase* pb, + std::function shard_handler) { + StartPeriodicFiberImpl(pb, std::move(shard_handler), false); } void EngineShard::InitThreadLocal(ProactorBase* pb) { @@ -689,15 +719,15 @@ void EngineShard::RetireExpiredAndEvict() { } } -void EngineShard::RunPeriodic(std::chrono::milliseconds period_ms, - std::function shard_handler) { +void EngineShard::RunHeartbeatPeriodic(std::chrono::milliseconds period_ms, + std::function shard_handler) { VLOG(1) << "RunPeriodic with period " << period_ms.count() << "ms"; int64_t last_heartbeat_ms = INT64_MAX; int64_t last_handler_ms = 0; while (true) { - if (fiber_periodic_done_.WaitFor(period_ms)) { + if (fiber_heartbeat_periodic_done_.WaitFor(period_ms)) { VLOG(2) << "finished running engine shard periodic task"; return; } @@ -715,6 +745,31 @@ void EngineShard::RunPeriodic(std::chrono::milliseconds period_ms, } } +void EngineShard::RunShardHandlerPeriodic(std::chrono::milliseconds period_ms, + std::function shard_handler) { + VLOG(1) << "RunShardHandlerPeriodic with period " << period_ms.count() << "ms"; + + int64_t last_handler_ms = INT64_MAX; + + while (true) { + if (fiber_shard_handler_periodic_done_.WaitFor(period_ms)) { + VLOG(2) << "finished running engine shard periodic task"; + return; + } + + int64_t now_ms = fb2::ProactorBase::GetMonotonicTimeNs() / 1000000; + if (now_ms - 5 * period_ms.count() > last_handler_ms) { + VLOG(1) << "This shard handler/sleep without heartbeat took " << now_ms - last_handler_ms + << "ms"; + } + last_handler_ms = fb2::ProactorBase::GetMonotonicTimeNs() / 1000000; + // We need to check cause some tests pass an empty shard_handler + if (shard_handler) { + shard_handler(); + } + } +} + void EngineShard::CacheStats() { uint64_t now = fb2::ProactorBase::GetMonotonicTimeNs(); if (cache_stats_time_ + 1000000 > now) // 1ms diff --git a/src/server/engine_shard.h b/src/server/engine_shard.h index 262914b2cb4b..0bece392f988 100644 --- a/src/server/engine_shard.h +++ b/src/server/engine_shard.h @@ -38,6 +38,12 @@ class EngineShard { uint64_t tx_optimistic_total = 0; uint64_t tx_ooo_total = 0; + // Number of ScheduleBatchInShard calls. 
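// Both new periodic loops share one skeleton: wait on a Done event with the period as timeout,
// exit once the event fires at shutdown, and log when a step ran much longer than the period.
// A sketch of that skeleton with plain std primitives standing in for util::fb2::Done and
// MakeFiber (hypothetical StopEvent/RunPeriodically names; illustration only):
#include <chrono>
#include <condition_variable>
#include <functional>
#include <iostream>
#include <mutex>
#include <thread>

class StopEvent {  // analogue of util::fb2::Done
 public:
  // Returns true if Notify() was called within `period`.
  bool WaitFor(std::chrono::milliseconds period) {
    std::unique_lock lk(mu_);
    return cv_.wait_for(lk, period, [this] { return done_; });
  }
  void Notify() {
    { std::lock_guard lk(mu_); done_ = true; }
    cv_.notify_all();
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  bool done_ = false;
};

void RunPeriodically(std::function<void()> f, std::chrono::milliseconds period, StopEvent* stop) {
  using Clock = std::chrono::steady_clock;
  auto last = Clock::time_point::max();            // "no previous run" sentinel, like INT64_MAX above
  while (!stop->WaitFor(period)) {                 // WaitFor returns true only at shutdown
    auto now = Clock::now();
    if (last != Clock::time_point::max() && now - last > 5 * period)
      std::cerr << "periodic step fell behind\n";  // analogue of the VLOG(1) slow-step warning
    f();
    last = Clock::now();
  }
}

int main() {
  StopEvent stop;
  std::thread worker(RunPeriodically, std::function<void()>([] { std::cout << "tick\n"; }),
                     std::chrono::milliseconds(100), &stop);
  std::this_thread::sleep_for(std::chrono::milliseconds(350));
  stop.Notify();
  worker.join();
}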
+ uint64_t tx_batch_schedule_calls_total = 0; + + // Number of transactions scheduled via ScheduleBatchInShard. + uint64_t tx_batch_scheduled_items_total = 0; + Stats& operator+=(const Stats&); }; @@ -201,11 +207,18 @@ class EngineShard { void Shutdown(); // called before destructing EngineShard. void StartPeriodicFiber(util::ProactorBase* pb, std::function shard_handler); + void StartPeriodicFiberWithoutHeartbeat(util::ProactorBase* pb, + std::function shard_handler); + void StartPeriodicFiberImpl(util::ProactorBase* pb, std::function shard_handler, + bool heartbeat); void Heartbeat(); void RetireExpiredAndEvict(); - void RunPeriodic(std::chrono::milliseconds period_ms, std::function shard_handler); + void RunHeartbeatPeriodic(std::chrono::milliseconds period_ms, + std::function shard_handler); + void RunShardHandlerPeriodic(std::chrono::milliseconds period_ms, + std::function shard_handler); void CacheStats(); @@ -247,8 +260,11 @@ class EngineShard { IntentLock shard_lock_; uint32_t defrag_task_ = 0; - util::fb2::Fiber fiber_periodic_; - util::fb2::Done fiber_periodic_done_; + util::fb2::Fiber fiber_heartbeat_periodic_; + util::fb2::Done fiber_heartbeat_periodic_done_; + + util::fb2::Fiber fiber_shard_handler_periodic_; + util::fb2::Done fiber_shard_handler_periodic_done_; DefragTaskState defrag_state_; std::unique_ptr tiered_storage_; diff --git a/src/server/engine_shard_set.cc b/src/server/engine_shard_set.cc index 0d6fa5c13c8b..c35cc753cc1d 100644 --- a/src/server/engine_shard_set.cc +++ b/src/server/engine_shard_set.cc @@ -121,7 +121,8 @@ void EngineShardSet::Init(uint32_t sz, std::function shard_handler) { // Must be last, as it accesses objects initialized above. // We can not move shard_handler because this code is called multiple times. - shard->StartPeriodicFiber(pb, shard_handler); + shard->StartPeriodicFiber(pb, {}); + shard->StartPeriodicFiberWithoutHeartbeat(pb, shard_handler); } }); } diff --git a/src/server/engine_shard_set.h b/src/server/engine_shard_set.h index a49f70b487d1..cc6123d0bb19 100644 --- a/src/server/engine_shard_set.h +++ b/src/server/engine_shard_set.h @@ -114,7 +114,6 @@ class EngineShardSet { private: void InitThreadLocal(util::ProactorBase* pb); - util::ProactorPool* pp_; std::unique_ptr shards_; uint32_t size_ = 0; diff --git a/src/server/generic_family_test.cc b/src/server/generic_family_test.cc index 7e877dc53c41..487a160611ee 100644 --- a/src/server/generic_family_test.cc +++ b/src/server/generic_family_test.cc @@ -24,6 +24,8 @@ using namespace util; using namespace boost; using absl::StrCat; +ABSL_DECLARE_FLAG(bool, list_rdb_encode_v2); + namespace dfly { class GenericFamilyTest : public BaseFamilyTest {}; @@ -564,6 +566,7 @@ TEST_F(GenericFamilyTest, Persist) { TEST_F(GenericFamilyTest, Dump) { ASSERT_THAT(RDB_SER_VERSION, 9); + absl::SetFlag(&FLAGS_list_rdb_encode_v2, false); uint8_t EXPECTED_STRING_DUMP[13] = {0x00, 0xc0, 0x13, 0x09, 0x00, 0x23, 0x13, 0x6f, 0x4d, 0x68, 0xf6, 0x35, 0x6e}; uint8_t EXPECTED_HASH_DUMP[] = {0x0d, 0x12, 0x12, 0x00, 0x00, 0x00, 0x0d, 0x00, 0x00, 0x00, diff --git a/src/server/list_family_test.cc b/src/server/list_family_test.cc index 6abecf6a5d51..b9ef3ab716e8 100644 --- a/src/server/list_family_test.cc +++ b/src/server/list_family_test.cc @@ -379,6 +379,15 @@ TEST_F(ListFamilyTest, LRem) { ASSERT_THAT(Run({"lrem", kKey2, "1", val}), IntArg(1)); } +TEST_F(ListFamilyTest, DumpRestorePlain) { + const string kValue(10'000, '#'); + EXPECT_EQ(CheckedInt({"LPUSH", kKey1, kValue}), 1); + auto buffer = Run({"DUMP", 
kKey1}).GetBuf(); + EXPECT_EQ(Run({"RESTORE", kKey2, "0", ToSV(buffer)}), "OK"); + EXPECT_EQ(CheckedInt({"LLEN", kKey2}), 1); + EXPECT_EQ(Run({"LRANGE", kKey2, "0", "1"}), kValue); +} + TEST_F(ListFamilyTest, LTrim) { Run({"rpush", kKey1, "a", "b", "c", "d"}); ASSERT_EQ(Run({"ltrim", kKey1, "-2", "-1"}), "OK"); diff --git a/src/server/main_service.cc b/src/server/main_service.cc index 7ea727f6c748..9d605f090542 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -979,6 +979,8 @@ void Service::Init(util::AcceptServer* acceptor, std::vector server_family_.GetDflyCmd()->BreakStalledFlowsInShard(); server_family_.UpdateMemoryGlobalStats(); }); + Transaction::Init(shard_num); + SetOomDenyRatioOnAllThreads(absl::GetFlag(FLAGS_oom_deny_ratio)); SetRssOomDenyRatioOnAllThreads(absl::GetFlag(FLAGS_rss_oom_deny_ratio)); @@ -1010,6 +1012,7 @@ void Service::Shutdown() { shard_set->PreShutdown(); namespaces.Clear(); shard_set->Shutdown(); + Transaction::Shutdown(); pp_.Await([](ProactorBase* pb) { ServerState::tlocal()->Destroy(); }); diff --git a/src/server/rdb_load.cc b/src/server/rdb_load.cc index 9d0558ad7477..5f4b1862bfa4 100644 --- a/src/server/rdb_load.cc +++ b/src/server/rdb_load.cc @@ -733,13 +733,14 @@ void RdbLoaderBase::OpaqueObjLoader::CreateList(const LoadTrace* ltrace) { if (ec_) return false; + uint8_t* lp = nullptr; if (container == QUICKLIST_NODE_CONTAINER_PLAIN) { - quicklistAppendPlainNode(ql, (uint8_t*)sv.data(), sv.size()); + lp = (uint8_t*)zmalloc(sv.size()); + ::memcpy(lp, (uint8_t*)sv.data(), sv.size()); + quicklistAppendPlainNode(ql, lp, sv.size()); return true; } - uint8_t* lp = nullptr; - if (rdb_type_ == RDB_TYPE_LIST_QUICKLIST_2) { uint8_t* src = (uint8_t*)sv.data(); if (!lpValidateIntegrity(src, sv.size(), 0, nullptr, nullptr)) { diff --git a/src/server/rdb_save.cc b/src/server/rdb_save.cc index 5885cd4e8303..8718020b5cdf 100644 --- a/src/server/rdb_save.cc +++ b/src/server/rdb_save.cc @@ -49,6 +49,9 @@ ABSL_FLAG(dfly::CompressionMode, compression_mode, dfly::CompressionMode::MULTI_ "set 2 for multi entry zstd compression on df snapshot and single entry on rdb snapshot," "set 3 for multi entry lz4 compression on df snapshot and single entry on rdb snapshot"); ABSL_FLAG(int, compression_level, 2, "The compression level to use on zstd/lz4 compression"); +ABSL_FLAG(bool, list_rdb_encode_v2, true, + "V2 rdb encoding of list uses listpack encoding format, compatible with redis 7. 
V1 rdb " + "enconding of list uses ziplist encoding compatible with redis 6"); namespace dfly { @@ -164,8 +167,12 @@ uint8_t RdbObjectType(const PrimeValue& pv) { case OBJ_STRING: return RDB_TYPE_STRING; case OBJ_LIST: - if (compact_enc == OBJ_ENCODING_QUICKLIST) + if (compact_enc == OBJ_ENCODING_QUICKLIST) { + if (absl::GetFlag(FLAGS_list_rdb_encode_v2)) + return RDB_TYPE_LIST_QUICKLIST_2; return RDB_TYPE_LIST_QUICKLIST; + } + break; case OBJ_SET: if (compact_enc == kEncodingIntSet) @@ -439,7 +446,9 @@ error_code RdbSerializer::SaveListObject(const PrimeValue& pv) { DVLOG(3) << "QL node (encoding/container/sz): " << node->encoding << "/" << node->container << "/" << node->sz; - if (QL_NODE_IS_PLAIN(node)) { + if (absl::GetFlag(FLAGS_list_rdb_encode_v2)) { + // Use listpack encoding + SaveLen(node->container); if (quicklistNodeIsCompressed(node)) { void* data; size_t compress_len = quicklistGetLzf(node, &data); @@ -453,28 +462,33 @@ error_code RdbSerializer::SaveListObject(const PrimeValue& pv) { FlushIfNeeded(flush_state); } } else { - // listpack - uint8_t* lp = node->entry; - uint8_t* decompressed = NULL; - - if (quicklistNodeIsCompressed(node)) { - void* data; - size_t compress_len = quicklistGetLzf(node, &data); - decompressed = (uint8_t*)zmalloc(node->sz); - - if (lzf_decompress(data, compress_len, decompressed, node->sz) == 0) { - /* Someone requested decompress, but we can't decompress. Not good. */ - zfree(decompressed); - return make_error_code(errc::illegal_byte_sequence); + // Use ziplist encoding + if (QL_NODE_IS_PLAIN(node)) { + RETURN_ON_ERR(SavePlainNodeAsZiplist(node)); + } else { + // listpack node + uint8_t* lp = node->entry; + uint8_t* decompressed = NULL; + + if (quicklistNodeIsCompressed(node)) { + void* data; + size_t compress_len = quicklistGetLzf(node, &data); + decompressed = (uint8_t*)zmalloc(node->sz); + + if (lzf_decompress(data, compress_len, decompressed, node->sz) == 0) { + /* Someone requested decompress, but we can't decompress. Not good. */ + zfree(decompressed); + return make_error_code(errc::illegal_byte_sequence); + } + lp = decompressed; } - lp = decompressed; - } - auto cleanup = absl::MakeCleanup([=] { - if (decompressed) - zfree(decompressed); - }); - RETURN_ON_ERR(SaveListPackAsZiplist(lp)); + auto cleanup = absl::MakeCleanup([=] { + if (decompressed) + zfree(decompressed); + }); + RETURN_ON_ERR(SaveListPackAsZiplist(lp)); + } } node = node->next; } @@ -744,6 +758,17 @@ error_code RdbSerializer::SaveListPackAsZiplist(uint8_t* lp) { return ec; } +error_code RdbSerializer::SavePlainNodeAsZiplist(quicklistNode* node) { + uint8_t* zl = ziplistNew(); + zl = ziplistPush(zl, node->entry, node->sz, ZIPLIST_TAIL); + + size_t ziplen = ziplistBlobLen(zl); + error_code ec = SaveString(string_view{reinterpret_cast(zl), ziplen}); + zfree(zl); + + return ec; +} + error_code RdbSerializer::SaveStreamPEL(rax* pel, bool nacks) { /* Number of entries in the PEL. 
*/ diff --git a/src/server/rdb_save.h b/src/server/rdb_save.h index 4a5456b9bdcd..90ca435c720f 100644 --- a/src/server/rdb_save.h +++ b/src/server/rdb_save.h @@ -7,6 +7,7 @@ extern "C" { #include "redis/lzfP.h" +#include "redis/quicklist.h" } #include @@ -21,6 +22,7 @@ extern "C" { typedef struct rax rax; typedef struct streamCG streamCG; +typedef struct quicklistNode quicklistNode; namespace dfly { @@ -247,6 +249,7 @@ class RdbSerializer : public SerializerBase { std::error_code SaveListPackAsZiplist(uint8_t* lp); std::error_code SaveStreamPEL(rax* pel, bool nacks); std::error_code SaveStreamConsumers(streamCG* cg); + std::error_code SavePlainNodeAsZiplist(quicklistNode* node); // Might preempt void FlushIfNeeded(FlushState flush_state); diff --git a/src/server/replica.cc b/src/server/replica.cc index f618164dda68..df22b1e67321 100644 --- a/src/server/replica.cc +++ b/src/server/replica.cc @@ -635,18 +635,24 @@ error_code Replica::ConsumeRedisStream() { } if (!LastResponseArgs().empty()) { - VLOG(2) << "Got command " << absl::CHexEscape(ToSV(LastResponseArgs()[0].GetBuf())) - << "\n consumed: " << response->total_read; - - if (LastResponseArgs()[0].GetBuf()[0] == '\r') { - for (const auto& arg : LastResponseArgs()) { - LOG(INFO) << absl::CHexEscape(ToSV(arg.GetBuf())); + string_view cmd = absl::CHexEscape(ToSV(LastResponseArgs()[0].GetBuf())); + + // Valkey and Redis may send MULTI and EXEC as part of their replication commands. + // Dragonfly disallows some commands, such as SELECT, inside of MULTI/EXEC, so here we simply + // ignore MULTI/EXEC and execute their inner commands individually. + if (!absl::EqualsIgnoreCase(cmd, "MULTI") && !absl::EqualsIgnoreCase(cmd, "EXEC")) { + VLOG(2) << "Got command " << cmd << "\n consumed: " << response->total_read; + + if (LastResponseArgs()[0].GetBuf()[0] == '\r') { + for (const auto& arg : LastResponseArgs()) { + LOG(INFO) << absl::CHexEscape(ToSV(arg.GetBuf())); + } } - } - facade::RespExpr::VecToArgList(LastResponseArgs(), &args_vector); - CmdArgList arg_list{args_vector.data(), args_vector.size()}; - service_.DispatchCommand(arg_list, &conn_context); + facade::RespExpr::VecToArgList(LastResponseArgs(), &args_vector); + CmdArgList arg_list{args_vector.data(), args_vector.size()}; + service_.DispatchCommand(arg_list, &conn_context); + } } io_buf.ConsumeInput(response->left_in_buffer); diff --git a/src/server/server_family.cc b/src/server/server_family.cc index b5a152b16205..4c9e5a1b2e4c 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -62,7 +62,6 @@ extern "C" { #include "strings/human_readable.h" #include "util/accept_server.h" #include "util/aws/aws.h" -#include "util/fibers/fiber_file.h" using namespace std; @@ -861,6 +860,7 @@ void ServerFamily::Init(util::AcceptServer* acceptor, std::vectorpool()->GetNextProactor(); if (pb_task_->GetKind() == ProactorBase::EPOLL) { @@ -1654,6 +1654,7 @@ GenericError ServerFamily::WaitUntilSaveFinished(Transaction* trans, bool ignore save_controller_->WaitAllSnapshots(); detail::SaveInfo save_info; + VLOG(1) << "Before WaitUntilSaveFinished::Finalize"; { util::fb2::LockGuard lk(save_mu_); save_info = save_controller_->Finalize(); @@ -2440,7 +2441,8 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) { append("tx_normal_total", m.coordinator_stats.tx_normal_cnt); append("tx_inline_runs_total", m.coordinator_stats.tx_inline_runs); append("tx_schedule_cancel_total", m.coordinator_stats.tx_schedule_cancel_cnt); - + append("tx_batch_scheduled_items_total", 
m.shard_stats.tx_batch_scheduled_items_total); + append("tx_batch_schedule_calls_total", m.shard_stats.tx_batch_schedule_calls_total); append("tx_with_freq", absl::StrJoin(m.coordinator_stats.tx_width_freq_arr, ",")); append("tx_queue_len", m.tx_queue_len); diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 2fe35e740aa8..a358e4e66f2e 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -6,6 +6,8 @@ #include +#include + #include "base/flags.h" #include "base/logging.h" #include "facade/op_status.h" @@ -86,12 +88,32 @@ uint16_t trans_id(const Transaction* ptr) { struct ScheduleContext { Transaction* trans; bool optimistic_execution = false; + + std::atomic next{nullptr}; + std::atomic_uint32_t fail_cnt{0}; ScheduleContext(Transaction* t, bool optimistic) : trans(t), optimistic_execution(optimistic) { } }; +constexpr size_t kAvoidFalseSharingSize = 64; +struct ScheduleQ { + alignas(kAvoidFalseSharingSize) base::MPSCIntrusiveQueue queue; + alignas(kAvoidFalseSharingSize) atomic_bool armed{false}; +}; + +void MPSC_intrusive_store_next(ScheduleContext* dest, ScheduleContext* next_node) { + dest->next.store(next_node, std::memory_order_relaxed); +} + +ScheduleContext* MPSC_intrusive_load_next(const ScheduleContext& src) { + return src.next.load(std::memory_order_acquire); +} + +// of shard_num arity. +ScheduleQ* schedule_queues = nullptr; + } // namespace bool Transaction::BatonBarrier::IsClaimed() const { @@ -139,6 +161,17 @@ Transaction::Guard::~Guard() { tx->Refurbish(); } +void Transaction::Init(unsigned num_shards) { + DCHECK(schedule_queues == nullptr); + schedule_queues = new ScheduleQ[num_shards]; +} + +void Transaction::Shutdown() { + DCHECK(schedule_queues); + delete[] schedule_queues; + schedule_queues = nullptr; +} + Transaction::Transaction(const CommandId* cid) : cid_{cid} { InitTxTime(); string_view cmd_name(cid_->name()); @@ -685,11 +718,11 @@ void Transaction::ScheduleInternal() { // Try running immediately (during scheduling) if we're concluding and either: // - have a single shard, and thus never have to cancel scheduling due to reordering // - run as an idempotent command, meaning we can safely repeat the operation if scheduling fails - bool can_run_immediately = !IsGlobal() && (coordinator_state_ & COORD_CONCLUDING) && - (unique_shard_cnt_ == 1 || (cid_->opt_mask() & CO::IDEMPOTENT)); + bool optimistic_exec = !IsGlobal() && (coordinator_state_ & COORD_CONCLUDING) && + (unique_shard_cnt_ == 1 || (cid_->opt_mask() & CO::IDEMPOTENT)); DVLOG(1) << "ScheduleInternal " << cid_->name() << " on " << unique_shard_cnt_ << " shards " - << " immediate run: " << can_run_immediately; + << " optimistic_execution: " << optimistic_exec; auto is_active = [this](uint32_t i) { return IsActive(i); }; @@ -711,29 +744,40 @@ void Transaction::ScheduleInternal() { // in the lower-level code. It's not really needed otherwise because we run inline. // single shard schedule operation can't fail - CHECK(ScheduleInShard(EngineShard::tlocal(), can_run_immediately)); + CHECK(ScheduleInShard(EngineShard::tlocal(), optimistic_exec)); run_barrier_.Dec(); break; } - ScheduleContext schedule_ctx{this, can_run_immediately}; + ScheduleContext schedule_ctx{this, optimistic_exec}; - auto cb = [&schedule_ctx]() { - if (!schedule_ctx.trans->ScheduleInShard(EngineShard::tlocal(), - schedule_ctx.optimistic_execution)) { - schedule_ctx.fail_cnt.fetch_add(1, memory_order_relaxed); + if (unique_shard_cnt_ == 1) { + // Single shard optimization. 
Note: we could apply the same optimization + // to multi-shard transactions as well by creating a vector of ScheduleContext. + schedule_queues[unique_shard_id_].queue.Push(&schedule_ctx); + bool current_val = false; + if (schedule_queues[unique_shard_id_].armed.compare_exchange_strong(current_val, true, + memory_order_acq_rel)) { + shard_set->Add(unique_shard_id_, &Transaction::ScheduleBatchInShard); } - schedule_ctx.trans->FinishHop(); - }; + } else { + auto cb = [&schedule_ctx]() { + if (!schedule_ctx.trans->ScheduleInShard(EngineShard::tlocal(), + schedule_ctx.optimistic_execution)) { + schedule_ctx.fail_cnt.fetch_add(1, memory_order_relaxed); + } + schedule_ctx.trans->FinishHop(); + }; - IterateActiveShards([cb](const auto& sd, ShardId i) { shard_set->Add(i, cb); }); + IterateActiveShards([cb](const auto& sd, ShardId i) { shard_set->Add(i, cb); }); - // Add this debugging function to print more information when we experience deadlock - // during tests. - ThisFiber::PrintLocalsCallback locals([&] { - return absl::StrCat("unique_shard_cnt_: ", unique_shard_cnt_, - " run_barrier_cnt: ", run_barrier_.DEBUG_Count(), "\n"); - }); + // Add this debugging function to print more information when we experience deadlock + // during tests. + ThisFiber::PrintLocalsCallback locals([&] { + return absl::StrCat("unique_shard_cnt_: ", unique_shard_cnt_, + " run_barrier_cnt: ", run_barrier_.DEBUG_Count(), "\n"); + }); + } run_barrier_.Wait(); if (schedule_ctx.fail_cnt.load(memory_order_relaxed) == 0) { @@ -1115,6 +1159,45 @@ bool Transaction::ScheduleInShard(EngineShard* shard, bool execute_optimistic) { return true; } +void Transaction::ScheduleBatchInShard() { + EngineShard* shard = EngineShard::tlocal(); + auto& stats = shard->stats(); + stats.tx_batch_schedule_calls_total++; + + ShardId sid = shard->shard_id(); + auto& sq = schedule_queues[sid]; + + for (unsigned j = 0;; ++j) { + // We pull the items from the queue in a loop until we reach the stop condition. + // TODO: we may have fairness problem here, where transactions being added up all the time + // and we never break from the loop. It is possible to break early but it's not trivial + // because we must ensure that there is another ScheduleBatchInShard callback in the queue. + // Can be checked with testing sq.armed is true when j == 1. + while (true) { + ScheduleContext* item = sq.queue.Pop(); + if (!item) + break; + + if (!item->trans->ScheduleInShard(shard, item->optimistic_execution)) { + item->fail_cnt.fetch_add(1, memory_order_relaxed); + } + item->trans->FinishHop(); + stats.tx_batch_scheduled_items_total++; + }; + + // j==1 means we already signalled that we're done with the current batch. + if (j == 1) + break; + + // We signal that we're done with the current batch but then we check if there are more + // transactions to fetch in the next iteration. + // We do this to avoid the situation where we have a data race, where + // a transaction is added to the queue, we've checked that sq.armed is true and skipped + // adding the callback that fetches the transaction. 
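    // The loop above is a drain-disarm-recheck hand-off: producers push into the MPSC queue and
    // only the one that wins the `armed` CAS posts a drain callback; the consumer drains, clears
    // `armed`, then drains once more so an item pushed between the last Pop() and the store is
    // not stranded. A sketch of the same protocol with a mutex-guarded queue standing in for the
    // lock-free MPSC queue (hypothetical ShardQueue/Submit/DrainBatch names; illustration only):
    #include <atomic>
    #include <deque>
    #include <functional>
    #include <mutex>

    struct ShardQueue {
      std::mutex mu;
      std::deque<std::function<void()>> items;
      std::atomic_bool armed{false};
    };

    void DrainBatch(ShardQueue& q);  // consumer, defined below

    // Producer side: enqueue, and only the producer that flips `armed` posts a drain callback.
    void Submit(ShardQueue& q, std::function<void()> work,
                const std::function<void(std::function<void()>)>& post_to_shard) {
      {
        std::lock_guard lk(q.mu);
        q.items.push_back(std::move(work));
      }
      bool expected = false;
      if (q.armed.compare_exchange_strong(expected, true, std::memory_order_acq_rel))
        post_to_shard([&q] { DrainBatch(q); });
    }

    // Consumer side (runs on the shard thread): drain, disarm, then drain once more.
    void DrainBatch(ShardQueue& q) {
      for (unsigned j = 0;; ++j) {
        while (true) {
          std::function<void()> item;
          {
            std::lock_guard lk(q.mu);
            if (q.items.empty()) break;
            item = std::move(q.items.front());
            q.items.pop_front();
          }
          item();
        }
        if (j == 1) break;                                // second pass done, stay disarmed
        q.armed.store(false, std::memory_order_release);  // disarm, then re-check for racing pushes
      }
    }

    int main() {
      ShardQueue q;
      auto run_inline = [](std::function<void()> cb) { cb(); };  // stand-in for shard_set->Add()
      Submit(q, [] {}, run_inline);
    }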
+ sq.armed.store(false, memory_order_release); + } +} + bool Transaction::CancelShardCb(EngineShard* shard) { ShardId idx = SidToId(shard->shard_id()); auto& sd = shard_data_[idx]; diff --git a/src/server/transaction.h b/src/server/transaction.h index b15a022a6ad8..5e209eb96d98 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -178,6 +178,9 @@ class Transaction { Transaction* tx; }; + static void Init(unsigned num_shards); + static void Shutdown(); + explicit Transaction(const CommandId* cid); // Initialize transaction for squashing placed on a specific shard with a given parent tx @@ -515,6 +518,9 @@ class Transaction { // subject to uncontended keys. bool ScheduleInShard(EngineShard* shard, bool execute_optimistic); + // Optimized extension of ScheduleInShard. Pulls several transactions queued for scheduling. + static void ScheduleBatchInShard(); + // Set ARMED flags, start run barrier and submit poll tasks. Doesn't wait for the run barrier void DispatchHop(); diff --git a/tests/dragonfly/instance.py b/tests/dragonfly/instance.py index 019d27558db2..d7dd7405ee26 100644 --- a/tests/dragonfly/instance.py +++ b/tests/dragonfly/instance.py @@ -3,6 +3,7 @@ import threading import time import subprocess +import random import aiohttp import logging from dataclasses import dataclass @@ -455,8 +456,9 @@ def __init__(self, port): self.proc = None def start(self, **kwargs): + servers = ["redis-server-6.2.11", "redis-server-7.2.2", "valkey-server-8.0.1"] command = [ - "redis-server-6.2.11", + random.choice(servers), f"--port {self.port}", "--save ''", "--appendonly no", diff --git a/tests/dragonfly/pymemcached_test.py b/tests/dragonfly/pymemcached_test.py index 56245ee57cff..13f999c67a70 100644 --- a/tests/dragonfly/pymemcached_test.py +++ b/tests/dragonfly/pymemcached_test.py @@ -48,7 +48,6 @@ def test_basic(memcached_client: MCClient): # Noreply (and pipeline) tests -@pytest.mark.skip("Flaky") @dfly_args(DEFAULT_ARGS) def test_noreply_pipeline(df_server: DflyInstance, memcached_client: MCClient): """ @@ -68,7 +67,7 @@ def test_noreply_pipeline(df_server: DflyInstance, memcached_client: MCClient): assert memcached_client.get_many(keys) == {k: v.encode() for k, v in zip(keys, values)} info = Redis(port=df_server.port).info() - assert info["total_pipelined_commands"] > len(keys) / 6 # sometimes CI is slow + assert info["total_pipelined_commands"] > 0 # sometimes CI is slow @dfly_args(DEFAULT_ARGS) diff --git a/tests/dragonfly/replication_test.py b/tests/dragonfly/replication_test.py index 30d3672c0ccf..7b270fe9c89b 100644 --- a/tests/dragonfly/replication_test.py +++ b/tests/dragonfly/replication_test.py @@ -17,6 +17,7 @@ from .seeder import Seeder as SeederV2 from . 
import dfly_args from .proxy import Proxy +from .seeder import StaticSeeder ADMIN_PORT = 1211 @@ -1107,8 +1108,8 @@ async def test_flushall_in_full_sync(df_factory): c_replica = replica.client() # Fill master with test data - seeder = SeederV2(key_target=30_000) - await seeder.run(c_master, target_deviation=0.1) + seeder = StaticSeeder(key_target=100_000) + await seeder.run(c_master) # Start replication and wait for full sync await c_replica.execute_command(f"REPLICAOF localhost {master.port}") @@ -2316,7 +2317,7 @@ async def test_replication_timeout_on_full_sync(df_factory: DflyInstanceFactory, await c_replica.execute_command( "debug replica pause" - ) # puase replica to trigger reconnect on master + ) # pause replica to trigger reconnect on master await asyncio.sleep(1) @@ -2625,3 +2626,50 @@ async def test_replica_of_replica(df_factory): await c_replica2.execute_command(f"REPLICAOF localhost {replica.port}") assert await c_replica2.execute_command(f"REPLICAOF localhost {master.port}") == "OK" + + +@pytest.mark.asyncio +async def test_replication_timeout_on_full_sync_heartbeat_expiry( + df_factory: DflyInstanceFactory, df_seeder_factory +): + # setting replication_timeout to a very small value to force the replica to timeout + master = df_factory.create(replication_timeout=100, vmodule="replica=2,dflycmd=2") + replica = df_factory.create() + + df_factory.start_all([master, replica]) + + c_master = master.client() + c_replica = replica.client() + + await c_master.execute_command("debug", "populate", "200000", "foo", "5000") + seeder = df_seeder_factory.create(port=master.port) + seeder_task = asyncio.create_task(seeder.run()) + + await asyncio.sleep(0.5) # wait for seeder running + + await c_replica.execute_command(f"REPLICAOF localhost {master.port}") + + # wait for full sync + async with async_timeout.timeout(3): + await wait_for_replicas_state(c_replica, state="full_sync", timeout=0.05) + + for i in range(1, 1000000): + await c_master.execute_command(f"SET key{i} val{i} EX 2") + + await c_replica.execute_command("debug replica pause") + + # Dragonfly will get stack here. The journal writes to a channel which will block on write. + # Hearbeat will be called and will get stack while it tries to evict an expired item (because + # it will try to write that item to the journal which in turn will block while it writes to + # the channel). BreakStalledFlows() will never be called and the reconnect count will stay 0. 
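    # The comment above describes a bounded-buffer stall: once the journal channel is full, any
    # fiber that must push to it (including Heartbeat while evicting expired keys) blocks, so the
    # watchdog that would break the stalled flows never runs. A tiny stand-alone analogy with a
    # bounded queue (illustration only, not Dragonfly code):
    import queue
    import threading

    channel = queue.Queue(maxsize=2)  # stand-in for the bounded journal channel

    def heartbeat():
        # Heartbeat wants to evict an expired key, which requires writing a journal record first.
        try:
            channel.put("expiry record", timeout=1)  # a plain blocking put() would hang forever here
        except queue.Full:
            print("heartbeat is stuck: the channel is full and nothing is draining it")

    channel.put("replication record 1")
    channel.put("replication record 2")  # channel is now full; the consumer (replica socket) is stalled

    t = threading.Thread(target=heartbeat)
    t.start()
    t.join()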
+ + await asyncio.sleep(2) + + await c_replica.execute_command("debug replica resume") # resume replication + + await asyncio.sleep(1) # replica will start resync + seeder.stop() + await seeder_task + + await check_all_replicas_finished([c_replica], c_master) + await assert_replica_reconnections(replica, 0) diff --git a/tests/dragonfly/snapshot_test.py b/tests/dragonfly/snapshot_test.py index e729299abf67..e7062f48dbd9 100644 --- a/tests/dragonfly/snapshot_test.py +++ b/tests/dragonfly/snapshot_test.py @@ -150,7 +150,14 @@ async def test_dbfilenames( @pytest.mark.asyncio -@dfly_args({**BASIC_ARGS, "proactor_threads": 4, "dbfilename": "test-redis-load-rdb"}) +@dfly_args( + { + **BASIC_ARGS, + "proactor_threads": 4, + "dbfilename": "test-redis-load-rdb", + "list_rdb_encode_v2": "false", # Needed for compatibility with Redis 6 + } +) async def test_redis_load_snapshot( async_client: aioredis.Redis, df_server, redis_local_server: RedisServer, tmp_dir: Path ): @@ -161,6 +168,8 @@ async def test_redis_load_snapshot( **LIGHTWEIGHT_SEEDER_ARGS, types=["STRING", "LIST", "SET", "HASH", "ZSET"] ).run(async_client) + await async_client.lpush("list", "A" * 10_000) + await async_client.execute_command("SAVE", "rdb") dbsize = await async_client.dbsize() diff --git a/tools/requirements.txt b/tools/requirements.txt index 25f05b2be1bb..7cc75ab41d86 100644 --- a/tools/requirements.txt +++ b/tools/requirements.txt @@ -4,7 +4,7 @@ pytoml==0.1.21 PyYAML==6.0 railroad==0.5.0 redis==4.4.4 -requests==2.28.1 +requests>=2.32.0 aiocsv==1.2.3 aiofiles==22.1.0 numpy==1.24.1 From f08414f240a114334338ffe6176ea3a5e94cbaab Mon Sep 17 00:00:00 2001 From: kostas Date: Thu, 17 Oct 2024 10:35:40 +0300 Subject: [PATCH 2/4] fixes --- tests/dragonfly/replication_test.py | 55 +++++++++++++++++++++-------- 1 file changed, 40 insertions(+), 15 deletions(-) diff --git a/tests/dragonfly/replication_test.py b/tests/dragonfly/replication_test.py index 7b270fe9c89b..f103faf2eafe 100644 --- a/tests/dragonfly/replication_test.py +++ b/tests/dragonfly/replication_test.py @@ -2295,7 +2295,9 @@ async def test_announce_ip_port(df_factory): @pytest.mark.asyncio async def test_replication_timeout_on_full_sync(df_factory: DflyInstanceFactory, df_seeder_factory): # setting replication_timeout to a very small value to force the replica to timeout - master = df_factory.create(replication_timeout=100, vmodule="replica=2,dflycmd=2") + master = df_factory.create( + proactor_threads=2, replication_timeout=100, vmodule="replica=2,dflycmd=2" + ) replica = df_factory.create() df_factory.start_all([master, replica]) @@ -2319,7 +2321,7 @@ async def test_replication_timeout_on_full_sync(df_factory: DflyInstanceFactory, "debug replica pause" ) # pause replica to trigger reconnect on master - await asyncio.sleep(1) + await asyncio.sleep(10) await c_replica.execute_command("debug replica resume") # resume replication @@ -2632,8 +2634,11 @@ async def test_replica_of_replica(df_factory): async def test_replication_timeout_on_full_sync_heartbeat_expiry( df_factory: DflyInstanceFactory, df_seeder_factory ): - # setting replication_timeout to a very small value to force the replica to timeout - master = df_factory.create(replication_timeout=100, vmodule="replica=2,dflycmd=2") + # Timeout set to 3 seconds because we must first saturate the socket such that subsequent + # writes block. Otherwise, we will break the flows before Heartbeat actually deadlocks. 
+ master = df_factory.create( + proactor_threads=2, replication_timeout=3000, vmodule="replica=2,dflycmd=2" + ) replica = df_factory.create() df_factory.start_all([master, replica]) @@ -2641,11 +2646,31 @@ async def test_replication_timeout_on_full_sync_heartbeat_expiry( c_master = master.client() c_replica = replica.client() - await c_master.execute_command("debug", "populate", "200000", "foo", "5000") - seeder = df_seeder_factory.create(port=master.port) - seeder_task = asyncio.create_task(seeder.run()) + await c_master.execute_command("debug", "populate", "100000", "foo", "5000") - await asyncio.sleep(0.5) # wait for seeder running + class ExpirySeeder: + def __init__(self): + self.stop_flag = False + self.i = 0 + + async def run(self, client): + while not self.stop_flag: + await client.execute_command(f"SET tmp{self.i} bar{self.i} EX 4") + self.i = self.i + 1 + + async def wait_until(self, count): + while not self.i > count: + await asyncio.sleep(0.5) + + def stop(self): + self.stop_flag = True + + c_master = master.client() + c_replica = replica.client() + + seeder = ExpirySeeder() + seeder_task = asyncio.create_task(seeder.run(c_master)) + await seeder.wait_until(50000) await c_replica.execute_command(f"REPLICAOF localhost {master.port}") @@ -2653,17 +2678,17 @@ async def test_replication_timeout_on_full_sync_heartbeat_expiry( async with async_timeout.timeout(3): await wait_for_replicas_state(c_replica, state="full_sync", timeout=0.05) - for i in range(1, 1000000): - await c_master.execute_command(f"SET key{i} val{i} EX 2") - await c_replica.execute_command("debug replica pause") - # Dragonfly will get stack here. The journal writes to a channel which will block on write. - # Hearbeat will be called and will get stack while it tries to evict an expired item (because - # it will try to write that item to the journal which in turn will block while it writes to + # Dragonfly will get stuck here. The journal writes to a channel which will block on write. + # Hearbeat() will be called and will get stuck while it tries to evict an expired item (because + # it will try to write that item to the journal which in turn will block while it pushes to # the channel). BreakStalledFlows() will never be called and the reconnect count will stay 0. + # Furthermore, that's why we pick 3 seconds for the replica to timeout. If it was less, + # we would not reach this state because the flow would break before the channel gets filled + # (even though the write to sink from the channel is blocked). 
-    await asyncio.sleep(2)
+    await asyncio.sleep(6)
 
     await c_replica.execute_command("debug replica resume")  # resume replication
 

From b21f01d3271146a9a2e5360b1fedcf9a81ec1c18 Mon Sep 17 00:00:00 2001
From: kostas
Date: Wed, 23 Oct 2024 16:07:01 +0300
Subject: [PATCH 3/4] comments

---
 src/server/engine_shard.cc          | 116 ++++++++++------------------
 src/server/engine_shard.h           |  12 +--
 src/server/engine_shard_set.cc      |   4 +-
 tests/dragonfly/replication_test.py |  39 ++--------
 tests/dragonfly/utility.py          |  24 ++++++
 5 files changed, 78 insertions(+), 117 deletions(-)

diff --git a/src/server/engine_shard.cc b/src/server/engine_shard.cc
index 1366c8239e68..a1cf56218b45 100644
--- a/src/server/engine_shard.cc
+++ b/src/server/engine_shard.cc
@@ -408,36 +408,55 @@ void EngineShard::StopPeriodicFiber() {
   }
 }
 
-void EngineShard::StartPeriodicFiberImpl(util::ProactorBase* pb,
-                                         std::function<void()> shard_handler, bool heartbeat) {
+static void RunFPeriodically(std::function<void()> f, std::chrono::milliseconds period_ms,
+                             std::string_view error_msg, util::fb2::Done* waiter) {
+  int64_t last_heartbeat_ms = INT64_MAX;
+
+  while (true) {
+    if (waiter->WaitFor(period_ms)) {
+      VLOG(2) << "finished running engine shard periodic task";
+      return;
+    }
+
+    int64_t now_ms = fb2::ProactorBase::GetMonotonicTimeNs() / 1000000;
+    if (now_ms - 5 * period_ms.count() > last_heartbeat_ms) {
+      VLOG(1) << "This " << error_msg << " step took " << now_ms - last_heartbeat_ms << "ms";
+    }
+    f();
+    last_heartbeat_ms = fb2::ProactorBase::GetMonotonicTimeNs() / 1000000;
+  }
+}
+
+void EngineShard::StartPeriodicHeartbeatFiber(util::ProactorBase* pb) {
   uint32_t clock_cycle_ms = 1000 / std::max(1, GetFlag(FLAGS_hz));
   if (clock_cycle_ms == 0)
     clock_cycle_ms = 1;
 
-  if (heartbeat) {
-    fiber_heartbeat_periodic_ =
-        MakeFiber([this, index = pb->GetPoolIndex(), period_ms = clock_cycle_ms,
-                   handler = std::move(shard_handler)]() mutable {
-          ThisFiber::SetName(absl::StrCat("heartbeat_periodic", index));
-          RunHeartbeatPeriodic(std::chrono::milliseconds(period_ms), std::move(handler));
-        });
-  } else {
-    fiber_shard_handler_periodic_ =
-        MakeFiber([this, index = pb->GetPoolIndex(), period_ms = clock_cycle_ms,
-                   handler = std::move(shard_handler)]() mutable {
-          ThisFiber::SetName(absl::StrCat("shard_handler_periodic", index));
-          RunShardHandlerPeriodic(std::chrono::milliseconds(period_ms), std::move(handler));
-        });
-  }
-}
+  auto heartbeat = [this]() { Heartbeat(); };
 
-void EngineShard::StartPeriodicFiber(util::ProactorBase* pb, std::function<void()> shard_handler) {
-  StartPeriodicFiberImpl(pb, std::move(shard_handler), true);
+  std::chrono::milliseconds period_ms(clock_cycle_ms);
+
+  fiber_heartbeat_periodic_ =
+      MakeFiber([this, index = pb->GetPoolIndex(), period_ms, heartbeat]() mutable {
+        ThisFiber::SetName(absl::StrCat("heartbeat_periodic", index));
+        RunFPeriodically(heartbeat, period_ms, "heartbeat", &fiber_heartbeat_periodic_done_);
+      });
 }
 
-void EngineShard::StartPeriodicFiberWithoutHeartbeat(util::ProactorBase* pb,
-                                                     std::function<void()> shard_handler) {
+void EngineShard::StartPeriodicShardHandlerFiber(util::ProactorBase* pb,
+                                                 std::function<void()> shard_handler) {
+  uint32_t clock_cycle_ms = 1000 / std::max(1, GetFlag(FLAGS_hz));
+  if (clock_cycle_ms == 0)
+    clock_cycle_ms = 1;
+
+  // Minimum 100ms
+  std::chrono::milliseconds period_ms(std::max((uint32_t)100, clock_cycle_ms));
+  fiber_shard_handler_periodic_ = MakeFiber(
+      [this, index = pb->GetPoolIndex(), period_ms, handler = std::move(shard_handler)]() mutable {
+        ThisFiber::SetName(absl::StrCat("shard_handler_periodic", index));
+        RunFPeriodically(std::move(handler), period_ms, "shard handler",
+                         &fiber_shard_handler_periodic_done_);
+      });
 }
 
 void EngineShard::InitThreadLocal(ProactorBase* pb) {
@@ -719,57 +738,6 @@ void EngineShard::RetireExpiredAndEvict() {
   }
 }
 
-void EngineShard::RunHeartbeatPeriodic(std::chrono::milliseconds period_ms,
-                                       std::function<void()> shard_handler) {
-  VLOG(1) << "RunPeriodic with period " << period_ms.count() << "ms";
-
-  int64_t last_heartbeat_ms = INT64_MAX;
-  int64_t last_handler_ms = 0;
-
-  while (true) {
-    if (fiber_heartbeat_periodic_done_.WaitFor(period_ms)) {
-      VLOG(2) << "finished running engine shard periodic task";
-      return;
-    }
-
-    int64_t now_ms = fb2::ProactorBase::GetMonotonicTimeNs() / 1000000;
-    if (now_ms - 5 * period_ms.count() > last_heartbeat_ms) {
-      VLOG(1) << "This heartbeat-sleep took " << now_ms - last_heartbeat_ms << "ms";
-    }
-    Heartbeat();
-    last_heartbeat_ms = fb2::ProactorBase::GetMonotonicTimeNs() / 1000000;
-    if (shard_handler && last_handler_ms + 100 < last_heartbeat_ms) {
-      last_handler_ms = last_heartbeat_ms;
-      shard_handler();
-    }
-  }
-}
-
-void EngineShard::RunShardHandlerPeriodic(std::chrono::milliseconds period_ms,
-                                          std::function<void()> shard_handler) {
-  VLOG(1) << "RunShardHandlerPeriodic with period " << period_ms.count() << "ms";
-
-  int64_t last_handler_ms = INT64_MAX;
-
-  while (true) {
-    if (fiber_shard_handler_periodic_done_.WaitFor(period_ms)) {
-      VLOG(2) << "finished running engine shard periodic task";
-      return;
-    }
-
-    int64_t now_ms = fb2::ProactorBase::GetMonotonicTimeNs() / 1000000;
-    if (now_ms - 5 * period_ms.count() > last_handler_ms) {
-      VLOG(1) << "This shard handler/sleep without heartbeat took " << now_ms - last_handler_ms
-              << "ms";
-    }
-    last_handler_ms = fb2::ProactorBase::GetMonotonicTimeNs() / 1000000;
-    // We need to check cause some tests pass an empty shard_handler
-    if (shard_handler) {
-      shard_handler();
-    }
-  }
-}
-
 void EngineShard::CacheStats() {
   uint64_t now = fb2::ProactorBase::GetMonotonicTimeNs();
   if (cache_stats_time_ + 1000000 > now)  // 1ms
diff --git a/src/server/engine_shard.h b/src/server/engine_shard.h
index 0bece392f988..a38f0f98f406 100644
--- a/src/server/engine_shard.h
+++ b/src/server/engine_shard.h
@@ -206,20 +206,12 @@ class EngineShard {
   // blocks the calling fiber.
   void Shutdown();  // called before destructing EngineShard.
 
-  void StartPeriodicFiber(util::ProactorBase* pb, std::function<void()> shard_handler);
-  void StartPeriodicFiberWithoutHeartbeat(util::ProactorBase* pb,
-                                          std::function<void()> shard_handler);
-  void StartPeriodicFiberImpl(util::ProactorBase* pb, std::function<void()> shard_handler,
-                              bool heartbeat);
+  void StartPeriodicHeartbeatFiber(util::ProactorBase* pb);
+  void StartPeriodicShardHandlerFiber(util::ProactorBase* pb, std::function<void()> shard_handler);
 
   void Heartbeat();
   void RetireExpiredAndEvict();
 
-  void RunHeartbeatPeriodic(std::chrono::milliseconds period_ms,
-                            std::function<void()> shard_handler);
-  void RunShardHandlerPeriodic(std::chrono::milliseconds period_ms,
-                               std::function<void()> shard_handler);
-
   void CacheStats();
 
   // We are running a task that checks whether we need to
diff --git a/src/server/engine_shard_set.cc b/src/server/engine_shard_set.cc
index c35cc753cc1d..a057abef4e48 100644
--- a/src/server/engine_shard_set.cc
+++ b/src/server/engine_shard_set.cc
@@ -121,8 +121,8 @@ void EngineShardSet::Init(uint32_t sz, std::function<void()> shard_handler) {
 
       // Must be last, as it accesses objects initialized above.
      // We can not move shard_handler because this code is called multiple times.
-      shard->StartPeriodicFiber(pb, {});
-      shard->StartPeriodicFiberWithoutHeartbeat(pb, shard_handler);
+      shard->StartPeriodicHeartbeatFiber(pb);
+      shard->StartPeriodicShardHandlerFiber(pb, shard_handler);
    }
  });
 }
diff --git a/tests/dragonfly/replication_test.py b/tests/dragonfly/replication_test.py
index f103faf2eafe..6d14254a9f09 100644
--- a/tests/dragonfly/replication_test.py
+++ b/tests/dragonfly/replication_test.py
@@ -2295,9 +2295,7 @@ async def test_announce_ip_port(df_factory):
 @pytest.mark.asyncio
 async def test_replication_timeout_on_full_sync(df_factory: DflyInstanceFactory, df_seeder_factory):
     # setting replication_timeout to a very small value to force the replica to timeout
-    master = df_factory.create(
-        proactor_threads=2, replication_timeout=100, vmodule="replica=2,dflycmd=2"
-    )
+    master = df_factory.create(replication_timeout=100, vmodule="replica=2,dflycmd=2")
     replica = df_factory.create()
 
     df_factory.start_all([master, replica])
@@ -2321,7 +2319,7 @@ async def test_replication_timeout_on_full_sync(df_factory: DflyInstanceFactory,
         "debug replica pause"
     )  # pause replica to trigger reconnect on master
 
-    await asyncio.sleep(10)
+    await asyncio.sleep(1)
 
     await c_replica.execute_command("debug replica resume")  # resume replication
 
@@ -2648,29 +2646,14 @@ async def test_replication_timeout_on_full_sync_heartbeat_expiry(
 
     await c_master.execute_command("debug", "populate", "100000", "foo", "5000")
 
-    class ExpirySeeder:
-        def __init__(self):
-            self.stop_flag = False
-            self.i = 0
-
-        async def run(self, client):
-            while not self.stop_flag:
-                await client.execute_command(f"SET tmp{self.i} bar{self.i} EX 4")
-                self.i = self.i + 1
-
-        async def wait_until(self, count):
-            while not self.i > count:
-                await asyncio.sleep(0.5)
-
-        def stop(self):
-            self.stop_flag = True
-
     c_master = master.client()
     c_replica = replica.client()
 
     seeder = ExpirySeeder()
     seeder_task = asyncio.create_task(seeder.run(c_master))
-    await seeder.wait_until(50000)
+    await seeder.wait_until_n_inserts(50000)
+    seeder.stop()
+    await seeder_task
 
     await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
 
@@ -2680,21 +2663,15 @@ def stop(self):
 
     await c_replica.execute_command("debug replica pause")
 
-    # Dragonfly will get stuck here. The journal writes to a channel which will block on write.
-    # Hearbeat() will be called and will get stuck while it tries to evict an expired item (because
-    # it will try to write that item to the journal which in turn will block while it pushes to
-    # the channel). BreakStalledFlows() will never be called and the reconnect count will stay 0.
-    # Furthermore, that's why we pick 3 seconds for the replica to timeout. If it was less,
-    # we would not reach this state because the flow would break before the channel gets filled
-    # (even though the write to sink from the channel is blocked).
+    # Dragonfly would get stuck here without the bug fix: if the replica does not read from
+    # the socket, Heartbeat() would block on the journal write for the expired items,
+    # shard_handler would never run, and replication would break. More details in #3936.
await asyncio.sleep(6) await c_replica.execute_command("debug replica resume") # resume replication await asyncio.sleep(1) # replica will start resync - seeder.stop() - await seeder_task await check_all_replicas_finished([c_replica], c_master) await assert_replica_reconnections(replica, 0) diff --git a/tests/dragonfly/utility.py b/tests/dragonfly/utility.py index 59d6a475ce73..1078df98d4fa 100644 --- a/tests/dragonfly/utility.py +++ b/tests/dragonfly/utility.py @@ -725,3 +725,27 @@ async def wrapper(wrapped, instance, args, kwargs): def skip_if_not_in_github(): if os.getenv("GITHUB_ACTIONS") == None: pytest.skip("Redis server not found") + + +class ExpirySeeder: + def __init__(self): + self.stop_flag = False + self.i = 0 + self.batch_size = 200 + + async def run(self, client): + while not self.stop_flag: + # await client.execute_command(f"SET tmp{self.i} bar{self.i} EX 4") + # self.i = self.i + 1 + pipeline = client.pipeline(transaction=True) + for i in range(0, self.batch_size): + pipeline.execute_command(f"SET tmp{self.i} bar{self.i} EX 3") + self.i = self.i + 1 + await pipeline.execute() + + async def wait_until_n_inserts(self, count): + while not self.i > count: + await asyncio.sleep(0.5) + + def stop(self): + self.stop_flag = True From 9fb76aca2b751c63d0a16ad3aafc5b168506ba62 Mon Sep 17 00:00:00 2001 From: kostas Date: Wed, 23 Oct 2024 16:49:32 +0300 Subject: [PATCH 4/4] remove comments --- tests/dragonfly/utility.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/dragonfly/utility.py b/tests/dragonfly/utility.py index 1078df98d4fa..b644e538d25b 100644 --- a/tests/dragonfly/utility.py +++ b/tests/dragonfly/utility.py @@ -735,8 +735,6 @@ def __init__(self): async def run(self, client): while not self.stop_flag: - # await client.execute_command(f"SET tmp{self.i} bar{self.i} EX 4") - # self.i = self.i + 1 pipeline = client.pipeline(transaction=True) for i in range(0, self.batch_size): pipeline.execute_command(f"SET tmp{self.i} bar{self.i} EX 3")
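
For reference, the ExpirySeeder helper added to tests/dragonfly/utility.py above is driven as in the following minimal sketch. The function name and fixture wiring here are illustrative only (a connected redis.asyncio client is assumed, and the 50000 threshold mirrors the replication test above); only the helper itself comes from this patch series.

    import asyncio

    from redis import asyncio as aioredis

    from .utility import ExpirySeeder  # helper introduced in this patch series


    async def drive_expiry_seeder(client: aioredis.Redis) -> None:
        # Run the seeder in the background; it pipelines batches of "SET ... EX 3" commands.
        seeder = ExpirySeeder()
        seeder_task = asyncio.create_task(seeder.run(client))

        # Wait until enough soon-to-expire keys exist, then stop the loop and join the task.
        await seeder.wait_until_n_inserts(50000)
        seeder.stop()
        await seeder_task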