-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlistener.go
More file actions
314 lines (272 loc) · 7.89 KB
/
Copy pathlistener.go
File metadata and controls
314 lines (272 loc) · 7.89 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
package pointsub
import (
"context"
"net"
"sync"
"time"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/protocol"
)
// ListenOption 定义监听器选项
type ListenOption func(*listenOptions)
type listenOptions struct {
bufferSize int // 流缓冲区大小
idleTimeout time.Duration // 监听器空闲超时
enableLargeMessageMode bool // 启用大消息模式
largeMessageSize int // 预期大消息大小,用于自适应优化
enableAdaptiveChunking bool // 启用自适应分块
chunkSize int // 分块大小
noBlockDelay bool // 是否禁用块间延迟
blockDelay time.Duration // 块间延迟
customReadBufferSize int // 自定义读缓冲区大小
customWriteBufferSize int // 自定义写缓冲区大小
maxConcurrentConnections int // 最大并发连接数
}
// WithBufferSize 设置流缓冲区大小
func WithBufferSize(size int) ListenOption {
return func(o *listenOptions) {
o.bufferSize = size
}
}
// WithIdleTimeout 设置监听器空闲超时
func WithIdleTimeout(timeout time.Duration) ListenOption {
return func(o *listenOptions) {
o.idleTimeout = timeout
}
}
// WithLargeMessageSupport 为监听器启用大消息支持
func WithLargeMessageSupport(expectedSize int) ListenOption {
return func(o *listenOptions) {
o.enableLargeMessageMode = true
o.largeMessageSize = expectedSize
}
}
// WithListenerAdaptiveChunking 为监听器启用自适应分块
func WithListenerAdaptiveChunking(enabled bool) ListenOption {
return func(o *listenOptions) {
o.enableAdaptiveChunking = enabled
}
}
// WithListenerChunkSize 设置监听器的分块大小
func WithListenerChunkSize(size int) ListenOption {
return func(o *listenOptions) {
o.chunkSize = size
}
}
// WithListenerNoBlockDelay 为监听器禁用块间延迟
func WithListenerNoBlockDelay() ListenOption {
return func(o *listenOptions) {
o.noBlockDelay = true
}
}
// WithListenerBlockDelay 设置监听器的块间延迟
func WithListenerBlockDelay(delay time.Duration) ListenOption {
return func(o *listenOptions) {
o.blockDelay = delay
}
}
// WithListenerBufferSizes 设置监听器的读写缓冲区大小
func WithListenerBufferSizes(readSize, writeSize int) ListenOption {
return func(o *listenOptions) {
o.customReadBufferSize = readSize
o.customWriteBufferSize = writeSize
}
}
// WithMaxConcurrentConnections 设置最大并发连接数
func WithMaxConcurrentConnections(max int) ListenOption {
return func(o *listenOptions) {
o.maxConcurrentConnections = max
}
}
// listener 是 net.Listener 的实现,用于处理来自 libp2p 连接的标记流。
type listener struct {
host host.Host
ctx context.Context
tag protocol.ID
cancel func()
streamCh chan network.Stream
closedMu sync.RWMutex
closed bool
// 监控
stats *listenerStats
// 监听器选项
opts listenOptions
}
// 监听器统计
type listenerStats struct {
accepted int64
dropped int64
mu sync.Mutex
}
// Accept 返回此监听器的下一个连接。
// 如果没有连接,它会阻塞。在底层,连接是 libp2p 流。
func (l *listener) Accept() (net.Conn, error) {
l.closedMu.RLock()
if l.closed {
l.closedMu.RUnlock()
return nil, net.ErrClosed
}
l.closedMu.RUnlock()
select {
case s := <-l.streamCh:
if l.stats != nil {
l.stats.mu.Lock()
l.stats.accepted++
l.stats.mu.Unlock()
}
// 创建基本连接
conn := newConn(s)
// 如果启用了大消息模式,应用大消息优化
if l.opts.enableLargeMessageMode {
// 准备大消息选项
var largeOpts []LargeMessageOption
// 自适应分块
if l.opts.enableAdaptiveChunking {
largeOpts = append(largeOpts, WithAdaptiveChunking(true))
} else {
largeOpts = append(largeOpts, WithAdaptiveChunking(false))
}
// 块大小
if l.opts.chunkSize > 0 {
largeOpts = append(largeOpts, WithChunkSize(l.opts.chunkSize))
}
// 块间延迟
if l.opts.noBlockDelay {
largeOpts = append(largeOpts, WithNoBlockDelay())
} else if l.opts.blockDelay > 0 {
largeOpts = append(largeOpts, WithBlockDelay(l.opts.blockDelay))
}
// 自定义缓冲区
if l.opts.customReadBufferSize > 0 || l.opts.customWriteBufferSize > 0 {
readSize := l.opts.customReadBufferSize
if readSize <= 0 {
readSize = DefaultReadBufferSize
}
writeSize := l.opts.customWriteBufferSize
if writeSize <= 0 {
writeSize = DefaultWriteBufferSize
}
largeOpts = append(largeOpts, WithBufferSizes(readSize, writeSize))
}
// 创建优化连接
return NewLargeMessageConn(conn, l.opts.largeMessageSize, largeOpts...), nil
}
return conn, nil
case <-l.ctx.Done():
return nil, l.ctx.Err()
}
}
// Close 终止此监听器。它将不再处理任何传入的流
func (l *listener) Close() error {
l.closedMu.Lock()
defer l.closedMu.Unlock()
if l.closed {
return nil
}
l.closed = true
l.cancel()
l.host.RemoveStreamHandler(l.tag)
return nil
}
// Addr 返回监听器的地址
func (l *listener) Addr() net.Addr {
return &addr{l.host.ID()}
}
// Stats 返回监听器的统计信息
func (l *listener) Stats() (accepted, dropped int64) {
if l.stats == nil {
return 0, 0
}
l.stats.mu.Lock()
accepted = l.stats.accepted
dropped = l.stats.dropped
l.stats.mu.Unlock()
return
}
// Listen 创建一个新的监听器,将为给定的协议标签处理传入的流
func Listen(h host.Host, tag protocol.ID) (net.Listener, error) {
return ListenWithOptions(h, tag)
}
// ListenWithOptions 创建一个新的监听器,将为给定的协议标签处理传入的流,支持自定义选项
func ListenWithOptions(h host.Host, tag protocol.ID, opts ...ListenOption) (net.Listener, error) {
// 处理选项
options := listenOptions{
bufferSize: 256,
idleTimeout: 60 * time.Second,
enableLargeMessageMode: false,
enableAdaptiveChunking: true,
chunkSize: DefaultMediumChunkSize,
noBlockDelay: false,
blockDelay: time.Millisecond,
}
for _, opt := range opts {
opt(&options)
}
ctx, cancel := context.WithCancel(context.Background())
l := &listener{
host: h,
ctx: ctx,
tag: tag,
cancel: cancel,
streamCh: make(chan network.Stream, options.bufferSize),
stats: &listenerStats{},
opts: options,
}
h.SetStreamHandler(tag, func(s network.Stream) {
select {
case l.streamCh <- s:
// 流被接受
case <-ctx.Done():
// 监听器已关闭
logger.Warnf("流被拒绝,监听器已关闭: %s", s.ID())
s.Reset()
if l.stats != nil {
l.stats.mu.Lock()
l.stats.dropped++
l.stats.mu.Unlock()
}
default:
if l.stats != nil {
l.stats.mu.Lock()
l.stats.dropped++
l.stats.mu.Unlock()
}
// 没有消费者就拒绝流
logger.Warnf("流被拒绝,缓冲区已满: %s", s.ID())
s.Reset()
}
})
// 如果设置了空闲超时,启动超时检查协程
if options.idleTimeout > 0 {
go func() {
defer cancel()
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
var lastAccepted, lastDropped int64
lastActivity := time.Now()
for {
select {
case <-ticker.C:
// 检查是否有活动
accepted, dropped := l.Stats()
// 如果有新的接受或丢弃,更新最后活动时间
if accepted > lastAccepted || dropped > lastDropped {
lastAccepted = accepted
lastDropped = dropped
lastActivity = time.Now()
} else if time.Since(lastActivity) > options.idleTimeout {
// 空闲超时,关闭监听器
logger.Infof("监听器空闲超时,关闭: %s", tag)
_ = l.Close()
return
}
case <-ctx.Done():
return
}
}
}()
}
logger.Infof("监听器启动成功, 协议: %s, 缓冲区: %d", tag, options.bufferSize)
return l, nil
}