From d3d1bdca8ad90962d5e8c7245149a64e382f03a5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Flc=E3=82=9B?= Date: Thu, 25 Jan 2024 23:59:05 +0800 Subject: [PATCH] feat(coroutine): Added `Concurrent` and `Parallel` (#74) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat(coroutine): Added `Concurrent` and `Parallel` Signed-off-by: Flc゛ * feat(coroutine): Added `Concurrent` and `Parallel` Signed-off-by: Flc゛ * feat(coroutine): Added `Concurrent` and `Parallel` Signed-off-by: Flc゛ --------- Signed-off-by: Flc゛ --- coroutine/concurrent.go | 25 +++++++++++++++++++++++ coroutine/concurrent_test.go | 39 ++++++++++++++++++++++++++++++++++++ coroutine/parallel.go | 17 ++++++++++++++++ coroutine/parallel_test.go | 38 +++++++++++++++++++++++++++++++++++ go.mod | 1 + go.sum | 4 ++++ 6 files changed, 124 insertions(+) create mode 100644 coroutine/concurrent.go create mode 100644 coroutine/concurrent_test.go create mode 100644 coroutine/parallel.go create mode 100644 coroutine/parallel_test.go diff --git a/coroutine/concurrent.go b/coroutine/concurrent.go new file mode 100644 index 0000000..92894f3 --- /dev/null +++ b/coroutine/concurrent.go @@ -0,0 +1,25 @@ +package coroutine + +import "sync" + +func Concurrent(limit int, tasks ...func()) { + var ( + wg sync.WaitGroup + ch = make(chan struct{}, limit) + ) + defer close(ch) + wg.Add(len(tasks)) + + for _, task := range tasks { + ch <- struct{}{} + go func(task func()) { + defer func() { + <-ch + wg.Done() + }() + task() + }(task) + } + + wg.Wait() +} diff --git a/coroutine/concurrent_test.go b/coroutine/concurrent_test.go new file mode 100644 index 0000000..e93b841 --- /dev/null +++ b/coroutine/concurrent_test.go @@ -0,0 +1,39 @@ +package coroutine + +import ( + "bytes" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestConcurrent(t *testing.T) { + var ( + start = time.Now() + buffer bytes.Buffer + mu sync.Mutex + ) + + Concurrent(2, func() { + time.Sleep(1 * time.Second) + mu.Lock() + defer mu.Unlock() + buffer.WriteString("1") + }, func() { + time.Sleep(2 * time.Second) + mu.Lock() + defer mu.Unlock() + buffer.WriteString("2") + }, func() { + time.Sleep(3 * time.Second) + mu.Lock() + defer mu.Unlock() + buffer.WriteString("3") + }) + + assert.Equal(t, "123", buffer.String()) + assert.True(t, time.Since(start) > 4*time.Second) + assert.True(t, time.Since(start) < 6*time.Second) +} diff --git a/coroutine/parallel.go b/coroutine/parallel.go new file mode 100644 index 0000000..444415a --- /dev/null +++ b/coroutine/parallel.go @@ -0,0 +1,17 @@ +package coroutine + +import "sync" + +func Parallel(tasks ...func()) { + var wg sync.WaitGroup + wg.Add(len(tasks)) + + for _, task := range tasks { + go func(task func()) { + defer wg.Done() + task() + }(task) + } + + wg.Wait() +} diff --git a/coroutine/parallel_test.go b/coroutine/parallel_test.go new file mode 100644 index 0000000..0304a7d --- /dev/null +++ b/coroutine/parallel_test.go @@ -0,0 +1,38 @@ +package coroutine + +import ( + "bytes" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestParallel(t *testing.T) { + var ( + start = time.Now() + buffer bytes.Buffer + mu sync.Mutex + ) + + Parallel(func() { + time.Sleep(1 * time.Second) + mu.Lock() + defer mu.Unlock() + buffer.WriteString("1") + }, func() { + time.Sleep(2 * time.Second) + mu.Lock() + defer mu.Unlock() + buffer.WriteString("2") + }, func() { + time.Sleep(3 * time.Second) + mu.Lock() + defer mu.Unlock() + buffer.WriteString("3") + }) + + assert.Equal(t, "123", buffer.String()) + assert.True(t, time.Since(start) < 4*time.Second) +} diff --git a/go.mod b/go.mod index ea71c75..c1cbac2 100644 --- a/go.mod +++ b/go.mod @@ -43,6 +43,7 @@ require ( golang.org/x/arch v0.3.0 // indirect golang.org/x/crypto v0.17.0 // indirect golang.org/x/net v0.17.0 // indirect + golang.org/x/sync v0.4.0 // indirect golang.org/x/sys v0.16.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240102182953-50ed04b92917 // indirect google.golang.org/grpc v1.60.1 // indirect diff --git a/go.sum b/go.sum index d278d1f..253e4e6 100644 --- a/go.sum +++ b/go.sum @@ -96,6 +96,8 @@ golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k= golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= +golang.org/x/sync v0.4.0 h1:zxkM55ReGkDlKSM+Fu41A+zmbZuaPVbGMzvvdUPznYQ= +golang.org/x/sync v0.4.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU= @@ -103,6 +105,8 @@ golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/genproto v0.0.0-20231212172506-995d672761c0 h1:YJ5pD9rF8o9Qtta0Cmy9rdBwkSjrTCT6XTiUQVOtIos= +google.golang.org/genproto/googleapis/api v0.0.0-20231002182017-d307bd883b97 h1:W18sezcAYs+3tDZX4F80yctqa12jcP1PUS2gQu1zTPU= google.golang.org/genproto/googleapis/rpc v0.0.0-20240102182953-50ed04b92917 h1:6G8oQ016D88m1xAKljMlBOOGWDZkes4kMhgGFlf8WcQ= google.golang.org/genproto/googleapis/rpc v0.0.0-20240102182953-50ed04b92917/go.mod h1:xtjpI3tXFPP051KaWnhvxkiubL/6dJ18vLVf7q2pTOU= google.golang.org/grpc v1.60.1 h1:26+wFr+cNqSGFcOXcabYC0lUVJVRa2Sb2ortSK7VrEU=