Usage
A custom cache client is defined in the project. It manages data in Redis through both the cache and the publish/subscribe systems.
The client uses several thread pools to manage its connections.
The following code shows how to create a new client instance:
import com.github.rushyverse.core.cache.CacheClient
import io.lettuce.core.RedisClient
import io.lettuce.core.RedisURI
import io.lettuce.core.codec.ByteArrayCodec
import io.lettuce.core.support.BoundedPoolConfig
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.serialization.protobuf.ProtoBuf
suspend fun createCacheClient(): CacheClient {
    return CacheClient {
        // RedisURI.create expects a full URI, including the redis:// scheme
        uri = RedisURI.create("redis://localhost:6379") // required
        client = RedisClient.create() // optional
        binaryFormat = ProtoBuf { } // optional
        codec = ByteArrayCodec.INSTANCE // optional
        poolConfiguration = BoundedPoolConfig.builder().maxTotal(-1).build() // optional
        coroutineScope = CoroutineScope(Dispatchers.IO + SupervisorJob())
    }
}
If you need a specific configuration for the Redis client or the connection pool, refer to the documentation of the underlying dependencies (for example Lettuce, which provides RedisClient and BoundedPoolConfig).
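As an illustration of a pool-specific configuration, the fragment below bounds the pool instead of using `maxTotal(-1)` as in the snippet above. It is only a sketch: the limits are arbitrary placeholders, and it relies on the Lettuce `BoundedPoolConfig` builder that the snippet above already imports.

```kotlin
import io.lettuce.core.support.BoundedPoolConfig

// Arbitrary example limits (assumptions); tune them for your workload.
val poolConfiguration: BoundedPoolConfig = BoundedPoolConfig.builder()
    .maxTotal(16) // at most 16 connections allocated by the pool
    .maxIdle(8)   // keep at most 8 idle connections
    .minIdle(2)   // keep at least 2 idle connections ready
    .build()
```

The resulting value can then be assigned to `poolConfiguration` in the `CacheClient { ... }` builder.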
The client uses the coroutineScope for its lifecycle, so closing the client also stops every task (connection, subscription, etc.) linked to that scope.
For this part, we will consider having a cache client instance created with the previous function.
// In suspend function
val cacheClient: CacheClient = createCacheClient()
Currently, a single function exists to obtain a connection (a connection used to store data).
cacheClient.connect { connection ->
    // The connection is opened
    connection.ping()
}
After the function, the connection is automatically returned to the connection pool.
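The borrow-and-return behaviour can be pictured with a small stand-alone sketch. It does not use the real connection manager: `SimplePool` and `FakeConnection` are hypothetical names invented for illustration, and the only point shown is that the connection goes back to the pool once the block ends, even when the block throws.

```kotlin
// Hypothetical stand-ins for illustration; these are not part of the library.
class FakeConnection(val id: Int) {
    fun ping(): String = "PONG"
}

class SimplePool(size: Int) {
    private val idle = ArrayDeque<FakeConnection>().apply {
        repeat(size) { addLast(FakeConnection(it)) }
    }

    val idleCount: Int get() = idle.size

    // Borrow a connection, run the block, and always return the connection.
    fun <T> acquire(block: (FakeConnection) -> T): T {
        val connection = idle.removeFirst() // throws if the pool is exhausted
        try {
            return block(connection)
        } finally {
            idle.addLast(connection)
        }
    }
}

fun main() {
    val pool = SimplePool(size = 2)
    val reply = pool.acquire { connection ->
        println("idle while borrowed: ${pool.idleCount}")
        connection.ping()
    }
    println("reply=$reply, idle after return: ${pool.idleCount}")
}
```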
If you need another type of connection (like Pub/Sub), you can get the connection manager.
cacheClient.connectionManager.poolPubSub.acquire { connection ->
    // do something
}
Fortunately, you probably won't have to do this.
Several methods have been added to simplify the publishing and the subscribing to channels.
For the subscription, depending on the parameters, you can define:
- Which type of data should be retrieved.
- The CoroutineScope where the subscription will be executed.
- The channels to listen to.
The following code uses the function with the most parameters. Depending on your needs, you can check the other overloads.
import com.github.rushyverse.core.serializer.UUIDSerializer
import kotlinx.coroutines.Job
import java.util.UUID

val job: Job = cacheClient.subscribe(
    channels = arrayOf("channel1", "channel2"), // can be just a string
    messageSerializer = UUIDSerializer, // if not defined, use a string serializer
    scope = cacheClient // optional
) { channel: String, uuid: UUID ->
    println("Received message from channel $channel: $uuid")
}
You can notice several things here.
- The method returns a Job instance. With this instance, you can cancel the subscription whenever you want. Its lifecycle is bound to the scope parameter.
- You can define a message serializer. When a message is received, the system tries to deserialize it using the given serializer. If deserialization succeeds, your code is called. If it fails, the message is skipped: no exception is thrown and the subscription stays open.
- The CoroutineScope can be defined. By default, the cache client is used as the scope, so if the client is closed, all subscription jobs are cancelled too. You can pass another CoroutineScope if you need to.
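The skip-on-failure rule for deserialization can be sketched without Redis. The helper below is hypothetical (`handleRawMessage` is not a library function) and uses `UUID.fromString` in place of a real serializer. It only shows that a malformed payload is dropped while later messages are still delivered.

```kotlin
import java.util.UUID

// Hypothetical helper: try to decode the payload, call the handler on success,
// and skip the message on failure. Returns true when the handler was called.
fun handleRawMessage(raw: String, handler: (UUID) -> Unit): Boolean {
    val uuid = try {
        UUID.fromString(raw)
    } catch (e: IllegalArgumentException) {
        return false // skipped: no exception escapes, processing continues
    }
    handler(uuid)
    return true
}

fun main() {
    val received = mutableListOf<UUID>()
    val incoming = listOf(
        "b4a93b09-e37c-448f-83ba-0eaf510524b5",
        "hello", // not a UUID, will be skipped
        "069a79f4-44e9-4726-a5be-fca90e38aaf5",
    )
    incoming.forEach { raw -> handleRawMessage(raw) { received += it } }
    println("delivered ${received.size} of ${incoming.size} messages")
}
```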
For special usage, other subscription methods are available.
import com.github.rushyverse.core.serializer.UUIDSerializer
import com.github.rushyverse.core.cache.message.subscribeIdentifiableMessage
import kotlinx.coroutines.Job
import kotlinx.serialization.builtins.serializer
import java.util.UUID

val job: Job = cacheClient.subscribeIdentifiableMessage(
    channels = arrayOf("channel1", "channel2"),
    messageSerializer = UUIDSerializer,
    scope = cacheClient,
) { channel: String, id: String, message: UUID ->
    println("Received message from channel $channel with id $id: $message")
    // Reply on another channel, reusing the id of the received message
    cacheClient.publishIdentifiableMessage("channelResponse", id, "I received your message with id: $id", String.serializer())
}
These methods are similar to the subscribe method. However, they expect a specific type of message, defined by the class IdentifiableMessage. With this class and these methods, you can listen to a message that carries an ID. The ID must be reused for the response. Another method has been added for this system, and we will see it later.
For the publication, depending on the parameters, you can define:
- Which type of data should be sent.
- The channel to which the data is sent.
import com.github.rushyverse.core.serializer.UUIDSerializer
import java.util.*
cacheClient.publish(
    channel = "channel",
    message = UUID.randomUUID(),
    messageSerializer = UUIDSerializer
)
You don't need to serialize the data yourself: the method does it for you using the serializer passed in the parameters.
Previously, we talked about IdentifiableMessage. A publish method has been added to publish a message and wait for the response before continuing the code execution.
import com.github.rushyverse.core.serializer.UUIDSerializer
import kotlinx.serialization.builtins.serializer
import java.util.*
import kotlin.time.Duration.Companion.minutes
cacheClient.publishAndWaitResponse(
    channelPublish = "channel",
    channelSubscribe = "channelResponse",
    messagePublish = UUID.randomUUID(),
    messageSerializer = UUIDSerializer,
    responseSerializer = String.serializer(),
    id = "my message", // optional
    subscribeScope = cacheClient, // optional
    timeout = 1.minutes // optional
) { response: String ->
    println("Received response for my message: $response")
}
println("This line is not executed until the response is received or timeout is reached")
This method publishes a message of type IdentifiableMessage (and not the message type passed in the parameters). So, on the subscription side, you should use the subscribeIdentifiableMessage method explained previously, not the basic subscribe method.
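One way to picture this: the payload travels together with an identifier, and the reply reuses that identifier so the publisher can match it. The `Envelope` class below is a hypothetical stand-in, not the library's IdentifiableMessage (whose exact fields are not shown here), and `reply` and `matches` are illustrative helpers.

```kotlin
// Hypothetical envelope used for illustration only.
data class Envelope<T>(val id: String, val data: T)

// The responder wraps its answer with the id of the request it replies to.
fun <T, R> reply(request: Envelope<T>, answer: (T) -> R): Envelope<R> =
    Envelope(request.id, answer(request.data))

// The publisher only accepts a response carrying the id it sent.
fun <R> matches(sentId: String, response: Envelope<R>): Boolean = response.id == sentId

fun main() {
    val request = Envelope(id = "my message", data = 21)
    val response = reply(request) { "double is ${it * 2}" }
    val unrelated = Envelope(id = "other", data = "ignored")

    println(matches(request.id, response))  // same id as the request
    println(matches(request.id, unrelated)) // different id
}
```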
This publish method suspends the current coroutine until at least one of the following occurs:
- The response is received and the body function is executed.
- The timeout is reached.
- The subscribeScope is finished.
Internally, a subscription job is created to listen for a single message; after that, it is cancelled.
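The "response or timeout" behaviour can be approximated with a plain `CompletableFuture`. This is a blocking sketch under that assumption, not how the library suspends coroutines, and `waitForResponse` is a hypothetical helper.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException

// Hypothetical helper: wait for one response, or give up after the timeout.
// Returns null when no response arrived in time.
fun <T> waitForResponse(pending: CompletableFuture<T>, timeoutMillis: Long): T? =
    try {
        pending.get(timeoutMillis, TimeUnit.MILLISECONDS)
    } catch (e: TimeoutException) {
        pending.cancel(true) // stop waiting, similar to cancelling the one-shot subscription
        null
    }

fun main() {
    val answered = CompletableFuture<String>()
    answered.complete("Hello!") // a responder replied before we started waiting
    println(waitForResponse(answered, timeoutMillis = 100))

    val silent = CompletableFuture<String>() // nobody ever replies
    println(waitForResponse(silent, timeoutMillis = 100))
}
```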
To try this functionality, here is an example that combines publishing and subscribing:
cacheClient.subscribeIdentifiableMessage(
    channel = "channel",
    messageSerializer = UUIDSerializer
) { id: String, message: UUID ->
    println("Received message with id[$id]: $message")
    cacheClient.publishIdentifiableMessage(
        channel = "channelResponse",
        id = id,
        message = "Hello, $message!",
        messageSerializer = String.serializer()
    )
}

cacheClient.publishAndWaitResponse(
    channelPublish = "channel",
    channelSubscribe = "channelResponse",
    messagePublish = UUID.fromString("b4a93b09-e37c-448f-83ba-0eaf510524b5"),
    messageSerializer = UUIDSerializer,
    responseSerializer = String.serializer(),
    id = "my message", // optional
    subscribeScope = cacheClient, // optional
    timeout = 1.minutes // optional
) { response: String ->
    println("Received response: $response")
}
When executed, the console prints:
Received message with id[my message]: b4a93b09-e37c-448f-83ba-0eaf510524b5
Received response: Hello, b4a93b09-e37c-448f-83ba-0eaf510524b5!
When you stop using the client, it's important to free resources to avoid memory leaks.
The cache client implements AsyncCloseable and lets you free its resources with the method closeAsync().
import kotlinx.coroutines.future.await
cacheClient.closeAsync().await()
Closing the client stops all subscription jobs and the thread pools.
After doing that, you will not be able to interact with Redis using the same instance of a cache client.
The HTTP client used is the one from Ktor. There is no wrapper on top of the implementation. However, we recommend creating an instance using the following declaration:
import io.ktor.client.*
import io.ktor.client.engine.cio.*
import io.ktor.client.plugins.contentnegotiation.*
import io.ktor.serialization.kotlinx.json.*
import kotlinx.serialization.json.Json
val httpClient = HttpClient(CIO) {
    install(ContentNegotiation) {
        json(Json { ignoreUnknownKeys = true })
    }
}
For your own usage of the client, refer to the official Ktor documentation.
In this project, the client is used within some services.
To simplify data management between the database / HTTP and the cache, each data type is linked to one service.
Each service uses a strategy to know how to interact with the data. There are two types of suppliers to retrieve a strategy:
Supplier | Usage |
---|---|
IDatabaseEntitySupplier | Used to interact with the database, with the possibility to use the cache on top. |
IHttpEntitySupplier | Used to interact with the HTTP services, with the possibility to use the cache on top. |
In IDatabaseEntitySupplier, you can find several static methods to create a strategy.
Method | Usage |
---|---|
database(...) | Creates a strategy that only gets and sets data in the database. |
cache(...) | Creates a strategy that only gets and sets data in the cache. |
cachingDatabase(...) | Creates a strategy that gets and sets data in the database. If a set operation succeeds in the database, the data is set in the cache too. On every get operation, the retrieved data is also stored in the cache. |
cacheWithDatabaseFallback(...) | Creates a strategy that gets data from the cache and, if it is not found, from the database. New data is only set in the database. |
cacheWithCachingDatabaseFallback(...) | Creates a strategy that gets data from the cache. If it is not found, the data is read from the database and the result is stored in the cache. On every set operation, the data is set in the database and, if that succeeds, in the cache too. |
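To make the behaviour of `cacheWithCachingDatabaseFallback` concrete, here is a minimal in-memory sketch. The `CacheThenStore` class and its two maps are hypothetical stand-ins for the cache and the database, not the library's implementation. The real strategies are created through the static methods of IDatabaseEntitySupplier.

```kotlin
// Hypothetical in-memory model; the maps stand in for the cache and the database.
class CacheThenStore<K, V : Any>(
    val cache: MutableMap<K, V> = mutableMapOf(),
    val database: MutableMap<K, V> = mutableMapOf(),
) {
    // Read: cache first, then database; a database hit is copied into the cache.
    fun get(key: K): V? {
        cache[key]?.let { return it }
        val value = database[key] ?: return null
        cache[key] = value
        return value
    }

    // Write: database first, then cache.
    fun set(key: K, value: V) {
        database[key] = value
        cache[key] = value
    }
}

fun main() {
    val store = CacheThenStore<String, Int>()
    store.database["rank"] = 3               // present in the database only
    println(store.cache.containsKey("rank")) // not cached yet
    println(store.get("rank"))               // falls back to the database
    println(store.cache.containsKey("rank")) // now cached
}
```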
In IHttpEntitySupplier, you can find several static methods to create a strategy.
Method | Usage |
---|---|
rest(...) | Creates a strategy that only gets data from HTTP requests. |
cache(...) | Creates a strategy that only gets and sets data in the cache. |
cachingRest(...) | Creates a strategy that gets data from HTTP requests. If the retrieved value is valid, it is also stored in the cache on every get operation. |
cacheWithRestFallback(...) | Creates a strategy that gets data from the cache and, if it is not found, from an HTTP request. |
cacheWithCachingRestFallback(...) | Creates a strategy that gets data from the cache. If it is not found, the data is fetched with an HTTP request and the result is stored in the cache. |
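The practical difference between `cacheWithRestFallback` and `cacheWithCachingRestFallback` is whether a value fetched over HTTP is remembered. The sketch below models that with a hypothetical `RestLookup` class, where `fetch` stands in for an HTTP request and `storeOnMiss` switches between the two behaviours. None of these names come from the library.

```kotlin
// Hypothetical model: `fetch` stands in for an HTTP request.
class RestLookup<K, V : Any>(
    private val storeOnMiss: Boolean,
    private val fetch: (K) -> V?,
) {
    private val cache = mutableMapOf<K, V>()

    var remoteCalls = 0
        private set

    // Cache first; on a miss, call the remote and optionally remember the result.
    fun get(key: K): V? {
        cache[key]?.let { return it }
        remoteCalls++
        val value = fetch(key)
        if (value != null && storeOnMiss) cache[key] = value
        return value
    }
}

fun main() {
    val fetchUuid: (String) -> String? = { name ->
        if (name == "Notch") "069a79f4-44e9-4726-a5be-fca90e38aaf5" else null
    }
    val fallbackOnly = RestLookup<String, String>(storeOnMiss = false, fetch = fetchUuid)
    val cachingFallback = RestLookup<String, String>(storeOnMiss = true, fetch = fetchUuid)

    repeat(3) {
        fallbackOnly.get("Notch")
        cachingFallback.get("Notch")
    }
    println("fallback only: ${fallbackOnly.remoteCalls} remote calls")       // 3
    println("caching fallback: ${cachingFallback.remoteCalls} remote calls") // 1
}
```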
MojangService is a service that depends on the Mojang API to retrieve data about Minecraft accounts.
The data retrieved can be cached, but it is not stored in the database.
First, you need to create an HTTP client and a cache client. See the HTTP client and cache client sections to learn how to do it.
Then, you can create a new instance of the service:
import com.github.rushyverse.core.cache.CacheClient
import com.github.rushyverse.core.data.MojangService
import com.github.rushyverse.core.data.ProfileIdCacheService
import com.github.rushyverse.core.data.ProfileSkinCacheService
import com.github.rushyverse.core.supplier.http.HttpSupplierConfiguration
import com.github.rushyverse.core.supplier.http.IHttpEntitySupplier
import io.github.universeproject.kotlinmojangapi.MojangAPIImpl
import kotlin.time.Duration.Companion.hours
// In suspend function (createCacheClient is a suspend function)
// createHttpClient() is assumed to return the HttpClient declared in the HTTP client section
val httpClient = createHttpClient()
val cacheClient = createCacheClient()

val configuration = HttpSupplierConfiguration(
    mojangAPI = MojangAPIImpl(httpClient),
    profileIdCache = ProfileIdCacheService(cacheClient, expirationKey = 12.hours),
    profileSkinCache = ProfileSkinCacheService(cacheClient, expirationKey = 12.hours)
)

// If you want to use the default parameters, you can just send the cache client and the mojang api
// val configuration = HttpSupplierConfiguration(
//     mojangAPI = MojangAPIImpl(httpClient),
//     cacheClient = cacheClient,
// )

val supplier = IHttpEntitySupplier.cacheWithCachingRestFallback(configuration)
val mojangService = MojangService(supplier)
With this configuration, all the data retrieved from the Mojang API is cached for 12 hours.
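Expiration can be reasoned about as a timestamp comparison. The sketch below is a simplified model, not the service's implementation: `ExpiringEntry` is a hypothetical class, and the current time is passed in explicitly so the behaviour is easy to check.

```kotlin
import java.time.Duration
import java.time.Instant

// Hypothetical model of a cached value with a time-to-live.
data class ExpiringEntry<V>(val value: V, val storedAt: Instant, val ttl: Duration) {
    // The entry is usable strictly before storedAt + ttl.
    fun isValidAt(now: Instant): Boolean = now.isBefore(storedAt.plus(ttl))
}

fun main() {
    val storedAt = Instant.parse("2024-01-01T00:00:00Z")
    val entry = ExpiringEntry("Notch", storedAt, Duration.ofHours(12))

    println(entry.isValidAt(storedAt.plus(Duration.ofHours(11)))) // within the 12 hours
    println(entry.isValidAt(storedAt.plus(Duration.ofHours(13)))) // past the 12 hours
}
```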
To get a skin by name or UUID, use the following code:
val skinByName = mojangService.getSkinByName("Notch")
val skinById = mojangService.getSkinById("069a79f4-44e9-4726-a5be-fca90e38aaf5")
To get a profile, which gives you the UUID for a name, use the following code:
val profile = mojangService.getProfile("Notch")