Fixed #16 Added option to configure mapping of fields in SinkRecord to CQL columns #18

Open · wants to merge 1 commit into base: master
21 changes: 20 additions & 1 deletion README.md
@@ -84,7 +84,8 @@ All the others (BLOB, INET, UUID, TIMEUUID, LIST, SET, MAP, CUSTOM, UDT, TUPLE,

## CassandraSink
It stores Kafka SinkRecords in Cassandra tables. Currently, we only support the STRUCT type in the SinkRecord.
The STRUCT can have multiple fields with primitive field types. We assume a one-to-one mapping between the column names in the Cassandra sink table and the field names.
The STRUCT can have multiple fields with primitive field types.
By default, we assume a one-to-one mapping between the column names in the Cassandra sink table and the field names.

Say the SinkRecord has the following STRUCT value
```
{
    'id': 1,
    'username': 'Foo',
    'text': 'This is my first tweet'
}
```

Then the Cassandra table should have the columns - id, username, text

We also support specifying a mapping from field names to column names, using the property `cassandra.sink.field.mapping`.
Say the SinkRecord has the following STRUCT value
```
{
    'id': 1,
    'user': {
        'id': 123,
        'name': 'Foo',
        'email': '[email protected]'
    },
    'text': 'This is my first tweet'
}
```
and the `cassandra.sink.field.mapping` has the value `{'id': 'id', 'user': {'id': 'uid', 'name': 'username'}, 'text': 'tweet_text'}`
Then the Cassandra table should have the columns - id, uid, username, tweet_text.
Note that since we did not specify any mapping for 'user.email', it is ignored and not inserted into the Cassandra sink table.
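
For illustration, the mapping above could be wired into the sink's configuration roughly as follows, mirroring how the integration test passes it; the topic name `tweets` and table `twitter.tweets` are made-up placeholders, and the mapping travels as a single JSON string (written here with standard double quotes):

```
// Sketch only: hypothetical topic/table names.
val sinkProps = Map(
  "cassandra.sink.route.tweets"  -> "twitter.tweets",
  "cassandra.sink.consistency"   -> "LOCAL_QUORUM",
  "cassandra.sink.field.mapping" -> """{"id": "id", "user": {"id": "uid", "name": "username"}, "text": "tweet_text"}"""
)
```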

Note: The library does not create the Cassandra tables - users are expected to create those before starting the sink

## Configuration
@@ -132,6 +150,7 @@ Refer `examples/config` for sample configuration files
|-------- |----------------------------|-----------------------|
| cassandra.sink.route.\<topic_name\> | The table to write the SinkRecords to, \<keyspace\>.\<tableName\> | |
| cassandra.sink.consistency | The consistency level for writes to Cassandra. | LOCAL_QUORUM |
| cassandra.sink.field.mapping | The JSON String mapping field names to column names. | |


## Building from Source
18 changes: 10 additions & 8 deletions build.sbt
@@ -33,14 +33,16 @@ libraryDependencies ++= Seq(
"org.joda" % "joda-convert" % "1.8.1",
"org.scalatest" %% "scalatest" % "2.2.6" % "test,it",
"org.mockito" % "mockito-core" % "2.0.34-beta" % "test,it",
"ch.qos.logback" % "logback-classic" % "1.1.3" % "test,it",
CrossVersion.partialVersion(scalaVersion.value) match {
case Some((2, minor)) if minor < 11 =>
"org.slf4j" % "slf4j-api" % "1.7.13"
case _ =>
"com.typesafe.scala-logging" %% "scala-logging" % "3.1.0"
}
)
"ch.qos.logback" % "logback-classic" % "1.1.3" % "test,it"
) ++ (CrossVersion.partialVersion(scalaVersion.value) match {
case Some((2, minor)) if minor < 11 => Seq(
"org.slf4j" % "slf4j-api" % "1.7.13"
)
case _ => Seq(
"com.typesafe.scala-logging" %% "scala-logging" % "3.1.0",
"org.scala-lang.modules" %% "scala-parser-combinators" % "1.0.4"
Contributor: do we need this?

Contributor Author: Using this for JSON parsing. It comes with scala-core for 2.10.

Collaborator: What about when using Scala 2.11, the norm? And 2.12 coming?

Contributor Author: Since it comes with 2.10 or less, I added this dependency only for Scala versions >= 2.11.

)
})
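
For context on the thread above, here is a minimal sketch of the JSON parsing this dependency enables; on Scala 2.11+ the `scala.util.parsing.json` package ships in the scala-parser-combinators module rather than in the standard library (keys borrowed from the integration test):

```
import scala.util.parsing.json.JSON

val raw = """{"key": "new_key", "nvalue": {"blah1": "new_nested"}}"""
JSON.parseFull(raw)
// Some(Map(key -> new_key, nvalue -> Map(blah1 -> new_nested)))
```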

publishMavenStyle := true

7 changes: 7 additions & 0 deletions src/it/resources/setup.cql
@@ -26,6 +26,13 @@ CREATE TABLE IF NOT EXISTS test.kv (
value int,
PRIMARY KEY (key));

CREATE TABLE IF NOT EXISTS test.fieldmap (
new_key text,
new_value int,
new_nested text,
new_dnested text,
PRIMARY KEY (new_key));

CREATE TABLE test.playlists (
id bigint,
song_order int,
@@ -19,16 +19,17 @@
package com.tuplejump.kafka.connect.cassandra

import scala.collection.JavaConverters._
import scala.util.parsing.json.JSONObject
import org.apache.kafka.connect.data.{Schema, SchemaBuilder, Struct}
import org.apache.kafka.connect.sink.{SinkRecord, SinkTaskContext}

class CassandraSinkTaskSpec extends AbstractFlatSpec {

val topicName = "test_kv_topic"
val tableName = "test.kv"
val config = sinkProperties(Map(topicName -> tableName))

it should "start sink task" in {
val topicName = "test_kv_topic"
val tableName = "test.kv"
val config = sinkProperties(Map(topicName -> tableName))

val sinkTask = new CassandraSinkTask()
val mockContext = mock[SinkTaskContext]

@@ -38,6 +39,10 @@ class CassandraSinkTaskSpec extends AbstractFlatSpec {
}

it should "save records in cassandra" in {
val topicName = "test_kv_topic"
val tableName = "test.kv"
val config = sinkProperties(Map(topicName -> tableName))

val sinkTask = new CassandraSinkTask()
val mockContext = mock[SinkTaskContext]

@@ -64,5 +69,76 @@ class CassandraSinkTaskSpec extends AbstractFlatSpec {
rowCount should be(2)
cc.shutdown()
}


it should "save records in cassandra with custom field mapping" in {
val topicName = "test_fieldmap_topic"
val tableName = "test.fieldmap"
val config = sinkProperties(Map(topicName -> tableName))

val sinkTask = new CassandraSinkTask()
val mockContext = mock[SinkTaskContext]

val fieldMapping: JSONObject = JSONObject(Map(
Contributor: Shouldn't the type of fieldMapping be Map[String, String]?

Contributor Author: CassandraSinkTask.start takes its properties as a JMap[String, String], so we are adding the property "field.mapping" with the stringified JSON as the value.

"key" -> "new_key",
"value" -> "new_value",
"nvalue" -> JSONObject(Map(
"blah1" -> "new_nested",
"blah2" -> JSONObject(Map(
"blah2" -> "new_dnested"
))
))
))

sinkTask.initialize(mockContext)
sinkTask.start((config + ("cassandra.sink.field.mapping" -> fieldMapping.toString())).asJava)

val doubleNestedSchema = SchemaBuilder.struct.name("dnestedSchema").version(1)
.field("blah1", Schema.STRING_SCHEMA)
.field("blah2", Schema.STRING_SCHEMA).build
val nestedSchema = SchemaBuilder.struct.name("nestedSchema").version(1)
.field("blah1", Schema.STRING_SCHEMA)
.field("blah2", doubleNestedSchema).build
val valueSchema = SchemaBuilder.struct.name("record").version(1)
.field("key", Schema.STRING_SCHEMA)
.field("value", Schema.INT32_SCHEMA)
.field("nvalue", nestedSchema).build

val dnestedValue1 = new Struct(doubleNestedSchema)
.put("blah1", "dnes_blah1_1")
.put("blah2", "dnes_blah2_1")
val nestedValue1 = new Struct(nestedSchema)
.put("blah1", "nes_blah1_1")
.put("blah2", dnestedValue1)
val value1 = new Struct(valueSchema)
.put("key", "pqr")
.put("value", 15)
.put("nvalue", nestedValue1)

val dnestedValue2 = new Struct(doubleNestedSchema)
.put("blah1", "dnes_blah1_2")
.put("blah2", "dnes_blah2_2")
val nestedValue2 = new Struct(nestedSchema)
.put("blah1", "nes_blah1_2")
.put("blah2", dnestedValue2)
val value2 = new Struct(valueSchema)
.put("key", "abc")
.put("value", 17)
.put("nvalue", nestedValue2)

val record1 = new SinkRecord(topicName, 1, SchemaBuilder.struct.build, "key", valueSchema, value1, 0)
val record2 = new SinkRecord(topicName, 1, SchemaBuilder.struct.build, "key", valueSchema, value2, 0)

sinkTask.put(List(record1, record2).asJavaCollection)

sinkTask.stop()

val cc = CassandraCluster.local
val session = cc.session
val result = session.execute(s"select count(1) from $tableName").one()
val rowCount = result.getLong(0)
rowCount should be(2)
cc.shutdown()
}
}

@@ -52,7 +52,7 @@ class CassandraSinkTask extends SinkTask with CassandraTask {
private def write(sc: SinkConfig, byTopic: Iterable[SinkRecord]): Unit = {
// TODO needs ticket: if (byTopic.size > 1) boundWrite(sc, byTopic) else
for (record <- byTopic) {
val query = record.as(sc.schema.namespace)
val query = record.as(sc.schema.namespace, sc.options.fieldMapping)
Try(session.executeAsync(query.cql)) recover { case NonFatal(e) =>
throw new ConnectException(
s"Error executing ${byTopic.size} records for schema '${sc.schema}'.", e)
@@ -64,7 +64,7 @@ class CassandraSinkTask extends SinkTask with CassandraTask {
private def boundWrite(sc: SinkConfig, byTopic: Iterable[SinkRecord]): Unit = {
val statement = prepare(session, sc)
val futures = for (record <- byTopic) yield {
val query = record.as(sc.schema.namespace)
val query = record.as(sc.schema.namespace, sc.options.fieldMapping)
try {
val bs = statement.bind(query.cql)
session.executeAsync(bs)
@@ -107,9 +107,12 @@ private[cassandra] object Syntax {
namespace.length >= 3 || namespace.contains(".")
}

def apply(namespace: String, columnNames: List[ColumnName], columnValues: String): SinkQuery = {
val columns = columnNames.mkString(",")
SinkQuery(s"INSERT INTO $namespace($columns) VALUES($columnValues)")
def apply(namespace: String, columnNamesVsValues: Map[ColumnName, String]): SinkQuery = {
val query = columnNamesVsValues.view.map(e => Vector(e._1, e._2)).transpose match {
case columnNames :: columnValues :: Nil =>
s"INSERT INTO ${namespace}(${columnNames.mkString(",")}) VALUES(${columnValues.mkString(",")})"
}
SinkQuery(query)
}
}
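
A rough worked example of the reworked apply, assuming the map's iteration order (insertion order for small immutable Maps) is the intended column order and that values arrive already formatted as CQL literals:

```
// Sketch only; values are pre-formatted CQL literals, as convert in package.scala produces them.
val q = SinkQuery("test.kv", Map("key" -> "'pqr'", "value" -> "15"))
// q.cql is expected to read: INSERT INTO test.kv(key,value) VALUES('pqr',15)
```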

@@ -18,8 +18,10 @@ package com.tuplejump.kafka.connect.cassandra

import scala.collection.immutable
import scala.util.control.NonFatal
import scala.util.parsing.json.JSON
import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.connect.connector.Task
import org.apache.kafka.connect.errors.DataException
import com.datastax.driver.core.{TableMetadata, ConsistencyLevel}
import InternalConfig._

@@ -123,6 +125,9 @@ object TaskConfig {
final val SinkConsistency: Key = "cassandra.sink.consistency"
final val DefaultSinkConsistency = ConsistencyLevel.LOCAL_QUORUM

final val FieldMapping: Key = "cassandra.sink.field.mapping"
final val DefaultFieldMapping = Map.empty[String, String]

/* **** Task config **** */
final val TaskParallelismStrategy: Key = "cassandra.task.parallelism"

@@ -156,6 +161,10 @@ private[cassandra] object InternalConfig {
def toInt(a: String): Int = a.toInt
def toLong(a: String): Long = a.toLong
def toConsistency(a: String): ConsistencyLevel = ConsistencyLevel.valueOf(a)
def toMap(a: String): Map[String, Any] = JSON.parseFull(a) collect {
case data: Map[_, _] => data.asInstanceOf[Map[String, Any]]
} getOrElse(throw new DataException(s"Field mapping type for '$a' is not supported."))
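
As a reference point (not part of the patch), the README's field mapping, written as standard double-quoted JSON, comes back from toMap as a nested Map[String, Any], while anything that does not parse to a JSON object falls through to the DataException:

```
toMap("""{"id": "id", "user": {"id": "uid", "name": "username"}, "text": "tweet_text"}""")
// Map(id -> id, user -> Map(id -> uid, name -> username), text -> tweet_text)

toMap("not json")   // parseFull yields None            -> DataException
toMap("[1, 2, 3]")  // parses, but is not a JSON object -> DataException
```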


/** A Cassandra `keyspace.table` to Kafka topic mapping.
*
@@ -319,15 +328,21 @@ private[cassandra] object InternalConfig {
sealed trait ClusterQueryOptions

/** Settings related for individual queries, can be set per keyspace.table. */
final case class WriteOptions(consistency: ConsistencyLevel) extends ClusterQueryOptions
final case class WriteOptions(consistency: ConsistencyLevel,
fieldMapping: Map[String, Any]) extends ClusterQueryOptions

object WriteOptions {

def apply(config: Map[String,String]): WriteOptions =
WriteOptions(config.valueOr[ConsistencyLevel](
SinkConsistency, toConsistency, DefaultSourceConsistency))
def apply(config: Map[String, String]): WriteOptions = {
WriteOptions(
consistency = config.valueOr[ConsistencyLevel](
SinkConsistency, toConsistency, DefaultSourceConsistency),
fieldMapping = config.valueOr[Map[String, Any]](
FieldMapping, toMap, DefaultFieldMapping
)
)
}
}
Collaborator:
 def apply(config: Map[String, String]): WriteOptions = {

  def toMap(s: String) = JSON.parseFull(s) collect {
    case data: Map[_, _] => data.asInstanceOf[Map[String, Any]]
  } getOrElse(throw new DataException(s"Field mapping type for '$s' is not supported."))

  WriteOptions(
    consistency = config.valueOr[ConsistencyLevel](
      SinkConsistency, toConsistency, DefaultSourceConsistency),
    fieldMapping = config.valueOr[Map[String, Any]](
      FieldMapping, toMap, DefaultFieldMapping))
}


/** Settings related for individual queries, can be set per keyspace.table. */
final case class ReadOptions(splitSize: Int,
fetchSize: Int,
86 changes: 73 additions & 13 deletions src/main/scala/com/tuplejump/kafka/connect/cassandra/package.scala
@@ -16,6 +16,8 @@

package com.tuplejump.kafka.connect

import org.apache.kafka.connect.data.Field

/** Common package operations. */
package object cassandra {
import java.util.{List => JList, Map => JMap, Date => JDate}
@@ -69,29 +71,87 @@ package object cassandra {

implicit class SinkRecordOps(record: SinkRecord) {

def as(namespace: String): SinkQuery = {
def as(namespace: String, fieldMapping: Map[String, Any]): SinkQuery = {
val colNamesVsValues: Map[String, String] = {
if (fieldMapping.isEmpty) {
toCqlData
} else {
toCqlData(fieldMapping)
}
}
SinkQuery(namespace, colNamesVsValues)
}

def toCqlData(): (Map[String, String]) = {
val schema = record.valueSchema
val columnNames = schema.asColumnNames
val columnValues = schema.`type`() match {
schema.`type`() match {
case STRUCT =>
val struct: Struct = record.value.asInstanceOf[Struct]
columnNames.map(convert(schema, struct, _)).mkString(",")
case other => throw new DataException(
s"Unable to create insert statement with unsupported value schema type $other.")
schema.fields.asScala.map { field =>
field.name -> convert(schema, record.value.asInstanceOf[Struct], field)
}.toMap
Collaborator (@helena, May 3, 2016): just
 schema.fields.asScala.map { field =>
   field.name -> convert(schema, record.value.asInstanceOf[Struct], field)
 }.toMap

case other =>
throw new DataException(
s"Unable to create insert statement with unsupported value schema type $other.")
}
}

def toCqlData(fieldMapping: Map[String, Any]): Map[String, String] = {
record.valueSchema.`type`() match {
case STRUCT =>
toColNamesVsValues(record.value.asInstanceOf[Struct], fieldMapping)
case other =>
throw new DataException(
s"Unable to create insert statement with unsupported value schema type $other.")
}
}

// scalastyle:off
private def toColNamesVsValues(struct: Struct, fieldMapping: Map[String, Any],
colNameVsValues: Map[String, String] = Map.empty): Map[String, String] = {
lazy val exception = new DataException(s"Mismatch between fieldMapping and Schema")
var result: Map[String, String] = colNameVsValues
struct.schema.fields.asScala.foreach { field =>
val fieldMappingValue = fieldMapping.get(field.name)
field.schema.`type`() match {
case STRUCT =>
fieldMappingValue match {
case Some(value) =>
value match {
case newMap: Map[_, _] =>
result = toColNamesVsValues(struct.get(field).asInstanceOf[Struct],
newMap.asInstanceOf[Map[String, Any]], result)
case _ =>
throw exception
}
case None =>
}
case _ =>
fieldMappingValue match {
case Some(value) =>
value match {
case strValue: String =>
result += (strValue -> convert(field.schema, struct, field))
case _ =>
throw exception
}
case None =>
}
}
}
SinkQuery(namespace, columnNames, columnValues)
result
}
// scalastyle:on
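
To make the recursion concrete, this is roughly what the integration test's field mapping is expected to produce for its first record (value1), with values already CQL-formatted by convert below; the unmapped doubly nested field blah1 is dropped, matching the README's note about user.email:

```
// Sketch (not part of the patch): expected flattening for value1 from CassandraSinkTaskSpec.
val expected = Map(
  "new_key"     -> "'pqr'",
  "new_value"   -> "15",
  "new_nested"  -> "'nes_blah1_1'",
  "new_dnested" -> "'dnes_blah2_1'"
)
// record1.as("test.fieldmap", fieldMapping).cql would then read roughly:
// INSERT INTO test.fieldmap(new_key,new_value,new_nested,new_dnested) VALUES('pqr',15,'nes_blah1_1','dnes_blah2_1')
```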

/* TODO support all types. */
def convert(schema: Schema, result: Struct, col: String): AnyRef =
schema.field(col).schema match {
def convert(schema: Schema, result: Struct, field: Field): String =
field.schema match {
case x if x.`type`() == Schema.STRING_SCHEMA.`type`() =>
s"'${result.get(col).toString}'"
s"'${result.get(field).toString}'"
case x if x.name() == Timestamp.LOGICAL_NAME =>
val time = Timestamp.fromLogical(x, result.get(col).asInstanceOf[JDate])
val time = Timestamp.fromLogical(x, result.get(field).asInstanceOf[JDate])
s"$time"
case y =>
result.get(col)
String.valueOf(result.get(field))
}

def asColumnNames: List[ColumnName] =