From eff08014893403f008bfa45f6aae246b3e2df4a5 Mon Sep 17 00:00:00 2001
From: Simon K <6615834+simon-20@users.noreply.github.com>
Date: Fri, 16 Aug 2024 16:42:56 +0100
Subject: [PATCH] feat: add prometheus metrics

---
 .github/workflows/develop.yml  |  12 +++-
 deployment/deployment.yml      |  38 ++++++++++-
 nginx-reverse-proxy/Dockerfile |   3 +
 nginx-reverse-proxy/nginx.conf | 120 +++++++++++++++++++++++++++++++++
 requirements.in                |   1 +
 requirements.txt               |   6 +-
 requirements_dev.txt           |  18 ++---
 src/constants/config.py        |  35 +++++++++-
 src/handler.py                 |  22 ++++++
 src/library/clean.py           |   5 ++
 src/library/flatten.py         |   3 +
 src/library/lakify.py          |   3 +
 src/library/prometheus.py      |  18 +++++
 src/library/refresher.py       |  11 +++
 src/library/solrize.py         |   3 +
 src/library/validate.py        |   5 ++
 16 files changed, 281 insertions(+), 22 deletions(-)
 create mode 100644 nginx-reverse-proxy/Dockerfile
 create mode 100644 nginx-reverse-proxy/nginx.conf
 create mode 100644 src/library/prometheus.py

diff --git a/.github/workflows/develop.yml b/.github/workflows/develop.yml
index b1c13f8..500bc40 100644
--- a/.github/workflows/develop.yml
+++ b/.github/workflows/develop.yml
@@ -68,13 +68,21 @@ jobs:
           username: ${{ env.REGISTRY_USERNAME }}
           password: ${{ env.REGISTRY_PASSWORD }}
 
-      - name: 'Build and push image'
+      - name: 'Build and push main image'
        run: |
           IMAGE_NAME=$REGISTRY_LOGIN_SERVER/$NAME-$ENVIRONMENT:$TAG
           echo "IMAGE_NAME=$IMAGE_NAME" >> $GITHUB_ENV
           docker build . -f Dockerfile -t $IMAGE_NAME
           docker push $IMAGE_NAME
 
+      - name: 'Build and push nginx reverse proxy image'
+        run: |
+          htpasswd -c -b ./nginx-reverse-proxy/htpasswd prom "${{ secrets.PROM_NGINX_REVERSE_PROXY_PASSWORD }}"
+          NGINX_IMAGE_NAME=$REGISTRY_LOGIN_SERVER/prom-nginx-reverse-proxy-$ENVIRONMENT:$TAG
+          echo "NGINX_IMAGE_NAME=$NGINX_IMAGE_NAME" >> $GITHUB_ENV
+          docker build nginx-reverse-proxy -f nginx-reverse-proxy/Dockerfile -t $NGINX_IMAGE_NAME
+          docker push $NGINX_IMAGE_NAME
+
       - name: 'Delete existing container group'
         uses: 'azure/CLI@v1'
         with:
@@ -91,6 +99,8 @@
 
           sed -i ''s^#IMAGE_NAME#^$IMAGE_NAME^g'' ./deployment/deployment.yml
 
+          sed -i ''s^#NGINX_IMAGE_NAME#^$NGINX_IMAGE_NAME^g'' ./deployment/deployment.yml
+
           sed -i ''s^#REGISTRY_LOGIN_SERVER#^$REGISTRY_LOGIN_SERVER^g'' ./deployment/deployment.yml
           sed -i ''s^#REGISTRY_USERNAME#^$REGISTRY_USERNAME^g'' ./deployment/deployment.yml
           sed -i ''s^#REGISTRY_PASSWORD#^$REGISTRY_PASSWORD^g'' ./deployment/deployment.yml
diff --git a/deployment/deployment.yml b/deployment/deployment.yml
index 8a70a84..3b7c56c 100644
--- a/deployment/deployment.yml
+++ b/deployment/deployment.yml
@@ -18,7 +18,7 @@ properties: # Properties of container group
     properties: # Properties of an instance
       resources: # Resource requirements of the instance
         requests:
-          memoryInGB: 4
+          memoryInGB: 3.7
           cpu: 0.5
       image: '#IMAGE_NAME#' # Container image used to create the instance
       command:
@@ -61,7 +61,7 @@
     properties:
      resources:
         requests:
-          memoryInGB: 4
+          memoryInGB: 3.7
           cpu: 0.1
       image: '#IMAGE_NAME#'
       command:
@@ -292,3 +292,37 @@ properties: # Properties of container group
           secureValue: '#COMMSHUB_KEY#'
         - name: LOG_LEVEL
           secureValue: '#LOG_LEVEL#'
+  - name: 'nginx-proxy-for-prometheus'
+    properties:
+      image: "#NGINX_IMAGE_NAME#"
+      ports:
+        - port: 9158
+          protocol: TCP
+        - port: 9159
+          protocol: TCP
+        - port: 9160
+          protocol: TCP
+        - port: 9161
+          protocol: TCP
+        - port: 9162
+          protocol: TCP
+        - port: 9163
+          protocol: TCP
+      resources:
+        requests:
+          cpu: 0.1
+          memoryInGB: 0.6
+  ipAddress:
+    type: "public"
dnsNameLabel: "#NAME#-#ENVIRONMENT#-1" + ports: + - port: 9158 + - port: 9159 + - port: 9160 + ipAddress: # split across two IPs because there is hard limit of 5-public ports per IP + type: "public" + dnsNameLabel: "#NAME#-#ENVIRONMENT#-2" + ports: + - port: 9161 + - port: 9162 + - port: 9163 diff --git a/nginx-reverse-proxy/Dockerfile b/nginx-reverse-proxy/Dockerfile new file mode 100644 index 0000000..b16441a --- /dev/null +++ b/nginx-reverse-proxy/Dockerfile @@ -0,0 +1,3 @@ +FROM nginx +COPY nginx.conf /etc/nginx/nginx.conf +COPY htpasswd /etc/nginx/htpasswd diff --git a/nginx-reverse-proxy/nginx.conf b/nginx-reverse-proxy/nginx.conf new file mode 100644 index 0000000..01201cb --- /dev/null +++ b/nginx-reverse-proxy/nginx.conf @@ -0,0 +1,120 @@ +user nginx; +worker_processes auto; + +error_log /var/log/nginx/error.log notice; +pid /var/run/nginx.pid; + + +events { + worker_connections 1024; +} + + +http { + default_type application/octet-stream; + + log_format main '$remote_addr - $remote_user [$time_local] "$request" ' + '$status $body_bytes_sent "$http_referer" ' + '"$http_user_agent" "$http_x_forwarded_for"'; + + keepalive_timeout 65; + + gzip on; + + server { + listen 9158; + listen [::]:9158; + + root /var/www/html; + + server_name _; + + location / { + proxy_pass http://localhost:9091; + } + + auth_basic "Unified Pipelne Metrics Exporter - Refresh"; + auth_basic_user_file htpasswd; + } + + server { + listen 9159; + listen [::]:9159; + + root /var/www/html; + + server_name _; + + location / { + proxy_pass http://localhost:9092; + } + + auth_basic "Unified Pipelne Metrics Exporter - Validate"; + auth_basic_user_file htpasswd; + } + + server { + listen 9160; + listen [::]:9160; + + root /var/www/html; + + server_name _; + + location / { + proxy_pass http://localhost:9093; + } + + auth_basic "Unified Pipelne Metrics Exporter - Clean"; + auth_basic_user_file htpasswd; + } + + server { + listen 9161; + listen [::]:9161; + + root /var/www/html; + + server_name _; + + location / { + proxy_pass http://localhost:9094; + } + + auth_basic "Unified Pipelne Metrics Exporter - Flatten"; + auth_basic_user_file htpasswd; + } + + server { + listen 9162; + listen [::]:9162; + + root /var/www/html; + + server_name _; + + location / { + proxy_pass http://localhost:9095; + } + + auth_basic "Unified Pipelne Metrics Exporter - Lakify"; + auth_basic_user_file htpasswd; + } + + server { + listen 9163; + listen [::]:9163; + + root /var/www/html; + + server_name _; + + location / { + proxy_pass http://localhost:9096; + } + + auth_basic "Unified Pipelne Metrics Exporter - Solrize"; + auth_basic_user_file htpasswd; + } +} + diff --git a/requirements.in b/requirements.in index 55998e5..c8dab49 100644 --- a/requirements.in +++ b/requirements.in @@ -9,3 +9,4 @@ requests pysolr chardet python-dateutil +prometheus-client diff --git a/requirements.txt b/requirements.txt index 4394169..855d465 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,8 +1,8 @@ # -# This file is autogenerated by pip-compile with Python 3.11 +# This file is autogenerated by pip-compile with Python 3.12 # by the following command: # -# pip-compile requirements.in +# pip-compile # azure-core==1.15.0 # via @@ -39,6 +39,8 @@ msrest==0.6.21 # azure-storage-queue oauthlib==3.2.2 # via requests-oauthlib +prometheus-client==0.20.0 + # via -r requirements.in psycopg2==2.9.6 # via -r requirements.in pycparser==2.21 diff --git a/requirements_dev.txt b/requirements_dev.txt index ee2f0ab..df4b711 100644 --- a/requirements_dev.txt +++ 
@@ -1,5 +1,5 @@
 #
-# This file is autogenerated by pip-compile with Python 3.10
+# This file is autogenerated by pip-compile with Python 3.12
 # by the following command:
 #
 #    pip-compile --output-file=requirements_dev.txt requirements_dev.in
@@ -35,8 +35,6 @@ cryptography==42.0.8
     # via
     #   azure-storage-blob
     #   azure-storage-queue
-exceptiongroup==1.2.1
-    # via pytest
 flake8==7.0.0
     # via
     #   -r requirements_dev.in
@@ -82,6 +80,8 @@ platformdirs==4.2.2
     # via black
 pluggy==1.5.0
     # via pytest
+prometheus-client==0.20.0
+    # via -r requirements.in
 psycopg2==2.9.9
     # via -r requirements.in
 pycodestyle==2.11.1
@@ -114,14 +114,6 @@ six==1.16.0
     #   azure-core
     #   isodate
     #   python-dateutil
-tomli==2.0.1
-    # via
-    #   black
-    #   build
-    #   flake8-pyproject
-    #   mypy
-    #   pip-tools
-    #   pytest
 types-psycopg2==2.9.21.20240417
     # via -r requirements_dev.in
 types-python-dateutil==2.9.0.20240316
@@ -129,9 +121,7 @@ types-python-dateutil==2.9.0.20240316
 types-requests==2.32.0.20240712
     # via -r requirements_dev.in
 typing-extensions==4.12.2
-    # via
-    #   black
-    #   mypy
+    # via mypy
 urllib3==2.2.2
     # via
     #   requests
diff --git a/src/constants/config.py b/src/constants/config.py
index b7d483d..a00f3bf 100644
--- a/src/constants/config.py
+++ b/src/constants/config.py
@@ -45,6 +45,13 @@
         DOCUMENT_SAFETY_PERCENTAGE=50,
         # Maximum number of blobs to delete in a single request when cleaning up blob containers
         MAX_BLOB_DELETE=250,
+        PROM_PORT=9091,
+        PROM_METRIC_DEFS=[
+            ("registered_publishers", "The number of publishers on the CKAN Registry"),
+            ("registered_datasets", "The number of datasets on the CKAN Registry"),
+            ("datasets_changed", "The number of datasets that have changed"),
+            ("datasets_to_download", "The number of datasets that need re-downloading"),
+        ],
     ),
     VALIDATION=dict(
         # Number of parallel processes to run the validation loop with
@@ -64,18 +71,36 @@
         SAFETY_CHECK_THRESHOLD=100,
         # Hours
         SAFETY_CHECK_PERIOD=2,
+        PROM_PORT=9092,
+        PROM_METRIC_DEFS=[
+            ("new_flagged_publishers", "The number of publishers that have been newly flagged"),
+            ("datasets_to_validate", "The number of datasets that need validating"),
+        ],
     ),
     CLEAN=dict(
         # Number of parallel processes to run the clean loop with
-        PARALLEL_PROCESSES=1
+        PARALLEL_PROCESSES=1,
+        PROM_PORT=9093,
+        PROM_METRIC_DEFS=[
+            ("valid_datasets_to_progress", "The number of valid datasets to progress to the flatten stage"),
+            ("invalid_datasets_to_clean", "The number of invalid datasets that need cleaning"),
+        ],
     ),
     FLATTEN=dict(
         # Number of parallel processes to run the flatten loop with
-        PARALLEL_PROCESSES=1
+        PARALLEL_PROCESSES=1,
+        PROM_PORT=9094,
+        PROM_METRIC_DEFS=[
+            ("datasets_to_flatten", "The number of datasets that need flattening"),
+        ],
     ),
     LAKIFY=dict(
         # Number of parallel processes to run the lakify loop with
-        PARALLEL_PROCESSES=10
+        PARALLEL_PROCESSES=10,
+        PROM_PORT=9095,
+        PROM_METRIC_DEFS=[
+            ("datasets_to_lakify", "The number of datasets that need lakifying"),
+        ],
     ),
     SOLRIZE=dict(
         # Number of parallel processes to run the solrize loop with
@@ -92,5 +117,9 @@
         PYSOLR_TIMEOUT=600,
         # Time in seconds to sleep after receiving a 5XX error from Solr
         SOLR_500_SLEEP=os.getenv("SOLR_500_SLEEP"),
+        PROM_PORT=9096,
+        PROM_METRIC_DEFS=[
+            ("datasets_to_solrize", "The number of datasets that need solrizing"),
+        ],
     ),
 )
diff --git a/src/handler.py b/src/handler.py
index dc13261..498a788 100644
--- a/src/handler.py
+++ b/src/handler.py
@@ -7,13 +7,17 @@
 import library.refresher as refresher
 import library.solrize as solrize
 import library.validate as validate
+from constants.config import config
 from library.logger import getLogger
+from library.prometheus import initialise_prom_metrics_and_start_server
 
 logger = getLogger("handler")
 
 
 def main(args):
     try:
+        initialise_prom_metrics(args.type)
+
         if args.type == "refresh":
             db.migrateIfRequired()
             refresher.refresh()
@@ -58,6 +62,24 @@ def main(args):
         logger.error("{} Failed. {}".format(args.type, str(e).strip()))
 
 
+def initialise_prom_metrics(operation: str):
+    if not operation.endswith("loop"):
+        return
+
+    logger.info("Starting prometheus metrics exporter...")
+
+    if operation == "validateloop":
+        container_conf_name = "VALIDATION"
+    elif operation == "refreshloop":
+        container_conf_name = "REFRESHER"
+    else:
+        container_conf_name = operation[:-4].upper()
+
+    initialise_prom_metrics_and_start_server(
+        config[container_conf_name]["PROM_METRIC_DEFS"], config[container_conf_name]["PROM_PORT"]
+    )
+
+
 if __name__ == "__main__":
     parser = argparse.ArgumentParser(description="Refresh from IATI Registry")
     parser.add_argument("-t", "--type", dest="type", default="refresh", help="Trigger 'refresh' or 'validate'")
diff --git a/src/library/clean.py b/src/library/clean.py
index 0a037c2..690a361 100644
--- a/src/library/clean.py
+++ b/src/library/clean.py
@@ -11,6 +11,7 @@
 import library.utils as utils
 from constants.config import config
 from library.logger import getLogger
+from library.prometheus import set_prom_metric
 
 logger = getLogger("clean")
 
@@ -80,6 +81,8 @@ def copy_valid():
 
     documents = db.getValidActivitiesDocsToCopy(conn)
 
+    set_prom_metric("valid_datasets_to_progress", len(documents))
+
     if config["CLEAN"]["PARALLEL_PROCESSES"] == 1:
         logger.info(f"Copying {len(documents)} valid IATI files in a single process.")
         copy_valid_documents(documents)
@@ -216,6 +219,8 @@ def clean_invalid():
     # get invalid docs that have valid activities
     documents = db.getInvalidActivitiesDocsToClean(conn)
 
+    set_prom_metric("invalid_datasets_to_clean", len(documents))
+
     if config["CLEAN"]["PARALLEL_PROCESSES"] == 1:
         logger.info(f"Cleaning and storing {len(documents)} IATI files in a single process.")
         clean_invalid_documents(documents)
diff --git a/src/library/flatten.py b/src/library/flatten.py
index 748ef2d..7d882e5 100644
--- a/src/library/flatten.py
+++ b/src/library/flatten.py
@@ -14,6 +14,7 @@
 import library.utils as utils
 from constants.config import config
 from library.logger import getLogger
+from library.prometheus import set_prom_metric
 
 logger = getLogger("flatten")
 config_explode_elements = json.loads(config["SOLRIZE"]["EXPLODE_ELEMENTS"])
@@ -249,6 +250,8 @@ def main():
 
     file_hashes = db.getUnflattenedDatasets(conn)
 
+    set_prom_metric("datasets_to_flatten", len(file_hashes))
+
     if config["FLATTEN"]["PARALLEL_PROCESSES"] == 1:
         logger.info("Flattening and storing " + str(len(file_hashes)) + " IATI files in a single process.")
         process_hash_list(file_hashes)
diff --git a/src/library/lakify.py b/src/library/lakify.py
index d79a708..27cd006 100644
--- a/src/library/lakify.py
+++ b/src/library/lakify.py
@@ -11,6 +11,7 @@
 import library.utils as utils
 from constants.config import config
 from library.logger import getLogger
+from library.prometheus import set_prom_metric
 
 logger = getLogger("lakify")
 
@@ -182,6 +183,8 @@ def main():
 
     file_hashes = db.getUnlakifiedDatasets(conn)
 
+    set_prom_metric("datasets_to_lakify", len(file_hashes))
+
     logger.info("Got unlakified datasets")
 
     if config["LAKIFY"]["PARALLEL_PROCESSES"] == 1:
diff --git a/src/library/prometheus.py b/src/library/prometheus.py
new file mode 100644
index 0000000..916ea30
--- /dev/null
+++ b/src/library/prometheus.py
@@ -0,0 +1,18 @@
+from typing import Any
+
+from prometheus_client import Gauge, start_http_server
+
+prom_metrics: dict[str, Any] = {}
+
+
+def set_prom_metric(metric_name: str, value: Any):
+    if metric_name in prom_metrics:
+        prom_metrics[metric_name].set(value)
+
+
+def initialise_prom_metrics_and_start_server(metric_defs: list[tuple], port: int):
+
+    for metric_def in metric_defs:
+        prom_metrics[metric_def[0]] = Gauge(metric_def[0], metric_def[1])
+
+    start_http_server(port)
diff --git a/src/library/refresher.py b/src/library/refresher.py
index dcefbd8..84a6561 100644
--- a/src/library/refresher.py
+++ b/src/library/refresher.py
@@ -16,6 +16,7 @@
 from constants.config import config
 from constants.version import __version__
 from library.logger import getLogger
+from library.prometheus import set_prom_metric
 from library.solrize import addCore
 
 multiprocessing.set_start_method("spawn", True)
@@ -308,6 +309,9 @@ def sync_publishers():
     publisher_list = get_paginated_response("https://iatiregistry.org/api/3/action/organization_list", 0, 1000)
 
     known_publishers_num = db.getNumPublishers(conn)
+
+    set_prom_metric("registered_publishers", len(publisher_list))
+
     if len(publisher_list) < (config["REFRESHER"]["PUBLISHER_SAFETY_PERCENTAGE"] / 100) * known_publishers_num:
         logger.error(
             "Number of publishers reported by registry: "
@@ -380,6 +384,8 @@ def sync_documents():
             conn.close()
             raise
 
+    set_prom_metric("registered_datasets", len(all_datasets))
+
     known_documents_num = db.getNumDocuments(conn)
     if len(all_datasets) < (config["REFRESHER"]["DOCUMENT_SAFETY_PERCENTAGE"] / 100) * known_documents_num:
         logger.error(
@@ -426,6 +432,8 @@ def sync_documents():
                     + " : Unidentified Error"
                 )
 
+    set_prom_metric("datasets_changed", len(changed_datasets))
+
     stale_datasets = db.getFilesNotSeenAfter(conn, start_dt)
 
     if len(changed_datasets) > 0 or len(stale_datasets) > 0:
@@ -462,6 +470,9 @@ def reload(retry_errors):
     conn = db.getDirectConnection()
 
     datasets = db.getRefreshDataset(conn, retry_errors)
+
+    set_prom_metric("datasets_to_download", len(datasets))
+
     chunked_datasets = list(split(datasets, config["REFRESHER"]["PARALLEL_PROCESSES"]))
 
     processes = []
diff --git a/src/library/solrize.py b/src/library/solrize.py
index eca7dd4..7bcc084 100644
--- a/src/library/solrize.py
+++ b/src/library/solrize.py
@@ -11,6 +11,7 @@
 import library.utils as utils
 from constants.config import config
 from library.logger import getLogger
+from library.prometheus import set_prom_metric
 
 logger = getLogger("solrize")
 solr_cores = {}
@@ -428,6 +429,8 @@ def main():
 
     file_hashes = db.getUnsolrizedDatasets(conn)
 
+    set_prom_metric("datasets_to_solrize", len(file_hashes))
+
     logger.info("Got unsolrized datasets")
 
     if config["SOLRIZE"]["PARALLEL_PROCESSES"] == 1:
diff --git a/src/library/validate.py b/src/library/validate.py
index 71f046d..6f81b03 100644
--- a/src/library/validate.py
+++ b/src/library/validate.py
@@ -13,6 +13,7 @@
 import library.utils as utils
 from constants.config import config
 from library.logger import getLogger
+from library.prometheus import set_prom_metric
 
 logger = getLogger("validate")
 
@@ -218,6 +219,8 @@ def validate():
 
     file_hashes = db.getUnvalidatedDatasets(conn)
 
+    set_prom_metric("datasets_to_validate", len(file_hashes))
+
     if config["VALIDATION"]["PARALLEL_PROCESSES"] == 1:
         logger.info(f"Processing {len(file_hashes)} IATI files in a single process for validation")
         process_hash_list(file_hashes)
@@ -274,6 +277,8 @@ def safety_check():
     black_flags = db.getUnnotifiedBlackFlags(conn)
 
+    set_prom_metric("new_flagged_publishers", len(black_flags))
+
     for black_flag in black_flags:
         org_id = black_flag[0]
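
Usage note (outside the diff): a minimal sketch of how a pipeline stage is expected to use the helpers added in src/library/prometheus.py, assuming the REFRESHER settings from src/constants/config.py above and that the snippet runs from the repository's src/ directory so the imports resolve; the literal value passed to set_prom_metric is a placeholder, not real pipeline output.

    # Illustrative sketch -- mirrors what handler.py does for a "*loop" operation such as "refreshloop".
    from constants.config import config
    from library.prometheus import initialise_prom_metrics_and_start_server, set_prom_metric

    conf = config["REFRESHER"]

    # Register the gauges declared in config and serve them on http://localhost:9091/metrics;
    # the nginx reverse proxy publishes that endpoint externally on port 9158 behind basic auth.
    initialise_prom_metrics_and_start_server(conf["PROM_METRIC_DEFS"], conf["PROM_PORT"])

    # Inside the stage's loop, update a gauge by name; names not registered above are silently ignored.
    set_prom_metric("datasets_to_download", 42)  # 42 is a placeholder value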