Fixed #16 Added option to configure mapping of fields in SinkRecord to CQL columns #18
base: master
@@ -84,7 +84,8 @@ All the others (BLOB, INET, UUID, TIMEUUID, LIST, SET, MAP, CUSTOM, UDT, TUPLE,

## CassandraSink
It stores Kafka SinkRecords in Cassandra tables. Currently, we only support the STRUCT type in the SinkRecord.
The STRUCT can have multiple fields with primitive field types.
By default, we assume a one-to-one mapping between the column names in the Cassandra sink table and the field names.

Say the SinkRecord has the following STRUCT value
@@ -97,6 +98,23 @@ Say the SinkRecord has the following STRUCT value

Then the Cassandra table should have the columns - id, username, text

We also support mapping field names to column names explicitly, using the property `cassandra.sink.field.mapping`.
Say the SinkRecord has the following STRUCT value
```
{
  'id': 1,
  'user': {
    'id': 123,
    'name': 'Foo',
    'email': '[email protected]'
  },
  'text': 'This is my first tweet'
}
```
and `cassandra.sink.field.mapping` has the value `{'id': 'id', 'user': {'id': 'uid', 'name': 'username'}, 'text': 'tweet_text'}`.
Then the Cassandra table should have the columns - id, uid, username, tweet_text.
Note that since we did not specify any mapping for 'user.email', it is ignored and not inserted into the Cassandra sink table.
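
For illustration, here is a minimal sketch of a sink connector properties file using the settings from the configuration table below. The connector name, class, topic, keyspace, and table names are placeholders, not values taken from this repository's `examples/config`:
```
# hypothetical sink configuration - adjust names to your setup
name=cassandra-sink-example
connector.class=com.tuplejump.kafka.connect.cassandra.CassandraSink
topics=tweets
cassandra.sink.route.tweets=twitter.tweets
cassandra.sink.consistency=LOCAL_QUORUM
cassandra.sink.field.mapping={'id': 'id', 'user': {'id': 'uid', 'name': 'username'}, 'text': 'tweet_text'}
```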
Note: The library does not create the Cassandra tables - users are expected to create those before starting the sink

## Configuration

@@ -132,6 +150,7 @@ Refer `examples/config` for sample configuration files

| Property | Description | Default |
|-------- |----------------------------|-----------------------|
| cassandra.sink.route.\<topic_name\> | The table to write the SinkRecords to, \<keyspace\>.\<tableName\> | |
| cassandra.sink.consistency | The consistency level for writes to Cassandra. | LOCAL_QUORUM |
| cassandra.sink.field.mapping | The JSON String mapping field names to column names. | |

## Building from Source
@@ -19,16 +19,17 @@
```scala
package com.tuplejump.kafka.connect.cassandra

import scala.collection.JavaConverters._
import scala.util.parsing.json.JSONObject
import org.apache.kafka.connect.data.{Schema, SchemaBuilder, Struct}
import org.apache.kafka.connect.sink.{SinkRecord, SinkTaskContext}

class CassandraSinkTaskSpec extends AbstractFlatSpec {

  it should "start sink task" in {
    val topicName = "test_kv_topic"
    val tableName = "test.kv"
    val config = sinkProperties(Map(topicName -> tableName))

    val sinkTask = new CassandraSinkTask()
    val mockContext = mock[SinkTaskContext]
```
@@ -38,6 +39,10 @@ class CassandraSinkTaskSpec extends AbstractFlatSpec {
```scala
  }

  it should "save records in cassandra" in {
    val topicName = "test_kv_topic"
    val tableName = "test.kv"
    val config = sinkProperties(Map(topicName -> tableName))

    val sinkTask = new CassandraSinkTask()
    val mockContext = mock[SinkTaskContext]
```
@@ -64,5 +69,76 @@ class CassandraSinkTaskSpec extends AbstractFlatSpec {
```scala
    rowCount should be(2)
    cc.shutdown()
  }

  it should "save records in cassandra with custom field mapping" in {
    val topicName = "test_fieldmap_topic"
    val tableName = "test.fieldmap"
    val config = sinkProperties(Map(topicName -> tableName))

    val sinkTask = new CassandraSinkTask()
    val mockContext = mock[SinkTaskContext]

    val fieldMapping: JSONObject = JSONObject(Map(
      "key" -> "new_key",
      "value" -> "new_value",
      "nvalue" -> JSONObject(Map(
        "blah1" -> "new_nested",
        "blah2" -> JSONObject(Map(
          "blah2" -> "new_dnested"
        ))
      ))
    ))

    sinkTask.initialize(mockContext)
    sinkTask.start((config + ("cassandra.sink.field.mapping" -> fieldMapping.toString())).asJava)

    val doubleNestedSchema = SchemaBuilder.struct.name("dnestedSchema").version(1)
      .field("blah1", Schema.STRING_SCHEMA)
      .field("blah2", Schema.STRING_SCHEMA).build
    val nestedSchema = SchemaBuilder.struct.name("nestedSchema").version(1)
      .field("blah1", Schema.STRING_SCHEMA)
      .field("blah2", doubleNestedSchema).build
    val valueSchema = SchemaBuilder.struct.name("record").version(1)
      .field("key", Schema.STRING_SCHEMA)
      .field("value", Schema.INT32_SCHEMA)
      .field("nvalue", nestedSchema).build

    val dnestedValue1 = new Struct(doubleNestedSchema)
      .put("blah1", "dnes_blah1_1")
      .put("blah2", "dnes_blah2_1")
    val nestedValue1 = new Struct(nestedSchema)
      .put("blah1", "nes_blah1_1")
      .put("blah2", dnestedValue1)
    val value1 = new Struct(valueSchema)
      .put("key", "pqr")
      .put("value", 15)
      .put("nvalue", nestedValue1)

    val dnestedValue2 = new Struct(doubleNestedSchema)
      .put("blah1", "dnes_blah1_2")
      .put("blah2", "dnes_blah2_2")
    val nestedValue2 = new Struct(nestedSchema)
      .put("blah1", "nes_blah1_2")
      .put("blah2", dnestedValue2)
    val value2 = new Struct(valueSchema)
      .put("key", "abc")
      .put("value", 17)
      .put("nvalue", nestedValue2)

    val record1 = new SinkRecord(topicName, 1, SchemaBuilder.struct.build, "key", valueSchema, value1, 0)
    val record2 = new SinkRecord(topicName, 1, SchemaBuilder.struct.build, "key", valueSchema, value2, 0)

    sinkTask.put(List(record1, record2).asJavaCollection)

    sinkTask.stop()

    val cc = CassandraCluster.local
    val session = cc.session
    val result = session.execute(s"select count(1) from $tableName").one()
    val rowCount = result.getLong(0)
    rowCount should be(2)
    cc.shutdown()
  }
}
```
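
Since the library does not create tables, the field-mapping test above needs a matching `test.fieldmap` table to exist. A hedged guess at its shape, derived from the mapping and schemas in the test (the primary key choice is an assumption):
```
CREATE TABLE IF NOT EXISTS test.fieldmap (
  new_key text PRIMARY KEY,
  new_value int,
  new_nested text,
  new_dnested text
);
```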
@@ -18,8 +18,10 @@ package com.tuplejump.kafka.connect.cassandra
```scala
import scala.collection.immutable
import scala.util.control.NonFatal
import scala.util.parsing.json.JSON
import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.connect.connector.Task
import org.apache.kafka.connect.errors.DataException
import com.datastax.driver.core.{TableMetadata, ConsistencyLevel}
import InternalConfig._
```

@@ -123,6 +125,9 @@ object TaskConfig {
```scala
  final val SinkConsistency: Key = "cassandra.sink.consistency"
  final val DefaultSinkConsistency = ConsistencyLevel.LOCAL_QUORUM

  final val FieldMapping: Key = "cassandra.sink.field.mapping"
  final val DefaultFieldMapping = Map.empty[String, String]

  /* **** Task config **** */
  final val TaskParallelismStrategy: Key = "cassandra.task.parallelism"
```
@@ -156,6 +161,10 @@ private[cassandra] object InternalConfig {
```scala
  def toInt(a: String): Int = a.toInt
  def toLong(a: String): Long = a.toLong
  def toConsistency(a: String): ConsistencyLevel = ConsistencyLevel.valueOf(a)
  def toMap(a: String): Map[String, Any] = JSON.parseFull(a) collect {
    case data: Map[_, _] => data.asInstanceOf[Map[String, Any]]
  } getOrElse (throw new DataException(s"Field mapping type for '$a' is not supported."))
```
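
As a quick illustration of what `toMap` expects, here is a standalone sketch (not part of this patch) of how `scala.util.parsing.json.JSON.parseFull` turns a field-mapping string into the nested `Map[String, Any]` used above, shown with standard double-quoted JSON:
```scala
import scala.util.parsing.json.JSON

// Standalone sketch: parse a field-mapping string into nested Maps,
// mirroring what toMap does for the cassandra.sink.field.mapping property.
val raw = """{"id": "id", "user": {"id": "uid", "name": "username"}, "text": "tweet_text"}"""
val mapping: Map[String, Any] = JSON.parseFull(raw) collect {
  case data: Map[_, _] => data.asInstanceOf[Map[String, Any]]
} getOrElse sys.error(s"Not a JSON object: $raw")

// Nested objects come back as nested Maps, e.g. mapping("user") is
// Map("id" -> "uid", "name" -> "username").
```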
@@ -319,15 +328,21 @@ private[cassandra] object InternalConfig {
```scala
  sealed trait ClusterQueryOptions

  /** Settings related for individual queries, can be set per keyspace.table. */
  final case class WriteOptions(consistency: ConsistencyLevel,
                                fieldMapping: Map[String, Any]) extends ClusterQueryOptions

  object WriteOptions {

    def apply(config: Map[String, String]): WriteOptions = {
      WriteOptions(
        consistency = config.valueOr[ConsistencyLevel](
          SinkConsistency, toConsistency, DefaultSourceConsistency),
        fieldMapping = config.valueOr[Map[String, Any]](
          FieldMapping, toMap, DefaultFieldMapping
        )
      )
    }
  }

  /** Settings related for individual queries, can be set per keyspace.table. */
  final case class ReadOptions(splitSize: Int,
                               fetchSize: Int,
```
@@ -16,6 +16,8 @@
```scala
package com.tuplejump.kafka.connect

import org.apache.kafka.connect.data.Field

/** Common package operations. */
package object cassandra {
  import java.util.{List => JList, Map => JMap, Date => JDate}
```
@@ -69,29 +71,87 @@ package object cassandra {
```scala
  implicit class SinkRecordOps(record: SinkRecord) {

    def as(namespace: String, fieldMapping: Map[String, Any]): SinkQuery = {
      val colNamesVsValues: Map[String, String] = {
        if (fieldMapping.isEmpty) {
          toCqlData
        } else {
          toCqlData(fieldMapping)
        }
      }
      SinkQuery(namespace, colNamesVsValues)
    }

    def toCqlData(): (Map[String, String]) = {
      val schema = record.valueSchema
      schema.`type`() match {
        case STRUCT =>
          schema.fields.asScala.map { field =>
            field.name -> convert(schema, record.value.asInstanceOf[Struct], field)
          }.toMap
        case other =>
          throw new DataException(
            s"Unable to create insert statement with unsupported value schema type $other.")
      }
    }

    def toCqlData(fieldMapping: Map[String, Any]): Map[String, String] = {
      record.valueSchema.`type`() match {
        case STRUCT =>
          toColNamesVsValues(record.value.asInstanceOf[Struct], fieldMapping)
        case other =>
          throw new DataException(
            s"Unable to create insert statement with unsupported value schema type $other.")
      }
    }

    // scalastyle:off
    private def toColNamesVsValues(struct: Struct, fieldMapping: Map[String, Any],
      colNameVsValues: Map[String, String] = Map.empty): Map[String, String] = {
      lazy val exception = new DataException(s"Mismatch between fieldMapping and Schema")
      var result: Map[String, String] = colNameVsValues
      struct.schema.fields.asScala.foreach { field =>
        val fieldMappingValue = fieldMapping.get(field.name)
        field.schema.`type`() match {
          case STRUCT =>
            fieldMappingValue match {
              case Some(value) =>
                value match {
                  case newMap: Map[_, _] =>
                    result = toColNamesVsValues(struct.get(field).asInstanceOf[Struct],
                      newMap.asInstanceOf[Map[String, Any]], result)
                  case _ =>
                    throw exception
                }
              case None =>
            }
          case _ =>
            fieldMappingValue match {
              case Some(value) =>
                value match {
                  case strValue: String =>
                    result += (strValue -> convert(field.schema, struct, field))
                  case _ =>
                    throw exception
                }
              case None =>
            }
        }
      }
      result
    }
    // scalastyle:on

    /* TODO support all types. */
    def convert(schema: Schema, result: Struct, field: Field): String =
      field.schema match {
        case x if x.`type`() == Schema.STRING_SCHEMA.`type`() =>
          s"'${result.get(field).toString}'"
        case x if x.name() == Timestamp.LOGICAL_NAME =>
          val time = Timestamp.fromLogical(x, result.get(field).asInstanceOf[JDate])
          s"$time"
        case y =>
          String.valueOf(result.get(field))
      }

    def asColumnNames: List[ColumnName] =
```
Review discussion on the `scala.util.parsing.json` dependency:

> **Reviewer:** do we need this?

> **Author:** Using this for JSON parsing. It comes with scala-core for 2.10.

> **Reviewer:** What about when using Scala 2.11, the norm? And 2.12 coming?

> **Author:** Since it comes with 2.10 or less, I added this dependency only for Scala versions >= 2.11.
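
For readers following this thread, a hedged sketch of how such a version-conditional dependency is typically declared in sbt; the module name and version below are assumptions, not taken from this PR's build file:
```scala
// Hypothetical build.sbt fragment: scala.util.parsing.json ships with the
// standard library on Scala 2.10, but lives in the scala-parser-combinators
// module from 2.11 onwards, so the dependency is only added there.
libraryDependencies ++= (CrossVersion.partialVersion(scalaVersion.value) match {
  case Some((2, minor)) if minor >= 11 =>
    Seq("org.scala-lang.modules" %% "scala-parser-combinators" % "1.0.4")
  case _ => Seq.empty
})
```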