diff --git a/README.md b/README.md index 01bbc3ea45..2cb080de07 100644 --- a/README.md +++ b/README.md @@ -37,47 +37,50 @@ DataX本身作为数据同步框架,将不同数据源的同步抽象为从源 DataX目前已经有了比较全面的插件体系,主流的RDBMS数据库、NOSQL、大数据计算系统都已经接入,目前支持数据如下图,详情请点击:[DataX数据源参考指南](https://github.com/alibaba/DataX/wiki/DataX-all-data-channels) -| 类型 | 数据源 | Reader(读) | Writer(写) | 文档 | -|--------------|---------------------------|:---------:|:---------:|:----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------:| -| RDBMS 关系型数据库 | MySQL | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/mysqlwriter/doc/mysqlwriter.md) | -| | Oracle | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/oraclereader/doc/oraclereader.md) 、[写](https://github.com/alibaba/DataX/blob/master/oraclewriter/doc/oraclewriter.md) | -| | OceanBase | √ | √ | [读](https://open.oceanbase.com/docs/community/oceanbase-database/V3.1.0/use-datax-to-full-migration-data-to-oceanbase) 、[写](https://open.oceanbase.com/docs/community/oceanbase-database/V3.1.0/use-datax-to-full-migration-data-to-oceanbase) | -| | SQLServer | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/sqlserverreader/doc/sqlserverreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/sqlserverwriter/doc/sqlserverwriter.md) | -| | PostgreSQL | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/postgresqlreader/doc/postgresqlreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/postgresqlwriter/doc/postgresqlwriter.md) | -| | DRDS | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/drdsreader/doc/drdsreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/drdswriter/doc/drdswriter.md) | -| | Kingbase | √ | √ | 
[读](https://github.com/alibaba/DataX/blob/master/drdsreader/doc/drdsreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/drdswriter/doc/drdswriter.md) | -| | 通用RDBMS(支持所有关系型数据库) | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/rdbmsreader/doc/rdbmsreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/rdbmswriter/doc/rdbmswriter.md) | -| 阿里云数仓数据存储 | ODPS | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/odpsreader/doc/odpsreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/odpswriter/doc/odpswriter.md) | -| | ADB | | √ | [写](https://github.com/alibaba/DataX/blob/master/adbmysqlwriter/doc/adbmysqlwriter.md) | -| | ADS | | √ | [写](https://github.com/alibaba/DataX/blob/master/adswriter/doc/adswriter.md) | -| | OSS | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/ossreader/doc/ossreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/osswriter/doc/osswriter.md) | -| | OCS | | √ | [写](https://github.com/alibaba/DataX/blob/master/ocswriter/doc/ocswriter.md) | -| | Hologres | | √ | [写](https://github.com/alibaba/DataX/blob/master/hologresjdbcwriter/doc/hologresjdbcwriter.md) | -| | AnalyticDB For PostgreSQL | | √ | 写 | -| 阿里云中间件 | datahub | √ | √ | 读 、写 | -| | SLS | √ | √ | 读 、写 | -| 阿里云图数据库 | GDB | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/gdbreader/doc/gdbreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/gdbwriter/doc/gdbwriter.md) | -| NoSQL数据存储 | OTS | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/otsreader/doc/otsreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/otswriter/doc/otswriter.md) | -| | Hbase0.94 | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/hbase094xreader/doc/hbase094xreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/hbase094xwriter/doc/hbase094xwriter.md) | -| | Hbase1.1 | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/hbase11xreader/doc/hbase11xreader.md) 
、[写](https://github.com/alibaba/DataX/blob/master/hbase11xwriter/doc/hbase11xwriter.md) | -| | Phoenix4.x | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/hbase11xsqlreader/doc/hbase11xsqlreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/hbase11xsqlwriter/doc/hbase11xsqlwriter.md) | -| | Phoenix5.x | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/hbase20xsqlreader/doc/hbase20xsqlreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/hbase20xsqlwriter/doc/hbase20xsqlwriter.md) | -| | MongoDB | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/mongodbreader/doc/mongodbreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/mongodbwriter/doc/mongodbwriter.md) | -| | Cassandra | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/cassandrareader/doc/cassandrareader.md) 、[写](https://github.com/alibaba/DataX/blob/master/cassandrawriter/doc/cassandrawriter.md) | -| 数仓数据存储 | StarRocks | √ | √ | 读 、[写](https://github.com/alibaba/DataX/blob/master/starrockswriter/doc/starrockswriter.md) | -| | ApacheDoris | | √ | [写](https://github.com/alibaba/DataX/blob/master/doriswriter/doc/doriswriter.md) | -| | ClickHouse | | √ | 写 | -| | Databend | | √ | [写](https://github.com/alibaba/DataX/blob/master/databendwriter/doc/databendwriter.md) | -| | Hive | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/hdfsreader/doc/hdfsreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md) | -| | kudu | | √ | [写](https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md) | -| | selectdb | | √ | [写](https://github.com/alibaba/DataX/blob/master/selectdbwriter/doc/selectdbwriter.md) | -| 无结构化数据存储 | TxtFile | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/txtfilereader/doc/txtfilereader.md) 、[写](https://github.com/alibaba/DataX/blob/master/txtfilewriter/doc/txtfilewriter.md) | -| | FTP | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/ftpreader/doc/ftpreader.md) 
、[写](https://github.com/alibaba/DataX/blob/master/ftpwriter/doc/ftpwriter.md) | -| | HDFS | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/hdfsreader/doc/hdfsreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md) | -| | Elasticsearch | | √ | [写](https://github.com/alibaba/DataX/blob/master/elasticsearchwriter/doc/elasticsearchwriter.md) | -| 时间序列数据库 | OpenTSDB | √ | | [读](https://github.com/alibaba/DataX/blob/master/opentsdbreader/doc/opentsdbreader.md) | -| | TSDB | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/tsdbreader/doc/tsdbreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/tsdbwriter/doc/tsdbhttpwriter.md) | -| | TDengine | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/tdenginereader/doc/tdenginereader-CN.md) 、[写](https://github.com/alibaba/DataX/blob/master/tdenginewriter/doc/tdenginewriter-CN.md) | +| 类型 | 数据源 | Reader(读) | Writer(写) | 文档 | +| ------------------ | ------------------------------- | :--------: | :--------: | :----------------------------------------------------------: | +| RDBMS 关系型数据库 | MySQL | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/mysqlwriter/doc/mysqlwriter.md) | +| | Oracle | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/oraclereader/doc/oraclereader.md) 、[写](https://github.com/alibaba/DataX/blob/master/oraclewriter/doc/oraclewriter.md) | +| | OceanBase | √ | √ | [读](https://open.oceanbase.com/docs/community/oceanbase-database/V3.1.0/use-datax-to-full-migration-data-to-oceanbase) 、[写](https://open.oceanbase.com/docs/community/oceanbase-database/V3.1.0/use-datax-to-full-migration-data-to-oceanbase) | +| | SQLServer | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/sqlserverreader/doc/sqlserverreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/sqlserverwriter/doc/sqlserverwriter.md) | +| | PostgreSQL | √ | √ | 
[读](https://github.com/alibaba/DataX/blob/master/postgresqlreader/doc/postgresqlreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/postgresqlwriter/doc/postgresqlwriter.md) |
+| | DRDS | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/drdsreader/doc/drdsreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/drdswriter/doc/drdswriter.md) |
+| | Kingbase | √ | √ | 读 、写 |
+| | 通用RDBMS(支持所有关系型数据库) | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/rdbmsreader/doc/rdbmsreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/rdbmswriter/doc/rdbmswriter.md) |
+| 阿里云数仓数据存储 | ODPS | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/odpsreader/doc/odpsreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/odpswriter/doc/odpswriter.md) |
+| | ADB | | √ | [写](https://github.com/alibaba/DataX/blob/master/adbmysqlwriter/doc/adbmysqlwriter.md) |
+| | ADS | | √ | [写](https://github.com/alibaba/DataX/blob/master/adswriter/doc/adswriter.md) |
+| | OSS | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/ossreader/doc/ossreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/osswriter/doc/osswriter.md) |
+| | OCS | | √ | [写](https://github.com/alibaba/DataX/blob/master/ocswriter/doc/ocswriter.md) |
+| | Hologres | | √ | [写](https://github.com/alibaba/DataX/blob/master/hologresjdbcwriter/doc/hologresjdbcwriter.md) |
+| | AnalyticDB For PostgreSQL | | √ | 写 |
+| 阿里云中间件 | datahub | √ | √ | 读 、写 |
+| | SLS | √ | √ | 读 、写 |
+| 图数据库 | 阿里云 GDB | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/gdbreader/doc/gdbreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/gdbwriter/doc/gdbwriter.md) |
+| | Neo4j | | √ | 写 |
+| NoSQL数据存储 | OTS | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/otsreader/doc/otsreader.md)
、[写](https://github.com/alibaba/DataX/blob/master/otswriter/doc/otswriter.md) |
+| | Hbase0.94 | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/hbase094xreader/doc/hbase094xreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/hbase094xwriter/doc/hbase094xwriter.md) |
+| | Hbase1.1 | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/hbase11xreader/doc/hbase11xreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/hbase11xwriter/doc/hbase11xwriter.md) |
+| | Phoenix4.x | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/hbase11xsqlreader/doc/hbase11xsqlreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/hbase11xsqlwriter/doc/hbase11xsqlwriter.md) |
+| | Phoenix5.x | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/hbase20xsqlreader/doc/hbase20xsqlreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/hbase20xsqlwriter/doc/hbase20xsqlwriter.md) |
+| | MongoDB | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/mongodbreader/doc/mongodbreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/mongodbwriter/doc/mongodbwriter.md) |
+| | Cassandra | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/cassandrareader/doc/cassandrareader.md) 、[写](https://github.com/alibaba/DataX/blob/master/cassandrawriter/doc/cassandrawriter.md) |
+| 数仓数据存储 | StarRocks | √ | √ | 读 、[写](https://github.com/alibaba/DataX/blob/master/starrockswriter/doc/starrockswriter.md) |
+| | ApacheDoris | | √ | [写](https://github.com/alibaba/DataX/blob/master/doriswriter/doc/doriswriter.md) |
+| | ClickHouse | | √ | 写 |
+| | Databend | | √ | [写](https://github.com/alibaba/DataX/blob/master/databendwriter/doc/databendwriter.md) |
+| | Hive | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/hdfsreader/doc/hdfsreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md) |
+| | kudu | | √ | 写 |
+| | selectdb | | √ |
[写](https://github.com/alibaba/DataX/blob/master/selectdbwriter/doc/selectdbwriter.md) | +| 无结构化数据存储 | TxtFile | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/txtfilereader/doc/txtfilereader.md) 、[写](https://github.com/alibaba/DataX/blob/master/txtfilewriter/doc/txtfilewriter.md) | +| | FTP | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/ftpreader/doc/ftpreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/ftpwriter/doc/ftpwriter.md) | +| | HDFS | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/hdfsreader/doc/hdfsreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md) | +| | Elasticsearch | | √ | [写](https://github.com/alibaba/DataX/blob/master/elasticsearchwriter/doc/elasticsearchwriter.md) | +| 时间序列数据库 | OpenTSDB | √ | | [读](https://github.com/alibaba/DataX/blob/master/opentsdbreader/doc/opentsdbreader.md) | +| | TSDB | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/tsdbreader/doc/tsdbreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/tsdbwriter/doc/tsdbhttpwriter.md) | +| | TDengine | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/tdenginereader/doc/tdenginereader-CN.md) 、[写](https://github.com/alibaba/DataX/blob/master/tdenginewriter/doc/tdenginewriter-CN.md) | + + # 阿里云DataWorks数据集成 @@ -94,11 +97,11 @@ DataX目前已经有了比较全面的插件体系,主流的RDBMS数据库、N - 新增比如:DB2、Kafka、Hologres、MetaQ、SAPHANA、达梦等等,持续扩充中 - 离线同步支持的数据源:https://help.aliyun.com/document_detail/137670.html - 具备同步解决方案: - - 解决方案系统:https://help.aliyun.com/document_detail/171765.html - - 一键全增量:https://help.aliyun.com/document_detail/175676.html - - 整库迁移:https://help.aliyun.com/document_detail/137809.html - - 批量上云:https://help.aliyun.com/document_detail/146671.html - - 更新更多能力请访问:https://help.aliyun.com/document_detail/137663.html + - 解决方案系统:https://help.aliyun.com/document_detail/171765.html + - 一键全增量:https://help.aliyun.com/document_detail/175676.html + - 整库迁移:https://help.aliyun.com/document_detail/137809.html + - 
批量上云:https://help.aliyun.com/document_detail/146671.html
+  - 更新更多能力请访问:https://help.aliyun.com/document_detail/137663.html

# 我要开发新的插件

@@ -119,10 +122,10 @@ DataX 后续计划月度迭代更新,也欢迎感兴趣的同学提交 Pull request

 - 涉及通道能力更新(OceanBase、Tdengine、Doris等)
 - [datax_v202209](https://github.com/alibaba/DataX/releases/tag/datax_v202209)
-  - 涉及通道能力更新(MaxCompute、Datahub、SLS等)、安全漏洞更新、通用打包更新等
+  - 涉及通道能力更新(MaxCompute、Datahub、SLS等)、安全漏洞更新、通用打包更新等
 - [datax_v202205](https://github.com/alibaba/DataX/releases/tag/datax_v202205)
-  - 涉及通道能力更新(MaxCompute、Hologres、OSS、Tdengine等)、安全漏洞更新、通用打包更新等
+  - 涉及通道能力更新(MaxCompute、Hologres、OSS、Tdengine等)、安全漏洞更新、通用打包更新等

# 项目成员

diff --git a/neo4jwriter/doc/neo4jwriter.md b/neo4jwriter/doc/neo4jwriter.md
new file mode 100644
index 0000000000..0c6e356cfb
--- /dev/null
+++ b/neo4jwriter/doc/neo4jwriter.md
@@ -0,0 +1,193 @@
+# DataX neo4jWriter 插件文档
+
+## 功能简介
+
+目前市面上的 Neo4j 批量导入方案主要有 Cypher Create、Load CSV,以及第三方或官方提供的 Batch Import。Load CSV 适用于 10 万节点以下的规模,Batch Import 则需要对数据库停机。要实现不停机的数据写入,Cypher 是最好的方式。
+
+## 支持版本
+
+支持 Neo4j 4 和 Neo4j 5;如果是 Neo4j 3,需要自行将驱动降至相应版本后编译。
+
+## 实现原理
+
+将 DataX 的数据转换成 Neo4j 驱动能识别的对象,利用 unwind 语法进行批量插入。
+
+## 如何配置
+
+### 配置项介绍
+
+| 配置 | 说明 | 是否必须 | 默认值 | 示例 |
+|:-------------------------------|--------------------|:----:|:-----:|:----------------------------------------------------:|
+| database | 数据库名字 | 是 | - | neo4j |
+| uri | 数据库访问链接 | 是 | - | bolt://localhost:7687 |
+| username | 访问用户名 | 是 | - | neo4j |
+| password | 访问密码 | 是 | - | neo4j |
+| bearerToken | 权限相关 | 否 | - | - |
+| kerberosTicket | 权限相关 | 否 | - | - |
+| cypher | 同步语句 | 是 | - | unwind $batch as row create(p) set p.name = row.name |
+| batchDataVariableName | unwind 携带的数据变量名 | 否 | batch | batch |
+| properties | 定义neo4j中数据的属性名字和类型 | 是 | - | 见后续案例 |
+| batchSize | 一批写入数据量 | 否 | 1000 | |
+| maxTransactionRetryTimeSeconds | 事务运行最长时间 | 否 | 30秒 | 30 |
+| maxConnectionTimeoutSeconds | 驱动最长链接时间 | 否 | 30秒 | 30 |
+| retryTimes | 发生错误的重试次数 | 否 | 3次 | 3 |
+| retrySleepMills | 重试失败后的等待时间 | 否 | 3秒 | 3 |
+
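+上表中的 cypher、batchDataVariableName 与 batchSize 是配合工作的:插件把每 batchSize 条记录组装成一个列表,以 batchDataVariableName 指定的名字(默认 batch)作为参数传给驱动,再由 unwind 展开后逐行执行同步语句。下面是一个示意(其中的 Person 标签与 name 属性仅是假设的示例,并非固定写法):
+
+```cypher
+// 插件实际执行的查询即 cypher 配置项的内容,形如:
+unwind $batch as row create (p:Person) set p.name = row.name
+
+// 驱动收到的 $batch 参数是一批记录组成的列表,形如:
+// [{name: "张三"}, {name: "李四"}]
+```
+
+缓冲区内的记录数达到 batchSize 时,插件会提交一次这样的批量写入。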
+### 支持的数据类型
+
+> 配置时均忽略大小写
+
+```
+BOOLEAN,
+STRING,
+LONG,
+SHORT,
+INTEGER,
+DOUBLE,
+FLOAT,
+LOCAL_DATE,
+LOCAL_TIME,
+LOCAL_DATE_TIME,
+LIST,
+//map类型支持 . 属性表达式取值
+MAP,
+CHAR_ARRAY,
+BYTE_ARRAY,
+BOOLEAN_ARRAY,
+STRING_ARRAY,
+LONG_ARRAY,
+INT_ARRAY,
+SHORT_ARRAY,
+DOUBLE_ARRAY,
+FLOAT_ARRAY,
+Object_ARRAY
+```
+
+### 写节点
+
+下面是一个写入节点的例子,覆盖了多种类型的属性,可以直接在测试用例中运行。
+
+```json
+"writer": {
+  "name": "neo4jWriter",
+  "parameter": {
+    "uri": "neo4j://localhost:7687",
+    "username": "neo4j",
+    "password": "Test@12343",
+    "database": "neo4j",
+    "cypher": "unwind $batch as row create(p:Person) set p.pbool = row.pbool,p.pstring = row.pstring,p.plong = row.plong,p.pshort = row.pshort,p.pdouble=row.pdouble,p.pstringarr=row.pstringarr,p.plocaldate=row.plocaldate",
+    "batchDataVariableName": "batch",
+    "batchSize": "33",
+    "properties": [
+      {
+        "name": "pbool",
+        "type": "BOOLEAN"
+      },
+      {
+        "name": "pstring",
+        "type": "STRING"
+      },
+      {
+        "name": "plong",
+        "type": "LONG"
+      },
+      {
+        "name": "pshort",
+        "type": "SHORT"
+      },
+      {
+        "name": "pdouble",
+        "type": "DOUBLE"
+      },
+      {
+        "name": "pstringarr",
+        "type": "STRING_ARRAY",
+        "split": ","
+      },
+      {
+        "name": "plocaldate",
+        "type": "LOCAL_DATE",
+        "dateFormat": "yyyy-MM-dd"
+      }
+    ]
+  }
+}
+```
+
+### 写关系
+
+```json
+"writer": {
+  "name": "neo4jWriter",
+  "parameter": {
+    "uri": "neo4j://localhost:7687",
+    "username": "neo4j",
+    "password": "Test@12343",
+    "database": "neo4j",
+    "cypher": "unwind $batch as row match(p1:Person) where p1.id = row.startNodeId match(p2:Person) where p2.id = row.endNodeId create (p1)-[:LINK]->(p2)",
+    "batchDataVariableName": "batch",
+    "batchSize": "33",
+    "properties": [
+      {
+        "name": "startNodeId",
+        "type": "STRING"
+      },
+      {
+        "name": "endNodeId",
+        "type": "STRING"
+      }
+    ]
+  }
+}
+```
+
+### 节点/关系类型动态写
+
+> 需要使用 APOC 函数扩展;如果你的数据库没有安装,请先安装 APOC 函数扩展
+
+```json
+"writer": {
+  "name": "neo4jWriter",
+  "parameter": {
+    "uri": "bolt://localhost:7687",
+    "username": "yourUserName",
+    "password":
"yourPassword",
+    "database": "yourDataBase",
+    "cypher": "unwind $batch as row CALL apoc.cypher.doIt( 'create (n:`' + row.Label + '`{id:$id})' ,{id: row.id} ) YIELD value RETURN 1 ",
+    "batchDataVariableName": "batch",
+    "batchSize": "1",
+    "properties": [
+      {
+        "name": "Label",
+        "type": "STRING"
+      },
+      {
+        "name": "id",
+        "type": "STRING"
+      }
+    ]
+  }
+}
+```
+
+## 注意事项
+
+* properties 定义的顺序需要与 reader 端的列顺序一一对应。
+* 灵活使用 map 类型可以省去很多数据加工的麻烦:在 cypher 中可以用 . 属性访问符一路取值。例如 unwind $batch as row create (p) set p.name = row.prop.name, p.age = row.prop.age,这里 prop 是 map 类型,包含 name 和 age 两个属性。
+* 如果提示事务超时,建议调大事务运行时间或者调小 batchSize。
+* 如果用于更新场景且遇到死锁问题影响写入,建议在源码基础上二次开发,加入死锁异常检测并重试。
+
+## 性能报告
+
+**JVM 参数**
+
+16G 内存,G1 垃圾收集器,8 核心
+
+**Neo4j 数据库配置**
+
+32 核心,256G 内存
+
+**DataX 配置**
+
+* Channel 20,batchSize = 1000
+* 任务平均流量:15.23MB/s
+* 记录写入速度:44440 rec/s
+* 读出记录总数:2222013
diff --git a/neo4jwriter/pom.xml b/neo4jwriter/pom.xml
new file mode 100644
index 0000000000..a9ae43e93c
--- /dev/null
+++ b/neo4jwriter/pom.xml
@@ -0,0 +1,90 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>com.alibaba.datax</groupId>
+        <artifactId>datax-all</artifactId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>neo4jwriter</artifactId>
+    <name>neo4jwriter</name>
+    <packaging>jar</packaging>
+
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <neo4j-java-driver.version>4.4.9</neo4j-java-driver.version>
+        <junit4.version>4.13.2</junit4.version>
+        <test.container.version>1.17.6</test.container.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.neo4j.driver</groupId>
+            <artifactId>neo4j-java-driver</artifactId>
+            <version>${neo4j-java-driver.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba.datax</groupId>
+            <artifactId>datax-common</artifactId>
+            <version>${datax-project-version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>testcontainers</artifactId>
+            <version>${test.container.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>${junit4.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>${jdk-version}</source>
+                    <target>${jdk-version}</target>
+                    <encoding>${project-sourceEncoding}</encoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <configuration>
+                    <descriptors>
+                        <descriptor>src/main/assembly/package.xml</descriptor>
+                    </descriptors>
+                    <finalName>datax</finalName>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>dwzip</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/neo4jwriter/src/main/assembly/package.xml b/neo4jwriter/src/main/assembly/package.xml
new file mode 100644
index 0000000000..3acbe6740d
--- /dev/null
+++ b/neo4jwriter/src/main/assembly/package.xml
@@ -0,0 +1,35 @@
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
+          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+          xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
+    <id></id>
+    <formats>
+        <format>dir</format>
+    </formats>
+    <includeBaseDirectory>false</includeBaseDirectory>
+    <fileSets>
+        <fileSet>
+            <directory>src/main/resources</directory>
+            <includes>
+                <include>plugin.json</include>
+                <include>plugin_job_template.json</include>
+            </includes>
+            <outputDirectory>plugin/writer/neo4jwriter</outputDirectory>
+        </fileSet>
+        <fileSet>
+            <directory>target/</directory>
+            <includes>
+                <include>neo4jwriter-0.0.1-SNAPSHOT.jar</include>
+            </includes>
+            <outputDirectory>plugin/writer/neo4jwriter</outputDirectory>
+        </fileSet>
+    </fileSets>
+    <dependencySets>
+        <dependencySet>
+            <useProjectArtifact>false</useProjectArtifact>
+            <outputDirectory>plugin/writer/neo4jwriter/libs</outputDirectory>
+            <scope>runtime</scope>
+        </dependencySet>
+    </dependencySets>
+</assembly>
diff --git a/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/Neo4jClient.java b/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/Neo4jClient.java
new file mode 100644
index 0000000000..4451bbdf2d
--- /dev/null
+++ b/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/Neo4jClient.java
@@ -0,0 +1,256 @@
+package com.alibaba.datax.plugin.writer.neo4jwriter;
+
+import com.alibaba.datax.common.element.Column;
+import com.alibaba.datax.common.element.Record;
+import com.alibaba.datax.common.exception.DataXException;
+import com.alibaba.datax.common.plugin.TaskPluginCollector;
+import com.alibaba.datax.common.util.Configuration;
+import com.alibaba.datax.common.util.RetryUtil;
+import com.alibaba.datax.plugin.writer.neo4jwriter.adapter.DateAdapter;
+import com.alibaba.datax.plugin.writer.neo4jwriter.adapter.ValueAdapter;
+import com.alibaba.datax.plugin.writer.neo4jwriter.config.Neo4jProperty;
+import com.alibaba.datax.plugin.writer.neo4jwriter.exception.Neo4jErrorCode;
+import com.alibaba.fastjson2.JSON;
+import org.apache.commons.lang3.StringUtils;
+import org.neo4j.driver.*;
+import org.neo4j.driver.exceptions.Neo4jException;
+import org.neo4j.driver.internal.value.MapValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+import static com.alibaba.datax.plugin.writer.neo4jwriter.config.ConfigConstants.*;
+import static com.alibaba.datax.plugin.writer.neo4jwriter.exception.Neo4jErrorCode.DATABASE_ERROR;
+
+public class Neo4jClient {
+    private static final Logger LOGGER = LoggerFactory.getLogger(Neo4jClient.class);
+    private Driver driver;
+
+    private WriteConfig writeConfig;
+    private RetryConfig retryConfig;
+    private
TaskPluginCollector taskPluginCollector;
+
+    private Session session;
+
+    private List<MapValue> writerBuffer;
+
+    public Neo4jClient(Driver driver,
+                       WriteConfig writeConfig,
+                       RetryConfig retryConfig,
+                       TaskPluginCollector taskPluginCollector) {
+        this.driver = driver;
+        this.writeConfig = writeConfig;
+        this.retryConfig = retryConfig;
+        this.taskPluginCollector = taskPluginCollector;
+        this.writerBuffer = new ArrayList<>(writeConfig.batchSize);
+    }
+
+    public void init() {
+        String database = writeConfig.database;
+        //neo4j 3.x 没有多数据库
+        if (null != database && !"".equals(database)) {
+            this.session = driver.session(SessionConfig.forDatabase(database));
+        } else {
+            this.session = driver.session();
+        }
+    }
+
+    public static Neo4jClient build(Configuration config, TaskPluginCollector taskPluginCollector) {
+
+        Driver driver = buildNeo4jDriver(config);
+        String cypher = checkCypher(config);
+        String database = config.getString(DATABASE.getKey());
+        String batchVariableName = config.getString(BATCH_DATA_VARIABLE_NAME.getKey(),
+                BATCH_DATA_VARIABLE_NAME.getDefaultValue());
+        List<Neo4jProperty> neo4jProperties = JSON.parseArray(config.getString(NEO4J_PROPERTIES.getKey()), Neo4jProperty.class);
+        int batchSize = config.getInt(BATCH_SIZE.getKey(), BATCH_SIZE.getDefaultValue());
+        int retryTimes = config.getInt(RETRY_TIMES.getKey(), RETRY_TIMES.getDefaultValue());
+
+        return new Neo4jClient(driver,
+                new WriteConfig(cypher, database, batchVariableName, neo4jProperties, batchSize),
+                new RetryConfig(retryTimes, config.getLong(RETRY_SLEEP_MILLS.getKey(), RETRY_SLEEP_MILLS.getDefaultValue())),
+                taskPluginCollector
+        );
+    }
+
+    private static String checkCypher(Configuration config) {
+        String cypher = config.getString(CYPHER.getKey());
+        if (StringUtils.isBlank(cypher)) {
+            throw DataXException.asDataXException(Neo4jErrorCode.CONFIG_INVALID, "cypher must not be null or empty");
+        }
+        return cypher;
+    }
+
+    private static Driver buildNeo4jDriver(Configuration config) {
+
+        Config.ConfigBuilder
configBuilder = Config.builder().withMaxConnectionPoolSize(1);
+        String uri = checkUriConfig(config);
+
+        //connection timeout
+        //连接超时时间
+        Long maxConnTime = config.getLong(MAX_CONNECTION_TIMEOUT_SECONDS.getKey(), MAX_CONNECTION_TIMEOUT_SECONDS.getDefaultValue());
+        configBuilder
+                .withConnectionAcquisitionTimeout(
+                        maxConnTime * 2, TimeUnit.SECONDS)
+                .withConnectionTimeout(maxConnTime, TimeUnit.SECONDS);
+
+        //transaction timeout
+        //事务运行超时时间
+        Long txRetryTime = config.getLong(MAX_TRANSACTION_RETRY_TIME.getKey(), MAX_TRANSACTION_RETRY_TIME.getDefaultValue());
+        configBuilder.withMaxTransactionRetryTime(txRetryTime, TimeUnit.SECONDS);
+        String username = config.getString(USERNAME.getKey());
+        String password = config.getString(PASSWORD.getKey());
+        String bearerToken = config.getString(BEARER_TOKEN.getKey());
+        String kerberosTicket = config.getString(KERBEROS_TICKET.getKey());
+
+        if (StringUtils.isNotBlank(username) && StringUtils.isNotBlank(password)) {
+            return GraphDatabase.driver(uri, AuthTokens.basic(username, password), configBuilder.build());
+        } else if (StringUtils.isNotBlank(bearerToken)) {
+            return GraphDatabase.driver(uri, AuthTokens.bearer(bearerToken), configBuilder.build());
+        } else if (StringUtils.isNotBlank(kerberosTicket)) {
+            return GraphDatabase.driver(uri, AuthTokens.kerberos(kerberosTicket), configBuilder.build());
+        }
+
+        throw DataXException.asDataXException(Neo4jErrorCode.CONFIG_INVALID, "Invalid Auth config.");
+    }
+
+    private static String checkUriConfig(Configuration config) {
+        String uri = config.getString(URI.getKey());
+        if (StringUtils.isBlank(uri)) {
+            throw DataXException.asDataXException(Neo4jErrorCode.CONFIG_INVALID, "Invalid uri configuration");
+        }
+        return uri;
+    }
+
+    public void destroy() {
+        tryFlushBuffer();
+        //先关闭会话,再关闭驱动
+        if (session != null) {
+            session.close();
+        }
+        if (driver != null) {
+            driver.close();
+        }
+        DateAdapter.destroy();
+    }
+
+    private void tryFlushBuffer() {
+        if (!writerBuffer.isEmpty()) {
+            doWrite(writerBuffer);
+            writerBuffer.clear();
+        }
+    }
+
+    private void tryBatchWrite() {
+        if (!writerBuffer.isEmpty() && writerBuffer.size() >= writeConfig.batchSize) {
+            doWrite(writerBuffer);
+            writerBuffer.clear();
+        }
+    }
+
+    private void doWrite(List<MapValue> values) {
+        Value batchValues = Values.parameters(this.writeConfig.batchVariableName, values);
+        Query query = new Query(this.writeConfig.cypher, batchValues);
+        // LOGGER.debug("query:{}", query.text());
+        // LOGGER.debug("batch:{}", toUnwindStr(values));
+        try {
+            RetryUtil.executeWithRetry(() -> {
+                session.writeTransaction(tx -> tx.run(query));
+                return null;
+            }, this.retryConfig.retryTimes, retryConfig.retrySleepMills, true,
+                    Collections.singletonList(Neo4jException.class));
+        } catch (Exception e) {
+            LOGGER.error("an exception occurred while writing to the database, message: {}", e.getMessage());
+            throw DataXException.asDataXException(DATABASE_ERROR, e.getMessage());
+        }
+    }
+
+    private String toUnwindStr(List<MapValue> values) {
+        StringJoiner joiner = new StringJoiner(",");
+        for (MapValue value : values) {
+            joiner.add(value.toString());
+        }
+        return "[" + joiner + "]";
+    }
+
+    public void tryWrite(Record record) {
+        MapValue neo4jValue = checkAndConvert(record);
+        writerBuffer.add(neo4jValue);
+        tryBatchWrite();
+    }
+
+    private MapValue checkAndConvert(Record record) {
+        int sourceColNum = record.getColumnNumber();
+        List<Neo4jProperty> neo4jProperties = writeConfig.neo4jProperties;
+        if (neo4jProperties == null || neo4jProperties.size() != sourceColNum) {
+            throw DataXException.asDataXException(Neo4jErrorCode.CONFIG_INVALID, "the read and write columns do not match!");
+        }
+        Map<String, Value> data = new HashMap<>(sourceColNum * 4 / 3);
+        for (int i = 0; i < sourceColNum; i++) {
+            Column column = record.getColumn(i);
+            Neo4jProperty neo4jProperty = neo4jProperties.get(i);
+            try {
+                Value value = ValueAdapter.column2Value(column, neo4jProperty);
+                data.put(neo4jProperty.getName(), value);
+            } catch (Exception e) {
+                LOGGER.info("dirty
record: {}, message: {}", column, e.getMessage());
+                this.taskPluginCollector.collectDirtyRecord(record, e.getMessage());
+            }
+        }
+        return new MapValue(data);
+    }
+
+    public List<Neo4jProperty> getNeo4jFields() {
+        return this.writeConfig.neo4jProperties;
+    }
+
+    static class RetryConfig {
+        int retryTimes;
+        long retrySleepMills;
+
+        RetryConfig(int retryTimes, long retrySleepMills) {
+            this.retryTimes = retryTimes;
+            this.retrySleepMills = retrySleepMills;
+        }
+    }
+
+    static class WriteConfig {
+        String cypher;
+
+        String database;
+
+        String batchVariableName;
+
+        List<Neo4jProperty> neo4jProperties;
+
+        int batchSize;
+
+        public WriteConfig(String cypher,
+                           String database,
+                           String batchVariableName,
+                           List<Neo4jProperty> neo4jProperties,
+                           int batchSize) {
+            this.cypher = cypher;
+            this.database = database;
+            this.batchVariableName = batchVariableName;
+            this.neo4jProperties = neo4jProperties;
+            this.batchSize = batchSize;
+        }
+    }
+}
diff --git a/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/Neo4jWriter.java b/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/Neo4jWriter.java
new file mode 100644
index 0000000000..a89f46749c
--- /dev/null
+++ b/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/Neo4jWriter.java
@@ -0,0 +1,63 @@
+package com.alibaba.datax.plugin.writer.neo4jwriter;
+
+import com.alibaba.datax.common.plugin.RecordReceiver;
+import com.alibaba.datax.common.spi.Writer;
+import com.alibaba.datax.common.util.Configuration;
+import com.alibaba.datax.common.element.Record;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class Neo4jWriter extends Writer {
+    public static class Job extends Writer.Job {
+        private static final Logger LOGGER = LoggerFactory.getLogger(Job.class);
+
+        private Configuration jobConf = null;
+
+        @Override
+        public void init() {
+            //必须先取到任务配置,否则 split 时 clone 会抛出空指针
+            this.jobConf = super.getPluginJobConf();
+            LOGGER.info("Neo4jWriter Job init success");
+        }
+
+        @Override
+        public void destroy() {
LOGGER.info("Neo4jWriter Job destroyed");
+        }
+
+        @Override
+        public List<Configuration> split(int mandatoryNumber) {
+            List<Configuration> configurations = new ArrayList<>(mandatoryNumber);
+            for (int i = 0; i < mandatoryNumber; i++) {
+                configurations.add(this.jobConf.clone());
+            }
+            return configurations;
+        }
+    }
+
+    public static class Task extends Writer.Task {
+        private static final Logger TASK_LOGGER = LoggerFactory.getLogger(Task.class);
+        private Neo4jClient neo4jClient;
+
+        @Override
+        public void init() {
+            Configuration taskConf = super.getPluginJobConf();
+            this.neo4jClient = Neo4jClient.build(taskConf, getTaskPluginCollector());
+            this.neo4jClient.init();
+            TASK_LOGGER.info("neo4j writer task init success.");
+        }
+
+        @Override
+        public void destroy() {
+            this.neo4jClient.destroy();
+            TASK_LOGGER.info("neo4j writer task destroyed.");
+        }
+
+        @Override
+        public void startWrite(RecordReceiver receiver) {
+            Record record;
+            while ((record = receiver.getFromReader()) != null) {
+                this.neo4jClient.tryWrite(record);
+            }
+        }
+    }
+}
diff --git a/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/adapter/DateAdapter.java b/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/adapter/DateAdapter.java
new file mode 100644
index 0000000000..51b214bd07
--- /dev/null
+++ b/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/adapter/DateAdapter.java
@@ -0,0 +1,70 @@
+package com.alibaba.datax.plugin.writer.neo4jwriter.adapter;
+
+import com.alibaba.datax.plugin.writer.neo4jwriter.config.Neo4jProperty;
+
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.format.DateTimeFormatter;
+import java.util.function.Supplier;
+
+/**
+ * @author fuyouj
+ */
+public class DateAdapter {
+    private static final ThreadLocal<DateTimeFormatter> LOCAL_DATE_FORMATTER_MAP = new ThreadLocal<>();
+    private static final ThreadLocal<DateTimeFormatter> LOCAL_TIME_FORMATTER_MAP = new ThreadLocal<>();
+    private static final ThreadLocal<DateTimeFormatter> LOCAL_DATE_TIME_FORMATTER_MAP = new ThreadLocal<>();
+    private static final String DEFAULT_LOCAL_DATE_FORMATTER = "yyyy-MM-dd";
+    private static final String DEFAULT_LOCAL_TIME_FORMATTER = "HH:mm:ss";
+    private static final String DEFAULT_LOCAL_DATE_TIME_FORMATTER = "yyyy-MM-dd HH:mm:ss";
+
+    public static LocalDate localDate(String text, Neo4jProperty neo4jProperty) {
+        if (LOCAL_DATE_FORMATTER_MAP.get() != null) {
+            return LocalDate.parse(text, LOCAL_DATE_FORMATTER_MAP.get());
+        }
+
+        String format = getOrDefault(neo4jProperty::getDateFormat, DEFAULT_LOCAL_DATE_FORMATTER);
+        DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern(format);
+        LOCAL_DATE_FORMATTER_MAP.set(dateTimeFormatter);
+        return LocalDate.parse(text, dateTimeFormatter);
+    }
+
+    public static String getOrDefault(Supplier<String> dateFormat, String defaultFormat) {
+        String format = dateFormat.get();
+        if (null == format || "".equals(format)) {
+            return defaultFormat;
+        } else {
+            return format;
+        }
+    }
+
+    public static void destroy() {
+        LOCAL_DATE_FORMATTER_MAP.remove();
+        LOCAL_TIME_FORMATTER_MAP.remove();
+        LOCAL_DATE_TIME_FORMATTER_MAP.remove();
+    }
+
+    public static LocalTime localTime(String text, Neo4jProperty neo4JProperty) {
+        if (LOCAL_TIME_FORMATTER_MAP.get() != null) {
+            return LocalTime.parse(text, LOCAL_TIME_FORMATTER_MAP.get());
+        }
+
+        String format = getOrDefault(neo4JProperty::getDateFormat, DEFAULT_LOCAL_TIME_FORMATTER);
+        DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern(format);
+        LOCAL_TIME_FORMATTER_MAP.set(dateTimeFormatter);
+        return LocalTime.parse(text, dateTimeFormatter);
+    }
+
+    public static LocalDateTime localDateTime(String text, Neo4jProperty neo4JProperty) {
+        if (LOCAL_DATE_TIME_FORMATTER_MAP.get() != null) {
+            return LocalDateTime.parse(text, LOCAL_DATE_TIME_FORMATTER_MAP.get());
+        }
+        String format = getOrDefault(neo4JProperty::getDateFormat, DEFAULT_LOCAL_DATE_TIME_FORMATTER);
+        DateTimeFormatter dateTimeFormatter =
DateTimeFormatter.ofPattern(format); + LOCAL_DATE_TIME_FORMATTER_MAP.set(dateTimeFormatter); + return LocalDateTime.parse(text, dateTimeFormatter); + } +} diff --git a/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/adapter/ValueAdapter.java b/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/adapter/ValueAdapter.java new file mode 100644 index 0000000000..d0f4044d79 --- /dev/null +++ b/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/adapter/ValueAdapter.java @@ -0,0 +1,95 @@ +package com.alibaba.datax.plugin.writer.neo4jwriter.adapter; + + +import com.alibaba.datax.common.element.Column; +import com.alibaba.datax.plugin.writer.neo4jwriter.config.Neo4jProperty; +import com.alibaba.datax.plugin.writer.neo4jwriter.element.PropertyType; +import com.alibaba.fastjson2.JSON; +import org.neo4j.driver.Value; +import org.neo4j.driver.Values; +import org.neo4j.driver.internal.value.NullValue; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.function.Function; + +/** + * @author fuyouj + */ +public class ValueAdapter { + + + public static Value column2Value(final Column column, final Neo4jProperty neo4JProperty) { + + String typeStr = neo4JProperty.getType(); + PropertyType type = PropertyType.fromStrIgnoreCase(typeStr); + if (column.asString() == null) { + return NullValue.NULL; + } + + switch (type) { + case NULL: + return NullValue.NULL; + case MAP: + return Values.value(JSON.parseObject(column.asString(), Map.class)); + case BOOLEAN: + return Values.value(column.asBoolean()); + case STRING: + return Values.value(column.asString()); + case INTEGER: + case LONG: + return Values.value(column.asLong()); + case SHORT: + return Values.value(Short.valueOf(column.asString())); + case FLOAT: + case DOUBLE: + return Values.value(column.asDouble()); + case BYTE_ARRAY: + return Values.value(parseArrayType(neo4JProperty, column.asString(), 
Byte::valueOf)); + case CHAR_ARRAY: + return Values.value(parseArrayType(neo4JProperty, column.asString(), (s) -> s.charAt(0))); + case BOOLEAN_ARRAY: + return Values.value(parseArrayType(neo4JProperty, column.asString(), Boolean::valueOf)); + case STRING_ARRAY: + case Object_ARRAY: + case LIST: + return Values.value(parseArrayType(neo4JProperty, column.asString(), Function.identity())); + case LONG_ARRAY: + return Values.value(parseArrayType(neo4JProperty, column.asString(), Long::valueOf)); + case INT_ARRAY: + return Values.value(parseArrayType(neo4JProperty, column.asString(), Integer::valueOf)); + case SHORT_ARRAY: + return Values.value(parseArrayType(neo4JProperty, column.asString(), Short::valueOf)); + case DOUBLE_ARRAY: + case FLOAT_ARRAY: + return Values.value(parseArrayType(neo4JProperty, column.asString(), Double::valueOf)); + case LOCAL_DATE: + return Values.value(DateAdapter.localDate(column.asString(), neo4JProperty)); + case LOCAL_TIME: + return Values.value(DateAdapter.localTime(column.asString(), neo4JProperty)); + case LOCAL_DATE_TIME: + return Values.value(DateAdapter.localDateTime(column.asString(), neo4JProperty)); + default: + return Values.value(column.getRawData()); + + } + } + + + private static <R> List<R> parseArrayType(final Neo4jProperty neo4JProperty, + final String strValue, + final Function<String, R> convertFunc) { + if (null == strValue || "".equals(strValue)) { + return Collections.emptyList(); + } + String split = neo4JProperty.getSplitOrDefault(); + String[] strArr = strValue.split(split); + List<R> ans = new ArrayList<>(); + for (String s : strArr) { + ans.add(convertFunc.apply(s)); + } + return ans; + } +} diff --git a/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/config/ConfigConstants.java b/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/config/ConfigConstants.java new file mode 100644 index 0000000000..eed3588e2d --- /dev/null +++ 
b/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/config/ConfigConstants.java @@ -0,0 +1,116 @@ +package com.alibaba.datax.plugin.writer.neo4jwriter.config; + + +import java.util.List; + +/** + * @author fuyouj + */ +public final class ConfigConstants { + + public static final Long DEFAULT_MAX_TRANSACTION_RETRY_SECONDS = 30L; + + public static final Long DEFAULT_MAX_CONNECTION_SECONDS = 30L; + + + + public static final Option<Integer> RETRY_TIMES = + Option.<Integer>builder() + .key("retryTimes") + .defaultValue(3) + .desc("The number of retries when an error occurs") + .build(); + + public static final Option<Long> RETRY_SLEEP_MILLS = + Option.<Long>builder() + .key("retrySleepMills") + .defaultValue(3000L) + .build(); + + /** + * cluster mode please reference + * how to connect cluster mode + */ + public static final Option<String> URI = + Option.<String>builder() + .key("uri") + .noDefaultValue() + .desc("uri of the neo4j database") + .build(); + + public static final Option<String> USERNAME = + Option.<String>builder() + .key("username") + .noDefaultValue() + .desc("username for accessing the neo4j database") + .build(); + + public static final Option<String> PASSWORD = + Option.<String>builder() + .key("password") + .noDefaultValue() + .desc("password for accessing the neo4j database") + .build(); + + public static final Option<String> BEARER_TOKEN = + Option.<String>builder() + .key("bearerToken") + .noDefaultValue() + .desc("base64 encoded bearer token of the Neo4j. for Auth.") + .build(); + + public static final Option<String> KERBEROS_TICKET = + Option.<String>builder() + .key("kerberosTicket") + .noDefaultValue() + .desc("base64 encoded kerberos ticket of the Neo4j. 
for Auth.") + .build(); + + public static final Option<String> DATABASE = + Option.<String>builder() + .key("database") + .noDefaultValue() + .desc("database name.") + .build(); + + public static final Option<String> CYPHER = + Option.<String>builder() + .key("cypher") + .noDefaultValue() + .desc("cypher query.") + .build(); + + public static final Option<Long> MAX_TRANSACTION_RETRY_TIME = + Option.<Long>builder() + .key("maxTransactionRetryTimeSeconds") + .defaultValue(DEFAULT_MAX_TRANSACTION_RETRY_SECONDS) + .desc("maximum transaction retry time (seconds); the transaction fails if exceeded.") + .build(); + public static final Option<Long> MAX_CONNECTION_TIMEOUT_SECONDS = + Option.<Long>builder() + .key("maxConnectionTimeoutSeconds") + .defaultValue(DEFAULT_MAX_CONNECTION_SECONDS) + .desc("The maximum amount of time to wait for a TCP connection to be established (seconds).") + .build(); + + public static final Option<String> BATCH_DATA_VARIABLE_NAME = + Option.<String>builder() + .key("batchDataVariableName") + .defaultValue("batch") + .desc("in a cypher statement, a variable name that represents a batch of data") + .build(); + + public static final Option<List<Neo4jProperty>> NEO4J_PROPERTIES = + Option.<List<Neo4jProperty>>builder() + .key("properties") + .noDefaultValue() + .desc("neo4j node or relationship properties") + .build(); + + public static final Option<Integer> BATCH_SIZE = + Option.<Integer>builder(). 
+ key("batchSize") + .defaultValue(1000) + .desc("max batch size") + .build(); +} diff --git a/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/config/Neo4jProperty.java b/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/config/Neo4jProperty.java new file mode 100644 index 0000000000..5c5867b3b0 --- /dev/null +++ b/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/config/Neo4jProperty.java @@ -0,0 +1,82 @@ +package com.alibaba.datax.plugin.writer.neo4jwriter.config; + +/** + * 由于dataX并不能传输数据的元数据,所以只能在writer端定义每列数据的名字 + * datax does not support data metadata, + * only the name of each column of data can be defined on neo4j writer + * + * @author fuyouj + */ +public class Neo4jProperty { + public static final String DEFAULT_SPLIT = ","; + + /** + * name of neo4j field + */ + private String name; + + /** + * neo4j type + * reference by org.neo4j.driver.Values + */ + private String type; + + /** + * for date + */ + private String dateFormat; + + /** + * for array type + */ + private String split; + + public Neo4jProperty() { + } + + public Neo4jProperty(String name, String type, String format, String split) { + this.name = name; + this.type = type; + this.dateFormat = format; + this.split = split; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + + public String getDateFormat() { + return dateFormat; + } + + public void setDateFormat(String dateFormat) { + this.dateFormat = dateFormat; + } + + public String getSplit() { + return getSplitOrDefault(); + } + + public String getSplitOrDefault() { + if (split == null || "".equals(split)) { + return DEFAULT_SPLIT; + } + return split; + } + + public void setSplit(String split) { + this.split = split; + } +} diff --git 
a/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/config/Option.java b/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/config/Option.java new file mode 100644 index 0000000000..f22bd2054e --- /dev/null +++ b/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/config/Option.java @@ -0,0 +1,65 @@ +package com.alibaba.datax.plugin.writer.neo4jwriter.config; + + +public class Option<T> { + + public static class Builder<T> { + private String key; + private String desc; + + private T defaultValue; + + public Builder<T> key(String key) { + this.key = key; + return this; + } + + public Builder<T> desc(String desc) { + this.desc = desc; + return this; + } + + public Builder<T> defaultValue(T defaultValue) { + this.defaultValue = defaultValue; + return this; + } + + public Builder<T> noDefaultValue() { + return this; + } + + public Option<T> build() { + return new Option<>(this.key, this.desc, this.defaultValue); + } + } + + private final String key; + private final String desc; + + private final T defaultValue; + + public Option(String key, String desc, T defaultValue) { + this.key = key; + this.desc = desc; + this.defaultValue = defaultValue; + } + + public static <T> Builder<T> builder() { + return new Builder<>(); + } + + public String getKey() { + return key; + } + + public String getDesc() { + return desc; + } + + public T getDefaultValue() { + if (defaultValue == null) { + throw new IllegalStateException(key + ":defaultValue is null"); + } + return defaultValue; + } +} diff --git a/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/element/PropertyType.java b/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/element/PropertyType.java new file mode 100644 index 0000000000..b3446de730 --- /dev/null +++ b/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/element/PropertyType.java @@ -0,0 +1,40 @@ +package com.alibaba.datax.plugin.writer.neo4jwriter.element; + +import 
java.util.Arrays; + +/** + * @see org.neo4j.driver.Values + * @author fuyouj + */ +public enum PropertyType { + NULL, + BOOLEAN, + STRING, + LONG, + SHORT, + INTEGER, + DOUBLE, + FLOAT, + LOCAL_DATE, + LOCAL_TIME, + LOCAL_DATE_TIME, + LIST, + MAP, + CHAR_ARRAY, + BYTE_ARRAY, + BOOLEAN_ARRAY, + STRING_ARRAY, + LONG_ARRAY, + INT_ARRAY, + SHORT_ARRAY, + DOUBLE_ARRAY, + FLOAT_ARRAY, + Object_ARRAY; + + public static PropertyType fromStrIgnoreCase(String typeStr) { + return Arrays.stream(PropertyType.values()) + .filter(e -> e.name().equalsIgnoreCase(typeStr)) + .findFirst() + .orElse(PropertyType.STRING); + } +} diff --git a/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/exception/Neo4jErrorCode.java b/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/exception/Neo4jErrorCode.java new file mode 100644 index 0000000000..d7df79ffa3 --- /dev/null +++ b/neo4jwriter/src/main/java/com/alibaba/datax/plugin/writer/neo4jwriter/exception/Neo4jErrorCode.java @@ -0,0 +1,37 @@ +package com.alibaba.datax.plugin.writer.neo4jwriter.exception; + +import com.alibaba.datax.common.spi.ErrorCode; + + +public enum Neo4jErrorCode implements ErrorCode { + + /** + * Invalid configuration. + */ + CONFIG_INVALID("NEO4J_ERROR_01", "invalid configuration"), + /** + * database error + * Thrown while writing to the database. Possible causes include insufficient permissions, + * connection timeouts, or writing to a follower node; update operations may additionally hit deadlocks. + * Determine the exact cause from the error message; it is unrelated to DataX itself. + */ + DATABASE_ERROR("NEO4J_ERROR_02", "database error"); + + private final String code; + private final String description; + + @Override + public String getCode() { + return code; + } + + @Override + public String getDescription() { + return description; + } + + Neo4jErrorCode(String code, String description) { + this.code = code; + this.description = description; + } +} diff --git a/neo4jwriter/src/main/resources/plugin.json b/neo4jwriter/src/main/resources/plugin.json new file mode 100644 index 0000000000..3c8878f638 --- /dev/null +++ 
b/neo4jwriter/src/main/resources/plugin.json @@ -0,0 +1,6 @@ +{ + "name": "neo4jWriter", + "class": "com.alibaba.datax.plugin.writer.neo4jwriter.Neo4jWriter", + "description": "DataX neo4j writer plugin", + "developer": "付有杰" +} \ No newline at end of file diff --git a/neo4jwriter/src/main/resources/plugin_job_template.json b/neo4jwriter/src/main/resources/plugin_job_template.json new file mode 100644 index 0000000000..45bf3c88c2 --- /dev/null +++ b/neo4jwriter/src/main/resources/plugin_job_template.json @@ -0,0 +1,42 @@ +{ + "uri": "neo4j://localhost:7687", + "username": "neo4j", + "password": "Test@12343", + "database": "neo4j", + "cypher": "unwind $batch as row create(p:Person) set p.pbool = row.pbool,p.pstring = row.pstring,p.plong = row.plong,p.pshort = row.pshort,p.pdouble=row.pdouble,p.pstringarr=row.pstringarr,p.plocaldate=row.plocaldate", + "batchDataVariableName": "batch", + "batchSize": "33", + "properties": [ + { + "name": "pbool", + "type": "BOOLEAN" + }, + { + "name": "pstring", + "type": "STRING" + }, + { + "name": "plong", + "type": "LONG" + }, + { + "name": "pshort", + "type": "SHORT" + }, + { + "name": "pdouble", + "type": "DOUBLE" + }, + { + "name": "pstringarr", + "type": "STRING_ARRAY", + "split": "," + }, + { + "name": "plocaldate", + "type": "LOCAL_DATE", + "dateFormat": "yyyy-MM-dd" + } + ] +} \ No newline at end of file diff --git a/neo4jwriter/src/test/java/com/alibaba/datax/plugin/writer/Neo4jWriterTest.java b/neo4jwriter/src/test/java/com/alibaba/datax/plugin/writer/Neo4jWriterTest.java new file mode 100644 index 0000000000..53c9235e99 --- /dev/null +++ b/neo4jwriter/src/test/java/com/alibaba/datax/plugin/writer/Neo4jWriterTest.java @@ -0,0 +1,257 @@ +package com.alibaba.datax.plugin.writer; + + +import com.alibaba.datax.common.element.Record; +import com.alibaba.datax.common.element.StringColumn; +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.plugin.writer.mock.MockRecord; +import 
com.alibaba.datax.plugin.writer.mock.MockUtil; +import com.alibaba.datax.plugin.writer.neo4jwriter.Neo4jClient; +import com.alibaba.datax.plugin.writer.neo4jwriter.config.Neo4jProperty; +import com.alibaba.datax.plugin.writer.neo4jwriter.element.PropertyType; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.neo4j.driver.*; +import org.neo4j.driver.types.Node; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; +import org.testcontainers.shaded.org.awaitility.Awaitility; +import org.testcontainers.utility.DockerImageName; +import org.testcontainers.utility.DockerLoggerFactory; + +import java.io.File; +import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + + +public class Neo4jWriterTest { + + private static final Logger LOGGER = LoggerFactory.getLogger(Neo4jWriterTest.class); + private static final int MOCK_NUM = 100; + private static final String CONTAINER_IMAGE = "neo4j:5.9.0"; + + private static final String CONTAINER_HOST = "neo4j-host"; + private static final int HTTP_PORT = 7474; + private static final int BOLT_PORT = 7687; + private static final String CONTAINER_NEO4J_USERNAME = "neo4j"; + private static final String CONTAINER_NEO4J_PASSWORD = "Test@12343"; + private static final URI CONTAINER_URI = URI.create("neo4j://localhost:" + BOLT_PORT); + + protected static final Network NETWORK = Network.newNetwork(); + + private GenericContainer container; + private Driver neo4jDriver; + private Session neo4jSession; + + @Before + public void init() { + DockerImageName imageName = 
DockerImageName.parse(CONTAINER_IMAGE); + container = + new GenericContainer<>(imageName) + .withNetwork(NETWORK) + .withNetworkAliases(CONTAINER_HOST) + .withExposedPorts(HTTP_PORT, BOLT_PORT) + .withEnv( + "NEO4J_AUTH", + CONTAINER_NEO4J_USERNAME + "/" + CONTAINER_NEO4J_PASSWORD) + .withEnv("apoc.export.file.enabled", "true") + .withEnv("apoc.import.file.enabled", "true") + .withEnv("apoc.import.file.use_neo4j_config", "true") + .withEnv("NEO4J_PLUGINS", "[\"apoc\"]") + .withLogConsumer( + new Slf4jLogConsumer( + DockerLoggerFactory.getLogger(CONTAINER_IMAGE))); + container.setPortBindings( + Arrays.asList( + String.format("%s:%s", HTTP_PORT, HTTP_PORT), + String.format("%s:%s", BOLT_PORT, BOLT_PORT))); + Startables.deepStart(Stream.of(container)).join(); + LOGGER.info("container started"); + Awaitility.given() + .ignoreExceptions() + .await() + .atMost(30, TimeUnit.SECONDS) + .untilAsserted(this::initConnection); + } + + @Test + public void testCreateNodeAllTypeField() { + final Result checkExists = neo4jSession.run("MATCH (p:Person) RETURN p limit 1"); + if (checkExists.hasNext()) { + neo4jSession.run("MATCH (p:Person) delete p"); + } + + Configuration configuration = Configuration.from(new File("src/test/resources/allTypeFieldNode.json")); + Neo4jClient neo4jClient = Neo4jClient.build(configuration, null); + + neo4jClient.init(); + for (int i = 0; i < MOCK_NUM; i++) { + neo4jClient.tryWrite(mockAllTypeFieldTestNode(neo4jClient.getNeo4jFields())); + } + neo4jClient.destroy(); + + + Result result = neo4jSession.run("MATCH (p:Person) return p"); + // nodes + assertTrue(result.hasNext()); + int cnt = 0; + while (result.hasNext()) { + org.neo4j.driver.Record record = result.next(); + record.get("p").get("pbool").asBoolean(); + record.get("p").get("pstring").asString(); + record.get("p").get("plong").asLong(); + record.get("p").get("pshort").asInt(); + record.get("p").get("pdouble").asDouble(); + List list = (List) record.get("p").get("pstringarr").asObject(); + 
record.get("p").get("plocaldate").asLocalDate(); + cnt++; + + } + assertEquals(MOCK_NUM, cnt); + } + + + /** + * Creating a relationship requires its nodes to exist first, + * so nodes are created before the relationships are mocked. + */ + @Test + public void testCreateRelation() { + final Result checkExists = neo4jSession.run("MATCH (p1:Person)-[r:LINK]->(p2:Person) return r limit 1"); + if (checkExists.hasNext()) { + neo4jSession.run("MATCH (p1:Person)-[r:LINK]->(p2:Person) delete r,p1,p2"); + } + + String createNodeCql = "create (p:Person) set p.id = '%s'"; + Configuration configuration = Configuration.from(new File("src/test/resources/relationship.json")); + + Neo4jClient neo4jClient = Neo4jClient.build(configuration, null); + neo4jClient.init(); + //Create nodes to prepare for subsequently writing relationships + for (int i = 0; i < MOCK_NUM; i++) { + neo4jSession.run(String.format(createNodeCql, i + "start")); + neo4jSession.run(String.format(createNodeCql, i + "end")); + Record record = new MockRecord(); + record.addColumn(new StringColumn(i + "start")); + record.addColumn(new StringColumn(i + "end")); + neo4jClient.tryWrite(record); + + } + neo4jClient.destroy(); + + Result result = neo4jSession.run("MATCH (start:Person)-[r:LINK]->(end:Person) return r,start,end"); + // relationships + assertTrue(result.hasNext()); + int cnt = 0; + while (result.hasNext()) { + org.neo4j.driver.Record record = result.next(); + + Node startNode = record.get("start").asNode(); + assertTrue(startNode.hasLabel("Person")); + assertTrue(startNode.asMap().containsKey("id")); + + Node endNode = record.get("end").asNode(); + assertTrue(endNode.hasLabel("Person")); + assertTrue(endNode.asMap().containsKey("id")); + + + String name = record.get("r").type().name(); + assertEquals("RELATIONSHIP", name); + cnt++; + } + assertEquals(MOCK_NUM, cnt); + } + + /** + * In neo4j, writing labels and relationship types dynamically requires the apoc procedures. + */ + @Test + public void testUseApocCreateDynamicLabel() { + List<String> dynamicLabel = new ArrayList<>(); + for (int i = 0; i < MOCK_NUM; i++) { + 
dynamicLabel.add("Label" + i); + } + //remove test data if it exists + //this placeholder style cannot write dynamic labels in one batch; UNION concatenation would work, but performs poorly + String query = "match (p:%s) return p"; + String delete = "match (p:%s) delete p"; + for (String label : dynamicLabel) { + Result result = neo4jSession.run(String.format(query, label)); + if (result.hasNext()) { + neo4jSession.run(String.format(delete, label)); + } + } + + Configuration configuration = Configuration.from(new File("src/test/resources/dynamicLabel.json")); + Neo4jClient neo4jClient = Neo4jClient.build(configuration, null); + + neo4jClient.init(); + for (int i = 0; i < dynamicLabel.size(); i++) { + Record record = new MockRecord(); + record.addColumn(new StringColumn(dynamicLabel.get(i))); + record.addColumn(new StringColumn(String.valueOf(i))); + neo4jClient.tryWrite(record); + } + neo4jClient.destroy(); + + //verify that the batched writes were applied correctly + int cnt = 0; + for (int i = 0; i < dynamicLabel.size(); i++) { + String label = dynamicLabel.get(i); + Result result = neo4jSession.run(String.format(query, label)); + while (result.hasNext()) { + org.neo4j.driver.Record record = result.next(); + Node node = record.get("p").asNode(); + assertTrue(node.hasLabel(label)); + assertEquals(i + "", node.asMap().get("id")); + cnt++; + } + } + assertEquals(MOCK_NUM, cnt); + + } + + + private Record mockAllTypeFieldTestNode(List<Neo4jProperty> neo4JProperties) { + Record mock = new MockRecord(); + for (Neo4jProperty field : neo4JProperties) { + mock.addColumn(MockUtil.mockColumnByType(PropertyType.fromStrIgnoreCase(field.getType()))); + } + return mock; + } + + @After + public void destroy() { + if (neo4jSession != null) { + neo4jSession.close(); + } + if (neo4jDriver != null) { + neo4jDriver.close(); + } + if (container != null) { + container.close(); + } + } + + private void initConnection() { + neo4jDriver = + GraphDatabase.driver( + CONTAINER_URI, + AuthTokens.basic(CONTAINER_NEO4J_USERNAME, CONTAINER_NEO4J_PASSWORD)); + neo4jSession = 
neo4jDriver.session(SessionConfig.forDatabase("neo4j")); + } +} diff --git a/neo4jwriter/src/test/java/com/alibaba/datax/plugin/writer/mock/MockRecord.java b/neo4jwriter/src/test/java/com/alibaba/datax/plugin/writer/mock/MockRecord.java new file mode 100644 index 0000000000..77d3f5005c --- /dev/null +++ b/neo4jwriter/src/test/java/com/alibaba/datax/plugin/writer/mock/MockRecord.java @@ -0,0 +1,104 @@ +package com.alibaba.datax.plugin.writer.mock; + + +import com.alibaba.datax.common.element.Column; +import com.alibaba.datax.common.element.Record; +import com.alibaba.fastjson2.JSON; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class MockRecord implements Record { + private static final int RECORD_AVERAGE_COLUMN_NUMBER = 16; + + private List<Column> columns; + + private int byteSize; + + + private Map<String, String> meta; + + public MockRecord() { + this.columns = new ArrayList<>(RECORD_AVERAGE_COLUMN_NUMBER); + } + + @Override + public void addColumn(Column column) { + columns.add(column); + incrByteSize(column); + } + + @Override + public Column getColumn(int i) { + if (i < 0 || i >= columns.size()) { + return null; + } + return columns.get(i); + } + + @Override + public void setColumn(int i, final Column column) { + if (i < 0) { + throw new IllegalArgumentException("cannot set a column at an index less than 0"); + } + + if (i >= columns.size()) { + expandCapacity(i + 1); + } + + decrByteSize(getColumn(i)); + this.columns.set(i, column); + incrByteSize(getColumn(i)); + } + + @Override + public String toString() { + Map<String, Object> json = new HashMap<>(); + json.put("size", this.getColumnNumber()); + json.put("data", this.columns); + return JSON.toJSONString(json); + } + + @Override + public int getColumnNumber() { + return this.columns.size(); + } + + @Override + public int getByteSize() { + return byteSize; + } + + public int getMemorySize() { + throw new UnsupportedOperationException(); + } + + @Override + public void setMeta(Map<String, String> meta) { + + } + + 
@Override + public Map getMeta() { + return null; + } + + private void decrByteSize(final Column column) { + } + + private void incrByteSize(final Column column) { + } + + private void expandCapacity(int totalSize) { + if (totalSize <= 0) { + return; + } + + int needToExpand = totalSize - columns.size(); + while (needToExpand-- > 0) { + this.columns.add(null); + } + } +} diff --git a/neo4jwriter/src/test/java/com/alibaba/datax/plugin/writer/mock/MockUtil.java b/neo4jwriter/src/test/java/com/alibaba/datax/plugin/writer/mock/MockUtil.java new file mode 100644 index 0000000000..8f05f1e86e --- /dev/null +++ b/neo4jwriter/src/test/java/com/alibaba/datax/plugin/writer/mock/MockUtil.java @@ -0,0 +1,50 @@ +package com.alibaba.datax.plugin.writer.mock; + + +import com.alibaba.datax.common.element.*; +import com.alibaba.datax.plugin.writer.neo4jwriter.element.PropertyType; +import com.alibaba.fastjson2.JSON; + +import java.time.LocalDate; +import java.time.format.DateTimeFormatter; +import java.util.HashMap; +import java.util.Map; +import java.util.Random; + +public class MockUtil { + + public static Column mockColumnByType(PropertyType type) { + Random random = new Random(); + switch (type) { + case SHORT: + return new StringColumn("1"); + case BOOLEAN: + return new BoolColumn(random.nextInt() % 2 == 0); + case INTEGER: + case LONG: + return new LongColumn(random.nextInt(Integer.MAX_VALUE)); + case FLOAT: + case DOUBLE: + return new DoubleColumn(random.nextDouble()); + case NULL: + return null; + case BYTE_ARRAY: + return new BytesColumn(new byte[]{(byte) (random.nextInt() % 2)}); + case LOCAL_DATE: + return new StringColumn(LocalDate.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd"))); + case MAP: + return new StringColumn(JSON.toJSONString(propmap())); + case STRING_ARRAY: + return new StringColumn("[1,1,1,1,1,1,1]"); + default: + return new StringColumn("randomStr" + random.nextInt(Integer.MAX_VALUE)); + } + } + + public static Map propmap() { + Map prop = new 
HashMap<>(); + prop.put("name", "neo4jWriter"); + prop.put("age", "1"); + return prop; + } +} diff --git a/neo4jwriter/src/test/resources/allTypeFieldNode.json b/neo4jwriter/src/test/resources/allTypeFieldNode.json new file mode 100644 index 0000000000..6d504d7930 --- /dev/null +++ b/neo4jwriter/src/test/resources/allTypeFieldNode.json @@ -0,0 +1,41 @@ +{ + "uri": "neo4j://localhost:7687", + "username":"neo4j", + "password":"Test@12343", + "database":"neo4j", + "cypher": "unwind $batch as row create(p:Person) set p.pbool = row.pbool,p.pstring = row.pstring,p.plong = row.plong,p.pshort = row.pshort,p.pdouble=row.pdouble,p.pstringarr=row.pstringarr,p.plocaldate=row.plocaldate", + "batchDataVariableName": "batch", + "batchSize": "33", + "properties": [ + { + "name": "pbool", + "type": "BOOLEAN" + }, + { + "name": "pstring", + "type": "STRING" + }, + { + "name": "plong", + "type": "LONG" + }, + { + "name": "pshort", + "type": "SHORT" + }, + { + "name": "pdouble", + "type": "DOUBLE" + }, + { + "name": "pstringarr", + "type": "STRING_ARRAY", + "split": "," + }, + { + "name": "plocaldate", + "type": "LOCAL_DATE", + "dateFormat": "yyyy-MM-dd" + } + ] +} \ No newline at end of file diff --git a/neo4jwriter/src/test/resources/dynamicLabel.json b/neo4jwriter/src/test/resources/dynamicLabel.json new file mode 100644 index 0000000000..05ed3e7668 --- /dev/null +++ b/neo4jwriter/src/test/resources/dynamicLabel.json @@ -0,0 +1,19 @@ +{ + "uri": "bolt://localhost:7687", + "username":"neo4j", + "password":"Test@12343", + "database":"neo4j", + "cypher": "unwind $batch as row CALL apoc.cypher.doIt( 'create (n:`' + row.Label + '`{id:$id})' ,{id: row.id} ) YIELD value RETURN 1 ", + "batchDataVariableName": "batch", + "batchSize": "33", + "properties": [ + { + "name": "Label", + "type": "string" + }, + { + "name": "id", + "type": "STRING" + } + ] +} \ No newline at end of file diff --git a/neo4jwriter/src/test/resources/relationship.json b/neo4jwriter/src/test/resources/relationship.json 
new file mode 100644 index 0000000000..cb9bbdf444 --- /dev/null +++ b/neo4jwriter/src/test/resources/relationship.json @@ -0,0 +1,19 @@ +{ + "uri": "neo4j://localhost:7687", + "username":"neo4j", + "password":"Test@12343", + "database":"neo4j", + "cypher": "unwind $batch as row match(p1:Person) where p1.id = row.startNodeId match(p2:Person) where p2.id = row.endNodeId create (p1)-[:LINK]->(p2)", + "batchDataVariableName": "batch", + "batchSize": "33", + "properties": [ + { + "name": "startNodeId", + "type": "STRING" + }, + { + "name": "endNodeId", + "type": "STRING" + } + ] +} \ No newline at end of file diff --git a/package.xml b/package.xml index 4c1aff04be..0654a39186 100755 --- a/package.xml +++ b/package.xml @@ -497,5 +497,12 @@ datax + + neo4jwriter/target/datax/ + + **/*.* + + datax + diff --git a/pom.xml b/pom.xml index 957c60ee43..bbb128be43 100644 --- a/pom.xml +++ b/pom.xml @@ -123,6 +123,7 @@ doriswriter selectdbwriter adbmysqlwriter + neo4jwriter plugin-rdbms-util
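The DateAdapter in this patch keeps one DateTimeFormatter per thread in a ThreadLocal so the pattern string is compiled only once per thread. A minimal standalone sketch of that caching idea (FormatterCache and parseDate are illustrative names, not part of the patch):

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

// Standalone sketch of DateAdapter's per-thread formatter caching.
public class FormatterCache {
    // One cached formatter per thread, mirroring LOCAL_DATE_FORMATTER_MAP in the patch.
    private static final ThreadLocal<DateTimeFormatter> CACHE = new ThreadLocal<>();
    private static final String DEFAULT_PATTERN = "yyyy-MM-dd";

    public static LocalDate parseDate(String text, String pattern) {
        DateTimeFormatter formatter = CACHE.get();
        if (formatter == null) {
            // Fall back to the default pattern when none is configured, like getOrDefault().
            String p = (pattern == null || pattern.isEmpty()) ? DEFAULT_PATTERN : pattern;
            formatter = DateTimeFormatter.ofPattern(p);
            CACHE.set(formatter);
        }
        return LocalDate.parse(text, formatter);
    }

    // Mirrors DateAdapter.destroy(): required when worker threads are pooled.
    public static void clear() {
        CACHE.remove();
    }

    public static void main(String[] args) {
        System.out.println(parseDate("2023-08-01", null)); // prints 2023-08-01
    }
}
```

DateTimeFormatter is itself immutable and thread-safe, so the ThreadLocal only avoids re-compiling the pattern on every record; the trade-off is that the cache must be cleared on teardown, which is what DateAdapter.destroy() does.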