-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsample.go
132 lines (112 loc) · 2.67 KB
/
sample.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package main
import (
"fmt"
"sync"
)
// Worker represents a worker that executes tasks.
//
// NewWorker starts one goroutine per Worker. That goroutine runs every Task
// received on TaskCh and returns once QuitCh has been closed.
type Worker struct {
// ID is the identifier passed to NewWorker.
ID int
// TaskCh delivers tasks to the worker goroutine. NewWorker creates it
// unbuffered, so a send completes only when the worker receives the task.
TaskCh chan Task
// QuitCh is closed by Stop to tell the worker goroutine to return.
QuitCh chan struct{}
}
// Task represents a task that can be executed by a worker: a function that
// takes no arguments and returns nothing. A worker runs a Task by calling it
// on the worker's own goroutine.
type Task func()
// NewWorker builds a Worker identified by id, gives it fresh unbuffered task
// and quit channels, and launches its processing goroutine before returning.
//
// The goroutine calls wg.Done when it exits, but NewWorker itself never calls
// wg.Add: the caller owns the matching wg.Add(1) and should perform it before
// calling NewWorker so that Done can never run ahead of Add.
func NewWorker(id int, wg *sync.WaitGroup) *Worker {
	w := new(Worker)
	w.ID = id
	w.TaskCh = make(chan Task)
	w.QuitCh = make(chan struct{})

	go w.start(wg)
	return w
}
// start is the worker's run loop. It executes every task received on TaskCh
// and returns when QuitCh is closed or when TaskCh itself is closed.
//
// If wg is non-nil, wg.Done is called exactly once when the loop returns; the
// matching wg.Add(1) is the responsibility of whoever created the worker.
//
// A nil Task is skipped instead of being called, because calling a nil
// function value panics and would take the whole process down.
func (w *Worker) start(wg *sync.WaitGroup) {
	if wg != nil {
		defer wg.Done()
	}
	for {
		select {
		case task, ok := <-w.TaskCh:
			if !ok {
				// TaskCh was closed: no further work can arrive. Without this
				// check the receive would yield nil tasks in a tight loop.
				return
			}
			if task != nil {
				task() // Execute the task.
			}
		case <-w.QuitCh:
			return // Quit the worker.
		}
	}
}
// Stop signals the worker goroutine to return by closing QuitCh. A task that
// is already running is not interrupted; the worker exits after it finishes.
//
// Calling Stop again on an already stopped worker is a no-op rather than a
// "close of closed channel" panic. The check and the close are not atomic, so
// Stop must not be called from several goroutines at the same time.
func (w *Worker) Stop() {
	select {
	case <-w.QuitCh:
		// Nothing in this file sends on QuitCh, so a successful receive means
		// the channel is already closed and the worker has been told to quit.
	default:
		close(w.QuitCh)
	}
}
// WorkerPool represents a pool of workers fed from a single submission
// channel.
//
// Create a pool with NewWorkerPool; a zero WorkerPool has no dispatcher and is
// not usable. Submit work by sending a Task on TaskCh and shut the pool down
// with Stop. Sending on TaskCh after Stop panics, because Stop closes it.
type WorkerPool struct {
	// Workers holds the pool's workers. A worker that is found stopped while
	// the pool is running is replaced in place by a fresh worker with the
	// same ID.
	Workers []*Worker

	// TaskCh is the unbuffered submission channel read by the dispatcher.
	TaskCh chan Task

	// WG counts live worker goroutines; each worker calls Done as it exits.
	WG sync.WaitGroup

	// mu guards Workers, next and stopped.
	mu sync.Mutex

	// next is the round-robin cursor into Workers.
	next int

	// stopped is set once Stop has begun stopping workers; from then on
	// stopped workers are no longer restarted.
	stopped bool

	// done is closed when the dispatcher goroutine has exited.
	done chan struct{}

	// stopOnce makes Stop safe to call more than once.
	stopOnce sync.Once
}

// NewWorkerPool creates a new worker pool with the specified number of
// workers, starts one goroutine per worker plus a dispatcher goroutine, and
// returns the pool.
//
// A negative numWorkers is treated as zero. A pool without workers still
// accepts tasks; the dispatcher then runs them itself, one at a time.
func NewWorkerPool(numWorkers int) *WorkerPool {
	if numWorkers < 0 {
		numWorkers = 0
	}
	pool := &WorkerPool{
		Workers: make([]*Worker, numWorkers),
		TaskCh:  make(chan Task),
		done:    make(chan struct{}),
	}
	for i := range pool.Workers {
		// Add must happen before the worker goroutine exists: that goroutine
		// calls Done, and Done must never be able to run ahead of Add.
		pool.WG.Add(1)
		pool.Workers[i] = NewWorker(i, &pool.WG)
	}
	go pool.start()
	return pool
}

// start is the dispatcher loop. It hands every task received on TaskCh to a
// worker and returns when TaskCh is closed, closing done on the way out so
// that Stop knows no hand-off is still in flight.
func (p *WorkerPool) start() {
	defer close(p.done)
	for task := range p.TaskCh {
		if task == nil {
			continue // A nil Task cannot be called; drop it.
		}
		p.dispatch(task)
	}
}

// dispatch delivers task to a worker, blocking until one accepts it. If the
// chosen worker is stopped between selection and hand-off, another worker is
// chosen. With no workers at all the task runs on the calling goroutine.
func (p *WorkerPool) dispatch(task Task) {
	for {
		w := p.getAvailableWorker()
		if w == nil {
			task()
			return
		}
		select {
		case w.TaskCh <- task:
			return
		case <-w.QuitCh:
			// The worker quit before taking the task; try again. The next
			// call to getAvailableWorker restarts stopped workers.
		}
	}
}

// getAvailableWorker returns the next worker in round-robin order, restarting
// it first if it has been stopped. It returns nil when the pool has no
// workers.
//
// Worker channels are unbuffered, so whether a worker is idle cannot be
// observed without sending to it; rotating through the workers is what
// spreads tasks across the pool. Every call advances the rotation, so this
// method is meant for the dispatcher. Once Stop has begun stopping workers,
// stopped workers are returned as they are instead of being restarted.
func (p *WorkerPool) getAvailableWorker() *Worker {
	p.mu.Lock()
	defer p.mu.Unlock()

	n := len(p.Workers)
	if n == 0 {
		return nil
	}
	i := p.next % n
	p.next = (i + 1) % n

	w := p.Workers[i]
	if p.stopped {
		return w
	}
	select {
	case <-w.QuitCh:
		// Restart the worker if it has been stopped, and record the
		// replacement so later dispatches and Stop can reach it.
		p.WG.Add(1)
		w = NewWorker(w.ID, &p.WG)
		p.Workers[i] = w
	default:
	}
	return w
}

// Stop stops the worker pool and all its workers. It closes TaskCh, waits
// for the dispatcher to hand off every task it has already accepted, tells
// each worker to quit, and then blocks until all worker goroutines have
// returned, so every accepted task has finished when Stop returns.
//
// Stop is idempotent. It must not be called from inside a Task: the worker
// running that task could never exit and Stop would wait forever.
func (p *WorkerPool) Stop() {
	p.stopOnce.Do(func() {
		close(p.TaskCh)
		<-p.done

		p.mu.Lock()
		p.stopped = true
		for _, w := range p.Workers {
			select {
			case <-w.QuitCh:
				// Already stopped directly by a caller; do not close twice.
			default:
				w.Stop()
			}
		}
		p.mu.Unlock()

		p.WG.Wait()
	})
}
// main demonstrates the pool: it starts three workers, submits ten tasks,
// waits until every task has run, and then shuts the pool down.
func main() {
	// Create a worker pool with 3 workers.
	pool := NewWorkerPool(3)

	// pending tracks tasks that were submitted but have not finished yet.
	// A send on TaskCh only means the dispatcher accepted the task, so without
	// this wait main could stop the pool and exit before the last tasks run.
	var pending sync.WaitGroup

	// Submit tasks to the worker pool.
	for i := 0; i < 10; i++ {
		index := i // Capture the loop variable (required before Go 1.22).
		pending.Add(1)
		pool.TaskCh <- func() {
			defer pending.Done()
			// A Task receives no reference to the worker that runs it, so the
			// message reports only the task. Asking the pool for a worker
			// from inside a task would name a dispatch candidate rather than
			// the executing worker, and would touch pool state mid-dispatch.
			fmt.Printf("Task %d executed\n", index)
		}
	}

	// Stop the worker pool once every submitted task has completed.
	pending.Wait()
	pool.Stop()
}