Skip to content

Commit

Permalink
Change to TCP instead of UDP
Browse files Browse the repository at this point in the history
Change the socket protocol from UDP to TCP.
Comment out UDP-specific lines, but leave them in case they are useful in the future.
Add useful error and debug outputs.
  • Loading branch information
bparks13 committed Mar 13, 2024
1 parent 5a16037 commit fce1446
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 19 deletions.
48 changes: 30 additions & 18 deletions Source/EphysSocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,14 @@ void EphysSocket::tryToConnect()
{
if (socket != nullptr)
{
socket->shutdown();
//socket->shutdown(); // UDP
socket->close();
socket.reset();
}


/* UDP
socket = std::make_unique<DatagramSocket>();
bool bound = socket->bindToPort(port);
if (bound)
Expand All @@ -69,23 +71,32 @@ void EphysSocket::tryToConnect()
}
else {
LOGC("EphysSocket could not bind socket to port ", port);
}
}*/

socket = std::make_unique<StreamingSocket>();
connected = socket->connect("localhost", port, 500);

if (connected)
{
LOGC("EphysSocket connected.");

std::vector<std::byte> header_bytes(HEADER_SIZE);

LOGD("Reading header...");
int rc = socket->read(header_bytes.data(), HEADER_SIZE, true);

Header tmp_header = parseHeader(header_bytes);
LOGD("Header read and parsed correctly.");

num_bytes = tmp_header.num_bytes;
depth = tmp_header.depth;
element_size = tmp_header.element_size;
num_samp = tmp_header.num_samp;
num_channels = tmp_header.num_channels;

const int matrix_size = num_channels * num_samp * element_size;
header_bytes.reserve(matrix_size);
socket->read(header_bytes.data(), matrix_size, true); // NB: Realign stream to the beginning of a packet
}
else {
LOGC("EphysSocket failed to connect");
Expand Down Expand Up @@ -242,57 +253,58 @@ void EphysSocket::convertData()
void EphysSocket::runBufferThread()
{
const int matrix_size = num_channels * num_samp * element_size;
const int num_expected_packets = (matrix_size / MAX_PACKET_SIZE) + 1;
const int max_data_size = num_expected_packets == 1 ? matrix_size : MAX_PACKET_SIZE - HEADER_SIZE;
const int num_expected_packets = 1;
//const int num_expected_packets = (matrix_size / MAX_PACKET_SIZE) + 1; // UDP
//const int max_data_size = num_expected_packets == 1 ? matrix_size : MAX_PACKET_SIZE - HEADER_SIZE; // UDP

std::vector<std::byte> read_buffer;
read_buffer.resize(matrix_size + HEADER_SIZE * num_expected_packets);
read_buffer.resize(matrix_size + HEADER_SIZE);

int rc;
Header header;

// NB: If mutliple packets are needed for the matrix, they can be misaligned by
// reading the header in tryToConnect(); two consecutive reads realigns the
// packets and allows correct reconstruction of the whole matrix
rc = socket->read(read_buffer.data(), HEADER_SIZE, true);
rc = socket->read(read_buffer.data(), HEADER_SIZE, true);

while (!stop_flag)
{
rc = socket->read(read_buffer.data(), matrix_size + HEADER_SIZE * num_expected_packets, true);
rc = socket->read(read_buffer.data(), matrix_size + HEADER_SIZE, true);

if (rc == -1)
{
if (socket->getRawSocketHandle() == -1)
{
CoreServices::sendStatusMessage("Ephys Socket: Socket handle is no longer valid.");
CoreServices::sendStatusMessage("Ephys Socket: Socket handle invalid.");
LOGE("Ephys Socket: Socket handle is invalid, returns -1");
error_flag = true;
return;
}

CoreServices::sendStatusMessage("Ephys Socket: Data shape mismatch");
LOGE("Ephys Socket: Socket read did not complete, returns -1");
error_flag = true;
return;
}

for (int i = 0; i < num_expected_packets; i++)
{
header = Header(read_buffer, i * (max_data_size + HEADER_SIZE));
//header = Header(read_buffer, i * (max_data_size + HEADER_SIZE)); // UDP
header = Header(read_buffer);

if (!compareHeaders(header))
{
CoreServices::sendStatusMessage("Ephys Socket: Header mismatch");
LOGE("Ephys Socket: Header values have changed since first connecting");
error_flag = true;
return;
}

if (i == 0 && header.offset != 0)
{
CoreServices::sendStatusMessage("Ephys Socket: Packets were dropped");
LOGE("Ephys Socket: [UDP] First packet does not have offset of 0; data is misaligned");
error_flag = true;
return;
}

int current_offset = HEADER_SIZE * (i + 1) + header.offset;
int current_offset = HEADER_SIZE * (i + 1) + header.offset;

if (!buffer_flag) {
std::copy(read_buffer.begin() + current_offset, read_buffer.begin() + current_offset + header.num_bytes, recvbuf0.begin() + header.offset);
Expand Down
5 changes: 4 additions & 1 deletion Source/EphysSocket.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,10 @@ namespace EphysSocketNode
bool connected = false;

/** UPD socket object */
std::unique_ptr<DatagramSocket> socket;
//std::unique_ptr<DatagramSocket> socket;

/** TCP Socket object */
std::unique_ptr<StreamingSocket> socket;

/** Internal buffers */
std::vector<std::byte> recvbuf0;
Expand Down

0 comments on commit fce1446

Please sign in to comment.