Adding source-google-analytics-data-api #1462

Open · wants to merge 8 commits into main
Changes from 6 commits
5 changes: 5 additions & 0 deletions .github/workflows/python.yaml
@@ -13,6 +13,7 @@ on:
- "source-google-sheets-native/**"
- "source-hubspot-native/**"
- "source-hubspot/**"
- "source-google-analytics-data-api/**"
- "source-notion/**"
- "source-linkedin-pages/**"
- "source-linkedin-ads-v2/**"
@@ -41,6 +42,7 @@ on:
- "source-google-sheets-native/**"
- "source-hubspot-native/**"
- "source-hubspot/**"
- "source-google-analytics-data-api/**"
- "source-notion/**"
- "source-linkedin-pages/**"
- "source-stripe-native/**"
@@ -106,6 +108,9 @@ jobs:
type: capture
version: v5
usage_rate: "1.0"
- name: source-google-analytics-data-api
type: capture
version: v3
- name: source-notion
type: capture
version: v2
23 changes: 23 additions & 0 deletions source-google-analytics-data-api/config.yaml
@@ -0,0 +1,23 @@
credentials:
access_token_sops: ""
auth_type: Client
client_id_sops: ENC[AES256_GCM,data:nKYrLSJa8tX4AIud6zEjAJArzfXZ8nJhbJy1jvrZCQUF+FwY8aSBBjYm2DULDwjFavY71Lg9xs0tBVMztg5ERwreJDv7ZhZmUQ==,iv:xTIsdgelxfuYKqsHa4wWJrAnuVhoZZ5gJMYwphJhqEA=,tag:dS7rZCH1oKE0dkPA8cAX8w==,type:str]
client_secret_sops: ENC[AES256_GCM,data:lTaxOBRDGT1FzNcwhEjPZzB1UJhOdjpFsWviEEMfXkkCdd4=,iv:SzIH9vkXctgh9EO/NAeUfPgZBWRv/0QQyBoVG+b6AQE=,tag:OYcbeln0r4QQhhZWndWuwg==,type:str]
refresh_token_sops: ENC[AES256_GCM,data:S4bW08hyZuMnB+VIExQHon5cAED+CKUw015ZLjw1NGKHwJqYNuhMwhEDwS0Vr4TCq/a5/eJa7vVbJldGZbLvo8b3aJtOSXLmylOt66Gq4TZYXoE1DVAQqIJMAlQg/fO59UsoSXkt2g==,iv:GmgFm8/zEs2/EX8nipgJ65DO2JjGqpqUqTuxbzEpnF4=,tag:1oXaulNxwVcc7VDTn+LwiA==,type:str]
date_ranges_start_date: "2024-04-01"
property_id: "431462875"
window_in_days: 1
sops:
kms: []
gcp_kms:
- resource_id: projects/estuary-theatre/locations/us-central1/keyRings/connector-keyring/cryptoKeys/connector-repository
created_at: "2024-03-21T18:45:08Z"
enc: CiQAdmEdwt1N/pN5+OV1sli/b0qNy+Gc2o2mWE8lhej5FiwhZs8SSQCSobQjwPAxI+9zMwM9cYCzoGQ7f5KGNvHrQHgUxcPL5tivIssUv1QUV5TS6/kMoqr6ElvAop+ETXgE/gvv6KYm2nHv+amBL38=
azure_kv: []
hc_vault: []
age: []
lastmodified: "2024-04-08T20:17:08Z"
mac: ENC[AES256_GCM,data:4cGuunX3pZZu90iOAYn70+BijSebkzHogwGsb0tmCmkSSuDUTecS76lwUtWoEzG3mUpjtA5QdkTlcuG699tL7anjVQp/nm6UONaQM1YKHgGpYjZsiYuZaOrD92v8LKhEIbNxzlU+se0/ahRUKrQzYeSLX4Juprr50H4SQG6PZ4A=,iv:oO6vY8QqzU17j2Ou6DAiiZTSFu4fCedFMRpOyAFfLs4=,tag:wrD0ttB5AoNRg0hvVn2itw==,type:str]
pgp: []
encrypted_suffix: _sops
version: 3.8.1
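
Only the keys carrying the `_sops` suffix (per `encrypted_suffix`) are encrypted; `property_id`, `window_in_days`, and the start date stay in plaintext. A minimal sketch of loading the decrypted config for local testing, assuming the `sops` CLI and PyYAML are installed and you have access to the GCP KMS key referenced above:

import subprocess

import yaml  # PyYAML, assumed to be installed

def load_decrypted_config(path: str) -> dict:
    # `sops --decrypt` resolves the GCP KMS data key and decrypts every
    # value whose key ends in `_sops`, leaving the rest untouched.
    raw = subprocess.run(
        ["sops", "--decrypt", path],
        check=True, capture_output=True, text=True,
    ).stdout
    return yaml.safe_load(raw)

config = load_decrypted_config("source-google-analytics-data-api/config.yaml")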
1,913 changes: 1,913 additions & 0 deletions source-google-analytics-data-api/poetry.lock

Large diffs are not rendered by default.

27 changes: 27 additions & 0 deletions source-google-analytics-data-api/pyproject.toml
@@ -0,0 +1,27 @@
[tool.poetry]
name = "source-google-analytics-data-api"
version = "0.1.2"
description = ""
authors = ["Luishfs <[email protected]>"]

[tool.poetry.dependencies]
python = "^3.12"
estuary-cdk = {path="../estuary-cdk", develop = true}
airbyte-cdk = "^0.52"
pydantic = "1.10.14"
pendulum = "^3.0.0"
cryptography = "==37.0.4"

[tool.poetry.group.dev.dependencies]
pytest = "^7.4.3"
pytest-insta = "^0.2.0"
PyJWT = "==2.8.0"
mock = "^5.1.0"
pytest-mock = "^3.12.0"
requests-mock = "^1.11.0"
debugpy = "^1.8.0"
freezegun = "^1.4.0"

[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
@@ -0,0 +1,8 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#


from .source import SourceGoogleAnalyticsDataApi

__all__ = ["SourceGoogleAnalyticsDataApi"]
@@ -0,0 +1,40 @@
import estuary_cdk.pydantic_polyfill # Must be first.

import asyncio
import json
from estuary_cdk import shim_airbyte_cdk, flow
from source_google_analytics_data_api import SourceGoogleAnalyticsDataApi

accessTokenBody = {
"grant_type": "authorization_code",
"client_id": "{{{ client_id }}}",
"client_secret": "{{{ client_secret }}}",
"redirect_uri": "{{{ redirect_uri }}}",
"code": "{{{ code }}}",
}


asyncio.run(
shim_airbyte_cdk.CaptureShim(
delegate=SourceGoogleAnalyticsDataApi(),
oauth2=flow.OAuth2Spec(
provider="google",
authUrlTemplate=(
"https://accounts.google.com/o/oauth2/auth?access_type=offline&prompt=consent"
r"&client_id={{#urlencode}}{{{ client_id }}}{{/urlencode}}"
r"&redirect_uri={{#urlencode}}{{{ redirect_uri }}}{{/urlencode}}"
r"&response_type=code"
r"&scope=https://www.googleapis.com/auth/analytics.readonly"
r"&state={{#urlencode}}{{{ state }}}{{/urlencode}}"
),
accessTokenUrlTemplate="https://oauth2.googleapis.com/token",
accessTokenHeaders={},
accessTokenBody=json.dumps(accessTokenBody),
accessTokenResponseMap={
"refresh_token": "/refresh_token",
},
),
schema_inference=False,
).serve()
)
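
For reference, the `{{#urlencode}}` mustache sections in `authUrlTemplate` urlencode each substituted value. A minimal sketch of the final URL this template produces, using placeholder values; the real substitution happens outside this file when the OAuth flow runs:

from urllib.parse import quote_plus

# Placeholders only; real values are injected during the OAuth handshake.
client_id = "1234567890.apps.googleusercontent.com"
redirect_uri = "https://example.com/oauth/callback"
state = "opaque-state-token"

auth_url = (
    "https://accounts.google.com/o/oauth2/auth?access_type=offline&prompt=consent"
    f"&client_id={quote_plus(client_id)}"
    f"&redirect_uri={quote_plus(redirect_uri)}"
    "&response_type=code"
    "&scope=https://www.googleapis.com/auth/analytics.readonly"
    f"&state={quote_plus(state)}"
)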
@@ -0,0 +1,205 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#


import logging
from functools import wraps
from typing import Any, Iterable, Mapping, Optional

import requests

from .utils import API_LIMIT_PER_HOUR


class GoogleAnalyticsApiQuotaBase:
# Airbyte Logger
logger = logging.getLogger("airbyte")
# initial quota placeholder
initial_quota: Optional[Mapping[str, Any]] = None
    # the remaining-quota fraction at or below which we preemptively apply
    # the retry scenario for that quota, before the 429 error is actually hit
    threshold: float = 0.1
# base attrs
should_retry: Optional[bool] = True
backoff_time: Optional[int] = None
raise_on_http_errors: bool = True
# stop making new slices globally
stop_iter: bool = False
error_message = None
# mapping with scenarios for each quota kind
quota_mapping: Mapping[str, Any] = {
"concurrentRequests": {
"error_pattern": "Exhausted concurrent requests quota.",
"backoff": 30,
"should_retry": True,
"raise_on_http_errors": False,
"stop_iter": False,
},
"tokensPerProjectPerHour": {
"error_pattern": "Exhausted property tokens for a project per hour.",
"backoff": 1800,
"should_retry": True,
"raise_on_http_errors": False,
"stop_iter": False,
"error_message": API_LIMIT_PER_HOUR,
},
"potentiallyThresholdedRequestsPerHour": {
"error_pattern": "Exhausted potentially thresholded requests quota.",
"backoff": 1800,
"should_retry": True,
"raise_on_http_errors": False,
"stop_iter": False,
"error_message": API_LIMIT_PER_HOUR,
},
        # TODO: The next scenarios are commented out for now.
        # Once we encounter one of them at least once, we should
        # uncomment the matching scenario and fill in its `error_pattern`
        # so that quota is tracked as well.
        # IMPORTANT: PLEASE DO NOT REMOVE the scenarios below!
# 'tokensPerDay': {
# 'error_pattern': "___",
# "backoff": None,
# "should_retry": False,
# "raise_on_http_errors": False,
# "stop_iter": True,
# },
# 'tokensPerHour': {
# 'error_pattern': "___",
# "backoff": 1800,
# "should_retry": True,
# "raise_on_http_errors": False,
# "stop_iter": False,
# },
# 'serverErrorsPerProjectPerHour': {
# 'error_pattern': "___",
# "backoff": 3600,
# "should_retry": True,
# "raise_on_http_errors": False,
# "stop_iter": False,
# },
}
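    # Illustrative example (shape only, not verbatim from the API): a 429
    # response matching the `tokensPerProjectPerHour` scenario above would
    # carry a body like:
    #   {"error": {"code": 429, "status": "RESOURCE_EXHAUSTED",
    #              "message": "Exhausted property tokens for a project per hour. ..."}}
    # `_get_quota_name_from_error_message` (below) substring-matches each
    # scenario's `error_pattern` against that `message` to pick the scenario.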

def _get_known_quota_list(self) -> Iterable[str]:
return self.quota_mapping.keys()

    def _get_initial_quota_value(self, quota_name: str) -> int:
        init_remaining = self.initial_quota.get(quota_name).get("remaining")
        # just before a 429 is hit, `remaining` can drop to 0 or below,
        # so clamp to 1 to avoid a zero division later on
        return 1 if init_remaining <= 0 else init_remaining

def _get_quota_name_from_error_message(self, error_msg: str) -> Optional[str]:
for quota, value in self.quota_mapping.items():
if value.get("error_pattern") in error_msg:
return quota
return None

def _get_known_quota_from_response(self, property_quota: Mapping[str, Any]) -> Mapping[str, Any]:
current_quota = {}
for quota in property_quota.keys():
if quota in self._get_known_quota_list():
current_quota.update(**{quota: property_quota.get(quota)})
return current_quota

def _set_retry_attrs_for_quota(self, quota_name: str) -> None:
quota = self.quota_mapping.get(quota_name, {})
if quota:
self.should_retry = quota.get("should_retry")
self.raise_on_http_errors = quota.get("raise_on_http_errors")
self.stop_iter = quota.get("stop_iter")
self.backoff_time = quota.get("backoff")
self.error_message = quota.get("error_message")

def _set_default_retry_attrs(self) -> None:
self.should_retry = True
self.backoff_time = None
self.raise_on_http_errors = True
self.stop_iter = False

def _set_initial_quota(self, current_quota: Optional[Mapping[str, Any]] = None) -> None:
if not self.initial_quota:
self.initial_quota = current_quota

    def _check_remaining_quota(self, current_quota: Mapping[str, Any]) -> None:
        for quota_name, quota_value in current_quota.items():
            total_available = self._get_initial_quota_value(quota_name)
            remaining: int = quota_value.get("remaining")
            remaining_percent: float = remaining / total_available
            # stop early if this quota is about to run out
            if remaining_percent <= self.threshold:
                self.logger.warning(
                    f"The `{quota_name}` quota is running out of tokens. Available {remaining} out of {total_available}."
                )
                self._set_retry_attrs_for_quota(quota_name)
                return None
            elif self.error_message:
                self.logger.warning(f"{self.error_message}")
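    # Worked example for `_check_remaining_quota` above: if the initial
    # snapshot for `tokensPerProjectPerHour` showed remaining=1250, the
    # early stop fires once remaining drops to 125 or fewer tokens
    # (125 / 1250 = 0.1 <= threshold).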

def _check_for_errors(self, response: requests.Response) -> None:
try:
            # revert to default values after a successful retry
self._set_default_retry_attrs()
error = response.json().get("error")
if error:
quota_name = self._get_quota_name_from_error_message(error.get("message"))
if quota_name:
self._set_retry_attrs_for_quota(quota_name)
                    self.logger.warning(f"The `{quota_name}` quota is exceeded!")
return None
        except AttributeError as attr_e:
            self.logger.warning(
                f"`GoogleAnalyticsApiQuota._check_for_errors`: Received non JSON response from the API. Full error: {attr_e}. Bypassing."
            )
except Exception as e:
self.logger.fatal(f"Other `GoogleAnalyticsApiQuota` error: {e}")
raise


class GoogleAnalyticsApiQuota(GoogleAnalyticsApiQuotaBase):
def _check_quota(self, response: requests.Response):
# try get json from response
try:
parsed_response = response.json()
        except (AttributeError, ValueError) as e:
            # AttributeError: `response` was None; ValueError: body is not valid JSON
            self.logger.warning(
                f"`GoogleAnalyticsApiQuota._check_quota`: Received non JSON response from the API. Full error: {e}. Bypassing."
            )
parsed_response = {}
# get current quota
property_quota: dict = parsed_response.get("propertyQuota")
if property_quota:
# return default attrs values once successfully retried
# or until another 429 error is hit
self._set_default_retry_attrs()
# reduce quota list to known kinds only
current_quota = self._get_known_quota_from_response(property_quota)
if current_quota:
# save the initial quota
self._set_initial_quota(current_quota)
# check for remaining quota
self._check_remaining_quota(current_quota)
else:
self._check_for_errors(response)

    def handle_quota(self):
        """
        Returns a decorator that integrates with the `should_retry` method,
        or any other method that receives the `response` object early.
        """

def decorator(func):
@wraps(func)
def wrapper_handle_quota(*args, **kwargs):
                # find the requests.Response among the positional args
                response = next((arg for arg in args if isinstance(arg, requests.Response)), None)
# check for the quota
self._check_quota(response)
# return actual function
return func(*args, **kwargs)

return wrapper_handle_quota

return decorator
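
A hedged usage sketch (names below are illustrative, not part of this PR) of how `handle_quota` wraps a method that receives the `requests.Response`, such as an HTTP stream's `should_retry`. The wrapper locates the Response among the positional args, runs `_check_quota` to refresh the retry attributes, then calls through to the wrapped method. The `propertyQuota` block that `_check_quota` inspects is returned by the Data API when `returnPropertyQuota` is set on the report request.

import requests

from .api_quota import GoogleAnalyticsApiQuota  # hypothetical import path

quota = GoogleAnalyticsApiQuota()

class ReportsStream:
    @quota.handle_quota()
    def should_retry(self, response: requests.Response) -> bool:
        # On 429, defer to whatever scenario `_check_quota` just selected
        # from `quota_mapping`; otherwise retry only on server errors.
        if response.status_code == 429:
            return bool(quota.should_retry)
        return 500 <= response.status_code < 600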