-
Notifications
You must be signed in to change notification settings - Fork 1.3k
/
Copy pathbeamPubSubXml2Gcs.py
191 lines (167 loc) · 6.51 KB
/
beamPubSubXml2Gcs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
# Copyright 2023 Google LLC
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import json
import logging
import random
import xml.etree.ElementTree as ET
from datetime import datetime, timezone

from apache_beam import DoFn, GroupByKey, io, ParDo, Pipeline, \
    PTransform, WindowInto, WithKeys, Map
from apache_beam.io import fileio
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows
class GroupMessagesByFixedWindows(PTransform):
    """A composite transform that groups Pub/Sub messages based on publish time
    and outputs a list of tuples, each containing a message and its publish
    time.
    """

    def __init__(self, window_size, num_shards=5):
        # Convert the window size from minutes to seconds.
        self.window_size = int(window_size * 60)
        self.num_shards = num_shards

    def expand(self, pcoll):
        # Bind window info to each element using the element timestamp
        # (i.e. the Pub/Sub publish time).
        windowed = pcoll | "Window into fixed intervals" >> WindowInto(
            FixedWindows(self.window_size)
        )
        stamped = windowed | "Add timestamp to windowed elements" >> ParDo(
            AddTimestamp()
        )
        # Assign a random shard key to each windowed element so the work
        # is spread across `num_shards` groups.
        keyed = stamped | "Add key" >> WithKeys(
            lambda _: random.randint(0, self.num_shards - 1)
        )
        # Group windowed elements by key. All the elements in the same
        # window must fit in memory for this. If not, you need to use
        # `beam.util.BatchElements`.
        grouped = keyed | "Group by key" >> GroupByKey()
        # Keep only the grouped values; the shard key has served its purpose.
        return grouped | "Drop shard key after grouping" >> Map(
            lambda kv: kv[1]
        )
class AddTimestamp(DoFn):
    """Merges each element with a "ts" key holding its publish time."""

    def process(self, element, publish_time=DoFn.TimestampParam):
        """Processes each parsed element by extracting the message body and its
        received time into a single dict.

        Args:
            element: dict produced upstream (e.g. by ParseXML).
            publish_time: Beam-bound element timestamp (the Pub/Sub publish
                time, since `timestamp_attribute` is unspecified upstream).

        Yields:
            A dict with a formatted "ts" key merged with `element`. Because
            `element` is the right-hand operand of `|`, its keys win on
            collision — same as the original behavior.
        """
        # `datetime.utcfromtimestamp` is deprecated since Python 3.12; build a
        # timezone-aware UTC datetime instead. The formatted string is
        # identical because the format carries no timezone information.
        ts = datetime.fromtimestamp(float(publish_time), tz=timezone.utc)
        yield {"ts": ts.strftime("%Y-%m-%d %H:%M:%S.%f")} | element
class ParseXML(DoFn):
    """Parses an XML message body into a dict of tag names and text values."""

    def process(self, message_body):
        """Parse all tags and attributes from an XML and serialize them to a
        dict for later storage."""
        try:
            root = ET.fromstring(message_body)
        except Exception as e:
            # Best effort: keep unparseable payloads instead of dropping them.
            yield {"error": str(e), "raw_contents": message_body}
        else:
            tag_names = [child.tag for child in root]
            tag_texts = [child.text for child in root]
            yield {"tags": tag_names, "text": tag_texts}
def run(project_id,
        input_topic,
        gcs_path,
        window_size,
        num_shards,
        runner,
        region,
        pipeline_args=None):
    """Builds and runs the streaming pipeline: Pub/Sub XML messages are
    parsed, grouped into fixed windows, serialized to JSON, and written
    to GCS.

    Args:
        project_id: GCP project hosting the Pub/Sub topic and Dataflow job.
        input_topic: short topic id (not the full resource name).
        gcs_path: GCS prefix used for output, staging, and temp locations.
        window_size: output window size in minutes.
        num_shards: number of shards used when grouping windowed elements.
        runner: Beam runner name ('DataflowRunner' or 'DirectRunner').
        region: GCP region for Dataflow.
        pipeline_args: extra args forwarded to PipelineOptions.
    """
    input_topic = "projects/{0}/topics/{1}".format(project_id, input_topic)
    # Normalize trailing slashes so derived paths never contain "//".
    # (The original `gcs_path[-1]` check raised IndexError on an empty
    # string and missed multiple trailing slashes.)
    gcs_path = gcs_path.rstrip("/")
    output_path = "{0}/output/".format(gcs_path)
    # Set `save_main_session` to True so DoFns can access globally imported
    # modules.
    provided_args = {
        "project": project_id,
        "runner": runner,
        "region": region,
        "staging_location": "{0}/staging/".format(gcs_path),
        "temp_location": "{0}/temp/".format(gcs_path),
        "streaming": True,
        "save_main_session": True
    }
    pipeline_options = PipelineOptions(
        pipeline_args, **provided_args
    )
    with Pipeline(options=pipeline_options) as pipeline:
        (
            pipeline
            # Because `timestamp_attribute` is unspecified in `ReadFromPubSub`,
            # Beam binds the publish time returned by the Pub/Sub server for
            # each message to the element's timestamp parameter, accessible via
            # `DoFn.TimestampParam`.
            # https://beam.apache.org/releases/pydoc/current/apache_beam.io.gcp.pubsub.html#apache_beam.io.gcp.pubsub.ReadFromPubSub
            # https://cloud.google.com/pubsub/docs/stream-messages-dataflow#set_up_your_pubsub_project
            | "Read from Pub/Sub" >> io.ReadFromPubSub(topic=input_topic)
            | "Parse XML tags and attributes" >> ParDo(ParseXML())
            | "Window into" >> GroupMessagesByFixedWindows(window_size,
                                                           num_shards)
            # GroupByKey yields a lazy iterable of grouped values, which
            # json.dumps cannot serialize directly — materialize it as a
            # list first.
            | "Serialize" >> Map(lambda batch: json.dumps(list(batch),
                                                          indent=2))
            | "Write to GCS" >> fileio.WriteToFiles(path=output_path,
                                                    shards=0)
        )
if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)

    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument(
        "--project_id",
        help="The GCP project that hosts the PubSub and Dataflow.",
    )
    arg_parser.add_argument(
        "--input_topic_id",
        help="The Cloud Pub/Sub topic to read from.",
    )
    arg_parser.add_argument(
        "--runner",
        default='DataflowRunner',
        help="""The beam runner to be used. For cloud Dataflow:
        'DataflowRunner'. For local debugging: 'DirectRunner'.
        [Defaults to: 'DataflowRunner']""",
    )
    arg_parser.add_argument(
        "--region",
        default='us-central1',
        help="The GCP region for Dataflow. [Defaults to: 'us-central1']",
    )
    arg_parser.add_argument(
        "--window_size",
        type=float,
        default=1.0,
        help="Output file's window size in minutes. [Defaults to: 1.0]",
    )
    arg_parser.add_argument(
        "--gcs_path",
        help="Path of the output GCS file including the prefix.",
    )
    arg_parser.add_argument(
        "--num_shards",
        type=int,
        default=5,
        help="""Number of shards to use when writing windowed elements to GCS.
        [Defaults to: 5]""",
    )

    # Unknown flags (e.g. Dataflow service options) are forwarded verbatim
    # to the pipeline.
    cli_args, beam_args = arg_parser.parse_known_args()
    run(
        cli_args.project_id,
        cli_args.input_topic_id,
        cli_args.gcs_path,
        cli_args.window_size,
        cli_args.num_shards,
        cli_args.runner,
        cli_args.region,
        beam_args,
    )