
legacy Kafka


Nightfall comes with support for reading messages from Kafka. It supports Kafka up to 0.8.2 and both Kafka client APIs.

Kafka High Level

KafkaHighLevel provides messages from Kafka by creating a DStream.

This provider uses the Kafka High Level API, which controls offsets through Zookeeper. At-least-once guarantees can be achieved by enabling checkpoints and write ahead logs.

All data read from Kafka is replicated in the checkpoints, at the cost of some performance. More details about this approach can be found at Spark Kafka Receiver Approach.

Deployment: when the code changes, you need to use a new checkpoint directory because of the metadata saved in the checkpoint, so you also need to ensure that all data is processed before shutdown. This can be achieved by enabling the property stream.graceful.shutdown=true.

Configurations:

  • kafka.group: Kafka consumer group name, example: NightfallGroup.
  • kafka.topics.map: topic list with the number of partitions for each topic, format: topic-1:partitions,topic-n:partitions. It also accepts the format topic-1,topic-2; in that case the number of partitions is set through kafka.default.topic.partitions. Note that partitions is the number of threads used to read data from Kafka for the given topic.
  • kafka.default.topic.partitions: default number of partitions when it is not set in kafka.topics.map, defaults to 1.
  • kafka.zookeeper: Zookeeper quorum address, format: host-1:port,host-2:port/chroot, example: zookeeper-1:2181,zookeeper-2:2181/kafka.
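
For reference, a minimal set of properties for the high level provider could look like the sketch below; the group, topics, partition counts and Zookeeper quorum are illustrative values only.

    # KafkaHighLevel provider (illustrative values)
    kafka.group=NightfallGroup
    kafka.topics.map=topic-1:2,topic-2:4
    kafka.default.topic.partitions=1
    kafka.zookeeper=zookeeper-1:2181,zookeeper-2:2181/kafka
    # process pending data before shutdown (see Deployment above)
    stream.graceful.shutdown=true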

Kafka Simple

KafkaSimple provides messages from Kafka by creating a DStream.

This provider uses the Kafka Simple API, which handles offset control through a storage. The advantages of using Kafka Simple over High Level are better performance and stronger message delivery guarantees; for more details see Direct Approach.

With this provider you do not need to use write ahead logs, as long as the Kafka retention policy is long enough. Checkpointing is still required to recover from failures.

To achieve exactly-once semantics you need to persist the offset range from Kafka within the same transaction in which you persist all your processed data; otherwise some corner cases may cause duplicated data.

Deployment: when the code changes, you need to use a new checkpoint directory because of the metadata saved in the checkpoint, so you also need to ensure that all data is processed before shutdown. This can be achieved by enabling the property stream.graceful.shutdown=true.

Note: this approach does not support consumer groups.

Configurations:

  • kafka.brokers: Kafka broker list, format: host-1:port,host-2:port, example: kafka-1:9092,kafka-2:9092.
  • kafka.topics: Kafka topic list, format: topic-1,topic-2,topic-3.
  • kafka.offset.persistent: enables offset range persistence, default: false.
  • kafka.auto.offset.reset: what to do when there is no initial offset, defaults to largest. Possible values:
    • largest: starts from the latest offset range.
    • smallest: starts from the first offset found on Kafka.
  • kafka.simple.repository.class: repository implementation for offset persistence; when persistence is enabled, defaults to com.elo7.nightfall.di.providers.kafka.topics.CassandraKafkaTopicRepository.
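
As a sketch, a KafkaSimple configuration using the keys above might look like the following; broker addresses and topic names are illustrative. The repository class line can be omitted, since it is already the default when persistence is enabled.

    # KafkaSimple provider (illustrative values)
    kafka.brokers=kafka-1:9092,kafka-2:9092
    kafka.topics=topic-1,topic-2
    kafka.offset.persistent=true
    kafka.auto.offset.reset=smallest
    kafka.simple.repository.class=com.elo7.nightfall.di.providers.kafka.topics.CassandraKafkaTopicRepository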

Offset range persistency

Nightfall comes with a default implementation for offset range persistence, which uses Cassandra as storage.

This implementation uses two tables to persist the offset range:

  • offset_ranges: persists the latest processed offset range, using the application name, topic name and partition as the primary key. When the application starts, the offset range information is retrieved from this table.
  • offset_ranges_history: persists the history of processed offset ranges, used to detect data duplication or to find out when an offset range was processed. Uses the application name, topic name, partition and processing timestamp as the primary key.

The offset ranges are persisted by a task that is added as the last task to be executed, so the offset range is only persisted after all data of an event has been processed, as long as Fair Scheduler Pools are not enabled.

Configurations for offset range persistence with Cassandra:

  • kafka.cassandra.offsetRange.history.ttl.days: number of days that the history of offset ranges will be stored. Default 7.
  • kafka.cassandra.offsetRange.fetch.size: number of rows read at each request. Default: 100.
  • kafka.cassandra.hosts: Cassandra host list, required. Example: node-a,node-b,node-c.
  • kafka.cassandra.port: Cassandra connection port, default: 9042.
  • kafka.cassandra.user: user to connect to Cassandra, optional.
  • kafka.cassandra.password: user password to connect to Cassandra, optional.
  • kafka.cassandra.keyspace: keyspace to connect to on Cassandra, required.
  • kafka.cassandra.datacenter: data center name for cluster identification, optional.
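
A hedged example of the Cassandra settings above follows; the host list, port, TTL and fetch size come from the examples and defaults already listed, while the keyspace, user, password and data center names are made up for illustration.

    # Offset range persistence with Cassandra (illustrative values)
    kafka.cassandra.hosts=node-a,node-b,node-c
    kafka.cassandra.port=9042
    kafka.cassandra.keyspace=nightfall
    kafka.cassandra.user=nightfall_user
    kafka.cassandra.password=secret
    kafka.cassandra.datacenter=dc1
    kafka.cassandra.offsetRange.history.ttl.days=7
    kafka.cassandra.offsetRange.fetch.size=100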

Migrations for Cassandra

When Kafka offset range history is enabled, you will need to create the keyspace in Cassandra and execute the migrations, which will create all required tables.

For more information see Migrations.

Dependencies

nightfall-persistence-cassandra and nightfall-persistence-relational are provided and should be added with compile scope based on the persistence option that you choose. These dependencies aren't optional due to the lack of support in Gradle: GRADLE-1749.