diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index f7789d6..039cb30 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -49,7 +49,7 @@ jobs:
 
   intergration-tests:
     name: Integration Tests
-    needs: tests
+    needs: [tests, formatting]
     uses: ./.github/workflows/integration.yml
     with:
      python-version: '${{needs.formatting.outputs.min_python}}'
diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml
index d19989d..dfeb8b2 100644
--- a/.github/workflows/integration.yml
+++ b/.github/workflows/integration.yml
@@ -24,4 +24,8 @@ jobs:
           python-version: ${{ inputs.python-version }}
       - run: python -m pip install --upgrade pip
       - run: python -m pip install -r requirements/ci.txt
+      - run: python -m pip install -e .
       - run: docker-compose version
+      - run: docker-compose -f tests/docker-compose-file-writer.yml up -d
+      - run: scicat_ingestor -c resources/config.sample.json --verbose
+      - run: docker-compose -f tests/docker-compose-file-writer.yml down
diff --git a/config.20240405.json b/config.20240405.json
index e4954af..1c09f99 100644
--- a/config.20240405.json
+++ b/config.20240405.json
@@ -2,9 +2,7 @@
   "kafka": {
     "topics": ["KAFKA_TOPIC_1","KAFKA_TOPIC_2"],
     "group_id": "GROUP_ID",
-    "bootstrap_servers": [
-      "HOST:9092"
-    ],
+    "bootstrap_servers": ["localhost:9093"],
     "enable_auto_commit": true,
     "auto_offset_reset": "earliest"
   },
diff --git a/resources/config.sample.json b/resources/config.sample.json
index 12dd806..1ef930f 100644
--- a/resources/config.sample.json
+++ b/resources/config.sample.json
@@ -1,10 +1,8 @@
 {
   "kafka": {
-    "topics": ["KAFKA_TOPIC_1","KAFKA_TOPIC_2"],
+    "topics": ["KAFKA_TOPIC_1", "KAFKA_TOPIC_2"],
     "group_id": "GROUP_ID",
-    "bootstrap_servers": [
-      "HOST:9092"
-    ],
+    "bootstrap_servers": ["localhost:9093"],
     "individual_message_commit": false,
     "enable_auto_commit": true,
     "auto_offset_reset": "earliest"
diff --git a/src/scicat_ingestor.py b/src/scicat_ingestor.py
index b9bf2c2..879de0c 100644
--- a/src/scicat_ingestor.py
+++ b/src/scicat_ingestor.py
@@ -7,12 +7,12 @@
 from scicat_logging import build_logger
 
 
-def quit(logger: logging.Logger) -> None:
+def quit(logger: logging.Logger, unexpected: bool = True) -> None:
     """Log the message and exit the program."""
     import sys
 
     logger.info("Exiting ingestor")
-    sys.exit()
+    sys.exit(1 if unexpected else 0)
 
 
 def main() -> None:
diff --git a/src/scicat_kafka.py b/src/scicat_kafka.py
index 11fcd36..a832867 100644
--- a/src/scicat_kafka.py
+++ b/src/scicat_kafka.py
@@ -13,37 +13,43 @@
 
 
 def collect_consumer_options(options: kafkaOptions) -> dict:
     # Build logger and formatter
     config_dict = {
-        key.replace('_', '.'): value
+        key.replace("_", "."): value
         for key, value in asdict(options).items()
-        if key not in ('topics', 'individual_message_commit')
+        if key not in ("topics", "individual_message_commit")
     }
-    config_dict['enable.auto.commit'] = (
+    config_dict["enable.auto.commit"] = (
         not options.individual_message_commit
     ) and options.enable_auto_commit
+    if isinstance(bootstrap_servers := options.bootstrap_servers, list):
+        # Convert the list to a comma-separated string
+        config_dict["bootstrap.servers"] = ",".join(bootstrap_servers)
+    else:
+        config_dict["bootstrap.servers"] = bootstrap_servers
+
     return config_dict
 
 
 def collect_kafka_topics(options: kafkaOptions) -> list[str]:
     """Return the Kafka topics as a list."""
     if isinstance(options.topics, str):
-        return options.topics.split(',')
+        return options.topics.split(",")
     elif isinstance(options.topics, list):
         return options.topics
     else:
-        raise TypeError('The topics must be a list or a comma-separated string.')
+        raise TypeError("The topics must be a list or a comma-separated string.")
 
 
 def build_consumer(kafka_options: kafkaOptions, logger: logging.Logger) -> Consumer:
     """Build a Kafka consumer and configure it according to the ``options``."""
     consumer_options = collect_consumer_options(kafka_options)
-    logger.info('Connecting to Kafka with the following parameters:')
+    logger.info("Connecting to Kafka with the following parameters:")
     logger.info(consumer_options)
     consumer = Consumer(consumer_options)
     if not validate_consumer(consumer, logger):
         return None
     kafka_topics = collect_kafka_topics(kafka_options)
-    logger.info(f'Subscribing to the following Kafka topics: {kafka_topics}')
+    logger.info(f"Subscribing to the following Kafka topics: {kafka_topics}")
     consumer.subscribe(kafka_topics)
     return Consumer(consumer_options)
@@ -58,5 +64,5 @@ def validate_consumer(consumer: Consumer, logger: logging.Logger) -> bool:
         )
         return False
     else:
-        logger.info('Kafka consumer successfully instantiated')
+        logger.info("Kafka consumer successfully instantiated")
         return True
diff --git a/tests/docker-compose-file-writer.yml b/tests/docker-compose-file-writer.yml
new file mode 100644
index 0000000..eb39197
--- /dev/null
+++ b/tests/docker-compose-file-writer.yml
@@ -0,0 +1,71 @@
+version: "3.5"
+
+services:
+# Kafka and file-writer services are copied from
+# https://gitlab.esss.lu.se/ecdc/ess-dmsc/kafka-to-nexus/-/blob/main/integration-tests/docker-compose.yml
+# Currently github-ci fails to run the original docker-compose.yml file in the ecdc repository
+# so we copied and modified the file here.
+  kafka:
+    container_name: file-writer-kafka
+    hostname: file-writer-kafka
+    image: confluentinc/cp-kafka:7.4.3
+    deploy:
+      resources:
+        limits:
+          memory: 600M
+    restart: always
+    depends_on:
+      - zookeeper
+    ports:
+      - "9093:9093"
+    networks:
+      - frontend
+    environment:
+      KAFKA_ZOOKEEPER_CONNECT: file-writer-zookeeper:2181
+      KAFKA_BROKER_ID: 0
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
+      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
+      KAFKA_MESSAGE_MAX_BYTES: 300000000
+      KAFKA_SOCKET_REQUEST_MAX_BYTES: 300000000
+      KAFKA_REPLICA_FETCH_MAX_BYTES: 300000000
+      KAFKA_LOG_RETENTION_MS: -1  # keep data forever, required for tests involving fake "historical" data
+      ## listeners
+      KAFKA_LISTENERS: INSIDE://:9092,OUTSIDE://:9093
+      KAFKA_ADVERTISED_LISTENERS: INSIDE://file-writer-kafka:9092,OUTSIDE://localhost:9093
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
+      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
+    healthcheck:
+      test: ["CMD", "kafka-topics", "--bootstrap-server", "localhost:9092", "--list"]
+      interval: 5s
+      timeout: 5s
+      retries: 5
+      start_period: 10s
+
+  zookeeper:
+    container_name: file-writer-zookeeper
+    hostname: file-writer-zookeeper
+    image: confluentinc/cp-zookeeper:7.4.3
+    deploy:
+      resources:
+        limits:
+          memory: 200M
+    restart: always
+    environment:
+      ZOOKEEPER_CLIENT_PORT: 2181
+      ZOOKEEPER_TICK_TIME: 2000
+    networks:
+      - frontend
+
+  filewriter:
+    container_name: file-writer-file-writer
+    image: registry.esss.lu.se/ecdc/ess-dmsc/docker-centos7-build-node:latest
+    depends_on:
+      kafka:
+        condition: service_healthy
+    tty: true
+    networks:
+      - frontend
+
+networks:
+  frontend:
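
The `bootstrap.servers` branch added to `collect_consumer_options` exists because `confluent_kafka.Consumer` expects `bootstrap.servers` as a single comma-separated string, while the JSON configs keep it as a list; the new code joins the list form and passes a plain string through unchanged. The sample configs now point at `localhost:9093` because that is the `OUTSIDE` listener the compose file advertises to the host (containers on the `frontend` network would use `file-writer-kafka:9092` instead). Below is a minimal runnable sketch of the normalization; the `kafkaOptions` dataclass here is a simplified stand-in for the ingestor's real options class, trimmed to the fields the function touches:

```python
from dataclasses import asdict, dataclass, field


# Simplified stand-in for the ingestor's kafkaOptions dataclass
# (illustration only; the real class defines more fields).
@dataclass
class kafkaOptions:
    topics: list[str] | str = field(default_factory=list)
    group_id: str = "GROUP_ID"
    bootstrap_servers: list[str] | str = "localhost:9093"
    individual_message_commit: bool = False
    enable_auto_commit: bool = True
    auto_offset_reset: str = "earliest"


def collect_consumer_options(options: kafkaOptions) -> dict:
    # 'group_id' -> 'group.id', etc.; topics and the per-message commit
    # flag are handled separately, so they are excluded here.
    config_dict = {
        key.replace("_", "."): value
        for key, value in asdict(options).items()
        if key not in ("topics", "individual_message_commit")
    }
    config_dict["enable.auto.commit"] = (
        not options.individual_message_commit
    ) and options.enable_auto_commit
    # confluent-kafka wants a comma-separated string, not a JSON-style list.
    if isinstance(bootstrap_servers := options.bootstrap_servers, list):
        config_dict["bootstrap.servers"] = ",".join(bootstrap_servers)
    else:
        config_dict["bootstrap.servers"] = bootstrap_servers
    return config_dict


options = kafkaOptions(bootstrap_servers=["localhost:9093", "localhost:9094"])
print(collect_consumer_options(options)["bootstrap.servers"])
# localhost:9093,localhost:9094
```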
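The `unexpected` flag added to `quit` is what gives the new `scicat_ingestor -c resources/config.sample.json --verbose` CI step teeth: GitHub Actions fails a `run:` step when its command exits non-zero, so an unexpected termination now turns the job red while a deliberate shutdown still exits cleanly. A sketch of the intended call-site split; the `main` body here is hypothetical (the hunk above only shows `quit` itself):

```python
import logging
import sys


def quit(logger: logging.Logger, unexpected: bool = True) -> None:
    """Log the message and exit the program."""
    logger.info("Exiting ingestor")
    # Status 1 fails the calling CI step; status 0 marks a clean shutdown.
    sys.exit(1 if unexpected else 0)


def main() -> None:
    # Hypothetical call sites, for illustration only.
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger("scicat_ingestor")
    try:
        ...  # consume-and-ingest loop elided; see src/scicat_ingestor.py
    except KeyboardInterrupt:
        quit(logger, unexpected=False)  # operator stopped it: exit code 0
    except Exception:
        logger.exception("Ingestor failed")
        quit(logger)  # defaults to unexpected=True: exit code 1
```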