Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DATAREDIS-1075 - Add EnableKeyspaceNotifications for more configuration options of keyspace event listening. #503

Open
wants to merge 3 commits into
base: 2.7.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>2.3.0.BUILD-SNAPSHOT</version>
<version>2.3.0.DATAREDIS-1075-SNAPSHOT</version>

<name>Spring Data Redis</name>

Expand Down
3 changes: 2 additions & 1 deletion src/main/asciidoc/reference/redis-repositories.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -581,7 +581,8 @@ The repository implementation ensures subscription to https://redis.io/topics/no

When the expiration is set to a positive value, the corresponding `EXPIRE` command is executed. In addition to persisting the original, a phantom copy is persisted in Redis and set to expire five minutes after the original one. This is done to enable the Repository support to publish `RedisKeyExpiredEvent`, holding the expired value in Spring's `ApplicationEventPublisher` whenever a key expires, even though the original values have already been removed. Expiry events are received on all connected applications that use Spring Data Redis repositories.

By default, the key expiry listener is disabled when initializing the application. The startup mode can be adjusted in `@EnableRedisRepositories` or `RedisKeyValueAdapter` to start the listener with the application or upon the first insert of an entity with a TTL. See https://docs.spring.io/spring-data/redis/docs/{revnumber}/api/org/springframework/data/redis/core/RedisKeyValueAdapter.EnableKeyspaceEvents.html[`EnableKeyspaceEvents`] for possible values.
By default, the key expiry listener is disabled when initializing the application.
The startup mode can be adjusted via `@EnableKeyspaceNotifications` or `RedisKeyValueAdapter` to start the listener with the application or upon the first insert of an entity with a TTL. See https://docs.spring.io/spring-data/redis/docs/{revnumber}/api/org/springframework/data/redis/core/RedisKeyValueAdapter.EnableKeyspaceEvents.html[`EnableKeyspaceEvents`] for possible values.

The `RedisKeyExpiredEvent` holds a copy of the expired domain object as well as the key.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ public class RedisKeyValueAdapter extends AbstractKeyValueAdapter
private final AtomicReference<KeyExpirationEventMessageListener> expirationListener = new AtomicReference<>(null);
private @Nullable ApplicationEventPublisher eventPublisher;

private EnableKeyspaceEvents enableKeyspaceEvents = EnableKeyspaceEvents.OFF;
private @Nullable String keyspaceNotificationsConfigParameter = null;
private RedisKeyspaceNotificationsConfig keyspaceNotificationsConfig = new RedisKeyspaceNotificationsConfig(
EnableKeyspaceEvents.OFF, null);

/**
* Creates new {@link RedisKeyValueAdapter} with default {@link RedisMappingContext} and default
Expand Down Expand Up @@ -208,8 +208,8 @@ public Object put(final Object id, Object item, String keyspace) {
converter.write(item, rdo);
}

if (ObjectUtils.nullSafeEquals(EnableKeyspaceEvents.ON_DEMAND, enableKeyspaceEvents)
&& this.expirationListener.get() == null) {
if (ObjectUtils.nullSafeEquals(EnableKeyspaceEvents.ON_DEMAND,
keyspaceNotificationsConfig.getEnableKeyspaceEvents()) && this.expirationListener.get() == null) {

if (rdo.getTimeToLive() != null && rdo.getTimeToLive() > 0) {
initKeyExpirationListener();
Expand Down Expand Up @@ -636,7 +636,7 @@ private <T> T readBackTimeToLiveIfSet(@Nullable byte[] key, @Nullable T target)
* @since 1.8
*/
public void setEnableKeyspaceEvents(EnableKeyspaceEvents enableKeyspaceEvents) {
this.enableKeyspaceEvents = enableKeyspaceEvents;
this.keyspaceNotificationsConfig.setEnableKeyspaceEvents(enableKeyspaceEvents);
}

/**
Expand All @@ -647,7 +647,17 @@ public void setEnableKeyspaceEvents(EnableKeyspaceEvents enableKeyspaceEvents) {
* @since 1.8
*/
public void setKeyspaceNotificationsConfigParameter(String keyspaceNotificationsConfigParameter) {
this.keyspaceNotificationsConfigParameter = keyspaceNotificationsConfigParameter;
this.keyspaceNotificationsConfig.setKeyspaceNotificationsConfigParameter(keyspaceNotificationsConfigParameter);
}

/**
* Configure the database from which to receive keyspace notifications. Use a negative value for all databases.
*
* @param database the database index to listen for keyspace notifications. Negative value for all.
* @since 2.3
*/
public void setKeyspaceNotificationsDatabase(int database) {
this.keyspaceNotificationsConfig.setDatabase(database);
}

/**
Expand All @@ -657,7 +667,8 @@ public void setKeyspaceNotificationsConfigParameter(String keyspaceNotifications
@Override
public void afterPropertiesSet() {

if (ObjectUtils.nullSafeEquals(EnableKeyspaceEvents.ON_STARTUP, this.enableKeyspaceEvents)) {
if (ObjectUtils.nullSafeEquals(EnableKeyspaceEvents.ON_STARTUP,
this.keyspaceNotificationsConfig.getEnableKeyspaceEvents())) {
initKeyExpirationListener();
}
}
Expand Down Expand Up @@ -707,10 +718,10 @@ private void initKeyExpirationListener() {

if (this.expirationListener.get() == null) {

MappingExpirationListener listener = new MappingExpirationListener(this.messageListenerContainer, this.redisOps,
this.converter);
listener.setKeyspaceNotificationsConfigParameter(keyspaceNotificationsConfigParameter);

MappingExpirationListener listener = new MappingExpirationListener(this.keyspaceNotificationsConfig.getDatabase(),
this.messageListenerContainer, this.redisOps, this.converter);
listener.setKeyspaceNotificationsConfigParameter(
keyspaceNotificationsConfig.getKeyspaceNotificationsConfigParameter());
if (this.eventPublisher != null) {
listener.setApplicationEventPublisher(this.eventPublisher);
}
Expand All @@ -737,14 +748,15 @@ static class MappingExpirationListener extends KeyExpirationEventMessageListener
/**
* Creates new {@link MappingExpirationListener}.
*
* @param database The database to listen to for expiration events. Use {@literal null} or a nevative value for all.
* @param listenerContainer
* @param ops
* @param converter
*/
MappingExpirationListener(RedisMessageListenerContainer listenerContainer, RedisOperations<?, ?> ops,
RedisConverter converter) {
MappingExpirationListener(Integer database, RedisMessageListenerContainer listenerContainer,
RedisOperations<?, ?> ops, RedisConverter converter) {

super(listenerContainer);
super(listenerContainer, database);
this.ops = ops;
this.converter = converter;
}
Expand All @@ -762,7 +774,8 @@ public void onMessage(Message message, @Nullable byte[] pattern) {

byte[] key = message.getBody();

byte[] phantomKey = ByteUtils.concat(key, converter.getConversionService().convert(KeyspaceIdentifier.PHANTOM_SUFFIX, byte[].class));
byte[] phantomKey = ByteUtils.concat(key,
converter.getConversionService().convert(KeyspaceIdentifier.PHANTOM_SUFFIX, byte[].class));

Map<byte[], byte[]> hash = ops.execute((RedisCallback<Map<byte[], byte[]>>) connection -> {

Expand All @@ -778,7 +791,8 @@ public void onMessage(Message message, @Nullable byte[] pattern) {
Object value = converter.read(Object.class, new RedisData(hash));

String channel = !ObjectUtils.isEmpty(message.getChannel())
? converter.getConversionService().convert(message.getChannel(), String.class) : null;
? converter.getConversionService().convert(message.getChannel(), String.class)
: null;

RedisKeyExpiredEvent event = new RedisKeyExpiredEvent(channel, key, value);

Expand Down Expand Up @@ -864,4 +878,54 @@ public Index(byte[] key, DataType type) {

}
}

/**
* @author Christoph Strobl
* @since 2.3
*/
private static class RedisKeyspaceNotificationsConfig {

@Nullable Integer database;
@Nullable String keyspaceNotificationsConfigParameter;
EnableKeyspaceEvents enableKeyspaceEvents;

RedisKeyspaceNotificationsConfig(EnableKeyspaceEvents enableKeyspaceEvents,
@Nullable String keyspaceNotificationsConfigParameter) {
this(enableKeyspaceEvents, keyspaceNotificationsConfigParameter, null);
}

RedisKeyspaceNotificationsConfig(EnableKeyspaceEvents enableKeyspaceEvents,
@Nullable String keyspaceNotificationsConfigParameter, @Nullable Integer database) {

this.database = database;
this.keyspaceNotificationsConfigParameter = keyspaceNotificationsConfigParameter;
this.enableKeyspaceEvents = enableKeyspaceEvents;
}

@Nullable
Integer getDatabase() {
return database;
}

void setDatabase(@Nullable Integer database) {
this.database = database;
}

@Nullable
String getKeyspaceNotificationsConfigParameter() {
return keyspaceNotificationsConfigParameter;
}

void setKeyspaceNotificationsConfigParameter(@Nullable String keyspaceNotificationsConfigParameter) {
this.keyspaceNotificationsConfigParameter = keyspaceNotificationsConfigParameter;
}

EnableKeyspaceEvents getEnableKeyspaceEvents() {
return enableKeyspaceEvents;
}

void setEnableKeyspaceEvents(EnableKeyspaceEvents enableKeyspaceEvents) {
this.enableKeyspaceEvents = enableKeyspaceEvents;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,36 @@
* @author Christoph Strobl
* @since 1.7
*/
public class KeyExpirationEventMessageListener extends KeyspaceEventMessageListener implements
ApplicationEventPublisherAware {
public class KeyExpirationEventMessageListener extends KeyspaceEventMessageListener
implements ApplicationEventPublisherAware {

private static final Topic KEYEVENT_EXPIRED_TOPIC = new PatternTopic("__keyevent@*__:expired");
private static final String KEYEVENT_EXPIRED_TOPIC_PATTERN = "__keyevent@%s__:expired";

private @Nullable ApplicationEventPublisher publisher;
private @Nullable Integer database;

/**
* Creates new {@link MessageListener} for {@code __keyevent@*__:expired} messages.
*
* @param listenerContainer must not be {@literal null}.
*/
public KeyExpirationEventMessageListener(RedisMessageListenerContainer listenerContainer) {
this(listenerContainer, null);
}

/**
* Creates new {@link MessageListener} for {@code __keyevent@database__:expired} messages.
*
* @param listenerContainer must not be {@literal null}.
* @param database the database index to listen to for keyspace notifications. Use {@literal null} or negative value
* for all.
* @since 2.3
*/
public KeyExpirationEventMessageListener(RedisMessageListenerContainer listenerContainer,
@Nullable Integer database) {

super(listenerContainer);
this.database = database;
}

/*
Expand All @@ -51,7 +67,7 @@ public KeyExpirationEventMessageListener(RedisMessageListenerContainer listenerC
*/
@Override
protected void doRegister(RedisMessageListenerContainer listenerContainer) {
listenerContainer.addMessageListener(this, KEYEVENT_EXPIRED_TOPIC);
listenerContainer.addMessageListener(this, computeTopic(database()));
}

/*
Expand All @@ -75,6 +91,17 @@ protected void publishEvent(RedisKeyExpiredEvent event) {
}
}

/**
* Get the database index to listen to.
*
* @return can be {@literal null}.
* @since 2.3
*/
@Nullable
public Integer database() {
return this.database;
}

/*
* (non-Javadoc)
* @see org.springframework.context.ApplicationEventPublisherAware#setApplicationEventPublisher(org.springframework.context.ApplicationEventPublisher)
Expand All @@ -83,4 +110,16 @@ protected void publishEvent(RedisKeyExpiredEvent event) {
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.publisher = applicationEventPublisher;
}

/**
* Compute the {@link Topic} for the actual subscription.
*
* @param database can be {@literal null}.
* @return never {@literal null}.
*/
protected Topic computeTopic(@Nullable Integer database) {

return new PatternTopic(
String.format(KEYEVENT_EXPIRED_TOPIC_PATTERN, database != null && database >= 0 ? database : "*"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public abstract class KeyspaceEventMessageListener implements MessageListener, I

private final RedisMessageListenerContainer listenerContainer;

private String keyspaceNotificationsConfigParameter = "EA";
private @Nullable String keyspaceNotificationsConfigParameter = "EA";

/**
* Creates new {@link KeyspaceEventMessageListener}.
Expand Down Expand Up @@ -124,7 +124,7 @@ public void destroy() throws Exception {
* @param keyspaceNotificationsConfigParameter can be {@literal null}.
* @since 1.8
*/
public void setKeyspaceNotificationsConfigParameter(String keyspaceNotificationsConfigParameter) {
public void setKeyspaceNotificationsConfigParameter(@Nullable String keyspaceNotificationsConfigParameter) {
this.keyspaceNotificationsConfigParameter = keyspaceNotificationsConfigParameter;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.redis.repository.configuration;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import org.springframework.data.redis.core.RedisKeyValueAdapter.EnableKeyspaceEvents;
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;

/**
* Used to enhance configuration options for Redis <a href="https://redis.io/topics/notifications">Keyspace
* Notifications</a> of {@link EnableRedisRepositories RedisRepositories}.
*
* @author Christoph Strobl
* @since 2.3
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface EnableKeyspaceNotifications {

/**
* Configure usage of {@link KeyExpirationEventMessageListener}.
*
* @return {@link EnableKeyspaceEvents#ON_DEMAND} by default.
*/
EnableKeyspaceEvents enabled() default EnableKeyspaceEvents.ON_DEMAND;

/**
* Configure the database to receive keyspace notifications from. A negative value is used for {@literal all}
* databases.
*
* @return -1 (aka {@literal all databases}) by default.
*/
int database() default -1;

/**
* Configure the {@literal notify-keyspace-events} property if not already set. <br />
* Use an empty {@link String} to keep (<b>not</b> alter) existing server configuration.
*
* @return an {@literal empty String} by default.
*/
String notifyKeyspaceEvents() default "";
}
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,9 @@
Class<? extends KeyspaceConfiguration> keyspaceConfiguration() default KeyspaceConfiguration.class;

/**
* Configure usage of {@link KeyExpirationEventMessageListener}.
* Configure usage of {@link KeyExpirationEventMessageListener}. <br />
* <strong>NOTE</strong> For more configuration options please use {@link EnableKeyspaceNotifications}.
*
*
* @return
* @since 1.8
Expand All @@ -168,7 +170,8 @@

/**
* Configure the {@literal notify-keyspace-events} property if not already set. <br />
* Use an empty {@link String} to keep (<b>not</b> alter) existing server configuration.
* Use an empty {@link String} to keep (<b>not</b> alter) existing server configuration. <br />
* <strong>NOTE</strong> For more configuration options please use {@link EnableKeyspaceNotifications}.
*
* @return {@literal Ex} by default.
* @since 1.8
Expand Down
Loading