Skip to content

Commit

Permalink
source-facebook-marketing: upgrade to estuary-cdk and fix ads_insights
Browse files Browse the repository at this point in the history
  • Loading branch information
jgraettinger committed Mar 5, 2024
1 parent d279f17 commit f36ec00
Show file tree
Hide file tree
Showing 76 changed files with 950 additions and 1,577 deletions.
3 changes: 0 additions & 3 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,6 @@ jobs:
- connector: source-shopify
connector_type: capture
python: true
- connector: source-facebook-marketing
connector_type: capture
python: true
- connector: source-hubspot
connector_type: capture
python: true
Expand Down
5 changes: 5 additions & 0 deletions .github/workflows/python.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ on:
- "estuary-cdk/**"
- "source-airtable/**"
- "source-asana/**"
- "source-facebook-marketing/**"
- "source-gladly/**"
- "source-google-ads/**"
- "source-google-sheets-native/**"
Expand All @@ -17,6 +18,7 @@ on:
- "estuary-cdk/**"
- "source-airtable/**"
- "source-asana/**"
- "source-facebook-marketing/**"
- "source-gladly/**"
- "source-google-ads/**"
- "source-google-sheets-native/**"
Expand All @@ -39,6 +41,9 @@ jobs:
- name: source-asana
type: capture
version: v1
- name: source-facebook-marketing
type: capture
version: v4
- name: source-hubspot-native
type: capture
version: v1
Expand Down
16 changes: 0 additions & 16 deletions source-facebook-marketing/__main__.py

This file was deleted.

1,163 changes: 733 additions & 430 deletions source-facebook-marketing/poetry.lock

Large diffs are not rendered by default.

24 changes: 12 additions & 12 deletions source-facebook-marketing/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,25 +1,25 @@
[tool.poetry]
name = "source-facebook-marketing-estuary"
name = "source_facebook_marketing"
version = "0.1.0"
description = ""
authors = ["Jonathan Wihl <[email protected]>"]

[tool.poetry.dependencies]
source_facebook_marketing = { path = "source_facebook_marketing", develop = true}
airbyte-cdk = "^0.58.9"
flow-sdk = {path="../python", develop = true}
jsonlines = "^4.0.0"
mypy = "^1.5"
orjson = "^3.9.7"
pydantic = "1.10.12"
python = ">=3.11,<3.12"
requests = "^2.31.0"
airbyte-cdk = "^0.52"
estuary-cdk = {path="../estuary-cdk", develop = true}
python = "^3.11"
types-requests = "^2.31"
pytest = "^7.4.3"
facebook-business = "17.0.0"
cached-property = "^1.5.2"
pendulum = "^2"

[tool.poetry.group.dev.dependencies]
debugpy = "^1.8.0"
mypy = "^1.8.0"
pytest = "^7.4.3"
pytest-insta = "^0.2.0"
pytest-insta = "^0.3.0"
requests-mock = "^1.11.0"
pytest-mock = "^3.12.0"

[build-system]
requires = ["poetry-core"]
Expand Down

This file was deleted.

18 changes: 0 additions & 18 deletions source-facebook-marketing/source_facebook_marketing/BOOTSTRAP.md

This file was deleted.

17 changes: 0 additions & 17 deletions source-facebook-marketing/source_facebook_marketing/Dockerfile

This file was deleted.

184 changes: 76 additions & 108 deletions source-facebook-marketing/source_facebook_marketing/README.md
Original file line number Diff line number Diff line change
@@ -1,130 +1,98 @@
# Facebook Marketing Source
# Structure

This is the repository for the Facebook Marketing source connector, written in Python.
For information about how to use this connector within Airbyte, see [the documentation](https://docs.airbyte.io/integrations/sources/facebook-marketing).
- api.py - everything related to FB API, error handling, throttle, call rate
- source.py - mainly check and discovery logic
- spec.py - connector's specification
- streams/ - everything related to streams, usually it is a module, but we have too much for one file
- base_streams.py - all general logic should go there, you define class of streams as general as possible
- streams.py - concrete classes, one for each stream, here should be only declarative logic and small overrides
- base_insights_streams.py - piece of general logic for big subclass of streams - insight streams

## Local development
- async_job.py - logic about asynchronous jobs
- async_job_manager.py - you will find everything about managing groups of async job here
- common.py - some utils

### Prerequisites
**To iterate on this connector, make sure to complete this prerequisites section.**
# FB findings

#### Minimum Python version required `= 3.9.0`
## API

#### Build & Activate Virtual Environment and install dependencies
From this connector directory, create a virtual environment:
```
python -m venv .venv
```
FB Marketing API provides three ways to interact:
- single request
- batch request
- async request

This will generate a virtualenv for this module in `.venv/`. Make sure this venv is active in your
development environment of choice. To activate it from the terminal, run:
```
source .venv/bin/activate
pip install -r requirements.txt
```
If you are in an IDE, follow your IDE's instructions to activate the virtualenv.
FB provides a `facebook_business` library, which is an auto generated code from their API spec.
We use it because it provides:
- nice error handling
- batch requests helpers
- auto serialize/de-serialize responses to FB objects
- transparently iterates over paginated response

Note that while we are installing dependencies from `requirements.txt`, you should only edit `setup.py` for your dependencies. `requirements.txt` is
used for editable installs (`pip install -e`) to pull in Python dependencies from the monorepo and will call `setup.py`.
If this is mumbo jumbo to you, don't worry about it, just put your deps in `setup.py` but install using `pip install -r requirements.txt` and everything
should work as you expect.
## Single request
Is the most common way to request something.
We use the two-steps strategy to read most of the data:
1. first request to get list of IDs (filtered by cursor if supported)
2. loop over list of ids and request details for each ID, this step sometimes use batch request

#### Building via Gradle
From the Airbyte repository root, run:
```
./gradlew :airbyte-integrations:connectors:source-facebook-marketing:build
```
## Batch request
is a batch of requests serialized in the body of a single request.
The response of such request will be a list of responses for each individual request (body, headers, etc).
FB lib use interface with callbacks, batch object will call corresponding (success or failure) callback for each type of response.
FB lib also catch fatal errors from the API (500, …) and instead of calling `on_failure` callback will return a new batch object with list of failed requests.
FB API limit number of requests in a single batch to 50.

#### Create credentials
**If you are a community contributor**, follow the instructions in the [documentation](https://docs.airbyte.io/integrations/sources/facebook-marketing)
to generate the necessary credentials. Then create a file `secrets/config.json` conforming to the `source_facebook_marketing/spec.json` file.
Note that any directory named `secrets` is gitignored across the entire Airbyte repo, so there is no danger of accidentally checking in sensitive information.
See `integration_tests/sample_config.json` for a sample config file.
**Important note**:

**If you are an Airbyte core member**, copy the credentials in Lastpass under the secret name `source facebook-marketing test creds`
and place them into `secrets/config.json`.
Batch object doesn’t perform pagination of individual responses,
so you may lose data if the response have pagination.

### Locally running the connector
```
python main.py spec
python main.py check --config secrets/config.json
python main.py discover --config secrets/config.json
python main.py read --config secrets/config.json --catalog integration_tests/configured_catalog.json
```
## Async Request
FB recommends to use Async Requests when common requests begin to timeout.
Async Request is a 3-step process:
- create async request
- check its status (in a loop)
- fetch response when status is done

### Locally running the connector docker image
### Combination with batch
Unfortunately all attempts to create multiple async requests in a single batch failed - `ObjectParser` from FB lib don’t know how to parse `AdReportRun` response.
Instead, we use batch to check status of multiple async jobs at once (respecting batch limit of 50)

#### Build
First, make sure you build the latest Docker image:
```
docker build . -t airbyte/source-facebook-marketing:dev
```
### Insights
We use Async Requests to read Insights, FB API for this called `AdReportRun`.
Insights are reports based on ads performance, you can think about it as an SQL query:

You can also build the connector image via Gradle:
```sql
select <fields> from <edge_object> where <filter> group by <level>, <breakdowns>;
```
./gradlew :airbyte-integrations:connectors:source-facebook-marketing:airbyteDocker
```
When building via Gradle, the docker image name and tag, respectively, are the values of the `io.airbyte.name` and `io.airbyte.version` `LABEL`s in
the Dockerfile.

#### Run
Then run any of the connector commands as follows:
```
docker run --rm airbyte/source-facebook-marketing:dev spec
docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-facebook-marketing:dev check --config /secrets/config.json
docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-facebook-marketing:dev discover --config /secrets/config.json
docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/source-facebook-marketing:dev read --config /secrets/config.json --catalog /integration_tests/configured_catalog.json
```
## Testing
Make sure to familiarize yourself with [pytest test discovery](https://docs.pytest.org/en/latest/goodpractices.html#test-discovery) to know how your test files and methods should be named.
First install test dependencies into your virtual environment:
```
pip install .'[tests]'
```
### Unit Tests
To run unit tests locally, from the connector directory run:
```
python -m pytest unit_tests
```
Our insights by default look like this:

### Integration Tests
There are two types of integration tests: Acceptance Tests (Airbyte's test suite for all source connectors) and custom integration tests (which are specific to this connector).
#### Custom Integration tests
Place custom tests inside `integration_tests/` folder, then, from the connector root, run
```
python -m pytest integration_tests
```
#### Acceptance Tests
Customize `acceptance-test-config.yml` file to configure tests. See [Connector Acceptance Tests](https://docs.airbyte.io/connector-development/testing-connectors/connector-acceptance-tests-reference) for more information.
If your connector requires to create or destroy resources for use during acceptance tests create fixtures for it and place them inside integration_tests/acceptance.py.
To run your integration tests with acceptance tests, from the connector root, run
```sql
select <all possible fields> from AdAccount(me) where start_date = …. and end_date = …. group by ad, <breakdown>
```
docker build . --no-cache -t airbyte/source-facebook-marketing:dev \
&& python -m pytest -p connector_acceptance_test.plugin
```
To run your integration tests with docker

### Using gradle to run tests
All commands should be run from airbyte project root.
To run unittest run:
```
./gradlew :airbyte-integrations:connectors:source-facebook-marketing:unitTest
```
To run acceptance and custom integration tests run:
FB will perform calculations on its backed with various complexity depending on fields we ask, most heavy fields are unique metrics: `unique_clicks`, `unique_actions`, etc.

Additionally, Insights has fields that show stats from last N days, so-called attribution window, it can be `1d`, `7d`, and `28d`, by default we use all of them.
According to FB docs insights data can be changed up to 28 days after it has being published.
That's why we re-read 28 days in the past from now each time we sync insight stream.

When amount of data and computation is too big for FB servers to handle the jobs start to failing. Throttle and call rate metrics don’t reflect this problem and can’t be used to monitor.
Instead, we use the following technic.
Taking into account that we group by ad we can safely change our from table to smaller dataset/edge_object (campaign, adset, ad).
Empirically we figured out that account level insights contains data for all campaigns from last 28 days and, very rarely, campaigns that didn’t even start yet.
To solve this mismatch, at least partially, we get list of campaigns for last 28 days from the insight start date.
The current algorithm looks like this:

```
./gradlew :airbyte-integrations:connectors:source-facebook-marketing:IntegrationTest
create async job for account level insight for the day A
if async job failed:
restart it
if async job failed again:
get list of campaigns for last 28 day
create async job for each campaign and day A
```
If campaign-level async job fails second time we split it by `AdSets` or `Ads`.

## Dependency Management
All of your dependencies should go in `setup.py`, NOT `requirements.txt`. The requirements file is only used to connect internal Airbyte dependencies in the monorepo for local development.
We split dependencies between two groups, dependencies that are:
* required for your connector to work need to go to `MAIN_REQUIREMENTS` list.
* required for the testing need to go to `TEST_REQUIREMENTS` list

### Publishing a new version of the connector
You've checked out the repo, implemented a million dollar feature, and you're ready to share your changes with the world. Now what?
1. Make sure your changes are passing unit and integration tests
1. Bump the connector version in `Dockerfile` -- just increment the value of the `LABEL io.airbyte.version` appropriately (we use [SemVer](https://semver.org/)).
1. Create a Pull Request
1. Pat yourself on the back for being an awesome contributor
1. Someone from Airbyte will take a look at your PR and iterate with you to merge it into master
Reports from users show that sometimes async job can stuck for very long time (hours+),
and because FB doesn’t provide any canceling API after 1 hour of waiting we start another job.
20 changes: 20 additions & 0 deletions source-facebook-marketing/source_facebook_marketing/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import estuary_cdk.pydantic_polyfill # Must be first.

import asyncio
from estuary_cdk import shim_airbyte_cdk, flow
from source_facebook_marketing import SourceFacebookMarketing

asyncio.run(
shim_airbyte_cdk.CaptureShim(
delegate=SourceFacebookMarketing(),
oauth2=flow.OAuth2Spec(
provider="facebook",
authUrlTemplate="https://www.facebook.com/v19.0/dialog/oauth?client_id={{#urlencode}}{{{ client_id }}}{{/urlencode}}&redirect_uri={{#urlencode}}{{{ redirect_uri }}}{{/urlencode}}&state={{#urlencode}}{{{ state }}}{{/urlencode}}&scope=ads_management,ads_read,read_insights,business_management",
accessTokenResponseMap={"access_token": "/access_token"},
accessTokenUrlTemplate="https://graph.facebook.com/v19.0/oauth/access_token?client_id={{#urlencode}}{{{ client_id }}}{{/urlencode}}&client_secret={{#urlencode}}{{{ client_secret }}}{{/urlencode}}&code={{#urlencode}}{{{ code }}}{{/urlencode}}&redirect_uri={{#urlencode}}{{{ redirect_uri }}}{{/urlencode}}",
accessTokenBody="", # Uses query arguments.
accessTokenHeaders={},
),
schema_inference=False,
).serve()
)
Loading

0 comments on commit f36ec00

Please sign in to comment.