diff --git a/src/include/fb_types.h b/src/include/fb_types.h index 1daeff94a2b..493565c4862 100644 --- a/src/include/fb_types.h +++ b/src/include/fb_types.h @@ -81,6 +81,22 @@ typedef FB_UINT64 ISC_UINT64; typedef ISC_QUAD SQUAD; +const SQUAD NULL_BLOB = { 0, 0 }; + +inline bool operator==(const SQUAD& s1, const SQUAD& s2) +{ + return s1.gds_quad_high == s2.gds_quad_high && + s2.gds_quad_low == s1.gds_quad_low; +} + +inline bool operator>(const SQUAD& s1, const SQUAD& s2) +{ + return (s1.gds_quad_high > s2.gds_quad_high) || + (s1.gds_quad_high == s2.gds_quad_high && + s1.gds_quad_low > s2.gds_quad_low); +} + + /* * TMN: some misc data types from all over the place */ diff --git a/src/include/firebird/FirebirdInterface.idl b/src/include/firebird/FirebirdInterface.idl index 8a0a1ad2c18..ad2923f7b00 100644 --- a/src/include/firebird/FirebirdInterface.idl +++ b/src/include/firebird/FirebirdInterface.idl @@ -521,6 +521,11 @@ version: // 3.0 => 4.0 version: // 3.0.7 => 3.0.8, 4.0.0 => 4.0.1 [notImplementedAction if ::FB_UsedInYValve then defaultAction else call deprecatedFree(status) endif] void free(Status status); + +version: // 6.0 + // Inline blob transfer + uint getMaxInlineBlobSize(Status status); + void setMaxInlineBlobSize(Status status, uint size); } interface Batch : ReferenceCounted @@ -713,6 +718,15 @@ version: // 3.0.7 => 3.0.8, 4.0.0 => 4.0.1 void detach(Status status); [notImplementedAction if ::FB_UsedInYValve then defaultAction else call deprecatedDropDatabase(status) endif] void dropDatabase(Status status); + +version: // 6.0 + // Blob caching by client + uint getMaxBlobCacheSize(Status status); + void setMaxBlobCacheSize(Status status, uint size); + + // Inline blob transfer + uint getMaxInlineBlobSize(Status status); + void setMaxInlineBlobSize(Status status, uint size); } interface Service : ReferenceCounted diff --git a/src/include/firebird/IdlFbInterfaces.h b/src/include/firebird/IdlFbInterfaces.h index 11e294944e3..1fd527db1a2 100644 --- a/src/include/firebird/IdlFbInterfaces.h +++ b/src/include/firebird/IdlFbInterfaces.h @@ -1869,7 +1869,7 @@ namespace Firebird } }; -#define FIREBIRD_ISTATEMENT_VERSION 5u +#define FIREBIRD_ISTATEMENT_VERSION 6u class IStatement : public IReferenceCounted { @@ -1891,6 +1891,8 @@ namespace Firebird void (CLOOP_CARG *setTimeout)(IStatement* self, IStatus* status, unsigned timeOut) CLOOP_NOEXCEPT; IBatch* (CLOOP_CARG *createBatch)(IStatement* self, IStatus* status, IMessageMetadata* inMetadata, unsigned parLength, const unsigned char* par) CLOOP_NOEXCEPT; void (CLOOP_CARG *free)(IStatement* self, IStatus* status) CLOOP_NOEXCEPT; + unsigned (CLOOP_CARG *getMaxInlineBlobSize)(IStatement* self, IStatus* status) CLOOP_NOEXCEPT; + void (CLOOP_CARG *setMaxInlineBlobSize)(IStatement* self, IStatus* status, unsigned size) CLOOP_NOEXCEPT; }; protected: @@ -2064,6 +2066,33 @@ namespace Firebird static_cast(this->cloopVTable)->free(this, status); StatusType::checkException(status); } + + template unsigned getMaxInlineBlobSize(StatusType* status) + { + if (cloopVTable->version < 6) + { + StatusType::setVersionError(status, "IStatement", cloopVTable->version, 6); + StatusType::checkException(status); + return 0; + } + StatusType::clearException(status); + unsigned ret = static_cast(this->cloopVTable)->getMaxInlineBlobSize(this, status); + StatusType::checkException(status); + return ret; + } + + template void setMaxInlineBlobSize(StatusType* status, unsigned size) + { + if (cloopVTable->version < 6) + { + StatusType::setVersionError(status, "IStatement", cloopVTable->version, 6); + StatusType::checkException(status); + return; + } + StatusType::clearException(status); + static_cast(this->cloopVTable)->setMaxInlineBlobSize(this, status, size); + StatusType::checkException(status); + } }; #define FIREBIRD_IBATCH_VERSION 4u @@ -2499,7 +2528,7 @@ namespace Firebird } }; -#define FIREBIRD_IATTACHMENT_VERSION 5u +#define FIREBIRD_IATTACHMENT_VERSION 6u class IAttachment : public IReferenceCounted { @@ -2532,6 +2561,10 @@ namespace Firebird IReplicator* (CLOOP_CARG *createReplicator)(IAttachment* self, IStatus* status) CLOOP_NOEXCEPT; void (CLOOP_CARG *detach)(IAttachment* self, IStatus* status) CLOOP_NOEXCEPT; void (CLOOP_CARG *dropDatabase)(IAttachment* self, IStatus* status) CLOOP_NOEXCEPT; + unsigned (CLOOP_CARG *getMaxBlobCacheSize)(IAttachment* self, IStatus* status) CLOOP_NOEXCEPT; + void (CLOOP_CARG *setMaxBlobCacheSize)(IAttachment* self, IStatus* status, unsigned size) CLOOP_NOEXCEPT; + unsigned (CLOOP_CARG *getMaxInlineBlobSize)(IAttachment* self, IStatus* status) CLOOP_NOEXCEPT; + void (CLOOP_CARG *setMaxInlineBlobSize)(IAttachment* self, IStatus* status, unsigned size) CLOOP_NOEXCEPT; }; protected: @@ -2800,6 +2833,60 @@ namespace Firebird static_cast(this->cloopVTable)->dropDatabase(this, status); StatusType::checkException(status); } + + template unsigned getMaxBlobCacheSize(StatusType* status) + { + if (cloopVTable->version < 6) + { + StatusType::setVersionError(status, "IAttachment", cloopVTable->version, 6); + StatusType::checkException(status); + return 0; + } + StatusType::clearException(status); + unsigned ret = static_cast(this->cloopVTable)->getMaxBlobCacheSize(this, status); + StatusType::checkException(status); + return ret; + } + + template void setMaxBlobCacheSize(StatusType* status, unsigned size) + { + if (cloopVTable->version < 6) + { + StatusType::setVersionError(status, "IAttachment", cloopVTable->version, 6); + StatusType::checkException(status); + return; + } + StatusType::clearException(status); + static_cast(this->cloopVTable)->setMaxBlobCacheSize(this, status, size); + StatusType::checkException(status); + } + + template unsigned getMaxInlineBlobSize(StatusType* status) + { + if (cloopVTable->version < 6) + { + StatusType::setVersionError(status, "IAttachment", cloopVTable->version, 6); + StatusType::checkException(status); + return 0; + } + StatusType::clearException(status); + unsigned ret = static_cast(this->cloopVTable)->getMaxInlineBlobSize(this, status); + StatusType::checkException(status); + return ret; + } + + template void setMaxInlineBlobSize(StatusType* status, unsigned size) + { + if (cloopVTable->version < 6) + { + StatusType::setVersionError(status, "IAttachment", cloopVTable->version, 6); + StatusType::checkException(status); + return; + } + StatusType::clearException(status); + static_cast(this->cloopVTable)->setMaxInlineBlobSize(this, status, size); + StatusType::checkException(status); + } }; #define FIREBIRD_ISERVICE_VERSION 5u @@ -10525,6 +10612,8 @@ namespace Firebird this->setTimeout = &Name::cloopsetTimeoutDispatcher; this->createBatch = &Name::cloopcreateBatchDispatcher; this->free = &Name::cloopfreeDispatcher; + this->getMaxInlineBlobSize = &Name::cloopgetMaxInlineBlobSizeDispatcher; + this->setMaxInlineBlobSize = &Name::cloopsetMaxInlineBlobSizeDispatcher; } } vTable; @@ -10751,6 +10840,35 @@ namespace Firebird } } + static unsigned CLOOP_CARG cloopgetMaxInlineBlobSizeDispatcher(IStatement* self, IStatus* status) CLOOP_NOEXCEPT + { + StatusType status2(status); + + try + { + return static_cast(self)->Name::getMaxInlineBlobSize(&status2); + } + catch (...) + { + StatusType::catchException(&status2); + return static_cast(0); + } + } + + static void CLOOP_CARG cloopsetMaxInlineBlobSizeDispatcher(IStatement* self, IStatus* status, unsigned size) CLOOP_NOEXCEPT + { + StatusType status2(status); + + try + { + static_cast(self)->Name::setMaxInlineBlobSize(&status2, size); + } + catch (...) + { + StatusType::catchException(&status2); + } + } + static void CLOOP_CARG cloopaddRefDispatcher(IReferenceCounted* self) CLOOP_NOEXCEPT { try @@ -10805,6 +10923,8 @@ namespace Firebird virtual void setTimeout(StatusType* status, unsigned timeOut) = 0; virtual IBatch* createBatch(StatusType* status, IMessageMetadata* inMetadata, unsigned parLength, const unsigned char* par) = 0; virtual void free(StatusType* status) = 0; + virtual unsigned getMaxInlineBlobSize(StatusType* status) = 0; + virtual void setMaxInlineBlobSize(StatusType* status, unsigned size) = 0; }; template @@ -11630,6 +11750,10 @@ namespace Firebird this->createReplicator = &Name::cloopcreateReplicatorDispatcher; this->detach = &Name::cloopdetachDispatcher; this->dropDatabase = &Name::cloopdropDatabaseDispatcher; + this->getMaxBlobCacheSize = &Name::cloopgetBlobCacheSizeDispatcher; + this->setMaxBlobCacheSize = &Name::cloopsetBlobCacheSizeDispatcher; + this->getMaxInlineBlobSize = &Name::cloopgetMaxInlineBlobSizeDispatcher; + this->setMaxInlineBlobSize = &Name::cloopsetMaxInlineBlobSizeDispatcher; } } vTable; @@ -12014,6 +12138,64 @@ namespace Firebird } } + static unsigned CLOOP_CARG cloopgetBlobCacheSizeDispatcher(IAttachment* self, IStatus* status) CLOOP_NOEXCEPT + { + StatusType status2(status); + + try + { + return static_cast(self)->Name::getMaxBlobCacheSize(&status2); + } + catch (...) + { + StatusType::catchException(&status2); + return static_cast(0); + } + } + + static void CLOOP_CARG cloopsetBlobCacheSizeDispatcher(IAttachment* self, IStatus* status, unsigned size) CLOOP_NOEXCEPT + { + StatusType status2(status); + + try + { + static_cast(self)->Name::setMaxBlobCacheSize(&status2, size); + } + catch (...) + { + StatusType::catchException(&status2); + } + } + + static unsigned CLOOP_CARG cloopgetMaxInlineBlobSizeDispatcher(IAttachment* self, IStatus* status) CLOOP_NOEXCEPT + { + StatusType status2(status); + + try + { + return static_cast(self)->Name::getMaxInlineBlobSize(&status2); + } + catch (...) + { + StatusType::catchException(&status2); + return static_cast(0); + } + } + + static void CLOOP_CARG cloopsetMaxInlineBlobSizeDispatcher(IAttachment* self, IStatus* status, unsigned size) CLOOP_NOEXCEPT + { + StatusType status2(status); + + try + { + static_cast(self)->Name::setMaxInlineBlobSize(&status2, size); + } + catch (...) + { + StatusType::catchException(&status2); + } + } + static void CLOOP_CARG cloopaddRefDispatcher(IReferenceCounted* self) CLOOP_NOEXCEPT { try @@ -12079,6 +12261,10 @@ namespace Firebird virtual IReplicator* createReplicator(StatusType* status) = 0; virtual void detach(StatusType* status) = 0; virtual void dropDatabase(StatusType* status) = 0; + virtual unsigned getMaxBlobCacheSize(StatusType* status) = 0; + virtual void setMaxBlobCacheSize(StatusType* status, unsigned size) = 0; + virtual unsigned getMaxInlineBlobSize(StatusType* status) = 0; + virtual void setMaxInlineBlobSize(StatusType* status, unsigned size) = 0; }; template diff --git a/src/include/firebird/impl/consts_pub.h b/src/include/firebird/impl/consts_pub.h index 645db9d4864..d83cb138ff3 100644 --- a/src/include/firebird/impl/consts_pub.h +++ b/src/include/firebird/impl/consts_pub.h @@ -133,6 +133,8 @@ #define isc_dpb_parallel_workers 100 #define isc_dpb_worker_attach 101 #define isc_dpb_owner 102 +#define isc_dpb_max_blob_cache_size 103 +#define isc_dpb_max_inline_blob_size 104 /**************************************************/ diff --git a/src/include/firebird/impl/inf_pub.h b/src/include/firebird/impl/inf_pub.h index 9f77591fdcd..3815e2663cd 100644 --- a/src/include/firebird/impl/inf_pub.h +++ b/src/include/firebird/impl/inf_pub.h @@ -189,6 +189,9 @@ enum db_info_types fb_info_wire_rcv_bytes = 157, fb_info_wire_roundtrips = 158, + fb_info_max_blob_cache_size = 159, + fb_info_max_inline_blob_size = 160, + isc_info_db_last_value /* Leave this LAST! */ }; diff --git a/src/include/gen/Firebird.pas b/src/include/gen/Firebird.pas index 9c1be4d79af..53c50b80abf 100644 --- a/src/include/gen/Firebird.pas +++ b/src/include/gen/Firebird.pas @@ -358,6 +358,8 @@ ISC_TIMESTAMP_TZ_EX = record IStatement_setTimeoutPtr = procedure(this: IStatement; status: IStatus; timeOut: Cardinal); cdecl; IStatement_createBatchPtr = function(this: IStatement; status: IStatus; inMetadata: IMessageMetadata; parLength: Cardinal; par: BytePtr): IBatch; cdecl; IStatement_freePtr = procedure(this: IStatement; status: IStatus); cdecl; + IStatement_getMaxInlineBlobSizePtr = function(this: IStatement; status: IStatus): Cardinal; cdecl; + IStatement_setMaxInlineBlobSizePtr = procedure(this: IStatement; status: IStatus; size: Cardinal); cdecl; IBatch_addPtr = procedure(this: IBatch; status: IStatus; count: Cardinal; inBuffer: Pointer); cdecl; IBatch_addBlobPtr = procedure(this: IBatch; status: IStatus; length: Cardinal; inBuffer: Pointer; blobId: ISC_QUADPtr; parLength: Cardinal; par: BytePtr); cdecl; IBatch_appendBlobDataPtr = procedure(this: IBatch; status: IStatus; length: Cardinal; inBuffer: Pointer); cdecl; @@ -414,6 +416,10 @@ ISC_TIMESTAMP_TZ_EX = record IAttachment_createReplicatorPtr = function(this: IAttachment; status: IStatus): IReplicator; cdecl; IAttachment_detachPtr = procedure(this: IAttachment; status: IStatus); cdecl; IAttachment_dropDatabasePtr = procedure(this: IAttachment; status: IStatus); cdecl; + IAttachment_getMaxBlobCacheSizePtr = function(this: IAttachment; status: IStatus): Cardinal; cdecl; + IAttachment_setMaxBlobCacheSizePtr = procedure(this: IAttachment; status: IStatus; size: Cardinal); cdecl; + IAttachment_getMaxInlineBlobSizePtr = function(this: IAttachment; status: IStatus): Cardinal; cdecl; + IAttachment_setMaxInlineBlobSizePtr = procedure(this: IAttachment; status: IStatus; size: Cardinal); cdecl; IService_deprecatedDetachPtr = procedure(this: IService; status: IStatus); cdecl; IService_queryPtr = procedure(this: IService; status: IStatus; sendLength: Cardinal; sendItems: BytePtr; receiveLength: Cardinal; receiveItems: BytePtr; bufferLength: Cardinal; buffer: BytePtr); cdecl; IService_startPtr = procedure(this: IService; status: IStatus; spbLength: Cardinal; spb: BytePtr); cdecl; @@ -1523,10 +1529,12 @@ StatementVTable = class(ReferenceCountedVTable) setTimeout: IStatement_setTimeoutPtr; createBatch: IStatement_createBatchPtr; free: IStatement_freePtr; + getMaxInlineBlobSize: IStatement_getMaxInlineBlobSizePtr; + setMaxInlineBlobSize: IStatement_setMaxInlineBlobSizePtr; end; IStatement = class(IReferenceCounted) - const VERSION = 5; + const VERSION = 6; const PREPARE_PREFETCH_NONE = Cardinal($0); const PREPARE_PREFETCH_TYPE = Cardinal($1); const PREPARE_PREFETCH_INPUT_PARAMETERS = Cardinal($2); @@ -1557,6 +1565,8 @@ IStatement = class(IReferenceCounted) procedure setTimeout(status: IStatus; timeOut: Cardinal); function createBatch(status: IStatus; inMetadata: IMessageMetadata; parLength: Cardinal; par: BytePtr): IBatch; procedure free(status: IStatus); + function getMaxInlineBlobSize(status: IStatus): Cardinal; + procedure setMaxInlineBlobSize(status: IStatus; size: Cardinal); end; IStatementImpl = class(IStatement) @@ -1579,6 +1589,8 @@ IStatementImpl = class(IStatement) procedure setTimeout(status: IStatus; timeOut: Cardinal); virtual; abstract; function createBatch(status: IStatus; inMetadata: IMessageMetadata; parLength: Cardinal; par: BytePtr): IBatch; virtual; abstract; procedure free(status: IStatus); virtual; abstract; + function getMaxInlineBlobSize(status: IStatus): Cardinal; virtual; abstract; + procedure setMaxInlineBlobSize(status: IStatus; size: Cardinal); virtual; abstract; end; BatchVTable = class(ReferenceCountedVTable) @@ -1792,10 +1804,14 @@ AttachmentVTable = class(ReferenceCountedVTable) createReplicator: IAttachment_createReplicatorPtr; detach: IAttachment_detachPtr; dropDatabase: IAttachment_dropDatabasePtr; + getMaxBlobCacheSize: IAttachment_getMaxBlobCacheSizePtr; + setMaxBlobCacheSize: IAttachment_setMaxBlobCacheSizePtr; + getMaxInlineBlobSize: IAttachment_getMaxInlineBlobSizePtr; + setMaxInlineBlobSize: IAttachment_setMaxInlineBlobSizePtr; end; IAttachment = class(IReferenceCounted) - const VERSION = 5; + const VERSION = 6; procedure getInfo(status: IStatus; itemsLength: Cardinal; items: BytePtr; bufferLength: Cardinal; buffer: BytePtr); function startTransaction(status: IStatus; tpbLength: Cardinal; tpb: BytePtr): ITransaction; @@ -1823,6 +1839,10 @@ IAttachment = class(IReferenceCounted) function createReplicator(status: IStatus): IReplicator; procedure detach(status: IStatus); procedure dropDatabase(status: IStatus); + function getMaxBlobCacheSize(status: IStatus): Cardinal; + procedure setMaxBlobCacheSize(status: IStatus; size: Cardinal); + function getMaxInlineBlobSize(status: IStatus): Cardinal; + procedure setMaxInlineBlobSize(status: IStatus; size: Cardinal); end; IAttachmentImpl = class(IAttachment) @@ -1856,6 +1876,10 @@ IAttachmentImpl = class(IAttachment) function createReplicator(status: IStatus): IReplicator; virtual; abstract; procedure detach(status: IStatus); virtual; abstract; procedure dropDatabase(status: IStatus); virtual; abstract; + function getMaxBlobCacheSize(status: IStatus): Cardinal; virtual; abstract; + procedure setMaxBlobCacheSize(status: IStatus; size: Cardinal); virtual; abstract; + function getMaxInlineBlobSize(status: IStatus): Cardinal; virtual; abstract; + procedure setMaxInlineBlobSize(status: IStatus; size: Cardinal); virtual; abstract; end; ServiceVTable = class(ReferenceCountedVTable) @@ -4057,6 +4081,8 @@ IProfilerStatsImpl = class(IProfilerStats) isc_dpb_parallel_workers = byte(100); isc_dpb_worker_attach = byte(101); isc_dpb_owner = byte(102); + isc_dpb_max_blob_cache_size = byte(103); + isc_dpb_max_inline_blob_size = byte(104); isc_dpb_address = byte(1); isc_dpb_addr_protocol = byte(1); isc_dpb_addr_endpoint = byte(2); @@ -4516,6 +4542,8 @@ IProfilerStatsImpl = class(IProfilerStats) fb_info_wire_snd_bytes = byte(156); fb_info_wire_rcv_bytes = byte(157); fb_info_wire_roundtrips = byte(158); + fb_info_max_blob_cache_size = byte(159); + fb_info_max_inline_blob_size = byte(160); fb_info_crypt_encrypted = $01; fb_info_crypt_process = $02; fb_feature_multi_statements = byte(1); @@ -7220,6 +7248,29 @@ procedure IStatement.free(status: IStatus); FbException.checkException(status); end; +function IStatement.getMaxInlineBlobSize(status: IStatus): Cardinal; +begin + if (vTable.version < 6) then begin + FbException.setVersionError(status, 'IStatement', vTable.version, 6); + Result := 0; + end + else begin + Result := StatementVTable(vTable).getMaxInlineBlobSize(Self, status); + end; + FbException.checkException(status); +end; + +procedure IStatement.setMaxInlineBlobSize(status: IStatus; size: Cardinal); +begin + if (vTable.version < 6) then begin + FbException.setVersionError(status, 'IStatement', vTable.version, 6); + end + else begin + StatementVTable(vTable).setMaxInlineBlobSize(Self, status, size); + end; + FbException.checkException(status); +end; + procedure IBatch.add(status: IStatus; count: Cardinal; inBuffer: Pointer); begin BatchVTable(vTable).add(Self, status, count, inBuffer); @@ -7655,6 +7706,52 @@ procedure IAttachment.dropDatabase(status: IStatus); FbException.checkException(status); end; +function IAttachment.getMaxBlobCacheSize(status: IStatus): Cardinal; +begin + if (vTable.version < 6) then begin + FbException.setVersionError(status, 'IAttachment', vTable.version, 6); + Result := 0; + end + else begin + Result := AttachmentVTable(vTable).getMaxBlobCacheSize(Self, status); + end; + FbException.checkException(status); +end; + +procedure IAttachment.setMaxBlobCacheSize(status: IStatus; size: Cardinal); +begin + if (vTable.version < 6) then begin + FbException.setVersionError(status, 'IAttachment', vTable.version, 6); + end + else begin + AttachmentVTable(vTable).setMaxBlobCacheSize(Self, status, size); + end; + FbException.checkException(status); +end; + +function IAttachment.getMaxInlineBlobSize(status: IStatus): Cardinal; +begin + if (vTable.version < 6) then begin + FbException.setVersionError(status, 'IAttachment', vTable.version, 6); + Result := 0; + end + else begin + Result := AttachmentVTable(vTable).getMaxInlineBlobSize(Self, status); + end; + FbException.checkException(status); +end; + +procedure IAttachment.setMaxInlineBlobSize(status: IStatus; size: Cardinal); +begin + if (vTable.version < 6) then begin + FbException.setVersionError(status, 'IAttachment', vTable.version, 6); + end + else begin + AttachmentVTable(vTable).setMaxInlineBlobSize(Self, status, size); + end; + FbException.checkException(status); +end; + procedure IService.deprecatedDetach(status: IStatus); begin ServiceVTable(vTable).deprecatedDetach(Self, status); @@ -11577,6 +11674,25 @@ procedure IStatementImpl_freeDispatcher(this: IStatement; status: IStatus); cdec end end; +function IStatementImpl_getMaxInlineBlobSizeDispatcher(this: IStatement; status: IStatus): Cardinal; cdecl; +begin + Result := 0; + try + Result := IStatementImpl(this).getMaxInlineBlobSize(status); + except + on e: Exception do FbException.catchException(status, e); + end +end; + +procedure IStatementImpl_setMaxInlineBlobSizeDispatcher(this: IStatement; status: IStatus; size: Cardinal); cdecl; +begin + try + IStatementImpl(this).setMaxInlineBlobSize(status, size); + except + on e: Exception do FbException.catchException(status, e); + end +end; + var IStatementImpl_vTable: StatementVTable; @@ -12253,6 +12369,44 @@ procedure IAttachmentImpl_dropDatabaseDispatcher(this: IAttachment; status: ISta end end; +function IAttachmentImpl_getMaxBlobCacheSizeDispatcher(this: IAttachment; status: IStatus): Cardinal; cdecl; +begin + Result := 0; + try + Result := IAttachmentImpl(this).getMaxBlobCacheSize(status); + except + on e: Exception do FbException.catchException(status, e); + end +end; + +procedure IAttachmentImpl_setMaxBlobCacheSizeDispatcher(this: IAttachment; status: IStatus; size: Cardinal); cdecl; +begin + try + IAttachmentImpl(this).setMaxBlobCacheSize(status, size); + except + on e: Exception do FbException.catchException(status, e); + end +end; + +function IAttachmentImpl_getMaxInlineBlobSizeDispatcher(this: IAttachment; status: IStatus): Cardinal; cdecl; +begin + Result := 0; + try + Result := IAttachmentImpl(this).getMaxInlineBlobSize(status); + except + on e: Exception do FbException.catchException(status, e); + end +end; + +procedure IAttachmentImpl_setMaxInlineBlobSizeDispatcher(this: IAttachment; status: IStatus; size: Cardinal); cdecl; +begin + try + IAttachmentImpl(this).setMaxInlineBlobSize(status, size); + except + on e: Exception do FbException.catchException(status, e); + end +end; + var IAttachmentImpl_vTable: AttachmentVTable; @@ -17319,7 +17473,7 @@ initialization IResultSetImpl_vTable.getInfo := @IResultSetImpl_getInfoDispatcher; IStatementImpl_vTable := StatementVTable.create; - IStatementImpl_vTable.version := 5; + IStatementImpl_vTable.version := 6; IStatementImpl_vTable.addRef := @IStatementImpl_addRefDispatcher; IStatementImpl_vTable.release := @IStatementImpl_releaseDispatcher; IStatementImpl_vTable.getInfo := @IStatementImpl_getInfoDispatcher; @@ -17337,6 +17491,8 @@ initialization IStatementImpl_vTable.setTimeout := @IStatementImpl_setTimeoutDispatcher; IStatementImpl_vTable.createBatch := @IStatementImpl_createBatchDispatcher; IStatementImpl_vTable.free := @IStatementImpl_freeDispatcher; + IStatementImpl_vTable.getMaxInlineBlobSize := @IStatementImpl_getMaxInlineBlobSizeDispatcher; + IStatementImpl_vTable.setMaxInlineBlobSize := @IStatementImpl_setMaxInlineBlobSizeDispatcher; IBatchImpl_vTable := BatchVTable.create; IBatchImpl_vTable.version := 4; @@ -17393,7 +17549,7 @@ initialization IEventsImpl_vTable.cancel := @IEventsImpl_cancelDispatcher; IAttachmentImpl_vTable := AttachmentVTable.create; - IAttachmentImpl_vTable.version := 5; + IAttachmentImpl_vTable.version := 6; IAttachmentImpl_vTable.addRef := @IAttachmentImpl_addRefDispatcher; IAttachmentImpl_vTable.release := @IAttachmentImpl_releaseDispatcher; IAttachmentImpl_vTable.getInfo := @IAttachmentImpl_getInfoDispatcher; @@ -17422,6 +17578,10 @@ initialization IAttachmentImpl_vTable.createReplicator := @IAttachmentImpl_createReplicatorDispatcher; IAttachmentImpl_vTable.detach := @IAttachmentImpl_detachDispatcher; IAttachmentImpl_vTable.dropDatabase := @IAttachmentImpl_dropDatabaseDispatcher; + IAttachmentImpl_vTable.getMaxBlobCacheSize := @IAttachmentImpl_getMaxBlobCacheSizeDispatcher; + IAttachmentImpl_vTable.setMaxBlobCacheSize := @IAttachmentImpl_setMaxBlobCacheSizeDispatcher; + IAttachmentImpl_vTable.getMaxInlineBlobSize := @IAttachmentImpl_getMaxInlineBlobSizeDispatcher; + IAttachmentImpl_vTable.setMaxInlineBlobSize := @IAttachmentImpl_setMaxInlineBlobSizeDispatcher; IServiceImpl_vTable := ServiceVTable.create; IServiceImpl_vTable.version := 5; diff --git a/src/jrd/EngineInterface.h b/src/jrd/EngineInterface.h index 3f06df7e2eb..3a2adf4fcc6 100644 --- a/src/jrd/EngineInterface.h +++ b/src/jrd/EngineInterface.h @@ -307,6 +307,9 @@ class JStatement final : JBatch* createBatch(Firebird::CheckStatusWrapper* status, Firebird::IMessageMetadata* inMetadata, unsigned parLength, const unsigned char* par) override; + unsigned getMaxInlineBlobSize(Firebird::CheckStatusWrapper* status) override; + void setMaxInlineBlobSize(Firebird::CheckStatusWrapper* status, unsigned size) override; + public: JStatement(DsqlRequest* handle, StableAttachmentPart* sa, Firebird::Array& meta); @@ -460,6 +463,10 @@ class JAttachment final : Firebird::IMessageMetadata* inMetadata, unsigned parLength, const unsigned char* par) override; Firebird::IReplicator* createReplicator(Firebird::CheckStatusWrapper* status) override; + unsigned getMaxBlobCacheSize(Firebird::CheckStatusWrapper* status) override; + void setMaxBlobCacheSize(Firebird::CheckStatusWrapper* status, unsigned size) override; + unsigned getMaxInlineBlobSize(Firebird::CheckStatusWrapper* status) override; + void setMaxInlineBlobSize(Firebird::CheckStatusWrapper* status, unsigned size) override; public: explicit JAttachment(StableAttachmentPart* js); diff --git a/src/jrd/jrd.cpp b/src/jrd/jrd.cpp index 08f560ddf73..054c3d71b49 100644 --- a/src/jrd/jrd.cpp +++ b/src/jrd/jrd.cpp @@ -5308,6 +5308,28 @@ IReplicator* JAttachment::createReplicator(CheckStatusWrapper* user_status) return jr; } +unsigned JAttachment::getMaxBlobCacheSize(CheckStatusWrapper* status) +{ + status->setErrors(Arg::Gds(isc_wish_list).value()); + return 0; +} + +void JAttachment::setMaxBlobCacheSize(CheckStatusWrapper* status, unsigned size) +{ + status->setErrors(Arg::Gds(isc_wish_list).value()); +} + +unsigned JAttachment::getMaxInlineBlobSize(CheckStatusWrapper* status) +{ + status->setErrors(Arg::Gds(isc_wish_list).value()); + return 0; +} + +void JAttachment::setMaxInlineBlobSize(CheckStatusWrapper* status, unsigned size) +{ + status->setErrors(Arg::Gds(isc_wish_list).value()); +} + int JResultSet::fetchNext(CheckStatusWrapper* user_status, void* buffer) { @@ -6090,6 +6112,17 @@ JBatch* JStatement::createBatch(Firebird::CheckStatusWrapper* status, Firebird:: return batch; } +unsigned JStatement::getMaxInlineBlobSize(CheckStatusWrapper* status) +{ + status->setErrors(Arg::Gds(isc_wish_list).value()); + return 0; +} + +void JStatement::setMaxInlineBlobSize(CheckStatusWrapper* status, unsigned size) +{ + status->setErrors(Arg::Gds(isc_wish_list).value()); +} + JBatch::JBatch(DsqlBatch* handle, JStatement* aStatement, IMessageMetadata* aMetadata) : batch(handle), diff --git a/src/remote/client/interface.cpp b/src/remote/client/interface.cpp index d175c6a9bad..c907dfb9080 100644 --- a/src/remote/client/interface.cpp +++ b/src/remote/client/interface.cpp @@ -699,6 +699,9 @@ class Statement final : public RefCntIfaceport_context, filename); - a->addRef(); - return a; + Attachment* att = FB_NEW Attachment(port->port_context, filename); + att->addRef(); + att->setParamsFromDPB(newDpb); + + return att; } catch (const Exception& ex) { @@ -1312,6 +1326,8 @@ void Blob::getInfo(CheckStatusWrapper* status, if (blob->rbl_info.getLocalInfo(itemsLength, items, bufferLength, buffer)) return; + fb_assert(!blob->isCached()); + rem_port* port = rdb->rdb_port; RefMutexGuard portGuard(*port->port_sync, FB_FUNCTION); @@ -1353,7 +1369,8 @@ void Blob::freeClientData(CheckStatusWrapper* status, bool force) try { - release_object(status, rdb, op_cancel_blob, blob->rbl_id); + if (!blob->isCached()) + release_object(status, rdb, op_cancel_blob, blob->rbl_id); } catch (const Exception&) { @@ -1426,10 +1443,13 @@ void Blob::internalClose(CheckStatusWrapper* status) if ((blob->rbl_flags & Rbl::CREATE) && blob->rbl_ptr != blob->rbl_buffer) { + fb_assert(!blob->isCached()); + send_blob(status, blob, 0, NULL); } - release_object(status, rdb, op_close_blob, blob->rbl_id); + if (!blob->isCached()) + release_object(status, rdb, op_close_blob, blob->rbl_id); release_blob(blob); blob = NULL; } @@ -1846,15 +1866,14 @@ IBlob* Attachment::createBlob(CheckStatusWrapper* status, ITransaction* apiTra, p_blob->p_blob_bpb.cstr_length = 0; p_blob->p_blob_bpb.cstr_address = NULL; - Rbl* blob = FB_NEW Rbl(); - *blob_id = packet->p_resp.p_resp_blob_id; + Rbl* blob = FB_NEW Rbl(BLOB_LENGTH); + blob->rbl_blob_id = *blob_id = packet->p_resp.p_resp_blob_id; blob->rbl_rdb = rdb; blob->rbl_rtr = transaction; blob->rbl_id = packet->p_resp.p_resp_object; blob->rbl_flags |= Rbl::CREATE; SET_OBJECT(rdb, blob, blob->rbl_id); - blob->rbl_next = transaction->rtr_blobs; - transaction->rtr_blobs = blob; + transaction->rtr_blobs.add(blob); IBlob* b = FB_NEW Blob(blob); b->addRef(); @@ -1971,7 +1990,7 @@ IAttachment* Loopback::createDatabase(CheckStatusWrapper* status, const char* fi } -unsigned char* Attachment::getWireStatsInfo(UCharBuffer& info, unsigned int buffer_length, +unsigned char* Attachment::getLocalInfo(UCharBuffer& info, unsigned int buffer_length, unsigned char* buffer) { const rem_port* const port = rdb->rdb_port; @@ -1991,6 +2010,9 @@ unsigned char* Attachment::getWireStatsInfo(UCharBuffer& info, unsigned int buff break; } + FB_UINT64 value; + bool skip = false; + switch (*item) { case fb_info_wire_snd_packets: @@ -2002,25 +2024,37 @@ unsigned char* Attachment::getWireStatsInfo(UCharBuffer& info, unsigned int buff case fb_info_wire_out_bytes: case fb_info_wire_in_bytes: case fb_info_wire_roundtrips: - { - const FB_UINT64 value = port->getStatItem(*item); + value = port->getStatItem(*item); + break; - if (value <= MAX_SLONG) - ptr = fb_utils::putInfoItemInt(*item, (SLONG) value, ptr, end); - else - ptr = fb_utils::putInfoItemInt(*item, value, ptr, end); + case fb_info_max_blob_cache_size: + value = rdb->rdb_blob_cache_size; + break; - if (!ptr) - return nullptr; + case fb_info_max_inline_blob_size: + value = rdb->rdb_inline_blob_size; + break; - info.remove(item); + default: + skip = true; break; } - default: + if (skip) + { item++; - break; + continue; } + + if (value <= MAX_SLONG) + ptr = fb_utils::putInfoItemInt(*item, (SLONG) value, ptr, end); + else + ptr = fb_utils::putInfoItemInt(*item, value, ptr, end); + + if (!ptr) + return nullptr; + + info.remove(item); } if (info.isEmpty() && ptr < end) @@ -2059,7 +2093,7 @@ void Attachment::getInfo(CheckStatusWrapper* status, RefMutexGuard portGuard(*port->port_sync, FB_FUNCTION); UCharBuffer tempInfo(items, item_length); - UCHAR* ptr = getWireStatsInfo(tempInfo, buffer_length, buffer); + UCHAR* ptr = getLocalInfo(tempInfo, buffer_length, buffer); if (!ptr) return; @@ -2440,6 +2474,168 @@ Batch* Attachment::createBatch(CheckStatusWrapper* status, ITransaction* transac } +void Attachment::setParamsFromDPB(ClumpletReader& dpb) +{ + dpb.rewind(); + for (; !dpb.isEof(); dpb.moveNext()) + { + const UCHAR item = dpb.getClumpTag(); + switch (item) + { + case isc_dpb_max_blob_cache_size: + case isc_dpb_max_inline_blob_size: + if (rdb->rdb_port->port_protocol >= PROTOCOL_INLINE_BLOB) + { + SLONG val = dpb.getInt(); + if (val < 0) + val = 0; + + if (item == isc_dpb_max_blob_cache_size) + rdb->rdb_blob_cache_size = val; + else + rdb->rdb_inline_blob_size = MIN(val, MAX_INLINE_BLOB_SIZE); + } + break; + + default: + break; + } + } +} + + +unsigned Attachment::getMaxBlobCacheSize(CheckStatusWrapper* status) +{ + try + { + reset(status); + CHECK_HANDLE(rdb, isc_bad_db_handle); + + if (rdb->rdb_port->port_protocol < PROTOCOL_INLINE_BLOB) + unsupported(); + + return rdb->rdb_blob_cache_size; + } + catch (const Exception& ex) + { + ex.stuffException(status); + } + return 0; +} + + +void Attachment::setMaxBlobCacheSize(CheckStatusWrapper* status, unsigned size) +{ + try + { + reset(status); + CHECK_HANDLE(rdb, isc_bad_db_handle); + + if (rdb->rdb_port->port_protocol < PROTOCOL_INLINE_BLOB) + unsupported(); + + rdb->rdb_blob_cache_size = size; + } + catch (const Exception& ex) + { + ex.stuffException(status); + } +} + + +unsigned Attachment::getMaxInlineBlobSize(CheckStatusWrapper* status) +{ + try + { + reset(status); + CHECK_HANDLE(rdb, isc_bad_db_handle); + + if (rdb->rdb_port->port_protocol < PROTOCOL_INLINE_BLOB) + unsupported(); + + return rdb->rdb_inline_blob_size; + } + catch (const Exception& ex) + { + ex.stuffException(status); + } + return 0; +} + + +void Attachment::setMaxInlineBlobSize(CheckStatusWrapper* status, unsigned size) +{ + try + { + reset(status); + CHECK_HANDLE(rdb, isc_bad_db_handle); + + if (rdb->rdb_port->port_protocol < PROTOCOL_INLINE_BLOB) + unsupported(); + + if (size > MAX_INLINE_BLOB_SIZE) + size = MAX_INLINE_BLOB_SIZE; + + rdb->rdb_inline_blob_size = size; + } + catch (const Exception& ex) + { + ex.stuffException(status); + } +} + + +unsigned Statement::getMaxInlineBlobSize(CheckStatusWrapper* status) +{ + try + { + reset(status); + + Rsr* statement = getStatement(); + CHECK_HANDLE(statement, isc_bad_req_handle); + Rdb* rdb = statement->rsr_rdb; + CHECK_HANDLE(rdb, isc_bad_db_handle); + + if (rdb->rdb_port->port_protocol < PROTOCOL_INLINE_BLOB) + unsupported(); + + return statement->rsr_inline_blob_size; + } + catch (const Exception& ex) + { + ex.stuffException(status); + } + + return 0; +} + + +void Statement::setMaxInlineBlobSize(CheckStatusWrapper* status, unsigned size) +{ + try + { + reset(status); + + Rsr* statement = getStatement(); + CHECK_HANDLE(statement, isc_bad_req_handle); + Rdb* rdb = statement->rsr_rdb; + CHECK_HANDLE(rdb, isc_bad_db_handle); + + if (rdb->rdb_port->port_protocol < PROTOCOL_INLINE_BLOB) + unsupported(); + + if (size > MAX_INLINE_BLOB_SIZE) + size = MAX_INLINE_BLOB_SIZE; + + statement->rsr_inline_blob_size = size; + } + catch (const Exception& ex) + { + ex.stuffException(status); + } +} + + Batch* Statement::createBatch(CheckStatusWrapper* status, IMessageMetadata* inMetadata, unsigned parLength, const unsigned char* par) { @@ -3568,6 +3764,7 @@ ITransaction* Statement::execute(CheckStatusWrapper* status, ITransaction* apiTr sqldata->p_sqldata_out_message_number = 0; // out_msg_type sqldata->p_sqldata_timeout = statement->rsr_timeout; sqldata->p_sqldata_cursor_flags = 0; + sqldata->p_sqldata_inline_blob_size = statement->rsr_inline_blob_size; send_packet(port, packet); @@ -3578,8 +3775,21 @@ ITransaction* Statement::execute(CheckStatusWrapper* status, ITransaction* apiTr if (out_msg_length) port->port_statement->rsr_message->msg_address = out_msg; + // Prepare to receive inline blobs + P_INLINE_BLOB* p_blob = &packet->p_inline_blob; + UCHAR blobInfo[64]; + + UsePreallocatedBuffer guardBlobInfo(p_blob->p_blob_info, sizeof(blobInfo), blobInfo); + receive_packet(port, packet); + while (packet->p_operation == op_inline_blob) + { + fb_assert(transaction); + transaction->setupInlineBlob(p_blob); + receive_packet(port, packet); + } + if (packet->p_operation != op_sql_response) REMOTE_check_response(status, rdb, packet); else @@ -3734,6 +3944,7 @@ ResultSet* Statement::openCursor(CheckStatusWrapper* status, ITransaction* apiTr sqldata->p_sqldata_out_message_number = 0; // out_msg_type sqldata->p_sqldata_timeout = statement->rsr_timeout; sqldata->p_sqldata_cursor_flags = flags; + sqldata->p_sqldata_inline_blob_size = statement->rsr_inline_blob_size; { Cleanup msgClean([&message] { @@ -3927,6 +4138,8 @@ ITransaction* Attachment::execute(CheckStatusWrapper* status, ITransaction* apiT ex_now->p_sqlst_out_blr.cstr_length = out_blr_length; ex_now->p_sqlst_out_blr.cstr_address = const_cast(out_blr); ex_now->p_sqlst_out_message_number = 0; // out_msg_type + ex_now->p_sqlst_inline_blob_size = (packet->p_operation == op_exec_immediate2) ? + rdb->rdb_inline_blob_size : 0; send_packet(port, packet); @@ -3940,8 +4153,21 @@ ITransaction* Attachment::execute(CheckStatusWrapper* status, ITransaction* apiT if (in_msg_length || out_msg_length) port->port_statement->rsr_message->msg_address = out_msg; + // Prepare to receive inline blobs + P_INLINE_BLOB* p_blob = &packet->p_inline_blob; + UCHAR blobInfo[64]; + + UsePreallocatedBuffer guardBlobInfo(p_blob->p_blob_info, sizeof(blobInfo), blobInfo); + receive_packet(rdb->rdb_port, packet); + while (packet->p_operation == op_inline_blob) + { + fb_assert(transaction); + transaction->setupInlineBlob(p_blob); + receive_packet(port, packet); + } + if (packet->p_operation != op_sql_response) REMOTE_check_response(status, rdb, packet); else @@ -4125,6 +4351,7 @@ Statement* Attachment::createStatement(CheckStatusWrapper* status, unsigned dial statement->rsr_next = rdb->rdb_sql_requests; rdb->rdb_sql_requests = statement; + statement->rsr_inline_blob_size = rdb->rdb_inline_blob_size; Statement* s = FB_NEW Statement(statement, this, dialect); s->addRef(); @@ -5507,6 +5734,8 @@ int Blob::getSegment(CheckStatusWrapper* status, unsigned int bufferLength, void break; } + fb_assert(!blob->isCached()); + // Preparatory to asking for more data, use input buffer length // to cue more efficient blob buffering. @@ -5668,6 +5897,20 @@ IBlob* Attachment::openBlob(CheckStatusWrapper* status, ITransaction* apiTra, IS Rtr* transaction = remoteTransaction(apiTra); CHECK_HANDLE(transaction, isc_bad_trans_handle); + if (transaction->rtr_blobs.locate(*id)) + { + Rbl* blob = transaction->rtr_blobs.current(); + + if (!bpb_length) + { + Blob* iBlob = FB_NEW Blob(blob); + iBlob->addRef(); + return iBlob; + } + + release_blob(blob); + } + // Validate data length CHECK_LENGTH(port, bpb_length); @@ -5766,13 +6009,13 @@ IBlob* Attachment::openBlob(CheckStatusWrapper* status, ITransaction* apiTra, IS //p_blob->p_blob_bpb.cstr_length = 0; //p_blob->p_blob_bpb.cstr_address = NULL; - Rbl* blob = FB_NEW Rbl; + Rbl* blob = FB_NEW Rbl(BLOB_LENGTH); blob->rbl_rdb = rdb; blob->rbl_rtr = transaction; + blob->rbl_blob_id = *id; blob->rbl_id = packet->p_resp.p_resp_object; SET_OBJECT(rdb, blob, blob->rbl_id); - blob->rbl_next = transaction->rtr_blobs; - transaction->rtr_blobs = blob; + transaction->rtr_blobs.add(blob); Blob* iBlob = FB_NEW Blob(blob); iBlob->addRef(); @@ -6619,6 +6862,11 @@ int Blob::seek(CheckStatusWrapper* status, int mode, int offset) CHECK_HANDLE(blob, isc_bad_segstr_handle); + if (blob->isCached()) + { + Arg::Gds(isc_wish_list).raise(); + } + Rdb* rdb = blob->rbl_rdb; CHECK_HANDLE(rdb, isc_bad_db_handle); rem_port* port = rdb->rdb_port; @@ -7843,6 +8091,12 @@ static void batch_dsql_fetch(rem_port* port, // Avoid damaging preallocated buffer for response data UseStandardBuffer guard(packet->p_resp.p_resp_data); + // Prepare to receive inline blobs + P_INLINE_BLOB* p_blob = &packet->p_inline_blob; + UCHAR blobInfo[64]; + + UsePreallocatedBuffer guardBlobInfo(p_blob->p_blob_info, sizeof(blobInfo), blobInfo); + statement->rsr_flags.set(Rsr::FETCHED); while (true) { @@ -7878,6 +8132,17 @@ static void batch_dsql_fetch(rem_port* port, throw; } + if (packet->p_operation == op_inline_blob) + { + fb_assert(!statement->rsr_rtr || statement->rsr_rtr->rtr_id == p_blob->p_tran_id); + + Rtr* transaction = statement->rsr_rtr ? + statement->rsr_rtr : port->port_objects[p_blob->p_tran_id]; + + transaction->setupInlineBlob(p_blob); + continue; + } + if (packet->p_operation != op_fetch_response) { statement->rsr_flags.set(Rsr::STREAM_ERR); @@ -9217,16 +9482,17 @@ static void release_blob( Rbl* blob) **************************************/ Rtr* transaction = blob->rbl_rtr; Rdb* rdb = blob->rbl_rdb; - rdb->rdb_port->releaseObject(blob->rbl_id); - for (Rbl** p = &transaction->rtr_blobs; *p; p = &(*p)->rbl_next) + if (blob->isCached()) { - if (*p == blob) - { - *p = blob->rbl_next; - break; - } + // Assume buffer was not resized while blob was cached + rdb->decBlobCache(blob->getCachedSize()); } + else + rdb->rdb_port->releaseObject(blob->rbl_id); + + if (transaction->rtr_blobs.locate(blob->rbl_blob_id)) + transaction->rtr_blobs.fastRemove(); delete blob; } @@ -9384,8 +9650,8 @@ static void release_transaction( Rtr* transaction) Rdb* rdb = transaction->rtr_rdb; rdb->rdb_port->releaseObject(transaction->rtr_id); - while (transaction->rtr_blobs) - release_blob(transaction->rtr_blobs); + while (transaction->rtr_blobs.getFirst()) + release_blob(transaction->rtr_blobs.current()); for (Rtr** p = &rdb->rdb_transactions; *p; p = &(*p)->rtr_next) { @@ -9433,6 +9699,8 @@ static void send_blob(CheckStatusWrapper* status, * Actually send blob data (which might be buffered) * **************************************/ + fb_assert(!blob->isCached()); + Rdb* rdb = blob->rbl_rdb; PACKET* packet = &rdb->rdb_packet; packet->p_operation = op_put_segment; diff --git a/src/remote/parser.cpp b/src/remote/parser.cpp index ac1896b1838..eabc92907a7 100644 --- a/src/remote/parser.cpp +++ b/src/remote/parser.cpp @@ -161,8 +161,9 @@ static rem_fmt* parse_format(const UCHAR*& blr, size_t& blr_length) ULONG net_length = 0; ULONG offset = 0; + dsc* const begin = format->fmt_desc.begin(); - for (dsc* desc = format->fmt_desc.begin(); count; --count, ++desc) + for (dsc* desc = begin; count; --count, ++desc) { if (blr_length-- == 0) return NULL; @@ -262,6 +263,8 @@ static rem_fmt* parse_format(const UCHAR*& blr, size_t& blr_length) desc->dsc_dtype = dtype_quad; desc->dsc_length = sizeof(SLONG) * 2; desc->dsc_scale = *blr++; + + format->fmt_blob_idx.add(desc - begin); break; case blr_float: @@ -312,6 +315,8 @@ static rem_fmt* parse_format(const UCHAR*& blr, size_t& blr_length) USHORT textType = *blr++; textType += (*blr++) << 8; desc->setTextType(textType); + + format->fmt_blob_idx.add(desc - begin); } break; diff --git a/src/remote/protocol.cpp b/src/remote/protocol.cpp index 2564d978cee..ea0e0348dc7 100644 --- a/src/remote/protocol.cpp +++ b/src/remote/protocol.cpp @@ -120,7 +120,9 @@ static bool_t xdr_trrq_blr(RemoteXdr*, CSTRING*); static bool_t xdr_trrq_message(RemoteXdr*, USHORT); static bool_t xdr_bytes(RemoteXdr*, void*, ULONG); static bool_t xdr_blob_stream(RemoteXdr*, SSHORT, CSTRING*); +static bool_t xdr_blobBuffer(RemoteXdr* xdrs, RemBlobBuffer* buff); static Rsr* getStatement(RemoteXdr*, USHORT); +static Rtr* getTransaction(RemoteXdr*, USHORT); inline void fixupLength(const RemoteXdr* xdrs, ULONG& length) @@ -668,6 +670,8 @@ bool_t xdr_protocol(RemoteXdr* xdrs, PACKET* p) MAP(xdr_u_long, sqldata->p_sqldata_timeout); if (port->port_protocol >= PROTOCOL_FETCH_SCROLL) MAP(xdr_u_long, sqldata->p_sqldata_cursor_flags); + if (port->port_protocol >= PROTOCOL_INLINE_BLOB) + MAP(xdr_u_long, sqldata->p_sqldata_inline_blob_size); DEBUG_PRINTSIZE(xdrs, p->p_operation); return P_TRUE(xdrs, p); @@ -689,6 +693,10 @@ bool_t xdr_protocol(RemoteXdr* xdrs, PACKET* p) return P_FALSE(xdrs, p); } MAP(xdr_short, reinterpret_cast(prep_stmt->p_sqlst_out_message_number)); + + if (port->port_protocol >= PROTOCOL_INLINE_BLOB) + MAP(xdr_u_long, prep_stmt->p_sqlst_inline_blob_size); + // Fall into ... case op_exec_immediate: @@ -1138,6 +1146,41 @@ bool_t xdr_protocol(RemoteXdr* xdrs, PACKET* p) return P_TRUE(xdrs, p); } + case op_inline_blob: + { + P_INLINE_BLOB* p_blob = &p->p_inline_blob; + MAP(xdr_short, reinterpret_cast(p_blob->p_tran_id)); + MAP(xdr_quad, p_blob->p_blob_id); + + if (xdrs->x_op == XDR_ENCODE) + { + MAP(xdr_response, p_blob->p_blob_info); + if (!xdr_blobBuffer(xdrs, p_blob->p_blob_data)) + return P_FALSE(xdrs, p); + } + else if (xdrs->x_op == XDR_DECODE) + { + Rtr* tran = getTransaction(xdrs, p_blob->p_tran_id); + + if (!tran) + return P_FALSE(xdrs, p); + + MAP(xdr_response, p_blob->p_blob_info); + + AutoPtr blb = tran->createInlineBlob(); + p_blob->p_blob_data = &blb->rbl_data; + + if (!xdr_blobBuffer(xdrs, p_blob->p_blob_data)) + { + tran->rtr_inline_blob = nullptr; + return P_FALSE(xdrs, p); + } + blb.release(); + } + + return P_TRUE(xdrs, p); + } + ///case op_insert: default: #ifdef DEV_BUILD @@ -2274,6 +2317,23 @@ static Rsr* getStatement(RemoteXdr* xdrs, USHORT statement_id) return port->port_statement; } +static Rtr* getTransaction(RemoteXdr* xdrs, USHORT tran_id) +{ + rem_port* port = xdrs->x_public; + + if (tran_id >= port->port_objects.getCount()) + return nullptr; + + try + { + return port->port_objects[tran_id]; + } + catch (const status_exception&) + { + return nullptr; + } +} + static bool_t xdr_blob_stream(RemoteXdr* xdrs, SSHORT statement_id, CSTRING* strmPortion) { if (xdrs->x_op == XDR_FREE) @@ -2492,3 +2552,53 @@ static bool_t xdr_blob_stream(RemoteXdr* xdrs, SSHORT statement_id, CSTRING* str return TRUE; } + +static bool_t xdr_blobBuffer(RemoteXdr* xdrs, RemBlobBuffer* buff) +{ + SLONG len; + UCHAR* data; + static const SCHAR filler[4] = { 0, 0, 0, 0 }; + + switch (xdrs->x_op) + { + case XDR_ENCODE: + len = buff->getCount(); + if (!xdr_long(xdrs, &len)) + return FALSE; + + data = buff->begin(); + if (len && !xdrs->x_putbytes(reinterpret_cast(data), len)) + return FALSE; + + len = (4 - len) & 3; + if (len) + return xdrs->x_putbytes(filler, len); + + return TRUE; + + case XDR_DECODE: + if (!xdr_long(xdrs, &len)) + return FALSE; + + if (len) + { + data = buff->getBuffer(len); + if (!xdrs->x_getbytes(reinterpret_cast(data), len)) + return FALSE; + } + + len = (4 - len) & 3; + if (len) + { + SCHAR trash[4]; + return xdrs->x_getbytes(trash, len); + } + + return TRUE; + + case XDR_FREE: + return TRUE; + } + + return FALSE; +} diff --git a/src/remote/protocol.h b/src/remote/protocol.h index 5a0c9ffdb32..4e4c4d990e5 100644 --- a/src/remote/protocol.h +++ b/src/remote/protocol.h @@ -38,6 +38,9 @@ namespace Firebird { class DynamicStatusVector; } +class RemBlobBuffer; // see remote.h + + // dimitr: ask for asymmetric protocols only. // Comment it out to return back to FB 1.0 behaviour. #define ASYMMETRIC_PROTOCOLS_ONLY @@ -106,8 +109,10 @@ const USHORT PROTOCOL_FETCH_SCROLL = PROTOCOL_VERSION18; // Protocol 19: // - supports passing flags to IStatement::prepare +// - supports op_inline_blob const USHORT PROTOCOL_VERSION19 = (FB_PROTOCOL_FLAG | 19); +const USHORT PROTOCOL_INLINE_BLOB = PROTOCOL_VERSION19; // Architecture types @@ -324,6 +329,8 @@ enum P_OP op_fetch_scroll = 112, op_info_cursor = 113, + op_inline_blob = 114, + op_max }; @@ -623,6 +630,7 @@ typedef struct p_sqlst CSTRING p_sqlst_out_blr; // blr describing output message USHORT p_sqlst_out_message_number; USHORT p_sqlst_flags; // prepare flags + ULONG p_sqlst_inline_blob_size; // maximum size of inlined blob } P_SQLST; typedef struct p_sqldata @@ -640,6 +648,7 @@ typedef struct p_sqldata ULONG p_sqldata_cursor_flags; // cursor flags P_FETCH p_sqldata_fetch_op; // Fetch operation SLONG p_sqldata_fetch_pos; // Fetch position + ULONG p_sqldata_inline_blob_size; // maximum size of inlined blob } P_SQLDATA; typedef struct p_sqlfree @@ -766,6 +775,14 @@ typedef struct p_replicate CSTRING_CONST p_repl_data; // replication data } P_REPLICATE; +typedef struct p_inline_blob +{ + OBJCT p_tran_id; // transaction id + SQUAD p_blob_id; // blob id + CSTRING p_blob_info; // blob info + RemBlobBuffer* p_blob_data; // blob data +} P_INLINE_BLOB; + // Generalize packet (sic!) @@ -819,6 +836,7 @@ typedef struct packet P_BATCH_REGBLOB p_batch_regblob; // Register already existing BLOB in batch P_BATCH_SETBPB p_batch_setbpb; // Set default BPB for batch P_REPLICATE p_replicate; // replicate + P_INLINE_BLOB p_inline_blob; // inline blob public: packet() diff --git a/src/remote/remote.cpp b/src/remote/remote.cpp index 609dd6eeb06..6e3141f4dc2 100644 --- a/src/remote/remote.cpp +++ b/src/remote/remote.cpp @@ -959,6 +959,63 @@ void Rrq::saveStatus(IStatus* v) noexcept } } +Rbl* Rtr::createInlineBlob() +{ + fb_assert(!rtr_inline_blob); + + Rbl* blb = FB_NEW Rbl(0); + + blb->rbl_rdb = rtr_rdb; + blb->rbl_rtr = this; + + blb->rbl_id = INVALID_OBJECT; + //SET_OBJECT(rdb, blb, blb->rbl_id); + + blb->rbl_flags |= Rbl::CACHED; + + rtr_inline_blob = blb; + return blb; +}; + +void Rtr::setupInlineBlob(P_INLINE_BLOB* p_blob) +{ + fb_assert(p_blob->p_tran_id == this->rtr_id); + fb_assert(rtr_inline_blob); + + Rbl* blb = rtr_inline_blob; + rtr_inline_blob = nullptr; + + fb_assert(blb->rbl_data.getCount() <= MAX_INLINE_BLOB_SIZE); + fb_assert(blb->rbl_data.getCount() <= MAX_USHORT); + + blb->rbl_buffer_length = MIN(MAX_USHORT, blb->rbl_data.getCapacity()); + if (!rtr_rdb->incBlobCache(blb->getCachedSize())) + { + delete blb; + return; + } + + blb->rbl_blob_id = p_blob->p_blob_id; + if (!rtr_blobs.add(blb)) + { + // Blob with the same blob id already exists. It could be in use, or it + // could be opened by user explicitly with custom BPB - thus delete new one. + Rbl* old = rtr_blobs.current(); + + fb_assert(blb != old); + delete blb; + + rtr_rdb->decBlobCache(blb->getCachedSize()); + return; + } + + blb->rbl_info.parseInfo(p_blob->p_blob_info.cstr_length, p_blob->p_blob_info.cstr_address); + + blb->rbl_length = blb->rbl_data.getCount(); + blb->rbl_ptr = blb->rbl_buffer = blb->rbl_data.begin(); + blb->rbl_flags |= Rbl::EOF_PENDING; +} + void Rsr::saveException(const Exception& ex, bool overwrite) { if (!rsr_status) { diff --git a/src/remote/remote.h b/src/remote/remote.h index e61cf071f93..f63312be5c0 100644 --- a/src/remote/remote.h +++ b/src/remote/remote.h @@ -35,13 +35,13 @@ #include "../common/ThreadStart.h" #include "../common/Auth.h" #include "../common/classes/objects_array.h" +#include "../common/classes/tree.h" #include "../common/classes/fb_string.h" #include "../common/classes/ClumpletWriter.h" #include "../common/classes/RefMutex.h" #include "../common/StatusHolder.h" #include "../common/classes/RefCounted.h" #include "../common/classes/GetPlugins.h" -#include "../common/classes/RefMutex.h" #include "firebird/Interface.h" @@ -92,7 +92,6 @@ const int BLOB_LENGTH = 16384; #include "../remote/protocol.h" #include "fb_blk.h" -#include "firebird/Interface.h" // Prefetch constants @@ -103,6 +102,11 @@ const ULONG MAX_ROWS_PER_BATCH = 1000; const ULONG MAX_BATCH_CACHE_SIZE = 1024 * 1024; // 1 MB +const ULONG DEFAULT_BLOBS_CACHE_SIZE = 10 * 1024 * 1024; // 10 MB + +const ULONG MAX_INLINE_BLOB_SIZE = MAX_USHORT; +const ULONG DEFAULT_INLINE_BLOB_SIZE = MAX_USHORT; + // fwd. decl. namespace Firebird { class Exception; @@ -180,35 +184,68 @@ struct Rdb : public Firebird::GlobalStorage, public TypedHandle public: std::atomic rdb_async_lock; // Atomic to avoid >1 async calls at once + ULONG rdb_inline_blob_size; // default max size of blob that can be transfered inline + ULONG rdb_blob_cache_size; // limit on cached blobs size + ULONG rdb_cached_blobs_size; // actual size of cached blobs + ULONG rdb_cached_blobs_count; // actual count of cached blobs + public: Rdb() : rdb_iface(NULL), rdb_port(0), rdb_transactions(0), rdb_requests(0), rdb_events(0), rdb_sql_requests(0), - rdb_id(0), rdb_async_thread_id(0), rdb_async_lock(0) + rdb_id(0), rdb_async_thread_id(0), rdb_async_lock(0), + rdb_inline_blob_size(DEFAULT_INLINE_BLOB_SIZE), rdb_blob_cache_size(DEFAULT_BLOBS_CACHE_SIZE), + rdb_cached_blobs_size(0), rdb_cached_blobs_count(0) { } static ISC_STATUS badHandle() { return isc_bad_db_handle; } + + // Increment blob cache usage. + // Return false if blob cache have not enough space for a blob of given size. + bool incBlobCache(ULONG size) + { + if (rdb_cached_blobs_size + size > rdb_blob_cache_size) + return false; + + rdb_cached_blobs_size += size; + rdb_cached_blobs_count++; + return true; + } + + // Decrement blob cache usage. + void decBlobCache(ULONG size) + { + fb_assert(rdb_cached_blobs_size >= size); + fb_assert(rdb_cached_blobs_count > 0); + + rdb_cached_blobs_size -= size; + rdb_cached_blobs_count--; + } }; struct Rtr : public Firebird::GlobalStorage, public TypedHandle { + using BlobsTree = Firebird::BePlusTree; + Rdb* rtr_rdb; Rtr* rtr_next; - struct Rbl* rtr_blobs; + BlobsTree rtr_blobs; ServTransaction rtr_iface; USHORT rtr_id; bool rtr_limbo; Firebird::Array rtr_cursors; Rtr** rtr_self; + Rbl* rtr_inline_blob; public: Rtr() : - rtr_rdb(0), rtr_next(0), rtr_blobs(0), + rtr_rdb(0), rtr_next(0), rtr_blobs(getPool()), rtr_iface(NULL), rtr_id(0), rtr_limbo(0), - rtr_cursors(getPool()), rtr_self(NULL) + rtr_cursors(getPool()), rtr_self(NULL), + rtr_inline_blob(NULL) { } ~Rtr() @@ -218,6 +255,9 @@ struct Rtr : public Firebird::GlobalStorage, public TypedHandle } static ISC_STATUS badHandle() { return isc_bad_trans_handle; } + + Rbl* createInlineBlob(); + void setupInlineBlob(P_INLINE_BLOB* p_blob); }; @@ -242,15 +282,21 @@ struct RBlobInfo unsigned int bufferLength, unsigned char* buffer); }; +// Used in XDR +class RemBlobBuffer : public Firebird::Array +{ + using Firebird::Array::Array; +}; + struct Rbl : public Firebird::GlobalStorage, public TypedHandle { - Firebird::HalfStaticArray rbl_data; + RemBlobBuffer rbl_data; Rdb* rbl_rdb; Rtr* rbl_rtr; - Rbl* rbl_next; UCHAR* rbl_buffer; UCHAR* rbl_ptr; ServBlob rbl_iface; + SQUAD rbl_blob_id; SLONG rbl_offset; // Apparent (to user) offset in blob USHORT rbl_id; USHORT rbl_flags; @@ -265,18 +311,19 @@ struct Rbl : public Firebird::GlobalStorage, public TypedHandle public: // Values for rbl_flags enum { - EOF_SET = 1, - SEGMENT = 2, - EOF_PENDING = 4, - CREATE = 8 + EOF_SET = 0x01, + SEGMENT = 0x02, + EOF_PENDING = 0x04, + CREATE = 0x08, + CACHED = 0x10 }; public: - Rbl() : - rbl_data(getPool()), rbl_rdb(0), rbl_rtr(0), rbl_next(0), - rbl_buffer(rbl_data.getBuffer(BLOB_LENGTH)), rbl_ptr(rbl_buffer), rbl_iface(NULL), - rbl_offset(0), rbl_id(0), rbl_flags(0), - rbl_buffer_length(BLOB_LENGTH), rbl_length(0), rbl_fragment_length(0), + Rbl(unsigned int initialSize) : + rbl_data(getPool()), rbl_rdb(0), rbl_rtr(0), + rbl_buffer(rbl_data.getBuffer(initialSize)), rbl_ptr(rbl_buffer), rbl_iface(NULL), + rbl_blob_id(NULL_BLOB), rbl_offset(0), rbl_id(0), rbl_flags(0), + rbl_buffer_length(initialSize), rbl_length(0), rbl_fragment_length(0), rbl_source_interp(0), rbl_target_interp(0), rbl_self(NULL) { } @@ -290,6 +337,11 @@ struct Rbl : public Firebird::GlobalStorage, public TypedHandle } static ISC_STATUS badHandle() { return isc_bad_segstr_handle; } + + bool isCached() const { return rbl_flags & CACHED; } + unsigned getCachedSize() const { return sizeof(Rbl) + rbl_data.getCapacity(); } + + static const SQUAD& generate(const void*, const Rbl* item) { return item->rbl_blob_id; } }; @@ -330,20 +382,29 @@ struct rem_str : public pool_alloc_rpt #include "../common/dsc.h" +// Note, currently the only routine that created and changed rem_fmt is +// parse_format() in parse.cpp struct rem_fmt : public Firebird::GlobalStorage { ULONG fmt_length; ULONG fmt_net_length; Firebird::Array fmt_desc; + Firebird::HalfStaticArray fmt_blob_idx; // indices of blob's in fmt_desc public: explicit rem_fmt(FB_SIZE_T rpt) : fmt_length(0), fmt_net_length(0), - fmt_desc(getPool(), rpt) + fmt_desc(getPool(), rpt), + fmt_blob_idx(getPool()) { fmt_desc.grow(rpt); } + + bool haveBlobs() const + { + return fmt_blob_idx.hasData(); + } }; // Windows declares a msg structure, so rename the structure @@ -522,6 +583,7 @@ struct Rsr : public Firebird::GlobalStorage, public TypedHandle P_FETCH rsr_fetch_operation; // Last performed fetch operation SLONG rsr_fetch_position; // and position + unsigned int rsr_inline_blob_size; // max size of blob that can be transfered inline struct BatchStream { @@ -575,7 +637,8 @@ struct Rsr : public Firebird::GlobalStorage, public TypedHandle rsr_id(0), rsr_fmt_length(0), rsr_rows_pending(0), rsr_msgs_waiting(0), rsr_reorder_level(0), rsr_batch_count(0), rsr_cursor_name(getPool()), rsr_delayed_format(false), rsr_timeout(0), rsr_self(NULL), - rsr_fetch_operation(fetch_next), rsr_fetch_position(0) + rsr_batch_size(0), rsr_batch_flags(0), rsr_batch_ics(NULL), + rsr_fetch_operation(fetch_next), rsr_fetch_position(0), rsr_inline_blob_size(0) { } ~Rsr() @@ -607,6 +670,12 @@ struct Rsr : public Firebird::GlobalStorage, public TypedHandle void checkCursor(); void checkBatch(); + // return true if select format have blobs + bool haveBlobs() const + { + return rsr_select_format && rsr_select_format->haveBlobs(); + } + SLONG getCursorAdjustment() const { if (rsr_fetch_operation != fetch_next && rsr_fetch_operation != fetch_prior) @@ -1310,7 +1379,7 @@ struct rem_port : public Firebird::GlobalStorage, public Firebird::RefCounted port_send_partial(0), port_connect(0), port_request(0), port_select_multi(0), port_type(t), port_state(PENDING), port_clients(0), port_next(0), port_parent(0), port_async(0), port_async_receive(0), - port_server(0), port_server_flags(0), port_protocol(0), port_buff_size(rpt / 2), + port_server(0), port_server_flags(0), port_protocol(0), port_buff_size((USHORT)(rpt / 2)), port_flags(0), port_partial_data(false), port_z_data(false), port_connect_timeout(0), port_dummy_packet_interval(0), port_dummy_timeout(0), port_handle(INVALID_SOCKET), port_channel(INVALID_SOCKET), port_context(0), @@ -1576,6 +1645,11 @@ struct rem_port : public Firebird::GlobalStorage, public Firebird::RefCounted private: bool tryKeyType(const KnownServerKey& srvKey, InternalCryptKey* cryptKey); + + void sendInlineBlobs(PACKET*, Rtr* rtr, UCHAR* message, const rem_fmt* format, ULONG maxSize); + + // return false if any error retrieving blob happens + bool sendInlineBlob(PACKET*, Rtr* rtr, SQUAD blobId, ULONG maxSize); }; diff --git a/src/remote/server/server.cpp b/src/remote/server/server.cpp index e8c94bf285d..3d96d2f16b0 100644 --- a/src/remote/server/server.cpp +++ b/src/remote/server/server.cpp @@ -54,6 +54,7 @@ #include "../common/isc_proto.h" #include "../jrd/constants.h" #include "firebird/impl/inf_pub.h" +#include "../common/classes/auto.h" #include "../common/classes/init.h" #include "../common/classes/semaphore.h" #include "../common/classes/ClumpletWriter.h" @@ -3511,6 +3512,11 @@ ISC_STATUS rem_port::execute_immediate(P_OP op, P_SQLST * exnow, PACKET* sendL) { this->port_statement->rsr_format = this->port_statement->rsr_select_format; + if (out_msg && exnow->p_sqlst_inline_blob_size) + { + sendInlineBlobs(sendL, transaction, out_msg, port_statement->rsr_select_format, + exnow->p_sqlst_inline_blob_size); + } sendL->p_operation = op_sql_response; sendL->p_sqldata.p_sqldata_messages = ((status_vector.getState() & IStatus::STATE_ERRORS) || !out_msg) ? 0 : 1; @@ -3946,10 +3952,17 @@ ISC_STATUS rem_port::execute_statement(P_OP op, P_SQLDATA* sqldata, PACKET* send iMsgBuffer.metadata, iMsgBuffer.buffer, oMsgBuffer.metadata, oMsgBuffer.buffer); } + statement->rsr_inline_blob_size = sqldata->p_sqldata_inline_blob_size; + if (op == op_execute2) { this->port_statement->rsr_format = this->port_statement->rsr_select_format; + if (out_msg && statement->rsr_inline_blob_size) + { + sendInlineBlobs(sendL, transaction, out_msg, port_statement->rsr_select_format, + statement->rsr_inline_blob_size); + } sendL->p_operation = op_sql_response; sendL->p_sqldata.p_sqldata_messages = ((status_vector.getState() & IStatus::STATE_ERRORS) || !out_msg) ? 0 : 1; @@ -4248,6 +4261,15 @@ ISC_STATUS rem_port::fetch(P_SQLDATA * sqldata, PACKET* sendL, bool scroll) statement->rsr_msgs_waiting--; } + // send blob data inline + if (statement->haveBlobs() && statement->rsr_inline_blob_size) + { + AutoSaveRestore op(&sendL->p_operation); + + sendInlineBlobs(sendL, statement->rsr_rtr, message->msg_buffer, + statement->rsr_select_format, statement->rsr_inline_blob_size); + } + // There's a buffer waiting -- send it this->send_partial(sendL); @@ -4803,18 +4825,18 @@ ISC_STATUS rem_port::open_blob(P_OP op, P_BLOB* stuff, PACKET* sendL) USHORT object = 0; if (!(status_vector.getState() & IStatus::STATE_ERRORS)) { - Rbl* blob = FB_NEW Rbl; + Rbl* blob = FB_NEW Rbl(BLOB_LENGTH); #ifdef DEBUG_REMOTE_MEMORY printf("open_blob(server) allocate blob %x\n", blob); #endif + blob->rbl_blob_id = stuff->p_blob_id; blob->rbl_iface = iface; blob->rbl_rdb = rdb; if (blob->rbl_id = this->get_id(blob)) { object = blob->rbl_id; blob->rbl_rtr = transaction; - blob->rbl_next = transaction->rtr_blobs; - transaction->rtr_blobs = blob; + transaction->rtr_blobs.add(blob); } else { @@ -5777,14 +5799,8 @@ static void release_blob(Rbl* blob) rdb->rdb_port->releaseObject(blob->rbl_id); - for (Rbl** p = &transaction->rtr_blobs; *p; p = &(*p)->rbl_next) - { - if (*p == blob) - { - *p = blob->rbl_next; - break; - } - } + if (transaction->rtr_blobs.locate(blob->rbl_blob_id)) + transaction->rtr_blobs.fastRemove(); #ifdef DEBUG_REMOTE_MEMORY printf("release_blob(server) free blob %x\n", blob); @@ -5930,8 +5946,8 @@ static void release_transaction( Rtr* transaction) Rdb* rdb = transaction->rtr_rdb; rdb->rdb_port->releaseObject(transaction->rtr_id); - while (transaction->rtr_blobs) - release_blob(transaction->rtr_blobs); + while (transaction->rtr_blobs.getFirst()) + release_blob(transaction->rtr_blobs.current()); while (transaction->rtr_cursors.hasData()) { @@ -5986,6 +6002,141 @@ ISC_STATUS rem_port::seek_blob(P_SEEK* seek, PACKET* sendL) } +void rem_port::sendInlineBlobs(PACKET* sendL, Rtr* rtr, UCHAR* message, + const rem_fmt* format, ULONG maxSize) +{ + if (port_protocol < PROTOCOL_INLINE_BLOB || port_type == XNET) + return; + + fb_assert(format && message); + + if (!format || !format->haveBlobs() || !message) + return; + + sendL->p_operation = op_inline_blob; + + const auto& descs = format->fmt_desc; + for (unsigned ind : format->fmt_blob_idx) + { + const auto offs = (ULONG) (U_IPTR) descs[ind].dsc_address; + const auto blobId = (ISC_QUAD*) (message + offs); + if (*blobId == NULL_BLOB) + continue; + + if (!sendInlineBlob(sendL, rtr, *blobId, maxSize)) + break; + } +} + + +bool rem_port::sendInlineBlob(PACKET* sendL, Rtr* rtr, SQUAD blobId, ULONG maxSize) +{ + P_INLINE_BLOB* p_blob = &sendL->p_inline_blob; + + p_blob->p_tran_id = rtr->rtr_id; + p_blob->p_blob_id = blobId; + + LocalStatus ls; + CheckStatusWrapper status(&ls); + + ServAttachment att = port_context->rdb_iface; + + ServBlob blob(att->openBlob(&status, rtr->rtr_iface, &blobId, 0, nullptr)); + if (status.getState() & IStatus::STATE_ERRORS) + return false; + + // ask blob info + const UCHAR items[] = { + isc_info_blob_num_segments, + isc_info_blob_max_segment, + isc_info_blob_total_length, + isc_info_blob_type, + isc_info_end + }; + + UCHAR info[64]; + + blob->getInfo(&status, sizeof(items), items, sizeof(info), info); + if (status.getState() & IStatus::STATE_ERRORS) + return false; + + bool segmented; + ULONG num_segments; + ULONG max_segment; + ULONG total_length; + + ClumpletReader p(ClumpletReader::InfoResponse, info, sizeof(info)); + for (; !p.isEof(); p.moveNext()) + { + switch (p.getClumpTag()) + { + case isc_info_blob_num_segments: + num_segments = p.getInt(); + break; + case isc_info_blob_max_segment: + max_segment = p.getInt(); + break; + case isc_info_blob_total_length: + total_length = p.getInt(); + break; + case isc_info_blob_type: + segmented = (p.getInt() == 0); + break; + case isc_info_end: + p_blob->p_blob_info.cstr_length = p.getCurOffset() + 1; + break; + default: + fb_assert(false); + break; + } + } + + RemBlobBuffer buff(getPool()); + + if (total_length) + { + if (!segmented) + num_segments = (total_length + max_segment - 1) / max_segment; + + const ULONG dataLen = total_length + num_segments * 2; + + fb_assert(maxSize <= MAX_INLINE_BLOB_SIZE); + if (maxSize > MAX_INLINE_BLOB_SIZE) + maxSize = MAX_INLINE_BLOB_SIZE; + + if (dataLen > maxSize) + return true; + + UCHAR* ptr = buff.getBuffer(dataLen); + const UCHAR* const end = ptr + dataLen; + + for (; num_segments; num_segments--) + { + const unsigned inLen = MIN(end - ptr, max_segment); + unsigned outLen; + + const int res = blob->getSegment(&status, inLen, ptr + 2, &outLen); + if (res == IStatus::RESULT_ERROR) + return false; + + ptr[0] = (UCHAR) outLen; + ptr[1] = (UCHAR) (outLen >> 8); + + ptr += 2 + outLen; + } + fb_assert(ptr == end); + } + + blob->close(&status); + + p_blob->p_blob_info.cstr_address = info; + p_blob->p_blob_data = &buff; + + this->send_partial(sendL); + return true; +} + + ISC_STATUS rem_port::send_msg(P_DATA * data, PACKET* sendL) { /************************************** diff --git a/src/yvalve/YObjects.h b/src/yvalve/YObjects.h index c2ea4843505..b2a93839ba1 100644 --- a/src/yvalve/YObjects.h +++ b/src/yvalve/YObjects.h @@ -477,6 +477,9 @@ class YStatement final : YBatch* createBatch(Firebird::CheckStatusWrapper* status, Firebird::IMessageMetadata* inMetadata, unsigned parLength, const unsigned char* par); + unsigned getMaxInlineBlobSize(Firebird::CheckStatusWrapper* status) override; + void setMaxInlineBlobSize(Firebird::CheckStatusWrapper* status, unsigned size) override; + public: AtomicAttPtr attachment; Firebird::Mutex statementMutex; @@ -579,6 +582,11 @@ class YAttachment final : Firebird::IMessageMetadata* inMetadata, unsigned parLength, const unsigned char* par); YReplicator* createReplicator(Firebird::CheckStatusWrapper* status); + unsigned getMaxBlobCacheSize(Firebird::CheckStatusWrapper* status) override; + void setMaxBlobCacheSize(Firebird::CheckStatusWrapper* status, unsigned size) override; + unsigned getMaxInlineBlobSize(Firebird::CheckStatusWrapper* status) override; + void setMaxInlineBlobSize(Firebird::CheckStatusWrapper* status, unsigned size) override; + public: Firebird::IProvider* provider; Firebird::PathName dbPath; diff --git a/src/yvalve/why.cpp b/src/yvalve/why.cpp index cc722930c2b..51a54e876fd 100644 --- a/src/yvalve/why.cpp +++ b/src/yvalve/why.cpp @@ -4582,6 +4582,36 @@ YBatch* YStatement::createBatch(CheckStatusWrapper* status, IMessageMetadata* in return NULL; } + +unsigned YStatement::getMaxInlineBlobSize(CheckStatusWrapper* status) +{ + try + { + YEntry entry(status, this); + return entry.next()->getMaxInlineBlobSize(status); + } + catch (const Exception& e) + { + e.stuffException(status); + } + + return 0; +} + + +void YStatement::setMaxInlineBlobSize(CheckStatusWrapper* status, unsigned size) +{ + try + { + YEntry entry(status, this); + entry.next()->setMaxInlineBlobSize(status, size); + } + catch (const Exception& e) + { + e.stuffException(status); + } +} + //------------------------------------- IscStatement::~IscStatement() @@ -6220,6 +6250,66 @@ YReplicator* YAttachment::createReplicator(CheckStatusWrapper* status) } +unsigned YAttachment::getMaxBlobCacheSize(CheckStatusWrapper* status) +{ + try + { + YEntry entry(status, this); + return entry.next()->getMaxBlobCacheSize(status); + } + catch (const Exception& e) + { + e.stuffException(status); + } + + return 0; +} + + +void YAttachment::setMaxBlobCacheSize(CheckStatusWrapper* status, unsigned size) +{ + try + { + YEntry entry(status, this); + entry.next()->setMaxBlobCacheSize(status, size); + } + catch (const Exception& e) + { + e.stuffException(status); + } +} + + +unsigned YAttachment::getMaxInlineBlobSize(CheckStatusWrapper* status) +{ + try + { + YEntry entry(status, this); + return entry.next()->getMaxInlineBlobSize(status); + } + catch (const Exception& e) + { + e.stuffException(status); + } + + return 0; +} + + +void YAttachment::setMaxInlineBlobSize(CheckStatusWrapper* status, unsigned size) +{ + try + { + YEntry entry(status, this); + entry.next()->setMaxInlineBlobSize(status, size); + } + catch (const Exception& e) + { + e.stuffException(status); + } +} + + //-------------------------------------