Merge f7e7e7d372cd95e0cf3dfaa540d6cba6dc82c0a3 into c3cc01088007ee935036f93d1b01b2081a57175a

This commit is contained in:
Liberty 2025-06-06 15:29:48 +00:00 committed by GitHub
commit bfbdd0fec2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 33 additions and 6 deletions

View File

@ -14,7 +14,8 @@
"interval": "",
"tolerance": 0,
"idle_timeout": "",
"interrupt_exist_connections": false
"interrupt_exist_connections": false,
"test_concurrency": 10
}
```
@ -47,3 +48,7 @@ The idle timeout. `30m` will be used if empty.
Interrupt existing connections when the selected outbound has changed.
Only inbound connections are affected by this setting, internal connections will always be interrupted.
#### test_concurrency
URL Test concurrency. `10` will be used if empty.

View File

@ -14,7 +14,8 @@
"interval": "",
"tolerance": 50,
"idle_timeout": "",
"interrupt_exist_connections": false
"interrupt_exist_connections": false,
"test_concurrency": 10
}
```
@ -47,3 +48,8 @@
当选定的出站发生更改时,中断现有连接。
仅入站连接受此设置影响,内部连接将始终被中断。
#### test_concurrency
测试的并发连接数。 默认使用 `10`

View File

@ -15,4 +15,5 @@ type URLTestOutboundOptions struct {
Tolerance uint16 `json:"tolerance,omitempty"`
IdleTimeout badoption.Duration `json:"idle_timeout,omitempty"`
InterruptExistConnections bool `json:"interrupt_exist_connections,omitempty"`
TestConcurrency uint16 `json:"test_concurrency,omitempty"`
}

View File

@ -44,9 +44,14 @@ type URLTest struct {
idleTimeout time.Duration
group *URLTestGroup
interruptExternalConnections bool
testConcurrency uint16
}
func NewURLTest(ctx context.Context, router adapter.Router, logger log.ContextLogger, tag string, options option.URLTestOutboundOptions) (adapter.Outbound, error) {
testConcurrency := options.TestConcurrency
if testConcurrency == 0 {
testConcurrency = 10 // Default concurrency
}
outbound := &URLTest{
Adapter: outbound.NewAdapter(C.TypeURLTest, tag, []string{N.NetworkTCP, N.NetworkUDP}, options.Outbounds),
ctx: ctx,
@ -60,6 +65,7 @@ func NewURLTest(ctx context.Context, router adapter.Router, logger log.ContextLo
tolerance: options.Tolerance,
idleTimeout: time.Duration(options.IdleTimeout),
interruptExternalConnections: options.InterruptExistConnections,
testConcurrency: testConcurrency,
}
if len(outbound.tags) == 0 {
return nil, E.New("missing tags")
@ -76,7 +82,7 @@ func (s *URLTest) Start() error {
}
outbounds = append(outbounds, detour)
}
group, err := NewURLTestGroup(s.ctx, s.outbound, s.logger, outbounds, s.link, s.interval, s.tolerance, s.idleTimeout, s.interruptExternalConnections)
group, err := NewURLTestGroup(s.ctx, s.outbound, s.logger, outbounds, s.link, s.interval, s.tolerance, s.idleTimeout, s.interruptExternalConnections, s.testConcurrency)
if err != nil {
return err
}
@ -193,9 +199,10 @@ type URLTestGroup struct {
close chan struct{}
started bool
lastActive atomic.TypedValue[time.Time]
testConcurrency uint16
}
func NewURLTestGroup(ctx context.Context, outboundManager adapter.OutboundManager, logger log.Logger, outbounds []adapter.Outbound, link string, interval time.Duration, tolerance uint16, idleTimeout time.Duration, interruptExternalConnections bool) (*URLTestGroup, error) {
func NewURLTestGroup(ctx context.Context, outboundManager adapter.OutboundManager, logger log.Logger, outbounds []adapter.Outbound, link string, interval time.Duration, tolerance uint16, idleTimeout time.Duration, interruptExternalConnections bool, testConcurrency uint16) (*URLTestGroup, error) {
if interval == 0 {
interval = C.DefaultURLTestInterval
}
@ -230,6 +237,7 @@ func NewURLTestGroup(ctx context.Context, outboundManager adapter.OutboundManage
pause: service.FromContext[pause.Manager](ctx),
interruptGroup: interrupt.NewGroup(),
interruptExternalConnections: interruptExternalConnections,
testConcurrency: testConcurrency,
}, nil
}
@ -350,7 +358,11 @@ func (g *URLTestGroup) urlTest(ctx context.Context, force bool) (map[string]uint
return result, nil
}
defer g.checking.Store(false)
b, _ := batch.New(ctx, batch.WithConcurrencyNum[any](10))
concurrency := int(g.testConcurrency)
if concurrency <= 0 {
concurrency = 10 // Fallback to a minimum sensible default if somehow it's zero or negative
}
b, _ := batch.New(ctx, batch.WithConcurrencyNum[any](concurrency))
checked := make(map[string]bool)
var resultAccess sync.Mutex
for _, detour := range g.outbounds {
@ -384,6 +396,7 @@ func (g *URLTestGroup) urlTest(ctx context.Context, force bool) (map[string]uint
resultAccess.Lock()
result[tag] = t
resultAccess.Unlock()
g.performUpdateCheck()
}
return nil, nil
})
@ -394,6 +407,8 @@ func (g *URLTestGroup) urlTest(ctx context.Context, force bool) (map[string]uint
}
func (g *URLTestGroup) performUpdateCheck() {
g.access.Lock()
defer g.access.Unlock()
var updated bool
if outbound, exists := g.Select(N.NetworkTCP); outbound != nil && (g.selectedOutboundTCP == nil || (exists && outbound != g.selectedOutboundTCP)) {
if g.selectedOutboundTCP != nil {