From b1bf732ef2acf01cda01bf6d8aad5e7cae446cdb Mon Sep 17 00:00:00 2001 From: dothetrick Date: Sat, 21 Nov 2020 20:09:30 +0800 Subject: [PATCH] init --- .gitignore | 37 ++++ README.md | 138 ++++++++++++++ binlogportal-spring-boot-starter-test/pom.xml | 72 ++++++++ ...ortalSpringBootStarterTestApplication.java | 13 ++ .../classes/HttpCallBackHandler.java | 14 ++ .../eventhandler/LogEventHandler.java | 15 ++ .../job/BinlogSync.java | 24 +++ .../src/main/resources/application.yml | 28 +++ ...SpringBootStarterTestApplicationTests.java | 13 ++ binlogportal-spring-boot-starter/pom.xml | 32 ++++ .../BinlogPortalAutoConfiguration.java | 90 ++++++++++ .../autoconfig/BinlogPortalBootConfig.java | 65 +++++++ .../binlogportal/autoconfig/DbConfig.java | 51 ++++++ .../autoconfig/HttpHandlerConfig.java | 24 +++ .../main/resources/META-INF/spring.factories | 1 + binlogportal/README.md | 16 ++ binlogportal/doc/task.md | 14 ++ binlogportal/pom.xml | 85 +++++++++ .../binlogportal/BinlogPortalException.java | 23 +++ .../binlogportal/BinlogPortalStarter.java | 97 ++++++++++ .../config/BinlogPortalConfig.java | 56 ++++++ .../binlogportal/config/RedisConfig.java | 57 ++++++ .../binlogportal/config/SyncConfig.java | 79 ++++++++ .../distributed/IDistributedHandler.java | 8 + .../distributed/RedisDistributedHandler.java | 92 ++++++++++ .../binlogportal/event/EventEntity.java | 130 ++++++++++++++ .../binlogportal/event/EventEntityType.java | 21 +++ .../binlogportal/event/IEventListener.java | 6 + .../event/MultiEventHandlerListener.java | 125 +++++++++++++ .../handler/HttpRequestEventHandler.java | 84 +++++++++ .../event/handler/IEventHandler.java | 8 + .../event/handler/IHttpCallback.java | 7 + .../lifecycle/BaseLifeCycleEventListener.java | 37 ++++ .../BaseLifeCycleListenerFactory.java | 11 ++ .../event/lifecycle/ILifeCycleFactory.java | 8 + .../parser/CommonEventParserDispatcher.java | 81 +++++++++ .../event/parser/DeleteEventParser.java | 53 ++++++ .../event/parser/EventParserFactory.java | 25 +++ .../event/parser/IEventParser.java | 14 ++ .../event/parser/IEventParserDispatcher.java | 14 ++ .../event/parser/InsertEventParser.java | 53 ++++++ .../event/parser/UpdateEventParser.java | 60 +++++++ .../converter/CommonConverterProcessor.java | 17 ++ .../event/parser/converter/IConverter.java | 10 ++ .../parser/converter/StringConverter.java | 42 +++++ .../factory/BinaryLogClientFactory.java | 121 +++++++++++++ .../position/BinlogPositionEntity.java | 40 +++++ .../position/IPositionHandler.java | 13 ++ .../position/RedisPositionHandler.java | 60 +++++++ .../tablemeta/TableMetaEntity.java | 73 ++++++++ .../tablemeta/TableMetaFactory.java | 68 +++++++ pom.xml | 168 ++++++++++++++++++ 52 files changed, 2493 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 binlogportal-spring-boot-starter-test/pom.xml create mode 100644 binlogportal-spring-boot-starter-test/src/main/java/com/insistingon/binlogportal/binlogportalspringbootstartertest/BinlogportalSpringBootStarterTestApplication.java create mode 100644 binlogportal-spring-boot-starter-test/src/main/java/com/insistingon/binlogportal/binlogportalspringbootstartertest/classes/HttpCallBackHandler.java create mode 100644 binlogportal-spring-boot-starter-test/src/main/java/com/insistingon/binlogportal/binlogportalspringbootstartertest/eventhandler/LogEventHandler.java create mode 100644 binlogportal-spring-boot-starter-test/src/main/java/com/insistingon/binlogportal/binlogportalspringbootstartertest/job/BinlogSync.java create mode 100644 binlogportal-spring-boot-starter-test/src/main/resources/application.yml create mode 100644 binlogportal-spring-boot-starter-test/src/test/java/com/insistingon/binlogportal/binlogportalspringbootstartertest/BinlogportalSpringBootStarterTestApplicationTests.java create mode 100644 binlogportal-spring-boot-starter/pom.xml create mode 100644 binlogportal-spring-boot-starter/src/main/java/com/insistingon/binlogportal/autoconfig/BinlogPortalAutoConfiguration.java create mode 100644 binlogportal-spring-boot-starter/src/main/java/com/insistingon/binlogportal/autoconfig/BinlogPortalBootConfig.java create mode 100644 binlogportal-spring-boot-starter/src/main/java/com/insistingon/binlogportal/autoconfig/DbConfig.java create mode 100644 binlogportal-spring-boot-starter/src/main/java/com/insistingon/binlogportal/autoconfig/HttpHandlerConfig.java create mode 100644 binlogportal-spring-boot-starter/src/main/resources/META-INF/spring.factories create mode 100644 binlogportal/README.md create mode 100644 binlogportal/doc/task.md create mode 100644 binlogportal/pom.xml create mode 100644 binlogportal/src/main/java/com/insistingon/binlogportal/BinlogPortalException.java create mode 100644 binlogportal/src/main/java/com/insistingon/binlogportal/BinlogPortalStarter.java create mode 100644 binlogportal/src/main/java/com/insistingon/binlogportal/config/BinlogPortalConfig.java create mode 100644 binlogportal/src/main/java/com/insistingon/binlogportal/config/RedisConfig.java create mode 100644 binlogportal/src/main/java/com/insistingon/binlogportal/config/SyncConfig.java create mode 100644 binlogportal/src/main/java/com/insistingon/binlogportal/distributed/IDistributedHandler.java create mode 100644 binlogportal/src/main/java/com/insistingon/binlogportal/distributed/RedisDistributedHandler.java create mode 100644 binlogportal/src/main/java/com/insistingon/binlogportal/event/EventEntity.java create mode 100644 binlogportal/src/main/java/com/insistingon/binlogportal/event/EventEntityType.java create mode 100644 binlogportal/src/main/java/com/insistingon/binlogportal/event/IEventListener.java create mode 100644 binlogportal/src/main/java/com/insistingon/binlogportal/event/MultiEventHandlerListener.java create mode 100644 binlogportal/src/main/java/com/insistingon/binlogportal/event/handler/HttpRequestEventHandler.java create mode 100644 binlogportal/src/main/java/com/insistingon/binlogportal/event/handler/IEventHandler.java create mode 100644 binlogportal/src/main/java/com/insistingon/binlogportal/event/handler/IHttpCallback.java create mode 100644 binlogportal/src/main/java/com/insistingon/binlogportal/event/lifecycle/BaseLifeCycleEventListener.java create mode 100644 binlogportal/src/main/java/com/insistingon/binlogportal/event/lifecycle/BaseLifeCycleListenerFactory.java create mode 100644 binlogportal/src/main/java/com/insistingon/binlogportal/event/lifecycle/ILifeCycleFactory.java create mode 100644 binlogportal/src/main/java/com/insistingon/binlogportal/event/parser/CommonEventParserDispatcher.java create mode 100644 binlogportal/src/main/java/com/insistingon/binlogportal/event/parser/DeleteEventParser.java create mode 100644 binlogportal/src/main/java/com/insistingon/binlogportal/event/parser/EventParserFactory.java create mode 100644 binlogportal/src/main/java/com/insistingon/binlogportal/event/parser/IEventParser.java create mode 100644 binlogportal/src/main/java/com/insistingon/binlogportal/event/parser/IEventParserDispatcher.java create mode 100644 binlogportal/src/main/java/com/insistingon/binlogportal/event/parser/InsertEventParser.java create mode 100644 binlogportal/src/main/java/com/insistingon/binlogportal/event/parser/UpdateEventParser.java create mode 100644 binlogportal/src/main/java/com/insistingon/binlogportal/event/parser/converter/CommonConverterProcessor.java create mode 100644 binlogportal/src/main/java/com/insistingon/binlogportal/event/parser/converter/IConverter.java create mode 100644 binlogportal/src/main/java/com/insistingon/binlogportal/event/parser/converter/StringConverter.java create mode 100644 binlogportal/src/main/java/com/insistingon/binlogportal/factory/BinaryLogClientFactory.java create mode 100644 binlogportal/src/main/java/com/insistingon/binlogportal/position/BinlogPositionEntity.java create mode 100644 binlogportal/src/main/java/com/insistingon/binlogportal/position/IPositionHandler.java create mode 100644 binlogportal/src/main/java/com/insistingon/binlogportal/position/RedisPositionHandler.java create mode 100644 binlogportal/src/main/java/com/insistingon/binlogportal/tablemeta/TableMetaEntity.java create mode 100644 binlogportal/src/main/java/com/insistingon/binlogportal/tablemeta/TableMetaFactory.java create mode 100644 pom.xml diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..0b55e5c --- /dev/null +++ b/.gitignore @@ -0,0 +1,37 @@ +# Created by .ignore support plugin (hsz.mobi) +### Example user template template +### Example user template + +# IntelliJ project files +.idea +*.iml +out +gen +### Java template +# Compiled class file +*.class +target + +# Log file +*.log + +# BlueJ files +*.ctxt + +# Mobile Tools for Java (J2ME) +.mtj.tmp/ + +# Package Files # +*.jar +*.war +*.nar +*.ear +*.zip +*.tar.gz +*.rar + +# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml +hs_err_pid* + +.DS_Store + diff --git a/README.md b/README.md new file mode 100644 index 0000000..4579855 --- /dev/null +++ b/README.md @@ -0,0 +1,138 @@ +# binlogportal + +A simple mysql binlog sync tool + +It has following features: + +1.Store binlog postion use redis + +2.Support springboot starter + +3.Easy to use and deploy + +- - - +## 项目简介 + +一个轻量级的mysql binlog同步工具。可以单机部署,同时支持分布式高可用。 + +项目主要目标是提供可基于spring boot快速部署的同步工具,外部依赖只有redis。 + +当前版本特性: +- 提供了binlogportal-spring-boot-starter包,可使用spring boot快速部署 +- 使用redis保存binlog position信息,重启后可从上次position位置开始 +- 当前支持insert和update的结构化 +- 提供默认的http事件处理器。可通过实现`IEventHandler`接口,自定义事件处理器 +- 使用redis作为分布式协调器,可多机部署实现高可用 + +## Mysql配置 +- Mysql需要开启binlog并设置为row模式 +- 同步binlog使用的mysql账号,需要添加REPLICATION权限,示例如下: +```sql +CREATE USER binlogportal IDENTIFIED BY '123456'; +GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'binlogportal'@'%'; +GRANT ALL PRIVILEGES ON *.* TO 'binlogportal'@'%'; +FLUSH PRIVILEGES; +``` + +### 1.快速开始 + +#### 1.1 通过spring boot构建项目 +- 直接依赖binlogportal-spring-boot-starter +```xml + + com.insistingon.binlogportal + binlogportal-spring-boot-starter + 1.0.11 + +``` +- 通过spring boot的application.yml配置启动器 +```yaml +binlogportal: + enable: true # 是否启用autoconfig + distributed-enable: true # 是否启用分布式部署 + distributed-redis: # distributed-enable为true时,要提供一个redis作为分布式协调器 + host: 127.0.0.1 + port: 6379 + auth: + position-redis: # 保存binlog position的redis,必须配置 + host: 127.0.0.1 + port: 6379 + auth: + db-config: # 数据库配置,可以有多个,key自定义即可 + d1: + host: 0.0.0.0 + port: 3306 + user-name: binlogportal + password: 123456 + handler-list: [logEventHandler] # 该数据库使用的事件处理器,名称为spring的bean name + http-handler: # 启用自带的http事件处理器,可发送请求 + url-list: [http://127.0.0.1:8988/testit] # 要发送的url列表,http参数为统一的格式 + result-callback: httpCallBack # 配置自定义的结果处理器,需要实现IHttpCallback接口,值为bean name +``` +- Starter启动 + - spring boot autoconfig启动成功后,会把BinlogPortalStarter的实例注入到IOC中 + - 项目中通过注入的方式获取binlogPortalStarter使用 + - binlogPortalStarter.start()会为每个mysql库创建一个线程处理binlog + - 下面是使用CommandLineRunner启动starter的一个例子 +```java +@Slf4j +@Component +public class BinlogSync implements CommandLineRunner { + @Resource + BinlogPortalStarter binlogPortalStarter; + + public void run(String... args) throws Exception { + try { + binlogPortalStarter.start(); + } catch (BinlogPortalException e) { + log.error(e.getMessage(), e); + } + } +} +``` + +#### 1.2 非spring boot项目 +- 非spring boot项目,可以使用基础包 +```xml + + com.insistingon.binlogportal + binlogportal + 1.0.5 + +``` +- 依赖后实现配置类`BinlogPortalConfig`和`SyncConfig`,传入Starter中运行即可 +```java +public class TestClass{ + public static void main(String[] args) { + SyncConfig syncConfig = new SyncConfig(); + syncConfig.setHost("0.0.0.0"); + syncConfig.setPort(3306); + syncConfig.setUserName("binlogportal"); + syncConfig.setPassword("123456"); + + BinlogPortalConfig binlogPortalConfig = new BinlogPortalConfig(); + binlogPortalConfig.addSyncConfig(syncConfig); + + RedisConfig redisConfig = new RedisConfig("127.0.0.1", 6379); + RedisPositionHandler redisPositionHandler = new RedisPositionHandler(redisConfig); + binlogPortalConfig.setPositionHandler(redisPositionHandler); + + binlogPortalConfig.setDistributedHandler(new RedisDistributedHandler(redisConfig)); + + BinlogPortalStarter binlogPortalStarter = new BinlogPortalStarter(); + binlogPortalStarter.setBinlogPortalConfig(binlogPortalConfig); + try { + binlogPortalStarter.start(); + } catch (BinlogPortalException e) { + e.printStackTrace(); + } + } +} +``` + +### 2.分布式部署实现 +项目中高可用实现是基于redis的分布式锁。 + +每个实例都会加载全部数据库的配置,在创建binlog连接之前,先要获取redis锁,获取锁后会定时刷新锁的过期时间。所有实例会定时重新抢锁。 + +同一个mysql库的binlog文件和position会保存在redis里,如果一个实例宕机。新抢到锁的实例在初始化时,会使用上个实例已保存的binlog信息继续获取。 diff --git a/binlogportal-spring-boot-starter-test/pom.xml b/binlogportal-spring-boot-starter-test/pom.xml new file mode 100644 index 0000000..0eca6e9 --- /dev/null +++ b/binlogportal-spring-boot-starter-test/pom.xml @@ -0,0 +1,72 @@ + + + 4.0.0 + + + binlogportal-parent + com.insistingon.binlogportal + 1.0.11 + + + binlogportal-spring-boot-starter-test + + binlogportal-spring-boot-starter-test + binlogportal-spring-boot-starter-test + + + true + + + + + org.springframework.boot + spring-boot-starter + ${springboot.version} + + + + org.springframework.boot + spring-boot-starter-test + ${springboot.version} + test + + + org.junit.vintage + junit-vintage-engine + + + + + + org.springframework.boot + spring-boot-autoconfigure + ${springboot.version} + compile + + + + com.insistingon.binlogportal + binlogportal-spring-boot-starter + 1.0.11 + + + + org.projectlombok + lombok + 1.18.12 + provided + + + + + + + org.springframework.boot + spring-boot-maven-plugin + 2.3.0.RELEASE + + + + + diff --git a/binlogportal-spring-boot-starter-test/src/main/java/com/insistingon/binlogportal/binlogportalspringbootstartertest/BinlogportalSpringBootStarterTestApplication.java b/binlogportal-spring-boot-starter-test/src/main/java/com/insistingon/binlogportal/binlogportalspringbootstartertest/BinlogportalSpringBootStarterTestApplication.java new file mode 100644 index 0000000..5fd5401 --- /dev/null +++ b/binlogportal-spring-boot-starter-test/src/main/java/com/insistingon/binlogportal/binlogportalspringbootstartertest/BinlogportalSpringBootStarterTestApplication.java @@ -0,0 +1,13 @@ +package com.insistingon.binlogportal.binlogportalspringbootstartertest; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class BinlogportalSpringBootStarterTestApplication { + + public static void main(String[] args) { + SpringApplication.run(BinlogportalSpringBootStarterTestApplication.class, args); + } + +} diff --git a/binlogportal-spring-boot-starter-test/src/main/java/com/insistingon/binlogportal/binlogportalspringbootstartertest/classes/HttpCallBackHandler.java b/binlogportal-spring-boot-starter-test/src/main/java/com/insistingon/binlogportal/binlogportalspringbootstartertest/classes/HttpCallBackHandler.java new file mode 100644 index 0000000..c1d146d --- /dev/null +++ b/binlogportal-spring-boot-starter-test/src/main/java/com/insistingon/binlogportal/binlogportalspringbootstartertest/classes/HttpCallBackHandler.java @@ -0,0 +1,14 @@ +package com.insistingon.binlogportal.binlogportalspringbootstartertest.classes; + +import com.insistingon.binlogportal.event.handler.IHttpCallback; +import lombok.extern.slf4j.Slf4j; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +public class HttpCallBackHandler implements IHttpCallback { + public void call(CloseableHttpResponse closeableHttpResponse) { + log.info(closeableHttpResponse.toString()); + } +} diff --git a/binlogportal-spring-boot-starter-test/src/main/java/com/insistingon/binlogportal/binlogportalspringbootstartertest/eventhandler/LogEventHandler.java b/binlogportal-spring-boot-starter-test/src/main/java/com/insistingon/binlogportal/binlogportalspringbootstartertest/eventhandler/LogEventHandler.java new file mode 100644 index 0000000..484d23a --- /dev/null +++ b/binlogportal-spring-boot-starter-test/src/main/java/com/insistingon/binlogportal/binlogportalspringbootstartertest/eventhandler/LogEventHandler.java @@ -0,0 +1,15 @@ +package com.insistingon.binlogportal.binlogportalspringbootstartertest.eventhandler; + +import com.insistingon.binlogportal.BinlogPortalException; +import com.insistingon.binlogportal.event.EventEntity; +import com.insistingon.binlogportal.event.handler.IEventHandler; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +@Component +@Slf4j +public class LogEventHandler implements IEventHandler { + public void process(EventEntity eventEntity) throws BinlogPortalException { + log.info(eventEntity.getJsonFormatData()); + } +} diff --git a/binlogportal-spring-boot-starter-test/src/main/java/com/insistingon/binlogportal/binlogportalspringbootstartertest/job/BinlogSync.java b/binlogportal-spring-boot-starter-test/src/main/java/com/insistingon/binlogportal/binlogportalspringbootstartertest/job/BinlogSync.java new file mode 100644 index 0000000..5e0bdfb --- /dev/null +++ b/binlogportal-spring-boot-starter-test/src/main/java/com/insistingon/binlogportal/binlogportalspringbootstartertest/job/BinlogSync.java @@ -0,0 +1,24 @@ +package com.insistingon.binlogportal.binlogportalspringbootstartertest.job; + +import com.insistingon.binlogportal.BinlogPortalException; +import com.insistingon.binlogportal.BinlogPortalStarter; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.CommandLineRunner; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; + +@Slf4j +@Component +public class BinlogSync implements CommandLineRunner { + @Resource + BinlogPortalStarter binlogPortalStarter; + + public void run(String... args) throws Exception { + try { + binlogPortalStarter.start(); + } catch (BinlogPortalException e) { + log.error(e.getMessage(), e); + } + } +} diff --git a/binlogportal-spring-boot-starter-test/src/main/resources/application.yml b/binlogportal-spring-boot-starter-test/src/main/resources/application.yml new file mode 100644 index 0000000..fd27a13 --- /dev/null +++ b/binlogportal-spring-boot-starter-test/src/main/resources/application.yml @@ -0,0 +1,28 @@ +#显示debug日志信息 +debug: true + +binlogportal: + enable: true # 是否启用autoconfig + distributed-enable: true # 是否启用分布式部署 + distributed-redis: # distributed-enable为true时,要提供一个redis作为分布式协调器,最好开启持久化 + host: 127.0.0.1 + port: 6379 + auth: + position-redis: # 保存binlog position的redis,必须配置,最好开启持久化 + host: 127.0.0.1 + port: 6379 + auth: + db-config: # 数据库配置,可以有多个,key自定义即可 + d1: + host: 127.0.0.1 + port: 3306 + user-name: binlogportal + password: 123456 + handler-list: [logEventHandler] # 该数据库使用的事件处理器,名称为spring的bean name +# http-handler: # 启用自带的http时间处理器,可发送请求 +# url-list: [http://127.0.0.1:8988/testit] # 要发送的url列表,http参数为统一的格式 +# result-callback: httpCallBack # 配置自定义的结果处理器,需要实现IHttpCallback接口,值为bean name + +logging: + file: + path: /var/log/binlogportal diff --git a/binlogportal-spring-boot-starter-test/src/test/java/com/insistingon/binlogportal/binlogportalspringbootstartertest/BinlogportalSpringBootStarterTestApplicationTests.java b/binlogportal-spring-boot-starter-test/src/test/java/com/insistingon/binlogportal/binlogportalspringbootstartertest/BinlogportalSpringBootStarterTestApplicationTests.java new file mode 100644 index 0000000..3316637 --- /dev/null +++ b/binlogportal-spring-boot-starter-test/src/test/java/com/insistingon/binlogportal/binlogportalspringbootstartertest/BinlogportalSpringBootStarterTestApplicationTests.java @@ -0,0 +1,13 @@ +package com.insistingon.binlogportal.binlogportalspringbootstartertest; + +import org.junit.jupiter.api.Test; +import org.springframework.boot.test.context.SpringBootTest; + +@SpringBootTest +class BinlogportalSpringBootStarterTestApplicationTests { + + @Test + void contextLoads() { + } + +} diff --git a/binlogportal-spring-boot-starter/pom.xml b/binlogportal-spring-boot-starter/pom.xml new file mode 100644 index 0000000..4856e43 --- /dev/null +++ b/binlogportal-spring-boot-starter/pom.xml @@ -0,0 +1,32 @@ + + + + binlogportal-parent + com.insistingon.binlogportal + 1.0.11 + + 4.0.0 + + binlogportal-spring-boot-starter + + + 1.8 + true + + + + + com.insistingon.binlogportal + binlogportal + 1.0.11 + + + + org.springframework.boot + spring-boot-autoconfigure + ${springboot.version} + + + \ No newline at end of file diff --git a/binlogportal-spring-boot-starter/src/main/java/com/insistingon/binlogportal/autoconfig/BinlogPortalAutoConfiguration.java b/binlogportal-spring-boot-starter/src/main/java/com/insistingon/binlogportal/autoconfig/BinlogPortalAutoConfiguration.java new file mode 100644 index 0000000..3e840e1 --- /dev/null +++ b/binlogportal-spring-boot-starter/src/main/java/com/insistingon/binlogportal/autoconfig/BinlogPortalAutoConfiguration.java @@ -0,0 +1,90 @@ +package com.insistingon.binlogportal.autoconfig; + +import com.insistingon.binlogportal.BinlogPortalException; +import com.insistingon.binlogportal.BinlogPortalStarter; +import com.insistingon.binlogportal.config.BinlogPortalConfig; +import com.insistingon.binlogportal.config.RedisConfig; +import com.insistingon.binlogportal.config.SyncConfig; +import com.insistingon.binlogportal.distributed.RedisDistributedHandler; +import com.insistingon.binlogportal.event.handler.HttpRequestEventHandler; +import com.insistingon.binlogportal.event.handler.IEventHandler; +import com.insistingon.binlogportal.event.handler.IHttpCallback; +import com.insistingon.binlogportal.position.RedisPositionHandler; +import org.apache.commons.lang.StringUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.ApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.Map; + +@Configuration +@EnableConfigurationProperties(BinlogPortalBootConfig.class) +@ConditionalOnClass(BinlogPortalStarter.class) +@ConditionalOnProperty(prefix = "binlogportal", value = "enable", havingValue = "true") +public class BinlogPortalAutoConfiguration { + @Autowired + BinlogPortalBootConfig binlogPortalBootConfig; + + @Autowired + ApplicationContext applicationContext; + + @Bean + @ConditionalOnMissingBean(BinlogPortalStarter.class) + public BinlogPortalStarter binlogPortalStarter() throws BinlogPortalException { + BinlogPortalConfig binlogPortalConfig = new BinlogPortalConfig(); + Map eventHandlerList = applicationContext.getBeansOfType(IEventHandler.class); + + //common http event handler + HttpRequestEventHandler httpRequestEventHandler = null; + HttpHandlerConfig httpHandlerConfig = binlogPortalBootConfig.getHttpHandler(); + if (httpHandlerConfig != null && httpHandlerConfig.getUrlList() != null) { + httpRequestEventHandler = new HttpRequestEventHandler(); + httpRequestEventHandler.setUrlList(binlogPortalBootConfig.getHttpHandler().getUrlList()); + if (!StringUtils.isBlank(httpHandlerConfig.getResultCallback())) { + httpRequestEventHandler.setHttpCallback(applicationContext.getBeansOfType(IHttpCallback.class).get(httpHandlerConfig.getResultCallback())); + } + } + + //dbconfig list + for (Map.Entry entry : binlogPortalBootConfig.getDbConfig().entrySet()) { + DbConfig val = entry.getValue(); + SyncConfig syncConfig = new SyncConfig(); + syncConfig.setHost(val.getHost()); + syncConfig.setPort(val.getPort()); + syncConfig.setUserName(val.getUserName()); + syncConfig.setPassword(val.getPassword()); + if (val.getHandlerList() != null) { + val.getHandlerList().forEach(eventHandler -> { + syncConfig.addEventHandlerList(eventHandlerList.get(eventHandler)); + }); + } + if (httpRequestEventHandler != null) { + syncConfig.addEventHandlerList(httpRequestEventHandler); + } + binlogPortalConfig.addSyncConfig(syncConfig); + } + + //binlog position config + RedisConfig positionRedisConfig = binlogPortalBootConfig.getPositionRedis(); + if (positionRedisConfig != null) { + RedisPositionHandler redisPositionHandler = new RedisPositionHandler(positionRedisConfig); + binlogPortalConfig.setPositionHandler(redisPositionHandler); + } else { + throw new BinlogPortalException("binlog position redis should not be null"); + } + + //distributed config + if (binlogPortalBootConfig.getDistributedEnable()) { + binlogPortalConfig.setDistributedHandler(new RedisDistributedHandler(binlogPortalBootConfig.getDistributedRedis())); + } + + BinlogPortalStarter binlogPortalStarter = new BinlogPortalStarter(); + binlogPortalStarter.setBinlogPortalConfig(binlogPortalConfig); + return binlogPortalStarter; + } +} diff --git a/binlogportal-spring-boot-starter/src/main/java/com/insistingon/binlogportal/autoconfig/BinlogPortalBootConfig.java b/binlogportal-spring-boot-starter/src/main/java/com/insistingon/binlogportal/autoconfig/BinlogPortalBootConfig.java new file mode 100644 index 0000000..573e218 --- /dev/null +++ b/binlogportal-spring-boot-starter/src/main/java/com/insistingon/binlogportal/autoconfig/BinlogPortalBootConfig.java @@ -0,0 +1,65 @@ +package com.insistingon.binlogportal.autoconfig; + + +import com.insistingon.binlogportal.config.RedisConfig; +import org.springframework.boot.context.properties.ConfigurationProperties; + +import java.util.Map; + +@ConfigurationProperties(prefix = "binlogportal") +public class BinlogPortalBootConfig { + private Map dbConfig; + private Boolean enable; + private Boolean distributedEnable; + private RedisConfig distributedRedis; + private RedisConfig positionRedis; + private HttpHandlerConfig httpHandler; + + public Map getDbConfig() { + return dbConfig; + } + + public void setDbConfig(Map dbConfig) { + this.dbConfig = dbConfig; + } + + public Boolean getEnable() { + return enable; + } + + public void setEnable(Boolean enable) { + this.enable = enable; + } + + public Boolean getDistributedEnable() { + return distributedEnable; + } + + public void setDistributedEnable(Boolean distributedEnable) { + this.distributedEnable = distributedEnable; + } + + public RedisConfig getDistributedRedis() { + return distributedRedis; + } + + public void setDistributedRedis(RedisConfig distributedRedis) { + this.distributedRedis = distributedRedis; + } + + public RedisConfig getPositionRedis() { + return positionRedis; + } + + public void setPositionRedis(RedisConfig positionRedis) { + this.positionRedis = positionRedis; + } + + public HttpHandlerConfig getHttpHandler() { + return httpHandler; + } + + public void setHttpHandler(HttpHandlerConfig httpHandler) { + this.httpHandler = httpHandler; + } +} diff --git a/binlogportal-spring-boot-starter/src/main/java/com/insistingon/binlogportal/autoconfig/DbConfig.java b/binlogportal-spring-boot-starter/src/main/java/com/insistingon/binlogportal/autoconfig/DbConfig.java new file mode 100644 index 0000000..8a3cffd --- /dev/null +++ b/binlogportal-spring-boot-starter/src/main/java/com/insistingon/binlogportal/autoconfig/DbConfig.java @@ -0,0 +1,51 @@ +package com.insistingon.binlogportal.autoconfig; + +import java.util.List; + +public class DbConfig { + String host; + Integer port; + String userName; + String password; + List handlerList; + + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + + public Integer getPort() { + return port; + } + + public void setPort(Integer port) { + this.port = port; + } + + public String getUserName() { + return userName; + } + + public void setUserName(String userName) { + this.userName = userName; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + + public List getHandlerList() { + return handlerList; + } + + public void setHandlerList(List handlerList) { + this.handlerList = handlerList; + } +} diff --git a/binlogportal-spring-boot-starter/src/main/java/com/insistingon/binlogportal/autoconfig/HttpHandlerConfig.java b/binlogportal-spring-boot-starter/src/main/java/com/insistingon/binlogportal/autoconfig/HttpHandlerConfig.java new file mode 100644 index 0000000..2018e65 --- /dev/null +++ b/binlogportal-spring-boot-starter/src/main/java/com/insistingon/binlogportal/autoconfig/HttpHandlerConfig.java @@ -0,0 +1,24 @@ +package com.insistingon.binlogportal.autoconfig; + +import java.util.List; + +public class HttpHandlerConfig { + private List urlList; + private String resultCallback; + + public List getUrlList() { + return urlList; + } + + public void setUrlList(List urlList) { + this.urlList = urlList; + } + + public String getResultCallback() { + return resultCallback; + } + + public void setResultCallback(String resultCallback) { + this.resultCallback = resultCallback; + } +} diff --git a/binlogportal-spring-boot-starter/src/main/resources/META-INF/spring.factories b/binlogportal-spring-boot-starter/src/main/resources/META-INF/spring.factories new file mode 100644 index 0000000..483bdf9 --- /dev/null +++ b/binlogportal-spring-boot-starter/src/main/resources/META-INF/spring.factories @@ -0,0 +1 @@ +org.springframework.boot.autoconfigure.EnableAutoConfiguration=com.insistingon.binlogportal.autoconfig.BinlogPortalAutoConfiguration diff --git a/binlogportal/README.md b/binlogportal/README.md new file mode 100644 index 0000000..7ee8399 --- /dev/null +++ b/binlogportal/README.md @@ -0,0 +1,16 @@ +# binlogportal + +A simple mysql binlog sync tool + +It has following features: + +1.Store binlog postion use redis + +2.Support springboot starter + +3.Easy to use and deploy + +- - - +项目简介 + +一个轻量级的 diff --git a/binlogportal/doc/task.md b/binlogportal/doc/task.md new file mode 100644 index 0000000..6627a51 --- /dev/null +++ b/binlogportal/doc/task.md @@ -0,0 +1,14 @@ +## 开发进度 + +- ~~整体结构规划~~ +- ~~binlogclient工厂类实现~~ +- ~~获取表元数据,缓存~~ +- ~~mysql原始时间类型映射处理~~ + - ~~统一一个类表示全部类型,方便处理~~ +- ~~统一事件处理器逻辑~~ +- ~~自定义事件处理器注册~~ +- ~~基于redis的分布式实现~~ +- ~~springboot获取配置信息~~ +- ~~mysql binlog文件有效性检测~~ +- ~~delete事件处理~~ +- mysql binlog position有效性检测 \ No newline at end of file diff --git a/binlogportal/pom.xml b/binlogportal/pom.xml new file mode 100644 index 0000000..8ea3798 --- /dev/null +++ b/binlogportal/pom.xml @@ -0,0 +1,85 @@ + + + + 4.0.0 + + + com.insistingon.binlogportal + binlogportal-parent + 1.0.11 + + + binlogportal + jar + + binlogportal + A simple mysql binlog sync tool + https://github.com/dothetrick/binlogportal + + + UTF-8 + 1.8 + 1.8 + + + + + junit + junit + 4.13.1 + test + + + + com.github.shyiko + mysql-binlog-connector-java + 0.21.0 + + + + mysql + mysql-connector-java + 8.0.22 + + + + org.slf4j + slf4j-api + 1.8.0-alpha1 + + + + org.apache.httpcomponents + httpclient + 4.5.11 + + + + + com.fasterxml.jackson.core + jackson-core + 2.11.3 + + + + redis.clients + jedis + 3.3.0 + + + + commons-lang + commons-lang + 2.6 + + + + org.redisson + redisson + 3.13.5 + + + + + diff --git a/binlogportal/src/main/java/com/insistingon/binlogportal/BinlogPortalException.java b/binlogportal/src/main/java/com/insistingon/binlogportal/BinlogPortalException.java new file mode 100644 index 0000000..b3a0fae --- /dev/null +++ b/binlogportal/src/main/java/com/insistingon/binlogportal/BinlogPortalException.java @@ -0,0 +1,23 @@ +package com.insistingon.binlogportal; + +public class BinlogPortalException extends Exception { + public BinlogPortalException() { + super(); + } + + public BinlogPortalException(String message) { + super(message); + } + + public BinlogPortalException(String message, Throwable cause) { + super(message, cause); + } + + public BinlogPortalException(Throwable cause) { + super(cause); + } + + protected BinlogPortalException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } +} diff --git a/binlogportal/src/main/java/com/insistingon/binlogportal/BinlogPortalStarter.java b/binlogportal/src/main/java/com/insistingon/binlogportal/BinlogPortalStarter.java new file mode 100644 index 0000000..35a22bb --- /dev/null +++ b/binlogportal/src/main/java/com/insistingon/binlogportal/BinlogPortalStarter.java @@ -0,0 +1,97 @@ +package com.insistingon.binlogportal; + +import com.github.shyiko.mysql.binlog.BinaryLogClient; +import com.insistingon.binlogportal.config.BinlogPortalConfig; +import com.insistingon.binlogportal.config.RedisConfig; +import com.insistingon.binlogportal.config.SyncConfig; +import com.insistingon.binlogportal.distributed.RedisDistributedHandler; +import com.insistingon.binlogportal.factory.BinaryLogClientFactory; +import com.insistingon.binlogportal.position.RedisPositionHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * Starter + */ +public class BinlogPortalStarter { + private final Logger log = LoggerFactory.getLogger(BinlogPortalStarter.class); + + private BinlogPortalConfig binlogPortalConfig; + + public BinlogPortalConfig getBinlogPortalConfig() { + return binlogPortalConfig; + } + + public void setBinlogPortalConfig(BinlogPortalConfig binlogPortalConfig) { + this.binlogPortalConfig = binlogPortalConfig; + } + + /** + * main start method + */ + public void start() throws BinlogPortalException { + if (binlogPortalConfig.getDistributedHandler() != null) { + binlogPortalConfig.getDistributedHandler().start(binlogPortalConfig); + } else { + SingleStart(); + } + } + + private void SingleStart() { + //新建工厂对象 + BinaryLogClientFactory binaryLogClientFactory = new BinaryLogClientFactory(); + binaryLogClientFactory.setPositionHandler(binlogPortalConfig.getPositionHandler()); + binaryLogClientFactory.setLifeCycleFactory(binlogPortalConfig.getLifeCycleFactory()); + + //生成全部client + List binaryLogClientList = new ArrayList<>(); + binlogPortalConfig.getSyncConfigList().forEach(syncConfig -> { + try { + binaryLogClientList.add(binaryLogClientFactory.getClient(syncConfig)); + } catch (BinlogPortalException e) { + log.error(e.getMessage(), e); + } + }); + + //执行 + binaryLogClientList.forEach(binaryLogClient -> { + new Thread(() -> { + try { + binaryLogClient.setHeartbeatInterval(10 * 1000L); + binaryLogClient.connect(); + } catch (IOException e) { + log.error("binaryLogClient connect error!" + binaryLogClient.toString()); + } + }).start(); + }); + } + + public static void main(String[] args) { + SyncConfig syncConfig = new SyncConfig(); + syncConfig.setHost("0.0.0.0"); + syncConfig.setPort(3306); + syncConfig.setUserName("binlogportal"); + syncConfig.setPassword("123456"); + + BinlogPortalConfig binlogPortalConfig = new BinlogPortalConfig(); + binlogPortalConfig.addSyncConfig(syncConfig); + + RedisConfig redisConfig = new RedisConfig("127.0.0.1", 6379); + RedisPositionHandler redisPositionHandler = new RedisPositionHandler(redisConfig); + binlogPortalConfig.setPositionHandler(redisPositionHandler); + + binlogPortalConfig.setDistributedHandler(new RedisDistributedHandler(redisConfig)); + + BinlogPortalStarter binlogPortalStarter = new BinlogPortalStarter(); + binlogPortalStarter.setBinlogPortalConfig(binlogPortalConfig); + try { + binlogPortalStarter.start(); + } catch (BinlogPortalException e) { + e.printStackTrace(); + } + } +} diff --git a/binlogportal/src/main/java/com/insistingon/binlogportal/config/BinlogPortalConfig.java b/binlogportal/src/main/java/com/insistingon/binlogportal/config/BinlogPortalConfig.java new file mode 100644 index 0000000..ac16ab4 --- /dev/null +++ b/binlogportal/src/main/java/com/insistingon/binlogportal/config/BinlogPortalConfig.java @@ -0,0 +1,56 @@ +package com.insistingon.binlogportal.config; + +import com.insistingon.binlogportal.distributed.IDistributedHandler; +import com.insistingon.binlogportal.event.lifecycle.BaseLifeCycleListenerFactory; +import com.insistingon.binlogportal.event.lifecycle.ILifeCycleFactory; +import com.insistingon.binlogportal.position.IPositionHandler; + +import java.util.ArrayList; +import java.util.List; + +public class BinlogPortalConfig { + //配置列表 + List syncConfigList = new ArrayList<>(); + + //binlog位点处理器 + IPositionHandler positionHandler; + + //分布式处理器 + IDistributedHandler distributedHandler; + + //LifeCycleEvent监听器 + ILifeCycleFactory lifeCycleFactory = new BaseLifeCycleListenerFactory(); + + //增加配置项 + public void addSyncConfig(SyncConfig syncConfig) { + syncConfigList.add(syncConfig); + } + + public List getSyncConfigList() { + return syncConfigList; + } + + public IPositionHandler getPositionHandler() { + return positionHandler; + } + + public void setPositionHandler(IPositionHandler positionHandler) { + this.positionHandler = positionHandler; + } + + public IDistributedHandler getDistributedHandler() { + return distributedHandler; + } + + public void setDistributedHandler(IDistributedHandler distributedHandler) { + this.distributedHandler = distributedHandler; + } + + public ILifeCycleFactory getLifeCycleFactory() { + return lifeCycleFactory; + } + + public void setLifeCycleFactory(ILifeCycleFactory lifeCycleFactory) { + this.lifeCycleFactory = lifeCycleFactory; + } +} diff --git a/binlogportal/src/main/java/com/insistingon/binlogportal/config/RedisConfig.java b/binlogportal/src/main/java/com/insistingon/binlogportal/config/RedisConfig.java new file mode 100644 index 0000000..5dbd98e --- /dev/null +++ b/binlogportal/src/main/java/com/insistingon/binlogportal/config/RedisConfig.java @@ -0,0 +1,57 @@ +package com.insistingon.binlogportal.config; + +/** + * 内部抽象redis配置 + */ +public class RedisConfig { + String host; + Integer port; + String auth; + + public RedisConfig() { + } + + public RedisConfig(String host, Integer port) { + this.host = host; + this.port = port; + } + + public RedisConfig(String host, Integer port, String auth) { + this.host = host; + this.port = port; + this.auth = auth; + } + + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + + public Integer getPort() { + return port; + } + + public void setPort(Integer port) { + this.port = port; + } + + public String getAuth() { + return auth; + } + + public void setAuth(String auth) { + this.auth = auth; + } + + @Override + public String toString() { + return "RedisConfig{" + + "host='" + host + '\'' + + ", port=" + port + + ", auth='" + auth + '\'' + + '}'; + } +} diff --git a/binlogportal/src/main/java/com/insistingon/binlogportal/config/SyncConfig.java b/binlogportal/src/main/java/com/insistingon/binlogportal/config/SyncConfig.java new file mode 100644 index 0000000..252e801 --- /dev/null +++ b/binlogportal/src/main/java/com/insistingon/binlogportal/config/SyncConfig.java @@ -0,0 +1,79 @@ +package com.insistingon.binlogportal.config; + + +import com.insistingon.binlogportal.event.handler.IEventHandler; + +import java.util.ArrayList; +import java.util.List; + +public class SyncConfig { + String host; + Integer port; + String userName; + String password; + List eventHandlerList = new ArrayList<>(); + + public SyncConfig() { + } + + public SyncConfig(String host, Integer port, String userName, String password) { + this.host = host; + this.port = port; + this.userName = userName; + this.password = password; + } + + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + + public Integer getPort() { + return port; + } + + public void setPort(Integer port) { + this.port = port; + } + + public String getUserName() { + return userName; + } + + public void setUserName(String userName) { + this.userName = userName; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + + public List getEventHandlerList() { + return eventHandlerList; + } + + public void setEventHandlerList(List eventHandlerList) { + this.eventHandlerList = eventHandlerList; + } + + public void addEventHandlerList(IEventHandler eventHandler) { + eventHandlerList.add(eventHandler); + } + + @Override + public String toString() { + return "SyncConfig{" + + "host='" + host + '\'' + + ", port=" + port + + ", userName='" + userName + '\'' + + ", password='" + password + '\'' + + '}'; + } +} diff --git a/binlogportal/src/main/java/com/insistingon/binlogportal/distributed/IDistributedHandler.java b/binlogportal/src/main/java/com/insistingon/binlogportal/distributed/IDistributedHandler.java new file mode 100644 index 0000000..387ed9a --- /dev/null +++ b/binlogportal/src/main/java/com/insistingon/binlogportal/distributed/IDistributedHandler.java @@ -0,0 +1,8 @@ +package com.insistingon.binlogportal.distributed; + +import com.insistingon.binlogportal.config.BinlogPortalConfig; +import com.insistingon.binlogportal.BinlogPortalException; + +public interface IDistributedHandler { + void start(BinlogPortalConfig binlogPortalConfig) throws BinlogPortalException; +} diff --git a/binlogportal/src/main/java/com/insistingon/binlogportal/distributed/RedisDistributedHandler.java b/binlogportal/src/main/java/com/insistingon/binlogportal/distributed/RedisDistributedHandler.java new file mode 100644 index 0000000..4a168a0 --- /dev/null +++ b/binlogportal/src/main/java/com/insistingon/binlogportal/distributed/RedisDistributedHandler.java @@ -0,0 +1,92 @@ +package com.insistingon.binlogportal.distributed; + +import com.insistingon.binlogportal.config.BinlogPortalConfig; +import com.insistingon.binlogportal.config.RedisConfig; +import com.insistingon.binlogportal.config.SyncConfig; +import com.insistingon.binlogportal.factory.BinaryLogClientFactory; +import com.insistingon.binlogportal.BinlogPortalException; +import org.apache.commons.codec.digest.Md5Crypt; +import org.apache.commons.lang.StringUtils; +import org.redisson.Redisson; +import org.redisson.api.RLock; +import org.redisson.api.RedissonClient; +import org.redisson.config.Config; +import org.redisson.config.SingleServerConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Timer; +import java.util.TimerTask; + +public class RedisDistributedHandler implements IDistributedHandler { + + private final Logger log = LoggerFactory.getLogger(RedisDistributedHandler.class); + + //redis配置,支持集群模式 + RedisConfig redisConfig; + + public RedisDistributedHandler(RedisConfig redisConfig) { + this.redisConfig = redisConfig; + } + + @Override + public void start(BinlogPortalConfig binlogPortalConfig) throws BinlogPortalException { + if (redisConfig == null) { + throw new BinlogPortalException("redis config can not be null"); + } + Config config = new Config(); + SingleServerConfig singleServerConfig = config.useSingleServer(); + singleServerConfig.setAddress("redis://" + redisConfig.getHost() + ":" + redisConfig.getPort()); + if (!StringUtils.isBlank(redisConfig.getAuth())) { + singleServerConfig.setPassword(redisConfig.getAuth()); + } + config.setLockWatchdogTimeout(10000L); + RedissonClient redisson = Redisson.create(config); + + //新建工厂对象 + BinaryLogClientFactory binaryLogClientFactory = new BinaryLogClientFactory(); + binaryLogClientFactory.setPositionHandler(binlogPortalConfig.getPositionHandler()); + binaryLogClientFactory.setLifeCycleFactory(binlogPortalConfig.getLifeCycleFactory()); + + //定时创建客户端,抢到锁的就创建 + new Timer().schedule(new TimerTask() { + @Override + public void run() { + binlogPortalConfig.getSyncConfigList().forEach(syncConfig -> { + String lockStr = Md5Crypt.md5Crypt(syncConfig.toString().getBytes(), null, ""); + RLock lock = redisson.getLock(lockStr); + try { + if (lock.tryLock()) { + binaryLogClientFactory.getClient(syncConfig).connect(); + } + } catch (BinlogPortalException | IOException e) { + log.error(e.getMessage(), e); + } finally { + lock.unlock(); + } + }); + } + }, 0, 1000); + } + + public Boolean canBuild(SyncConfig syncConfig) { + Config config = new Config(); + config.useSingleServer().setAddress("redis://127.0.0.1:6379"); + config.setLockWatchdogTimeout(10000L); + RedissonClient redisson = Redisson.create(config); + RLock lock = redisson.getLock("myLock"); + lock.lock(); + + return null; + } + + public static void main(String[] args) { + Config config = new Config(); + config.useSingleServer().setAddress("redis://127.0.0.1:6379"); + config.setLockWatchdogTimeout(10000L); + RedissonClient redisson = Redisson.create(config); + RLock lock = redisson.getLock("myLock"); + lock.lock(); + } +} diff --git a/binlogportal/src/main/java/com/insistingon/binlogportal/event/EventEntity.java b/binlogportal/src/main/java/com/insistingon/binlogportal/event/EventEntity.java new file mode 100644 index 0000000..1d77a2d --- /dev/null +++ b/binlogportal/src/main/java/com/insistingon/binlogportal/event/EventEntity.java @@ -0,0 +1,130 @@ +package com.insistingon.binlogportal.event; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.github.shyiko.mysql.binlog.event.Event; +import com.insistingon.binlogportal.tablemeta.TableMetaEntity; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +/** + * 事件实体,简化binlog事件,方便处理 + */ +public class EventEntity { + //保存原始binlog事件信息 + Event event; + + EventEntityType eventEntityType; + + String databaseName; + + String tableName; + + List columns; + + List changeBefore; + + List changeAfter; + + public Event getEvent() { + return event; + } + + public void setEvent(Event event) { + this.event = event; + } + + public EventEntityType getEventEntityType() { + return eventEntityType; + } + + public void setEventEntityType(EventEntityType eventEntityType) { + this.eventEntityType = eventEntityType; + } + + public String getDatabaseName() { + return databaseName; + } + + public void setDatabaseName(String databaseName) { + this.databaseName = databaseName; + } + + public String getTableName() { + return tableName; + } + + public void setTableName(String tableName) { + this.tableName = tableName; + } + + public List getColumns() { + return columns; + } + + public void setColumns(List columns) { + this.columns = columns; + } + + public List getChangeBefore() { + return changeBefore; + } + + public void setChangeBefore(List changeBefore) { + this.changeBefore = changeBefore; + } + + public List getChangeAfter() { + return changeAfter; + } + + public void setChangeAfter(List changeAfter) { + this.changeAfter = changeAfter; + } + + @Override + public String toString() { + return "EventEntity{" + + "event=" + event + + ", eventEntityType=" + eventEntityType + + ", databaseName='" + databaseName + '\'' + + ", tableName='" + tableName + '\'' + + ", columns=" + columns + + ", changeBefore=" + changeBefore + + ", changeAfter=" + changeAfter + + '}'; + } + + public String getJsonFormatData() { + Map params = new HashMap<>(); + params.put("change_type", this.getEventEntityType().getDesc()); + Map data = new HashMap<>(); + for (int i = 0; i < this.getColumns().size(); i++) { + String before = ""; + if (this.getChangeBefore() != null) { + before = this.getChangeBefore().get(i) != null ? this.getChangeBefore().get(i).toString() : ""; + } + String after = ""; + if (this.getChangeAfter() != null) { + after = this.getChangeAfter().get(i) != null ? this.getChangeAfter().get(i).toString() : ""; + } + String[] subData = new String[]{ + before, + after, + Objects.equals(before, after) ? "0" : "1" + }; + data.put(this.getColumns().get(i).getName(), subData); + } + params.put("change_data", data); + params.put("table_name", this.getTableName()); + ObjectMapper objectMapper = new ObjectMapper(); + try { + return objectMapper.writeValueAsString(params); + } catch (JsonProcessingException e) { + return null; + } + } +} diff --git a/binlogportal/src/main/java/com/insistingon/binlogportal/event/EventEntityType.java b/binlogportal/src/main/java/com/insistingon/binlogportal/event/EventEntityType.java new file mode 100644 index 0000000..efa42b0 --- /dev/null +++ b/binlogportal/src/main/java/com/insistingon/binlogportal/event/EventEntityType.java @@ -0,0 +1,21 @@ +package com.insistingon.binlogportal.event; + +public enum EventEntityType { + UPDATE("update"), + INSERT("insert"), + DELETE("delete"); + + String desc; + + public String getDesc() { + return desc; + } + + public void setDesc(String desc) { + this.desc = desc; + } + + EventEntityType(String desc) { + this.desc = desc; + } +} diff --git a/binlogportal/src/main/java/com/insistingon/binlogportal/event/IEventListener.java b/binlogportal/src/main/java/com/insistingon/binlogportal/event/IEventListener.java new file mode 100644 index 0000000..8865d73 --- /dev/null +++ b/binlogportal/src/main/java/com/insistingon/binlogportal/event/IEventListener.java @@ -0,0 +1,6 @@ +package com.insistingon.binlogportal.event; + +import com.github.shyiko.mysql.binlog.BinaryLogClient; + +public interface IEventListener extends BinaryLogClient.EventListener { +} diff --git a/binlogportal/src/main/java/com/insistingon/binlogportal/event/MultiEventHandlerListener.java b/binlogportal/src/main/java/com/insistingon/binlogportal/event/MultiEventHandlerListener.java new file mode 100644 index 0000000..497c7e1 --- /dev/null +++ b/binlogportal/src/main/java/com/insistingon/binlogportal/event/MultiEventHandlerListener.java @@ -0,0 +1,125 @@ +package com.insistingon.binlogportal.event; + +import com.github.shyiko.mysql.binlog.event.Event; +import com.github.shyiko.mysql.binlog.event.EventHeaderV4; +import com.github.shyiko.mysql.binlog.event.EventType; +import com.github.shyiko.mysql.binlog.event.RotateEventData; +import com.insistingon.binlogportal.BinlogPortalException; +import com.insistingon.binlogportal.config.SyncConfig; +import com.insistingon.binlogportal.event.handler.IEventHandler; +import com.insistingon.binlogportal.event.parser.IEventParserDispatcher; +import com.insistingon.binlogportal.position.BinlogPositionEntity; +import com.insistingon.binlogportal.position.IPositionHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; + +/** + * 主要事件统一处理器 + */ +public class MultiEventHandlerListener implements IEventListener { + + private final Logger log = LoggerFactory.getLogger(MultiEventHandlerListener.class); + + //配置信息 + SyncConfig syncConfig; + + //处理器列表 + List eventHandlerList = new ArrayList<>(); + + //事件解析调度器 + IEventParserDispatcher eventParserDispatcher; + + //binlog位点处理器 + IPositionHandler positionHandler; + + public void registerEventHandler(IEventHandler eventHandler) { + eventHandlerList.add(eventHandler); + } + + public SyncConfig getSyncConfig() { + return syncConfig; + } + + public void setSyncConfig(SyncConfig syncConfig) { + this.syncConfig = syncConfig; + } + + public List getEventHandlerList() { + return eventHandlerList; + } + + public void setEventHandlerList(List eventHandlerList) { + this.eventHandlerList = eventHandlerList; + } + + public IEventParserDispatcher getEventParserDispatcher() { + return eventParserDispatcher; + } + + public void setEventParserDispatcher(IEventParserDispatcher eventParserDispatcher) { + this.eventParserDispatcher = eventParserDispatcher; + } + + public IPositionHandler getPositionHandler() { + return positionHandler; + } + + public void setPositionHandler(IPositionHandler positionHandler) { + this.positionHandler = positionHandler; + } + + @Override + public void onEvent(Event event) { + try { + /* + * 不计入position更新的事件类型 + * FORMAT_DESCRIPTION类型为binlog起始时间 + * HEARTBEAT为心跳检测事件,不会写入master的binlog,记录该事件的position会导致重启时报错 + */ + List excludePositionEventType = new ArrayList<>(); + excludePositionEventType.add(EventType.FORMAT_DESCRIPTION); + excludePositionEventType.add(EventType.HEARTBEAT); + //获取当前事件的类型 + EventType eventType = event.getHeader().getEventType(); + if (!excludePositionEventType.contains(eventType)) { + BinlogPositionEntity binlogPositionEntity = new BinlogPositionEntity(); + //处理rotate事件,这里会替换调binlog fileName + if (event.getHeader().getEventType().equals(EventType.ROTATE)) { + RotateEventData rotateEventData = (RotateEventData) event.getData(); + binlogPositionEntity.setBinlogName(rotateEventData.getBinlogFilename()); + binlogPositionEntity.setPosition(rotateEventData.getBinlogPosition()); + binlogPositionEntity.setServerId(event.getHeader().getServerId()); + } else { //统一处理事件对应的binlog position + binlogPositionEntity = positionHandler.getPosition(syncConfig); + EventHeaderV4 eventHeaderV4 = (EventHeaderV4) event.getHeader(); + binlogPositionEntity.setPosition(eventHeaderV4.getPosition()); + binlogPositionEntity.setServerId(event.getHeader().getServerId()); + } + if (positionHandler != null) { + log.debug(event.toString() + "---" + binlogPositionEntity.toString()); + positionHandler.savePosition(syncConfig, binlogPositionEntity); + } + } + //解析事件为统一实体 + List eventEntityList = eventParserDispatcher.parse(event); + if (eventEntityList != null) { + //循环调用处理器 + eventEntityList.forEach(eventEntity -> { + eventHandlerList.forEach(eventHandler -> { + try { + eventHandler.process(eventEntity); + } catch (BinlogPortalException e) { + log.error(eventHandler.toString() + " process error:" + e.getMessage(), e); + } + }); + }); + } + + } catch (BinlogPortalException e) { + log.error(e.getMessage(), e); + } + } +} diff --git a/binlogportal/src/main/java/com/insistingon/binlogportal/event/handler/HttpRequestEventHandler.java b/binlogportal/src/main/java/com/insistingon/binlogportal/event/handler/HttpRequestEventHandler.java new file mode 100644 index 0000000..59a9d89 --- /dev/null +++ b/binlogportal/src/main/java/com/insistingon/binlogportal/event/handler/HttpRequestEventHandler.java @@ -0,0 +1,84 @@ +package com.insistingon.binlogportal.event.handler; + +import com.insistingon.binlogportal.BinlogPortalException; +import com.insistingon.binlogportal.event.EventEntity; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.config.SocketConfig; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.*; + +/** + * 默认的http请求类,使用固定的参数格式,发送post请求 + */ +public class HttpRequestEventHandler implements IEventHandler { + private final static Logger log = LoggerFactory.getLogger(HttpRequestEventHandler.class); + + CloseableHttpClient httpClient = HttpClients.custom().setDefaultSocketConfig(SocketConfig.custom().setSoTimeout(1000).build()) + .setDefaultRequestConfig(RequestConfig.custom().setConnectTimeout(1000).setConnectionRequestTimeout(1000).build()).build(); + + List urlList = new ArrayList<>(); + + IHttpCallback httpCallback; + + public List getUrlList() { + return urlList; + } + + public void setUrlList(List urlList) { + this.urlList = urlList; + } + + public void registerUrl(String url) { + urlList.add(url); + } + + public IHttpCallback getHttpCallback() { + return httpCallback; + } + + public void setHttpCallback(IHttpCallback httpCallback) { + this.httpCallback = httpCallback; + } + + @Override + public void process(EventEntity eventEntity) throws BinlogPortalException { + post(eventEntity.getJsonFormatData()); + } + + private void post(String param) throws BinlogPortalException { + for (String url : urlList) { + try { + HttpPost httpPost = new HttpPost(url); + StringEntity requestEntity = new StringEntity(param, "UTF-8"); + requestEntity.setContentEncoding("UTF-8"); + httpPost.setHeader("Content-type", "application/json"); + httpPost.setEntity(requestEntity); + CloseableHttpResponse response = httpClient.execute(httpPost); + //设置了回调实现,调用回调处理返回结果 + if (httpCallback != null) { + httpCallback.call(response); + } else { + String res = EntityUtils.toString(response.getEntity()); + if (response.getStatusLine().getStatusCode() != 200) { + log.error("http request failed.url:{},params:{},code:{}", url, param, response.getStatusLine().getStatusCode()); + throw new BinlogPortalException("http request failed.code:" + response.getStatusLine().getStatusCode()); + } + log.info("http request success.url:{},params:{},res:{}", url, param, res); + } + //response要关闭,不然httpClient无法复用,实际上是断开长连接 + response.close(); + } catch (IOException e) { + throw new BinlogPortalException(e); + } + } + } +} diff --git a/binlogportal/src/main/java/com/insistingon/binlogportal/event/handler/IEventHandler.java b/binlogportal/src/main/java/com/insistingon/binlogportal/event/handler/IEventHandler.java new file mode 100644 index 0000000..748e7f2 --- /dev/null +++ b/binlogportal/src/main/java/com/insistingon/binlogportal/event/handler/IEventHandler.java @@ -0,0 +1,8 @@ +package com.insistingon.binlogportal.event.handler; + +import com.insistingon.binlogportal.event.EventEntity; +import com.insistingon.binlogportal.BinlogPortalException; + +public interface IEventHandler { + public void process(EventEntity eventEntity) throws BinlogPortalException; +} diff --git a/binlogportal/src/main/java/com/insistingon/binlogportal/event/handler/IHttpCallback.java b/binlogportal/src/main/java/com/insistingon/binlogportal/event/handler/IHttpCallback.java new file mode 100644 index 0000000..f3ab8ae --- /dev/null +++ b/binlogportal/src/main/java/com/insistingon/binlogportal/event/handler/IHttpCallback.java @@ -0,0 +1,7 @@ +package com.insistingon.binlogportal.event.handler; + +import org.apache.http.client.methods.CloseableHttpResponse; + +public interface IHttpCallback { + void call(CloseableHttpResponse closeableHttpResponse); +} diff --git a/binlogportal/src/main/java/com/insistingon/binlogportal/event/lifecycle/BaseLifeCycleEventListener.java b/binlogportal/src/main/java/com/insistingon/binlogportal/event/lifecycle/BaseLifeCycleEventListener.java new file mode 100644 index 0000000..da70b64 --- /dev/null +++ b/binlogportal/src/main/java/com/insistingon/binlogportal/event/lifecycle/BaseLifeCycleEventListener.java @@ -0,0 +1,37 @@ +package com.insistingon.binlogportal.event.lifecycle; + +import com.github.shyiko.mysql.binlog.BinaryLogClient; +import com.insistingon.binlogportal.config.SyncConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BaseLifeCycleEventListener implements BinaryLogClient.LifecycleListener { + + private final Logger log = LoggerFactory.getLogger(BaseLifeCycleEventListener.class); + + SyncConfig syncConfig; + + public BaseLifeCycleEventListener(SyncConfig syncConfig) { + this.syncConfig = syncConfig; + } + + @Override + public void onConnect(BinaryLogClient client) { + + } + + @Override + public void onCommunicationFailure(BinaryLogClient client, Exception ex) { + log.error(syncConfig.getHost() + ":" + syncConfig.getPort() + "," + ex.getMessage() + "." + client.getBinlogFilename() + "/" + client.getBinlogPosition(), ex); + } + + @Override + public void onEventDeserializationFailure(BinaryLogClient client, Exception ex) { + log.error(syncConfig.getHost() + ":" + syncConfig.getPort() + "," + ex.getMessage(), ex); + } + + @Override + public void onDisconnect(BinaryLogClient client) { + + } +} diff --git a/binlogportal/src/main/java/com/insistingon/binlogportal/event/lifecycle/BaseLifeCycleListenerFactory.java b/binlogportal/src/main/java/com/insistingon/binlogportal/event/lifecycle/BaseLifeCycleListenerFactory.java new file mode 100644 index 0000000..7e9ffa1 --- /dev/null +++ b/binlogportal/src/main/java/com/insistingon/binlogportal/event/lifecycle/BaseLifeCycleListenerFactory.java @@ -0,0 +1,11 @@ +package com.insistingon.binlogportal.event.lifecycle; + +import com.github.shyiko.mysql.binlog.BinaryLogClient; +import com.insistingon.binlogportal.config.SyncConfig; + +public class BaseLifeCycleListenerFactory implements ILifeCycleFactory { + @Override + public BinaryLogClient.LifecycleListener getLifeCycleListener(SyncConfig syncConfig) { + return new BaseLifeCycleEventListener(syncConfig); + } +} diff --git a/binlogportal/src/main/java/com/insistingon/binlogportal/event/lifecycle/ILifeCycleFactory.java b/binlogportal/src/main/java/com/insistingon/binlogportal/event/lifecycle/ILifeCycleFactory.java new file mode 100644 index 0000000..dc57747 --- /dev/null +++ b/binlogportal/src/main/java/com/insistingon/binlogportal/event/lifecycle/ILifeCycleFactory.java @@ -0,0 +1,8 @@ +package com.insistingon.binlogportal.event.lifecycle; + +import com.github.shyiko.mysql.binlog.BinaryLogClient; +import com.insistingon.binlogportal.config.SyncConfig; + +public interface ILifeCycleFactory { + BinaryLogClient.LifecycleListener getLifeCycleListener(SyncConfig syncConfig); +} diff --git a/binlogportal/src/main/java/com/insistingon/binlogportal/event/parser/CommonEventParserDispatcher.java b/binlogportal/src/main/java/com/insistingon/binlogportal/event/parser/CommonEventParserDispatcher.java new file mode 100644 index 0000000..008d69c --- /dev/null +++ b/binlogportal/src/main/java/com/insistingon/binlogportal/event/parser/CommonEventParserDispatcher.java @@ -0,0 +1,81 @@ +package com.insistingon.binlogportal.event.parser; + +import com.github.shyiko.mysql.binlog.event.Event; +import com.github.shyiko.mysql.binlog.event.EventType; +import com.github.shyiko.mysql.binlog.event.TableMapEventData; +import com.insistingon.binlogportal.event.EventEntity; +import com.insistingon.binlogportal.tablemeta.TableMetaFactory; +import com.insistingon.binlogportal.BinlogPortalException; + +import java.util.List; + +/** + * 通用事件解析调度器 + * 根据不同事件类型调用事件解析器 + */ +public class CommonEventParserDispatcher implements IEventParserDispatcher { + + //数据表元数据工厂 + TableMetaFactory tableMetaFactory; + + public TableMetaFactory getTableMetaFactory() { + return tableMetaFactory; + } + + public void setTableMetaFactory(TableMetaFactory tableMetaFactory) { + this.tableMetaFactory = tableMetaFactory; + } + + public CommonEventParserDispatcher(TableMetaFactory tableMetaFactory) { + this.tableMetaFactory = tableMetaFactory; + this.updateEventParser = new UpdateEventParser(tableMetaFactory); + this.insertEventParser = new InsertEventParser(tableMetaFactory); + this.deleteEventParser = new DeleteEventParser(tableMetaFactory); + } + + //更新事件解析器 + IEventParser updateEventParser; + + //插入事件解析器 + IEventParser insertEventParser; + + //删除事件解析器 + IEventParser deleteEventParser; + + @Override + public List parse(Event event) throws BinlogPortalException { + + /* + * table_id不固定对应一个表,它是表载入table cache时临时分配的,一个不断增长的变量 + * 连续往同一个table中进行多次DML操作,table_id不变。 一般来说,出现DDL操作时,table_id才会变化 + * 所有更新和插入操作,都会产生一个TABLE_MAP事件 + * 通过该事件缓存table_id对应的表信息,然后再处理对应的事件 + */ + if (EventType.TABLE_MAP.equals(event.getHeader().getEventType())) { + TableMapEventData tableMapEventData = event.getData(); + //table_map事件,要更新下tableMetaFactory中的tableId对应的信息缓存 + tableMetaFactory.getTableMetaEntity( + tableMapEventData.getTableId(), + tableMapEventData.getDatabase(), + tableMapEventData.getTable() + ); + } + + //处理更新事件 + if (EventType.isUpdate(event.getHeader().getEventType())) { + return updateEventParser.parse(event); + } + + //处理插入事件 + if (EventType.isWrite(event.getHeader().getEventType())) { + return insertEventParser.parse(event); + } + + //删除事件处理 + if (EventType.isDelete(event.getHeader().getEventType())) { + return deleteEventParser.parse(event); + } + + return null; + } +} diff --git a/binlogportal/src/main/java/com/insistingon/binlogportal/event/parser/DeleteEventParser.java b/binlogportal/src/main/java/com/insistingon/binlogportal/event/parser/DeleteEventParser.java new file mode 100644 index 0000000..522981c --- /dev/null +++ b/binlogportal/src/main/java/com/insistingon/binlogportal/event/parser/DeleteEventParser.java @@ -0,0 +1,53 @@ +package com.insistingon.binlogportal.event.parser; + +import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData; +import com.github.shyiko.mysql.binlog.event.Event; +import com.insistingon.binlogportal.BinlogPortalException; +import com.insistingon.binlogportal.event.EventEntity; +import com.insistingon.binlogportal.event.EventEntityType; +import com.insistingon.binlogportal.event.parser.converter.CommonConverterProcessor; +import com.insistingon.binlogportal.tablemeta.TableMetaEntity; +import com.insistingon.binlogportal.tablemeta.TableMetaFactory; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +public class DeleteEventParser implements IEventParser { + private CommonConverterProcessor commonConverterProcessor = new CommonConverterProcessor(); + + private TableMetaFactory tableMetaFactory; + + public DeleteEventParser(TableMetaFactory tableMetaFactory) { + this.tableMetaFactory = tableMetaFactory; + } + + @Override + public List parse(Event event) throws BinlogPortalException { + List eventEntityList = new ArrayList<>(); + DeleteRowsEventData deleteRowsEventData = event.getData(); + TableMetaEntity tableMetaEntity = tableMetaFactory.getTableMetaEntity(deleteRowsEventData.getTableId()); + List rows = deleteRowsEventData.getRows(); + rows.forEach(rowMap -> { + List columnMetaDataList = tableMetaEntity.getColumnMetaDataList(); + String[] after = commonConverterProcessor.convertToString(rowMap, columnMetaDataList); + List columns = new ArrayList<>(); + List changeAfter = new ArrayList<>(); + for (int i = 0; i < after.length; i++) { + columns.add(columnMetaDataList.get(i).getName()); + changeAfter.add(after[i]); + } + + EventEntity eventEntity = new EventEntity(); + eventEntity.setEvent(event); + eventEntity.setEventEntityType(EventEntityType.DELETE); + eventEntity.setDatabaseName(tableMetaEntity.getDbName()); + eventEntity.setTableName(tableMetaEntity.getTableName()); + eventEntity.setColumns(columnMetaDataList); + eventEntity.setChangeAfter(changeAfter); + + eventEntityList.add(eventEntity); + }); + return eventEntityList; + } +} diff --git a/binlogportal/src/main/java/com/insistingon/binlogportal/event/parser/EventParserFactory.java b/binlogportal/src/main/java/com/insistingon/binlogportal/event/parser/EventParserFactory.java new file mode 100644 index 0000000..50029b1 --- /dev/null +++ b/binlogportal/src/main/java/com/insistingon/binlogportal/event/parser/EventParserFactory.java @@ -0,0 +1,25 @@ +package com.insistingon.binlogportal.event.parser; + +import com.insistingon.binlogportal.config.SyncConfig; +import com.insistingon.binlogportal.tablemeta.TableMetaFactory; +import com.insistingon.binlogportal.BinlogPortalException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * 事件解析器工厂 + */ +public class EventParserFactory { + private static final Logger log = LoggerFactory.getLogger(EventParserFactory.class); + + /** + * 获取事件解析调度器 + * + * @param syncConfig + * @return + */ + public static IEventParserDispatcher getEventParserDispatcher(SyncConfig syncConfig) throws BinlogPortalException { + //目前只有一种解析器,这里可扩展为根据syncConfig的配置获取不同的解析器 + return new CommonEventParserDispatcher(new TableMetaFactory(syncConfig)); + } +} diff --git a/binlogportal/src/main/java/com/insistingon/binlogportal/event/parser/IEventParser.java b/binlogportal/src/main/java/com/insistingon/binlogportal/event/parser/IEventParser.java new file mode 100644 index 0000000..75294a6 --- /dev/null +++ b/binlogportal/src/main/java/com/insistingon/binlogportal/event/parser/IEventParser.java @@ -0,0 +1,14 @@ +package com.insistingon.binlogportal.event.parser; + +import com.github.shyiko.mysql.binlog.event.Event; +import com.insistingon.binlogportal.BinlogPortalException; +import com.insistingon.binlogportal.event.EventEntity; + +import java.util.List; + +/** + * 事件解析器接口 + */ +public interface IEventParser { + List parse(Event event) throws BinlogPortalException; +} diff --git a/binlogportal/src/main/java/com/insistingon/binlogportal/event/parser/IEventParserDispatcher.java b/binlogportal/src/main/java/com/insistingon/binlogportal/event/parser/IEventParserDispatcher.java new file mode 100644 index 0000000..58e4b34 --- /dev/null +++ b/binlogportal/src/main/java/com/insistingon/binlogportal/event/parser/IEventParserDispatcher.java @@ -0,0 +1,14 @@ +package com.insistingon.binlogportal.event.parser; + +import com.github.shyiko.mysql.binlog.event.Event; +import com.insistingon.binlogportal.BinlogPortalException; +import com.insistingon.binlogportal.event.EventEntity; + +import java.util.List; + +/** + * 事件解析调度器接口 + */ +public interface IEventParserDispatcher { + public List parse(Event event) throws BinlogPortalException; +} diff --git a/binlogportal/src/main/java/com/insistingon/binlogportal/event/parser/InsertEventParser.java b/binlogportal/src/main/java/com/insistingon/binlogportal/event/parser/InsertEventParser.java new file mode 100644 index 0000000..b7282e3 --- /dev/null +++ b/binlogportal/src/main/java/com/insistingon/binlogportal/event/parser/InsertEventParser.java @@ -0,0 +1,53 @@ +package com.insistingon.binlogportal.event.parser; + +import com.github.shyiko.mysql.binlog.event.Event; +import com.github.shyiko.mysql.binlog.event.WriteRowsEventData; +import com.insistingon.binlogportal.event.EventEntity; +import com.insistingon.binlogportal.event.parser.converter.CommonConverterProcessor; +import com.insistingon.binlogportal.tablemeta.TableMetaEntity; +import com.insistingon.binlogportal.tablemeta.TableMetaFactory; +import com.insistingon.binlogportal.BinlogPortalException; +import com.insistingon.binlogportal.event.EventEntityType; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +public class InsertEventParser implements IEventParser { + CommonConverterProcessor commonConverterProcessor = new CommonConverterProcessor(); + + TableMetaFactory tableMetaFactory; + + public InsertEventParser(TableMetaFactory tableMetaFactory) { + this.tableMetaFactory = tableMetaFactory; + } + + @Override + public List parse(Event event) throws BinlogPortalException { + WriteRowsEventData writeRowsEventData = event.getData(); + TableMetaEntity tableMetaEntity = tableMetaFactory.getTableMetaEntity(writeRowsEventData.getTableId()); + List rows = writeRowsEventData.getRows(); + List eventEntityList = new ArrayList<>(); + rows.forEach(rowMap -> { + List columnMetaDataList = tableMetaEntity.getColumnMetaDataList(); + String[] after = commonConverterProcessor.convertToString(rowMap, columnMetaDataList); + List columns = new ArrayList<>(); + List changeAfter = new ArrayList<>(); + for (int i = 0; i < after.length; i++) { + columns.add(columnMetaDataList.get(i).getName()); + changeAfter.add(after[i]); + } + + EventEntity eventEntity = new EventEntity(); + eventEntity.setEvent(event); + eventEntity.setEventEntityType(EventEntityType.INSERT); + eventEntity.setDatabaseName(tableMetaEntity.getDbName()); + eventEntity.setTableName(tableMetaEntity.getTableName()); + eventEntity.setColumns(columnMetaDataList); + eventEntity.setChangeAfter(changeAfter); + + eventEntityList.add(eventEntity); + }); + return eventEntityList; + } +} diff --git a/binlogportal/src/main/java/com/insistingon/binlogportal/event/parser/UpdateEventParser.java b/binlogportal/src/main/java/com/insistingon/binlogportal/event/parser/UpdateEventParser.java new file mode 100644 index 0000000..e297f8b --- /dev/null +++ b/binlogportal/src/main/java/com/insistingon/binlogportal/event/parser/UpdateEventParser.java @@ -0,0 +1,60 @@ +package com.insistingon.binlogportal.event.parser; + +import com.github.shyiko.mysql.binlog.event.Event; +import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData; +import com.insistingon.binlogportal.tablemeta.TableMetaEntity; +import com.insistingon.binlogportal.tablemeta.TableMetaFactory; +import com.insistingon.binlogportal.BinlogPortalException; +import com.insistingon.binlogportal.event.EventEntity; +import com.insistingon.binlogportal.event.EventEntityType; +import com.insistingon.binlogportal.event.parser.converter.CommonConverterProcessor; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class UpdateEventParser implements IEventParser { + + CommonConverterProcessor commonConverterProcessor = new CommonConverterProcessor(); + + TableMetaFactory tableMetaFactory; + + UpdateEventParser(TableMetaFactory tableMetaFactory) { + this.tableMetaFactory = tableMetaFactory; + } + + @Override + public List parse(Event event) throws BinlogPortalException { + List eventEntityList = new ArrayList<>(); + UpdateRowsEventData updateRowsEventData = event.getData(); + TableMetaEntity tableMetaEntity = tableMetaFactory.getTableMetaEntity(updateRowsEventData.getTableId()); + List> rows = updateRowsEventData.getRows(); + rows.forEach(rowMap -> { + EventEntity eventEntity = new EventEntity(); + eventEntity.setEvent(event); + eventEntity.setDatabaseName(tableMetaEntity.getDbName()); + eventEntity.setTableName(tableMetaEntity.getTableName()); + eventEntity.setEventEntityType(EventEntityType.UPDATE); + List columnMetaDataList = tableMetaEntity.getColumnMetaDataList(); + //解析update前后的数据 + String[] before = commonConverterProcessor.convertToString(rowMap.getKey(), columnMetaDataList); + String[] after = commonConverterProcessor.convertToString(rowMap.getValue(), columnMetaDataList); + + List columns = new ArrayList<>(); + List changeBefore = new ArrayList<>(); + List changeAfter = new ArrayList<>(); + for (int i = 0; i < before.length; i++) { + columns.add(columnMetaDataList.get(i).getName()); + changeBefore.add(before[i]); + changeAfter.add(after[i]); + } + eventEntity.setColumns(columnMetaDataList); + eventEntity.setChangeBefore(changeBefore); + eventEntity.setChangeAfter(changeAfter); + + eventEntityList.add(eventEntity); + }); + return eventEntityList; + } +} diff --git a/binlogportal/src/main/java/com/insistingon/binlogportal/event/parser/converter/CommonConverterProcessor.java b/binlogportal/src/main/java/com/insistingon/binlogportal/event/parser/converter/CommonConverterProcessor.java new file mode 100644 index 0000000..17fa9bb --- /dev/null +++ b/binlogportal/src/main/java/com/insistingon/binlogportal/event/parser/converter/CommonConverterProcessor.java @@ -0,0 +1,17 @@ +package com.insistingon.binlogportal.event.parser.converter; + +import com.insistingon.binlogportal.tablemeta.TableMetaEntity; + +import java.io.Serializable; +import java.util.List; + +public class CommonConverterProcessor { + public String[] convertToString(Serializable[] serializables, List columnMetaDataList) { + String[] res = new String[serializables.length]; + StringConverter stringConverter = new StringConverter(); + for (int i = 0; i < serializables.length; i++) { + res[i] = stringConverter.convert(serializables[i], columnMetaDataList.get(i).getType()); + } + return res; + } +} diff --git a/binlogportal/src/main/java/com/insistingon/binlogportal/event/parser/converter/IConverter.java b/binlogportal/src/main/java/com/insistingon/binlogportal/event/parser/converter/IConverter.java new file mode 100644 index 0000000..7352327 --- /dev/null +++ b/binlogportal/src/main/java/com/insistingon/binlogportal/event/parser/converter/IConverter.java @@ -0,0 +1,10 @@ +package com.insistingon.binlogportal.event.parser.converter; + +/** + * 类型转换器接口 + */ +public interface IConverter { + T convert(Object from); + + T convert(Object from, String type); +} diff --git a/binlogportal/src/main/java/com/insistingon/binlogportal/event/parser/converter/StringConverter.java b/binlogportal/src/main/java/com/insistingon/binlogportal/event/parser/converter/StringConverter.java new file mode 100644 index 0000000..5c028f4 --- /dev/null +++ b/binlogportal/src/main/java/com/insistingon/binlogportal/event/parser/converter/StringConverter.java @@ -0,0 +1,42 @@ +package com.insistingon.binlogportal.event.parser.converter; + +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; + +public class StringConverter implements IConverter { + + @Override + public String convert(Object from) { + if (from == null) { + return null; + } + if (from.getClass() == byte[].class) { + return new String((byte[]) from); + } + return String.valueOf(from); + } + + @Override + public String convert(Object from, String type) { + if (from == null) { + return ""; + } + List timeToStringTypeList = Arrays.asList("DATETIME", "TIMESTAMP"); + if (timeToStringTypeList.contains(type)) { + return LocalDateTime.ofEpochSecond((Long) from / 1000, 0, ZoneOffset.ofHours(8)).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); + } + + if (Objects.equals(type, "DATE")) { + return LocalDateTime.ofEpochSecond((Long) from / 1000, 0, ZoneOffset.ofHours(8)).format(DateTimeFormatter.ofPattern("yyyy-MM-dd")); + } + + if (from.getClass() == byte[].class) { + return new String((byte[]) from); + } + return String.valueOf(from); + } +} diff --git a/binlogportal/src/main/java/com/insistingon/binlogportal/factory/BinaryLogClientFactory.java b/binlogportal/src/main/java/com/insistingon/binlogportal/factory/BinaryLogClientFactory.java new file mode 100644 index 0000000..baf492e --- /dev/null +++ b/binlogportal/src/main/java/com/insistingon/binlogportal/factory/BinaryLogClientFactory.java @@ -0,0 +1,121 @@ +package com.insistingon.binlogportal.factory; + +import com.github.shyiko.mysql.binlog.BinaryLogClient; +import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer; +import com.insistingon.binlogportal.BinlogPortalException; +import com.insistingon.binlogportal.config.SyncConfig; +import com.insistingon.binlogportal.event.MultiEventHandlerListener; +import com.insistingon.binlogportal.event.lifecycle.ILifeCycleFactory; +import com.insistingon.binlogportal.event.parser.EventParserFactory; +import com.insistingon.binlogportal.position.BinlogPositionEntity; +import com.insistingon.binlogportal.position.IPositionHandler; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang.math.RandomUtils; + +import java.security.NoSuchAlgorithmException; +import java.security.SecureRandom; +import java.util.concurrent.ConcurrentHashMap; + +/** + * BinaryLogClient Factory + */ +public class BinaryLogClientFactory { + + private ConcurrentHashMap cache = new ConcurrentHashMap<>(); + + private IPositionHandler positionHandler; + + //LifeCycleEvent监听器 + private ILifeCycleFactory lifeCycleFactory; + + public IPositionHandler getPositionHandler() { + return positionHandler; + } + + public void setPositionHandler(IPositionHandler positionHandler) { + this.positionHandler = positionHandler; + } + + public ConcurrentHashMap getCache() { + return cache; + } + + public void setCache(ConcurrentHashMap cache) { + this.cache = cache; + } + + public ILifeCycleFactory getLifeCycleFactory() { + return lifeCycleFactory; + } + + public void setLifeCycleFactory(ILifeCycleFactory lifeCycleFactory) { + this.lifeCycleFactory = lifeCycleFactory; + } + + /** + * 获取客户端 + * + * @param syncConfig SyncConfig + * @return BinaryLogClient + */ + public BinaryLogClient getClient(SyncConfig syncConfig) throws BinlogPortalException { + String key = syncConfig.toString(); + //有缓存拿缓存里的 + if (cache.get(key) != null) { + return cache.get(key); + } else { + //创建客户端 + BinaryLogClient client = new BinaryLogClient( + syncConfig.getHost(), + syncConfig.getPort(), + syncConfig.getUserName(), + syncConfig.getPassword() + ); + EventDeserializer eventDeserializer = new EventDeserializer(); + eventDeserializer.setCompatibilityMode( + EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG, + EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY + ); + client.setEventDeserializer(eventDeserializer); + //设置slave的serverId,不同集群中,两个机器不能相同 + client.setServerId(getRandomServerId()); + + //处理binlog位点信息 + if (positionHandler != null && positionHandler.getPosition(syncConfig) != null) { + BinlogPositionEntity positionEntity = positionHandler.getPosition(syncConfig); + if (positionEntity != null + && !StringUtils.isBlank(positionEntity.getBinlogName()) + && positionEntity.getPosition() != null) { + client.setBinlogFilename(positionEntity.getBinlogName()); + long position = positionEntity.getPosition() != null ? positionEntity.getPosition() : 0L; + client.setBinlogPosition((position)); + } + } + + //创建多事件统一处理器 + MultiEventHandlerListener multiEventHandlerListener = new MultiEventHandlerListener(); + //设置事件解析器 + multiEventHandlerListener.setEventParserDispatcher(EventParserFactory.getEventParserDispatcher(syncConfig)); + //保存配置信息 + multiEventHandlerListener.setSyncConfig(syncConfig); + //设置binlog位点信息 + multiEventHandlerListener.setPositionHandler(positionHandler); + //注册配置信息中的事件处理器 + syncConfig.getEventHandlerList().forEach(multiEventHandlerListener::registerEventHandler); + + //注册client的监听器 + client.registerEventListener(multiEventHandlerListener); + client.registerLifecycleListener(lifeCycleFactory.getLifeCycleListener(syncConfig)); + + return client; + } + } + + private long getRandomServerId() { + try { + return SecureRandom.getInstanceStrong().nextLong(); + } catch (NoSuchAlgorithmException e) { + return RandomUtils.nextLong(); + } + } +} diff --git a/binlogportal/src/main/java/com/insistingon/binlogportal/position/BinlogPositionEntity.java b/binlogportal/src/main/java/com/insistingon/binlogportal/position/BinlogPositionEntity.java new file mode 100644 index 0000000..7cc02fc --- /dev/null +++ b/binlogportal/src/main/java/com/insistingon/binlogportal/position/BinlogPositionEntity.java @@ -0,0 +1,40 @@ +package com.insistingon.binlogportal.position; + +public class BinlogPositionEntity { + String binlogName; + Long position; + Long serverId; + + public String getBinlogName() { + return binlogName; + } + + public void setBinlogName(String binlogName) { + this.binlogName = binlogName; + } + + public Long getPosition() { + return position; + } + + public void setPosition(Long position) { + this.position = position; + } + + public Long getServerId() { + return serverId; + } + + public void setServerId(Long serverId) { + this.serverId = serverId; + } + + @Override + public String toString() { + return "BinlogPositionEntity{" + + "binlogName='" + binlogName + '\'' + + ", position=" + position + + ", serverId=" + serverId + + '}'; + } +} diff --git a/binlogportal/src/main/java/com/insistingon/binlogportal/position/IPositionHandler.java b/binlogportal/src/main/java/com/insistingon/binlogportal/position/IPositionHandler.java new file mode 100644 index 0000000..8d62782 --- /dev/null +++ b/binlogportal/src/main/java/com/insistingon/binlogportal/position/IPositionHandler.java @@ -0,0 +1,13 @@ +package com.insistingon.binlogportal.position; + +import com.insistingon.binlogportal.BinlogPortalException; +import com.insistingon.binlogportal.config.SyncConfig; + +/** + * 处理binlog位点信息接口,实现该接口创建自定义位点处理类 + */ +public interface IPositionHandler { + BinlogPositionEntity getPosition(SyncConfig syncConfig) throws BinlogPortalException; + + void savePosition(SyncConfig syncConfig, BinlogPositionEntity binlogPositionEntity) throws BinlogPortalException; +} diff --git a/binlogportal/src/main/java/com/insistingon/binlogportal/position/RedisPositionHandler.java b/binlogportal/src/main/java/com/insistingon/binlogportal/position/RedisPositionHandler.java new file mode 100644 index 0000000..c60856b --- /dev/null +++ b/binlogportal/src/main/java/com/insistingon/binlogportal/position/RedisPositionHandler.java @@ -0,0 +1,60 @@ +package com.insistingon.binlogportal.position; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.insistingon.binlogportal.BinlogPortalException; +import com.insistingon.binlogportal.config.RedisConfig; +import com.insistingon.binlogportal.config.SyncConfig; +import org.apache.commons.lang.StringUtils; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisPool; +import redis.clients.jedis.JedisPoolConfig; + +public class RedisPositionHandler implements IPositionHandler { + + private RedisConfig redisConfig; + private Jedis jedis; + private JedisPool jedisPool; + + public RedisPositionHandler(RedisConfig redisConfig) { + JedisPoolConfig jedisPoolConfig = new JedisPoolConfig(); + jedisPoolConfig.setMaxTotal(10); + if (!StringUtils.isBlank(redisConfig.getAuth())) { + jedisPool = new JedisPool(jedisPoolConfig, redisConfig.getHost(), redisConfig.getPort(), 1000, redisConfig.getAuth()); + } else { + jedisPool = new JedisPool(jedisPoolConfig, redisConfig.getHost(), redisConfig.getPort(), 1000); + } + } + + @Override + public BinlogPositionEntity getPosition(SyncConfig syncConfig) throws BinlogPortalException { + Jedis jedis = null; + try { + jedis = jedisPool.getResource(); + String position = jedis.get(syncConfig.getHost() + ":" + syncConfig.getPort()); + if (position != null) { + ObjectMapper objectMapper = new ObjectMapper(); + return objectMapper.readValue(position, BinlogPositionEntity.class); + } + } catch (JsonProcessingException e) { + return null; + } finally { + if (jedis != null) jedis.close(); + } + return null; + } + + @Override + public void savePosition(SyncConfig syncConfig, BinlogPositionEntity binlogPositionEntity) throws BinlogPortalException { + Jedis jedis = null; + try { + jedis = jedisPool.getResource(); + ObjectMapper objectMapper = new ObjectMapper(); + jedis.set(syncConfig.getHost() + ":" + syncConfig.getPort(), objectMapper.writeValueAsString(binlogPositionEntity)); + } catch (JsonProcessingException e) { + throw new BinlogPortalException("save position error!" + binlogPositionEntity.toString(), e); + } finally { + if (jedis != null) jedis.close(); + } + } +} diff --git a/binlogportal/src/main/java/com/insistingon/binlogportal/tablemeta/TableMetaEntity.java b/binlogportal/src/main/java/com/insistingon/binlogportal/tablemeta/TableMetaEntity.java new file mode 100644 index 0000000..980a57d --- /dev/null +++ b/binlogportal/src/main/java/com/insistingon/binlogportal/tablemeta/TableMetaEntity.java @@ -0,0 +1,73 @@ +package com.insistingon.binlogportal.tablemeta; + +import java.util.ArrayList; +import java.util.List; + +public class TableMetaEntity { + private Long tableId; + private String dbName; + private String tableName; + private List columnMetaDataList = new ArrayList(); + private String createSql; + + public static class ColumnMetaData { + String name; + String type; + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + } + + public Long getTableId() { + return tableId; + } + + public void setTableId(Long tableId) { + this.tableId = tableId; + } + + public String getDbName() { + return dbName; + } + + public void setDbName(String dbName) { + this.dbName = dbName; + } + + public String getTableName() { + return tableName; + } + + public void setTableName(String tableName) { + this.tableName = tableName; + } + + public List getColumnMetaDataList() { + return columnMetaDataList; + } + + public void setColumnMetaDataList(List columnMetaDataList) { + this.columnMetaDataList = columnMetaDataList; + } + + public String getCreateSql() { + return createSql; + } + + public void setCreateSql(String createSql) { + this.createSql = createSql; + } +} diff --git a/binlogportal/src/main/java/com/insistingon/binlogportal/tablemeta/TableMetaFactory.java b/binlogportal/src/main/java/com/insistingon/binlogportal/tablemeta/TableMetaFactory.java new file mode 100644 index 0000000..2a7955b --- /dev/null +++ b/binlogportal/src/main/java/com/insistingon/binlogportal/tablemeta/TableMetaFactory.java @@ -0,0 +1,68 @@ +package com.insistingon.binlogportal.tablemeta; + +import com.insistingon.binlogportal.config.SyncConfig; +import com.insistingon.binlogportal.BinlogPortalException; + +import java.sql.*; +import java.util.HashMap; +import java.util.Map; + +public class TableMetaFactory { + SyncConfig syncConfig; + + public TableMetaFactory(SyncConfig syncConfig) { + this.syncConfig = syncConfig; + } + + //缓存tableId信息 + private Map tableMetaEntityIdMap = new HashMap<>(); + //缓存表名信息 + private Map tableMetaEntityNameMap = new HashMap<>(); + + public TableMetaEntity getTableMetaEntity(Long tableId, String dbName, String tableName) throws BinlogPortalException { + Connection connection = null; + try { + if (tableMetaEntityIdMap.get(tableId) != null) { + return tableMetaEntityIdMap.get(tableId); + } else { + String url = "jdbc:mysql://" + syncConfig.getHost() + ":" + syncConfig.getPort() + "/" + dbName; + connection = DriverManager.getConnection(url, syncConfig.getUserName(), syncConfig.getPassword()); + DatabaseMetaData dbmd = connection.getMetaData(); + ResultSet rs = dbmd.getColumns(null, dbName, tableName, null); + TableMetaEntity tableMetaEntity = new TableMetaEntity(); + tableMetaEntity.setTableId(tableId); + tableMetaEntity.setDbName(dbName); + tableMetaEntity.setTableName(tableName); + while (rs.next()) { + TableMetaEntity.ColumnMetaData columnMetaData = new TableMetaEntity.ColumnMetaData(); + String colName = rs.getString("COLUMN_NAME"); + columnMetaData.setName(colName); + String dbType = rs.getString("TYPE_NAME"); + columnMetaData.setType(dbType); + tableMetaEntity.getColumnMetaDataList().add(columnMetaData); + } + tableMetaEntityIdMap.put(tableId, tableMetaEntity); + return tableMetaEntity; + } + } catch (Throwable e) { + throw new BinlogPortalException(e.getCause()); + } finally { + if (connection != null) { + try { + connection.close(); + } catch (SQLException e) { + } + } + } + } + + /** + * 根据tableId获取表元数据 + * + * @param tableId + * @return + */ + public TableMetaEntity getTableMetaEntity(Long tableId) { + return tableMetaEntityIdMap.get(tableId); + } +} diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..94372ac --- /dev/null +++ b/pom.xml @@ -0,0 +1,168 @@ + + + + 4.0.0 + + com.insistingon.binlogportal + binlogportal-parent + pom + 1.0.11 + + binlogportal-parent + A simple mysql binlog sync tool + https://github.com/dothetrick/binlogportal + + + UTF-8 + UTF-8 + 1.8 + true + 2.3.0.RELEASE + 1.8 + 1.8 + 1.0.11 + + + + binlogportal + binlogportal-spring-boot-starter + binlogportal-spring-boot-starter-test + + + + + The Apache Software License, Version 2.0 + http://www.apache.org/licenses/LICENSE-2.0.txt + actable + + + + + + dothetrick + flyinthewar@126.com + dothetrick + + + + + master + git@github.com:dothetrick/binlogportal.git + git@github.com:dothetrick/binlogportal.git + git@github.com:dothetrick/binlogportal.git + + + + + ossrh + https://oss.sonatype.org/content/repositories/snapshots + + + + ossrh + Nexus Release Repository + http://oss.sonatype.org/service/local/staging/deploy/maven2/ + + + + + + + maven-clean-plugin + 3.1.0 + + + + maven-compiler-plugin + 3.8.0 + + + maven-install-plugin + 2.5.2 + + + maven-deploy-plugin + 2.8.2 + + + org.codehaus.mojo + versions-maven-plugin + 2.3 + + false + ${newVersion} + + + + + org.apache.maven.plugins + maven-javadoc-plugin + 3.2.0 + + + attach-javadocs + package + + jar + + + + -Xdoclint:none + + + + + + + maven-jar-plugin + 3.0.2 + + + + dothetrick + + + + + + org.apache.maven.plugins + maven-source-plugin + 3.2.1 + + + attach-sources + + jar-no-fork + + + + + + org.apache.maven.plugins + maven-gpg-plugin + 1.6 + + + sign-artifacts + package + + sign + + + + + + org.sonatype.plugins + nexus-staging-maven-plugin + 1.6.8 + true + + ossrh + https://oss.sonatype.org/ + true + + + + +