Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send small blobs inline. #8318

Merged
merged 10 commits into from
Feb 5, 2025
103 changes: 85 additions & 18 deletions src/remote/client/interface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1312,6 +1312,8 @@ void Blob::getInfo(CheckStatusWrapper* status,
if (blob->rbl_info.getLocalInfo(itemsLength, items, bufferLength, buffer))
return;

fb_assert(!blob->isCached());

rem_port* port = rdb->rdb_port;
RefMutexGuard portGuard(*port->port_sync, FB_FUNCTION);

Expand Down Expand Up @@ -1353,7 +1355,8 @@ void Blob::freeClientData(CheckStatusWrapper* status, bool force)

try
{
release_object(status, rdb, op_cancel_blob, blob->rbl_id);
if (!blob->isCached())
release_object(status, rdb, op_cancel_blob, blob->rbl_id);
}
catch (const Exception&)
{
Expand Down Expand Up @@ -1426,10 +1429,13 @@ void Blob::internalClose(CheckStatusWrapper* status)

if ((blob->rbl_flags & Rbl::CREATE) && blob->rbl_ptr != blob->rbl_buffer)
{
fb_assert(!blob->isCached());

send_blob(status, blob, 0, NULL);
}

release_object(status, rdb, op_close_blob, blob->rbl_id);
if (!blob->isCached())
release_object(status, rdb, op_close_blob, blob->rbl_id);
release_blob(blob);
blob = NULL;
}
Expand Down Expand Up @@ -1847,14 +1853,13 @@ IBlob* Attachment::createBlob(CheckStatusWrapper* status, ITransaction* apiTra,
p_blob->p_blob_bpb.cstr_address = NULL;

Rbl* blob = FB_NEW Rbl();
*blob_id = packet->p_resp.p_resp_blob_id;
blob->rbl_blob_id = *blob_id = packet->p_resp.p_resp_blob_id;
blob->rbl_rdb = rdb;
blob->rbl_rtr = transaction;
blob->rbl_id = packet->p_resp.p_resp_object;
blob->rbl_flags |= Rbl::CREATE;
SET_OBJECT(rdb, blob, blob->rbl_id);
blob->rbl_next = transaction->rtr_blobs;
transaction->rtr_blobs = blob;
transaction->rtr_blobs.add(blob);

IBlob* b = FB_NEW Blob(blob);
b->addRef();
Expand Down Expand Up @@ -3578,8 +3583,21 @@ ITransaction* Statement::execute(CheckStatusWrapper* status, ITransaction* apiTr
if (out_msg_length)
port->port_statement->rsr_message->msg_address = out_msg;

// Prepare to receive inline blobs
P_INLINE_BLOB* p_blob = &packet->p_inline_blob;
UCHAR blobInfo[64];

UsePreallocatedBuffer guardBlobInfo(p_blob->p_blob_info, sizeof(blobInfo), blobInfo);

receive_packet(port, packet);

while (packet->p_operation == op_inline_blob)
{
fb_assert(transaction);
transaction->setupInlineBlob(p_blob);
receive_packet(port, packet);
}

if (packet->p_operation != op_sql_response)
REMOTE_check_response(status, rdb, packet);
else
Expand Down Expand Up @@ -3940,8 +3958,21 @@ ITransaction* Attachment::execute(CheckStatusWrapper* status, ITransaction* apiT
if (in_msg_length || out_msg_length)
port->port_statement->rsr_message->msg_address = out_msg;

// Prepare to receive inline blobs
P_INLINE_BLOB* p_blob = &packet->p_inline_blob;
UCHAR blobInfo[64];

UsePreallocatedBuffer guardBlobInfo(p_blob->p_blob_info, sizeof(blobInfo), blobInfo);

receive_packet(rdb->rdb_port, packet);

while (packet->p_operation == op_inline_blob)
{
fb_assert(transaction);
transaction->setupInlineBlob(p_blob);
receive_packet(port, packet);
}

if (packet->p_operation != op_sql_response)
REMOTE_check_response(status, rdb, packet);
else
Expand Down Expand Up @@ -5507,6 +5538,8 @@ int Blob::getSegment(CheckStatusWrapper* status, unsigned int bufferLength, void
break;
}

fb_assert(!blob->isCached());

// Preparatory to asking for more data, use input buffer length
// to cue more efficient blob buffering.

Expand Down Expand Up @@ -5668,6 +5701,20 @@ IBlob* Attachment::openBlob(CheckStatusWrapper* status, ITransaction* apiTra, IS
Rtr* transaction = remoteTransaction(apiTra);
CHECK_HANDLE(transaction, isc_bad_trans_handle);

if (transaction->rtr_blobs.locate(*id))
{
Rbl* blob = transaction->rtr_blobs.current();

if (!bpb_length)
{
Blob* iBlob = FB_NEW Blob(blob);
iBlob->addRef();
return iBlob;
}

release_blob(blob);
}

// Validate data length

CHECK_LENGTH(port, bpb_length);
Expand Down Expand Up @@ -5769,10 +5816,10 @@ IBlob* Attachment::openBlob(CheckStatusWrapper* status, ITransaction* apiTra, IS
Rbl* blob = FB_NEW Rbl;
blob->rbl_rdb = rdb;
blob->rbl_rtr = transaction;
blob->rbl_blob_id = *id;
blob->rbl_id = packet->p_resp.p_resp_object;
SET_OBJECT(rdb, blob, blob->rbl_id);
blob->rbl_next = transaction->rtr_blobs;
transaction->rtr_blobs = blob;
transaction->rtr_blobs.add(blob);

Blob* iBlob = FB_NEW Blob(blob);
iBlob->addRef();
Expand Down Expand Up @@ -6619,6 +6666,11 @@ int Blob::seek(CheckStatusWrapper* status, int mode, int offset)

CHECK_HANDLE(blob, isc_bad_segstr_handle);

if (blob->isCached())
{
Arg::Gds(isc_wish_list).raise();
}

Rdb* rdb = blob->rbl_rdb;
CHECK_HANDLE(rdb, isc_bad_db_handle);
rem_port* port = rdb->rdb_port;
Expand Down Expand Up @@ -7843,6 +7895,12 @@ static void batch_dsql_fetch(rem_port* port,
// Avoid damaging preallocated buffer for response data
UseStandardBuffer guard(packet->p_resp.p_resp_data);

// Prepare to receive inline blobs
P_INLINE_BLOB* p_blob = &packet->p_inline_blob;
UCHAR blobInfo[64];

UsePreallocatedBuffer guardBlobInfo(p_blob->p_blob_info, sizeof(blobInfo), blobInfo);

statement->rsr_flags.set(Rsr::FETCHED);
while (true)
{
Expand Down Expand Up @@ -7878,6 +7936,17 @@ static void batch_dsql_fetch(rem_port* port,
throw;
}

if (packet->p_operation == op_inline_blob)
{
fb_assert(!statement->rsr_rtr || statement->rsr_rtr->rtr_id == p_blob->p_tran_id);

Rtr* transaction = statement->rsr_rtr ?
statement->rsr_rtr : port->port_objects[p_blob->p_tran_id];

transaction->setupInlineBlob(p_blob);
continue;
}

if (packet->p_operation != op_fetch_response)
{
statement->rsr_flags.set(Rsr::STREAM_ERR);
Expand Down Expand Up @@ -9217,16 +9286,12 @@ static void release_blob( Rbl* blob)
**************************************/
Rtr* transaction = blob->rbl_rtr;
Rdb* rdb = blob->rbl_rdb;
rdb->rdb_port->releaseObject(blob->rbl_id);

for (Rbl** p = &transaction->rtr_blobs; *p; p = &(*p)->rbl_next)
{
if (*p == blob)
{
*p = blob->rbl_next;
break;
}
}
if (!blob->isCached())
rdb->rdb_port->releaseObject(blob->rbl_id);

if (transaction->rtr_blobs.locate(blob->rbl_blob_id))
transaction->rtr_blobs.fastRemove();

delete blob;
}
Expand Down Expand Up @@ -9384,8 +9449,8 @@ static void release_transaction( Rtr* transaction)
Rdb* rdb = transaction->rtr_rdb;
rdb->rdb_port->releaseObject(transaction->rtr_id);

while (transaction->rtr_blobs)
release_blob(transaction->rtr_blobs);
while (transaction->rtr_blobs.getFirst())
release_blob(transaction->rtr_blobs.current());

for (Rtr** p = &rdb->rdb_transactions; *p; p = &(*p)->rtr_next)
{
Expand Down Expand Up @@ -9433,6 +9498,8 @@ static void send_blob(CheckStatusWrapper* status,
* Actually send blob data (which might be buffered)
*
**************************************/
fb_assert(!blob->isCached());

Rdb* rdb = blob->rbl_rdb;
PACKET* packet = &rdb->rdb_packet;
packet->p_operation = op_put_segment;
Expand Down
7 changes: 6 additions & 1 deletion src/remote/parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,9 @@ static rem_fmt* parse_format(const UCHAR*& blr, size_t& blr_length)

ULONG net_length = 0;
ULONG offset = 0;
dsc* const begin = format->fmt_desc.begin();

for (dsc* desc = format->fmt_desc.begin(); count; --count, ++desc)
for (dsc* desc = begin; count; --count, ++desc)
{
if (blr_length-- == 0)
return NULL;
Expand Down Expand Up @@ -262,6 +263,8 @@ static rem_fmt* parse_format(const UCHAR*& blr, size_t& blr_length)
desc->dsc_dtype = dtype_quad;
desc->dsc_length = sizeof(SLONG) * 2;
desc->dsc_scale = *blr++;

format->fmt_blob_idx.add(desc - begin);
break;

case blr_float:
Expand Down Expand Up @@ -312,6 +315,8 @@ static rem_fmt* parse_format(const UCHAR*& blr, size_t& blr_length)
USHORT textType = *blr++;
textType += (*blr++) << 8;
desc->setTextType(textType);

format->fmt_blob_idx.add(desc - begin);
}
break;

Expand Down
105 changes: 105 additions & 0 deletions src/remote/protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,9 @@ static bool_t xdr_trrq_blr(RemoteXdr*, CSTRING*);
static bool_t xdr_trrq_message(RemoteXdr*, USHORT);
static bool_t xdr_bytes(RemoteXdr*, void*, ULONG);
static bool_t xdr_blob_stream(RemoteXdr*, SSHORT, CSTRING*);
static bool_t xdr_blobBuffer(RemoteXdr* xdrs, RemBlobBuffer* buff);
static Rsr* getStatement(RemoteXdr*, USHORT);
static Rtr* getTransaction(RemoteXdr*, USHORT);


inline void fixupLength(const RemoteXdr* xdrs, ULONG& length)
Expand Down Expand Up @@ -1138,6 +1140,42 @@ bool_t xdr_protocol(RemoteXdr* xdrs, PACKET* p)
return P_TRUE(xdrs, p);
}

case op_inline_blob:
{
P_INLINE_BLOB* p_blob = &p->p_inline_blob;
MAP(xdr_short, reinterpret_cast<SSHORT&>(p_blob->p_tran_id));
MAP(xdr_quad, p_blob->p_blob_id);

if (xdrs->x_op == XDR_ENCODE)
{
MAP(xdr_response, p_blob->p_blob_info);
if (!xdr_blobBuffer(xdrs, p_blob->p_blob_data))
return P_FALSE(xdrs, p);
}
else if (xdrs->x_op == XDR_DECODE)
{
Rtr* tran = getTransaction(xdrs, p_blob->p_tran_id);

if (!tran)
return P_FALSE(xdrs, p);

MAP(xdr_response, p_blob->p_blob_info);

Rbl* blb = tran->createInlineBlob();
p_blob->p_blob_data = &blb->rbl_data;

if (!xdr_blobBuffer(xdrs, p_blob->p_blob_data))
{
tran->rtr_inline_blob = nullptr;
delete blb;

return P_FALSE(xdrs, p);
}
}

return P_TRUE(xdrs, p);
}

///case op_insert:
default:
#ifdef DEV_BUILD
Expand Down Expand Up @@ -2274,6 +2312,23 @@ static Rsr* getStatement(RemoteXdr* xdrs, USHORT statement_id)
return port->port_statement;
}

static Rtr* getTransaction(RemoteXdr* xdrs, USHORT tran_id)
{
rem_port* port = xdrs->x_public;

if (tran_id >= port->port_objects.getCount())
return nullptr;

try
{
return port->port_objects[tran_id];
}
catch (const status_exception&)
{
return nullptr;
}
}

static bool_t xdr_blob_stream(RemoteXdr* xdrs, SSHORT statement_id, CSTRING* strmPortion)
{
if (xdrs->x_op == XDR_FREE)
Expand Down Expand Up @@ -2492,3 +2547,53 @@ static bool_t xdr_blob_stream(RemoteXdr* xdrs, SSHORT statement_id, CSTRING* str

return TRUE;
}

static bool_t xdr_blobBuffer(RemoteXdr* xdrs, RemBlobBuffer* buff)
{
SLONG len;
UCHAR* data;
static const SCHAR filler[4] = { 0, 0, 0, 0 };

switch (xdrs->x_op)
{
case XDR_ENCODE:
len = buff->getCount();
if (!xdr_long(xdrs, &len))
return FALSE;

data = buff->begin();
if (len && !xdrs->x_putbytes(reinterpret_cast<const SCHAR*>(data), len))
return FALSE;

len = (4 - len) & 3;
if (len)
return xdrs->x_putbytes(filler, len);

return TRUE;

case XDR_DECODE:
if (!xdr_long(xdrs, &len))
return FALSE;

if (len)
{
data = buff->getBuffer(len);
if (!xdrs->x_getbytes(reinterpret_cast<SCHAR*>(data), len))
return FALSE;
}

len = (4 - len) & 3;
if (len)
{
SCHAR trash[4];
return xdrs->x_getbytes(trash, len);
}

return TRUE;

case XDR_FREE:
return TRUE;
}

return FALSE;
}
Loading
Loading