[search/save] Run bystro-stats on saved annotations #321

Merged: 6 commits, Oct 25, 2023
config/beanstalk.clean.yml (3 additions, 0 deletions)

@@ -17,3 +17,6 @@ beanstalkd:
   ancestry:
     submission: ancestry
     events: ancestry_events
+  proteomics:
+    submission: proteomics
+    events: proteomics_events
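
The new proteomics entry follows the same submission/events tube convention as the blocks above it. Purely as an illustration of that convention (this is not the repo's worker code, which lives in bystro.beanstalkd.worker, and the greenstalk client library is an assumption), a worker would reserve jobs from the submission tube and publish progress on the events tube:

import greenstalk

# Hypothetical consumer for the tube pair added above; Bystro's actual
# worker plumbing is not shown in this PR.
client = greenstalk.Client(
    ("127.0.0.1", 11300),
    use="proteomics_events",  # events tube: where progress messages go
    watch="proteomics",       # submission tube: where jobs arrive
)

job = client.reserve()            # block until a proteomics job is submitted
print("received submission:", job.body)
client.put("started proteomics")  # emit an event on proteomics_events
client.delete(job)                # acknowledge the submission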
python/pyproject.toml (1 addition, 1 deletion)

@@ -1,5 +1,5 @@
 [build-system]
-requires = ["maturin>=0.14,<0.15", "setuptools", "wheel", "Cython"]
+requires = ["maturin>=1.3.0,<1.4.0", "setuptools", "wheel", "Cython"]
Collaborator Author: Without this we get warnings when building the Python project.

Reviewer: Could we just fold that into a code comment?

Collaborator Author: It's caused by a mismatch between the version in requirements.txt and pyproject.toml. Do you think that's a connection that needs to be drawn (that the packages in [build-system] are required to be in your environment)?

Reviewer: I don't have full context here, but in general, whenever we're pinning on a minor release I've never regretted documenting the reason :). If there's a version mismatch between requirements.txt and pyproject.toml, optimally we'd resolve the mismatch, but if we can't, it's probably worth jotting down somewhere why they have to differ.

build-backend = "maturin"

[project]
Expand Down
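
Following up on the thread above, one way to keep the two pins from drifting again is a small guard script in CI. A minimal sketch, assuming the files sit at the repo-relative paths shown and that maturin is the only build requirement worth syncing:

import re
import tomllib  # Python 3.11+; on older interpreters use the tomli package

# Hypothetical CI check for the maturin version mismatch discussed above.
# Paths and the single-package focus are assumptions for illustration.
with open("python/pyproject.toml", "rb") as f:
    build_requires = tomllib.load(f)["build-system"]["requires"]

pyproject_pin = next((r for r in build_requires if r.startswith("maturin")), None)

with open("python/requirements.txt") as f:
    requirement_pins = [line.strip() for line in f if line.strip() and not line.startswith("#")]

requirements_pin = next((r for r in requirement_pins if re.match(r"maturin\b", r)), None)

if requirements_pin != pyproject_pin:
    raise SystemExit(
        f"maturin pin mismatch: {pyproject_pin!r} (pyproject.toml) "
        f"vs {requirements_pin!r} (requirements.txt)"
    )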
python/python/bystro/search/index/bystro_file.pyx (7 additions, 7 deletions)

@@ -30,14 +30,14 @@ cdef class ReadAnnotationTarball:
     list paths
     int id

-    def __cinit__(self, str index_name, dict delimiters, str tar_path, str annotation_name = 'annotation.tsv.gz', int chunk_size=500):
+    def __cinit__(self, str index_name, object delimiters, str tar_path, str annotation_name = 'annotation.tsv.gz', int chunk_size=500):
         self.index_name = index_name
         self.chunk_size = chunk_size
-        self.field_separator = delimiters['field']
-        self.position_delimiter = delimiters['position']
-        self.overlap_delimiter = delimiters['overlap']
-        self.value_delimiter = delimiters['value']
-        self.empty_field_char = delimiters['empty_field']
+        self.field_separator = delimiters.field
+        self.position_delimiter = delimiters.position
+        self.overlap_delimiter = delimiters.overlap
+        self.value_delimiter = delimiters.value
+        self.empty_field_char = delimiters.empty_field

         t = tarfile.open(tar_path)
         for member in t.getmembers():

@@ -117,5 +117,5 @@ cdef class ReadAnnotationTarball:
     def get_header_fields(self):
         return self.header_fields

-cpdef ReadAnnotationTarball read_annotation_tarball(str index_name, dict delimiters, str tar_path, str annotation_name = 'annotation.tsv.gz', int chunk_size=500):
+cpdef ReadAnnotationTarball read_annotation_tarball(str index_name, object delimiters, str tar_path, str annotation_name = 'annotation.tsv.gz', int chunk_size=500):
     return ReadAnnotationTarball(index_name, delimiters, tar_path, annotation_name, chunk_size)
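
The switch from dict lookups to attribute access reflects the new DelimitersConfig object. Its definition in bystro.search.utils.annotation is not part of this diff; below is a minimal sketch of such an attribute container, with defaults taken from the "e.g." comments in the tests further down (the empty_field default is a guess):

from dataclasses import dataclass

# Illustrative stand-in for bystro.search.utils.annotation.DelimitersConfig;
# the real class is not shown in this PR.
@dataclass(frozen=True)
class DelimitersConfig:
    field: str = "\t"        # column separator
    position: str = "|"      # separates entries at different positions
    overlap: str = chr(31)   # unit separator; joins overlapping values
    value: str = ";"         # separates multiple values at one position
    empty_field: str = "NA"  # placeholder for missing data (assumed)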
python/python/bystro/search/index/handler.py (12 additions, 6 deletions)

@@ -14,9 +14,9 @@
 from bystro.beanstalkd.worker import ProgressPublisher, get_progress_reporter
 from bystro.search.index.bystro_file import (  # type: ignore # pylint: disable=no-name-in-module,import-error # noqa: E501
-    read_annotation_tarball,
+    read_annotation_tarball,
 )
-from bystro.search.utils.annotation import get_delimiters
+from bystro.search.utils.annotation import DelimitersConfig
 from bystro.search.utils.opensearch import gather_opensearch_args

 ray.init(ignore_reinit_error=True, address="auto")

@@ -47,7 +47,9 @@ async def index(self, data):
         self.counter += resp[0]

         if self.counter >= self.reporter_batch:
-            await asyncio.to_thread(self.progress_tracker.increment.remote, self.counter)
+            await asyncio.to_thread(
+                self.progress_tracker.increment.remote, self.counter
+            )
             self.counter = 0

         return resp

@@ -96,7 +98,9 @@ async def go(
     if not index_body["settings"].get("number_of_shards"):
         file_size = os.path.getsize(tar_path)
-        index_body["settings"]["number_of_shards"] = ceil(float(file_size) / float(1e10))
+        index_body["settings"]["number_of_shards"] = ceil(
+            float(file_size) / float(1e10)
+        )

     try:
         await client.indices.create(index_name, body=index_body)

@@ -106,7 +110,7 @@
     data = read_annotation_tarball(
         index_name=index_name,
         tar_path=tar_path,
-        delimiters=get_delimiters(),
+        delimiters=DelimitersConfig(),
         chunk_size=paralleleism_chunk_size,
     )

@@ -154,7 +158,9 @@
 if __name__ == "__main__":
     parser = argparse.ArgumentParser(description="Process some config files.")
-    parser.add_argument("--tar", type=str, help="Path to the tarball containing the annotation")
+    parser.add_argument(
+        "--tar", type=str, help="Path to the tarball containing the annotation"
+    )

     parser.add_argument(
         "--search_conf",
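
The shard heuristic above allocates one primary shard per 10 GB (1e10 bytes) of annotation tarball, rounded up. A quick worked example (the file size is made up):

from math import ceil

# Mirrors the number_of_shards computation in the diff above.
file_size = 25_000_000_000  # a hypothetical 25 GB tarball
number_of_shards = ceil(float(file_size) / float(1e10))
assert number_of_shards == 3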
python/python/bystro/search/index/tests/test_bystro_file.py (10 additions, 7 deletions)

@@ -6,7 +6,7 @@
 from bystro.search.index.bystro_file import (  # type: ignore # pylint: disable=no-name-in-module,import-error # noqa: E501
     read_annotation_tarball,
 )
-from bystro.search.utils.annotation import get_delimiters
+from bystro.search.utils.annotation import DelimitersConfig


 def create_mock_tarball(annotation_content):

@@ -27,11 +27,11 @@ def create_mock_tarball(annotation_content):

 def test_read_annotation_tarball():
-    delims = get_delimiters()
-    delim_v = delims["value"]  # e.g. ;
-    delim_f = delims["field"]  # e.g. \t
-    delim_o = delims["overlap"]  # e.g. chr(31)
-    delim_p = delims["position"]  # e.g. |
+    delims = DelimitersConfig()
+    delim_v = delims.value  # e.g. ;
+    delim_f = delims.field  # e.g. \t
+    delim_o = delims.overlap  # e.g. chr(31)
+    delim_p = delims.position  # e.g. |

     header = f"field1{delim_f}field2{delim_f}field3\n"
     field1val = f"value1a{delim_v}value1b{delim_p}value2aa{delim_o}value2ab{delim_v}value2b{delim_f}"

@@ -70,9 +70,12 @@ def test_read_annotation_tarball():
             },
         }
     ]

+    import numpy as np
+    import json
     result_data = next(reader)
-    assert result_data == expected_data
+    np.testing.assert_equal(result_data, expected_data)
+    print(json.dumps(expected_data, indent=4))

     # Test the end of the data
     with pytest.raises(StopIteration):
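
For readers new to the format, the delimiters nest three levels deep: positions, then values, then overlapping values. A small illustration of how the test's field1val string decomposes, assuming the default delimiters sketched earlier (";", "|", chr(31)); this is expository code, not the parser used by read_annotation_tarball:

# Decompose the same example string the test builds (without the trailing
# field delimiter).
delim_v, delim_p, delim_o = ";", "|", chr(31)
field1val = f"value1a{delim_v}value1b{delim_p}value2aa{delim_o}value2ab{delim_v}value2b"

parsed = [
    [value.split(delim_o) for value in position.split(delim_v)]
    for position in field1val.split(delim_p)
]
assert parsed == [
    [["value1a"], ["value1b"]],
    [["value2aa", "value2ab"], ["value2b"]],
]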
python/python/bystro/search/save/handler.py (48 additions, 18 deletions)

@@ -7,21 +7,30 @@
 # TODO 2023-05-08: concatenate chunks in a different ray worker

 import gzip
+import logging
 import math
 import os
 import pathlib
 import subprocess
 import traceback
+from typing import Any

 import numpy as np
 import ray

 from opensearchpy import OpenSearch

 from bystro.beanstalkd.worker import ProgressPublisher, get_progress_reporter
-from bystro.search.utils.annotation import AnnotationOutputs, get_delimiters
+from bystro.search.utils.annotation import (
+    AnnotationOutputs,
+    DelimitersConfig,
+)
 from bystro.search.utils.messages import SaveJobData
 from bystro.search.utils.opensearch import gather_opensearch_args
+from bystro.utils.compress import GZIP_EXECUTABLE
+from bystro.utils.tar import GNU_TAR_EXECUTABLE_NAME
+
+logger = logging.getLogger(__name__)

 ray.init(ignore_reinit_error=True, address="auto")

@@ -57,8 +66,8 @@ def _get_header(field_names):
     return parents, children


-def _populate_data(field_path, data_for_end_of_path):
-    if not isinstance(field_path, list) or data_for_end_of_path is None:
+def _populate_data(field_path: list[str] | str, data_for_end_of_path: Any):
+    if not isinstance(data_for_end_of_path, dict):
         return data_for_end_of_path

     for child_field in field_path:

@@ -70,8 +79,8 @@ def _populate_data(field_path, data_for_end_of_path):
     return data_for_end_of_path


-def _make_output_string(rows: list, delims: dict):
-    empty_field_char = delims["empty_field"]
+def _make_output_string(rows: list, delims: DelimitersConfig):
+    empty_field_char = delims.empty_field
     for row_idx, row in enumerate(rows):  # pylint:disable=too-many-nested-blocks
         # Some fields may just be missing; we won't store even the alt/pos [[]] structure for those
         for i, column in enumerate(row):

@@ -99,18 +108,23 @@ def _make_output_string(rows: list, delims: dict):
                     if isinstance(sub, list):
                         inner_values.append(
-                            delims["overlap"].join(
-                                map(lambda x: str(x) if x is not None else empty_field_char, sub)
+                            delims.overlap.join(
+                                map(
+                                    lambda x: str(x)
+                                    if x is not None
+                                    else empty_field_char,
+                                    sub,
+                                )
                             )
                         )
                     else:
                         inner_values.append(str(sub))

-                column[j] = delims["value"].join(inner_values)
+                column[j] = delims.value.join(inner_values)

-            row[i] = delims["position"].join(column)
+            row[i] = delims.position.join(column)

-        rows[row_idx] = delims["field"].join(row)
+        rows[row_idx] = delims.field.join(row)

     return bytes("\n".join(rows) + "\n", encoding="utf-8")

@@ -143,7 +157,9 @@ def _process_query(
     for doc in resp["hits"]["hits"]:
         row = np.empty(len(field_names), dtype=object)
         for y in range(len(field_names)):
-            row[y] = _populate_data(child_fields[y], doc["_source"].get(parent_fields[y]))
+            row[y] = _populate_data(
+                child_fields[y], doc["_source"].get(parent_fields[y])
+            )

         if row[discordant_idx][0][0] is False:
             row[discordant_idx][0][0] = 0

@@ -191,7 +207,7 @@ def go(  # pylint:disable=invalid-name
     output_dir = os.path.dirname(job_data.outputBasePath)
     basename = os.path.basename(job_data.outputBasePath)
     pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)
-    outputs = AnnotationOutputs.from_path(output_dir, basename, True)
+    outputs, stats = AnnotationOutputs.from_path(output_dir, basename, compress=True)

     written_chunks = [os.path.join(output_dir, f"{job_data.indexName}_header")]

@@ -203,7 +219,9 @@
     client = OpenSearch(**search_client_args)

     query = _clean_query(job_data.queryBody)
-    num_slices = _get_num_slices(client, job_data.indexName, max_query_size, max_slices, query)
+    num_slices = _get_num_slices(
+        client, job_data.indexName, max_query_size, max_slices, query
+    )
     pit_id = client.create_point_in_time(index=job_data.indexName, params={"keep_alive": keep_alive})["pit_id"]  # type: ignore # noqa: E501
     try:
         reporter = get_progress_reporter(publisher)

@@ -212,7 +230,9 @@
         reqs = []
         for slice_id in range(num_slices):
-            written_chunks.append(os.path.join(output_dir, f"{job_data.indexName}_{slice_id}"))
+            written_chunks.append(
+                os.path.join(output_dir, f"{job_data.indexName}_{slice_id}")
+            )
             body = query.copy()
             if num_slices > 1:
                 # Slice queries require max > 1

@@ -223,7 +243,7 @@
                 job_data.fieldNames,
                 written_chunks[-1],
                 reporter,
-                get_delimiters(),
+                DelimitersConfig(),
             )
             reqs.append(res)
         results_processed = ray.get(reqs)

@@ -234,14 +254,24 @@
         all_chunks = " ".join(written_chunks)

         annotation_path = os.path.join(output_dir, outputs.annotation)
-        ret = subprocess.call(f"cat {all_chunks} > {annotation_path}; rm {all_chunks}", shell=True)
+        ret = subprocess.call(
+            f"cat {all_chunks} > {annotation_path}; rm {all_chunks}", shell=True
+        )
         if ret != 0:
             raise IOError(f"Failed to write {annotation_path}")

-        tarball_name = os.path.basename(outputs.archived)
+        ret = subprocess.call(
+            f"{GZIP_EXECUTABLE} -d -c {annotation_path} | {stats.stdin_cli_stats_command}",
+            shell=True,
+        )
+        if ret != 0:
+            raise IOError(f"Failed to write statistics for {annotation_path}")
+
+        tarball_name = os.path.basename(outputs.archived)
         # Webserver requires the output to have top-level statistics data,
         # but annotation data will be too large to want to store 2 copies of
         ret = subprocess.call(
-            f'cd {output_dir}; tar --exclude ".*" --exclude={tarball_name} -cf {tarball_name} * --remove-files',  # noqa: E501
+            f'cd {output_dir}; {GNU_TAR_EXECUTABLE_NAME} --exclude ".*" --exclude={tarball_name} -cf {tarball_name} * && rm {annotation_path}',  # noqa: E501
             shell=True,
         )
         if ret != 0:
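
The core of the PR is the new stats step above: the concatenated annotation is decompressed and streamed straight into the bystro-stats command, so statistics land in the archive without keeping a second copy of the annotation. A standalone sketch of the same pipeline; the handler gets the real executable from bystro.utils.compress.GZIP_EXECUTABLE and the real command from stats.stdin_cli_stats_command, neither of which is defined in this diff, so stand-ins are used here:

import subprocess

gzip_executable = "gzip"  # stand-in for GZIP_EXECUTABLE
stats_command = "wc -l"   # stand-in for the bystro-stats stdin command
annotation_path = "output/my_job.annotation.tsv.gz"  # hypothetical path

# Decompress to stdout and pipe into the stats reader, as the handler does;
# shell=True preserves the pipe semantics of the diff.
ret = subprocess.call(
    f"{gzip_executable} -d -c {annotation_path} | {stats_command}",
    shell=True,
)
if ret != 0:
    raise IOError(f"Failed to compute statistics for {annotation_path}")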