Skip to content

Commit

Permalink
Merge pull request #1393 from Barenboim/master
Browse files Browse the repository at this point in the history
Move callback out of the lock. Fix server bug when error on receiving request.
  • Loading branch information
Barenboim authored Oct 15, 2023
2 parents ad429a7 + f870f75 commit f94bbb6
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 110 deletions.
3 changes: 2 additions & 1 deletion src/factory/WFTask.inl
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,8 @@ public:
protected:
virtual ~WFServerTask()
{
((Series *)series_of(this))->task = NULL;
if (this->target)
((Series *)series_of(this))->task = NULL;
}
};

Expand Down
217 changes: 108 additions & 109 deletions src/kernel/poller.c
Original file line number Diff line number Diff line change
Expand Up @@ -968,10 +968,9 @@ static void __poller_handle_timeout(const struct __poller_node *time_node,
}

pthread_mutex_unlock(&poller->mutex);
while (!list_empty(&timeo_list))
list_for_each_safe(pos, tmp, &timeo_list)
{
node = list_entry(timeo_list.next, struct __poller_node, list);
list_del(&node->list);
node = list_entry(pos, struct __poller_node, list);
if (node->data.fd >= 0)
{
node->error = ETIMEDOUT;
Expand Down Expand Up @@ -1313,7 +1312,8 @@ static int __poller_data_get_event(int *event, const struct poller_data *data)
}
}

int poller_add(const struct poller_data *data, int timeout, poller_t *poller)
static struct __poller_node *__poller_new_node(const struct poller_data *data,
int timeout, poller_t *poller)
{
struct __poller_node *res = NULL;
struct __poller_node *node;
Expand All @@ -1323,18 +1323,18 @@ int poller_add(const struct poller_data *data, int timeout, poller_t *poller)
if ((size_t)data->fd >= poller->max_open_files)
{
errno = data->fd < 0 ? EBADF : EMFILE;
return -1;
return NULL;
}

need_res = __poller_data_get_event(&event, data);
if (need_res < 0)
return -1;
return NULL;

if (need_res)
{
res = (struct __poller_node *)malloc(sizeof (struct __poller_node));
if (!res)
return -1;
return NULL;
}

node = (struct __poller_node *)malloc(sizeof (struct __poller_node));
Expand All @@ -1347,36 +1347,49 @@ int poller_add(const struct poller_data *data, int timeout, poller_t *poller)
node->res = res;
if (timeout >= 0)
__poller_node_set_timeout(timeout, node);
}

pthread_mutex_lock(&poller->mutex);
if (!poller->nodes[data->fd])
{
if (__poller_add_fd(data->fd, event, node, poller) >= 0)
{
if (timeout >= 0)
__poller_insert_node(node, poller);
else
list_add_tail(&node->list, &poller->no_timeo_list);
return node;
}

poller->nodes[data->fd] = node;
node = NULL;
}
}
int poller_add(const struct poller_data *data, int timeout, poller_t *poller)
{
struct __poller_node *node;

pthread_mutex_unlock(&poller->mutex);
if (node == NULL)
return 0;
node = __poller_new_node(data, timeout, poller);
if (!node)
return -1;

free(node);
pthread_mutex_lock(&poller->mutex);
if (!poller->nodes[data->fd])
{
if (__poller_add_fd(data->fd, node->event, node, poller) >= 0)
{
if (timeout >= 0)
__poller_insert_node(node, poller);
else
list_add_tail(&node->list, &poller->no_timeo_list);

poller->nodes[data->fd] = node;
node = NULL;
}
}
else
errno = EEXIST;

free(res);
pthread_mutex_unlock(&poller->mutex);
if (node == NULL)
return 0;

free(node->res);
free(node);
return -1;
}

int poller_del(int fd, poller_t *poller)
{
struct __poller_node *node;
int stopped = 0;

if ((size_t)fd >= poller->max_open_files)
{
Expand All @@ -1399,12 +1412,8 @@ int poller_del(int fd, poller_t *poller)

node->error = 0;
node->state = PR_ST_DELETED;
if (poller->stopped)
{
free(node->res);
poller->cb((struct poller_result *)node, poller->ctx);
}
else
stopped = poller->stopped;
if (!stopped)
{
node->removed = 1;
write(poller->pipe_wr, &node, sizeof (void *));
Expand All @@ -1414,89 +1423,69 @@ int poller_del(int fd, poller_t *poller)
errno = ENOENT;

pthread_mutex_unlock(&poller->mutex);
if (stopped)
{
free(node->res);
poller->cb((struct poller_result *)node, poller->ctx);
}

return -!node;
}

int poller_mod(const struct poller_data *data, int timeout, poller_t *poller)
{
struct __poller_node *res = NULL;
struct __poller_node *node;
struct __poller_node *old;
int need_res;
int event;
struct __poller_node *orig;
int stopped = 0;

if ((size_t)data->fd >= poller->max_open_files)
{
errno = data->fd < 0 ? EBADF : EMFILE;
return -1;
}

need_res = __poller_data_get_event(&event, data);
if (need_res < 0)
node = __poller_new_node(data, timeout, poller);
if (!node)
return -1;

if (need_res)
{
res = (struct __poller_node *)malloc(sizeof (struct __poller_node));
if (!res)
return -1;
}

node = (struct __poller_node *)malloc(sizeof (struct __poller_node));
if (node)
pthread_mutex_lock(&poller->mutex);
orig = poller->nodes[data->fd];
if (orig)
{
node->data = *data;
node->event = event;
node->in_rbtree = 0;
node->removed = 0;
node->res = res;
if (timeout >= 0)
__poller_node_set_timeout(timeout, node);

pthread_mutex_lock(&poller->mutex);
old = poller->nodes[data->fd];
if (old)
if (__poller_mod_fd(data->fd, orig->event, node->event, node, poller) >= 0)
{
if (__poller_mod_fd(data->fd, old->event, event, node, poller) >= 0)
{
if (old->in_rbtree)
__poller_tree_erase(old, poller);
else
list_del(&old->list);
if (orig->in_rbtree)
__poller_tree_erase(orig, poller);
else
list_del(&orig->list);

old->error = 0;
old->state = PR_ST_MODIFIED;
if (poller->stopped)
{
free(old->res);
poller->cb((struct poller_result *)old, poller->ctx);
}
else
{
old->removed = 1;
write(poller->pipe_wr, &old, sizeof (void *));
}
orig->error = 0;
orig->state = PR_ST_MODIFIED;
stopped = poller->stopped;
if (!stopped)
{
orig->removed = 1;
write(poller->pipe_wr, &orig, sizeof (void *));
}

if (timeout >= 0)
__poller_insert_node(node, poller);
else
list_add_tail(&node->list, &poller->no_timeo_list);
if (timeout >= 0)
__poller_insert_node(node, poller);
else
list_add_tail(&node->list, &poller->no_timeo_list);

poller->nodes[data->fd] = node;
node = NULL;
}
poller->nodes[data->fd] = node;
node = NULL;
}
else
errno = ENOENT;

pthread_mutex_unlock(&poller->mutex);
if (node == NULL)
return 0;
}
else
errno = ENOENT;

free(node);
pthread_mutex_unlock(&poller->mutex);
if (stopped)
{
free(orig->res);
poller->cb((struct poller_result *)orig, poller->ctx);
}

free(res);
if (node == NULL)
return 0;

free(node->res);
free(node);
return -1;
}

Expand Down Expand Up @@ -1574,11 +1563,10 @@ void *poller_add_timer(const struct timespec *value, void *context,
int poller_del_timer(void *timer, poller_t *poller)
{
struct __poller_node *node = (struct __poller_node *)timer;
int removed;
int stopped = 0;

pthread_mutex_lock(&poller->mutex);
removed = node->removed;
if (!removed)
if (!node->removed)
{
node->removed = 1;

Expand All @@ -1589,22 +1577,28 @@ int poller_del_timer(void *timer, poller_t *poller)

node->error = 0;
node->state = PR_ST_DELETED;
if (poller->stopped)
poller->cb((struct poller_result *)node, poller->ctx);
else
stopped = poller->stopped;
if (!poller->stopped)
write(poller->pipe_wr, &node, sizeof (void *));
}
else
{
errno = ENOENT;
node = NULL;
}

pthread_mutex_unlock(&poller->mutex);
return -removed;
if (stopped)
poller->cb((struct poller_result *)node, poller->ctx);

return -!node;
}

void poller_stop(poller_t *poller)
{
struct __poller_node *node;
struct list_head *pos, *tmp;
LIST_HEAD(node_list);
void *p = NULL;

write(poller->pipe_wr, &p, sizeof (void *));
Expand All @@ -1624,26 +1618,31 @@ void poller_stop(poller_t *poller)
{
node = rb_entry(poller->timeo_tree.rb_node, struct __poller_node, rb);
rb_erase(&node->rb, &poller->timeo_tree);
list_add(&node->list, &poller->timeo_list);
list_add(&node->list, &node_list);
}

list_splice_init(&poller->no_timeo_list, &poller->timeo_list);
list_for_each_safe(pos, tmp, &poller->timeo_list)
list_splice_init(&poller->timeo_list, &node_list);
list_splice_init(&poller->no_timeo_list, &node_list);
list_for_each(pos, &node_list)
{
node = list_entry(pos, struct __poller_node, list);
list_del(&node->list);
if (node->data.fd >= 0)
{
poller->nodes[node->data.fd] = NULL;
__poller_del_fd(node->data.fd, node->event, poller);
}
else
node->removed = 1;
}

pthread_mutex_unlock(&poller->mutex);
list_for_each_safe(pos, tmp, &node_list)
{
node = list_entry(pos, struct __poller_node, list);
node->error = 0;
node->state = PR_ST_STOPPED;
free(node->res);
poller->cb((struct poller_result *)node, poller->ctx);
}

pthread_mutex_unlock(&poller->mutex);
}

3 changes: 3 additions & 0 deletions src/manager/WFGlobal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -904,6 +904,9 @@ const char *WFGlobal::get_error_string(int state, int error)
case WFT_STATE_TASK_ERROR:
return __get_task_error_string(error);

case WFT_STATE_ABORTED:
return "Aborted";

case WFT_STATE_UNDEFINED:
return "Undefined";

Expand Down

0 comments on commit f94bbb6

Please sign in to comment.