Skip to content

Commit

Permalink
feat(ice): Support set instance name.
Browse files Browse the repository at this point in the history
  • Loading branch information
yizzuide committed Apr 16, 2020
1 parent f81d2d1 commit c73d396
Show file tree
Hide file tree
Showing 14 changed files with 193 additions and 52 deletions.
4 changes: 2 additions & 2 deletions Milkomeda/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

<properties>
<java.version>1.8</java.version>
<project.release.version>3.0.6-SNAPSHOT</project.release.version>
<project.release.version>3.0.7-SNAPSHOT</project.release.version>
<spring-boot.version>2.2.4</spring-boot.version>
<spring-cloud.version>Hoxton.RELEASE</spring-cloud.version>
<mybatis.starter.version>2.1.1</mybatis.starter.version>
Expand Down Expand Up @@ -65,7 +65,7 @@
<profile>
<id>sonatype-oss-release</id>
<properties>
<project.release.version>3.0.6</project.release.version>
<project.release.version>3.0.7</project.release.version>
</properties>
<build>
<plugins>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@
import com.github.yizzuide.milkomeda.universe.metadata.HandlerMetaData;
import com.github.yizzuide.milkomeda.universe.polyfill.RedisPolyfill;
import com.github.yizzuide.milkomeda.util.RedisUtil;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.data.redis.core.StringRedisTemplate;

import java.lang.reflect.Method;
Expand All @@ -22,8 +21,9 @@
*/
@Slf4j
@Data
@AllArgsConstructor
public class DelayJobHandler implements Runnable {
public class DelayJobHandler implements Runnable, ApplicationListener<IceInstanceChangeEvent> {

private IceProperties props;

private StringRedisTemplate redisTemplate;

Expand All @@ -48,18 +48,27 @@ public class DelayJobHandler implements Runnable {
private int index;

// 延迟桶分布式锁Key
private static final String KEY_IDEMPOTENT_LIMITER = "ice:execute_delay_bucket_lock_";

@Autowired
private IceProperties props;
private String lockKey;

public void fill(StringRedisTemplate redisTemplate, JobPool jobPool, DelayBucket delayBucket, ReadyQueue readyQueue, int i, IceProperties props) {
this.redisTemplate = redisTemplate;
this.jobPool = jobPool;
this.delayBucket = delayBucket;
this.readyQueue = readyQueue;
this.index = i;
this.props = props;
if (IceProperties.DEFAULT_INSTANCE_NAME.equals(props.getInstanceName())) {
this.lockKey = "ice:execute_delay_bucket_lock_" + i;
} else {
this.lockKey = "ice:execute_delay_bucket_lock_" + i + ":" + props.getInstanceName();
}
}

@Override
public void run() {
// 延迟桶处理锁住资源,防止同一桶索引分布式并发执行时出现相同记录问题
String indexLockKey = null;
if (props.isEnableJobTimerDistributed()) {
indexLockKey = indexLockKey();
boolean hasObtainLock = RedisUtil.setIfAbsent(indexLockKey, props.getJobTimerLockTimeoutSeconds().getSeconds(), redisTemplate);
boolean hasObtainLock = RedisUtil.setIfAbsent(this.lockKey, props.getJobTimerLockTimeoutSeconds().getSeconds(), redisTemplate);
if (!hasObtainLock) return;
}

Expand Down Expand Up @@ -99,7 +108,7 @@ public void run() {
} finally {
if (props.isEnableJobTimerDistributed()) {
// 删除Lock
RedisPolyfill.redisDelete(redisTemplate, indexLockKey);
RedisPolyfill.redisDelete(redisTemplate, this.lockKey);
}
}
}
Expand Down Expand Up @@ -164,14 +173,9 @@ private void processDelayJob(DelayJob delayJob, Job<?> job) {
}, redisTemplate);
}

/**
* 获取桶索引锁key
* @return key
*/
private String indexLockKey() {
String indexLockKey = KEY_IDEMPOTENT_LIMITER + index;
// 使用常量池里的引用
indexLockKey = indexLockKey.intern();
return indexLockKey;
@Override
public void onApplicationEvent(IceInstanceChangeEvent event) {
String instanceName = event.getSource().toString();
this.lockKey = "ice:execute_delay_bucket_lock_" + this.index + ":" + instanceName;
}
}
Original file line number Diff line number Diff line change
@@ -1,20 +1,27 @@
package com.github.yizzuide.milkomeda.ice;

import com.github.yizzuide.milkomeda.universe.context.ApplicationContextHolder;
import com.github.yizzuide.milkomeda.universe.context.WebContext;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

import java.util.ArrayList;
import java.util.List;

/**
* DelayTimer
*
* @author yizzuide
* @since 1.15.0
* @version 2.0.1
* @version 3.0.7
* Create at 2019/11/16 18:57
*/
public class DelayTimer implements ApplicationListener<ContextRefreshedEvent> {
public class DelayTimer implements InitializingBean, ApplicationListener<ContextRefreshedEvent> {

@Autowired
private JobPool jobPool;
Expand All @@ -37,14 +44,27 @@ public class DelayTimer implements ApplicationListener<ContextRefreshedEvent> {
// 启动标识
private boolean startup;

// 延迟桶处理器
private List<DelayJobHandler> delayJobHandlers;

@Override
public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
if (startup) return;
// 启动Timer
for (int i = 0; i < props.getDelayBucketCount(); i++) {
taskScheduler.scheduleWithFixedDelay(new DelayJobHandler(redisTemplate, jobPool, delayBucket, readyQueue, i, props),
props.getDelayBucketPollRate());
for (DelayJobHandler delayJobHandler : delayJobHandlers) {
taskScheduler.scheduleWithFixedDelay(delayJobHandler, props.getDelayBucketPollRate());
}
startup = true;
}

@Override
public void afterPropertiesSet() throws Exception {
delayJobHandlers = new ArrayList<>();
for (int i = 0; i < props.getDelayBucketCount(); i++) {
// 注册为bean,让其可以接收Spring事件
DelayJobHandler delayJobHandler = WebContext.registerBean((ConfigurableApplicationContext) ApplicationContextHolder.get(), "delayJobHandler" + i, DelayJobHandler.class);
delayJobHandler.fill(redisTemplate, jobPool, delayBucket, readyQueue, i, props);
delayJobHandlers.add(delayJobHandler);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
*
* @author yizzuide
* @since 1.15.2
* @since 3.0.0
* @since 3.0.7
* Create at 2019/11/21 11:16
*/
@Import(MilkomedaContextConfig.class)
Expand All @@ -32,28 +32,31 @@ public class IceBasicConfig {
@Autowired
private ApplicationContextHolder applicationContextHolder;

@Autowired
private IceProperties props;

@Bean
@ConditionalOnMissingBean(JobPool.class)
public JobPool jobPool() {
return new RedisJobPool();
return new RedisJobPool(props);
}

@Bean
@ConditionalOnMissingBean(DelayBucket.class)
public DelayBucket delayBucket() {
return new RedisDelayBucket();
return new RedisDelayBucket(props);
}

@Bean
@ConditionalOnMissingBean(ReadyQueue.class)
public ReadyQueue readyQueue() {
return new RedisReadyQueue();
return new RedisReadyQueue(props);
}

@Bean
@ConditionalOnMissingBean(Ice.class)
public Ice redisIce() {
RedisIce redisIce = new RedisIce();
RedisIce redisIce = new RedisIce(props);
IceHolder.setIce(redisIce);
return redisIce;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package com.github.yizzuide.milkomeda.ice;

import com.github.yizzuide.milkomeda.universe.context.ApplicationContextHolder;

/**
* IceHolder
* 一个管理Ice实例的类
*
* @author yizzuide
* @since 3.0.0
* @version 3.0.7
* Create at 2020/04/09 14:03
*/
public class IceHolder {
Expand All @@ -23,4 +26,14 @@ static void setIce(Ice ice) {
public static Ice getIce() {
return ice;
}

/**
* 修改实例名(用于多产品)
* @param instanceName 实例名
* @since 3.0.7
*/
public static void setInstanceName(String instanceName) {
IceInstanceChangeEvent event = new IceInstanceChangeEvent(instanceName);
ApplicationContextHolder.get().publishEvent(event);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.github.yizzuide.milkomeda.ice;

import org.springframework.context.ApplicationEvent;

/**
* IceInstanceChangeEvent
* 实例名更改事件
*
* @author yizzuide
* @since 3.0.7
* Create at 2020/04/16 16:00
*/
public class IceInstanceChangeEvent extends ApplicationEvent {
public IceInstanceChangeEvent(Object source) {
super(source);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@
@Data
@ConfigurationProperties("milkomeda.ice")
class IceProperties {

public static final String DEFAULT_INSTANCE_NAME = "default";

/**
* 实例名(用于多产品隔离,否则不要修改)
*/
private String instanceName = DEFAULT_INSTANCE_NAME;
/**
* 开启作业Timer(仅作为消费端使用时需要设置为false)<br>
* 注意:使用 {@link EnableIceServer} 时,设置为false无效
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import com.github.yizzuide.milkomeda.universe.context.ApplicationContextHolder;
import com.github.yizzuide.milkomeda.util.JSONUtil;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.data.redis.core.BoundZSetOperations;
import org.springframework.data.redis.core.DefaultTypedTuple;
import org.springframework.data.redis.core.StringRedisTemplate;
Expand All @@ -22,11 +22,11 @@
*
* @author yizzuide
* @since 1.15.0
* @version 3.0.7
* Create at 2019/11/16 16:17
*/
public class RedisDelayBucket implements DelayBucket, InitializingBean {
public class RedisDelayBucket implements DelayBucket, InitializingBean, ApplicationListener<IceInstanceChangeEvent> {

@Autowired
private IceProperties props;

private StringRedisTemplate redisTemplate;
Expand All @@ -38,6 +38,17 @@ public class RedisDelayBucket implements DelayBucket, InitializingBean {
// 默认最大桶大小
public static final int DEFAULT_MAX_BUCKET_SIZE = 100;

public RedisDelayBucket(IceProperties props) {
this.props = props;
for (int i = 0; i < props.getDelayBucketCount(); i++) {
if (IceProperties.DEFAULT_INSTANCE_NAME.equals(props.getInstanceName())) {
bucketNames.add("ice:bucket" + i);
} else {
bucketNames.add("ice:bucket" + i + ":" + props.getInstanceName());
}
}
}

@Override
public void add(DelayJob delayJob) {
String bucketName = getCurrentBucketName();
Expand Down Expand Up @@ -99,8 +110,14 @@ private String getCurrentBucketName() {
@Override
public void afterPropertiesSet() {
redisTemplate = ApplicationContextHolder.get().getBean(StringRedisTemplate.class);
}

@Override
public void onApplicationEvent(IceInstanceChangeEvent event) {
String instanceName = event.getSource().toString();
bucketNames.clear();
for (int i = 0; i < props.getDelayBucketCount(); i++) {
bucketNames.add("ice:bucket" + i);
bucketNames.add("ice:bucket" + i + ":" + instanceName);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.github.yizzuide.milkomeda.util.RedisUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.util.CollectionUtils;

Expand All @@ -23,7 +24,7 @@
* Create at 2019/11/16 15:20
*/
@Slf4j
public class RedisIce implements Ice {
public class RedisIce implements Ice, ApplicationListener<IceInstanceChangeEvent> {

@Autowired
private JobPool jobPool;
Expand All @@ -37,10 +38,16 @@ public class RedisIce implements Ice {
@Autowired
private StringRedisTemplate redisTemplate;

@Autowired
private IceProperties props;

private static final String KEY_IDEMPOTENT_LIMITER = "ice:range_pop_lock";
private String lockKey = "ice:range_pop_lock";

public RedisIce(IceProperties props) {
this.props = props;
if (!IceProperties.DEFAULT_INSTANCE_NAME.equals(props.getInstanceName())) {
this.lockKey = "ice:range_pop_lock:" + props.getInstanceName();
}
}

@SuppressWarnings("rawtypes")
@Override
Expand Down Expand Up @@ -100,7 +107,7 @@ public <T> List<Job<T>> pop(String topic, int count) {
if (count == 1) return Collections.singletonList(pop(topic));

// 使用SetNX锁住资源,防止多线程并发执行,造成重复消费问题
boolean hasObtainLock = RedisUtil.setIfAbsent(KEY_IDEMPOTENT_LIMITER, props.getTaskPopCountLockTimeoutSeconds().getSeconds(), redisTemplate);
boolean hasObtainLock = RedisUtil.setIfAbsent(this.lockKey, props.getTaskPopCountLockTimeoutSeconds().getSeconds(), redisTemplate);
if (!hasObtainLock) return null;

List<Job<T>> jobList;
Expand Down Expand Up @@ -131,7 +138,7 @@ public <T> List<Job<T>> pop(String topic, int count) {
}, redisTemplate);
} finally {
// 删除Lock
RedisPolyfill.redisDelete(redisTemplate, KEY_IDEMPOTENT_LIMITER);
RedisPolyfill.redisDelete(redisTemplate, this.lockKey);
}
return jobList;
}
Expand All @@ -156,4 +163,10 @@ public <T> void delete(List<Job<T>> jobs) {
public void delete(Object... jobIds) {
jobPool.remove(jobIds);
}

@Override
public void onApplicationEvent(IceInstanceChangeEvent event) {
String instanceName = event.getSource().toString();
this.lockKey = "ice:range_pop_lock:" + instanceName;
}
}
Loading

0 comments on commit c73d396

Please sign in to comment.