Skip to content

Commit

Permalink
adjust the cluster client with the new API
Browse files Browse the repository at this point in the history
  • Loading branch information
amirreza8002 committed Jan 10, 2025
1 parent 620843b commit c9cbf5e
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 24 deletions.
5 changes: 4 additions & 1 deletion django_valkey/base_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

from valkey import Valkey
from valkey.asyncio import Valkey as AValkey
from valkey.cluster import ValkeyCluster
from valkey.exceptions import ConnectionError, ResponseError, TimeoutError
from valkey.typing import EncodableT

Expand Down Expand Up @@ -61,7 +62,9 @@ def __init__(
or "django_valkey.util.default_reverse_key"
)

self._clients: list[Valkey | AValkey | None] = [None] * len(self._server)
self._clients: list[Valkey | AValkey | ValkeyCluster | None] = [None] * len(
self._server
)
self._options: dict = params.get("OPTIONS", {})
self._replica_read_only = self._options.get("REPLICA_READ_ONLY", True)

Expand Down
17 changes: 12 additions & 5 deletions django_valkey/cluster_cache/cache.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
from valkey.cluster import ValkeyCluster

from django_valkey.base import BaseValkeyCache
from django_valkey.cache import CONNECTION_INTERRUPTED
from django_valkey.base import (
BaseValkeyCache,
SyncCacheCommands,
CONNECTION_INTERRUPTED,
)
from django_valkey.cluster_cache.client import DefaultClusterClient


class ClusterValkeyCache(BaseValkeyCache[DefaultClusterClient, ValkeyCluster]):
DEFAULT_CLIENT_CLASS = "django_valkey.cluster_cache.client.DefaultClusterClient"

class ClusterSyncCacheCommands(SyncCacheCommands):
def set(self, *args, **kwargs):
return self.client.set(*args, **kwargs)

Expand Down Expand Up @@ -73,3 +74,9 @@ def has_key(self, *args, **kwargs):

def touch(self, *args, **kwargs):
return self.client.touch(*args, **kwargs)


class ClusterValkeyCache(
BaseValkeyCache[DefaultClusterClient, ValkeyCluster], ClusterSyncCacheCommands
):
DEFAULT_CLIENT_CLASS = "django_valkey.cluster_cache.client.DefaultClusterClient"
40 changes: 38 additions & 2 deletions django_valkey/cluster_cache/client/default.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,51 @@
from valkey.cluster import ValkeyCluster
from valkey.typing import KeyT, EncodableT

from django_valkey.base_client import BaseClient, _main_exceptions
from django_valkey.base_client import _main_exceptions
from django_valkey.client.default import SyncClientMethod
from django_valkey.exceptions import ConnectionInterrupted


class DefaultClusterClient(BaseClient):
class DefaultClusterClient(SyncClientMethod):
CONNECTION_FACTORY_PATH = (
"django_valkey.cluster_cache.pool.ClusterConnectionFactory"
)

def _get_client(self, write=True, tried=None, client=None) -> ValkeyCluster:
if client:
return client
return self.get_client(write=write, tried=tried)

def get_client(self, write=True, tried=None) -> ValkeyCluster:
index = self.get_next_client_index(write=write, tried=tried)

if self._clients[index] is None:
self._clients[index] = self.connect(index)

return self._clients[index]

def get_client_with_index(
self, write=True, tried=None
) -> tuple[ValkeyCluster, int]:
index = self.get_next_client_index(write=write, tried=tried)
if self._clients[index] is None:
self._clients[index] = self.connect(index)

return self._clients[index], index

def connect(self, index: int = 0) -> ValkeyCluster:
return self.connection_factory.connect(self._server[index])

def disconnect(self, index: int = 0, client: ValkeyCluster | None = None) -> None:
"""
delegates the connection factory to disconnect the client
"""
if client is None:
client = self._clients[index]

if client is not None:
self.connection_factory.disconnect(client)

def readonly(self, target_nodes=None, client=None):
client = self._get_client(write=True, client=client)
return client.readonly(target_nodes)
Expand Down
25 changes: 15 additions & 10 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,21 @@ def cache() -> Iterable[BaseValkeyCache]:
default_cache.clear()


if isawaitable(default_valkey.clear()):
try:
if isawaitable(default_valkey.clear()):

@pytest_asyncio.fixture(loop_scope="session")
async def valkey():
yield default_valkey
await default_valkey.aclear()
@pytest_asyncio.fixture(loop_scope="session")
async def valkey():
yield default_valkey
await default_valkey.aclear()

else:
else:

@pytest.fixture
def valkey() -> Iterable[BaseValkeyCache]:
yield default_valkey
default_valkey.clear()
@pytest.fixture
def valkey() -> Iterable[BaseValkeyCache]:
yield default_valkey
default_valkey.clear()

except AttributeError:
# cluster client doesn't support this feature yet
pass
6 changes: 3 additions & 3 deletions tests/settings/sqlite_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
CACHES = {
"default": {
"BACKEND": "django_valkey.cluster_cache.cache.ClusterValkeyCache",
"LOCATION": ["valkey://127.0.0.1:7005", "valkey://127.0.0.1:7005"],
"LOCATION": ["valkey://127.0.0.1:16379", "valkey://127.0.0.1:16380"],
"OPTIONS": {
"CLIENT_CLASS": "django_valkey.cluster_cache.client.DefaultClusterClient"
},
Expand All @@ -17,14 +17,14 @@
},
"sample": {
"BACKEND": "django_valkey.cluster_cache.cache.ClusterValkeyCache",
"LOCATION": "valkey://127.0.0.1:7005:0,valkey://127.0.0.1:7002:0",
"LOCATION": "valkey://127.0.0.1:16381:0,valkey://127.0.0.1:16379:0",
"OPTIONS": {
"CLIENT_CLASS": "django_valkey.cluster_cache.client.DefaultClusterClient"
},
},
"with_prefix": {
"BACKEND": "django_valkey.cluster_cache.cache.ClusterValkeyCache",
"LOCATION": "valkey://127.0.0.1:7005?db=0",
"LOCATION": "valkey://127.0.0.1:16380?db=0",
"OPTIONS": {
"CLIENT_CLASS": "django_valkey.cluster_cache.client.DefaultClusterClient"
},
Expand Down
9 changes: 6 additions & 3 deletions tests/tests_cluster/test_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -1062,7 +1062,8 @@ def test_srandmember_default_count(self, cache: ClusterValkeyCache):

def test_srandmember(self, cache: ClusterValkeyCache):
cache.sadd("foo", "bar1", "bar2")
assert cache.srandmember("foo", 1) in [["bar1"], ["bar2"]]
assert cache.srandmember("foo", 1) in [{"bar1"}, {"bar2"}]
assert cache.srandmember("foo", 1, return_set=False) in [["bar1"], ["bar2"]]

def test_srem(self, cache: ClusterValkeyCache):
cache.sadd("foo", "bar1", "bar2")
Expand All @@ -1071,15 +1072,17 @@ def test_srem(self, cache: ClusterValkeyCache):

def test_sscan(self, cache: ClusterValkeyCache):
cache.sadd("foo", "bar1", "bar2")
items = cache.sscan("foo")
cursor, items = cache.sscan("foo")
assert items == {"bar1", "bar2"}
assert cursor == 0

def test_sscan_with_match(self, cache: ClusterValkeyCache):
if cache.client._has_compression_enabled():
pytest.skip("Compression is enabled, sscan with match is not supported")
cache.sadd("foo", "bar1", "bar2", "zoo")
items = cache.sscan("foo", match="zoo")
cursor, items = cache.sscan("foo", match="zoo")
assert items == {"zoo"}
assert cursor == 0

def test_sscan_iter(self, cache: ClusterValkeyCache):
cache.sadd("foo", "bar1", "bar2")
Expand Down

0 comments on commit c9cbf5e

Please sign in to comment.