From 1a2e43a2b672fca0c168898f19d5ff827fa619cf Mon Sep 17 00:00:00 2001 From: jebbs Date: Wed, 12 Oct 2022 11:54:43 +0800 Subject: [PATCH] tidy up balancer package exports --- balancer/balancer.go | 22 +++++++-------- balancer/healthcheck.go | 24 ++++++++--------- balancer/healthcheck_nodes.go | 51 ++++++++++++++++++++--------------- balancer/leastload.go | 4 +-- balancer/leastping.go | 4 +-- balancer/node.go | 17 ------------ balancer/rtt_storage.go | 9 ++++++- 7 files changed, 64 insertions(+), 67 deletions(-) delete mode 100644 balancer/node.go diff --git a/balancer/balancer.go b/balancer/balancer.go index 6430a58b..1df7840b 100644 --- a/balancer/balancer.go +++ b/balancer/balancer.go @@ -32,7 +32,7 @@ type rttBasedBalancer struct { costs *WeightManager } -type rttFunc func(node *Node) time.Duration +type rttFunc func(n *Node) time.Duration // newRTTBasedLoad creates a new rtt based load balancer func newRTTBasedBalancer( @@ -56,12 +56,12 @@ func newRTTBasedBalancer( // Select selects qualified nodes func (s *rttBasedBalancer) Networks() []string { hasTCP, hasUDP := false, false - nodes := s.HealthCheck.NodesByCategory("") - for _, node := range nodes.Qualified { - if !hasTCP && common.Contains(node.Networks, N.NetworkTCP) { + nodes := s.HealthCheck.Nodes("") + for _, n := range nodes.Qualified { + if !hasTCP && common.Contains(n.Networks, N.NetworkTCP) { hasTCP = true } - if !hasUDP && common.Contains(node.Networks, N.NetworkUDP) { + if !hasUDP && common.Contains(n.Networks, N.NetworkUDP) { hasUDP = true } if hasTCP && hasUDP { @@ -69,11 +69,11 @@ func (s *rttBasedBalancer) Networks() []string { } } if !hasTCP && !hasUDP { - for _, node := range nodes.Untested { - if !hasTCP && common.Contains(node.Networks, N.NetworkTCP) { + for _, n := range nodes.Untested { + if !hasTCP && common.Contains(n.Networks, N.NetworkTCP) { hasTCP = true } - if !hasUDP && common.Contains(node.Networks, N.NetworkUDP) { + if !hasUDP && common.Contains(n.Networks, N.NetworkUDP) { hasUDP = true } if hasTCP && hasUDP { @@ -93,12 +93,12 @@ func (s *rttBasedBalancer) Networks() []string { // Select selects qualified nodes func (s *rttBasedBalancer) Pick(network string) string { - nodes := s.HealthCheck.NodesByCategory(network) + nodes := s.HealthCheck.Nodes(network) var candidates []*Node if len(nodes.Qualified) > 0 { candidates = nodes.Qualified - for _, node := range candidates { - node.Weighted = time.Duration(s.costs.Apply(node.Tag, float64(s.rttFunc(node)))) + for _, n := range candidates { + n.Weighted = time.Duration(s.costs.Apply(n.Tag, float64(s.rttFunc(n)))) } sortNodes(candidates) } else { diff --git a/balancer/healthcheck.go b/balancer/healthcheck.go index ece6f0bc..16099d48 100644 --- a/balancer/healthcheck.go +++ b/balancer/healthcheck.go @@ -14,7 +14,7 @@ import ( // HealthCheck is the health checker for balancers type HealthCheck struct { - sync.Mutex + mutex sync.Mutex ticker *time.Ticker router adapter.Router @@ -64,8 +64,8 @@ func NewHealthCheck(router adapter.Router, tags []string, logger log.Logger, con // Start starts the health check service func (h *HealthCheck) Start() error { - h.Lock() - defer h.Unlock() + h.mutex.Lock() + defer h.mutex.Unlock() if h.ticker != nil { return nil } @@ -88,8 +88,8 @@ func (h *HealthCheck) Start() error { // Stop stops the health check service func (h *HealthCheck) Stop() { - h.Lock() - defer h.Unlock() + h.mutex.Lock() + defer h.mutex.Unlock() if h.ticker != nil { h.ticker.Stop() h.ticker = nil @@ -116,8 +116,8 @@ func (h *HealthCheck) doCheck(duration time.Duration, rounds int) { } ch := make(chan *rtt, count) // rtts := make(map[string][]time.Duration) - for _, node := range nodes { - tag, detour := node.Tag(), node + for _, n := range nodes { + tag, detour := n.Tag(), n client := newPingClient( detour, h.options.Destination, @@ -164,15 +164,15 @@ func (h *HealthCheck) doCheck(duration time.Duration, rounds int) { if rtt.value > 0 { // h.logger.Debug("ping ", rtt.tag, ":", rtt.value) // should not put results when network is down - h.PutResult(rtt.tag, rtt.value) + h.putResult(rtt.tag, rtt.value) } } } -// PutResult put a ping rtt to results -func (h *HealthCheck) PutResult(tag string, rtt time.Duration) { - h.Lock() - defer h.Unlock() +// putResult put a ping rtt to results +func (h *HealthCheck) putResult(tag string, rtt time.Duration) { + h.mutex.Lock() + defer h.mutex.Unlock() r, ok := h.results[tag] if !ok { // the result may come after the node is removed diff --git a/balancer/healthcheck_nodes.go b/balancer/healthcheck_nodes.go index aac4cf90..e412796b 100644 --- a/balancer/healthcheck_nodes.go +++ b/balancer/healthcheck_nodes.go @@ -8,21 +8,28 @@ import ( "github.com/sagernet/sing/common" ) -// CategorizedNodes holds the categorized nodes -type CategorizedNodes struct { +// Nodes holds the categorized nodes +type Nodes struct { Qualified, Unqualified []*Node Failed, Untested []*Node } -// NodesByCategory returns the categorized nodes for specific network. +// Node is a banalcer Node with health check result +type Node struct { + Tag string + Networks []string + RTTStats +} + +// Nodes returns the categorized nodes for specific network. // If network is empty, all nodes are returned. -func (h *HealthCheck) NodesByCategory(network string) *CategorizedNodes { - h.Lock() - defer h.Unlock() +func (h *HealthCheck) Nodes(network string) *Nodes { + h.mutex.Lock() + defer h.mutex.Unlock() if h == nil || len(h.results) == 0 { - return &CategorizedNodes{} + return &Nodes{} } - nodes := &CategorizedNodes{ + nodes := &Nodes{ Qualified: make([]*Node, 0, len(h.results)), Unqualified: make([]*Node, 0, len(h.results)), Failed: make([]*Node, 0, len(h.results)), @@ -32,21 +39,21 @@ func (h *HealthCheck) NodesByCategory(network string) *CategorizedNodes { if network != "" && !common.Contains(result.networks, network) { continue } - node := &Node{ + n := &Node{ Tag: tag, Networks: result.networks, RTTStats: result.rttStorage.Get(), } switch { - case node.RTTStats.All == 0: - nodes.Untested = append(nodes.Untested, node) - case node.RTTStats.All == node.RTTStats.Fail, - float64(node.Fail)/float64(node.All) > float64(h.options.Tolerance): - nodes.Failed = append(nodes.Failed, node) - case h.options.MaxRTT > 0 && node.Average > time.Duration(h.options.MaxRTT): - nodes.Unqualified = append(nodes.Unqualified, node) + case n.RTTStats.All == 0: + nodes.Untested = append(nodes.Untested, n) + case n.RTTStats.All == n.RTTStats.Fail, + float64(n.Fail)/float64(n.All) > float64(h.options.Tolerance): + nodes.Failed = append(nodes.Failed, n) + case h.options.MaxRTT > 0 && n.Average > time.Duration(h.options.MaxRTT): + nodes.Unqualified = append(nodes.Unqualified, n) default: - nodes.Qualified = append(nodes.Qualified, node) + nodes.Qualified = append(nodes.Qualified, n) } } return nodes @@ -69,13 +76,13 @@ func CoveredOutbounds(router adapter.Router, tags []string) []adapter.Outbound { // refreshNodes matches nodes from router by tag prefix, and refreshes the health check results func (h *HealthCheck) refreshNodes() []adapter.Outbound { - h.Lock() - defer h.Unlock() + h.mutex.Lock() + defer h.mutex.Unlock() nodes := CoveredOutbounds(h.router, h.tags) tags := make(map[string]struct{}) - for _, node := range nodes { - tag := node.Tag() + for _, n := range nodes { + tag := n.Tag() tags[tag] = struct{}{} // make it known to the health check results _, ok := h.results[tag] @@ -87,7 +94,7 @@ func (h *HealthCheck) refreshNodes() []adapter.Outbound { validity := time.Duration(h.options.Interval) * time.Duration(h.options.SamplingCount) * 2 h.results[tag] = &result{ // tag: tag, - networks: node.Network(), + networks: n.Network(), rttStorage: newRTTStorage(h.options.SamplingCount, validity), } } diff --git a/balancer/leastload.go b/balancer/leastload.go index b8270bb2..d7b962d6 100644 --- a/balancer/leastload.go +++ b/balancer/leastload.go @@ -15,8 +15,8 @@ func NewLeastLoad( ) (Balancer, error) { return newRTTBasedBalancer( router, logger, options, - func(node *Node) time.Duration { - return node.Deviation + func(n *Node) time.Duration { + return n.Deviation }, ) } diff --git a/balancer/leastping.go b/balancer/leastping.go index 14752a0a..60e850c4 100644 --- a/balancer/leastping.go +++ b/balancer/leastping.go @@ -15,8 +15,8 @@ func NewLeastPing( ) (Balancer, error) { return newRTTBasedBalancer( router, logger, options, - func(node *Node) time.Duration { - return node.Average + func(n *Node) time.Duration { + return n.Average }, ) } diff --git a/balancer/node.go b/balancer/node.go deleted file mode 100644 index e0134c62..00000000 --- a/balancer/node.go +++ /dev/null @@ -1,17 +0,0 @@ -package balancer - -var healthPingStatsUntested = RTTStats{ - All: 0, - Fail: 0, - Deviation: rttUntested, - Average: rttUntested, - Max: rttUntested, - Min: rttUntested, -} - -// Node is a banalcer node with health check result -type Node struct { - Tag string - Networks []string - RTTStats -} diff --git a/balancer/rtt_storage.go b/balancer/rtt_storage.go index 37034282..59253bed 100644 --- a/balancer/rtt_storage.go +++ b/balancer/rtt_storage.go @@ -120,7 +120,14 @@ func (h *rttStorage) getStatistics() RTTStats { } switch { case stats.All == 0: - return healthPingStatsUntested + return RTTStats{ + All: 0, + Fail: 0, + Deviation: rttUntested, + Average: rttUntested, + Max: rttUntested, + Min: rttUntested, + } case stats.Fail == stats.All: return RTTStats{ All: stats.All,