Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Add Kafka connection test. (Part of integration test) #23

Merged
merged 11 commits into from
May 29, 2024
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ jobs:

intergration-tests:
name: Integration Tests
needs: tests
needs: [tests, formatting]
uses: ./.github/workflows/integration.yml
with:
python-version: '${{needs.formatting.outputs.min_python}}'
4 changes: 4 additions & 0 deletions .github/workflows/integration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,8 @@ jobs:
python-version: ${{ inputs.python-version }}
- run: python -m pip install --upgrade pip
- run: python -m pip install -r requirements/ci.txt
- run: python -m pip install -e .
- run: docker-compose version
- run: docker-compose -f tests/docker-compose-file-writer.yml up -d
- run: scicat_ingestor -c resources/config.sample.json --verbose
- run: docker-compose -f tests/docker-compose-file-writer.yml down
4 changes: 1 addition & 3 deletions config.20240405.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@
"kafka": {
"topics": ["KAFKA_TOPIC_1","KAFKA_TOPIC_2"],
"group_id": "GROUP_ID",
"bootstrap_servers": [
"HOST:9092"
],
"bootstrap_servers": ["localhost:9093"],
"enable_auto_commit": true,
"auto_offset_reset": "earliest"
},
Expand Down
6 changes: 2 additions & 4 deletions resources/config.sample.json
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
{
"kafka": {
"topics": ["KAFKA_TOPIC_1","KAFKA_TOPIC_2"],
"topics": ["KAFKA_TOPIC_1", "KAFKA_TOPIC_2"],
"group_id": "GROUP_ID",
"bootstrap_servers": [
"HOST:9092"
],
"bootstrap_servers": ["localhost:9093"],
"individual_message_commit": false,
"enable_auto_commit": true,
"auto_offset_reset": "earliest"
Expand Down
4 changes: 2 additions & 2 deletions src/scicat_ingestor.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@
from scicat_logging import build_logger


def quit(logger: logging.Logger, unexpected: bool = True) -> None:
    """Log a final message and terminate the process.

    Parameters
    ----------
    logger:
        Logger used to record the shutdown message.
    unexpected:
        When ``True`` (the default), exit with status 1 so callers such as
        CI can detect an abnormal shutdown; when ``False``, exit with
        status 0 to signal a clean, intentional stop.
    """
    # Local import keeps the module's top-level imports minimal, matching
    # the original's style.
    import sys

    logger.info("Exiting ingestor")
    sys.exit(1 if unexpected else 0)


def main() -> None:
Expand Down
22 changes: 14 additions & 8 deletions src/scicat_kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,37 +13,43 @@ def collect_consumer_options(options: kafkaOptions) -> dict:

# Build logger and formatter
config_dict = {
key.replace('_', '.'): value
key.replace("_", "."): value
for key, value in asdict(options).items()
if key not in ('topics', 'individual_message_commit')
if key not in ("topics", "individual_message_commit")
}
config_dict['enable.auto.commit'] = (
config_dict["enable.auto.commit"] = (
not options.individual_message_commit
) and options.enable_auto_commit
if isinstance(bootstrap_servers := options.bootstrap_servers, list):
# Convert the list to a comma-separated string
config_dict["bootstrap.servers"] = ",".join(bootstrap_servers)
else:
config_dict["bootstrap.servers"] = bootstrap_servers

return config_dict


def collect_kafka_topics(options: kafkaOptions) -> list[str]:
    """Return the Kafka topics to subscribe to as a list.

    ``options.topics`` may be either a list of topic names or a single
    comma-separated string (e.g. ``"topic-a, topic-b"``).

    Raises
    ------
    TypeError
        If ``options.topics`` is neither a list nor a string.
    """
    topics = options.topics
    if isinstance(topics, str):
        # Tolerate whitespace around the commas and skip empty segments
        # (e.g. a trailing comma), so "a, b," yields ["a", "b"].
        return [topic.strip() for topic in topics.split(",") if topic.strip()]
    elif isinstance(topics, list):
        return topics
    else:
        raise TypeError("The topics must be a list or a comma-separated string.")


def build_consumer(kafka_options: kafkaOptions, logger: logging.Logger) -> Consumer:
    """Build, validate and subscribe a Kafka consumer.

    Parameters
    ----------
    kafka_options:
        Consumer configuration (bootstrap servers, group id, topics, ...).
    logger:
        Logger used to report the connection parameters and subscription.

    Returns
    -------
    The subscribed ``Consumer``, or ``None`` when the broker connection
    could not be validated.
    """
    consumer_options = collect_consumer_options(kafka_options)
    logger.info("Connecting to Kafka with the following parameters:")
    logger.info(consumer_options)
    consumer = Consumer(consumer_options)
    if not validate_consumer(consumer, logger):
        return None

    kafka_topics = collect_kafka_topics(kafka_options)
    logger.info(f"Subscribing to the following Kafka topics: {kafka_topics}")
    consumer.subscribe(kafka_topics)
    # Bug fix: return the consumer that was validated and subscribed above.
    # The previous code returned a brand-new ``Consumer(consumer_options)``,
    # which had no topic subscription, so callers could never poll messages.
    return consumer

Expand All @@ -58,5 +64,5 @@ def validate_consumer(consumer: Consumer, logger: logging.Logger) -> bool:
)
return False
else:
logger.info('Kafka consumer successfully instantiated')
logger.info("Kafka consumer successfully instantiated")
return True
71 changes: 71 additions & 0 deletions tests/docker-compose-file-writer.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# Docker Compose stack for the ingestor integration tests: a single-broker
# Kafka cluster (with its Zookeeper) and the ESS file-writer container.
version: "3.5"

services:
  # The kafka and filewriter services are copied from
  # https://gitlab.esss.lu.se/ecdc/ess-dmsc/kafka-to-nexus/-/blob/main/integration-tests/docker-compose.yml
  # GitHub CI currently fails to run the original docker-compose.yml from the
  # ecdc repository, so the file was copied and modified here.
  kafka:
    container_name: file-writer-kafka
    hostname: file-writer-kafka
    image: confluentinc/cp-kafka:7.4.3
    deploy:
      resources:
        limits:
          memory: 600M
    restart: always
    depends_on:
      - zookeeper
    ports:
      # Only the OUTSIDE listener is published to the host; host-side clients
      # connect via localhost:9093.
      - "9093:9093"
    networks:
      - frontend
    environment:
      KAFKA_ZOOKEEPER_CONNECT: file-writer-zookeeper:2181
      KAFKA_BROKER_ID: 0
      # Single-broker cluster: replication factors and min ISR must be 1.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      # Allow very large (300 MB) messages end to end: producer -> broker -> fetch.
      KAFKA_MESSAGE_MAX_BYTES: 300000000
      KAFKA_SOCKET_REQUEST_MAX_BYTES: 300000000
      KAFKA_REPLICA_FETCH_MAX_BYTES: 300000000
      KAFKA_LOG_RETENTION_MS: -1 # keep data forever, required for tests involving fake "historical" data
      ## listeners
      # INSIDE serves container-to-container traffic (advertised as
      # file-writer-kafka:9092); OUTSIDE is advertised as localhost:9093 for
      # clients running on the host.
      KAFKA_LISTENERS: INSIDE://:9092,OUTSIDE://:9093
      KAFKA_ADVERTISED_LISTENERS: INSIDE://file-writer-kafka:9092,OUTSIDE://localhost:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
    healthcheck:
      # The broker is healthy once it can list topics on the INSIDE listener.
      test: ["CMD", "kafka-topics", "--bootstrap-server", "localhost:9092", "--list"]
      interval: 5s
      timeout: 5s
      retries: 5
      start_period: 10s

  zookeeper:
    container_name: file-writer-zookeeper
    hostname: file-writer-zookeeper
    image: confluentinc/cp-zookeeper:7.4.3
    deploy:
      resources:
        limits:
          memory: 200M
    restart: always
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    networks:
      - frontend

  filewriter:
    container_name: file-writer-file-writer
    image: registry.esss.lu.se/ecdc/ess-dmsc/docker-centos7-build-node:latest
    depends_on:
      # Do not start the file-writer until the broker healthcheck passes.
      kafka:
        condition: service_healthy
    # Keep the container running with an interactive tty.
    tty: true
    networks:
      - frontend

networks:
  frontend: