Skip to content

Commit

Permalink
Common clickhouse client (#106)
Browse files Browse the repository at this point in the history
* Common client instead of monrun client

* Delete monrun client

* Lint

* Test client replaced

* Geobase error type

* Disable temp import

* Small fixes

* Small fixes 2

* Small fixes 3

* Pylint fix

* Mypy fix

* Import fix

* Print error message on unknown_exception

* query_json_data and other fixes
  • Loading branch information
kirillgarbar authored Feb 28, 2024
1 parent 3e4e914 commit 986522e
Show file tree
Hide file tree
Showing 17 changed files with 419 additions and 390 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ test-unit: install-deps
# Run the behave integration tests: change into $(TESTS_DIR), extend
# PYTHONPATH with $(CURDIR) and invoke behave through $(POETRY).
# NOTE(review): PYTHONPATH is extended with $$PATH (the executable search
# path) rather than the previous $$PYTHONPATH -- confirm this is intended.
# NOTE(review): `cd` and `export` only affect the behave line if the whole
# recipe runs in one shell (e.g. .ONESHELL, not visible here) -- confirm.
.PHONY: test-integration
test-integration: install-deps build-python-packages
cd $(TESTS_DIR)
export PYTHONPATH=$(CURDIR):$$PATH
$(POETRY) run behave --show-timings --stop --junit $(BEHAVE_ARGS)


Expand Down
278 changes: 229 additions & 49 deletions ch_tools/common/clickhouse/client/clickhouse_client.py
Original file line number Diff line number Diff line change
@@ -1,40 +1,78 @@
import json
import logging
import subprocess
import xml.etree.ElementTree as xml
from datetime import timedelta
from enum import Enum
from typing import Any, Dict, Optional

import requests
from jinja2 import Environment
from typing_extensions import Self

from ch_tools.common.clickhouse.config.path import (
CLICKHOUSE_CERT_CONFIG_PATH,
CLICKHOUSE_CERT_PATH_DEFAULT,
CLICKHOUSE_SERVER_PREPROCESSED_CONFIG_PATH,
)
from ch_tools.common.utils import version_ge

from .error import ClickhouseError
from .retry import retry
from .utils import _format_str_imatch, _format_str_match


class ClickhousePort(Enum):
    """
    Kinds of ports a ClickHouse server can be reached through.

    Member order matters: iterating the enum yields HTTPS, HTTP, TCP_SECURE,
    TCP and finally AUTO, and ClickhouseClient.query walks the members in
    that order when it has to pick a port on its own.
    """

    HTTPS = 4
    HTTP = 3
    TCP_SECURE = 2
    TCP = 1
    # Not a real port: asks the client to select any available port.
    AUTO = 0


class ClickhousePortHelper:
    """
    Translation between port setting names and ClickhousePort members.
    """

    # Setting name (as looked up in the clickhouse-server config) -> port kind.
    _map = {
        "https_port": ClickhousePort.HTTPS,
        "http_port": ClickhousePort.HTTP,
        "tcp_port_secure": ClickhousePort.TCP_SECURE,
        "tcp_port": ClickhousePort.TCP,
    }

    @classmethod
    def get(
        cls, port: str, default: ClickhousePort = ClickhousePort.AUTO
    ) -> ClickhousePort:
        """
        Return the port kind registered for setting name `port`.

        Unknown names yield `default`.
        """
        if port in cls._map:
            return cls._map[port]
        return default

    @classmethod
    def list(cls):
        """
        Return a view over all known port setting names.
        """
        known_settings = cls._map
        return known_settings.keys()


class ClickhouseClient:
"""
ClickHouse client wrapper.
"""

def __init__(
    self: Self,
    *,
    host: str,
    insecure: bool = False,
    user: Optional[str] = None,
    password: Optional[str] = None,
    ports: Dict[ClickhousePort, str],
    cert_path: Optional[str] = None,
    timeout: int,
    settings: Optional[Dict[str, Any]] = None,
):
    """
    Store connection parameters; no connection is opened here.

    All arguments are keyword-only.

    Args:
        host: Server host name used to build URLs and client commands.
        insecure: Flag kept on the instance as `insecure`.
        user: Optional user name.
        password: Optional password.
        ports: Mapping from port kind to the configured port value.
        cert_path: Optional path to the CA certificate file.
        timeout: Default timeout (units are not visible here -- presumably
            seconds, TODO confirm).
        settings: Settings sent with every query; None means no settings.
    """
    self.host = host
    self.insecure = insecure
    self.user = user
    self.ports = ports
    self.cert_path = cert_path
    self.password = password
    # Normalise to a dict so it can always be unpacked into request params.
    self._settings = settings or {}
    self._timeout = timeout
    self._ch_version = None

Expand All @@ -54,6 +92,96 @@ def get_uptime(self):
seconds = int(self.query("SELECT uptime()"))
return timedelta(seconds=seconds)

def _execute_http(
    self,
    query,
    format_,
    post_data,
    timeout,
    stream,
    per_query_settings,
    port,
):
    """
    Execute a query, or a ping when `query` is empty, over HTTP(S).

    Args:
        query: Query text; a falsy value sends a plain GET instead.
        format_: Output format name; "JSON" and "JSONCompact" responses
            are parsed before being returned.
        post_data: Payload sent as the JSON body of the POST request.
        timeout: Timeout handed to requests.
        stream: When true, the response object itself is returned.
        per_query_settings: Settings that override the client-wide ones.
        port: ClickhousePort.HTTPS or ClickhousePort.HTTP; must be a key
            of `self.ports`.

    Returns:
        The response object if `stream` is set, the parsed JSON document
        for JSON formats, otherwise the stripped response text.

    Raises:
        ClickhouseError: The server answered with an HTTP error status.
    """
    is_https = port == ClickhousePort.HTTPS
    schema = "https" if is_https else "http"
    url = f"{schema}://{self.host}:{self.ports[port]}"

    headers = {}
    if self.user:
        headers["X-ClickHouse-User"] = self.user
    if self.password:
        headers["X-ClickHouse-Key"] = self.password

    # Certificate verification only applies to HTTPS. Honour the `insecure`
    # flag there; otherwise verify against the configured CA bundle.
    if not is_https:
        verify = None
    elif self.insecure:
        verify = False
    else:
        verify = self.cert_path

    try:
        if query:
            response = requests.post(
                url,
                params={
                    **self._settings,
                    "query": query,
                    **per_query_settings,  # overwrites previous settings
                },
                headers=headers,
                json=post_data,
                timeout=timeout,
                stream=stream,
                verify=verify,
            )
        else:
            # Used for ping
            response = requests.get(
                url,
                headers=headers,
                timeout=timeout,
                verify=verify,
            )

        response.raise_for_status()

        # Return response for iterating over
        if stream:
            return response

        if format_ in ("JSON", "JSONCompact"):
            return response.json()

        return response.text.strip()
    except requests.exceptions.HTTPError as e:
        raise ClickhouseError(query, e.response) from None

def _execute_tcp(self, query, format_, port):
    """
    Execute a query through the `clickhouse-client` binary over TCP(S).

    The query text is fed to the client on stdin.

    Args:
        query: Query text; must not be empty.
        format_: Output format name; "JSON" and "JSONCompact" output is
            parsed with json.loads before being returned.
        port: ClickhousePort.TCP_SECURE or ClickhousePort.TCP; must be a
            key of `self.ports`.

    Returns:
        The parsed JSON document for JSON formats, otherwise the stripped
        stdout of the client.

    Raises:
        RuntimeError: The query is empty, or the client exited non-zero.
    """
    # Fail before building the command: an empty query cannot be sent.
    if not query:
        raise RuntimeError(1, "Can't send empty query in tcp(s) port")

    # Private method, we are sure that port is tcps or tcp and presents in config
    cmd = [
        "clickhouse-client",
        "--host",
        self.host,
        "--port",
        self.ports[port],
    ]
    if self.user is not None:
        cmd.extend(("--user", self.user))
    password_index = None
    if self.password is not None:
        # NOTE(review): the password is passed via argv and is therefore
        # visible in the local process list; consider a safer channel.
        cmd.append("--password")
        password_index = len(cmd)
        cmd.append(self.password)
    if port == ClickhousePort.TCP_SECURE:
        cmd.append("--secure")

    proc = subprocess.run(
        cmd,  # type: ignore[arg-type]
        input=query.encode(),
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        check=False,
    )

    if proc.returncode:
        # Never echo the password back in error messages or logs.
        shown_cmd = list(cmd)
        if password_index is not None:
            shown_cmd[password_index] = "***"
        raise RuntimeError(f'"{shown_cmd}" failed with: {proc.stderr.decode()}')

    response = proc.stdout.decode().strip()

    if format_ in ("JSON", "JSONCompact"):
        return json.loads(response)

    return response

@retry(requests.exceptions.ConnectionError)
def query(
self: Self,
Expand All @@ -66,6 +194,7 @@ def query(
dry_run: bool = False,
stream: bool = False,
settings: Optional[dict] = None,
port: ClickhousePort = ClickhousePort.AUTO,
) -> Any:
"""
Execute query.
Expand All @@ -87,32 +216,60 @@ def query(

per_query_settings = settings or {}

found_port = port
if found_port == ClickhousePort.AUTO:
for port_ in ClickhousePort:
if self.check_port(port_):
found_port = port_
break
if found_port == ClickhousePort.AUTO:
raise UserWarning(2, "Can't find any port in clickhouse-server config")

logging.debug("Executing query: %s", query)
try:
response = self._session.post(
self._url,
params={
**self._settings,
"query": query,
**per_query_settings, # overwrites previous settings
},
json=post_data,
timeout=timeout,
stream=stream,
if found_port in [ClickhousePort.HTTPS, ClickhousePort.HTTP]:
return self._execute_http(
query,
format_,
post_data,
timeout,
stream,
per_query_settings,
found_port,
)
return self._execute_tcp(query, format_, found_port)

response.raise_for_status()

# Return response for iterating over
if stream:
return response

if format_ in ("JSON", "JSONCompact"):
return response.json()

return response.text.strip()
except requests.exceptions.HTTPError as e:
raise ClickhouseError(query, e.response) from None
def query_json_data(
    self: Self,
    query: str,
    query_args: Optional[Dict[str, Any]] = None,
    compact: bool = True,
    post_data: Any = None,
    timeout: Optional[int] = None,
    echo: bool = False,
    dry_run: bool = False,
    stream: bool = False,
    settings: Optional[dict] = None,
    port: ClickhousePort = ClickhousePort.AUTO,
) -> Any:
    """
    Run a query in a JSON output format and return its "data" member.

    The format is "JSONCompact" when `compact` is true and "JSON"
    otherwise; every other argument is forwarded to `query` unchanged.
    """
    result = self.query(
        query=query,
        query_args=query_args,
        post_data=post_data,
        timeout=timeout,
        format_="JSONCompact" if compact else "JSON",
        echo=echo,
        dry_run=dry_run,
        stream=stream,
        settings=settings,
        port=port,
    )
    return result["data"]

def render_query(self, query, **kwargs):
env = Environment()
Expand All @@ -126,18 +283,16 @@ def render_query(self, query, **kwargs):
template = env.from_string(query)
return template.render(kwargs)

@staticmethod
def _create_session(user, password, insecure):
session = requests.Session()

session.verify = False if insecure else "/etc/clickhouse-server/ssl/allCAs.pem"
def check_port(self, port=ClickhousePort.AUTO):
    """
    Tell whether the given port kind is present in `self.ports`.

    For ClickhousePort.AUTO the answer is whether any port is present.
    """
    if port != ClickhousePort.AUTO:
        return port in self.ports
    # AUTO: any configured port will do.
    return bool(self.ports)

if user:
session.headers["X-ClickHouse-User"] = user
if password:
session.headers["X-ClickHouse-Key"] = password
def get_port(self, port):
    """
    Return the value stored for `port` in `self.ports`, or 0 if absent.
    """
    return self.ports[port] if port in self.ports else 0

return session
def ping(self, port=ClickhousePort.AUTO):
    """
    Call `query` with no query text on the given port kind.

    Returns whatever `query` returns.
    """
    return self.query(port=port, query=None)


def clickhouse_client(ctx):
Expand All @@ -146,15 +301,16 @@ def clickhouse_client(ctx):
Init ClickHouse client and store to the context if it doesn't exist.
"""
if not ctx.obj.get("chcli"):
ports, cert_path = get_ports()
config = ctx.obj["config"]["clickhouse"]
user, password = clickhouse_credentials(ctx)
ctx.obj["chcli"] = ClickhouseClient(
host=config["host"],
protocol=config["protocol"],
insecure=config["insecure"],
port=config["port"],
user=user,
password=password,
ports=ports,
cert_path=cert_path,
timeout=config["timeout"],
settings=config["settings"],
)
Expand All @@ -175,3 +331,27 @@ def clickhouse_credentials(ctx):
password = config["monitoring_password"]

return user, password


def get_ports():
    """
    Read ports and the CA certificate path from the clickhouse-server config.

    The file at CLICKHOUSE_SERVER_PREPROCESSED_CONFIG_PATH is parsed as XML.
    Every setting known to ClickhousePortHelper that has a non-empty value
    is collected, and the certificate path is read from
    CLICKHOUSE_CERT_CONFIG_PATH, falling back to CLICKHOUSE_CERT_PATH_DEFAULT.

    Returns:
        A `(ports, cert_path)` tuple, where `ports` maps ClickhousePort
        members to the port value as a string.

    Raises:
        UserWarning: The config file is missing, cannot be parsed, or
            defines none of the known ports.
    """
    ports: Dict[ClickhousePort, str] = {}
    cert_path = CLICKHOUSE_CERT_PATH_DEFAULT
    try:
        root = xml.parse(CLICKHOUSE_SERVER_PREPROCESSED_CONFIG_PATH)
        for setting in ClickhousePortHelper.list():
            node = root.find(setting)
            # An empty element has text None; skip it instead of storing "None".
            value = (node.text or "").strip() if node is not None else ""
            if value:
                ports[ClickhousePortHelper.get(setting)] = value

        node = root.find(CLICKHOUSE_CERT_CONFIG_PATH)
        configured_cert = (node.text or "").strip() if node is not None else ""
        if configured_cert:
            cert_path = configured_cert

    except FileNotFoundError as e:
        raise UserWarning(
            2, f"clickhouse-server config not found: {e.filename}"
        ) from e

    except Exception as e:
        # Boundary conversion: any other read or parse failure is reported
        # in the same UserWarning form as the other errors of this function.
        raise UserWarning(2, f"Failed to parse clickhouse-server config: {e}") from e

    # Checked outside the try block so this error is not caught above and
    # re-reported as a parse failure.
    if not ports:
        raise UserWarning(2, "Can't find any port in clickhouse-server config")

    return ports, cert_path
2 changes: 2 additions & 0 deletions ch_tools/common/clickhouse/config/path.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,5 @@
)
CLICKHOUSE_KEEPER_CONFIG_PATH = "/etc/clickhouse-keeper/config.xml"
CLICKHOUSE_USERS_CONFIG_PATH = "/etc/clickhouse-server/users.xml"
CLICKHOUSE_CERT_PATH_DEFAULT = "/etc/clickhouse-server/ssl/allCAs.pem"
CLICKHOUSE_CERT_CONFIG_PATH = "./openSSL/server/caConfig"
Loading

0 comments on commit 986522e

Please sign in to comment.