From ee2c578dd03abdfd1579a7ec4c6b80e252a6911b Mon Sep 17 00:00:00 2001 From: Curtis McCully Date: Wed, 9 Aug 2023 10:57:14 -0400 Subject: [PATCH 01/10] Fix a bug that wouldn't allow only grouping by instrument --- CHANGES.md | 1 + banzai/calibrations.py | 4 ++++ banzai/stages.py | 12 +++++++++--- 3 files changed, 14 insertions(+), 3 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index be70cc06..d02c19bb 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,6 @@ 1.11.0 (2023-08-10) ------------------- +- Added the process_by_group keyword to stages to fix a bug that wouldn't allow grouping only by instrument - Updated the logging scheme 1.10.1 (2023-05-31) diff --git a/banzai/calibrations.py b/banzai/calibrations.py index 97105e0f..23c1b332 100644 --- a/banzai/calibrations.py +++ b/banzai/calibrations.py @@ -48,6 +48,10 @@ class CalibrationStacker(CalibrationMaker): def __init__(self, runtime_context): super(CalibrationStacker, self).__init__(runtime_context) + @property + def process_by_group(self): + return True + def make_master_calibration_frame(self, images): make_calibration_name = file_utils.make_calibration_filename_function(self.calibration_type, self.runtime_context) diff --git a/banzai/stages.py b/banzai/stages.py index fe8aff1d..5019cb54 100755 --- a/banzai/stages.py +++ b/banzai/stages.py @@ -21,6 +21,10 @@ def stage_name(self): def group_by_attributes(self): return [] + @property + def process_by_group(self): + return False + def get_grouping(self, image): grouping_criteria = [image.instrument.site, image.instrument.id] if self.group_by_attributes: @@ -30,11 +34,13 @@ def get_grouping(self, image): def run(self, images): if not images: return images - if not self.group_by_attributes: - image_sets = images - else: + if self.group_by_attributes or self.process_by_group: images.sort(key=self.get_grouping) image_sets = [list(image_set) for _, image_set in itertools.groupby(images, self.get_grouping)] + else: + # Treat each image individually + image_sets 
= images + processed_images = [] for image_set in image_sets: try: From 81fd030695ebfec596f13152ba9a95281700017e Mon Sep 17 00:00:00 2001 From: Matt Daily Date: Thu, 10 Aug 2023 11:07:53 -0700 Subject: [PATCH 02/10] Increase e2e cpu requests we're getting throttled --- banzai/tests/e2e-k8s.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/banzai/tests/e2e-k8s.yaml b/banzai/tests/e2e-k8s.yaml index 32426253..45c65cea 100644 --- a/banzai/tests/e2e-k8s.yaml +++ b/banzai/tests/e2e-k8s.yaml @@ -139,10 +139,10 @@ spec: timeoutSeconds: 10 resources: requests: - cpu: 1 + cpu: 2 memory: 8Gi limits: - cpu: 2 + cpu: 4 memory: 8Gi - name: banzai-celery-beat image: @BANZAI_IMAGE@ From 38129995fb576216f618cb2c100635dd791dcde0 Mon Sep 17 00:00:00 2001 From: Matt Daily Date: Mon, 10 Jul 2023 15:00:03 -0700 Subject: [PATCH 03/10] Update listener backend to in-cluster url rabbitmq.lco.gtn was not resolving, presumably due to coreDNS upgrade --- helm-chart/banzai/values-prod.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/helm-chart/banzai/values-prod.yaml b/helm-chart/banzai/values-prod.yaml index bd76ee34..13f43898 100644 --- a/helm-chart/banzai/values-prod.yaml +++ b/helm-chart/banzai/values-prod.yaml @@ -12,7 +12,7 @@ horizontalPodAutoscaler: image: repository: docker.lco.global/banzai - tag: "1.10.0" + tag: "1.11.0" pullPolicy: IfNotPresent # Values for the OCS Ingester library, used by BANZAI. @@ -35,7 +35,7 @@ banzai: calibrateProposalId: calibrate banzaiWorkerLogLevel: info rawDataApiRoot: http://archiveapi-internal.prod/ - fitsBroker: rabbitmq.lco.gtn + fitsBroker: rabbitmq-ha.prod.svc.cluster.local. 
fitsExchange: archived_fits queueName: banzai_pipeline celeryTaskQueueName: banzai_imaging From 02d5aff524a19f0f7313d02c18c4bcc6774457b8 Mon Sep 17 00:00:00 2001 From: Matt Daily Date: Thu, 10 Aug 2023 11:14:29 -0700 Subject: [PATCH 04/10] Small typo fix --- banzai/tests/e2e-k8s.yaml | 1 - 1 file changed, 1 deletion(-) diff --git a/banzai/tests/e2e-k8s.yaml b/banzai/tests/e2e-k8s.yaml index 45c65cea..b4c2af41 100644 --- a/banzai/tests/e2e-k8s.yaml +++ b/banzai/tests/e2e-k8s.yaml @@ -110,7 +110,6 @@ spec: value: "e2e_task_queue" - name: REFERENCE_CATALOG_URL value: "http://phot-catalog.lco.gtn/" - command: - celery - -A From 59da85388cb8de53f62d18f215033673f690f35e Mon Sep 17 00:00:00 2001 From: Matt Daily Date: Thu, 10 Aug 2023 11:28:47 -0700 Subject: [PATCH 05/10] Fix up to delete pod in build namespace, not dev --- Jenkinsfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Jenkinsfile b/Jenkinsfile index a35ee305..1793f9c8 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -72,7 +72,7 @@ pipeline { script { withKubeConfig([credentialsId: 'build-kube-config']) { // delete previous run if the previous failed somehow - sh('kubectl -n dev delete pod banzai-e2e-test || true') + sh('kubectl -n build delete pod banzai-e2e-test || true') // we will be testing the image that we just built sh('sed -i -e "s^@BANZAI_IMAGE@^${DOCKER_IMG}^g" banzai/tests/e2e-k8s.yaml') // deploy the test pod to the cluster From 5a0a8bd6a22c0b2369383b2ebbfe8c1f8f709eeb Mon Sep 17 00:00:00 2001 From: Matt Daily Date: Thu, 10 Aug 2023 11:58:28 -0700 Subject: [PATCH 06/10] Further adjust resources --- banzai/tests/e2e-k8s.yaml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/banzai/tests/e2e-k8s.yaml b/banzai/tests/e2e-k8s.yaml index b4c2af41..e54635e7 100644 --- a/banzai/tests/e2e-k8s.yaml +++ b/banzai/tests/e2e-k8s.yaml @@ -138,10 +138,10 @@ spec: timeoutSeconds: 10 resources: requests: - cpu: 2 - memory: 8Gi - limits: cpu: 4 + memory: 6Gi + limits: + cpu: 6 
memory: 8Gi - name: banzai-celery-beat image: @BANZAI_IMAGE@ From 0cb6f7bcd39cf6805e4f0e3a3f0e1040b32063f6 Mon Sep 17 00:00:00 2001 From: Matt Daily Date: Thu, 10 Aug 2023 14:16:21 -0700 Subject: [PATCH 07/10] Make worker concurrency 4 see how this affects cpu usage in e2e tests --- banzai/tests/e2e-k8s.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/banzai/tests/e2e-k8s.yaml b/banzai/tests/e2e-k8s.yaml index e54635e7..44f3ef02 100644 --- a/banzai/tests/e2e-k8s.yaml +++ b/banzai/tests/e2e-k8s.yaml @@ -118,7 +118,7 @@ spec: - --hostname - "banzai-celery-worker" - --concurrency - - "2" + - "4" - -l - "debug" - "-Q" From c1c5333f1ae0e3205328f6b1ce06f09191be9f61 Mon Sep 17 00:00:00 2001 From: Matt Daily Date: Thu, 10 Aug 2023 14:33:56 -0700 Subject: [PATCH 08/10] Up CPU usage for rabbitmq - it was very low and was getting throttled --- banzai/tests/e2e-k8s.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/banzai/tests/e2e-k8s.yaml b/banzai/tests/e2e-k8s.yaml index 44f3ef02..70050bd8 100644 --- a/banzai/tests/e2e-k8s.yaml +++ b/banzai/tests/e2e-k8s.yaml @@ -66,10 +66,10 @@ spec: imagePullPolicy: IfNotPresent resources: requests: - cpu: 0.1 + cpu: 1 memory: 512M limits: - cpu: 1 + cpu: 2 memory: 1Gi readinessProbe: exec: From 08028a574574542ca87a761ed0edf3cf41f2490c Mon Sep 17 00:00:00 2001 From: Matt Daily Date: Thu, 10 Aug 2023 16:06:12 -0700 Subject: [PATCH 09/10] One more adjustment to the celery workers --- banzai/tests/e2e-k8s.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/banzai/tests/e2e-k8s.yaml b/banzai/tests/e2e-k8s.yaml index 70050bd8..9fcfa266 100644 --- a/banzai/tests/e2e-k8s.yaml +++ b/banzai/tests/e2e-k8s.yaml @@ -138,10 +138,10 @@ spec: timeoutSeconds: 10 resources: requests: - cpu: 4 + cpu: 6 memory: 6Gi limits: - cpu: 6 + cpu: 8 memory: 8Gi - name: banzai-celery-beat image: @BANZAI_IMAGE@ From c90e399165489559e06420d03ac76c11445ec97f Mon Sep 17 00:00:00 2001 From: Curtis 
McCully Date: Thu, 10 Aug 2023 22:55:40 -0400 Subject: [PATCH 10/10] Raised the logging level for the amqp logger. --- banzai/main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/banzai/main.py b/banzai/main.py index 4f2dfa5a..2065ed3c 100755 --- a/banzai/main.py +++ b/banzai/main.py @@ -186,7 +186,7 @@ def run_realtime_pipeline(): def start_listener(runtime_context): # Need to keep the amqp logger level at least as high as INFO, # or else it send heartbeat check messages every second - logging.getLogger('amqp').setLevel(max(logging.getLogger().level, getattr(logging, 'INFO'))) + logging.getLogger('amqp').setLevel(logging.WARNING) logger.info('Starting pipeline listener') fits_exchange = Exchange(runtime_context.FITS_EXCHANGE, type='fanout')