Skip to content

Commit

Permalink
Feature/processor output individual records for gratia, add CPU usage…
Browse files Browse the repository at this point in the history
… and memory metrics (#54)

* first pass at adding optional gratia output container to helm chart

* fix variable name

* fix syntax for configmap in values.yaml

* Add dedicated docker image for processor

* add optional mode to output individual job records rather than summaries

* fix paramter order in individual_message

* add optional authentication secret to helm chart

* clean up some scratch work

* Add a README for helm chart configuration and installation

* minor documentation consistency/tweaks

* fix phrasing and missing link in README

* Update helm chart make non-privileged user uid/gid configurable

* make serviceUser.uid nullable

* Update processor to output gratia-compatible individual job records,
add Dockerfile

* Add explicit commands to cronjobs, fix default image tag

* clean up comments

* revert accidental change to readme

* Deduplicate memory and cpu usage records, revert change to cpu usage accounting for summary records

* use sum by pod instead of max by pod for more accurate v2 cgroups reporting

* use last_over_time, update comments

* report on pod total requested memory rather than max usage per container

* change result_lengths

* update comment

* small tidying

---------

Co-authored-by: Ryan Taylor <[email protected]>
  • Loading branch information
mwestphall and rptaylor authored Jun 21, 2024
1 parent c46a37c commit 8938f18
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 35 deletions.
11 changes: 11 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
FROM python:3.12
ARG UID=10000
ARG GID=10000

WORKDIR /src
COPY python/requirements.txt .
RUN pip install -r requirements.txt
COPY python/*.py ./

USER $UID:$GID
CMD python3 KAPEL.py
6 changes: 6 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
services:
kapel-processor:
image: hub.opensciencegrid.org/osgpreview/kapel-processor:latest
build:
context: .
network: host
136 changes: 101 additions & 35 deletions python/KAPEL.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ def __init__(self, queryRange, namespace):
self.endtime = f'max_over_time(kube_pod_completion_time{{namespace="{namespace}"}}[{queryRange}])'
self.starttime = f'max_over_time(kube_pod_start_time{{namespace="{namespace}"}}[{queryRange}])'
self.cores = f'max_over_time(kube_pod_container_resource_requests{{resource="cpu", node != "", namespace="{namespace}"}}[{queryRange}])'
self.memory = f'sum by (pod) (max_over_time(kube_pod_container_resource_requests{{resource="memory", node!="", namespace="{namespace}"}}[{queryRange}])) / 1000'

# This is container-level CPU usage reported by kubelets, for gratia output.
# Take the largest (i.e. final) value of the cumulative CPU usage of each container, and sum the results for all containers in a pod.
self.cpuusage = f'sum by (pod) (last_over_time(container_cpu_usage_seconds_total{{namespace="{namespace}"}}[{queryRange}]))'

def summary_message(config, year, month, wall_time, cpu_time, n_jobs, first_end, last_end):
output = (
Expand All @@ -81,6 +86,29 @@ def summary_message(config, year, month, wall_time, cpu_time, n_jobs, first_end,
)
return output

def individual_message(config, pod_name, memory, cores, wall_time, cpu_time, start_time, end_time):
""" Write an APEL individual job message based on prometheus metrics from a single pod """
output = (
f'APEL-individual-job-message: v0.3\n'
f'Site: {config.site_name}\n'
f'VO: {config.vo_name}\n'
f'SubmitHost: {config.submit_host}\n'
f'MachineName: {pod_name}\n'
f'LocalJobId: {pod_name}\n'
f'InfrastructureType: {config.infrastructure_type}\n'
f'ServiceLevelType: si2k\n'
f'ServiceLevel: {config.benchmark_value * 250}\n'
f'WallDuration: {wall_time}\n'
f'CpuDuration: {cpu_time}\n'
f'MemoryVirtual: {memory}\n'
f'Processors: {cores}\n'
f'NodeCount: {config.nodecount}\n'
f'StartTime: {start_time}\n'
f'EndTime: {end_time}\n'
f'%%\n'
)
return output

def sync_message(config, year, month, n_jobs):
output = (
f'APEL-sync-message: v0.1\n'
Expand Down Expand Up @@ -159,43 +187,14 @@ def rearrange(x):
# this produces each of the (key, value) tuples in the list
yield item['metric']['pod'], float(item['value'][1])

# process a time period (do prom query, process data, write output)
# takes a KAPELConfig object and one element of output from get_time_periods
# Remember Prometheus queries go backwards: the time instant is the end, go backwards from there.
def process_period(config, period):
period_start = period['instant'] + dateutil.relativedelta.relativedelta(seconds=-period['range_sec'])
print(
f"Processing year {period['year']}, month {period['month']}, "
f"querying from {period['instant'].isoformat()} and going back {period['range_sec']} s to {period_start.isoformat()}."
)
queries = QueryLogic(queryRange=(str(period['range_sec']) + 's'), namespace=config.namespace)

# SSL generally not used for Prometheus access within a cluster
# Docs on instant query API: https://prometheus.io/docs/prometheus/latest/querying/api/#instant-queries
prom = PrometheusConnect(url=config.prometheus_server, disable_ssl=True)
prom_connect_params = {'time': period['instant'].isoformat(), 'timeout': config.query_timeout}

raw_results, results, result_lengths = {}, {}, []
# iterate over each query (cputime, starttime, endtime, cores) producing raw_results['cputime'] etc.
for query_name, query_string in vars(queries).items():
# Each of these raw_results is a list of dicts. Each dict in the list represents an individual data point, and contains:
# 'metric': a dict of one or more key-value pairs of labels, one of which is the pod name.
# 'value': a list in which the 0th element is the timestamp of the value, and 1th element is the actual value we're interested in.
print(f'Executing {query_name} query: {query_string}')
t1 = timer()
raw_results[query_name] = prom.custom_query(query=query_string, params=prom_connect_params)
t2 = timer()
results[query_name] = dict(rearrange(raw_results[query_name]))
result_lengths.append(len(results[query_name]))
t3 = timer()
print(f'Query finished in {t2 - t1} s, processed in {t3 - t2} s. Got {len(results[query_name])} items from {len(raw_results[query_name])} results. Peak RAM usage: {resource.getrusage(resource.RUSAGE_SELF).ru_maxrss}K.')
del raw_results[query_name]

def record_summarized_period(config, period_start, year, month, results):
""" Record the sum of usage across all pods in the given time period. """
cputime = results['cputime']
endtime = results['endtime']
starttime = results['starttime']
cores = results['cores']

result_lengths = list((len(l) for l in results.values()))
# Confirm the assumption that cputime should have the fewest entries, while starttime and cores may have additional ones
# corresponding to jobs that have started but not finished yet, and endtime may have additional ones if there are pods without CPU resource requests.
# We only want the jobs for which all values are available: start time, end time, CPU request.
Expand Down Expand Up @@ -241,16 +240,16 @@ def process_period(config, period):

summary_output = summary_message(
config,
year=period['year'],
month=period['month'],
year=year,
month=month,
wall_time=sum_walltime,
cpu_time=sum_cputime,
n_jobs=len(endtime),
# this appears faster than getting min/max during the dict iteration above
first_end=round(min(endtime.values())),
last_end=round(max(endtime.values()))
)
sync_output = sync_message(config, year=period['year'], month=period['month'], n_jobs=len(endtime))
sync_output = sync_message(config, year=year, month=month, n_jobs=len(endtime))

# Write output to the message queue on local filesystem
# https://dirq.readthedocs.io/en/latest/queuesimple.html#directory-structure
Expand All @@ -262,6 +261,73 @@ def process_period(config, period):
print(f'Writing sync record to {config.output_path}/{sync_file}:')
print('--------------------------------\n' + sync_output + '--------------------------------')

def record_individual_period(config, results):
""" Record each pod in the namespace over the summarized period.
Assumes each pod ran once and terminated upon completion.
"""
# Pivot records from {'data_type':{'pod_name':value}} to {'pod_name':{'data_type':value}}
per_pod_records = {}
for data_type, records in results.items():
for pod, val in records.items():
if not pod in per_pod_records:
per_pod_records[pod] = {}
per_pod_records[pod][data_type] = val

dirq = QueueSimple(str(config.output_path))
for pod_name, records in per_pod_records.items():
# Only report on pods that have completed. Running pods won't have an endtime
if not ('starttime' in records and 'endtime' in records):
continue
individual_output = individual_message(
config,
pod_name,
records.get('memory', 0),
records.get('cores', 0),
records['endtime'] - records['starttime'],
records.get('cpuusage', 0),
records['starttime'],
records['endtime'])
record_file = dirq.add(individual_output)
print(f'Writing individual record to {config.output_path}/{record_file}:')
print('--------------------------------\n' + individual_output + '--------------------------------')


# process a time period (do prom query, process data, write output)
# takes a KAPELConfig object and one element of output from get_time_periods
# Remember Prometheus queries go backwards: the time instant is the end, go backwards from there.
def process_period(config, period):
period_start = period['instant'] + dateutil.relativedelta.relativedelta(seconds=-period['range_sec'])
print(
f"Processing year {period['year']}, month {period['month']}, "
f"querying from {period['instant'].isoformat()} and going back {period['range_sec']} s to {period_start.isoformat()}."
)
queries = QueryLogic(queryRange=(str(period['range_sec']) + 's'), namespace=config.namespace)

# SSL generally not used for Prometheus access within a cluster
# Docs on instant query API: https://prometheus.io/docs/prometheus/latest/querying/api/#instant-queries
headers = {"Authorization": config.auth_header } if config.auth_header else None
prom = PrometheusConnect(url=config.prometheus_server, disable_ssl=True, headers=headers)
prom_connect_params = {'time': period['instant'].isoformat(), 'timeout': config.query_timeout}

results = {}
# iterate over each query (cputime, starttime, endtime, cores) producing raw_results['cputime'] etc.
for query_name, query_string in vars(queries).items():
# Each of these raw_results is a list of dicts. Each dict in the list represents an individual data point, and contains:
# 'metric': a dict of one or more key-value pairs of labels, one of which is the pod name.
# 'value': a list in which the 0th element is the timestamp of the value, and 1th element is the actual value we're interested in.
print(f'Executing {query_name} query: {query_string}')
t1 = timer()
raw_result = prom.custom_query(query=query_string, params=prom_connect_params)
t2 = timer()
results[query_name] = dict(rearrange(raw_result))
t3 = timer()
print(f'Query finished in {t2 - t1} s, processed in {t3 - t2} s. Got {len(results[query_name])} items from {len(raw_result)} results. Peak RAM usage: {resource.getrusage(resource.RUSAGE_SELF).ru_maxrss}K.')

if config.summarize_records:
record_summarized_period(config, period_start, period['year'], period['month'], results)
else:
record_individual_period(config, results)

def main(envFile):
print(f'Starting KAPEL processor: {__file__} with envFile {envFile} at {datetime.datetime.now(tz=datetime.timezone.utc).isoformat()}')
cfg = KAPELConfig(envFile)
Expand Down
5 changes: 5 additions & 0 deletions python/KAPELConfig.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,17 @@ def __init__(self, envFile=None):
# Format: validity is determined by python urllib.parse.
self.prometheus_server = env.url("PROMETHEUS_SERVER", "http://kube-prometheus-prometheus.kube-prometheus:9090").geturl()

# Optionally add authentication headers - these are passed in under "Authorization"
self.auth_header = env.str("PROMETHEUS_AUTH_HEADER", None)

# The default behaviour ("auto" mode) is to publish records for the previous month, and up to the current day of the current month.
self.publishing_mode = env.str("PUBLISHING_MODE", "auto")

# The Kubernetes namespace to query. Only pods in this namespace will be accounted.
self.namespace = env.str("NAMESPACE")

self.summarize_records = env.bool("SUMMARIZE_RECORDS", True)

# If PUBLISHING_MODE is "gap" instead, then a fixed time period will be queried instead and we need the start and end to be specified.
# Format: ISO 8601, like "2020-12-20T07:20:50.52Z", to avoid complications with time zones and leap seconds.
# Timezone should be specified, and it should be UTC for consistency with the auto mode publishing.
Expand Down

0 comments on commit 8938f18

Please sign in to comment.