Skip to content

Commit

Permalink
API to v14 and respective minor schema updates
Browse files Browse the repository at this point in the history
SDK to 0.29.0
Python to >=3.7.1 (required by SDK update)
README clarifications
Linting (mostly removing unused imports)
Implement get_url_params instead of monolithic path
  • Loading branch information
sebastianswms committed Jul 25, 2023
1 parent a3fcf93 commit 49b95ad
Show file tree
Hide file tree
Showing 8 changed files with 146 additions and 123 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ THIS IS NOT READY FOR PRODUCTION. Bearer tokens sometimes slip out to logs. Use
| start_date | True | 2022-03-24T00:00:00Z (Today-7d) | Date to start the search from; applies to streams that have a date filter. Note that Google returns data in one-day buckets |
| end_date | True | 2022-03-31T00:00:00Z (Today) | Date to end the search on; applies to streams that have a date filter. Note that the query is BETWEEN start_date AND end_date |

Note that although customer IDs are often displayed in the Google Ads UI in the format 123-456-7890, they should be provided to the tap in the format 1234567890, with no dashes.

### Get refresh token
1. GET https://accounts.google.com/o/oauth2/v2/auth?response_type=code&client_id=client_id&redirect_uri=http://127.0.0.1&scope=https://www.googleapis.com/auth/adwords&state=autoidm&access_type=offline&prompt=select_account&include_granted_scopes=true
1. POST https://www.googleapis.com/oauth2/v4/token?code={code}&client_id={client_id}&client_secret={client_secret}&redirect_uri=http://127.0.0.1&grant_type=authorization_code
Expand Down
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ keywords = [
license = "Apache 2.0"

[tool.poetry.dependencies]
python = "<3.11,>=3.6.2"
python = "<3.11,>=3.7.1"
requests = "^2.25.1"
singer-sdk = "0.3.17"
singer-sdk = "0.29.0"

[tool.poetry.dev-dependencies]
pytest = "^6.2.5"
Expand Down
58 changes: 15 additions & 43 deletions tap_googleads/client.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
"""REST client handling, including GoogleAdsStream base class."""

import requests
from urllib.parse import urlencode, urljoin
from pathlib import Path
from typing import Any, Dict, Optional, Union, List, Iterable
from typing import Any, Dict, Optional, Iterable

from memoization import cached

from singer_sdk.helpers.jsonpath import extract_jsonpath
from singer_sdk.streams import RESTStream
from singer_sdk.exceptions import FatalAPIError, RetriableAPIError
from singer_sdk.pagination import JSONPathPaginator

from tap_googleads.auth import GoogleAdsAuthenticator
from tap_googleads.utils import replicate_pk_at_root
Expand All @@ -20,10 +20,10 @@
class GoogleAdsStream(RESTStream):
"""GoogleAds stream class."""

url_base = "https://googleads.googleapis.com/v12"
url_base = "https://googleads.googleapis.com/v14"

records_jsonpath = "$[*]" # Or override `parse_response`.
next_page_token_jsonpath = "$.nextPageToken" # Or override `get_next_page_token`.
next_page_token_jsonpath = "$.nextPageToken"
primary_keys_jsonpaths = None
_LOG_REQUEST_METRIC_URLS: bool = True

Expand All @@ -32,11 +32,13 @@ class GoogleAdsStream(RESTStream):
def authenticator(self) -> GoogleAdsAuthenticator:
"""Return a new authenticator object."""
base_auth_url = "https://www.googleapis.com/oauth2/v4/token"
# Silly way to do parameters but it works
auth_url = base_auth_url + f"?refresh_token={self.config['refresh_token']}"
auth_url = auth_url + f"&client_id={self.config['client_id']}"
auth_url = auth_url + f"&client_secret={self.config['client_secret']}"
auth_url = auth_url + f"&grant_type=refresh_token"
auth_params = {
"refresh_token": self.config["refresh_token"],
"client_id": self.config["client_id"],
"client_secret": self.config["client_secret"],
"grant_type": "refresh_token",
}
auth_url = urljoin(base_auth_url, "?" + urlencode(auth_params))
return GoogleAdsAuthenticator(stream=self, auth_endpoint=auth_url)

@property
Expand All @@ -49,23 +51,8 @@ def http_headers(self) -> dict:
headers["login-customer-id"] = self.config["login_customer_id"]
return headers

def get_next_page_token(
self, response: requests.Response, previous_token: Optional[Any]
) -> Optional[Any]:
"""Return a token for identifying next page or None if no more pages."""
# TODO: If pagination is required, return a token which can be used to get the
# next page. If this is the final page, return "None" to end the
# pagination loop.
if self.next_page_token_jsonpath:
all_matches = extract_jsonpath(
self.next_page_token_jsonpath, response.json()
)
first_match = next(iter(all_matches), None)
next_page_token = first_match
else:
next_page_token = None

return next_page_token
def get_new_paginator(self) -> JSONPathPaginator:
return JSONPathPaginator(self.next_page_token_jsonpath)

def get_url_params(
self, context: Optional[dict], next_page_token: Optional[Any]
Expand All @@ -79,7 +66,7 @@ def get_url_params(
params["order_by"] = self.replication_key
return params

def validate_response(self, response):
def validate_22response(self, response):
# Still catch error status codes
if response.status_code == 403:
msg = (
Expand Down Expand Up @@ -145,21 +132,6 @@ def get_records(self, context: Optional[dict]) -> Iterable[Dict[str, Any]]:
f"disabled after the API that lists customers is called. {e=}"
)

def prepare_request_payload(
self, context: Optional[dict], next_page_token: Optional[Any]
) -> Optional[dict]:
"""Prepare the data payload for the REST API request.
By default, no payload will be sent (return None).
"""
# TODO: Delete this method if no payload is required. (Most REST APIs.)
return None

def parse_response(self, response: requests.Response) -> Iterable[dict]:
"""Parse the response and return an iterator of result rows."""
# TODO: Parse response body and return a set of records.
yield from extract_jsonpath(self.records_jsonpath, input=response.json())

def post_process(self, row: dict, context: Optional[dict] = None) -> Optional[dict]:
"""As needed, append or transform raw data to match expected structure."""
return replicate_pk_at_root(row, self.primary_keys_jsonpaths)
Expand Down
67 changes: 67 additions & 0 deletions tap_googleads/schemas/customer_hierarchy.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
{
"type": "object",
"properties": {
"customerClient": {
"type": [
"object",
"null"
],
"properties": {
"resourceName": {
"type": [
"string",
"null"
]
},
"clientCustomer": {
"type": [
"string",
"null"
]
},
"level": {
"type": [
"string",
"null"
]
},
"timeZone": {
"type": [
"string",
"null"
]
},
"manager": {
"type": [
"boolean",
"null"
]
},
"descriptiveName": {
"type": [
"string",
"null"
]
},
"currencyCode": {
"type": [
"string",
"null"
]
},
"id": {
"type": [
"string",
"null"
]
}
}
},
"_sdc_primary_key": {
"type": [
"string",
"null"
]
}
}
}
103 changes: 43 additions & 60 deletions tap_googleads/streams.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
"""Stream type classes for tap-googleads."""

from pathlib import Path
from typing import Any, Dict, Optional, Union, List, Iterable
from typing import Any, Dict, Optional, Iterable
from datetime import datetime

from singer_sdk import typing as th # JSON Schema typing helpers

from tap_googleads.client import GoogleAdsStream
from tap_googleads.auth import GoogleAdsAuthenticator

SCHEMAS_DIR = Path(__file__).parent / Path("./schemas")

Expand Down Expand Up @@ -42,20 +41,27 @@ class CustomerHierarchyStream(GoogleAdsStream):

# TODO: add a separate stream to get the Customer information and return it
rest_method = "POST"
path = "/customers/{client_id}/googleAds:search"
records_jsonpath = "$.results[*]"
name = "customer_hierarchy"
primary_keys_jsonpaths = ["customerClient.id"]
primary_keys = ["_sdc_primary_key"]
replication_key = None
parent_stream_type = AccessibleCustomers
schema_filepath = SCHEMAS_DIR / "customer_hierarchy.json"

@property
def path(self):
# Params
path = "/customers/{client_id}"
path = path + "/googleAds:search"
path = path + "?pageSize=10000"
path = path + f"&query={self.gaql}"
return path
def get_url_params(
self, context: Optional[dict], next_page_token: Optional[Any]
) -> Dict[str, Any]:
params = super().get_url_params(context, next_page_token)
params["pageSize"] = "10000"
params["query"] = self.gaql
return params

@property
def gaql(self):
return """
SELECT customer_client.client_customer
SELECT customer_client.client_customer
, customer_client.level
, customer_client.manager
, customer_client.descriptive_name
Expand All @@ -66,29 +72,6 @@ def gaql(self):
WHERE customer_client.level <= 1
"""

records_jsonpath = "$.results[*]"
name = "customer_hierarchy"
primary_keys_jsonpaths = ["customerClient.id"]
primary_keys = ["_sdc_primary_key"]
replication_key = None
parent_stream_type = AccessibleCustomers
schema = th.PropertiesList(
th.Property(
"customerClient",
th.ObjectType(
th.Property("resourceName", th.StringType),
th.Property("clientCustomer", th.StringType),
th.Property("level", th.StringType),
th.Property("timeZone", th.StringType),
th.Property("manager", th.BooleanType),
th.Property("descriptiveName", th.StringType),
th.Property("currencyCode", th.StringType),
th.Property("id", th.StringType),
),
),
th.Property("_sdc_primary_key", th.StringType),
).to_dict()

# Goal of this stream is to send to children stream a dict of
# login-customer-id:customer-id to query for all queries downstream
def get_records(self, context: Optional[dict]) -> Iterable[Dict[str, Any]]:
Expand All @@ -115,7 +98,7 @@ def get_records(self, context: Optional[dict]) -> Iterable[Dict[str, Any]]:
for row in self.request_records(context):
row = self.post_process(row, context)
# Don't search Manager accounts as we can't query them for everything
if row["customerClient"]["manager"] == True:
if row["customerClient"]["manager"] is True:
continue
yield row

Expand All @@ -129,14 +112,22 @@ class GeotargetsStream(GoogleAdsStream):

rest_method = "POST"

@property
def path(self):
# Params
path = "/customers/{login_customer_id}"
path = path + "/googleAds:search"
path = path + "?pageSize=10000"
path = path + f"&query={self.gaql}"
return path
records_jsonpath = "$.results[*]"
name = "geo_target_constant"
primary_keys_jsonpaths = ["geoTargetConstant.resourceName"]
primary_keys = ["_sdc_primary_key"]
replication_key = None
schema_filepath = SCHEMAS_DIR / "geo_target_constant.json"
parent_stream_type = None # Override ReportsStream default as this is a constant
path = "/customers/{login_customer_id}/googleAds:search"

def get_url_params(
self, context: Optional[dict], next_page_token: Optional[Any]
) -> Dict[str, Any]:
params = super().get_url_params(context, next_page_token)
params["pageSize"] = "10000"
params["query"] = self.gaql
return params

gaql = """
SELECT geo_target_constant.canonical_name
Expand All @@ -147,18 +138,20 @@ def path(self):
, geo_target_constant.target_type
FROM geo_target_constant
"""
records_jsonpath = "$.results[*]"
name = "geo_target_constant"
primary_keys_jsonpaths = ["geoTargetConstant.resourceName"]
primary_keys = ["_sdc_primary_key"]
replication_key = None
schema_filepath = SCHEMAS_DIR / "geo_target_constant.json"
parent_stream_type = None # Override ReportsStream default as this is a constant


class ReportsStream(GoogleAdsStream):
rest_method = "POST"
parent_stream_type = CustomerHierarchyStream
path = "/customers/{client_id}/googleAds:search"

def get_url_params(
self, context: Optional[dict], next_page_token: Optional[Any]
) -> Dict[str, Any]:
params = super().get_url_params(context, next_page_token)
params["pageSize"] = "10000"
params["query"] = self.gaql
return params

@property
def gaql(self):
Expand All @@ -184,15 +177,6 @@ def end_date(self):
def between_filter(self):
return f"BETWEEN '{self.start_date}' AND '{self.end_date}'"

@property
def path(self):
# Params
path = "/customers/{client_id}"
path = path + "/googleAds:search"
path = path + "?pageSize=10000"
path = path + f"&query={self.gaql}"
return path


class CampaignsStream(ReportsStream):
"""Define custom stream."""
Expand Down Expand Up @@ -234,7 +218,6 @@ def gaql(self):
, ad_group.labels
, ad_group.id
, ad_group.final_url_suffix
, ad_group.explorer_auto_optimizer_setting.opt_in
, ad_group.excluded_parent_asset_field_types
, ad_group.effective_target_roas_source
, ad_group.effective_target_roas
Expand Down
Loading

0 comments on commit 49b95ad

Please sign in to comment.