Update Lab 7 - Advanced Streaming Analytics #2700

Open: wants to merge 3 commits into base: master
@@ -66,14 +66,14 @@ public interface Options extends DataflowPipelineOptions {
String getInputTopic();
void setInputTopic(String inputTopic);


@Description("Window allowed lateness, in days")
Integer getAllowedLateness();
void setAllowedLateness(Integer allowedLateness);

@Description("The Cloud Storage bucket used for writing " + "unparseable Pubsub Messages.")
String getDeadletterBucket();
void setDeadletterBucket(String deadletterBucket);

}

/**
@@ -180,4 +180,4 @@ public void processElement(@Element Long views, OutputReceiver<Row> r, IntervalW

return pipeline.run();
}
}
}
@@ -18,73 +18,47 @@

import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.transforms.AddFields;
import org.apache.beam.sdk.schemas.transforms.Select;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.transforms.windowing.*;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.*;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* The {@link StreamingMinuteTrafficPipeline} is a sample pipeline which can be used as a base for creating a real
* Dataflow pipeline.
*
* <p><b>Pipeline Requirements</b>
*
* <ul>
* <li>Requirement #1
* <li>Requirement #2
* </ul>
*
* <p><b>Example Usage</b>
*
* <pre>
* # Set the pipeline vars
* PROJECT_ID=PROJECT_ID
* PIPELINE_FOLDER=gs://${PROJECT_ID}/dataflow/pipelines/sample-pipeline
*
* # Set the runner
* RUNNER=DataflowRunner
*
* # Build the template
* mvn compile exec:java \
* -Dexec.mainClass=com.mypackage.pipeline.BatchUserTrafficPipeline \
* -Dexec.cleanupDaemonThreads=false \
* -Dexec.args=" \
* --project=${PROJECT_ID} \
* --stagingLocation=${PIPELINE_FOLDER}/staging \
* --tempLocation=${PIPELINE_FOLDER}/temp \
* --runner=${RUNNER} \
* ADDITIONAL PARAMETERS HERE"
* </pre>
*/

public class StreamingMinuteTrafficPipeline {

static final TupleTag<CommonLog> parsedMessages = new TupleTag<CommonLog>() {
};
static final TupleTag<String> unparsedMessages = new TupleTag<String>() {
};
static final TupleTag<CommonLog> parsedMessages = new TupleTag<CommonLog>() {};
static final TupleTag<String> unparsedMessages = new TupleTag<String>() {};

/*
/**
* The logger to output status messages to.
*/
private static final Logger LOG = LoggerFactory.getLogger(StreamingMinuteTrafficPipeline.class);

/**
* The {@link Options} class provides the custom execution options passed by the executor at the
* command-line.
* The {@link Options} class provides the custom execution options passed by the
* executor at the command-line.
*/
public interface Options extends PipelineOptions {
public interface Options extends DataflowPipelineOptions {
@Description("Window duration length, in seconds")
Integer getWindowDuration();
void setWindowDuration(Integer windowDuration);
@@ -97,7 +71,6 @@ public interface Options extends PipelineOptions {
String getInputTopic();
void setInputTopic(String inputTopic);


@Description("Window allowed lateness, in days")
Integer getAllowedLateness();
void setAllowedLateness(Integer allowedLateness);
@@ -108,6 +81,22 @@ public interface Options extends PipelineOptions {

}
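
// Example flags for the options above (hypothetical values; Beam's
// PipelineOptionsFactory maps each --flag to its matching getter/setter):
//   --windowDuration=60 \
//   --allowedLateness=1 \
//   --inputTopic=projects/MY_PROJECT/topics/my_topic \
//   --deadletterBucket=gs://MY_BUCKET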

/**
* The main entry-point for pipeline execution. This method will start the
* pipeline but will not wait for its execution to finish. If blocking
* execution is required, use the {@link StreamingMinuteTrafficPipeline#run(Options)} method to
* start the pipeline and invoke {@code result.waitUntilFinish()} on the
* {@link PipelineResult}.
*
* @param args The command-line args passed by the executor.
*/
public static void main(String[] args) {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);

run(options);
}
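
// A minimal blocking variant of main, per the Javadoc above (sketch only,
// assuming the caller needs to wait for the pipeline to terminate):
//   public static void main(String[] args) {
//     Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
//     run(options).waitUntilFinish();
//   }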


/**
* A PTransform accepting Json and outputting tagged CommonLog with Beam Schema or raw Json string if parsing fails
*/
@@ -135,39 +124,27 @@ public void processElement(ProcessContext context) {
}
}
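
// Sketch of the multi-output wiring this PTransform relies on (standard
// Beam tagged-output pattern; tag names are the ones defined in this file):
//   context.output(commonLog);               // main output -> parsedMessages
//   context.output(unparsedMessages, json);  // additional output -> dead letter
// with the ParDo applied as:
//   ParDo.of(new DoFn<String, CommonLog>() { ... })
//       .withOutputTags(parsedMessages, TupleTagList.of(unparsedMessages))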


public static final Schema pageviewsSchema = Schema.builder()
/**
* A Beam schema for counting pageviews per minute
*/
public static final Schema pageViewsSchema = Schema
.builder()
.addInt64Field("pageviews")
//TODO: change window_end in other labs
.addDateTimeField("window_end")
.addDateTimeField("minute")
.build();
public static final Schema rawSchema = Schema.builder()

public static final Schema rawSchema = Schema
.builder()
.addStringField("user_id")
.addDateTimeField("event_timestamp")
.addDateTimeField("processing_timestamp")
.build();
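
// Illustrative only: a Row conforming to pageViewsSchema, mirroring what the
// ConvertToRow step below emits (field order: pageviews, then minute):
//   Row example = Row.withSchema(pageViewsSchema)
//       .addValues(42L, Instant.now())
//       .build();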

/**
* The main entry-point for pipeline execution. This method will start the pipeline but will not
* wait for its execution to finish. If blocking execution is required, use the {@link
* StreamingMinuteTrafficPipeline#run(Options)} method to start the pipeline and invoke
* {@code result.waitUntilFinish()} on the {@link PipelineResult}.
*
* @param args The command-line args passed by the executor.
*/
public static void main(String[] args) {
PipelineOptionsFactory.register(Options.class);
Options options = PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(Options.class);
run(options);
}

/**
* Runs the pipeline to completion with the specified options. This method does not wait until the
* pipeline is finished before returning. Invoke {@code result.waitUntilFinish()} on the result
* object to block until the pipeline is finished running if blocking programmatic execution is
* required.
* Runs the pipeline to completion with the specified options. This method does
* not wait until the pipeline is finished before returning. Invoke
* {@code result.waitUntilFinish()} on the result object to block until the
* pipeline is finished running if blocking programmatic execution is required.
*
* @param options The execution options.
* @return The pipeline result.
@@ -180,70 +157,64 @@ public static PipelineResult run(Options options) {

/*
* Steps:
* 1) Read something
* 2) Transform something
* 3) Write something
* 1) Read something
* 2) Transform something
* 3) Write something
*/

LOG.info("Building pipeline...");


PCollectionTuple transformOut =
pipeline.apply("ReadPubSubMessages", PubsubIO.readStrings()
// Retrieve timestamp information from Pubsub Message attributes
PCollectionTuple transformOut = pipeline
.apply("ReadMessage", PubsubIO.readStrings()
.withTimestampAttribute("timestamp")
.fromTopic(options.getInputTopic()))
.apply("ConvertMessageToCommonLog", new PubsubMessageToCommonLog());

// Write parsed messages to BigQuery
.apply("ConvertMessageToCommonLog", new PubsubMessageToCommonLog());

// Window and write to BQ
transformOut
// Retrieve parsed messages
.get(parsedMessages)
.apply("WindowByMinute", Window.<CommonLog>into(
FixedWindows.of(Duration.standardSeconds(options.getWindowDuration()))).withAllowedLateness(
Duration.standardDays(options.getAllowedLateness()))
.triggering(AfterWatermark.pastEndOfWindow()
.apply("WindowByMinute", Window.<CommonLog>into(FixedWindows.of(Duration.standardSeconds(options.getWindowDuration())))
.triggering(
AfterWatermark.pastEndOfWindow()
.withLateFirings(AfterPane.elementCountAtLeast(1)))
.withAllowedLateness(Duration.standardDays(options.getAllowedLateness()))
.accumulatingFiredPanes())
// update to Group.globally() after resolved: https://issues.apache.org/jira/browse/BEAM-10297
// Only if supports Row output
.apply("CountPerMinute", Combine.globally(Count.<CommonLog>combineFn())
.withoutDefaults())
.apply("CountPerMinute", Combine.globally(Count.<CommonLog>combineFn()).withoutDefaults())
.apply("ConvertToRow", ParDo.of(new DoFn<Long, Row>() {
@ProcessElement
public void processElement(@Element Long views, OutputReceiver<Row> r, IntervalWindow window) {
Instant i = Instant.ofEpochMilli(window.end()
.getMillis());
Row row = Row.withSchema(pageviewsSchema)
Instant i = Instant.ofEpochMilli(window.start().getMillis());
Row row = Row.withSchema(pageViewsSchema)
.addValues(views, i)
.build();
r.output(row);
}
}))
.setRowSchema(pageviewsSchema)
// TODO: is this a streaming insert?
.apply("WriteToBQ", BigQueryIO.<Row>write().to(options.getOutputTableName())
.useBeamSchema()
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
})).setRowSchema(pageViewsSchema)
// Streaming insert of aggregate data
.apply("WriteAggregateToBQ",
BigQueryIO.<Row>write().to(options.getOutputTableName()).useBeamSchema()
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
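
// Note on the branch above: withoutDefaults() is needed because
// Combine.globally() runs inside a non-global window, and with
// accumulatingFiredPanes() every late firing re-emits the updated total for
// its window, so the BigQuery table can hold more than one row per minute;
// downstream queries typically keep the latest pane per window.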

// Write unparsed messages to Cloud Storage
transformOut
// Retrieve unparsed messages
.get(unparsedMessages)
.apply("FireEvery10s", Window.<String>configure().triggering(
Repeatedly.forever(
AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(Duration.standardSeconds(10))))
.apply("FireEvery10s", Window.<String>configure()
.triggering(
Repeatedly.forever(
AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(Duration.standardSeconds(10))))
.discardingFiredPanes())
.apply("WriteDeadletterStorage", TextIO.write()
//TODO: change this to actual full parameter
.to(options.getDeadletterBucket() + "/deadletter/*")
.withWindowedWrites()
.withNumShards(10));
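
// The trigger above flushes unparsed messages roughly every 10 seconds of
// processing time; discardingFiredPanes() emits each message in at most one
// pane, and withWindowedWrites() with an explicit shard count lets TextIO
// write this unbounded collection as panes fire.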

LOG.info("Building pipeline...");

return pipeline.run();
}
}