Skip to content

Commit

Permalink
rework AioKiwiCache to correspond with KiwiCache
Browse files Browse the repository at this point in the history
  • Loading branch information
Marek Dernar committed Mar 11, 2020
1 parent 0fb2e83 commit a846189
Show file tree
Hide file tree
Showing 11 changed files with 258 additions and 174 deletions.
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,16 @@

### Changed

## [0.5.0] – 2020-01-22

### Added

- `AioBaseKiwiCache` corresponding with `BaseKiwiCache`

### Changed

- rework `AioKiwiCache` to correspond with `KiwiCache`

## [0.4.5] – 2019-10-08

### Added
Expand Down
267 changes: 149 additions & 118 deletions kw/cache/aio.py
Original file line number Diff line number Diff line change
@@ -1,166 +1,197 @@
import asyncio
from datetime import datetime, timedelta
import logging
from datetime import datetime
from typing import Any, Dict, ItemsView, KeysView, Optional, ValuesView

import aioredis
import attr

from . import json
from . import utils
from .base import BaseKiwiCache, CACHE_RECORD_ATTRIBUTES, CacheRecord, KiwiCache
from .helpers import CallAttempt, CallAttemptException


class AioKiwiCache: # pylint: disable=too-many-instance-attributes
"""Caches data from expensive sources to Redis and to memory."""
@attr.s
class AioBaseKiwiCache(BaseKiwiCache):
"""Helper class for load data from cache using asyncio and aioredis."""

instances = [] # type: List[AioKiwiCache]
reload_ttl = timedelta(minutes=1)
cache_ttl = reload_ttl * 10
refill_lock_ttl = timedelta(seconds=5)
resources_redis = None
resources_redis = attr.ib(None, type=aioredis.Redis, validator=attr.validators.instance_of(aioredis.Redis))

def __init__(self, resources_redis=None, logger=None, statsd=None):
# type: (redis.Connection, logging.Logger, datadog.DogStatsd) -> None
async def load_from_cache(self) -> Optional[CacheRecord]:
try:
value = await self.resources_redis.get(self._cache_key)
except aioredis.RedisError:
self._process_cache_error("kiwicache.load_failed")
return None

self.instances.append(self)
if value is None:
return None

if resources_redis is not None:
self.resources_redis = resources_redis
cache_data = self.json.loads(value)
if set(cache_data.keys()) != CACHE_RECORD_ATTRIBUTES:
self._log_warning("kiwicache.malformed_cache_data")
return None
return CacheRecord(**cache_data)

self.check_initialization()
async def save_to_cache(self, data: dict) -> None:
    """Wrap ``data`` in a ``CacheRecord`` and store it, serialized, under the cache key.

    The key is written with an expiry of ``_cache_ttl`` (whole seconds). A
    ``aioredis.RedisError`` is reported through ``_process_cache_error``;
    otherwise the "success" metric is incremented.

    :param data: the data bundle to store
    """
    serialized = self.json.dumps(attr.asdict(CacheRecord(data=data)))
    ttl_seconds = int(self._cache_ttl.total_seconds())
    try:
        await self.resources_redis.set(self._cache_key, serialized, expire=ttl_seconds)
    except aioredis.RedisError:
        self._process_cache_error("kiwicache.save_failed")
        return
    # Only reached when the write did not raise.
    self._increment_metric("success")

self.name = self.__class__.__name__
self.expires_at = datetime.utcnow()
self._data = {} # type: dict
self.logger = logger if logger else logging.getLogger(__name__)
self.statsd = statsd
self.call_attempt = CallAttempt("{}.load_from_source".format(self.name.lower()))
self.initialized = False
async def _get_refill_lock(self) -> Optional[bool]:
    """Try to take the refill lock.

    Sets the refill-lock key to "locked" with the ``SET_IF_NOT_EXIST`` flag and
    an expiry of ``refill_ttl`` (whole seconds).

    :return: truthiness of the SET reply, or ``None`` when ``aioredis.RedisError``
        was raised (after reporting it through ``_process_cache_error``)
    """
    redis = self.resources_redis
    try:
        reply = await redis.set(
            self._refill_lock_key,
            "locked",
            expire=int(self.refill_ttl.total_seconds()),
            exist=redis.SET_IF_NOT_EXIST,
        )
    except aioredis.RedisError:
        self._process_cache_error("kiwicache.refill_lock_failed")
        return None
    return bool(reply)

async def _wait_for_refill_lock(self) -> Optional[bool]:
    """Poll for the refill lock, backing off between attempts.

    After each unsuccessful attempt a "kiwicache.refill_locked" warning is
    logged, the wait is doubled (capped at ``refill_ttl`` seconds) and the
    coroutine sleeps. The loop ends when the lock attempt returns ``True`` or
    ``None``, or when ``_is_refilled`` reports a record newer than the start
    of the wait.

    :return: ``True`` or ``None`` as returned by ``_get_refill_lock``;
        ``False`` when the cache was refilled in the meantime
    """
    started_at = utils.get_current_timestamp()
    delay = 0.5
    refilled_meanwhile = False
    while not refilled_meanwhile:
        outcome = await self._get_refill_lock()
        if outcome is True or outcome is None:
            return outcome

        self._log_warning("kiwicache.refill_locked")
        # let the lock owner finish
        delay = min(delay * 2, self.refill_ttl.total_seconds())
        await asyncio.sleep(delay)

        refilled_meanwhile = await self._is_refilled(started_at)
    return False

async def _is_refilled(self, timestamp: float) -> bool:
    """Report whether the cache holds a record newer than ``timestamp``.

    :param timestamp: reference time compared against the record's ``timestamp``
    :return: ``True`` only when a record was loaded and its timestamp is
        strictly greater than ``timestamp``; ``False`` otherwise
    """
    cache_record = await self.load_from_cache()
    # ``load_from_cache`` may yield None; without the coercion the ``and``
    # short-circuit would leak that None despite the declared ``bool``.
    return bool(cache_record and cache_record.timestamp > timestamp)

async def _release_refill_lock(self) -> Optional[bool]:
    """Delete the refill-lock key.

    :return: truthiness of the DELETE reply, or ``None`` when
        ``aioredis.RedisError`` was raised (after reporting it through
        ``_process_cache_error``)
    """
    try:
        deleted = await self.resources_redis.delete(self._refill_lock_key)
    except aioredis.RedisError:
        self._process_cache_error("kiwicache.release_lock_failed")
        return None
    return bool(deleted)

async def _prolong_cache_expiration(self) -> None:
    """Reset the expiry of the cache key to ``_cache_ttl`` (whole seconds).

    A ``aioredis.RedisError`` is reported through ``_process_cache_error``
    rather than propagated.
    """
    ttl_seconds = int(self._cache_ttl.total_seconds())
    try:
        await self.resources_redis.expire(self._cache_key, timeout=ttl_seconds)
    except aioredis.RedisError:
        self._process_cache_error("kiwicache.prolong_expiration_failed")

def check_initialization(self):
    """Validate the configuration of this cache instance.

    :raises RuntimeError: if ``resources_redis`` is ``None``, or if
        ``cache_ttl`` is smaller than ``reload_ttl``
    """
    if self.resources_redis is None:
        raise RuntimeError("You must set a redis.Connection object")

    # Equal TTLs are accepted; only a strictly smaller cache_ttl is rejected.
    if self.cache_ttl < self.reload_ttl:
        raise RuntimeError("The cache_ttl has to be greater than reload_ttl.")
@attr.s
class AioKiwiCache(AioBaseKiwiCache, KiwiCache):
"""Caches data from expensive sources to Redis and to memory using asyncio."""

async def acheck_initialization(self):
if await self.resources_redis.ttl(self.redis_key) > int(self.reload_ttl.total_seconds()):
await self.resources_redis.expire(self.redis_key, int(self.reload_ttl.total_seconds()))
instances: Dict[str, "AioKiwiCache"] = {}

@property
def redis_key(self):
return "resource:" + self.name
def __attrs_post_init__(self) -> None:
super().__attrs_post_init__()
self._add_instance()
self._call_attempt = CallAttempt("{}.load_from_source".format(self.name.lower()), self.max_attempts)

async def getitem(self, key):
async def getitem(self, key: Any) -> Any:
data = await self.get_data()
if key not in data:
return self.__missing__(key)
return data[key]

def __missing__(self, key):
def __missing__(self, key: Any) -> Any:
    """Handle a lookup of a key that is absent from the data.

    ``getitem`` returns whatever this method returns, so an override may
    supply a fallback value instead of raising.

    :param key: the key that was not found
    :raises KeyError: always in this implementation, carrying ``key``
    """
    # Include the key so the error says what was looked up.
    raise KeyError(key)

async def get(self, key, default=None):
async def get(self, key: Any, default: Any = None) -> Any:
    """Return the value stored under ``key``, or ``default`` when it is absent.

    :param key: key to look up in the data returned by ``get_data``
    :param default: value returned when ``key`` is not present
    :return: the stored value or ``default``
    """
    return (await self.get_data()).get(key, default)

async def contains(self, key):
async def contains(self, key: Any) -> bool:
return key in await self.get_data()

async def keys(self):
async def keys(self) -> KeysView:
return (await self.get_data()).keys()

async def values(self):
async def values(self) -> ValuesView:
return (await self.get_data()).values()

async def items(self):
async def items(self) -> ItemsView:
return (await self.get_data()).items()

async def get_data(self):
async def get_data(self) -> dict:
await self.maybe_reload()
return self._data

async def load_from_source(self): # type: () -> dict
"""Get the full data bundle from our expensive source."""
raise NotImplementedError()

async def load_from_cache(self): # type: () -> str
"""Get the full data bundle from cache."""
return await self.resources_redis.get(self.redis_key)

async def save_to_cache(self, data): # type: (dict) -> None
"""Save the provided full data bundle to cache."""
try:
await self.resources_redis.set(
self.redis_key, json.dumps(data), expire=int(self.cache_ttl.total_seconds()) if self.cache_ttl else 0
)
except aioredis.RedisError:
self.statsd and self.statsd.increment("kiwicache", tags=["name:" + self.name, "status:redis_error"])
self.logger.exception("kiwicache.redis_exception")

async def reload(self):
"""Load the full data bundle, from cache, or if unavailable, from source."""
try:
cache_data = await self.load_from_cache()
except aioredis.RedisError:
self.logger.exception("kiwicache.redis_exception")
self.statsd and self.statsd.increment("kiwicache", tags=["name:" + self.name, "status:redis_error"])
return

if cache_data:
self._data = json.loads(cache_data)
self.expires_at = datetime.utcnow() + self.reload_ttl
self.statsd and self.statsd.increment("kiwicache", tags=["name:" + self.name, "status:success"])
else:
await self.refill_cache()
await self.reload()

async def maybe_reload(self): # type: () -> None
"""Load the full data bundle if it's too old."""
if not self.initialized:
await self.acheck_initialization()
self.initialized = True

if not self._data or self.expires_at < datetime.utcnow():
async def reload(self) -> None:
successful_reload = await self.reload_from_cache()
while not successful_reload:
try:
await self.reload()
await self.refill_cache()
except CallAttemptException:
self._prolong_data_expiration()
raise
except Exception:
self.logger.exception("kiwicache.reload_exception")

async def get_refill_lock(self): # type: () -> bool
"""Lock loading from the expensive source.
successful_reload = await self.reload_from_cache()
if self.max_attempts < 0 and not successful_reload:
self._prolong_data_expiration()
self._log_error("kiwicache.reload_failed")
break

This lets us avoid all workers hitting database at the same time.
async def reload_from_cache(self) -> bool:
cache_data = await self.load_from_cache()

:return: Whether we got the lock or not
"""
try:
return bool(
await self.resources_redis.set(
self.redis_key + ":lock",
"locked",
expire=int(self.refill_lock_ttl.total_seconds()),
exist=self.resources_redis.SET_IF_NOT_EXIST,
)
)
except aioredis.RedisError:
pass
if not cache_data:
return False

self._data = cache_data.data
self._prolong_data_expiration()
return True

async def maybe_reload(self) -> None:
if self.expires_at <= datetime.utcnow() or (not self._data and not self.allow_empty_data):
await self.reload()

async def refill_cache(self):
"""Cache the full data bundle in Redis."""
if not await self.get_refill_lock():
await asyncio.sleep(self.refill_lock_ttl.total_seconds()) # let the lock owner finish
async def _prolong_cache_expiration(self) -> None:
    """Prolong the cache expiry, re-saving in-memory data if the cache is empty.

    After delegating to the parent implementation, a reload from cache is
    attempted. If that reload does not succeed and this instance still holds
    data, the data is written back with ``save_to_cache``.
    """
    await super()._prolong_cache_expiration()
    if await self.reload_from_cache():
        return
    if self._data:
        await self.save_to_cache(self._data)

async def _process_refill_error(self, msg: str, exception: Optional[Exception] = None) -> None:
    """Record a failed attempt to refill the cache from the source.

    In order: prolongs the cache expiration, increments the "load_error"
    metric, logs ``msg`` through ``_log_exception`` and counts down
    ``_call_attempt``.

    :param msg: message handed to ``_log_exception``
    :param exception: the triggering exception, if any; accepted from callers
        but not read by this method
    """
    await self._prolong_cache_expiration()
    self._increment_metric("load_error")
    self._log_exception(msg)
    self._call_attempt.countdown()

async def refill_cache(self) -> None:
has_lock = await self._wait_for_refill_lock()
if not has_lock:
if has_lock is None:
# redis error
self._call_attempt.countdown()
return

try:
source_data = await self.load_from_source()
if not source_data:
raise RuntimeError("load_from_source returned empty response!")

self.call_attempt.reset()
await self.save_to_cache(source_data)
self.statsd and self.statsd.increment("kiwicache", tags=["name:" + self.name, "status:success"])
except Exception:
self.logger.exception("kiwicache.source_exception")
self.call_attempt.countdown()
self.statsd and self.statsd.increment("kiwicache", tags=["name:" + self.name, "status:load_error"])
try:
source_data = await self.load_from_source()
except Exception as e:
await self._process_refill_error("kiwicache.source_exception", e)
return

if source_data or self.allow_empty_data:
await self.save_to_cache(source_data)
else:
await self._process_refill_error("load_from_source returned empty response!")
finally:
await self._release_refill_lock()

async def load_from_source(self) -> dict:
raise NotImplementedError()
2 changes: 1 addition & 1 deletion kw/cache/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ def _increment_metric(self, status):
:param status: metric status
"""
if self.statsd:
self.statsd.increment(self.metric, tags=["name:{}".format(self.name), "status:{}".format(status)])
self.statsd.increment(self.metric, tags=["cache_name:{}".format(self.name), "status:{}".format(status)])


@attr.s
Expand Down
2 changes: 1 addition & 1 deletion kw/cache/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class CallAttempt(object):

name = attr.ib(None, type=str)
max_attempts = attr.ib(3, type=int)
counter = attr.ib(None, type=int)
counter = attr.ib(None, init=False, type=int)

def __attrs_post_init__(self):
self.reset()
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

setup(
name="kiwi-cache",
version="0.4.5",
version="0.5.0",
url="https://github.com/kiwicom/kiwi-cache",
author="Stanislav Komanec",
author_email="[email protected]",
Expand Down
2 changes: 1 addition & 1 deletion test-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
#
atomicwrites==1.3.0 # via pytest
attrs==19.3.0
coverage==4.5.4
coverage==5.0.3
freezegun==0.3.12
future==0.18.2
importlib-metadata==0.23 # via pluggy, pytest
Expand Down
8 changes: 8 additions & 0 deletions test/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from freezegun import freeze_time
import pytest


@pytest.fixture
def frozen_time():
    """Freeze time at 2000-01-01 00:00:00 while a test runs.

    Yields the object obtained by entering ``freeze_time``; the freeze ends
    when the fixture is torn down. ``_pytest.runner`` is passed in ``ignore``.
    """
    freezer = freeze_time("2000-01-01 00:00:00", ignore=["_pytest.runner"])
    with freezer as frozen:
        yield frozen
8 changes: 0 additions & 8 deletions test/integration/conftest.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import os

from freezegun import freeze_time
import pytest
import redis as redislib
import testing.redis
Expand All @@ -20,10 +19,3 @@ def redis(redis_url): # pylint: disable=redefined-outer-name
client = redislib.StrictRedis.from_url(redis_url)
yield client
client.flushall()


@pytest.fixture
def frozen_time():
ft = freeze_time("2000-01-01 00:00:00")
yield ft.start()
ft.stop()
Loading

0 comments on commit a846189

Please sign in to comment.