diff --git a/balancer/healthcheck.go b/balancer/healthcheck.go index d8a5a2ec..c636bd31 100644 --- a/balancer/healthcheck.go +++ b/balancer/healthcheck.go @@ -76,11 +76,9 @@ func (h *HealthCheck) Start() error { interval := time.Duration(h.options.Interval) * time.Duration(h.options.SamplingCount) ticker := time.NewTicker(interval) h.ticker = ticker - // one time instant check - h.Check() go func() { for { - h.doCheck(interval, h.options.SamplingCount) + h.CheckNodes() _, ok := <-ticker.C if !ok { break @@ -101,9 +99,14 @@ func (h *HealthCheck) Close() error { return nil } -// Check does a one time health check +// Check does a one-time instant health check, starting one goroutine per node returned by refreshNodes func (h *HealthCheck) Check() { - go h.doCheck(0, 1) + h.mutex.Lock() + nodes := h.refreshNodes() + h.mutex.Unlock() + for _, n := range nodes { + go h.checkNode(n) + } } type rtt struct { @@ -111,71 +114,47 @@ type rtt struct { value time.Duration } -// doCheck performs the 'rounds' amount checks in given 'duration'. You should make -// sure all tags are valid for current balancer -func (h *HealthCheck) doCheck(duration time.Duration, rounds int) { +// CheckNodes schedules one check per node, each after a random delay shorter than options.Interval +func (h *HealthCheck) CheckNodes() { + h.mutex.Lock() nodes := h.refreshNodes() - count := len(nodes) * rounds - if count == 0 { - return - } - ch := make(chan *rtt, count) - // rtts := make(map[string][]time.Duration) + h.mutex.Unlock() for _, n := range nodes { - tag, detour := n.Tag(), n - client := newPingClient( - detour, - h.options.Destination, - time.Duration(h.options.Timeout), - ) - for i := 0; i < rounds; i++ { - delay := time.Duration(0) - if duration > 0 { - delay = time.Duration(rand.Intn(int(duration))) - } - time.AfterFunc(delay, func() { - // h.logger.Debug("checking ", tag) - delay, err := client.MeasureDelay() - if err == nil { - ch <- &rtt{ - tag: tag, - value: delay, - } - return - } - if !h.checkConnectivity() { - h.logger.Debug("network is down") - ch <- &rtt{ - tag: tag, - value: 
0, - } - return - } - h.logger.Debug( - E.Cause( - err, - fmt.Sprintf("ping %s via %s", h.options.Destination, tag), - ), - ) - ch <- &rtt{ - tag: tag, - value: rttFailed, - } - }) - } - } - for i := 0; i < count; i++ { - rtt := <-ch - if rtt.value > 0 { - // h.logger.Debug("ping ", rtt.tag, ":", rtt.value) - // should not put results when network is down - h.putResult(rtt.tag, rtt.value) - } + delay := time.Duration(rand.Intn(int(h.options.Interval))) + time.AfterFunc(delay, func() { + h.checkNode(n) + }) } } -// putResult put a ping rtt to results -func (h *HealthCheck) putResult(tag string, rtt time.Duration) { +func (h *HealthCheck) checkNode(detour adapter.Outbound) { + tag := detour.Tag() + client := newPingClient( + detour, + h.options.Destination, + time.Duration(h.options.Timeout), + ) + // h.logger.Debug("checking ", tag) + delay, err := client.MeasureDelay() + if err == nil { + h.PutResult(tag, delay) + return + } + if !h.checkConnectivity() { + h.logger.Debug("network is down") + return + } + h.logger.Debug( + E.Cause( + err, + fmt.Sprintf("ping %s via %s", h.options.Destination, tag), + ), + ) + h.PutResult(tag, rttFailed) +} + +// PutResult puts a ping rtt into results +func (h *HealthCheck) PutResult(tag string, rtt time.Duration) { h.mutex.Lock() defer h.mutex.Unlock() r, ok := h.results[tag] diff --git a/balancer/healthcheck_nodes.go b/balancer/healthcheck_nodes.go index e412796b..e5a2e6d4 100644 --- a/balancer/healthcheck_nodes.go +++ b/balancer/healthcheck_nodes.go @@ -26,6 +26,10 @@ type Node struct { func (h *HealthCheck) Nodes(network string) *Nodes { h.mutex.Lock() defer h.mutex.Unlock() + + // fetch nodes from the router; it may have newly added, untested nodes + h.refreshNodes() + if h == nil || len(h.results) == 0 { return &Nodes{} } @@ -76,9 +80,6 @@ func CoveredOutbounds(router adapter.Router, tags []string) []adapter.Outbound { // refreshNodes matches nodes from router by tag prefix, and refreshes the health check results func (h *HealthCheck) 
refreshNodes() []adapter.Outbound { - h.mutex.Lock() - defer h.mutex.Unlock() - nodes := CoveredOutbounds(h.router, h.tags) tags := make(map[string]struct{}) for _, n := range nodes { @@ -97,6 +98,8 @@ func (h *HealthCheck) refreshNodes() []adapter.Outbound { networks: n.Network(), rttStorage: newRTTStorage(h.options.SamplingCount, validity), } + + go h.checkNode(n) } } // remove unused rttStorage