Skip to content

Commit

Permalink
Split bulkupsert
Browse files Browse the repository at this point in the history
  • Loading branch information
maximyurchuk committed Jul 29, 2024
1 parent bd147a9 commit 6c43ebc
Showing 1 changed file with 18 additions and 31 deletions.
49 changes: 18 additions & 31 deletions ydb/ci/build_bloat/ydb_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,14 @@ def generate_column_types(row):
assert False
return column_types

def chunks(lst, n):
"""Yield successive n-sized chunks from lst."""
for i in range(0, len(lst), n):
yield lst[i:i + n]

# We split bulk upsert because of
# https://github.com/ydb-platform/ydb-python-sdk/issues/460
BULK_UPSERT_CHUNK_SIZE = 5000

def parse_args():
parser = argparse.ArgumentParser()
Expand Down Expand Up @@ -148,32 +156,11 @@ def main():
row["compilation_time_s"] = time_s
row["id"] = str(uuid.uuid4())
rows.append(copy.copy(row))


"""
Temporary disable because of
2024-07-12T08:09:49.8675916Z Traceback (most recent call last):
2024-07-12T08:09:49.8677299Z File "/home/runner/.local/lib/python3.10/site-packages/ydb/connection.py", line 458, in __call__
2024-07-12T08:09:49.8678065Z response = rpc_state(
2024-07-12T08:09:49.8678879Z File "/home/runner/.local/lib/python3.10/site-packages/ydb/connection.py", line 242, in __call__
2024-07-12T08:09:49.8679773Z response, rendezvous = self.rpc.with_call(*args, **kwargs)
2024-07-12T08:09:49.8680722Z File "/home/runner/.local/lib/python3.10/site-packages/grpc/_channel.py", line 1198, in with_call
2024-07-12T08:09:49.8681604Z return _end_unary_response_blocking(state, call, True, None)
2024-07-12T08:09:49.8682681Z File "/home/runner/.local/lib/python3.10/site-packages/grpc/_channel.py", line 1006, in _end_unary_response_blocking
2024-07-12T08:09:49.8683773Z raise _InactiveRpcError(state) # pytype: disable=not-instantiable
2024-07-12T08:09:49.8684656Z grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
2024-07-12T08:09:49.8685362Z status = StatusCode.RESOURCE_EXHAUSTED
2024-07-12T08:09:49.8686015Z details = "CLIENT: Sent message larger than max (64975458 vs. 64000000)"
2024-07-12T08:09:49.8687739Z debug_error_string = "UNKNOWN:Error received from peer ***grpc_message:"CLIENT: Sent message larger than max (64975458 vs. 64000000)", grpc_status:8, created_time:"2024-07-12T08:09:49.841712345+00:00"***"
2024-07-12T08:09:49.8688817Z >
"""
TEMPORARY_DISABLE = True

if rows and not TEMPORARY_DISABLE:
row = rows[0]

for chunk_rows in chunks(rows, BULK_UPSERT_CHUNK_SIZE):
row = chunk_rows[0]
driver.table_client.bulk_upsert(
DATABASE_PATH + "/code-agility/cpp_compile_time", rows, generate_column_types(row)
DATABASE_PATH + "/code-agility/cpp_compile_time", chunk_rows, generate_column_types(row)
)

# upload into total_compile_time
Expand All @@ -198,10 +185,10 @@ def main():
row["inclusion_count"] = inclusion_count
rows.append(copy.copy(row))

if rows and not TEMPORARY_DISABLE:
row = rows[0]
for chunk_rows in chunks(rows, BULK_UPSERT_CHUNK_SIZE):
row = chunk_rows[0]
driver.table_client.bulk_upsert(
DATABASE_PATH + "/code-agility/headers_impact", rows, generate_column_types(row)
DATABASE_PATH + "/code-agility/headers_impact", chunk_rows, generate_column_types(row)
)

# upload into compile_breakdown
Expand All @@ -222,10 +209,10 @@ def main():

rows.append(copy.copy(row))

if rows and not TEMPORARY_DISABLE:
row = rows[0]
for chunk_rows in chunks(rows, BULK_UPSERT_CHUNK_SIZE):
row = chunk_rows[0]
driver.table_client.bulk_upsert(
DATABASE_PATH + "/code-agility/compile_breakdown", rows, generate_column_types(row)
DATABASE_PATH + "/code-agility/compile_breakdown", chunk_rows, generate_column_types(row)
)


Expand Down

0 comments on commit 6c43ebc

Please sign in to comment.