Skip to content

Commit

Permalink
Merge pull request #290 from chdb-io/patchset-2.2.0b2
Browse files Browse the repository at this point in the history
Refactor query execution to use connection object
  • Loading branch information
auxten authored Jan 7, 2025
2 parents 01afb91 + 9bbfd3a commit e5fbb15
Show file tree
Hide file tree
Showing 28 changed files with 806 additions and 415 deletions.
44 changes: 17 additions & 27 deletions .github/workflows/pr_ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,33 +9,23 @@ jobs:
env:
PYTHON_VERSIONS: "3.11"

runs-on: self-hosted
runs-on: gh-64c
steps:
- name: Check for chdb directory
run: |
if [ ! -d "/home/ubuntu/pr_runner/chdb" ]; then
echo "chdb directory does not exist. Checkout the repository."
mkdir -p /home/ubuntu/pr_runner/
git clone https://github.com/chdb-io/chdb.git /home/ubuntu/pr_runner/chdb
fi
- name: Clone chDB repository
uses: actions/checkout@v2
with:
repository: "chdb-io/chdb"
ref: "refs/pull/${{ github.event.pull_request.number }}/merge"
token: ${{ secrets.GITHUB_TOKEN }}

- name: Cleanup and update chdb directory
run: |
cd /home/ubuntu/pr_runner/chdb
git fetch origin || true
git fetch origin +refs/heads/*:refs/remotes/origin/* +refs/pull/${{ github.event.pull_request.number }}/merge:refs/remotes/pull/${{ github.event.pull_request.number }}/merge || true
git reset --hard origin/${{ github.head_ref }} || true
git clean -fdx || true
git checkout --progress --force refs/remotes/pull/${{ github.event.pull_request.number }}/merge || true
git status -v || true
continue-on-error: true
- name: Setup Python
uses: actions/setup-python@v2
with:
python-version: 3.11

- name: Code style check
run: |
export PYENV_ROOT="$HOME/.pyenv"
[[ -d $PYENV_ROOT/bin ]] && export PATH="$PYENV_ROOT/bin:$PATH"
eval "$(pyenv init -)"
pyenv local 3.11
python3 -m pip install flake8
cd chdb && python3 -m flake8
working-directory: /home/ubuntu/pr_runner/chdb
- name: Install flake8
run: python -m pip install flake8

- name: Run flake8 on chdb directory
run: cd chdb && flake8 .

1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
*.logrt

/python_pkg/
minitest/
/tmps
/bak
*state_tmp_*
Expand Down
37 changes: 33 additions & 4 deletions chdb/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import sys
import os
import threading


class ChdbError(Exception):
Expand Down Expand Up @@ -29,7 +30,9 @@ class ChdbError(Exception):
from . import _chdb # noqa

os.chdir(cwd)
engine_version = str(_chdb.query("SELECT version();", "CSV").bytes())[3:-4]
conn = _chdb.connect()
engine_version = str(conn.query("SELECT version();", "CSV").bytes())[3:-4]
conn.close()
else:
raise NotImplementedError("Python 3.6 or lower version is not supported")

Expand Down Expand Up @@ -64,18 +67,44 @@ def to_df(r):
return t.to_pandas(use_threads=True)


# global connection lock, for multi-threading use of legacy chdb.query()
g_conn_lock = threading.Lock()


# wrap _chdb functions
def query(sql, output_format="CSV", path="", udf_path=""):
global g_udf_path
if udf_path != "":
g_udf_path = udf_path
conn_str = ""
if path == "":
conn_str = ":memory:"
else:
conn_str = f"{path}"
if g_udf_path != "":
if "?" in conn_str:
conn_str = f"{conn_str}&udf_path={g_udf_path}"
else:
conn_str = f"{conn_str}?udf_path={g_udf_path}"
if output_format == "Debug":
output_format = "CSV"
if "?" in conn_str:
conn_str = f"{conn_str}&verbose&log-level=test"
else:
conn_str = f"{conn_str}?verbose&log-level=test"

lower_output_format = output_format.lower()
result_func = _process_result_format_funs.get(lower_output_format, lambda x: x)
if lower_output_format in _arrow_format:
output_format = "Arrow"
res = _chdb.query(sql, output_format, path=path, udf_path=g_udf_path)
if res.has_error():
raise ChdbError(res.error_message())

with g_conn_lock:
conn = _chdb.connect(conn_str)
res = conn.query(sql, output_format)
if res.has_error():
conn.close()
raise ChdbError(res.error_message())
conn.close()
return result_func(res)


Expand Down
2 changes: 1 addition & 1 deletion chdb/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ CMAKE_ARGS="-DCMAKE_BUILD_TYPE=${build_type} -DENABLE_THINLTO=0 -DENABLE_TESTS=0
-DENABLE_PROTOBUF=1 -DENABLE_THRIFT=1 -DENABLE_MSGPACK=1 \
-DENABLE_BROTLI=1 -DENABLE_H3=1 -DENABLE_CURL=1 \
-DENABLE_CLICKHOUSE_ALL=0 -DUSE_STATIC_LIBRARIES=1 -DSPLIT_SHARED_LIBRARIES=0 \
-DENABLE_SIMDJSON=1 \
-DENABLE_SIMDJSON=1 -DENABLE_RAPIDJSON=1 \
${CPU_FEATURES} \
${CMAKE_TOOLCHAIN_FILE} \
-DENABLE_AVX512=0 -DENABLE_AVX512_VBMI=0 \
Expand Down
88 changes: 77 additions & 11 deletions chdb/session/state.py
Original file line number Diff line number Diff line change
@@ -1,41 +1,100 @@
import tempfile
import shutil
import warnings

from chdb import query
import chdb
from ..state import sqlitelike as chdb_stateful


g_session = None
g_session_path = None


class Session:
"""
Session will keep the state of query. All DDL and DML state will be kept in a dir.
Dir path could be passed in as an argument. If not, a temporary dir will be created.
Session will keep the state of query.
If path is None, it will create a temporary directory and use it as the database path
and the temporary directory will be removed when the session is closed.
You can also pass in a path to create a database at that path, where your data will be kept.
You can also use a connection string to pass in the path and other parameters.
Examples:
- ":memory:" (for in-memory database)
- "test.db" (for relative path)
- "file:test.db" (same as above)
- "/path/to/test.db" (for absolute path)
- "file:/path/to/test.db" (same as above)
- "file:test.db?param1=value1&param2=value2" (for relative path with query params)
- "file::memory:?verbose&log-level=test" (for in-memory database with query params)
- "///path/to/test.db?param1=value1&param2=value2" (for absolute path)
If path is not specified, the temporary dir will be deleted when the Session object is deleted.
Otherwise path will be kept.
Connection string args handling:
Connection string can contain query params like "file:test.db?param1=value1&param2=value2"
"param1=value1" will be passed to ClickHouse engine as start up args.
Note: The default database is "_local" and the default engine is "Memory", which means all data
will be stored in memory. If you want to store data on disk, you should create another database.
For more details, see `clickhouse local --help --verbose`
Some special args handling:
- "mode=ro" would be "--readonly=1" for clickhouse (read-only mode)
Important:
- There can be only one session at a time. It is recommended to close the existing session before creating a new one.
- If a session is still active, creating a new session will emit a warning and close the existing one.
"""

def __init__(self, path=None):
global g_session, g_session_path
if g_session is not None:
warnings.warn(
"There is already an active session. Creating a new session will close the existing one. "
"It is recommended to close the existing session before creating a new one. "
f"Closing the existing session {g_session_path}"
)
g_session.close()
g_session_path = None
if path is None:
self._cleanup = True
self._path = tempfile.mkdtemp()
else:
self._cleanup = False
self._path = path
if chdb.g_udf_path != "":
self._udf_path = chdb.g_udf_path
# add udf_path to conn_str here.
# - the `user_scripts_path` will be the value of `udf_path`
# - the `user_defined_executable_functions_config` will be `user_scripts_path/*.xml`
# Both of them will be added to the conn_str in the Connection class
if "?" in self._path:
self._conn_str = f"{self._path}&udf_path={self._udf_path}"
else:
self._conn_str = f"{self._path}?udf_path={self._udf_path}"
else:
self._udf_path = ""
self._conn_str = f"{self._path}"
self._conn = chdb_stateful.Connection(self._conn_str)
g_session = self
g_session_path = self._path

def __del__(self):
if self._cleanup:
self.cleanup()
self.close()

def __enter__(self):
return self

def __exit__(self, exc_type, exc_value, traceback):
self.cleanup()
self.close()

def close(self):
if self._cleanup:
self.cleanup()
if self._conn is not None:
self._conn.close()
self._conn = None

def cleanup(self):
try:
if self._conn is not None:
self._conn.close()
self._conn = None
shutil.rmtree(self._path)
except: # noqa
pass
Expand All @@ -44,7 +103,14 @@ def query(self, sql, fmt="CSV", udf_path=""):
"""
Execute a query.
"""
return query(sql, fmt, path=self._path, udf_path=udf_path)
if fmt == "Debug":
warnings.warn(
"""Debug format is not supported in Session.query
Please try use parameters in connection string instead:
Eg: conn = connect(f"db_path?verbose&log-level=test")"""
)
fmt = "CSV"
return self._conn.query(sql, fmt)

# alias sql = query
sql = query
3 changes: 2 additions & 1 deletion chdb/state/sqlitelike.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,14 @@ def connect(connection_string: str = ":memory:") -> Connection:
Args:
connection_string (str, optional): Connection string. Defaults to ":memory:".
Aslo support file path like:
Also support file path like:
- ":memory:" (for in-memory database)
- "test.db" (for relative path)
- "file:test.db" (same as above)
- "/path/to/test.db" (for absolute path)
- "file:/path/to/test.db" (same as above)
- "file:test.db?param1=value1&param2=value2" (for relative path with query params)
- "file::memory:?verbose&log-level=test" (for in-memory database with query params)
- "///path/to/test.db?param1=value1&param2=value2" (for absolute path)
Connection string args handling:
Expand Down
2 changes: 1 addition & 1 deletion chdb/test_smoke.sh
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ python3 -c \
"import chdb; res = chdb._chdb.query('select version()', 'CSV'); print(res)"

python3 -c \
"import chdb; res = chdb.query('select version()', 'CSV'); print(res.bytes())"
"import chdb; res = chdb.query('select version()', 'Debug'); print(res.bytes())"

# test json function
python3 -c \
Expand Down
Loading

0 comments on commit e5fbb15

Please sign in to comment.