From 9a016e24b18867973ec867b1de6e11e8b8e3ff68 Mon Sep 17 00:00:00 2001 From: zsxxsz Date: Fri, 21 Feb 2014 23:01:57 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E4=BA=86=E7=BA=BF=E7=A8=8B?= =?UTF-8?q?=E6=B1=A0=E4=B8=ADBUG=EF=BC=9B=E4=BF=AE=E5=A4=8D=E4=BA=86iocp?= =?UTF-8?q?=E5=8F=8Awin32=E6=B6=88=E6=81=AF=E4=BA=8B=E4=BB=B6=E5=BC=95?= =?UTF-8?q?=E6=93=8E=E7=9A=84BUG?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Makefile | 2 +- changes.txt | 1 + lib_acl/changes.txt | 8 ++ lib_acl/lib_acl.rc | 8 +- lib_acl/src/event/events_iocp.c | 76 +++++++++++++++---- lib_acl/src/event/events_wmsg.c | 17 +++-- lib_acl/src/init/acl_init.c | 2 +- lib_acl/src/thread/acl_pthread_pool.c | 37 ++++----- lib_acl_cpp/samples/aio/aio_server/main.cpp | 8 +- .../samples/winaio/winaio_vc2012.vcxproj | 12 ++- samples/thread/thread_pool4/Makefile | 2 + samples/thread/thread_pool4/main.c | 28 +++++++ samples/thread/thread_pool4/valgrind.sh | 3 + 13 files changed, 146 insertions(+), 58 deletions(-) create mode 100644 samples/thread/thread_pool4/Makefile create mode 100644 samples/thread/thread_pool4/main.c create mode 100644 samples/thread/thread_pool4/valgrind.sh diff --git a/Makefile b/Makefile index 628a34438..e4983c62e 100644 --- a/Makefile +++ b/Makefile @@ -42,7 +42,7 @@ endif ############################################################################## .PHONY = check help all clean install uninstall uninstall_all build_bin build_src -VERSION = 3.0.11 +VERSION = 3.0.17 help: @(echo "usage: make help|all|clean|install|uninstall|uninstall_all|build_bin|build_src") diff --git a/changes.txt b/changes.txt index fe236aabf..ebc3b1687 100644 --- a/changes.txt +++ b/changes.txt @@ -1,5 +1,6 @@ 修改历史列表: ------------------------------------------------------------------------ +67) 2014.2.21 --- acl 3.0.17 版本发布!(因线程池库存在严重 BUG,所以发布此紧急版本) 66) 2014.2.17 --- acl 3.0.16 版本发布! 65) 2014.1.25 --- acl 3.0.15 版本发布! 64) 2014.1.11 diff --git a/lib_acl/changes.txt b/lib_acl/changes.txt index 7a7f9540c..9ac28d234 100644 --- a/lib_acl/changes.txt +++ b/lib_acl/changes.txt @@ -1,5 +1,13 @@ 修改历史列表: +------------------------------------------------------------------------ +427) 2014.2.21 +427.1) bugfix: events_wmsg.c 中在回调函数中没有将 stream 流针对传递,这会影响 +基于 win32 消息事件的异步 IO 过程 +427.2) bugfix: events_iocp.c 当主动关闭完成端口时,对于未决状态,不应提前释放 +重叠结构对象(http://support.microsoft.com/kb/192800/zh-cn) +427.3) bugfix: acl_pthread_pool.c 线程条件变量链表的操作方法存在指针越界问题 + ------------------------------------------------------------------------ 426) 2014.2.17 diff --git a/lib_acl/lib_acl.rc b/lib_acl/lib_acl.rc index b7e7b02e4..b0ec001d5 100644 --- a/lib_acl/lib_acl.rc +++ b/lib_acl/lib_acl.rc @@ -53,8 +53,8 @@ END // VS_VERSION_INFO VERSIONINFO - FILEVERSION 2,1,2,9 - PRODUCTVERSION 2,1,2,9 + FILEVERSION 3,0,1,7 + PRODUCTVERSION 3,0,1,7 FILEFLAGSMASK 0x17L #ifdef _DEBUG FILEFLAGS 0x1L @@ -71,12 +71,12 @@ BEGIN BEGIN VALUE "Comments", "本库为跨平台的C库,包括了网络通讯,服务器框架等功能" VALUE "FileDescription", "acl 库" - VALUE "FileVersion", "2, 1, 2, 9" + VALUE "FileVersion", "3, 0, 1, 7" VALUE "InternalName", "lib_acl" VALUE "LegalCopyright", "zsx (C) 2011" VALUE "OriginalFilename", "lib_acl.lib" VALUE "ProductName", " acl 库" - VALUE "ProductVersion", "2, 1, 2, 9" + VALUE "ProductVersion", "3, 0, l, 7" END END BLOCK "VarFileInfo" diff --git a/lib_acl/src/event/events_iocp.c b/lib_acl/src/event/events_iocp.c index d075fad72..e21cf85fb 100644 --- a/lib_acl/src/event/events_iocp.c +++ b/lib_acl/src/event/events_iocp.c @@ -50,6 +50,8 @@ struct IOCP_EVENT { int type; #define IOCP_EVENT_READ (1 << 0) #define IOCP_EVENT_WRITE (1 << 2) +#define IOCP_EVENT_DEAD (1 << 3) + ACL_EVENT_FDTABLE *fdp; #define ACCEPT_ADDRESS_LENGTH ((sizeof(struct sockaddr_in) + 16)) @@ -73,20 +75,44 @@ static void stream_on_close(ACL_VSTREAM *stream, void *arg) } /* 必须在释放 fdp->event_read/fdp->event_write 前关闭套接口句柄 */ - - if (ACL_VSTREAM_SOCK(stream) != ACL_SOCKET_INVALID && stream->close_fn) + shutdown(ACL_VSTREAM_SOCK(stream), 0); + shutdown(ACL_VSTREAM_SOCK(stream), 1); + if (ACL_VSTREAM_SOCK(stream) != ACL_SOCKET_INVALID + && stream->close_fn) + { (void) stream->close_fn(ACL_VSTREAM_SOCK(stream)); - else if (ACL_VSTREAM_FILE(stream) != ACL_FILE_INVALID && stream->fclose_fn) + } else if (ACL_VSTREAM_FILE(stream) != ACL_FILE_INVALID + && stream->fclose_fn) + { (void) stream->fclose_fn(ACL_VSTREAM_FILE(stream)); + } + ACL_VSTREAM_SOCK(stream) = ACL_SOCKET_INVALID; ACL_VSTREAM_FILE(stream) = ACL_FILE_INVALID; if (fdp->event_read) { - acl_myfree(fdp->event_read); + /* 如果完成端口处于未决状态,则不能释放重叠结构,需在主循环的 + * GetQueuedCompletionStatus 调用后来释放 + */ + if (HasOverlappedIoCompleted(&fdp->event_read->overlapped)) + acl_myfree(fdp->event_read); + else { + fdp->event_read->type = IOCP_EVENT_DEAD; + fdp->event_read->fdp = NULL; + } fdp->event_read = NULL; } if (fdp->event_write) { - acl_myfree(fdp->event_write); + /* 如果完成端口处于未决状态,则不能释放重叠结构,需在主循环的 + * GetQueuedCompletionStatus 调用后来释放 + */ + if (HasOverlappedIoCompleted(&fdp->event_write->overlapped)) + acl_myfree(fdp->event_write); + else { + fdp->event_write->type = IOCP_EVENT_DEAD; + fdp->event_write->fdp = NULL; + } + fdp->event_write = NULL; } @@ -741,42 +767,64 @@ static void event_loop(ACL_EVENT *eventp) DWORD lastError = 0; IOCP_EVENT *iocp_event = NULL; - isSuccess = GetQueuedCompletionStatus(ev->h_iocp, &bytesTransferred, - (DWORD*) &fdp, (OVERLAPPED**) &iocp_event, delay); + isSuccess = GetQueuedCompletionStatus(ev->h_iocp, + &bytesTransferred, (DWORD*) &fdp, + (OVERLAPPED**) &iocp_event, delay); + if (!isSuccess) { if (iocp_event == NULL) break; - if (!(fdp->event_type & (ACL_EVENT_XCPT | ACL_EVENT_RW_TIMEOUT))) { + if (iocp_event->type == IOCP_EVENT_DEAD) + acl_myfree(iocp_event); + else if (iocp_event->fdp == NULL) { + acl_msg_warn("%s(%d): fdp null", + myname, __LINE__); + acl_myfree(iocp_event); + } else if (iocp_event->fdp != fdp) + acl_msg_fatal("%s(%d): invalid fdp", + myname, __LINE__); + else if (!(fdp->event_type & (ACL_EVENT_XCPT + | ACL_EVENT_RW_TIMEOUT))) + { fdp->event_type |= ACL_EVENT_XCPT; fdp->fdidx_ready = eventp->fdcnt_ready; - eventp->fdtabs_ready[eventp->fdcnt_ready++] = fdp; + eventp->fdtabs_ready[eventp->fdcnt_ready] = fdp; + eventp->fdcnt_ready++; } continue; } acl_assert(fdp == iocp_event->fdp); - if ((fdp->event_type & (ACL_EVENT_XCPT | ACL_EVENT_RW_TIMEOUT))) + if ((fdp->event_type & (ACL_EVENT_XCPT + | ACL_EVENT_RW_TIMEOUT))) + { continue; + } + if (iocp_event->type == IOCP_EVENT_READ) { acl_assert(fdp->event_read == iocp_event); iocp_event->type &= ~IOCP_EVENT_READ; fdp->stream->sys_read_ready = 1; - if ((fdp->event_type & (ACL_EVENT_READ | ACL_EVENT_WRITE)) == 0) + if ((fdp->event_type & (ACL_EVENT_READ + | ACL_EVENT_WRITE)) == 0) { fdp->event_type |= ACL_EVENT_READ; fdp->fdidx_ready = eventp->fdcnt_ready; - eventp->fdtabs_ready[eventp->fdcnt_ready++] = fdp; + eventp->fdtabs_ready[eventp->fdcnt_ready] = fdp; + eventp->fdcnt_ready++; } } if (iocp_event->type == IOCP_EVENT_WRITE) { acl_assert(fdp->event_write == iocp_event); iocp_event->type &= ~IOCP_EVENT_WRITE; - if ((fdp->event_type & (ACL_EVENT_READ | ACL_EVENT_WRITE)) == 0) + if ((fdp->event_type & (ACL_EVENT_READ + | ACL_EVENT_WRITE)) == 0) { fdp->event_type |= ACL_EVENT_WRITE; fdp->fdidx_ready = eventp->fdcnt_ready; - eventp->fdtabs_ready[eventp->fdcnt_ready++] = fdp; + eventp->fdtabs_ready[eventp->fdcnt_ready] = fdp; + eventp->fdcnt_ready++; } } delay = 0; diff --git a/lib_acl/src/event/events_wmsg.c b/lib_acl/src/event/events_wmsg.c index 2b5f2c202..9e06b542f 100644 --- a/lib_acl/src/event/events_wmsg.c +++ b/lib_acl/src/event/events_wmsg.c @@ -481,10 +481,10 @@ static void handleClose(EVENT_WMSG *ev, ACL_SOCKET sockfd) return; else if (fdp->r_callback) fdp->r_callback(ACL_EVENT_XCPT, &ev->event, - NULL, fdp->r_context); + fdp->stream, fdp->r_context); else if (fdp->w_callback) fdp->w_callback(ACL_EVENT_XCPT, &ev->event, - NULL, fdp->w_context); + fdp->stream, fdp->w_context); /* else acl_msg_error("%s(%d): w_callback and r_callback null" @@ -506,7 +506,7 @@ static void handleConnect(EVENT_WMSG *ev, ACL_SOCKET sockfd) else { fdp->stream->flag &= ~ACL_VSTREAM_FLAG_CONNECTING; fdp->w_callback(ACL_EVENT_WRITE, &ev->event, - NULL, fdp->w_context); + fdp->stream, fdp->w_context); } } @@ -520,7 +520,8 @@ static void handleAccept(EVENT_WMSG *ev, ACL_SOCKET sockfd) else if (fdp->r_callback == NULL) acl_msg_fatal("%s(%d): fdp callback null", myname, __LINE__); - fdp->r_callback(ACL_EVENT_READ, &ev->event, NULL, fdp->r_context); + fdp->r_callback(ACL_EVENT_READ, &ev->event, + fdp->stream, fdp->r_context); } static void handleRead(EVENT_WMSG *ev, ACL_SOCKET sockfd) @@ -532,15 +533,15 @@ static void handleRead(EVENT_WMSG *ev, ACL_SOCKET sockfd) acl_msg_error("%s(%d): fdp null for sockfd(%d)", myname, __LINE__, (int) sockfd); else if ((fdp->stream->type & ACL_VSTREAM_TYPE_LISTEN)) - fdp->r_callback(ACL_EVENT_READ, &ev->event, NULL, - fdp->r_context); + fdp->r_callback(ACL_EVENT_READ, &ev->event, + fdp->stream, fdp->r_context); else if (fdp->r_callback != NULL) { /* 该描述字可读则设置 ACL_VSTREAM 的系统可读标志从而触发 * ACL_VSTREAM 流在读时调用系统的 read 函数 */ fdp->stream->sys_read_ready = 1; fdp->r_callback(ACL_EVENT_READ, &ev->event, - NULL, fdp->r_context); + fdp->stream, fdp->r_context); } /* else acl_msg_error("%s(%d): fdp->r_callback null for sockfd(%d)", @@ -560,7 +561,7 @@ static void handleWrite(EVENT_WMSG *ev, ACL_SOCKET sockfd) handleConnect(ev, sockfd); else if (fdp->w_callback != NULL) fdp->w_callback(ACL_EVENT_WRITE, &ev->event, - NULL, fdp->w_context); + fdp->stream, fdp->w_context); /* else acl_msg_error("%s(%d): fdp->w_callback null for sockfd(%d)", diff --git a/lib_acl/src/init/acl_init.c b/lib_acl/src/init/acl_init.c index cc9c95648..070f48cdb 100644 --- a/lib_acl/src/init/acl_init.c +++ b/lib_acl/src/init/acl_init.c @@ -24,7 +24,7 @@ #include "init.h" -static char *version = "lib_acl_3.0.16"; +static char *version = "lib_acl_3.0.17"; const char *acl_version(void) { diff --git a/lib_acl/src/thread/acl_pthread_pool.c b/lib_acl/src/thread/acl_pthread_pool.c index e02f02254..148377e17 100644 --- a/lib_acl/src/thread/acl_pthread_pool.c +++ b/lib_acl/src/thread/acl_pthread_pool.c @@ -83,7 +83,6 @@ struct acl_pthread_pool_t { thread_worker *thr_first; /* first idle thread */ thread_worker *thr_iter; /* for bat operation */ thread_cond *cond_first; - thread_cond *cond_last; int poller_running; /* is poller thread running ? */ int qlen; /* the work queue's length */ int job_nslot; @@ -193,7 +192,7 @@ static thread_cond *thread_cond_create(void) thread_cond *cond = (thread_cond*) acl_mycalloc(1, sizeof(thread_cond)); - acl_pthread_cond_init(&cond->cond, NULL); + acl_assert(acl_pthread_cond_init(&cond->cond, NULL) == 0); return cond; } @@ -207,7 +206,6 @@ static thread_worker *worker_create(acl_pthread_pool_t *thr_pool) { thread_worker *thr = (thread_worker*) acl_mycalloc(1, sizeof(thread_worker)); - thread_cond *cond = thr_pool->cond_first; thr->id = (unsigned long) acl_pthread_self(); thr->idle = thr_pool->idle_timeout; @@ -222,27 +220,20 @@ static thread_worker *worker_create(acl_pthread_pool_t *thr_pool) } else thr->idle = 0; - if (cond == NULL) { - cond = thread_cond_create(); - acl_assert(acl_pthread_cond_init(&cond->cond, NULL) == 0); - } else { - thr_pool->cond_first = cond->next; - if (thr_pool->cond_last == cond) - thr_pool->cond_last = NULL; - } + if (thr_pool->cond_first != NULL) { + thr->cond = thr_pool->cond_first; + thr_pool->cond_first = thr_pool->cond_first->next; + } else + thr->cond = thread_cond_create(); - thr->cond = cond; thr->mutex = &thr_pool->worker_mutex; return thr; } static void worker_free(acl_pthread_pool_t *thr_pool, thread_worker *thr) { - if (thr_pool->cond_first == NULL) - thr_pool->cond_first = thr->cond; - else - thr_pool->cond_last->next = thr->cond; - thr_pool->cond_last = thr->cond; + thr->cond->next = thr_pool->cond_first; + thr_pool->cond_first = thr->cond; acl_myfree(thr); } @@ -431,19 +422,18 @@ static void *worker_thread(void* arg) } } - thr = worker_create(thr_pool); - acl_assert(thr->mutex == &thr_pool->worker_mutex); - mutex = thr->mutex; - /* lock the thread pool's global mutex at first */ - status = acl_pthread_mutex_lock(mutex); + status = acl_pthread_mutex_lock(&thr_pool->worker_mutex); if (status != 0) { SET_ERRNO(status); acl_msg_fatal("%s(%d), %s: lock failed: %s", __FILE__, __LINE__, myname, acl_last_serror()); } + thr = worker_create(thr_pool); + mutex = thr->mutex; + for (;;) { /* handle thread self's job first */ @@ -498,7 +488,7 @@ static void *worker_thread(void* arg) status = acl_pthread_mutex_unlock(mutex); if (status != 0) { SET_ERRNO(status); - acl_msg_error("%s, %s(%d): unlock error(%s)", + acl_msg_fatal("%s, %s(%d): unlock error(%s)", __FILE__, myname, __LINE__, acl_last_serror()); } @@ -943,7 +933,6 @@ static void thread_pool_init(acl_pthread_pool_t *thr_pool) thr_pool->schedule_warn = 100; thr_pool->schedule_wait = 100; thr_pool->cond_first = NULL; - thr_pool->cond_last = NULL; } /* create work queue */ diff --git a/lib_acl_cpp/samples/aio/aio_server/main.cpp b/lib_acl_cpp/samples/aio/aio_server/main.cpp index 437f0972b..6d23b4163 100644 --- a/lib_acl_cpp/samples/aio/aio_server/main.cpp +++ b/lib_acl_cpp/samples/aio/aio_server/main.cpp @@ -195,8 +195,8 @@ class io_callback : public aio_callback */ bool timeout_callback() { - std::cout << "Timeout ..." << std::endl; - return (true); + std::cout << "Timeout, delete it ..." << std::endl; + return (false); } private: @@ -239,7 +239,7 @@ class io_accept_callback : public aio_accept_callback client->add_timeout_callback(callback); // 从异步流读一行数据 - client->gets(10, false); + client->gets(3, false); return (true); } }; @@ -251,7 +251,7 @@ static void usage(const char* procname) int main(int argc, char* argv[]) { - bool use_kernel = true; + bool use_kernel = false; int ch; while ((ch = getopt(argc, argv, "hk")) > 0) diff --git a/lib_acl_cpp/samples/winaio/winaio_vc2012.vcxproj b/lib_acl_cpp/samples/winaio/winaio_vc2012.vcxproj index 89f26452a..438a3bb85 100644 --- a/lib_acl_cpp/samples/winaio/winaio_vc2012.vcxproj +++ b/lib_acl_cpp/samples/winaio/winaio_vc2012.vcxproj @@ -194,6 +194,12 @@ 0x0804 $(IntDir);%(AdditionalIncludeDirectories) + + copy ..\..\..\dist\lib\win32\lib_acl_d.dll $(OutDir) /Y +copy ..\..\..\dist\lib\win32\lib_acl_cpp_d.dll $(OutDir) /Y +copy ..\..\..\dist\lib\win32\lib_protocol_d.dll $(OutDir) /Y + + @@ -226,8 +232,10 @@ $(IntDir);%(AdditionalIncludeDirectories) - - + copy ..\..\..\dist\lib\win32\lib_acl.dll $(OutDir) /Y +copy ..\..\..\dist\lib\win32\lib_acl_cpp.dll $(OutDir) /Y +copy ..\..\..\dist\lib\win32\lib_protocol.dll $(OutDir) /Y + diff --git a/samples/thread/thread_pool4/Makefile b/samples/thread/thread_pool4/Makefile new file mode 100644 index 000000000..36a1e0b2e --- /dev/null +++ b/samples/thread/thread_pool4/Makefile @@ -0,0 +1,2 @@ +include ../Makefile.in +PROG = thread_pool_client diff --git a/samples/thread/thread_pool4/main.c b/samples/thread/thread_pool4/main.c new file mode 100644 index 000000000..2a749b52a --- /dev/null +++ b/samples/thread/thread_pool4/main.c @@ -0,0 +1,28 @@ +#include +#include "lib_acl.h" + +static void thread_run(void *arg) +{ + (void) arg; +} + +int main(void) +{ + int nthreads = 200, count = 100, i; + acl_pthread_pool_t *thrpool; + + thrpool = acl_thread_pool_create(nthreads, 1); + + for (i = 0; i < nthreads; i++) + acl_pthread_pool_add(thrpool, thread_run, &count); + + sleep(2); + + for (i = 0; i < nthreads; i++) + acl_pthread_pool_add(thrpool, thread_run, &count); + + printf("enter any key to exit\r\n"); + getchar(); + acl_pthread_pool_destroy(thrpool); + return 0; +} diff --git a/samples/thread/thread_pool4/valgrind.sh b/samples/thread/thread_pool4/valgrind.sh new file mode 100644 index 000000000..aad8796fe --- /dev/null +++ b/samples/thread/thread_pool4/valgrind.sh @@ -0,0 +1,3 @@ +#!/bin/sh + +valgrind --tool=memcheck --leak-check=yes -v ./thread_pool_client