How to use
First of all you need to add Nightfall to your project.
To use dependency injection you need to create a main class to boot the container. This class needs to be annotated with @Nightfall, which requires an ExecutionMode:
- ExecutionMode.BATCH for batch jobs.
- ExecutionMode.STREAM for stream jobs.
This is required because Spark has different APIs for reading from and writing to streams and batches.
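For instance, a batch job boots the container the same way, only changing the mode. This is a minimal sketch; BatchApplicationExample is a hypothetical class name, mirroring the stream example shown below:

@Nightfall(ExecutionMode.BATCH)
public class BatchApplicationExample {

    public static void main(String[] args) {
        // Boots the Nightfall container in batch mode.
        NightfallApplication.run(BatchApplicationExample.class, args);
    }
}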
The tasks to be performed must be annotated with @Task, otherwise they will not be injected.
The following is a stream example using nightfall-kafka-0_10:
First, the main application needs to start the container:
@Nightfall(ExecutionMode.STREAM)
public class KafkaApplicationExample {

    public static void main(String[] args) {
        NightfallApplication.run(KafkaApplicationExample.class, args);
    }
}
Below is a task example:
@Task
@Singleton
class KafkaExampleTask implements TaskProcessor {

    private static final long serialVersionUID = 1L;

    private final Dataset<Row> events;

    @Inject
    KafkaExampleTask(@Kafka Dataset<Row> events) {
        this.events = events;
    }

    @Override
    public void process() {
        // Casts the Kafka value column to a string and streams it to the console sink.
        events
                .selectExpr("CAST(value AS STRING)")
                .as(Encoders.STRING())
                .writeStream()
                .format("console")
                .start();
    }
}
Remember to put the nightfall.properties file into the classpath or set it through the -e flag when running your job. See Configurations for more information.
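As a minimal sketch, a nightfall.properties could contain nothing more than a reporter toggle such as the one described below:

com.elo7.nightfall.di.providers.reporter.ConsoleReporter=true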
To inject dependencies into Tasks you just need to annotate the constructor with @Inject, like this:
@Task
@Singleton
public class MyBeanExampleTask implements TaskProcessor {

    private static final long serialVersionUID = 1L;

    private final SparkSession session;
    private final MyBean myBean;

    @Inject
    public MyBeanExampleTask(MyBean myBean, SparkSession session) {
        this.myBean = myBean;
        this.session = session;
    }

    @Override
    public void process() {
        // Logs each element of a small in-memory dataset through the injected bean.
        session.createDataset(Arrays.asList("input 1", "input 2", "invalid input"), Encoders.STRING())
                .foreach(myBean::log);
    }
}
Note: injection does not work only for Tasks; MyBean can itself inject other objects, like this:
public class MyBean implements Serializable {

    private static final long serialVersionUID = 1L;
    private static final Logger LOGGER = LoggerFactory.getLogger(MyBean.class);

    private final MyReporter reporter;

    @Inject
    public MyBean(MyReporter reporter) {
        this.reporter = reporter;
    }

    // Takes a String to match the Dataset<String> used in the task above.
    public void log(String value) {
        reporter.sendMetric();
        LOGGER.info("######################## \n Processed: {} \n ######################## ", value);
    }
}
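MyReporter is not defined in this example; it is assumed to be any injectable bean of your own. A minimal hypothetical sketch:

public class MyReporter implements Serializable {

    private static final long serialVersionUID = 1L;

    // Hypothetical hook: send a counter or gauge to your metrics backend here.
    public void sendMetric() {
    }
}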
Any StreamingListener can be automatically added to the Spark streaming listeners by:
- annotating the class with @Component;
- enabling it in your properties, e.g.: com.elo7.nightfall.di.providers.reporter.ConsoleReporter=true

A sketch of such a listener follows. View source code for ConsoleReporter.
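This sketch assumes Spark's StreamingQueryListener is the listener type wired in for structured streaming jobs; LoggingQueryListener is a hypothetical name:

@Component
public class LoggingQueryListener extends StreamingQueryListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(LoggingQueryListener.class);

    @Override
    public void onQueryStarted(QueryStartedEvent event) {
        LOGGER.info("Query started: {}", event.id());
    }

    @Override
    public void onQueryProgress(QueryProgressEvent event) {
        LOGGER.info("Query progress: {}", event.progress());
    }

    @Override
    public void onQueryTerminated(QueryTerminatedEvent event) {
        LOGGER.info("Query terminated: {}", event.id());
    }
}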
The StatsDReporter sends metrics about streaming progress to StatsD.
Configurations:
- com.elo7.nightfall.di.providers.reporter.statsd.StatsDReporter=true: enables the StatsD reporter;
- nightfall.statsd.prefix: prefix for all metrics, required;
- nightfall.statsd.host: StatsD host address, required;
- nightfall.statsd.port: StatsD host port, default 8125;
- nightfall.statsd.bufferSize: buffer size for multi-metric sending, default 512.
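Putting these together, a nightfall.properties enabling the StatsD reporter might look like this sketch (the prefix and host values are placeholders):

com.elo7.nightfall.di.providers.reporter.statsd.StatsDReporter=true
nightfall.statsd.prefix=my-job
nightfall.statsd.host=statsd.example.com
nightfall.statsd.port=8125
nightfall.statsd.bufferSize=512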
The following metrics are sent to StatsD on each query progress event:
- inputRowsPerSecond: the aggregate (across all sources) rate at which data is arriving, sent as a gauge and rounded to the nearest long value;
- processedRowsPerSecond: the aggregate (across all sources) rate at which Spark is processing data, sent as a gauge and rounded to the nearest long value;
- numInputRows: the aggregate (across all sources) number of records processed in a trigger;
- durationMs: each duration is sent as a timer, using its key as the metric name appended to the prefix; for example, for queryPlanning the metric sent to StatsD would be <STATSD_PREFIX>.duration.queryPlanning:123|ms.