From 62879e34f4ad9a6aa28c15a7e322cd6f6b85a6ed Mon Sep 17 00:00:00 2001
From: Aalok Mehta
Date: Sun, 24 Nov 2024 22:50:59 -0500
Subject: [PATCH 1/3] fix whitespace in quest for Dataflow
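
Whitespace-only cleanup of the lab starter file: drop a stray blank
line inside the Options interface, add one before the closing brace,
and restore the missing newline at end of file.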

---
 .../mypackage/pipeline/StreamingMinuteTrafficPipeline.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/quests/dataflow/7_Advanced_Streaming_Analytics/labs/src/main/java/com/mypackage/pipeline/StreamingMinuteTrafficPipeline.java b/quests/dataflow/7_Advanced_Streaming_Analytics/labs/src/main/java/com/mypackage/pipeline/StreamingMinuteTrafficPipeline.java
index 5bd7a2a555..743963ba3c 100644
--- a/quests/dataflow/7_Advanced_Streaming_Analytics/labs/src/main/java/com/mypackage/pipeline/StreamingMinuteTrafficPipeline.java
+++ b/quests/dataflow/7_Advanced_Streaming_Analytics/labs/src/main/java/com/mypackage/pipeline/StreamingMinuteTrafficPipeline.java
@@ -66,7 +66,6 @@ public interface Options extends DataflowPipelineOptions {
     String getInputTopic();
     void setInputTopic(String inputTopic);
 
-
     @Description("Window allowed lateness, in days")
     Integer getAllowedLateness();
     void setAllowedLateness(Integer allowedLateness);
@@ -74,6 +73,7 @@ public interface Options extends DataflowPipelineOptions {
     @Description("The Cloud Storage bucket used for writing " + "unparseable Pubsub Messages.")
     String getDeadletterBucket();
     void setDeadletterBucket(String deadletterBucket);
+
 }
 
 /**
@@ -180,4 +180,4 @@ public void processElement(@Element Long views, OutputReceiver<Row> r, IntervalW
 
         return pipeline.run();
     }
-}
\ No newline at end of file
+}

From bfe2c113c707f5b266c2832ba09f4d437cd7d068 Mon Sep 17 00:00:00 2001
From: Aalok Mehta
Date: Mon, 25 Nov 2024 10:05:22 -0500
Subject: [PATCH 2/3] Standardize Lab 7 solution to quest
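
Align the Lab 7 solution with the conventions used across the quest:

* Options now extends DataflowPipelineOptions (as in the labs file)
  rather than PipelineOptions, and main() moves above the PTransform
  and schema definitions.
* The aggregate schema is renamed pageViewsSchema and its DateTime
  field window_end becomes minute, stamped with the window start
  rather than the window end.
* Transforms are renamed (ReadMessage, WriteAggregateToBQ), the
  boilerplate template javadoc is dropped, and the "Building
  pipeline..." log line moves to just before pipeline.run().

The windowing strategy itself is unchanged; after this change it
reads:

    .apply("WindowByMinute", Window.into(FixedWindows.of(Duration.standardSeconds(options.getWindowDuration())))
            .withAllowedLateness(Duration.standardDays(options.getAllowedLateness()))
            .triggering(AfterWatermark.pastEndOfWindow()
                    .withLateFirings(AfterPane.elementCountAtLeast(1)))
            .accumulatingFiredPanes())

i.e. fixed windows that fire once at the watermark, fire again for
each late element arriving within the allowed lateness, and accumulate
panes across firings.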

---
 .../pipeline/StreamingMinuteTrafficPipeline.java | 150 +++++++-----------
 1 file changed, 57 insertions(+), 93 deletions(-)

diff --git a/quests/dataflow/7_Advanced_Streaming_Analytics/solution/src/main/java/com/mypackage/pipeline/StreamingMinuteTrafficPipeline.java b/quests/dataflow/7_Advanced_Streaming_Analytics/solution/src/main/java/com/mypackage/pipeline/StreamingMinuteTrafficPipeline.java
index 9726b35637..9fdbcf9a84 100644
--- a/quests/dataflow/7_Advanced_Streaming_Analytics/solution/src/main/java/com/mypackage/pipeline/StreamingMinuteTrafficPipeline.java
+++ b/quests/dataflow/7_Advanced_Streaming_Analytics/solution/src/main/java/com/mypackage/pipeline/StreamingMinuteTrafficPipeline.java
@@ -18,6 +18,7 @@
 
 import com.google.gson.Gson;
 import com.google.gson.JsonSyntaxException;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.io.TextIO;
@@ -35,56 +36,22 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/**
- * The {@link StreamingMinuteTrafficPipeline} is a sample pipeline which can be used as a base for creating a real
- * Dataflow pipeline.
- *
- * <p><b>Pipeline Requirements</b>
- *
- * <ul>
- *   <li>Requirement #1
- *   <li>Requirement #2
- * </ul>
- *
- * <p><b>Example Usage</b>
- *
- * <pre>
- * # Set the pipeline vars
- * PROJECT_ID=PROJECT_ID
- * PIPELINE_FOLDER=gs://${PROJECT_ID}/dataflow/pipelines/sample-pipeline
- *
- * # Set the runner
- * RUNNER=DataflowRunner
- *
- * # Build the template
- * mvn compile exec:java \
- * -Dexec.mainClass=com.mypackage.pipeline.BatchUserTrafficPipeline \
- * -Dexec.cleanupDaemonThreads=false \
- * -Dexec.args=" \
- * --project=${PROJECT_ID} \
- * --stagingLocation=${PIPELINE_FOLDER}/staging \
- * --tempLocation=${PIPELINE_FOLDER}/temp \
- * --runner=${RUNNER} \
- * ADDITIONAL PARAMETERS HERE"
- * </pre>
- */
+
 public class StreamingMinuteTrafficPipeline {
 
-    static final TupleTag<CommonLog> parsedMessages = new TupleTag<CommonLog>() {
-    };
-    static final TupleTag<String> unparsedMessages = new TupleTag<String>() {
-    };
+    static final TupleTag<CommonLog> parsedMessages = new TupleTag<CommonLog>() {};
+    static final TupleTag<String> unparsedMessages = new TupleTag<String>() {};
 
-    /*
+    /**
      * The logger to output status messages to.
      */
     private static final Logger LOG = LoggerFactory.getLogger(StreamingMinuteTrafficPipeline.class);
 
     /**
-     * The {@link Options} class provides the custom execution options passed by the executor at the
-     * command-line.
+     * The {@link Options} class provides the custom execution options passed by the
+     * executor at the command-line.
      */
-    public interface Options extends PipelineOptions {
+    public interface Options extends DataflowPipelineOptions {
         @Description("Window duration length, in seconds")
         Integer getWindowDuration();
         void setWindowDuration(Integer windowDuration);
@@ -97,7 +64,6 @@ public interface Options extends PipelineOptions {
         String getInputTopic();
         void setInputTopic(String inputTopic);
 
-
         @Description("Window allowed lateness, in days")
         Integer getAllowedLateness();
         void setAllowedLateness(Integer allowedLateness);
@@ -108,6 +74,22 @@ public interface Options extends PipelineOptions {
 
     }
 
+    /**
+     * The main entry-point for pipeline execution. This method will start the
+     * pipeline but will not wait for its execution to finish. If blocking
+     * execution is required, use the {@link StreamingMinuteTrafficPipeline#run(Options)} method to
+     * start the pipeline and invoke {@code result.waitUntilFinish()} on the
+     * {@link PipelineResult}.
+     *
+     * @param args The command-line args passed by the executor.
+     */
+    public static void main(String[] args) {
+        Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
+
+        run(options);
+    }
+
+
     /**
      * A PTransform accepting Json and outputting tagged CommonLog with Beam Schema or raw Json string if parsing fails
      */
@@ -135,39 +117,27 @@ public void processElement(ProcessContext context) {
         }
     }
 
-
-    public static final Schema pageviewsSchema = Schema.builder()
+    /**
+     * A Beam schema for counting pageviews per minute
+     */
+    public static final Schema pageViewsSchema = Schema
+            .builder()
             .addInt64Field("pageviews")
-            //TODO: change window_end in other labs
-            .addDateTimeField("window_end")
+            .addDateTimeField("minute")
             .build();
 
-    public static final Schema rawSchema = Schema.builder()
+    public static final Schema rawSchema = Schema
+            .builder()
             .addStringField("user_id")
             .addDateTimeField("event_timestamp")
             .addDateTimeField("processing_timestamp")
             .build();
 
     /**
-     * The main entry-point for pipeline execution. This method will start the pipeline but will not
-     * wait for it's execution to finish. If blocking execution is required, use the {@link
-     * StreamingMinuteTrafficPipeline#run(Options)} method to start the pipeline and invoke
-     * {@code result.waitUntilFinish()} on the {@link PipelineResult}.
-     *
-     * @param args The command-line args passed by the executor.
-     */
-    public static void main(String[] args) {
-        PipelineOptionsFactory.register(Options.class);
-        Options options = PipelineOptionsFactory.fromArgs(args)
-                .withValidation()
-                .as(Options.class);
-        run(options);
-    }
-
-    /**
-     * Runs the pipeline to completion with the specified options. This method does not wait until the
-     * pipeline is finished before returning. Invoke {@code result.waitUntilFinish()} on the result
-     * object to block until the pipeline is finished running if blocking programmatic execution is
-     * required.
+     * Runs the pipeline to completion with the specified options. This method does
+     * not wait until the pipeline is finished before returning. Invoke
+     * {@code result.waitUntilFinish()} on the result object to block until the
+     * pipeline is finished running if blocking programmatic execution is required.
      *
      * @param options The execution options.
     * @return The pipeline result.
      */
     public static PipelineResult run(Options options) {
 
         /*
          * Steps:
-         * 1) Read something
-         * 2) Transform something
-         * 3) Write something
+         *   1) Read something
+         *   2) Transform something
+         *   3) Write something
          */
-        LOG.info("Building pipeline...");
-
-
-        PCollectionTuple transformOut =
-                pipeline.apply("ReadPubSubMessages", PubsubIO.readStrings()
-                        // Retrieve timestamp information from Pubsub Message attributes
-                        .withTimestampAttribute("timestamp")
-                        .fromTopic(options.getInputTopic()))
-                        .apply("ConvertMessageToCommonLog", new PubsubMessageToCommonLog());
-        // Write parsed messages to BigQuery
+        PCollectionTuple transformOut = pipeline
+                .apply("ReadMessage", PubsubIO.readStrings()
+                        .withTimestampAttribute("timestamp")
+                        .fromTopic(options.getInputTopic()))
+                .apply("ConvertMessageToCommonLog", new PubsubMessageToCommonLog());
+
+        // Window and write to BQ
         transformOut
                 // Retrieve parsed messages
                 .get(parsedMessages)
-                .apply("WindowByMinute", Window.into(
-                        FixedWindows.of(Duration.standardSeconds(options.getWindowDuration()))).withAllowedLateness(
-                        Duration.standardDays(options.getAllowedLateness()))
+                .apply("WindowByMinute", Window.into(FixedWindows.of(Duration.standardSeconds(options.getWindowDuration())))
+                        .withAllowedLateness(
+                                Duration.standardDays(options.getAllowedLateness()))
                         .triggering(AfterWatermark.pastEndOfWindow()
                                 .withLateFirings(AfterPane.elementCountAtLeast(1)))
                         .accumulatingFiredPanes())
                 // update to Group.globally() after resolved: https://issues.apache.org/jira/browse/BEAM-10297
                 // Only if supports Row output
-                .apply("CountPerMinute", Combine.globally(Count.combineFn())
-                        .withoutDefaults())
+                .apply("CountPerMinute", Combine.globally(Count.combineFn()).withoutDefaults())
                 .apply("ConvertToRow", ParDo.of(new DoFn<Long, Row>() {
                     @ProcessElement
                     public void processElement(@Element Long views, OutputReceiver<Row> r, IntervalWindow window) {
-                        Instant i = Instant.ofEpochMilli(window.end()
-                                .getMillis());
-                        Row row = Row.withSchema(pageviewsSchema)
+                        Instant i = Instant.ofEpochMilli(window.start().getMillis());
+                        Row row = Row.withSchema(pageViewsSchema)
                                 .addValues(views, i)
                                 .build();
                         r.output(row);
                     }
-                }))
-                .setRowSchema(pageviewsSchema)
-                // TODO: is this a streaming insert?
-                .apply("WriteToBQ", BigQueryIO.write().to(options.getOutputTableName())
-                        .useBeamSchema()
-                        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
-                        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
+                })).setRowSchema(pageViewsSchema)
+                // Streaming insert of aggregate data
+                .apply("WriteAggregateToBQ",
+                        BigQueryIO.write().to(options.getOutputTableName()).useBeamSchema()
+                                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
+                                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
 
         // Write unparsed messages to Cloud Storage
         transformOut
                 // Retrieve unparsed messages
                 .get(unparsedMessages)
                 .apply("FireEvery10s", Window.configure().triggering(
                         Repeatedly.forever(
                                 AfterProcessingTime.pastFirstElementInPane()
                                         .plusDelayOf(Duration.standardSeconds(10))))
                         .discardingFiredPanes())
                 .apply("WriteDeadletterStorage", TextIO.write()
                         //TODO: change this to actual full parameter
                         .to(options.getDeadletterBucket() + "/deadletter/*")
                         .withWindowedWrites()
                         .withNumShards(10));
 
+        LOG.info("Building pipeline...");
         return pipeline.run();
     }
 }
-

From bcf584ec39edecb806b1d6902a8f06bea51fb9a8 Mon Sep 17 00:00:00 2001
From: Aalok Mehta
Date: Mon, 25 Nov 2024 11:15:23 -0500
Subject: [PATCH 3/3] Apply style to solution
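
Style pass over the solution; no intended behavior change:

* In WindowByMinute, list .triggering(...) before
  .withAllowedLateness(...). Window is configured through a builder,
  so the reordering only changes how the chain reads, not the
  assembled windowing strategy.
* Break the FireEvery10s trigger chain across lines, one call per line.
* Drop the stale //TODO comment on the deadletter write and the unused
  PipelineOptions import, and spell out several windowing and value
  imports alongside the existing wildcard imports.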
---
 .../pipeline/StreamingMinuteTrafficPipeline.java | 25 ++++++++++++-------
 1 file changed, 16 insertions(+), 9 deletions(-)

diff --git a/quests/dataflow/7_Advanced_Streaming_Analytics/solution/src/main/java/com/mypackage/pipeline/StreamingMinuteTrafficPipeline.java b/quests/dataflow/7_Advanced_Streaming_Analytics/solution/src/main/java/com/mypackage/pipeline/StreamingMinuteTrafficPipeline.java
index 9fdbcf9a84..324194e911 100644
--- a/quests/dataflow/7_Advanced_Streaming_Analytics/solution/src/main/java/com/mypackage/pipeline/StreamingMinuteTrafficPipeline.java
+++ b/quests/dataflow/7_Advanced_Streaming_Analytics/solution/src/main/java/com/mypackage/pipeline/StreamingMinuteTrafficPipeline.java
@@ -25,12 +25,19 @@
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
 import org.apache.beam.sdk.options.Description;
-import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.transforms.AddFields;
+import org.apache.beam.sdk.schemas.transforms.Select;
 import org.apache.beam.sdk.transforms.*;
 import org.apache.beam.sdk.transforms.windowing.*;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.*;
+import org.joda.time.DateTime;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -167,10 +174,10 @@ public static PipelineResult run(Options options) {
                 // Retrieve parsed messages
                 .get(parsedMessages)
                 .apply("WindowByMinute", Window.into(FixedWindows.of(Duration.standardSeconds(options.getWindowDuration())))
-                        .withAllowedLateness(
-                                Duration.standardDays(options.getAllowedLateness()))
-                        .triggering(AfterWatermark.pastEndOfWindow()
+                        .triggering(
+                                AfterWatermark.pastEndOfWindow()
                                 .withLateFirings(AfterPane.elementCountAtLeast(1)))
+                        .withAllowedLateness(Duration.standardDays(options.getAllowedLateness()))
                         .accumulatingFiredPanes())
                 // update to Group.globally() after resolved: https://issues.apache.org/jira/browse/BEAM-10297
                 // Only if supports Row output
@@ -195,13 +202,13 @@ public void processElement(@Element Long views, OutputReceiver<Row> r, IntervalW
         transformOut
                 // Retrieve unparsed messages
                 .get(unparsedMessages)
-                .apply("FireEvery10s", Window.configure().triggering(
-                        Repeatedly.forever(
-                                AfterProcessingTime.pastFirstElementInPane()
-                                        .plusDelayOf(Duration.standardSeconds(10))))
+                .apply("FireEvery10s", Window.configure()
+                        .triggering(
+                                Repeatedly.forever(
+                                        AfterProcessingTime.pastFirstElementInPane()
+                                                .plusDelayOf(Duration.standardSeconds(10))))
                         .discardingFiredPanes())
                 .apply("WriteDeadletterStorage", TextIO.write()
-                        //TODO: change this to actual full parameter
                         .to(options.getDeadletterBucket() + "/deadletter/*")
                         .withWindowedWrites()
                         .withNumShards(10));