ExtractTablesMapping's DESCRIBE query times out #184

Open
hekike opened this issue Sep 15, 2023 · 4 comments
Labels
bug Something isn't working


@hekike
Contributor

hekike commented Sep 15, 2023

Describe the bug

Every 5-10 minutes, DESCRIBE queries time out in Kafka Connect, even though system.query_log shows an execution duration of 1 ms for them. We run our connector task with tableRefreshInterval=15. Based on system.query_log, multiple extractTablesMapping runs overlap. I'm also seeing SHOW TABLES queries run more often than every 15 s, which shouldn't happen. I'm wondering if we should add a check that doesn't start extractTablesMapping while a previous run is still in progress.
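A minimal sketch of the proposed check, assuming the refresh is triggered from a timer thread. `RefreshGuard` and `runIfIdle` are hypothetical names, not part of the connector. An `AtomicBoolean` lets the first caller claim the refresh, and any overlapping caller skips its turn instead of issuing a second round of SHOW TABLES/DESCRIBE queries.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Hypothetical guard (not a connector class): lets at most one table-mapping
 * refresh run at a time and skips any refresh that would overlap it.
 */
final class RefreshGuard {
    private final AtomicBoolean running = new AtomicBoolean(false);

    /**
     * Runs {@code refresh} unless a previous run is still in progress.
     * Returns true if the refresh ran, false if it was skipped.
     */
    boolean runIfIdle(Runnable refresh) {
        // compareAndSet flips false -> true atomically, so only one caller can claim the slot
        if (!running.compareAndSet(false, true)) {
            return false; // previous refresh still running: skip this tick
        }
        try {
            refresh.run();
            return true;
        } finally {
            running.set(false); // release even if the refresh throws
        }
    }
}
```

In the connector this would mean wrapping the existing `extractTablesMapping` call in `guard.runIfIdle(...)`. Note that a single task scheduled on a `ScheduledExecutorService` (via `scheduleAtFixedRate` or `scheduleWithFixedDelay`) never runs concurrently with itself, so a guard like this mainly matters if several timers or code paths can trigger the refresh.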

Steps to reproduce

  1. Run a connector with tableRefreshInterval=15
  2. Produce test data into the Kafka topics
  3. Observe Kafka Connect logs

Expected behavior

DESCRIBE queries should not time out.

Error log

8:23:28 AM	ERROR
Exception when running describeTable DESCRIBE TABLE `my_database`.`my_table`
connect timed out, server ClickHouseNode [uri=https://{redacted}.us-east-2.aws.clickhouse.cloud:8443/mydb, options={sslmode=STRICT}]@2074393920
com.clickhouse.client.ClickHouseException: connect timed out, server ClickHouseNode [uri=https://{redacted}.us-east-2.aws.clickhouse.cloud:8443/openmeter, options={sslmode=STRICT}]@-166211039
	at com.clickhouse.client.ClickHouseException.of(ClickHouseException.java:164)
	at com.clickhouse.client.AbstractClient.lambda$execute$0(AbstractClient.java:275)
	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.net.SocketTimeoutException: connect timed out
	at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
	at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:412)
	at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:255)
	at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:237)
	at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.base/java.net.Socket.connect(Socket.java:609)
	at java.base/sun.security.ssl.SSLSocketImpl.connect(SSLSocketImpl.java:305)
	at java.base/sun.net.NetworkClient.doConnect(NetworkClient.java:177)
	at java.base/sun.net.www.http.HttpClient.openServer(HttpClient.java:509)
	at java.base/sun.net.www.http.HttpClient.openServer(HttpClient.java:604)
	at java.base/sun.net.www.protocol.https.HttpsClient.<init>(HttpsClient.java:266)
	at java.base/sun.net.www.protocol.https.HttpsClient.New(HttpsClient.java:373)
	at java.base/sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.getNewHttpClient(AbstractDelegateHttpsURLConnection.java:207)
	at java.base/sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1187)
	at java.base/sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1081)
	at java.base/sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.connect(AbstractDelegateHttpsURLConnection.java:193)
	at java.base/sun.net.www.protocol.http.HttpURLConnection.getOutputStream0(HttpURLConnection.java:1367)
	at java.base/sun.net.www.protocol.http.HttpURLConnection.getOutputStream(HttpURLConnection.java:1342)
	at java.base/sun.net.www.protocol.https.HttpsURLConnectionImpl.getOutputStream(HttpsURLConnectionImpl.java:246)
	at com.clickhouse.client.http.HttpUrlConnectionImpl.post(HttpUrlConnectionImpl.java:225)
	at com.clickhouse.client.http.ClickHouseHttpClient.send(ClickHouseHttpClient.java:124)
	at com.clickhouse.client.AbstractClient.sendAsync(AbstractClient.java:161)
	at com.clickhouse.client.AbstractClient.lambda$execute$0(AbstractClient.java:273)

Configuration

{
  "connector.client.config.override.policy": "ALL",
  "consumer.auto.offset.reset": "latest",
  "consumer.override.auto.offset.reset": "latest",
  "consumer.override.max.poll.records": "5000",
  "database": "mydb",
  "errors.deadletterqueue.context.headers.enable": "true",
  "errors.deadletterqueue.topic.name": "om_deadletterqueue",
  "errors.deadletterqueue.topic.replication.factor": "3",
  "errors.retry.timeout": "30",
  "errors.tolerance": "all",
  "exactlyOnce": "true",
  "hostname": "{redacted}.us-east-2.aws.clickhouse.cloud",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "password": "****************",
  "port": "8443",
  "schemas.enable": "false",
  "ssl": "true",
  "tableRefreshInterval": "15",
  "topics.regex": "^om_[A-Za-z0-9]+(?:_[A-Za-z0-9]+)*_events$",
  "username": "default",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "false"
}

Environment

  • Kafka-Connect version: Confluent Cloud
  • Kafka Connect configuration: see above
  • Kafka version: Confluent Cloud
  • Kafka environment: Confluent Cloud
  • OS: Linux

ClickHouse server

  • ClickHouse Server version: ClickHouse Cloud (v23.8)
  • ClickHouse Server non-default settings, if any: ClickHouse Cloud
  • CREATE TABLE statements for tables involved: -
  • Sample data for all these tables, use clickhouse-obfuscator if necessary

cc @mzitnik

@hekike hekike added the bug Something isn't working label Sep 15, 2023
@mzitnik
Collaborator

mzitnik commented Sep 19, 2023

@hekike How many workers are you running in parallel? Can you also share the output from the query log?

@Paultagoras
Contributor

Hi @hekike, I'm closing this because we haven't heard anything in a while, but if it's still an issue, please comment with the details @mzitnik asked for and we can take another look. Thanks!

@hekike
Contributor Author

hekike commented Oct 11, 2023

We chatted with @mzitnik offline but couldn't figure out the root cause.

@Keremgunduz7

Keremgunduz7 commented Jul 10, 2024

I think the issue here is sending lots of DESCRIBE queries simultaneously over HTTP connections.

We are seeing a similar issue, but instead of timeouts, ClickHouse somehow starts refusing new HTTP connections, even though there is no connection limit.

But if we start the connectors one by one (in other words, without sending lots of DESCRIBE TABLE queries simultaneously), everything works fine.
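If simultaneous DESCRIBE TABLE requests are indeed the trigger, one way to approximate "starting the connectors one by one" inside a single worker JVM is to cap how many describe calls run at once. This is a hypothetical sketch (`DescribeThrottle` is not a connector class) built on `java.util.concurrent.Semaphore`; it cannot coordinate tasks running in different worker processes.

```java
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

/**
 * Hypothetical helper: limits how many DESCRIBE-style calls run concurrently
 * within one JVM. Callers beyond the limit block until a permit is free.
 */
final class DescribeThrottle {
    private final Semaphore permits;

    DescribeThrottle(int maxConcurrent) {
        // fair = true hands out permits in arrival order, so no caller starves
        this.permits = new Semaphore(maxConcurrent, true);
    }

    /** Runs the supplied call while holding one permit and returns its result. */
    <T> T call(Supplier<T> describe) throws InterruptedException {
        permits.acquire(); // blocks while maxConcurrent calls are already in flight
        try {
            return describe.get();
        } finally {
            permits.release(); // release even if the call throws
        }
    }
}
```

Usage might look like `throttle.call(() -> helper.describeTable(db, tableName))`, with the throttle kept in a static field so every task in the worker shares the same permits.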

Here is a patched file intended to resolve the HTTP connection issues. The same cause may also be behind the issue reported above.

package com.clickhouse.kafka.connect.sink.db.helper;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.clickhouse.client.ClickHouseClient;
import com.clickhouse.client.ClickHouseException;
import com.clickhouse.client.ClickHouseNode;
import com.clickhouse.client.ClickHouseNodeSelector;
import com.clickhouse.client.ClickHouseProtocol;
import com.clickhouse.client.ClickHouseResponse;
import com.clickhouse.client.config.ClickHouseClientOption;
import com.clickhouse.client.config.ClickHouseProxyType;
import com.clickhouse.config.ClickHouseOption;
import com.clickhouse.data.ClickHouseFormat;
import com.clickhouse.data.ClickHouseRecord;
import com.clickhouse.data.ClickHouseValue;
import com.clickhouse.kafka.connect.sink.ClickHouseSinkConfig;
import com.clickhouse.kafka.connect.sink.db.mapping.Column;
import com.clickhouse.kafka.connect.sink.db.mapping.Table;
import com.clickhouse.kafka.connect.util.Utils;
import com.fasterxml.jackson.core.JsonProcessingException;

import lombok.Getter;

public class ClickHouseHelperClient {

    private static final Logger LOGGER = LoggerFactory.getLogger(ClickHouseHelperClient.class);

    private final String hostname;
    private final int port;
    private final String username;
    @Getter
    private final String database;
    private final String password;
    private final boolean sslEnabled;
    private final String jdbcConnectionProperties;
    private final int timeout;
    @Getter
    private ClickHouseNode server = null;
    private final int retry;
    private ClickHouseProxyType proxyType = null;
    private String proxyHost = null;
    private int proxyPort = -1;

    public ClickHouseHelperClient(ClickHouseClientBuilder builder) {
        this.hostname = builder.hostname;
        this.port = builder.port;
        this.username = builder.username;
        this.password = builder.password;
        this.database = builder.database;
        this.sslEnabled = builder.sslEnabled;
        this.jdbcConnectionProperties = builder.jdbcConnectionProperties;
        this.timeout = builder.timeout;
        this.retry = builder.retry;
        this.proxyType = builder.proxyType;
        this.proxyHost = builder.proxyHost;
        this.proxyPort = builder.proxyPort;
        this.server = create();
    }

    public Map<ClickHouseOption, Serializable> getDefaultClientOptions() {
        Map<ClickHouseOption, Serializable> options = new HashMap<>();
        options.put(ClickHouseClientOption.PRODUCT_NAME, "clickhouse-kafka-connect/"+ClickHouseClientOption.class.getPackage().getImplementationVersion());
        if (proxyType != null && !proxyType.equals(ClickHouseProxyType.IGNORE)) {
            options.put(ClickHouseClientOption.PROXY_TYPE, proxyType);
            options.put(ClickHouseClientOption.PROXY_HOST, proxyHost);
            options.put(ClickHouseClientOption.PROXY_PORT, proxyPort);
        }
        return options;
    }

    private ClickHouseNode create() {
        String protocol = "http";
        if (this.sslEnabled)
            protocol += "s";

        String tmpJdbcConnectionProperties = jdbcConnectionProperties;
        if (tmpJdbcConnectionProperties != null && !tmpJdbcConnectionProperties.startsWith("?")) {
            tmpJdbcConnectionProperties = "?" + tmpJdbcConnectionProperties;
        }

        String url = String.format("%s://%s:%d/%s%s", 
                protocol, 
                hostname, 
                port, 
                database,
                tmpJdbcConnectionProperties
        );

        LOGGER.info("ClickHouse URL: {}", url);

        if (username != null && password != null) {
            LOGGER.debug(String.format("Adding username [%s]", username));
            Map<String, String> options = new HashMap<>();
            options.put("user", username);
            options.put("password", password);
            server = ClickHouseNode.of(url, options);
        } else {
            server = ClickHouseNode.of(url);
        }
        return server;
    }

    public boolean ping() {
        ClickHouseClient clientPing = ClickHouseClient.builder()
                .options(getDefaultClientOptions())
                .nodeSelector(ClickHouseNodeSelector.of(ClickHouseProtocol.HTTP))
                .build();
        LOGGER.debug(String.format("Server [%s] , Timeout [%d]", server, timeout));
        int retryCount = 0;

        while (retryCount < retry) {
            if (clientPing.ping(server, timeout)) {
                LOGGER.info("Ping was successful.");
                clientPing.close();
                return true;
            }
            retryCount++;
            LOGGER.warn(String.format("Ping retry %d out of %d", retryCount, retry));
        }
        LOGGER.error("Unable to ping ClickHouse instance.");
        clientPing.close();
        return false;
    }

    public String version() {
        try (ClickHouseClient client = ClickHouseClient.builder()
                .options(getDefaultClientOptions())
                .nodeSelector(ClickHouseNodeSelector.of(ClickHouseProtocol.HTTP))
                .build();
             ClickHouseResponse response = client.read(server)
                     .query("SELECT VERSION()")
                     .executeAndWait()) {
            return response.firstRecord().getValue(0).asString();
        } catch (ClickHouseException e) {
            LOGGER.error("Exception when trying to retrieve VERSION()", e);
            return null;
        }
    }

    public ClickHouseResponse query(String query) {
        return query(query, null);
    }

    public ClickHouseResponse query(String query, ClickHouseFormat clickHouseFormat) {
        int retryCount = 0;
        ClickHouseException ce = null;
        while (retryCount < retry) {
            try (ClickHouseClient client = ClickHouseClient.builder()
                    .options(getDefaultClientOptions())
                    .nodeSelector(ClickHouseNodeSelector.of(ClickHouseProtocol.HTTP))
                    .build();
                 ClickHouseResponse response = client.read(server)
                         .format(clickHouseFormat)
                         .query(query)
                         .executeAndWait()) {
                return response;
            } catch (ClickHouseException e) {
                retryCount++;
                LOGGER.warn(String.format("Query retry %d out of %d", retryCount, retry), e);
                ce = e;
            }
        }
        throw new RuntimeException(ce);
    }

    public List<String> showTables(String database) {
        List<String> tablesNames = new ArrayList<>();
        try (ClickHouseClient client = ClickHouseClient.builder()
                .options(getDefaultClientOptions())
                .nodeSelector(ClickHouseNodeSelector.of(ClickHouseProtocol.HTTP))
                .build();
             ClickHouseResponse response = client.read(server)
                     .query(String.format("SHOW TABLES FROM `%s`", database))
                     .executeAndWait()) {
            for (ClickHouseRecord r : response.records()) {
                ClickHouseValue v = r.getValue(0);
                String tableName = v.asString();
                tablesNames.add(tableName);
            }
        } catch (ClickHouseException e) {
            LOGGER.error("Failed in show tables", e);
        }
        return tablesNames;
    }

    public Table describeTable(String database, String tableName, ClickHouseClient client) {
        if (tableName.startsWith(".inner"))
            return null;
        String describeQuery = String.format("DESCRIBE TABLE `%s`.`%s`", database, tableName);
        LOGGER.debug(describeQuery);

        // Reuse the caller's client when one is passed in; otherwise build a
        // short-lived client here and close it before returning so it does not leak.
        boolean ownsClient = false;
        if (client == null) {
            try {
                client = ClickHouseClient.builder()
                        .options(getDefaultClientOptions())
                        .nodeSelector(ClickHouseNodeSelector.of(ClickHouseProtocol.HTTP))
                        .build();
                ownsClient = true;
            } catch (Exception e) {
                LOGGER.error("Failed to create client", e);
                return null;
            }
        }

        try (ClickHouseResponse response = client.read(server)
                     .set("describe_include_subcolumns", true)
                     .format(ClickHouseFormat.JSONEachRow)
                     .query(describeQuery)
                     .executeAndWait()) {

            Table table = new Table(database, tableName);
            for (ClickHouseRecord r : response.records()) {
                ClickHouseValue v = r.getValue(0);

                ClickHouseFieldDescriptor fieldDescriptor = ClickHouseFieldDescriptor.fromJsonRow(v.asString());
                if (fieldDescriptor.isAlias() || fieldDescriptor.isMaterialized()) {
                    LOGGER.debug("Skipping column {} as it is an alias or materialized column", fieldDescriptor.getName());
                    continue;
                }

                if (fieldDescriptor.hasDefault()) {
                    table.hasDefaults(true);
                }

                Column column = Column.extractColumn(fieldDescriptor);
                // If we run into a rare column we can't handle, just ignore the table and warn the user
                if (column == null) {
                    LOGGER.warn("Unable to handle column: {}", fieldDescriptor.getName());
                    return null;
                }
                table.addColumn(column);
            }
            return table;
        } catch (ClickHouseException | JsonProcessingException e) {
            LOGGER.error(String.format("Exception when running describeTable %s", describeQuery), e);
            return null;
        } finally {
            // Only close a client created by this method; a shared client belongs to the caller
            if (ownsClient) {
                client.close();
            }
        }
    }

    public Table describeTable(String database, String tableName) {
        return describeTable(database, tableName, null);
    }


    public List<Table> extractTablesMapping(String database, Map<String, Table> cache) {
        List<Table> tableList =  new ArrayList<>();
        try (ClickHouseClient client = ClickHouseClient.builder()
                .options(getDefaultClientOptions())
                .nodeSelector(ClickHouseNodeSelector.of(ClickHouseProtocol.HTTP))
                .build()) {
            for (String tableName : showTables(database) ) {
                // (Full) Table names are escaped in the cache
                String escapedTableName = Utils.escapeTableName(database, tableName);

                // Read from cache if we already described this table before
                // This means we won't pick up edited table configs until the connector is restarted
                if (cache.containsKey(escapedTableName)) {
                    tableList.add(cache.get(escapedTableName));
                    continue;
                }

                // Describe in the same database we listed, reusing the one client opened above
                Table table = describeTable(database, tableName, client);
                if (table != null )
                    tableList.add(table);
            }
            return tableList;
        } catch (Exception e) {
            LOGGER.error("Failed to extract tables mapping", e);
            return null;
        }
    }

    public static class ClickHouseClientBuilder {
        private ClickHouseSinkConfig config = null;
        private String hostname = null;
        private int port = -1;
        private String username = "default";
        private String database = "default";
        private String password = "";
        private boolean sslEnabled = false;
        private String jdbcConnectionProperties = "";
        private int timeout = ClickHouseSinkConfig.timeoutSecondsDefault * ClickHouseSinkConfig.MILLI_IN_A_SEC;
        private int retry = ClickHouseSinkConfig.retryCountDefault;

        private ClickHouseProxyType proxyType = null;
        private String proxyHost = null;
        private int proxyPort = -1;

        public ClickHouseClientBuilder(String hostname, int port, ClickHouseProxyType proxyType, String proxyHost, int proxyPort) {
            this.hostname = hostname;
            this.port = port;
            this.proxyType = proxyType;
            this.proxyHost = proxyHost;
            this.proxyPort = proxyPort;
        }


        public ClickHouseClientBuilder setUsername(String username) {
            this.username = username;
            return this;
        }

        public ClickHouseClientBuilder setPassword(String password) {
            this.password = password;
            return this;
        }

        public ClickHouseClientBuilder setDatabase(String database) {
            this.database = database;
            return this;
        }

        public ClickHouseClientBuilder sslEnable(boolean sslEnabled) {
            this.sslEnabled = sslEnabled;
            return this;
        }

        public ClickHouseClientBuilder setJdbcConnectionProperties(String jdbcConnectionProperties) {
            this.jdbcConnectionProperties = jdbcConnectionProperties;
            return this;
        }

        public ClickHouseClientBuilder setTimeout(int timeout) {
            this.timeout = timeout;
            return this;
        }

        public ClickHouseClientBuilder setRetry(int retry) {
            this.retry = retry;
            return this;
        }

        public ClickHouseHelperClient build(){
            return new ClickHouseHelperClient(this);
        }

    }
}
