WIP: Dynamically create configuration for N Pipelines #3

`specs/dynamic-pipelines.md` (new file, 61 additions)

# Dynamically create N pipelines from Singer output

## `meltano.yml`

```yaml
plugins:
  extractors:
    # Third-party data source
    - name: tap-shopify

    # Configuration sources
    - name: tap-postgres--shopify_configs
      inherit_from: tap-postgres
      select:
        - public-shopify_configs.*

  # Third-party destination
  loaders:
    - name: target-s3-parquet

schedules:
  - name: sync-all-shopify
    interval: '@hourly'
    config_source:
      tap-shopify: tap-postgres--shopify_configs
    extractor: tap-shopify
    loader: target-s3-parquet
```

Where each key in the `config_source` mapping is a plugin used in the schedule, and each value is the name of the **tap** that supplies that plugin's configurations.
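
Purely for illustration (not part of the spec), a record pulled from `public-shopify_configs` might look like the sketch below; the field names are hypothetical and would need to line up with the settings `tap-shopify` actually accepts.

```python
# Hypothetical shape of one row extracted by tap-postgres--shopify_configs.
# Every such record would parameterize one dynamically generated
# tap-shopify -> target-s3-parquet pipeline.
shopify_config_record = {
    "shop": "acme-widgets.myshopify.com",  # illustrative setting names
    "api_key": "<redacted>",
    "start_date": "2024-01-01T00:00:00Z",
}
```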

> [!NOTE]
> TBD: how well the above configuration spec plays with _jobs_ since a job name can be referenced in a schedule.

> [!NOTE]
> TBD: how to reference a _pipeline_ instead of a plain tap, in case a mapper is required, for example.

> [!NOTE]
> TBD: This design makes sense for extractors. Do we want to support config sources for loaders too? If so, how could we ensure both collections of configs have the same cardinality and order?

## Under the hood

By declaring dynamic configurations at the _schedule_ level, `meltano run` could invoke the respective plugin once per configuration. It is TBD whether this is expected of `meltano run` itself, or whether it is the responsibility of the orchestrator (e.g. Meltano Cloud).
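
As a rough sketch of that fan-out (assumptions: the responsible component shells out to the Meltano CLI, the config-source tap emits one Singer `RECORD` per third-party account, each record's fields correspond to settings of the plugin being configured and are overridden here through `TAP_SHOPIFY_<SETTING>`-style environment variables, and the helper names are made up):

```python
# Sketch only: fan out one `meltano run` per configuration record.
import json
import os
import subprocess


def extract_configs(config_source: str) -> list[dict]:
    """Run the config-source extractor and collect its Singer RECORD messages."""
    proc = subprocess.run(
        ["meltano", "invoke", config_source],
        capture_output=True,
        text=True,
        check=True,
    )
    records = []
    for line in proc.stdout.splitlines():
        try:
            message = json.loads(line)
        except json.JSONDecodeError:
            continue  # ignore any non-JSON output lines
        if isinstance(message, dict) and message.get("type") == "RECORD":
            records.append(message["record"])
    return records


def run_pipeline(schedule: str, plugin: str, config: dict) -> None:
    """Run one pipeline, injecting the plugin's settings via environment variables."""
    env = dict(os.environ)
    prefix = plugin.replace("-", "_").upper()  # e.g. TAP_SHOPIFY
    for setting, value in config.items():
        env[f"{prefix}_{setting.upper()}"] = str(value)
    subprocess.run(["meltano", "run", schedule], env=env, check=True)


if __name__ == "__main__":
    for config in extract_configs("tap-postgres--shopify_configs"):
        run_pipeline("sync-all-shopify", "tap-shopify", config)
```

Whether a loop like this belongs in `meltano run` itself or in the orchestrator is exactly the open question above.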

## Alternatives

* [Annotations](https://docs.meltano.com/concepts/project/#annotations) could be used in the schedule definition:

  ```yaml
  plugins: ... # same as above
  schedules:
    - name: sync-all-shopify
      interval: '@hourly'
      annotations:
        config_source:
          tap-shopify: tap-postgres--shopify_configs
      extractor: tap-shopify
      loader: target-s3-parquet
  ```

  This would clearly make it the responsibility of the orchestrator to generate _N_ pipelines.

---

@edgarrmondragon I like this approach because it keeps it to Cloud for now and lets us iterate on the spec / structure before requiring it to be fully in the meltano.yml spec.

Can we get a first pass on this up and running on Cloud soon-ish? I think there are a few things we'll have to consider as you start to implement, around how to name jobs, logs, etc., and I don't see any specs around that.

---

There are other things that are not clear to me:

  1. Does state play a role? That is, do we want to pull only updated configs or everything that's in the credentials source?
  2. Do we store the list of credentials in (temporary) storage and then iterate over each? This would help with debugging, but we'd need to think of a lifecycle for that data.

@WillDaSilva do we use annotations for anything in Cloud?

---

@edgarrmondragon We currently only use annotations in Meltano Cloud here:

https://github.com/meltano/infra/blob/4d26a7bb575b6f460de97b33ea190d5ff12790a3/services/provisioner/provisioner_api/utils.py#L209-L228

This lets you specify which environments a schedule should run in. If unspecified, the schedule exists for every environment, and it runs in every environment that has a deployment in which the schedule is enabled.

---

> Does state play a role? That is, do we want to pull only updated configs or everything that's in the credentials source?

I'd think state would mainly play a role in the actual runs, on a per-config basis. But for the first iteration we can assume that we pull the full set of configs each time.
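
Purely as a sketch (assuming the Meltano version in use supports `meltano run --state-id-suffix`; the helper below is hypothetical), per-config state could later be isolated by deriving a distinct state ID from each configuration:

```python
# Sketch: give every configuration its own state ID so incremental
# bookmarks don't collide across stores. Assumes `meltano run` supports
# the --state-id-suffix option in the Meltano version being used.
import os
import subprocess


def run_pipeline_with_state(schedule: str, plugin: str, config: dict, key: str) -> None:
    suffix = str(config[key])  # e.g. the shop domain, unique per configuration
    env = dict(os.environ)
    prefix = plugin.replace("-", "_").upper()
    for setting, value in config.items():
        env[f"{prefix}_{setting.upper()}"] = str(value)
    subprocess.run(
        ["meltano", "run", f"--state-id-suffix={suffix}", schedule],
        env=env,
        check=True,
    )
```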

> Do we store the list of credentials in (temporary) storage and then iterate over each? This would help with debugging, but we'd need to think of a lifecycle for that data.

Seems like we'd have to, but you all know more about that than me. @edgarrmondragon

---

> Can we get a first pass on this up and running on Cloud soon-ish?

@tayloramurphy That depends on how well the Cloud model plays with this approach.

@magreenbaum @WillDaSilva,

since we're going to rely on the orchestrator (Cloud) for this, there are a few questions that have to be answered:

On each scheduled run, we need to execute 1+n pipelines: the first one extracts the configurations, and their cardinality determines the n jobs that follow.

    graph LR;
        cs["meltano invoke (config source) > (temp? storage)"]-->c1[Config 1]-- Inject config via env vars -->r1["meltano run sync-all-shopify"];
        cs-->c2[Config 2]-- Inject config via env vars -->r2["meltano run sync-all-shopify"];
        cs-->c3[Config 3]-- Inject config via env vars -->r3["meltano run sync-all-shopify"];
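
On the "temp? storage" box, one possible shape (a sketch only; the path, helpers, and index-based lookup are illustrative, and the lifecycle of this data is still the open question raised above) is to materialize the extracted configs once and have each generated job read only its own entry:

```python
# Sketch: persist the extracted configs once, then fan out one job per entry.
# The storage location and index-based lookup are illustrative only.
import json
from pathlib import Path

CONFIGS_PATH = Path("/tmp/sync-all-shopify-configs.json")


def store_configs(configs: list[dict]) -> int:
    """Write the extracted configs to temporary storage; return how many jobs to generate."""
    CONFIGS_PATH.write_text(json.dumps(configs))
    return len(configs)


def load_config(index: int) -> dict:
    """Each generated job loads only the configuration it is responsible for."""
    return json.loads(CONFIGS_PATH.read_text())[index]
```

Keeping the materialized configs around for at least the duration of the run would also help with the debugging concern mentioned earlier.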

---

@edgarrmondragon @magreenbaum @BraedonLeonard @tayloramurphy I left my thoughts about this in a discussion in the infra repository, so as to avoid discussing our infra in this public repository:
