Skip to content

How to use

Fabiano V. Santos edited this page Jun 26, 2017 · 10 revisions

How to use

First of all you need to add Nightfall to your project.

To use dependency inject you need to create a main class to boot the container. This class need to be annotated with @Nightfall, which requires an ExecutionMode.

  • ExecutionMode.BATCH for batch jobs.
  • ExecutionMode.STREAM for stream jobs.

This is required because Spark has different apis for reading and/or wrinting to streams and batches.

The tasks to be performed mandatorily need to be annotated with Task, otherwise they will not be injected.

  1. "Hello World!"
  2. Dependency injection on Spark jobs
  3. Streaming Listener

Hello World

The following is a Stream example using nightfall-kafka-0_10: First of all the main application needs to start the container:

@Nightfall(ExecutionMode.STREAM)
public class KafkaApplicationExample {

    public static void main(String[] args) {
        NightfallApplication.run(KafkaApplicationExample.class, args);
    }
}

Below a task example:

@Task
@Singleton
class KafkaExampleTask implements TaskProcessor {

    private static final long serialVersionUID = 1L;
    private final Dataset<Row> events;

    @Inject
    KafkaExampleTask(@Kafka Dataset<Row> events) {
        this.events = events;
    }

    @Override
    public void process() {
        events
                .selectExpr("CAST(value AS STRING)")
                .as(Encoders.STRING())
                .writeStream()
                .format("console")
                .start();
    }
}

View source code for Stream

View source code for Batch

Remember to put the nightfall.properties file into the classpath or set it through the -e flag when running your job. See Configurations for more information.

Dependency injection on Spark jobs

To inject dependencies on Tasks you just need to annotate the constructor with @Inject, just like that:

@Task
@Singleton
public class MyBeanExampleTask implements TaskProcessor {

    private static final long serialVersionUID = 1L;

    private final SparkSession session;
    private final MyBean myBean;

    @Inject
    public MyBeanExampleTask(MyBean myBean, SparkSession session) {
        this.myBean = myBean;
        this.session = session;
    }

    @Override
    public void process() {
        session.createDataset(Arrays.asList("input 1", "input 2", "invalid input"), Encoders.STRING())
                .foreach(myBean::log);
    }
}

Note: injection not only works for Tasks, MyBean can inject other object, like this:

public class MyBean implements Serializable {

    private static final long serialVersionUID = 1L;
    private static final Logger LOGGER = LoggerFactory.getLogger(MyBean.class);

    private final MyReporter reporter;

    @Inject
    public MyBean(MyReporter reporter) {
        this.reporter = reporter;
    }

    public void log(Row row) {
        reporter.sendMetric();
        LOGGER.info("######################## \n Processed: {} \n ######################## ", row);
    }
}

View source code for Batch

StreamingListener

Any StreamingListener can be automatically added to Spark streaming listener by:

  • Annotating the class with @Component
  • Enable it in your properties: com.elo7.nightfall.di.providers.reporter.ConsoleReporter=true

View source code for ConsoleReporter

StatsD Reporter

Send metrics about streaming progress to StatsD.

Configurations:

  • com.elo7.nightfall.di.providers.reporter.statsd.StatsDReporter=true: enables StatsD reporter;
  • nightfall.statsd.prefix: prefix for all metrics, required;
  • nightfall.statsd.host: StatsD host address, required;
  • nightfall.statsd.port: StatsD host port, default 8125;
  • nightfall.statsd.bufferSize: buffer size for multi metric sending, default 512;

The following metrics are sent to StatsD at query progress:

  • inputRowsPerSecond: the aggregate (across all sources) rate of data arriving, gauge and roundend to the nearest long value;
  • processedRowsPerSecond: the aggregate (across all sources) rate at which Spark is processing data, gauge and roundend to the nearest long value;
  • numInputRows: the aggregate (across all sources) number of records processed in a trigger;
  • durationMs: each duration is sent as a timer using its key as metric name with duration appended to the prefix, example: for queryPlanning the metric sent to StatsD would be <STATSD_PREFIX>.duration.queryPlanning:123|ms.