-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathadapter_option.go
172 lines (145 loc) · 3.87 KB
/
adapter_option.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
package art
import (
	"errors"
	"reflect"
)
// NewAdapterOption returns an AdapterOption wrapping a freshly allocated
// Adapter.
//
// The adapter starts with a recvResult channel buffered for two errors, a new
// Lifecycle, and an unbuffered waitStop channel. Every other adapter field
// keeps its zero value until it is set through one of the option methods.
func NewAdapterOption() (opt *AdapterOption) {
	opt = &AdapterOption{
		adapter: &Adapter{
			recvResult: make(chan error, 2),
			lifecycle:  new(Lifecycle),
			waitStop:   make(chan struct{}),
		},
	}
	return opt
}
// AdapterOption is a chainable builder for an Adapter. Each option method
// mutates the wrapped adapter (or the builder itself) and returns the same
// *AdapterOption; Build finalizes the configuration.
type AdapterOption struct {
	// adapter is the instance being configured.
	adapter *Adapter

	// decorateAdapter, when non-nil, is called by Build with the adapter and
	// its result is used as the application-facing IAdapter.
	decorateAdapter func(adapter IAdapter) (application IAdapter)
}
// Build finalizes the configured adapter and returns the application-facing
// IAdapter.
//
// Steps, in order:
//  1. Install DefaultLogger() when no logger was configured.
//  2. Resolve the application adapter: the result of the DecorateAdapter
//     callback when one was set, otherwise the adapter itself.
//  3. Join the configured hub, if any, under the adapter's identifier.
//  4. Run lifecycle.initialize with the application adapter.
//  5. Call pingpong on the adapter.
//
// It returns a nil adapter and a non-nil error when the decorator callback
// yields nil, when joining the hub fails, or when lifecycle initialization
// fails.
//
// NOTE(review): if lifecycle.initialize fails after a successful hub.Join,
// the adapter is not removed from the hub here — confirm whether the hub
// needs an explicit rollback.
func (opt *AdapterOption) Build() (adp IAdapter, err error) {
	pubsub := opt.adapter
	if pubsub.logger == nil {
		pubsub.logger = DefaultLogger()
	}

	var app IAdapter = pubsub
	if opt.decorateAdapter != nil {
		app = opt.decorateAdapter(pubsub)
		if app == nil {
			// Reject a nil decorator result up front: otherwise nil would be
			// joined to the hub, passed to the lifecycle, and handed back to
			// the caller alongside a nil error.
			return nil, errors.New("art: DecorateAdapter callback returned a nil adapter")
		}
	}
	pubsub.application = app

	// The reflect check additionally skips a hub interface that wraps a zero
	// value (for example a typed nil pointer), which a plain != nil misses.
	if pubsub.hub != nil && !reflect.ValueOf(pubsub.hub).IsZero() {
		if err := pubsub.hub.Join(pubsub.identifier, pubsub.application); err != nil {
			return nil, err
		}
	}

	if err := pubsub.lifecycle.initialize(pubsub.application); err != nil {
		return nil, err
	}

	pubsub.pingpong()
	return pubsub.application, nil
}
// DecorateAdapter registers wrap as the decorator callback. Build invokes it
// with the underlying adapter and uses the returned IAdapter as the
// application adapter. Passing nil clears any previously registered callback.
func (opt *AdapterOption) DecorateAdapter(wrap func(adapter IAdapter) (application IAdapter)) *AdapterOption {
	opt.decorateAdapter = wrap

	return opt
}
// Identifier sets the adapter's identifier, which Build passes to the hub's
// Join call when a hub is configured. It returns opt for chaining.
func (opt *AdapterOption) Identifier(identifier string) *AdapterOption {
	opt.adapter.identifier = identifier

	return opt
}
// Logger sets the adapter's logger. When it is left nil, Build installs
// DefaultLogger() instead. It returns opt for chaining.
func (opt *AdapterOption) Logger(logger Logger) *AdapterOption {
	opt.adapter.logger = logger

	return opt
}
// AdapterHub sets the hub that Build joins the application adapter to, keyed
// by the adapter's identifier. It returns opt for chaining.
func (opt *AdapterOption) AdapterHub(hub AdapterHub) *AdapterOption {
	opt.adapter.hub = hub

	return opt
}
// Lifecycle hands the adapter's Lifecycle to setup so the caller can
// configure it. A nil setup is ignored. It returns opt for chaining.
func (opt *AdapterOption) Lifecycle(setup func(life *Lifecycle)) *AdapterOption {
	if setup == nil {
		return opt
	}

	setup(opt.adapter.lifecycle)
	return opt
}
// IngressMux stores mux as the adapter's ingressMux (presumably the router
// for received messages — confirm against Adapter). It returns opt for
// chaining.
func (opt *AdapterOption) IngressMux(mux *Mux) *AdapterOption {
	opt.adapter.ingressMux = mux

	return opt
}
// RawRecv stores recv as the adapter's rawRecv callback. The callback takes a
// Logger and returns a message or an error. It returns opt for chaining.
func (opt *AdapterOption) RawRecv(recv func(logger Logger) (message *Message, err error)) *AdapterOption {
	opt.adapter.rawRecv = recv

	return opt
}
// EgressMux stores mux as the adapter's egressMux (presumably the router for
// outgoing messages — confirm against Adapter). It returns opt for chaining.
func (opt *AdapterOption) EgressMux(mux *Mux) *AdapterOption {
	opt.adapter.egressMux = mux

	return opt
}
// RawSend stores send as the adapter's rawSend callback. The callback takes a
// Logger and the message to send, and reports an error. It returns opt for
// chaining.
func (opt *AdapterOption) RawSend(send func(logger Logger, message *Message) error) *AdapterOption {
	opt.adapter.rawSend = send

	return opt
}
// RawStop stores stop as the adapter's rawStop callback. The callback takes a
// Logger and reports an error. It returns opt for chaining.
func (opt *AdapterOption) RawStop(stop func(logger Logger) error) *AdapterOption {
	opt.adapter.rawStop = stop

	return opt
}
// RawFixup stores fixup as the adapter's rawFixup callback together with its
// retry budget in seconds.
//
// A negative maxRetrySecond is ignored: neither field is changed. Zero is
// stored as-is; it is named retryUntilAdapterStop here, which suggests that
// retries continue until the adapter stops — TODO confirm against the code
// that consumes fixupMaxRetrySecond.
func (opt *AdapterOption) RawFixup(maxRetrySecond int, fixup func(IAdapter) error) *AdapterOption {
	const retryUntilAdapterStop = 0

	switch {
	case maxRetrySecond < 0:
		return opt
	case maxRetrySecond == 0:
		maxRetrySecond = retryUntilAdapterStop
	}

	adp := opt.adapter
	adp.rawFixup = fixup
	adp.fixupMaxRetrySecond = maxRetrySecond
	return opt
}
// SendPing configures the adapter's ping-pong routine so that this side sends
// the ping and waits for the matching pong.
//
// The installed routine calls SendPingWaitPong with the send interval in
// seconds, a function that invokes sendPing on the adapter, waitPong, and the
// adapter's IsStopped method.
//
// A negative sendPingSeconds leaves the adapter unchanged; zero selects the
// default of 30 seconds.
//
// Original note: SendPeriod = WaitSecond / 2. That relation is not computed
// in this method — presumably SendPingWaitPong applies it; TODO confirm.
func (opt *AdapterOption) SendPing(sendPingSeconds int, waitPong WaitPingPong, sendPing func(IAdapter) error) *AdapterOption {
	const defaultSendSeconds = 30

	interval := sendPingSeconds
	switch {
	case interval < 0:
		return opt
	case interval == 0:
		interval = defaultSendSeconds
	}

	adp := opt.adapter
	ping := func() error { return sendPing(adp) }
	adp.pp = func() error {
		return SendPingWaitPong(interval, ping, waitPong, adp.IsStopped)
	}
	return opt
}
// WaitPing configures the adapter's ping-pong routine so that this side waits
// for a ping and responds with the matching pong.
//
// The installed routine calls WaitPingSendPong with the wait time in seconds,
// waitPing, a function that invokes sendPong on the adapter, and the
// adapter's IsStopped method.
//
// A negative waitPingSeconds leaves the adapter unchanged; zero selects the
// default of 30 seconds.
//
// Original note: SendPeriod = WaitSecond.
func (opt *AdapterOption) WaitPing(waitPingSeconds int, waitPing WaitPingPong, sendPong func(IAdapter) error) *AdapterOption {
	const defaultWaitSeconds = 30

	timeout := waitPingSeconds
	switch {
	case timeout < 0:
		return opt
	case timeout == 0:
		timeout = defaultWaitSeconds
	}

	adp := opt.adapter
	pong := func() error { return sendPong(adp) }
	adp.pp = func() error {
		return WaitPingSendPong(timeout, waitPing, pong, adp.IsStopped)
	}
	return opt
}
// RawInfra stores infra, an arbitrary caller-supplied value, as the adapter's
// rawInfra. It returns opt for chaining.
func (opt *AdapterOption) RawInfra(infra any) *AdapterOption {
	opt.adapter.rawInfra = infra

	return opt
}