Skip to content

Commit

Permalink
Remove WFMultiThreadTask. (#1405)
Browse files Browse the repository at this point in the history
  • Loading branch information
Barenboim authored Oct 25, 2023
1 parent 915711c commit 10a6b26
Show file tree
Hide file tree
Showing 3 changed files with 0 additions and 137 deletions.
92 changes: 0 additions & 92 deletions src/factory/WFTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,98 +113,6 @@ class WFThreadTask : public ExecRequest
virtual ~WFThreadTask() { }
};

template<class INPUT, class OUTPUT>
class WFMultiThreadTask : public ParallelTask
{
public:
void start()
{
assert(!series_of(this));
Workflow::start_series_work(this, nullptr);
}

void dismiss()
{
assert(!series_of(this));
delete this;
}

public:
INPUT *get_input(size_t index)
{
return static_cast<Thread *>(this->subtasks[index])->get_input();
}

OUTPUT *get_output(size_t index)
{
return static_cast<Thread *>(this->subtasks[index])->get_output();
}

public:
void *user_data;

public:
int get_state(size_t index) const
{
return static_cast<const Thread *>(this->subtasks[index])->get_state();
}

int get_error(size_t index) const
{
return static_cast<const Thread *>(this->subtasks[index])->get_error();
}

public:
void set_callback(
std::function<void (WFMultiThreadTask<INPUT, OUTPUT> *)> cb)
{
this->callback = std::move(cb);
}

protected:
virtual SubTask *done()
{
SeriesWork *series = series_of(this);

if (this->callback)
this->callback(this);

delete this;
return series->pop();
}

protected:
std::function<void (WFMultiThreadTask<INPUT, OUTPUT> *)> callback;

protected:
using Thread = WFThreadTask<INPUT, OUTPUT>;

public:
WFMultiThreadTask(Thread *const tasks[], size_t n,
std::function<void (WFMultiThreadTask<INPUT, OUTPUT> *)>&& cb) :
ParallelTask(new SubTask *[n], n),
callback(std::move(cb))
{
size_t i;

for (i = 0; i < n; i++)
this->subtasks[i] = tasks[i];

this->user_data = NULL;
}

protected:
virtual ~WFMultiThreadTask()
{
size_t n = this->subtasks_nr;

while (n > 0)
delete this->subtasks[--n];

delete []this->subtasks;
}
};

template<class REQ, class RESP>
class WFNetworkTask : public CommRequest
{
Expand Down
10 changes: 0 additions & 10 deletions src/factory/WFTaskFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -446,16 +446,6 @@ class WFThreadTaskFactory
ExecQueue *queue, Executor *executor,
std::function<void (INPUT *, OUTPUT *)> routine,
std::function<void (T *)> callback);

private:
using MT = WFMultiThreadTask<INPUT, OUTPUT>;

public:
static MT *create_multi_thread_task(const std::string& queue_name,
std::function<void (INPUT *, OUTPUT *)> routine,
size_t nthreads,
std::function<void (MT *)> callback);

};

#include "WFTaskFactory.inl"
Expand Down
35 changes: 0 additions & 35 deletions src/factory/WFTaskFactory.inl
Original file line number Diff line number Diff line change
Expand Up @@ -888,38 +888,3 @@ WFThreadTaskFactory<INPUT, OUTPUT>::create_thread_task(time_t seconds, long nano
std::move(callback));
}

template<class INPUT, class OUTPUT>
class __WFThreadTask__ : public __WFThreadTask<INPUT, OUTPUT>
{
private:
virtual SubTask *done() { return NULL; }

public:
using __WFThreadTask<INPUT, OUTPUT>::__WFThreadTask;
};

template<class INPUT, class OUTPUT>
WFMultiThreadTask<INPUT, OUTPUT> *
WFThreadTaskFactory<INPUT, OUTPUT>::create_multi_thread_task(const std::string& queue_name,
std::function<void (INPUT *, OUTPUT *)> routine, size_t nthreads,
std::function<void (WFMultiThreadTask<INPUT, OUTPUT> *)> callback)
{
WFThreadTask<INPUT, OUTPUT> **tasks = new WFThreadTask<INPUT, OUTPUT> *[nthreads];
char buf[32];
size_t i;

for (i = 0; i < nthreads; i++)
{
sprintf(buf, "-%zu@MTT", i);
tasks[i] = new __WFThreadTask__<INPUT, OUTPUT>
(WFGlobal::get_exec_queue(queue_name + buf),
WFGlobal::get_compute_executor(),
std::function<void (INPUT *, OUTPUT *)>(routine),
nullptr);
}

auto *mt = new WFMultiThreadTask<INPUT, OUTPUT>(tasks, nthreads, std::move(callback));
delete []tasks;
return mt;
}

0 comments on commit 10a6b26

Please sign in to comment.