Skip to content

Commit

Permalink
修复了线程池中BUG;修复了iocp及win32消息事件引擎的BUG
Browse files Browse the repository at this point in the history
  • Loading branch information
zhengshuxin committed Feb 21, 2014
1 parent b20e0b6 commit 9a016e2
Show file tree
Hide file tree
Showing 13 changed files with 146 additions and 58 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ endif
##############################################################################

.PHONY = check help all clean install uninstall uninstall_all build_bin build_src
VERSION = 3.0.11
VERSION = 3.0.17

help:
@(echo "usage: make help|all|clean|install|uninstall|uninstall_all|build_bin|build_src")
Expand Down
1 change: 1 addition & 0 deletions changes.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
�޸���ʷ�б���
------------------------------------------------------------------------
67) 2014.2.21 --- acl 3.0.17 �汾������(���̳߳ؿ�������� BUG�����Է����˽����汾)
66) 2014.2.17 --- acl 3.0.16 �汾������
65) 2014.1.25 --- acl 3.0.15 �汾������
64) 2014.1.11
Expand Down
8 changes: 8 additions & 0 deletions lib_acl/changes.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
�޸���ʷ�б���

------------------------------------------------------------------------
427) 2014.2.21
427.1) bugfix: events_wmsg.c ���ڻص�������û�н� stream ����Դ��ݣ����Ӱ��
���� win32 ��Ϣ�¼����첽 IO ����
427.2) bugfix: events_iocp.c �������ر���ɶ˿�ʱ������δ��״̬����Ӧ��ǰ�ͷ�
�ص��ṹ����(http://support.microsoft.com/kb/192800/zh-cn)
427.3) bugfix: acl_pthread_pool.c �߳��������������IJ�����������ָ��Խ������

------------------------------------------------------------------------

426) 2014.2.17
Expand Down
8 changes: 4 additions & 4 deletions lib_acl/lib_acl.rc
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ END
//

VS_VERSION_INFO VERSIONINFO
FILEVERSION 2,1,2,9
PRODUCTVERSION 2,1,2,9
FILEVERSION 3,0,1,7
PRODUCTVERSION 3,0,1,7
FILEFLAGSMASK 0x17L
#ifdef _DEBUG
FILEFLAGS 0x1L
Expand All @@ -71,12 +71,12 @@ BEGIN
BEGIN
VALUE "Comments", "����Ϊ��ƽ̨��C�⣬����������ͨѶ����������ܵȹ���"
VALUE "FileDescription", "acl ��"
VALUE "FileVersion", "2, 1, 2, 9"
VALUE "FileVersion", "3, 0, 1, 7"
VALUE "InternalName", "lib_acl"
VALUE "LegalCopyright", "zsx (C) 2011"
VALUE "OriginalFilename", "lib_acl.lib"
VALUE "ProductName", " acl ��"
VALUE "ProductVersion", "2, 1, 2, 9"
VALUE "ProductVersion", "3, 0, l, 7"
END
END
BLOCK "VarFileInfo"
Expand Down
76 changes: 62 additions & 14 deletions lib_acl/src/event/events_iocp.c
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ struct IOCP_EVENT {
int type;
#define IOCP_EVENT_READ (1 << 0)
#define IOCP_EVENT_WRITE (1 << 2)
#define IOCP_EVENT_DEAD (1 << 3)

ACL_EVENT_FDTABLE *fdp;

#define ACCEPT_ADDRESS_LENGTH ((sizeof(struct sockaddr_in) + 16))
Expand All @@ -73,20 +75,44 @@ static void stream_on_close(ACL_VSTREAM *stream, void *arg)
}

/* 必须在释放 fdp->event_read/fdp->event_write 前关闭套接口句柄 */

if (ACL_VSTREAM_SOCK(stream) != ACL_SOCKET_INVALID && stream->close_fn)
shutdown(ACL_VSTREAM_SOCK(stream), 0);
shutdown(ACL_VSTREAM_SOCK(stream), 1);
if (ACL_VSTREAM_SOCK(stream) != ACL_SOCKET_INVALID
&& stream->close_fn)
{
(void) stream->close_fn(ACL_VSTREAM_SOCK(stream));
else if (ACL_VSTREAM_FILE(stream) != ACL_FILE_INVALID && stream->fclose_fn)
} else if (ACL_VSTREAM_FILE(stream) != ACL_FILE_INVALID
&& stream->fclose_fn)
{
(void) stream->fclose_fn(ACL_VSTREAM_FILE(stream));
}

ACL_VSTREAM_SOCK(stream) = ACL_SOCKET_INVALID;
ACL_VSTREAM_FILE(stream) = ACL_FILE_INVALID;

if (fdp->event_read) {
acl_myfree(fdp->event_read);
/* 如果完成端口处于未决状态,则不能释放重叠结构,需在主循环的
* GetQueuedCompletionStatus 调用后来释放
*/
if (HasOverlappedIoCompleted(&fdp->event_read->overlapped))
acl_myfree(fdp->event_read);
else {
fdp->event_read->type = IOCP_EVENT_DEAD;
fdp->event_read->fdp = NULL;
}
fdp->event_read = NULL;
}
if (fdp->event_write) {
acl_myfree(fdp->event_write);
/* 如果完成端口处于未决状态,则不能释放重叠结构,需在主循环的
* GetQueuedCompletionStatus 调用后来释放
*/
if (HasOverlappedIoCompleted(&fdp->event_write->overlapped))
acl_myfree(fdp->event_write);
else {
fdp->event_write->type = IOCP_EVENT_DEAD;
fdp->event_write->fdp = NULL;
}

fdp->event_write = NULL;
}

Expand Down Expand Up @@ -741,42 +767,64 @@ static void event_loop(ACL_EVENT *eventp)
DWORD lastError = 0;
IOCP_EVENT *iocp_event = NULL;

isSuccess = GetQueuedCompletionStatus(ev->h_iocp, &bytesTransferred,
(DWORD*) &fdp, (OVERLAPPED**) &iocp_event, delay);
isSuccess = GetQueuedCompletionStatus(ev->h_iocp,
&bytesTransferred, (DWORD*) &fdp,
(OVERLAPPED**) &iocp_event, delay);

if (!isSuccess) {
if (iocp_event == NULL)
break;
if (!(fdp->event_type & (ACL_EVENT_XCPT | ACL_EVENT_RW_TIMEOUT))) {
if (iocp_event->type == IOCP_EVENT_DEAD)
acl_myfree(iocp_event);
else if (iocp_event->fdp == NULL) {
acl_msg_warn("%s(%d): fdp null",
myname, __LINE__);
acl_myfree(iocp_event);
} else if (iocp_event->fdp != fdp)
acl_msg_fatal("%s(%d): invalid fdp",
myname, __LINE__);
else if (!(fdp->event_type & (ACL_EVENT_XCPT
| ACL_EVENT_RW_TIMEOUT)))
{
fdp->event_type |= ACL_EVENT_XCPT;
fdp->fdidx_ready = eventp->fdcnt_ready;
eventp->fdtabs_ready[eventp->fdcnt_ready++] = fdp;
eventp->fdtabs_ready[eventp->fdcnt_ready] = fdp;
eventp->fdcnt_ready++;
}
continue;
}

acl_assert(fdp == iocp_event->fdp);

if ((fdp->event_type & (ACL_EVENT_XCPT | ACL_EVENT_RW_TIMEOUT)))
if ((fdp->event_type & (ACL_EVENT_XCPT
| ACL_EVENT_RW_TIMEOUT)))
{
continue;
}

if (iocp_event->type == IOCP_EVENT_READ) {
acl_assert(fdp->event_read == iocp_event);
iocp_event->type &= ~IOCP_EVENT_READ;
fdp->stream->sys_read_ready = 1;
if ((fdp->event_type & (ACL_EVENT_READ | ACL_EVENT_WRITE)) == 0)
if ((fdp->event_type & (ACL_EVENT_READ
| ACL_EVENT_WRITE)) == 0)
{
fdp->event_type |= ACL_EVENT_READ;
fdp->fdidx_ready = eventp->fdcnt_ready;
eventp->fdtabs_ready[eventp->fdcnt_ready++] = fdp;
eventp->fdtabs_ready[eventp->fdcnt_ready] = fdp;
eventp->fdcnt_ready++;
}
}
if (iocp_event->type == IOCP_EVENT_WRITE) {
acl_assert(fdp->event_write == iocp_event);
iocp_event->type &= ~IOCP_EVENT_WRITE;
if ((fdp->event_type & (ACL_EVENT_READ | ACL_EVENT_WRITE)) == 0)
if ((fdp->event_type & (ACL_EVENT_READ
| ACL_EVENT_WRITE)) == 0)
{
fdp->event_type |= ACL_EVENT_WRITE;
fdp->fdidx_ready = eventp->fdcnt_ready;
eventp->fdtabs_ready[eventp->fdcnt_ready++] = fdp;
eventp->fdtabs_ready[eventp->fdcnt_ready] = fdp;
eventp->fdcnt_ready++;
}
}
delay = 0;
Expand Down
17 changes: 9 additions & 8 deletions lib_acl/src/event/events_wmsg.c
Original file line number Diff line number Diff line change
Expand Up @@ -481,10 +481,10 @@ static void handleClose(EVENT_WMSG *ev, ACL_SOCKET sockfd)
return;
else if (fdp->r_callback)
fdp->r_callback(ACL_EVENT_XCPT, &ev->event,
NULL, fdp->r_context);
fdp->stream, fdp->r_context);
else if (fdp->w_callback)
fdp->w_callback(ACL_EVENT_XCPT, &ev->event,
NULL, fdp->w_context);
fdp->stream, fdp->w_context);
/*
else
acl_msg_error("%s(%d): w_callback and r_callback null"
Expand All @@ -506,7 +506,7 @@ static void handleConnect(EVENT_WMSG *ev, ACL_SOCKET sockfd)
else {
fdp->stream->flag &= ~ACL_VSTREAM_FLAG_CONNECTING;
fdp->w_callback(ACL_EVENT_WRITE, &ev->event,
NULL, fdp->w_context);
fdp->stream, fdp->w_context);
}
}

Expand All @@ -520,7 +520,8 @@ static void handleAccept(EVENT_WMSG *ev, ACL_SOCKET sockfd)
else if (fdp->r_callback == NULL)
acl_msg_fatal("%s(%d): fdp callback null", myname, __LINE__);

fdp->r_callback(ACL_EVENT_READ, &ev->event, NULL, fdp->r_context);
fdp->r_callback(ACL_EVENT_READ, &ev->event,
fdp->stream, fdp->r_context);
}

static void handleRead(EVENT_WMSG *ev, ACL_SOCKET sockfd)
Expand All @@ -532,15 +533,15 @@ static void handleRead(EVENT_WMSG *ev, ACL_SOCKET sockfd)
acl_msg_error("%s(%d): fdp null for sockfd(%d)",
myname, __LINE__, (int) sockfd);
else if ((fdp->stream->type & ACL_VSTREAM_TYPE_LISTEN))
fdp->r_callback(ACL_EVENT_READ, &ev->event, NULL,
fdp->r_context);
fdp->r_callback(ACL_EVENT_READ, &ev->event,
fdp->stream, fdp->r_context);
else if (fdp->r_callback != NULL) {
/* 该描述字可读则设置 ACL_VSTREAM 的系统可读标志从而触发
* ACL_VSTREAM 流在读时调用系统的 read 函数
*/
fdp->stream->sys_read_ready = 1;
fdp->r_callback(ACL_EVENT_READ, &ev->event,
NULL, fdp->r_context);
fdp->stream, fdp->r_context);
}
/* else
acl_msg_error("%s(%d): fdp->r_callback null for sockfd(%d)",
Expand All @@ -560,7 +561,7 @@ static void handleWrite(EVENT_WMSG *ev, ACL_SOCKET sockfd)
handleConnect(ev, sockfd);
else if (fdp->w_callback != NULL)
fdp->w_callback(ACL_EVENT_WRITE, &ev->event,
NULL, fdp->w_context);
fdp->stream, fdp->w_context);
/*
else
acl_msg_error("%s(%d): fdp->w_callback null for sockfd(%d)",
Expand Down
2 changes: 1 addition & 1 deletion lib_acl/src/init/acl_init.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

#include "init.h"

static char *version = "lib_acl_3.0.16";
static char *version = "lib_acl_3.0.17";

const char *acl_version(void)
{
Expand Down
37 changes: 13 additions & 24 deletions lib_acl/src/thread/acl_pthread_pool.c
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ struct acl_pthread_pool_t {
thread_worker *thr_first; /* first idle thread */
thread_worker *thr_iter; /* for bat operation */
thread_cond *cond_first;
thread_cond *cond_last;
int poller_running; /* is poller thread running ? */
int qlen; /* the work queue's length */
int job_nslot;
Expand Down Expand Up @@ -193,7 +192,7 @@ static thread_cond *thread_cond_create(void)
thread_cond *cond = (thread_cond*)
acl_mycalloc(1, sizeof(thread_cond));

acl_pthread_cond_init(&cond->cond, NULL);
acl_assert(acl_pthread_cond_init(&cond->cond, NULL) == 0);
return cond;
}

Expand All @@ -207,7 +206,6 @@ static thread_worker *worker_create(acl_pthread_pool_t *thr_pool)
{
thread_worker *thr = (thread_worker*) acl_mycalloc(1,
sizeof(thread_worker));
thread_cond *cond = thr_pool->cond_first;

thr->id = (unsigned long) acl_pthread_self();
thr->idle = thr_pool->idle_timeout;
Expand All @@ -222,27 +220,20 @@ static thread_worker *worker_create(acl_pthread_pool_t *thr_pool)
} else
thr->idle = 0;

if (cond == NULL) {
cond = thread_cond_create();
acl_assert(acl_pthread_cond_init(&cond->cond, NULL) == 0);
} else {
thr_pool->cond_first = cond->next;
if (thr_pool->cond_last == cond)
thr_pool->cond_last = NULL;
}
if (thr_pool->cond_first != NULL) {
thr->cond = thr_pool->cond_first;
thr_pool->cond_first = thr_pool->cond_first->next;
} else
thr->cond = thread_cond_create();

thr->cond = cond;
thr->mutex = &thr_pool->worker_mutex;
return thr;
}

static void worker_free(acl_pthread_pool_t *thr_pool, thread_worker *thr)
{
if (thr_pool->cond_first == NULL)
thr_pool->cond_first = thr->cond;
else
thr_pool->cond_last->next = thr->cond;
thr_pool->cond_last = thr->cond;
thr->cond->next = thr_pool->cond_first;
thr_pool->cond_first = thr->cond;
acl_myfree(thr);
}

Expand Down Expand Up @@ -431,19 +422,18 @@ static void *worker_thread(void* arg)
}
}

thr = worker_create(thr_pool);
acl_assert(thr->mutex == &thr_pool->worker_mutex);
mutex = thr->mutex;

/* lock the thread pool's global mutex at first */

status = acl_pthread_mutex_lock(mutex);
status = acl_pthread_mutex_lock(&thr_pool->worker_mutex);
if (status != 0) {
SET_ERRNO(status);
acl_msg_fatal("%s(%d), %s: lock failed: %s", __FILE__,
__LINE__, myname, acl_last_serror());
}

thr = worker_create(thr_pool);
mutex = thr->mutex;

for (;;) {

/* handle thread self's job first */
Expand Down Expand Up @@ -498,7 +488,7 @@ static void *worker_thread(void* arg)
status = acl_pthread_mutex_unlock(mutex);
if (status != 0) {
SET_ERRNO(status);
acl_msg_error("%s, %s(%d): unlock error(%s)",
acl_msg_fatal("%s, %s(%d): unlock error(%s)",
__FILE__, myname, __LINE__, acl_last_serror());
}

Expand Down Expand Up @@ -943,7 +933,6 @@ static void thread_pool_init(acl_pthread_pool_t *thr_pool)
thr_pool->schedule_warn = 100;
thr_pool->schedule_wait = 100;
thr_pool->cond_first = NULL;
thr_pool->cond_last = NULL;
}

/* create work queue */
Expand Down
8 changes: 4 additions & 4 deletions lib_acl_cpp/samples/aio/aio_server/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,8 @@ class io_callback : public aio_callback
*/
bool timeout_callback()
{
std::cout << "Timeout ..." << std::endl;
return (true);
std::cout << "Timeout, delete it ..." << std::endl;
return (false);
}

private:
Expand Down Expand Up @@ -239,7 +239,7 @@ class io_accept_callback : public aio_accept_callback
client->add_timeout_callback(callback);

// 从异步流读一行数据
client->gets(10, false);
client->gets(3, false);
return (true);
}
};
Expand All @@ -251,7 +251,7 @@ static void usage(const char* procname)

int main(int argc, char* argv[])
{
bool use_kernel = true;
bool use_kernel = false;
int ch;

while ((ch = getopt(argc, argv, "hk")) > 0)
Expand Down
Loading

0 comments on commit 9a016e2

Please sign in to comment.