Distractic edited this page Jun 11, 2023 · 12 revisions

Cache client

A custom cache client is defined in the project. It allows managing data through Redis using the cache and publish/subscribe systems.

The client uses several thread pools to manage its connections.

Create

The following code shows how to create a new client instance:

import com.github.rushyverse.core.cache.CacheClient
import io.lettuce.core.RedisClient
import io.lettuce.core.RedisURI
import io.lettuce.core.codec.ByteArrayCodec
import io.lettuce.core.support.BoundedPoolConfig
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.serialization.protobuf.ProtoBuf

suspend fun createCacheClient(): CacheClient {
    return CacheClient {
        uri = RedisURI.create("redis://localhost:6379") // required; RedisURI.create expects a URI with the redis:// scheme
        client = RedisClient.create() // optional
        binaryFormat = ProtoBuf { } // optional
        codec = ByteArrayCodec.INSTANCE // optional
        poolConfiguration = BoundedPoolConfig.builder().maxTotal(-1).build() // optional
        coroutineScope = CoroutineScope(Dispatchers.IO + SupervisorJob())
    }
}

For a specific Redis client or pool configuration, see the documentation of the underlying dependencies.

The client uses the coroutineScope for its lifecycle. If that scope is cancelled, all tasks (connection, subscription, etc.) linked to it are stopped too.
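
The coupling between a scope and its tasks can be illustrated with plain kotlinx.coroutines, independently of the cache client. This is a minimal sketch: the function name cancelScopeAndReport and the looping task are hypothetical stand-ins for the client's internal work, and only the scope construction mirrors the creation example.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: starts a long-running task in a scope, cancels the
// scope, and reports whether the task was cancelled along with it.
fun cancelScopeAndReport(): Boolean = runBlocking {
    val scope = CoroutineScope(Dispatchers.IO + SupervisorJob())

    // Stand-in for a subscription or connection task owned by the client.
    val task = scope.launch {
        while (true) delay(100)
    }

    scope.cancel() // cancelling the scope cancels every job launched in it
    task.join()    // wait until the task has fully stopped
    task.isCancelled
}
```

Calling cancelScopeAndReport() returns true, which is why a scope shared with other components should not be handed to the client unless they are meant to stop together.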

Usage

For this part, we assume a cache client instance created with the previous function.

// In suspend function
val cacheClient: CacheClient = createCacheClient()

Get a new connection

Currently, a single function exists to get a connection (a connection used to store data).

cacheClient.connect { connection ->
    // The connection is opened
    connection.ping()
}

When the block returns, the connection is automatically returned to the connection pool.

If you need another type of connection (such as Pub/Sub), you can go through the connection manager.
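
The borrow-and-return behaviour can be sketched without Redis. The SimplePool class below is hypothetical and is not the pool used by the cache client; it only shows why a block-based function can guarantee that the resource goes back to the pool, even when the block throws. It is not thread-safe.

```kotlin
// Hypothetical, single-threaded pool used only to illustrate the pattern.
class SimplePool<T : Any>(private val factory: () -> T) {
    private val idle = ArrayDeque<T>()

    // Number of resources created so far.
    var created = 0
        private set

    // Number of resources currently waiting in the pool.
    val idleCount: Int get() = idle.size

    // Borrows a resource, runs the block, and always returns the resource.
    fun <R> acquire(block: (T) -> R): R {
        val resource = idle.removeLastOrNull() ?: factory().also { created++ }
        try {
            return block(resource)
        } finally {
            idle.addLast(resource) // runs on success and on failure
        }
    }
}
```

With this shape, two consecutive acquire calls reuse the same resource, so created stays at 1 and idleCount is 1 after each call.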

cacheClient.connectionManager.poolPubSub.acquire { connection ->
    // do something
}

Fortunately, you probably won't have to do this.

Publish/Subscribe

Several methods simplify publishing and subscribing to channels.

Subscribe

For a subscription, the parameters let you define:

  • Which type of data should be retrieved.
  • The CoroutineScope in which the subscription is executed.
  • The channels to listen to.

The following code uses the function with the most parameters. Check the other functions if you need something simpler.

import com.github.rushyverse.core.serializer.UUIDSerializer
import kotlinx.coroutines.Job
import java.util.UUID

val job: Job = cacheClient.subscribe(
    channels = arrayOf("channel1", "channel2"), // can be just a string
    messageSerializer = UUIDSerializer, // if not defined, use a string serializer
    scope = cacheClient // optional
) { channel: String, uuid: UUID ->
    println("Received message from channel $channel: $uuid")
}

Several things are worth noticing here.

  • The method returns a Job instance, which lets you cancel the subscription whenever you want. Its lifecycle is based on the scope parameter.
  • You can define a message serializer. When a message is received, the system tries to deserialize it using the given serializer. If deserialization succeeds, your code is called. If it fails, the message is skipped: no exception is thrown and the subscription stays open.
  • The CoroutineScope can be defined. By default, the cache client is used as the scope, so if the client is closed, all subscription jobs are cancelled too. You can pass another CoroutineScope if you need a different lifecycle.
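
The skip-on-failure behaviour described in the list above can be sketched as follows. The function handleRawMessages is hypothetical and works on plain strings instead of Redis payloads; it only illustrates that an undecodable message is dropped while the remaining ones are still delivered.

```kotlin
import java.util.UUID

// Hypothetical dispatcher: decodes each payload and calls the handler only
// when decoding succeeds. Returns the number of skipped payloads.
fun <T : Any> handleRawMessages(
    payloads: List<String>,
    decode: (String) -> T,
    handler: (T) -> Unit
): Int {
    var skipped = 0
    for (payload in payloads) {
        // A decoding failure is swallowed instead of ending the loop.
        val message = runCatching { decode(payload) }.getOrNull()
        if (message == null) {
            skipped++
            continue
        }
        handler(message)
    }
    return skipped
}
```

For example, passing the payloads "b4a93b09-e37c-448f-83ba-0eaf510524b5" and "not-a-uuid" with UUID::fromString as the decoder calls the handler once and returns 1.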

For special usage, other subscription methods are available.

import com.github.rushyverse.core.serializer.UUIDSerializer
import com.github.rushyverse.core.cache.message.subscribeIdentifiableMessage
import kotlinx.coroutines.Job
import kotlinx.serialization.builtins.serializer
import java.util.UUID

val job: Job = cacheClient.subscribeIdentifiableMessage(
    channels = arrayOf("channel1", "channel2"),
    messageSerializer = UUIDSerializer,
    scope = cacheClient,
) { channel: String, id: String, message: UUID ->
    println("Received message from channel $channel with id $id: $message")
    cacheClient.publishIdentifiableMessage("channelResponse", id, "I received your message with id: $id", String.serializer())
}

These methods are similar to the subscribe method, but they expect a specific type of message, defined by the class IdentifiableMessage. With this class and these methods, you can listen for a message that carries an ID, and that ID must be reused in the response. A matching publish method exists for this system and is described later.

Publish

For publishing, the parameters let you define:

  • Which type of data should be sent.
  • The channel to which the data is sent.

import com.github.rushyverse.core.serializer.UUIDSerializer
import java.util.*

cacheClient.publish(
    channel = "channel",
    message = UUID.randomUUID(),
    messageSerializer = UUIDSerializer
)

You don't need to serialize the data yourself: the method does it for you using the serializer passed as a parameter.

IdentifiableMessage was introduced earlier. A dedicated publish method sends such a message and waits for the response before continuing the code execution.

import com.github.rushyverse.core.serializer.UUIDSerializer
import kotlinx.serialization.builtins.serializer
import java.util.*
import kotlin.time.Duration.Companion.minutes

cacheClient.publishAndWaitResponse(
    channelPublish = "channel",
    channelSubscribe = "channelResponse",
    messagePublish = UUID.randomUUID(),
    messageSerializer = UUIDSerializer,
    responseSerializer = String.serializer(),
    id = "my message", // optional
    subscribeScope = cacheClient, // optional
    timeout = 1.minutes // optional
) { response: String ->
    println("Received response for my message: $response")
}

println("This line is not executed until the response is received or timeout is reached")

This method publishes a message of type IdentifiableMessage, not the message type passed in the parameters. On the subscription side, you should therefore use subscribeIdentifiableMessage, explained previously, and not the basic subscribe method.

This publish method suspends the current coroutine until at least one of the following occurs:

  • The response is received and the body function is executed.
  • The timeout is reached.
  • The subscribeScope is finished.

Internally, a subscription job is created to listen for a single message. After that, it is cancelled.

The following example combines publishing and subscribing so you can try this functionality:
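
The wait-for-one-response mechanism can be sketched with kotlinx.coroutines alone. Everything below is hypothetical and is not the library's implementation: Envelope stands in for IdentifiableMessage, ResponseWaiter replaces the Redis subscription with an in-memory map, and the timeout is given in milliseconds for brevity.

```kotlin
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.withTimeoutOrNull
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-in for IdentifiableMessage: a payload tagged with an ID.
data class Envelope(val id: String, val body: String)

class ResponseWaiter {
    private val pending = ConcurrentHashMap<String, CompletableDeferred<String>>()

    // Called for every incoming response. Only an envelope whose ID matches
    // a pending request completes it; anything else is ignored.
    fun onMessage(envelope: Envelope): Boolean =
        pending.remove(envelope.id)?.complete(envelope.body) ?: false

    // Registers the ID, runs the publish action, then suspends until the
    // matching response arrives or the timeout elapses (null on timeout).
    suspend fun awaitResponse(id: String, timeoutMillis: Long, publish: () -> Unit): String? {
        val deferred = CompletableDeferred<String>()
        pending[id] = deferred // register first so a fast response is not missed
        return try {
            publish()
            withTimeoutOrNull(timeoutMillis) { deferred.await() }
        } finally {
            pending.remove(id) // mirrors cancelling the one-shot subscription
        }
    }
}
```

Registering the ID before publishing is the important design choice in this sketch: a responder that answers immediately would otherwise reply before anyone is listening.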

cacheClient.subscribeIdentifiableMessage(
    channel = "channel",
    messageSerializer = UUIDSerializer
) { id: String, message: UUID ->
    println("Received message with id[$id]: $message")
    cacheClient.publishIdentifiableMessage(
        channel = "channelResponse",
        id = id,
        message = "Hello, $message!",
        messageSerializer = String.serializer()
    )
}

cacheClient.publishAndWaitResponse(
    channelPublish = "channel",
    channelSubscribe = "channelResponse",
    messagePublish = UUID.fromString("b4a93b09-e37c-448f-83ba-0eaf510524b5"),
    messageSerializer = UUIDSerializer,
    responseSerializer = String.serializer(),
    id = "my message", // optional
    subscribeScope = cacheClient, // optional
    timeout = 1.minutes // optional
) { response: String ->
    println("Received response: $response")
}

When executed, the console prints:

Received message with id[my message]: b4a93b09-e37c-448f-83ba-0eaf510524b5
Received response: Hello, b4a93b09-e37c-448f-83ba-0eaf510524b5!

Close

When you stop using the client, it's important to free its resources to avoid memory leaks. The cache client implements AsyncCloseable, and its resources are freed by calling closeAsync().

import kotlinx.coroutines.future.await

cacheClient.closeAsync().await()

Closing the client stops all subscription jobs and the thread pools.

After that, you can no longer interact with Redis using the same cache client instance.
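
To make sure the close call is never forgotten, the usage can be wrapped in try/finally. The sketch below is an assumption, not part of the project: AsyncResource is a hypothetical interface that mirrors the closeAsync() shape of AsyncCloseable, RecordingResource is a test double, and join() blocks the thread, whereas coroutine code would use await() as shown above.

```kotlin
import java.util.concurrent.CompletableFuture

// Hypothetical mirror of an asynchronously closeable resource.
interface AsyncResource {
    fun closeAsync(): CompletableFuture<Void>
}

// Runs the block, then closes the resource even if the block throws.
inline fun <T : AsyncResource, R> T.useThenClose(block: (T) -> R): R =
    try {
        block(this)
    } finally {
        closeAsync().join() // blocking wait; use await() inside coroutines
    }

// Hypothetical resource that only records whether it was closed.
class RecordingResource : AsyncResource {
    var closed = false
        private set

    override fun closeAsync(): CompletableFuture<Void> {
        closed = true
        return CompletableFuture.completedFuture(null)
    }
}
```

With this helper, RecordingResource().useThenClose { 42 } returns 42 and leaves the resource closed, including when the block fails.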

HTTP client

The HTTP client is the one provided by Ktor. There is no wrapper on top of the implementation. However, we recommend creating an instance with the following declaration:

import io.ktor.client.*
import io.ktor.client.engine.cio.*
import io.ktor.client.plugins.contentnegotiation.*
import io.ktor.serialization.kotlinx.json.*
import kotlinx.serialization.json.Json

val httpClient = HttpClient(CIO) {
    install(ContentNegotiation) {
        json(Json { ignoreUnknownKeys = true })
    }
}

Usage

For your own usage of the client, see the official Ktor documentation.

In this project, the client is used within some services.

Services

To simplify data management between the database or HTTP and the cache, each data type is linked to one service.

Suppliers

Each service uses a strategy that defines how it interacts with the data. Two types of suppliers provide a strategy:

Supplier | Usage
IDatabaseEntitySupplier | Interacts with the database, with the possibility to use the cache on top.
IHttpEntitySupplier | Interacts with the HTTP services, with the possibility to use the cache on top.

Database supplier

IDatabaseEntitySupplier provides several static methods to create a strategy.

Method | Usage
database(...) | Gets and sets data in the database only.
cache(...) | Gets and sets data in the cache only.
cachingDatabase(...) | Gets and sets data in the database. If a set operation succeeds in the database, the data is set in the cache too. On every get operation, the retrieved data is also stored in the cache.
cacheWithDatabaseFallback(...) | Gets data from the cache and, if it is not found, from the database. New data is set in the database only.
cacheWithCachingDatabaseFallback(...) | Gets data from the cache. If it is not found, gets it from the database and stores the result in the cache. On every set operation, the data is set in the database and, if that succeeds, in the cache too.
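
The last strategy can be sketched with in-memory maps to make the read and write paths concrete. All names below (Store, MapStore, CacheWithCachingFallback) are hypothetical and do not come from the project; the real suppliers talk to Redis and a database.

```kotlin
// Hypothetical minimal storage abstraction.
interface Store<K, V : Any> {
    fun get(key: K): V?
    fun set(key: K, value: V)
}

// In-memory store that counts reads, used as both "cache" and "database".
class MapStore<K, V : Any> : Store<K, V> {
    val data = mutableMapOf<K, V>()
    var reads = 0
        private set

    override fun get(key: K): V? {
        reads++
        return data[key]
    }

    override fun set(key: K, value: V) {
        data[key] = value
    }
}

// Reads the cache first, falls back to the source, and caches what it finds.
class CacheWithCachingFallback<K, V : Any>(
    private val cache: Store<K, V>,
    private val source: Store<K, V>
) : Store<K, V> {
    override fun get(key: K): V? =
        cache.get(key) ?: source.get(key)?.also { cache.set(key, it) }

    override fun set(key: K, value: V) {
        source.set(key, value) // if this throws, the cache is left untouched
        cache.set(key, value)
    }
}
```

In this sketch, a second get for the same key is served by the cache, so the source's reads counter does not increase, and a value missing from both stores is never cached.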

Http supplier

IHttpEntitySupplier provides several static methods to create a strategy.

Method | Usage
rest(...) | Gets data from HTTP requests only.
cache(...) | Gets and sets data in the cache only.
cachingRest(...) | Gets data from HTTP requests. On every get operation, the retrieved value is also set in the cache if it is valid.
cacheWithRestFallback(...) | Gets data from the cache and, if it is not found, from an HTTP request.
cacheWithCachingRestFallback(...) | Gets data from the cache. If it is not found, gets it from an HTTP request and stores the result in the cache.

HTTP services

MojangService is a service that depends on the Mojang API to retrieve data about Minecraft accounts.

The data retrieved can be cached, but not stored in the database.

Usage

First, create an HTTP client and a cache client. See the HTTP client and Cache client sections to know how to do it.

Then, create a new instance of the service:

import com.github.rushyverse.core.cache.CacheClient
import com.github.rushyverse.core.data.MojangService
import com.github.rushyverse.core.data.ProfileIdCacheService
import com.github.rushyverse.core.data.ProfileSkinCacheService
import com.github.rushyverse.core.supplier.http.HttpSupplierConfiguration
import com.github.rushyverse.core.supplier.http.IHttpEntitySupplier
import io.github.universeproject.kotlinmojangapi.MojangAPIImpl
import kotlin.time.Duration.Companion.hours

val httpClient = createHttpClient() // assumed helper returning the HttpClient declared in the HTTP client section
val cacheClient = createCacheClient()

val configuration = HttpSupplierConfiguration(
    mojangAPI = MojangAPIImpl(httpClient),
    profileIdCache = ProfileIdCacheService(cacheClient, expirationKey = 12.hours),
    profileSkinCache = ProfileSkinCacheService(cacheClient, expirationKey = 12.hours)
)
// To use the default parameters, pass only the Mojang API and the cache client
//    val configuration = HttpSupplierConfiguration(
//        mojangAPI = MojangAPIImpl(httpClient),
//        cacheClient = cacheClient,
//    )

val supplier = IHttpEntitySupplier.cacheWithCachingRestFallback(configuration)
val mojangService = MojangService(supplier)

With this configuration, all data retrieved from the Mojang API is cached for 12 hours.
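
The effect of an expiration delay can be illustrated in memory. How the cache services actually enforce expirationKey is not shown on this page, so the ExpiringCache class below is a hypothetical model with an injectable clock, not the project's code.

```kotlin
import kotlin.time.Duration
import kotlin.time.Duration.Companion.hours

// Hypothetical in-memory cache whose entries expire after a fixed delay.
class ExpiringCache<K, V : Any>(
    private val ttl: Duration,
    private val nowMillis: () -> Long = System::currentTimeMillis
) {
    private data class Entry<T>(val value: T, val expiresAtMillis: Long)

    private val entries = mutableMapOf<K, Entry<V>>()

    // Stores the value together with its expiration timestamp.
    fun set(key: K, value: V) {
        entries[key] = Entry(value, nowMillis() + ttl.inWholeMilliseconds)
    }

    // Returns the value, or null when it is absent or has expired.
    fun get(key: K): V? {
        val entry = entries[key] ?: return null
        if (nowMillis() >= entry.expiresAtMillis) {
            entries.remove(key) // drop the stale entry
            return null
        }
        return entry.value
    }
}
```

With a delay of 12.hours, a value stored at time 0 is still returned one millisecond before the 12-hour mark and is gone at the mark itself, at which point a caching strategy would query the Mojang API again.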

Skin

To get a skin by name or UUID, use the following code:

val skinByName = mojangService.getSkinByName("Notch")
val skinById = mojangService.getSkinById("069a79f4-44e9-4726-a5be-fca90e38aaf5")

Profile

To get a profile, and with it the UUID that matches a name, use the following code:

val profile = mojangService.getProfile("Notch")