Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Broker is unavailable under devcontainer #267

Open
rkhudov opened this issue Jan 7, 2025 · 12 comments
Open

Broker is unavailable under devcontainer #267

rkhudov opened this issue Jan 7, 2025 · 12 comments
Labels
bug Something isn't working

Comments

@rkhudov
Copy link

rkhudov commented Jan 7, 2025

Describe the bug

Hello, I am trying to have a test with Apache Kafka under devcontainers.
I am using use testcontainers_modules::kafka::apache::Kafka;
I am running the same test provided here, but for some reason I get following error:

called Result::unwrap() on an Err value: (KafkaError (Message production error: MessageTimeOut (Local: Message timed out)), OwnedMessage { payload: Some([77, 101, 115, ...]), key: Some([75, 101, 121, 32, 48]), topic: "test-topic", timestamp: NotAvailable, partition: -1, offset: -1001, headers: None })

I wonder if it is related to devcontainer and instead of 127.0.0.1 I have to use another ip address?

To Reproduce

No response

Expected behavior

No response

@rkhudov rkhudov added the bug Something isn't working label Jan 7, 2025
@DDtKey
Copy link
Contributor

DDtKey commented Jan 7, 2025

Hi 👋

Perhaps, you need to use get_host instead of hardcoded localhost. Which is appropriate when container isn't running locally, but remote

Could you try? And if it doesn't help, could you enrich the context with your setup, please

@rkhudov-tails
Copy link

Hi @DDtKey

Yes, I was trying to do it as well, but it doesn't work for me. Here is what I am trying to do:

let kafka_node = Kafka::default().with_jvm_image().start().await.unwrap();

    let bootstrap_servers = format!(
        "{}:{}",
        kafka_node.get_host().await.unwrap(),
        kafka_node.get_host_port_ipv4(KAFKA_PORT).await.unwrap()
    );

    let producer = ClientConfig::new()
        .set("bootstrap.servers", &bootstrap_servers)
        .set("message.timeout.ms", "5000")
        .create::<FutureProducer>()
        .expect("Failed to create Kafka FutureProducer");

    let consumer = ClientConfig::new()
        .set("group.id", "testcontainer-rs")
        .set("bootstrap.servers", &bootstrap_servers)
        .set("session.timeout.ms", "6000")
        .set("enable.auto.commit", "false")
        .set("auto.offset.reset", "earliest")
        .create::<StreamConsumer>()
        .expect("Failed to create Kafka StreamConsumer");

    let topic = "test-topic";

    let number_of_messages_to_produce = 5_usize;
    let expected: Vec<String> = (0..number_of_messages_to_produce)
        .map(|i| format!("Message {i}"))
        .collect();

    for (i, message) in expected.iter().enumerate() {
        producer
            .send(
                FutureRecord::to(topic)
                    .payload(message)
                    .key(&format!("Key {i}")),
                Duration::from_secs(0),
            )
            .await
            .unwrap();
    }

    consumer
        .subscribe(&[topic])
        .expect("Failed to subscribe to a topic");

    let mut message_stream = consumer.stream();
    for produced in expected {
        let borrowed_message =
            tokio::time::timeout(Duration::from_secs(10), message_stream.next())
                .await
                .unwrap()
                .unwrap();

        assert_eq!(
            produced,
            borrowed_message
                .unwrap()
                .payload_view::<str>()
                .unwrap()
                .unwrap()
        );
    }

This is the error that I am getting:

called `Result::unwrap()` on an `Err` value: (KafkaError (Message production error: MessageTimedOut (Local: Message timed out)), OwnedMessage { payload: Some([77, 101, 115, 115, 97, 103, 101, 32, 48]), key: Some([75, 101, 121, 32, 48]), topic: "test-topic", timestamp: NotAvailable, partition: 0, offset: -1001, headers: None })

and this is what I can see from the logs:

ERROR librdkafka > librdkafka: FAIL [thrd:127.0.0.1:55617/1]: 127.0.0.1:55617/1: Connect to ipv4#127.0.0.1:55617 failed: Connection refused (after 0ms in state CONNECT)
 ERROR rdkafka::client > librdkafka: Global error: BrokerTransportFailure (Local: Broker transport failure): 127.0.0.1:55617/1: Connect to ipv4#127.0.0.1:55617 failed: Connection refused (after 0ms in state CONNECT)
 ERROR librdkafka      > librdkafka: FAIL [thrd:127.0.0.1:55617/1]: 127.0.0.1:55617/1: Connect to ipv4#127.0.0.1:55617 failed: Connection refused (after 0ms in state CONNECT, 1 identical error(s) suppressed)
 ERROR rdkafka::client > librdkafka: Global error: BrokerTransportFailure (Local: Broker transport failure): 127.0.0.1:55617/1: Connect to ipv4#127.0.0.1:55617 failed: Connection refused (after 0ms in state CONNECT, 1 identical error(s) suppressed)

@DDtKey
Copy link
Contributor

DDtKey commented Jan 7, 2025

Did you customize DOCKER_HOST or anything else? Could you elaborate on your docker setup?

Also you always can use export TESTCONTAINERS_COMMAND=keep and double-check the container with docker inspect or check logs of the container after test-run

@rkhudov-tails
Copy link

I am using .devcontainer

@DDtKey
Copy link
Contributor

DDtKey commented Jan 7, 2025

It basically means only one thing - DinD, but doesn't provide info about possible customization/configuration.
I can only assume it's some reasonable defaults and docker.sock is mapped properly without any overridden behavior, because otherwise container wouldn't even start successfully.

Here is the fail happen on connection step. So it's most likely related to proper connection url (IP) 🤔

@rkhudov
Copy link
Author

rkhudov commented Jan 7, 2025

Interesting thing that if I do command in container directly by using kafka_node.exec and changing bootstrap_servers to

let bootstrap_servers = format!(
        "{}:{}",
        "host.docker.internal",
        kafka_node.get_host_port_ipv4(KAFKA_PORT).await.unwrap()
    );

it works just fine.

@rkhudov-tails
Copy link

@DDtKey you said that doesn't provide info about possible customization/configuration. What do you mean?

@DDtKey
Copy link
Contributor

DDtKey commented Jan 8, 2025

I meant that it's unclear which container runtime is used, how it's configured and if devcontainer has any customization over docker socker/host.
But generally, since container is started and working - it's less important. Better to focus on network settings

@rkhudov-tails
Copy link

In .devcontainer I do have following:

"mounts": [
        "source=/var/run/docker.sock,target=/var/run/docker.sock,type=bind"
    ]

I think the problem is definitely with network settings. Just need to figure out how to solve it...

@DDtKey
Copy link
Contributor

DDtKey commented Jan 8, 2025

Generally, I guess the issue is that container is available on your host machine. 127.0.0.1 inside devcontainer refers to itself, but socket is used from host system.

Did you try to use host.docker.internal? Because in that case you will be accessing host machine

@rkhudov-tails
Copy link

Do you mean in env variables? or you mean you it as host for bootstrap_servers?

When I am using host.docker.internal as host for bootstrap_servers, I start to get following error:

ERROR rdkafka::client > librdkafka: Global error: BrokerTransportFailure (Local: Broker transport failure): host.docker.internal:55809/bootstrap: Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.version.request) (after 0ms in state APIVERSION_QUERY)
 ERROR rdkafka::client > librdkafka: Global error: AllBrokersDown (Local: All broker connections are down): 1/1 brokers are down
 ERROR rdkafka::client > librdkafka: Global error: BrokerTransportFailure (Local: Broker transport failure): host.docker.internal:55809/bootstrap: Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.version.request) (after 0ms in state APIVERSION_QUERY, 1 identical error(s) suppressed)
 ERROR librdkafka      > librdkafka: FAIL [thrd:host.docker.internal:55809/bootstrap]: host.docker.internal:55809/bootstrap: Connect to ipv4#192.168.65.254:55809 failed: Connection refused (after 6ms in state CONNECT)
 ERROR rdkafka::client > librdkafka: Global error: BrokerTransportFailure (Local: Broker transport failure): host.docker.internal:55809/bootstrap: Connect to ipv4#192.168.65.254:55809 failed: Connection refused (after 6ms in state CONNECT)

I also tried different cases with
kafka_node.get_host_port_ipv4(testcontainers::core::ContainerPort::Tcp(9092)).await.unwrap() and kafka_node.get_host_port_ipv4(testcontainers::core::ContainerPort::Tcp(9094)).await.unwrap()

@DDtKey
Copy link
Contributor

DDtKey commented Jan 8, 2025

Just as an option, you can try to specify some bridge network for your devcontainer and then pass the same network to testcontainer. After which you can use get_bridge_ip_address 🤔

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

No branches or pull requests

3 participants