Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add streaming BigQuery Loader modules #7

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 24 additions & 35 deletions README.md

Large diffs are not rendered by default.

209 changes: 67 additions & 142 deletions main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ locals {
module_name = "bigquery-loader-pubsub-ce"
module_version = "0.3.1"

app_name = "snowplow-bigquery-loader"
app_version = var.app_version
app_name = "snowplow-bigquery-loader"
app_version = var.app_version
ingress_port = 8000

local_labels = {
name = var.name
Expand Down Expand Up @@ -32,10 +33,6 @@ module "telemetry" {
app_version = local.app_version
module_name = local.module_name
module_version = local.module_version

app_name_override_1 = "snowplow-bigquery-mutator"
app_name_override_2 = "snowplow-bigquery-repeater"
app_name_override_3 = "snowplow-bigquery-streamloader"
}

# --- IAM: Service Account setup
Expand Down Expand Up @@ -75,20 +72,6 @@ resource "google_project_iam_member" "sa_bigquery_data_editor" {
member = "serviceAccount:${google_service_account.sa.email}"
}

resource "google_project_iam_member" "sa_storage_object_viewer" {
project = var.project_id
role = "roles/storage.objectViewer"
member = "serviceAccount:${google_service_account.sa.email}"
}

resource "google_storage_bucket_iam_binding" "dead_letter_storage_object_admin_binding" {
bucket = var.gcs_dead_letter_bucket_name
role = "roles/storage.objectAdmin"
members = [
"serviceAccount:${google_service_account.sa.email}"
]
}

resource "google_bigquery_dataset_iam_member" "dataset_bigquery_data_editor_binding" {
project = var.project_id
dataset_id = var.bigquery_dataset_id
Expand All @@ -98,6 +81,21 @@ resource "google_bigquery_dataset_iam_member" "dataset_bigquery_data_editor_bind

# --- CE: Firewall rules

# Firewall rule that lets health-check probes reach the loader's ingress port.
# Only created when var.healthcheck_enabled is true.
resource "google_compute_firewall" "ingress_health_check" {
  count = var.healthcheck_enabled == true ? 1 : 0

  # Create the rule in the project that owns the network (falls back to the
  # service project), consistent with the ingress_ssh rule in this module.
  # Without this the rule is created in the provider's default project, which
  # is wrong when var.network lives in a separate (shared VPC) host project.
  project = (var.network_project_id != "") ? var.network_project_id : var.project_id
  name    = "${var.name}-traffic-in"

  network     = var.network
  target_tags = [var.name]

  allow {
    protocol = "tcp"
    # `ports` expects strings; convert explicitly rather than using an
    # interpolation-only expression.
    ports = [tostring(local.ingress_port)]
  }

  # Source ranges published by Google Cloud for load-balancer health checks.
  source_ranges = ["130.211.0.0/22", "35.191.0.0/16"]
}

resource "google_compute_firewall" "ingress_ssh" {
project = (var.network_project_id != "") ? var.network_project_id : var.project_id
name = "${var.name}-ssh-in"
Expand Down Expand Up @@ -136,16 +134,6 @@ resource "google_compute_firewall" "egress" {

# --- PubSub: Topics and subscriptions

resource "google_pubsub_topic" "types" {
name = "${var.name}-types-topic"
labels = local.labels
}

resource "google_pubsub_topic" "failed_inserts" {
name = "${var.name}-failed-inserts-topic"
labels = local.labels
}

resource "google_pubsub_subscription" "input" {
name = "${var.name}-input"
topic = var.input_topic_name
Expand All @@ -157,28 +145,6 @@ resource "google_pubsub_subscription" "input" {
labels = local.labels
}

resource "google_pubsub_subscription" "types" {
name = "${var.name}-types"
topic = google_pubsub_topic.types.name

expiration_policy {
ttl = ""
}

labels = local.labels
}

resource "google_pubsub_subscription" "failed_inserts" {
name = "${var.name}-failed-inserts"
topic = google_pubsub_topic.failed_inserts.name

expiration_policy {
ttl = ""
}

labels = local.labels
}

# --- CE: Instance group setup

locals {
Expand Down Expand Up @@ -220,115 +186,74 @@ locals {
local.resolvers_closed
])

iglu_resolver = templatefile("${path.module}/templates/iglu_resolver.json.tmpl", { resolvers = jsonencode(local.resolvers) })

config = templatefile("${path.module}/templates/config.json.tmpl", {
project_id = var.project_id

# config: loader
input_subscription_name = google_pubsub_subscription.input.name
dataset_id = var.bigquery_dataset_id
table_id = var.bigquery_table_id
bad_rows_topic_name = var.bad_rows_topic_name
types_topic_name = google_pubsub_topic.types.name
failed_inserts_topic_name = google_pubsub_topic.failed_inserts.name

# config: mutator
types_sub_name = google_pubsub_subscription.types.name
iglu_resolver = templatefile("${path.module}/templates/iglu_resolver.json.tmpl", {
resolvers = jsonencode(local.resolvers)
cache_size = var.iglu_cache_size
cache_ttl = var.iglu_cache_ttl_seconds
})

# config: repeater
failed_inserts_sub_name = google_pubsub_subscription.failed_inserts.name
gcs_dead_letter_bucket_name = var.gcs_dead_letter_bucket_name
hocon = templatefile("${path.module}/templates/config.json.tmpl", {
project_id = var.project_id
input_subscription_id = google_pubsub_subscription.input.id
dataset_id = var.bigquery_dataset_id
table_id = var.bigquery_table_id
bad_rows_topic_id = var.bad_rows_topic_id
bigquery_service_account_json_b64 = var.bigquery_service_account_json_b64

skip_schemas = jsonencode(var.skip_schemas)
legacy_columns = jsonencode(var.legacy_columns)
webhook_collector = var.webhook_collector
tags = jsonencode(var.labels)

telemetry_disable = !var.telemetry_enabled
telemetry_collector_uri = join("", module.telemetry.*.collector_uri)
telemetry_collector_port = 443
telemetry_secure = true
telemetry_user_provided_id = var.user_provided_id
telemetry_auto_gen_id = join("", module.telemetry.*.auto_generated_id)
telemetry_module_name = local.module_name
telemetry_module_version = local.module_version

exit_on_missing_iglu_schema = var.exit_on_missing_iglu_schema

legacy_column_mode = var.legacy_column_mode
})

config_base64 = base64encode(local.config)
iglu_resolver_base64 = base64encode(local.iglu_resolver)

applications = {
mutator = {
metadata_startup_script = templatefile("${path.module}/templates/startup-script.sh.tmpl", {
application_script = templatefile("${path.module}/templates/bq-mutator.sh.tmpl", {
accept_limited_use_license = var.accept_limited_use_license

version = local.app_version
require_partition_filter = var.bigquery_require_partition_filter
partition_column = var.bigquery_partition_column
config_base64 = local.config_base64
iglu_resolver_base64 = local.iglu_resolver_base64
gcp_logs_enabled = var.gcp_logs_enabled
java_opts = var.java_opts
})
telemetry_script = join("", module.telemetry.*.gcp_ubuntu_20_04_user_data_1)
})

machine_type = var.machine_type_mutator
target_size = 1
}

repeater = {
metadata_startup_script = templatefile("${path.module}/templates/startup-script.sh.tmpl", {
application_script = templatefile("${path.module}/templates/bq-repeater.sh.tmpl", {
accept_limited_use_license = var.accept_limited_use_license

version = local.app_version
config_base64 = local.config_base64
iglu_resolver_base64 = local.iglu_resolver_base64
gcp_logs_enabled = var.gcp_logs_enabled
java_opts = var.java_opts
})
telemetry_script = join("", module.telemetry.*.gcp_ubuntu_20_04_user_data_2)
})

machine_type = var.machine_type_repeater
target_size = var.target_size_repeater
}

streamloader = {
metadata_startup_script = templatefile("${path.module}/templates/startup-script.sh.tmpl", {
application_script = templatefile("${path.module}/templates/bq-streamloader.sh.tmpl", {
accept_limited_use_license = var.accept_limited_use_license

version = local.app_version
config_base64 = local.config_base64
iglu_resolver_base64 = local.iglu_resolver_base64
gcp_logs_enabled = var.gcp_logs_enabled
java_opts = var.java_opts
})
telemetry_script = join("", module.telemetry.*.gcp_ubuntu_20_04_user_data_3)
})

machine_type = var.machine_type_streamloader
target_size = var.target_size_streamloader
}
}
# Renders the instance startup script from templates/startup-script.sh.tmpl.
# The loader config (local.hocon) and the Iglu resolver are base64-encoded
# here before being handed to the template.
startup_script = templatefile("${path.module}/templates/startup-script.sh.tmpl", {
version = local.app_version
config_b64 = base64encode(local.hocon)
iglu_config_b64 = base64encode(local.iglu_resolver)
accept_limited_use_license = var.accept_limited_use_license
# NOTE(review): the template key is suffixed `_b64` but the value passed is the
# *decoded* variable, whereas the hocon template receives the same variable
# un-decoded. Confirm the startup template really expects raw JSON here;
# otherwise drop the base64decode() or rename the key.
bigquery_service_account_json_b64 = base64decode(var.bigquery_service_account_json_b64)
# join("") flattens the splat result into a single string; presumably the
# telemetry module is count-gated so this is "" when disabled — TODO confirm.
telemetry_script = join("", module.telemetry.*.gcp_ubuntu_20_04_user_data)
gcp_logs_enabled = var.gcp_logs_enabled
java_opts = var.java_opts
})
}

module "service" {
source = "snowplow-devops/service-ce/google"
version = "0.1.0"

for_each = local.applications

user_supplied_script = each.value.metadata_startup_script
name = "${var.name}-${each.key}"
user_supplied_script = local.startup_script
name = var.name
instance_group_version_name = "${local.app_name}-${local.app_version}"

labels = merge(
local.labels,
{
app_project_name = each.key
}
)
labels = local.labels

region = var.region
network = var.network
subnetwork = var.subnetwork

ubuntu_20_04_source_image = var.ubuntu_20_04_source_image
machine_type = each.value.machine_type
target_size = each.value.target_size
machine_type = var.machine_type
target_size = var.target_size
ssh_block_project_keys = var.ssh_block_project_keys
ssh_key_pairs = var.ssh_key_pairs
service_account_email = google_service_account.sa.email
associate_public_ip_address = var.associate_public_ip_address

named_port_http = var.healthcheck_enabled == true ? "http" : ""
ingress_port = var.healthcheck_enabled == true ? local.ingress_port : -1
health_check_path = var.healthcheck_enabled == true ? "/" : ""
}
32 changes: 23 additions & 9 deletions outputs.tf
Original file line number Diff line number Diff line change
@@ -1,20 +1,34 @@
# Re-exports the service module's named HTTP port.
output "named_port_http" {
  description = "The name of the port exposed by the instance group"
  value       = module.service.named_port_http
}

# Re-exports the numeric value behind the service module's named port.
output "named_port_value" {
  description = "The named port value (e.g. 8080)"
  value       = module.service.named_port_value
}

output "manager_id" {
value = {
for k, v in module.service : k => v.manager_id
}
value = module.service.manager_id
description = "Identifier for the instance group manager"
}

output "manager_self_link" {
value = {
for k, v in module.service : k => v.manager_self_link
}
value = module.service.manager_self_link
description = "The URL for the instance group manager"
}

output "instance_group_url" {
value = {
for k, v in module.service : k => v.instance_group_url
}
value = module.service.instance_group_url
description = "The full URL of the instance group created by the manager"
}

# Re-exports the health check identifier from the service module.
output "health_check_id" {
  description = "Identifier for the health check on the instance group"
  value       = module.service.health_check_id
}

# Re-exports the health check self link (URL) from the service module.
output "health_check_self_link" {
  description = "The URL for the health check on the instance group"
  value       = module.service.health_check_self_link
}
43 changes: 0 additions & 43 deletions templates/bq-mutator.sh.tmpl

This file was deleted.

20 changes: 0 additions & 20 deletions templates/bq-repeater.sh.tmpl

This file was deleted.

Loading