Skip to content

Commit

Permalink
Added FORMAT(binary) copy capability and example
Browse files Browse the repository at this point in the history
The function Connection.putCopyData was overloaded to take
const(ubyte[]) values since these are common for raw data.
In addition the example.d file was updated to include an
explicit binary copy in for BYTEA types and to work with dub.
  • Loading branch information
cpiker committed Jan 5, 2025
1 parent ac6057c commit 8cb1476
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 11 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ dub.selections.json
*.lst
docs
docs.json
example/dpq2_example*
89 changes: 86 additions & 3 deletions example/example.d
100644 → 100755
Original file line number Diff line number Diff line change
@@ -1,17 +1,40 @@
#!/usr/bin/env rdmd
#!/usr/bin/env dub
/+ dub.sdl:
name "dpq2_example"
dependency "dpq2" version="*" path="../"
+/

import dpq2;
import std.system;
import std.bitmanip;
import std.getopt;
import std.range;
import std.stdio: writeln;
import std.typecons: Nullable;
import vibe.data.bson;

void main(string[] args)
alias BE = Endian.bigEndian;

int main(string[] args)
{
string connInfo;
getopt(args, "conninfo", &connInfo);

Connection conn = new Connection(connInfo);
Connection conn;
try{
conn = new Connection(connInfo);
}
catch(ConnectionException ex){
writeln(ex.msg);
writeln("Try adding the arguments:
'--conninfo postgresql://postgres@/template1' or
'--conninfo postgresql://postgres@localhost/template1'
to set the DB connection info. The first version has no host part
and so it tries a local unix-domain socket.");
return 3;
}

// Only text query result can be obtained by this call:
auto answer = conn.exec(
Expand Down Expand Up @@ -88,4 +111,64 @@ void main(string[] args)
// Signal that the COPY is finished. Let Postgresql finalize the command
// and return any errors with the data.
conn.putCopyEnd();
writeln("CSV copy-in worked.");


// It is also possible to send raw binary data. It's even faster, and can
// handle any PostgreSQL type, including BYTEA, but it's more complex than
// sending parsable text streams
conn.exec("CREATE TEMP TABLE test_dpq2_blob (item BIGINT, data BYTEA)");

// Init the COPY command, this time for direct binary input
conn.exec("COPY test_dpq2_blob (item, data) FROM STDIN WITH (FORMAT binary)");

// For FORMAT binary, send over the 19-byte PostgreSQL header manually:
// an 11-byte signature (P G C O P Y \n 255 \r \n \0), followed by a
// 4-byte flags field and a 4-byte header-extension length, all zero
conn.putCopyData(cast(ubyte[])[
0x50,0x47,0x43,0x4F,0x50,0x59,0x0A,0xFF,0x0D,0x0A,0,0,0,0,0,0,0,0,0
]);

// Write 10 rows of variable length binary data. PostgreSQL internal
// storage is big endian, and the number of values and the length of each
// must be provided. Since binary copy-in is likely to be used in
// "tight-loop" code, we'll use stack memory. The stack buffer size is:
//
// 2 bytes for the number of values
// plus 4 length bytes for each value
// plus total size of all values in the largest row
//
enum LOOPS = 10;
ubyte[2 + 2*4 + 8 + (2*LOOPS + 7)] buf;
foreach(i;0..LOOPS){
size_t offset = 0;
buf[].write!(short, BE)(2, &offset); // Sending two fields

buf[].write!(int, BE)(long.sizeof, &offset); // BIGINT == long
buf[].write!(long, BE)(i, &offset); // add the item value

// Generate some data. Here we're making the blob larger on each
// iteration just to emphasize that BYTEA is not a fixed-length value type.
ubyte[] blob = iota(cast(ubyte)1, cast(ubyte)(2*i + 7), 1).array;

buf[].write!(int, BE)(cast(int)blob.length, &offset);

buf[offset..offset+blob.length] = blob;
offset += blob.length;

// Send the variable length buffer
conn.putCopyData(buf[0..offset]);
}

// Signal that the copy is finished. PostgreSQL will check constraints at
// this point.
conn.putCopyEnd();
writeln("Direct binary copy-in worked.");

// Read binary blobs back as hex-string data.
// For more precise type handling use QueryParams objects with execParams and
// convert the output using the as!(PGbytea) template from to_d_types.d
foreach(row; conn.exec("SELECT item, data from test_dpq2_blob").rangify())
writeln(row[0], ", ", row[1]);

return 0;
}
32 changes: 24 additions & 8 deletions src/dpq2/query.d
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,30 @@ mixin template Queries()
return r != 0;
}

/// Sends a buffer of binary data to the COPY command
///
/// A detailed example of setup and iteration for binary data copy-in
/// can be found in example/example.d
///
/// Before using this function execute a command similar to
/// the following:
/// ---
/// conn.exec("CREATE TEMP TABLE mytable (mycol1 int, mycol2 real)");
/// ---
///
/// where the column names and types depend on the destination table.
/// Returns:
/// true if the data was queued, false if it was not queued because of
/// full buffers (this will only happen in nonblocking mode)
bool putCopyData(const(ubyte)[] data )
{
    // PQputCopyData takes a char* regardless of payload type; the cast is
    // safe because libpq treats the buffer as opaque bytes. Note .to!int
    // will throw on buffers larger than int.max.
    const int r = PQputCopyData(conn, cast(const char*)data.ptr, data.length.to!int);

    // libpq returns -1 on error (e.g. not in COPY_IN state, connection lost)
    if(r == -1) throw new ConnectionException(this);

    return r != 0;
}

/// Signals that COPY data send is finished. Finalize and flush the COPY command.
immutable(Answer) putCopyEnd()
{
Expand Down Expand Up @@ -501,8 +525,6 @@ void _integration_test( string connParam ) @trusted
conn.socket.shutdown(SocketShutdown.BOTH); // breaks connection

{
import dpq2.result: ResponseException;

bool exceptionFlag = false;
string errorMsg;

Expand All @@ -513,12 +535,6 @@ void _integration_test( string connParam ) @trusted
errorMsg = e.msg;
assert(e.msg.length > 15); // error message check
}
catch(ResponseException e)
{
exceptionFlag = true;
errorMsg = e.msg;
assert(e.msg.length > 15); // error message check
}
finally {
assert(exceptionFlag, errorMsg);
}
Expand Down

0 comments on commit 8cb1476

Please sign in to comment.