Skip to content

Commit

Permalink
Add Socket class to handle all socket interactions
Browse files Browse the repository at this point in the history
- Once connected, all data is read to prevent buffer overflow
- If acquisition is running, this data is processed and plotted; otherwise, it is thrown away

Fixes #12
  • Loading branch information
bparks13 committed Oct 24, 2024
1 parent d5a1f6d commit eb8d41e
Show file tree
Hide file tree
Showing 5 changed files with 493 additions and 275 deletions.
279 changes: 48 additions & 231 deletions Source/EphysSocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,31 +12,24 @@ DataThread* EphysSocket::createDataThread(SourceNode* sn)
return new EphysSocket(sn);
}

EphysSocket::EphysSocket(SourceNode* sn) : DataThread(sn)
EphysSocket::EphysSocket(SourceNode* sn) : DataThread(sn), socket("socket_thread")
{
port = DEFAULT_PORT;
sample_rate = DEFAULT_SAMPLE_RATE;

total_samples = DEFAULT_TOTAL_SAMPLES;
eventState = DEFAULT_EVENT_STATE;

num_channels = DEFAULT_NUM_CHANNELS;
num_samp = DEFAULT_NUM_SAMPLES;
depth = DEFAULT_DEPTH;
element_size = DEFAULT_ELEMENT_SIZE;
num_bytes = DEFAULT_NUM_BYTES;
total_samples = 0;
eventState = 0;

data_scale = DEFAULT_DATA_SCALE;
data_offset = DEFAULT_DATA_OFFSET;

error_flag = false;

sourceBuffers.add(new DataBuffer(num_channels, sample_rate * bufferSizeInSeconds)); // start with 2 channels and automatically resize
sourceBuffers.add(new DataBuffer(socket.num_channels, sample_rate * bufferSizeInSeconds)); // start with 2 channels and automatically resize
}

std::unique_ptr<GenericEditor> EphysSocket::createEditor(SourceNode* sn)
{
std::unique_ptr<EphysSocketEditor> editor = std::make_unique<EphysSocketEditor>(sn, this);
socket.setEditor(editor.get());

return editor;
}
Expand All @@ -47,120 +40,27 @@ EphysSocket::~EphysSocket()

void EphysSocket::disconnectSocket()
{
if (socket != nullptr)
{
LOGD("Disconnecting socket.");

socket->close();
socket.reset();

connected = false;

CoreServices::sendStatusMessage("Ephys Socket: Socket disconnected.");
}
socket.disconnectSocket();
}

bool EphysSocket::connectSocket(bool printOutput)
{
if (socket != nullptr && socket->isConnected())
{
LOGE("Attempting to connect to an already active socket.");
return false;
}

socket = std::make_unique<StreamingSocket>();
connected = socket->connect("localhost", port, 500);

if (connected)
{
std::vector<std::byte> header_bytes(HEADER_SIZE);

LOGD("Reading header...");
int rc;

for (int i = 0; i < 5; i++) {
rc = socket->read(header_bytes.data(), HEADER_SIZE, false);

if (rc == HEADER_SIZE) break;
else sleep(50);
}

if (rc != HEADER_SIZE)
{
disconnectSocket();

if (printOutput)
{
LOGC("EphysSocket failed to connect; could not read header from stream.");
CoreServices::sendStatusMessage("Ephys Socket: Could not read header from stream.");
}

return false;
}

EphysSocketHeader tmp_header = EphysSocketHeader(header_bytes);
LOGD("Header read and parsed correctly.");

num_bytes = tmp_header.num_bytes;
depth = tmp_header.depth;
element_size = tmp_header.element_size;
num_samp = tmp_header.num_samp;
num_channels = tmp_header.num_channels;

const int matrix_size = num_channels * num_samp * element_size;
header_bytes.reserve(matrix_size);
socket->read(header_bytes.data(), matrix_size, true); // NB: Realign stream to the beginning of a packet

if (printOutput)
{
LOGC("EphysSocket connected.");
CoreServices::sendStatusMessage("Ephys Socket: Socket connected and ready to receive data.");
}

return true;
}
else
{
if (printOutput)
{
LOGC("EphysSocket failed to connect");
CoreServices::sendStatusMessage("Ephys Socket: Socket could not connect.");
}

return false;
}
}

bool EphysSocket::reconnectSocket()
{
if (!isThreadRunning()) return false;

if (socket != nullptr && socket->isConnected())
{
socket->close();
socket.reset();

LOGC("EphysSocket has been disconnected. Attempting to reconnect now.");
CoreServices::sendStatusMessage("Ephys Socket: Socket disconnected, attempting to reconnect...");
}

return connectSocket(false);
return socket.connectSocket(port, printOutput);
}

bool EphysSocket::errorFlag()
{
return error_flag;
return socket.isError();
}

void EphysSocket::resizeBuffers()
{
sourceBuffers[0]->resize(num_channels, sample_rate * bufferSizeInSeconds);
read_buffer.resize(num_channels * num_samp * element_size + HEADER_SIZE);
convbuf.resize(num_channels * num_samp);
sampleNumbers.resize(num_samp);
sourceBuffers[0]->resize(socket.num_channels, sample_rate * bufferSizeInSeconds);
convbuf.resize(socket.num_channels * socket.num_samp);
sampleNumbers.resize(socket.num_samp);
timestamps.clear();
timestamps.insertMultiple(0, 0.0, num_samp);
ttlEventWords.resize(num_samp);
timestamps.insertMultiple(0, 0.0, socket.num_samp);
ttlEventWords.resize(socket.num_samp);
}

void EphysSocket::updateSettings(OwnedArray<ContinuousChannel>* continuousChannels,
Expand All @@ -170,7 +70,6 @@ void EphysSocket::updateSettings(OwnedArray<ContinuousChannel>* continuousChanne
OwnedArray<DeviceInfo>* devices,
OwnedArray<ConfigurationObject>* configurationObjects)
{

continuousChannels->clear();
eventChannels->clear();
devices->clear();
Expand All @@ -189,11 +88,10 @@ void EphysSocket::updateSettings(OwnedArray<ContinuousChannel>* continuousChanne
};

sourceStreams->add(new DataStream(settings));
sourceBuffers[0]->resize(num_channels, sample_rate * bufferSizeInSeconds);
sourceBuffers[0]->resize(socket.num_channels, sample_rate * bufferSizeInSeconds);

for (int ch = 0; ch < num_channels; ch++)
for (int ch = 0; ch < socket.num_channels; ch++)
{

ContinuousChannel::Settings settings{
ContinuousChannel::Type::ELECTRODE,
"CH" + String(ch + 1),
Expand Down Expand Up @@ -222,7 +120,12 @@ void EphysSocket::updateSettings(OwnedArray<ContinuousChannel>* continuousChanne

bool EphysSocket::foundInputSource()
{
return connected;
return socket.isConnected();
}

bool EphysSocket::isReady()
{
return socket.isConnected();
}

bool EphysSocket::startAcquisition()
Expand All @@ -232,10 +135,9 @@ bool EphysSocket::startAcquisition()
total_samples = 0;
eventState = 0;

error_flag = false;

lastPacketReceived = time(nullptr);
socket.startAcquisition();

socket.startThread();
startThread();

return true;
Expand All @@ -248,141 +150,58 @@ bool EphysSocket::stopAcquisition()
signalThreadShouldExit();
}

waitForThreadToExit(500);
socket.stopAcquisition();

sourceBuffers[0]->clear();
return true;
}

bool EphysSocket::compareHeaders(EphysSocketHeader header) const
{
if (header.depth != depth ||
header.element_size != element_size ||
header.num_channels != num_channels ||
header.num_samp != num_samp)
{
return false;
}

return true;
}

template <typename T>
void EphysSocket::convertData()
void EphysSocket::convertData(std::vector<std::byte> buffer)
{
T* buf = (T*)(read_buffer.data() + HEADER_SIZE);
T* buf = (T*)(buffer.data() + HEADER_SIZE);

for (int i = 0; i < num_samp * num_channels; i++) {
for (int i = 0; i < socket.num_samp * socket.num_channels; i++) {
convbuf[i] = data_scale * ((float)(buf[i]) - data_offset);
}
}

bool EphysSocket::updateBuffer()
{
const int bytes_expected = num_channels * num_samp * element_size + HEADER_SIZE;

int rc;
EphysSocketHeader header;

if (socket != nullptr && socket->isConnected()) {
rc = socket->read(read_buffer.data(), bytes_expected, false);
}
else {
rc = 0; // NB: This will attempt to reconnect the socket below
}

if (rc == -1)
if (socket.isError())
{
if (socket->getRawSocketHandle() == -1)
{
CoreServices::sendStatusMessage("Ephys Socket: Socket handle invalid.");
LOGE("Ephys Socket: Socket handle is invalid, returns -1");
error_flag = true;
return false;
}

CoreServices::sendStatusMessage("Ephys Socket: Data shape mismatch");
LOGE("Ephys Socket: Socket read did not complete, returns -1");
error_flag = true;
return false;
}
else if (rc == 0)
{
if (difftime(time(nullptr), lastPacketReceived) >= 2)
{
error_flag = true;
auto previousHeader = EphysSocketHeader(num_bytes, depth, element_size, num_samp, num_channels);

if (reconnectSocket())
{
if (!compareHeaders(previousHeader))
{
disconnectSocket();
LOGE("Mismatched header, disconnecting socket.");
CoreServices::sendStatusMessage("Ephys Socket: Mismatched header, disconnecting socket.");
return false;
}

LOGD("Successfully reconnected the socket.");
CoreServices::sendStatusMessage("Ephys Socket: Socket reconnected.");
lastPacketReceived = time(nullptr);
error_flag = false;
}
}

if (socket.data.isEmpty()) {
return true;
}
else if (rc != bytes_expected)
{
int bytes_received = rc;

while (bytes_received < bytes_expected)
{
rc = socket->read(read_buffer.data() + bytes_received, bytes_expected - bytes_received, false);

if (rc != 0) {
bytes_received += rc;
}

if (threadShouldExit()) {
return false;
}
}
std::vector<std::byte> data = socket.data.removeAndReturn(0);

if (socket.depth == U8) {
convertData<uint8_t>(data);
}

header = EphysSocketHeader(read_buffer);

if (!compareHeaders(header))
{
CoreServices::sendStatusMessage("Ephys Socket: Header mismatch");
LOGE("Ephys Socket: Header values have changed since first connecting");
error_flag = true;
return false;
}

if (depth == U8) {
convertData<uint8_t>();
else if (socket.depth == S8) {
convertData<int8_t>(data);
}
else if (depth == S8) {
convertData<int8_t>();
else if (socket.depth == U16) {
convertData<uint16_t>(data);
}
else if (depth == U16) {
convertData<uint16_t>();
else if (socket.depth == S16) {
convertData<int16_t>(data);
}
else if (depth == S16) {
convertData<int16_t>();
else if (socket.depth == S32) {
convertData<int32_t>(data);
}
else if (depth == S32) {
convertData<int32_t>();
else if (socket.depth == F32) {
convertData<float_t>(data);
}
else if (depth == F32) {
convertData<float_t>();
}
else if (depth == F64) {
convertData<double_t>();
else if (socket.depth == F64) {
convertData<double_t>(data);
}

for (int i = 0; i < num_samp; i++)
for (int i = 0; i < socket.num_samp; i++)
{
sampleNumbers.set(i, total_samples++);
ttlEventWords.set(i, eventState);
Expand All @@ -392,9 +211,7 @@ bool EphysSocket::updateBuffer()
sampleNumbers.getRawDataPointer(),
timestamps.getRawDataPointer(),
ttlEventWords.getRawDataPointer(),
num_samp);

lastPacketReceived = time(nullptr);
socket.num_samp);

return true;
}
Expand All @@ -412,7 +229,7 @@ String EphysSocket::handleConfigMessage(const String& msg)
return "Ephys Socket plugin cannot update settings while acquisition is active.";
}

if (connected) {
if (socket.isConnected()) {
return "Ephys Socket plugin cannot update settings while connected to an active socket.";
}

Expand Down
Loading

0 comments on commit eb8d41e

Please sign in to comment.