healthcheck optimizes for the situation where outbounds can be added at any time

* simplify nodes check logic
* don't schedule check tasks for multiple rounds
This commit is contained in:
jebbs 2022-10-17 10:18:42 +08:00
parent 28c21f1433
commit 105a39d6e1
2 changed files with 50 additions and 68 deletions

View File

@ -76,11 +76,9 @@ func (h *HealthCheck) Start() error {
interval := time.Duration(h.options.Interval) * time.Duration(h.options.SamplingCount)
ticker := time.NewTicker(interval)
h.ticker = ticker
// one time instant check
h.Check()
go func() {
for {
h.doCheck(interval, h.options.SamplingCount)
h.CheckNodes()
_, ok := <-ticker.C
if !ok {
break
@ -101,9 +99,14 @@ func (h *HealthCheck) Close() error {
return nil
}
// Check does a one time health check
// Check does a one time instant health check
func (h *HealthCheck) Check() {
go h.doCheck(0, 1)
h.mutex.Lock()
nodes := h.refreshNodes()
h.mutex.Unlock()
for _, n := range nodes {
go h.checkNode(n)
}
}
type rtt struct {
@ -111,71 +114,47 @@ type rtt struct {
value time.Duration
}
// doCheck performs the 'rounds' amount checks in given 'duration'. You should make
// sure all tags are valid for current balancer
func (h *HealthCheck) doCheck(duration time.Duration, rounds int) {
// CheckNodes performs checks for all nodes with random delays
func (h *HealthCheck) CheckNodes() {
h.mutex.Lock()
nodes := h.refreshNodes()
count := len(nodes) * rounds
if count == 0 {
return
}
ch := make(chan *rtt, count)
// rtts := make(map[string][]time.Duration)
h.mutex.Unlock()
for _, n := range nodes {
tag, detour := n.Tag(), n
client := newPingClient(
detour,
h.options.Destination,
time.Duration(h.options.Timeout),
)
for i := 0; i < rounds; i++ {
delay := time.Duration(0)
if duration > 0 {
delay = time.Duration(rand.Intn(int(duration)))
}
time.AfterFunc(delay, func() {
// h.logger.Debug("checking ", tag)
delay, err := client.MeasureDelay()
if err == nil {
ch <- &rtt{
tag: tag,
value: delay,
}
return
}
if !h.checkConnectivity() {
h.logger.Debug("network is down")
ch <- &rtt{
tag: tag,
value: 0,
}
return
}
h.logger.Debug(
E.Cause(
err,
fmt.Sprintf("ping %s via %s", h.options.Destination, tag),
),
)
ch <- &rtt{
tag: tag,
value: rttFailed,
}
})
}
}
for i := 0; i < count; i++ {
rtt := <-ch
if rtt.value > 0 {
// h.logger.Debug("ping ", rtt.tag, ":", rtt.value)
// should not put results when network is down
h.putResult(rtt.tag, rtt.value)
}
delay := time.Duration(rand.Intn(int(h.options.Interval)))
time.AfterFunc(delay, func() {
h.checkNode(n)
})
}
}
// putResult put a ping rtt to results
func (h *HealthCheck) putResult(tag string, rtt time.Duration) {
// checkNode measures the delay to the configured destination through the
// given outbound once and records the outcome:
//   - on success, the measured delay is stored for the outbound's tag;
//   - on failure while checkConnectivity reports false, nothing is stored
//     and only "network is down" is logged;
//   - on any other failure, the error is logged and rttFailed is stored.
func (h *HealthCheck) checkNode(detour adapter.Outbound) {
	tag := detour.Tag()
	pinger := newPingClient(
		detour,
		h.options.Destination,
		time.Duration(h.options.Timeout),
	)
	elapsed, err := pinger.MeasureDelay()
	switch {
	case err == nil:
		h.PutResult(tag, elapsed)
	case !h.checkConnectivity():
		// Connectivity is only probed after a failed measurement; no result
		// is recorded for this node in that case.
		h.logger.Debug("network is down")
	default:
		cause := E.Cause(
			err,
			fmt.Sprintf("ping %s via %s", h.options.Destination, tag),
		)
		h.logger.Debug(cause)
		h.PutResult(tag, rttFailed)
	}
}
// PutResult puts a ping rtt into the results
func (h *HealthCheck) PutResult(tag string, rtt time.Duration) {
h.mutex.Lock()
defer h.mutex.Unlock()
r, ok := h.results[tag]

View File

@ -26,6 +26,10 @@ type Node struct {
func (h *HealthCheck) Nodes(network string) *Nodes {
h.mutex.Lock()
defer h.mutex.Unlock()
// fetch nodes from router, may have newly added untested nodes
h.refreshNodes()
if h == nil || len(h.results) == 0 {
return &Nodes{}
}
@ -76,9 +80,6 @@ func CoveredOutbounds(router adapter.Router, tags []string) []adapter.Outbound {
// refreshNodes matches nodes from router by tag prefix, and refreshes the health check results
func (h *HealthCheck) refreshNodes() []adapter.Outbound {
h.mutex.Lock()
defer h.mutex.Unlock()
nodes := CoveredOutbounds(h.router, h.tags)
tags := make(map[string]struct{})
for _, n := range nodes {
@ -97,6 +98,8 @@ func (h *HealthCheck) refreshNodes() []adapter.Outbound {
networks: n.Network(),
rttStorage: newRTTStorage(h.options.SamplingCount, validity),
}
go h.checkNode(n)
}
}
// remove unused rttStorage