Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add Valkey as a native store #3892

Merged
merged 3 commits into from
Dec 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
"trio": ("https://trio.readthedocs.io/en/stable/", None),
"pydantic": ("https://docs.pydantic.dev/latest/", None),
"typing_extensions": ("https://typing-extensions.readthedocs.io/en/stable/", None),
"valkey": ("https://valkey-py.readthedocs.io/en/latest/", None),
}

napoleon_google_docstring = True
Expand Down Expand Up @@ -102,6 +103,7 @@
(PY_CLASS, "sqlalchemy.dialects.postgresql.named_types.ENUM"),
(PY_CLASS, "sqlalchemy.orm.decl_api.DeclarativeMeta"),
(PY_CLASS, "sqlalchemy.sql.sqltypes.TupleType"),
(PY_CLASS, "valkey.asyncio.Valkey"),
(PY_METH, "_types.TypeDecorator.process_bind_param"),
(PY_METH, "_types.TypeDecorator.process_result_value"),
(PY_METH, "litestar.typing.ParsedType.is_subclass_of"),
Expand Down
1 change: 1 addition & 0 deletions docs/reference/stores/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ stores
memory
redis
registry
valkey
5 changes: 5 additions & 0 deletions docs/reference/stores/valkey.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
valkey
======

.. automodule:: litestar.stores.valkey
:members:
6 changes: 6 additions & 0 deletions docs/usage/stores.rst
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ Built-in stores
A store backend by `redis <https://redis.io/>`_. It offers all the guarantees and features of Redis, making it
suitable for almost all applications. Offers `namespacing`_.

:class:`ValKeyStore <litestar.stores.valkey.ValkeyStore>`
A store backed by `valkey <https://valkey.io>`_, a fork of Redis created as the result of Redis' license changes.
Similarly to the RedisStore, it is suitable for almost all applications and supports `namespacing`_.
At the time of writing, :class:`Valkey <valkey.asyncio.Valkey>` is equivalent to :class:`redis.asyncio.Redis`,
and all notes pertaining to Redis also apply to Valkey.

.. admonition:: Why not memcached?
:class: info

Expand Down
210 changes: 210 additions & 0 deletions litestar/stores/valkey.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
from __future__ import annotations

from datetime import timedelta
from typing import TYPE_CHECKING, cast

from valkey.asyncio import Valkey
from valkey.asyncio.connection import ConnectionPool

from litestar.exceptions import ImproperlyConfiguredException
from litestar.types import Empty, EmptyType
from litestar.utils.empty import value_or_default

from .base import NamespacedStore

if TYPE_CHECKING:
from types import TracebackType


__all__ = ("ValkeyStore",)


class ValkeyStore(NamespacedStore):
"""Valkey based, thread and process safe asynchronous key/value store."""

__slots__ = (
"_delete_all_script",
"_get_and_renew_script",
"_valkey",
"handle_client_shutdown",
)

def __init__(
self, valkey: Valkey, namespace: str | None | EmptyType = Empty, handle_client_shutdown: bool = False
) -> None:
"""Initialize :class:`ValkeyStore`

Args:
valkey: An :class:`valkey.asyncio.Valkey` instance
namespace: A key prefix to simulate a namespace in valkey. If not given,
defaults to ``LITESTAR``. Namespacing can be explicitly disabled by passing
``None``. This will make :meth:`.delete_all` unavailable.
handle_client_shutdown: If ``True``, handle the shutdown of the `valkey` instance automatically during the store's lifespan. Should be set to `True` unless the shutdown is handled externally
"""
self._valkey = valkey
self.namespace: str | None = value_or_default(namespace, "LITESTAR")
self.handle_client_shutdown = handle_client_shutdown

# script to get and renew a key in one atomic step
self._get_and_renew_script = self._valkey.register_script(
b"""
local key = KEYS[1]
local renew = tonumber(ARGV[1])

local data = server.call('GET', key)
local ttl = server.call('TTL', key)

if ttl > 0 then
server.call('EXPIRE', key, renew)
end

return data
"""
)

# script to delete all keys in the namespace
self._delete_all_script = self._valkey.register_script(
b"""
local cursor = 0

repeat
local result = server.call('SCAN', cursor, 'MATCH', ARGV[1])
for _,key in ipairs(result[2]) do
server.call('UNLINK', key)
end
cursor = tonumber(result[1])
until cursor == 0
"""
)

async def _shutdown(self) -> None:
if self.handle_client_shutdown:
await self._valkey.aclose(close_connection_pool=True)

async def __aexit__(
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> None:
await self._shutdown()

@classmethod
def with_client(
cls,
url: str = "valkey://localhost:6379",
*,
db: int | None = None,
port: int | None = None,
username: str | None = None,
password: str | None = None,
namespace: str | None | EmptyType = Empty,
) -> ValkeyStore:
"""Initialize a :class:`ValkeyStore` instance with a new class:`valkey.asyncio.Valkey` instance.

Args:
url: Valkey URL to connect to
db: Valkey database to use
port: Valkey port to use
username: Valkey username to use
password: Valkey password to use
namespace: Virtual key namespace to use
"""
pool: ConnectionPool = ConnectionPool.from_url(
url=url,
db=db,
decode_responses=False,
port=port,
username=username,
password=password,
)
return cls(
valkey=Valkey(connection_pool=pool),
namespace=namespace,
handle_client_shutdown=True,
)

def with_namespace(self, namespace: str) -> ValkeyStore:
"""Return a new :class:`ValkeyStore` with a nested virtual key namespace.
The current instances namespace will serve as a prefix for the namespace, so it
can be considered the parent namespace.
"""
return type(self)(
valkey=self._valkey,
namespace=f"{self.namespace}_{namespace}" if self.namespace else namespace,
handle_client_shutdown=self.handle_client_shutdown,
)

def _make_key(self, key: str) -> str:
prefix = f"{self.namespace}:" if self.namespace else ""
return prefix + key

async def set(self, key: str, value: str | bytes, expires_in: int | timedelta | None = None) -> None:
"""Set a value.

Args:
key: Key to associate the value with
value: Value to store
expires_in: Time in seconds before the key is considered expired

Returns:
``None``
"""
if isinstance(value, str):
value = value.encode("utf-8")
await self._valkey.set(self._make_key(key), value, ex=expires_in)

async def get(self, key: str, renew_for: int | timedelta | None = None) -> bytes | None:
"""Get a value.

Args:
key: Key associated with the value
renew_for: If given and the value had an initial expiry time set, renew the
expiry time for ``renew_for`` seconds. If the value has not been set
with an expiry time this is a no-op. Atomicity of this step is guaranteed
by using a lua script to execute fetch and renewal. If ``renew_for`` is
not given, the script will be bypassed so no overhead will occur

Returns:
The value associated with ``key`` if it exists and is not expired, else
``None``
"""
key = self._make_key(key)
if renew_for:
if isinstance(renew_for, timedelta):
renew_for = renew_for.seconds
data = await self._get_and_renew_script(keys=[key], args=[renew_for])
return cast("bytes | None", data)
return await self._valkey.get(key) # type: ignore[no-any-return]

async def delete(self, key: str) -> None:
"""Delete a value.

If no such key exists, this is a no-op.

Args:
key: Key of the value to delete
"""
await self._valkey.delete(self._make_key(key))

async def delete_all(self) -> None:
"""Delete all stored values in the virtual key namespace.

Raises:
ImproperlyConfiguredException: If no namespace was configured
"""
if not self.namespace:
raise ImproperlyConfiguredException("Cannot perform delete operation: No namespace configured")

await self._delete_all_script(keys=[], args=[f"{self.namespace}*:*"])

async def exists(self, key: str) -> bool:
"""Check if a given ``key`` exists."""
return await self._valkey.exists(self._make_key(key)) == 1 # type: ignore[no-any-return]

async def expires_in(self, key: str) -> int | None:
"""Get the time in seconds ``key`` expires in. If no such ``key`` exists or no
expiry time was set, return ``None``.
"""
ttl = await self._valkey.ttl(self._make_key(key))
return None if ttl == -2 else ttl
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ brotli = ["brotli"]
cli = ["jsbeautifier", "uvicorn[standard]", "uvloop>=0.18.0; sys_platform != 'win32'"]
cryptography = ["cryptography"]
full = [
"litestar[annotated-types,attrs,brotli,cli,cryptography,jinja,jwt,mako,minijinja,opentelemetry,piccolo,picologging,prometheus,pydantic,redis,sqlalchemy,standard,structlog]",
"litestar[annotated-types,attrs,brotli,cli,cryptography,jinja,jwt,mako,minijinja,opentelemetry,piccolo,picologging,prometheus,pydantic,redis,sqlalchemy,standard,structlog,valkey]",
]
jinja = ["jinja2>=3.1.2"]
jwt = [
Expand All @@ -98,6 +98,7 @@ picologging = ["picologging"]
prometheus = ["prometheus-client"]
pydantic = ["pydantic", "email-validator", "pydantic-extra-types"]
redis = ["redis[hiredis]>=4.4.4"]
valkey = ["valkey[libvalkey]>=6.0.2"]
sqlalchemy = ["advanced-alchemy>=0.2.2"]
standard = ["jinja2", "jsbeautifier", "uvicorn[standard]", "uvloop>=0.18.0; sys_platform != 'win32'", "fast-query-parsers>=1.0.2"]
structlog = ["structlog"]
Expand Down
29 changes: 28 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
from redis.asyncio import Redis as AsyncRedis
from redis.client import Redis
from time_machine import travel
from valkey.asyncio import Valkey as AsyncValkey
from valkey.client import Valkey

from litestar.logging import LoggingConfig
from litestar.middleware.session import SessionMiddleware
Expand All @@ -29,6 +31,7 @@
from litestar.stores.file import FileStore
from litestar.stores.memory import MemoryStore
from litestar.stores.redis import RedisStore
from litestar.stores.valkey import ValkeyStore
from litestar.testing import RequestFactory
from tests.helpers import not_none

Expand Down Expand Up @@ -82,6 +85,11 @@ def redis_store(redis_client: AsyncRedis) -> RedisStore:
return RedisStore(redis=redis_client)


@pytest.fixture()
def valkey_store(valkey_client: AsyncValkey) -> ValkeyStore:
return ValkeyStore(valkey=valkey_client)


@pytest.fixture()
def memory_store() -> MemoryStore:
return MemoryStore()
Expand All @@ -105,7 +113,12 @@ def file_store_create_directories_flag_false(tmp_path: Path) -> FileStore:


@pytest.fixture(
params=[pytest.param("redis_store", marks=pytest.mark.xdist_group("redis")), "memory_store", "file_store"]
params=[
pytest.param("redis_store", marks=pytest.mark.xdist_group("redis")),
pytest.param("valkey_store", marks=pytest.mark.xdist_group("valkey")),
"memory_store",
"file_store",
]
)
def store(request: FixtureRequest) -> Store:
return cast("Store", request.getfixturevalue(request.param))
Expand Down Expand Up @@ -327,6 +340,20 @@ async def redis_client(docker_ip: str, redis_service: None) -> AsyncGenerator[As
pass


@pytest.fixture()
async def valkey_client(docker_ip: str, valkey_service: None) -> AsyncGenerator[AsyncValkey, None]:
# this is to get around some weirdness with pytest-asyncio and valkey interaction
# on 3.8 and 3.9

Valkey(host=docker_ip, port=6381).flushall()
client: AsyncValkey = AsyncValkey(host=docker_ip, port=6381)
yield client
try:
await client.aclose()
except RuntimeError:
pass


@pytest.fixture(autouse=True)
def _patch_openapi_config(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr("litestar.app.DEFAULT_OPENAPI_CONFIG", OpenAPIConfig(title="Litestar API", version="1.0.0"))
5 changes: 5 additions & 0 deletions tests/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,8 @@ services:
restart: always
ports:
- "6397:6379" # use a non-standard port here
valkey:
image: valkey/valkey:latest
restart: always
ports:
- "6381:6379" # also a non-standard port
17 changes: 17 additions & 0 deletions tests/docker_service_fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import pytest
from redis.asyncio import Redis as AsyncRedis
from redis.exceptions import ConnectionError as RedisConnectionError
from valkey.asyncio import Valkey as AsyncValkey
from valkey.exceptions import ConnectionError as ValkeyConnectionError

from litestar.utils import ensure_async_callable

Expand Down Expand Up @@ -127,6 +129,21 @@ async def redis_service(docker_services: DockerServiceRegistry) -> None:
await docker_services.start("redis", check=redis_responsive)


async def valkey_responsive(host: str) -> bool:
client: AsyncValkey = AsyncValkey(host=host, port=6381)
try:
return await client.ping()
except (ConnectionError, ValkeyConnectionError):
return False
finally:
await client.aclose()


@pytest.fixture()
async def valkey_service(docker_services: DockerServiceRegistry) -> None:
await docker_services.start("valkey", check=valkey_responsive)


async def postgres_responsive(host: str) -> bool:
try:
conn = await asyncpg.connect(
Expand Down
Loading
Loading