extract HealthCheck.NodesByCategory()

This commit is contained in:
jebbs 2022-10-11 10:50:48 +08:00
parent e7093541c1
commit 40ae02fc54
5 changed files with 112 additions and 125 deletions

View File

@ -0,0 +1,46 @@
package balancer
import "time"
// CategorizedNodes holds the categorized nodes
type CategorizedNodes struct {
Qualified, Unqualified []*Node
Failed, Untested []*Node
}
// NodesByCategory returns the categorized nodes
func (h *HealthCheck) NodesByCategory() *CategorizedNodes {
h.Lock()
defer h.Unlock()
if h == nil || h.Results == nil {
return &CategorizedNodes{
Untested: h.nodes,
}
}
nodes := &CategorizedNodes{
Qualified: make([]*Node, 0, len(h.nodes)),
Unqualified: make([]*Node, 0, len(h.nodes)),
Failed: make([]*Node, 0, len(h.nodes)),
Untested: make([]*Node, 0, len(h.nodes)),
}
for _, node := range h.nodes {
r, ok := h.Results[node.Outbound.Tag()]
if !ok {
node.HealthCheckStats = healthPingStatsUntested
continue
}
node.HealthCheckStats = r.Get()
switch {
case node.HealthCheckStats.All == 0:
nodes.Untested = append(nodes.Untested, node)
case node.HealthCheckStats.All == node.HealthCheckStats.Fail,
float64(node.Fail)/float64(node.All) > float64(h.options.Tolerance):
nodes.Failed = append(nodes.Failed, node)
case h.options.MaxRTT > 0 && node.Average > time.Duration(h.options.MaxRTT):
nodes.Unqualified = append(nodes.Unqualified, node)
default:
nodes.Qualified = append(nodes.Qualified, node)
}
}
return nodes
}

View File

@ -20,7 +20,7 @@ type HealthCheckStats struct {
Max time.Duration Max time.Duration
Min time.Duration Min time.Duration
applied time.Duration Weighted time.Duration
} }
// HealthCheckRTTS holds ping rtts for health Checker // HealthCheckRTTS holds ping rtts for health Checker
@ -45,21 +45,24 @@ func NewHealthPingResult(cap int, validity time.Duration) *HealthCheckRTTS {
} }
// Get gets statistics of the HealthPingRTTS // Get gets statistics of the HealthPingRTTS
func (h *HealthCheckRTTS) Get() *HealthCheckStats { func (h *HealthCheckRTTS) Get() HealthCheckStats {
return h.getStatistics() return h.getStatistics()
} }
// GetWithCache get statistics and write cache for next call // GetWithCache get statistics and write cache for next call
// Make sure use Mutex.Lock() before calling it, RWMutex.RLock() // Make sure use Mutex.Lock() before calling it, RWMutex.RLock()
// is not an option since it writes cache // is not an option since it writes cache
func (h *HealthCheckRTTS) GetWithCache() *HealthCheckStats { func (h *HealthCheckRTTS) GetWithCache() HealthCheckStats {
lastPutAt := h.rtts[h.idx].time lastPutAt := h.rtts[h.idx].time
now := time.Now() now := time.Now()
if h.stats == nil || h.lastUpdateAt.Before(lastPutAt) || h.findOutdated(now) >= 0 { if h.stats == nil || h.lastUpdateAt.Before(lastPutAt) || h.findOutdated(now) >= 0 {
h.stats = h.getStatistics() if h.stats == nil {
h.stats = &HealthCheckStats{}
}
*h.stats = h.getStatistics()
h.lastUpdateAt = now h.lastUpdateAt = now
} }
return h.stats return *h.stats
} }
// Put puts a new rtt to the HealthPingResult // Put puts a new rtt to the HealthPingResult
@ -86,14 +89,14 @@ func (h *HealthCheckRTTS) calcIndex(step int) int {
return idx return idx
} }
func (h *HealthCheckRTTS) getStatistics() *HealthCheckStats { func (h *HealthCheckRTTS) getStatistics() HealthCheckStats {
stats := &HealthCheckStats{} stats := HealthCheckStats{}
stats.Fail = 0 stats.Fail = 0
stats.Max = 0 stats.Max = 0
stats.Min = rttFailed stats.Min = rttFailed
sum := time.Duration(0) sum := time.Duration(0)
cnt := 0 cnt := 0
validRTTs := make([]time.Duration, 0) validRTTs := make([]time.Duration, 0, h.cap)
for _, rtt := range h.rtts { for _, rtt := range h.rtts {
switch { switch {
case rtt.value == 0 || time.Since(rtt.time) > h.validity: case rtt.value == 0 || time.Since(rtt.time) > h.validity:
@ -115,9 +118,22 @@ func (h *HealthCheckRTTS) getStatistics() *HealthCheckStats {
stats.All = cnt + stats.Fail stats.All = cnt + stats.Fail
if cnt == 0 { if cnt == 0 {
stats.Min = 0 stats.Min = 0
return stats return healthPingStatsUntested
} }
stats.Average = time.Duration(int(sum) / cnt) stats.Average = time.Duration(int(sum) / cnt)
switch {
case stats.All == 0:
return healthPingStatsUntested
case stats.Fail == stats.All:
return HealthCheckStats{
All: stats.All,
Fail: stats.Fail,
Deviation: rttFailed,
Average: rttFailed,
Max: rttFailed,
Min: rttFailed,
}
}
var std float64 var std float64
if cnt < 2 { if cnt < 2 {
// no enough data for standard deviation, we assume it's half of the average rtt // no enough data for standard deviation, we assume it's half of the average rtt

View File

@ -41,8 +41,17 @@ func NewLeastLoad(
// Select selects qualified nodes // Select selects qualified nodes
func (s *LeastLoad) Select() *Node { func (s *LeastLoad) Select() *Node {
qualified, _ := s.getNodes() nodes := s.HealthCheck.NodesByCategory()
selects := s.selectLeastLoad(qualified) var candidates []*Node
if len(nodes.Qualified) > 0 {
candidates := nodes.Qualified
appliyCost(candidates, s.costs)
leastPingSort(candidates)
} else {
candidates = nodes.Untested
shuffle(candidates)
}
selects := s.selectLeastLoad(candidates)
count := len(selects) count := len(selects)
if count == 0 { if count == 0 {
return nil return nil
@ -89,7 +98,7 @@ func (s *LeastLoad) selectLeastLoad(nodes []*Node) []*Node {
for _, b := range s.options.Baselines { for _, b := range s.options.Baselines {
baseline := time.Duration(b) baseline := time.Duration(b)
for i := 0; i < availableCount; i++ { for i := 0; i < availableCount; i++ {
if nodes[i].applied > baseline { if nodes[i].Weighted > baseline {
break break
} }
count = i + 1 count = i + 1
@ -106,57 +115,21 @@ func (s *LeastLoad) selectLeastLoad(nodes []*Node) []*Node {
return nodes[:count] return nodes[:count]
} }
func (s *LeastLoad) getNodes() ([]*Node, []*Node) { func appliyCost(nodes []*Node, costs *WeightManager) {
s.HealthCheck.Lock() for _, node := range nodes {
defer s.HealthCheck.Unlock() node.Weighted = time.Duration(costs.Apply(node.Outbound.Tag(), float64(node.Deviation)))
qualified := make([]*Node, 0)
unqualified := make([]*Node, 0)
failed := make([]*Node, 0)
untested := make([]*Node, 0)
others := make([]*Node, 0)
for _, node := range s.nodes {
node.FetchStats(s.HealthCheck)
switch {
case node.All == 0:
node.applied = rttUntested
untested = append(untested, node)
case s.options.HealthCheck.MaxRTT > 0 && node.Average > time.Duration(s.options.HealthCheck.MaxRTT):
node.applied = rttUnqualified
unqualified = append(unqualified, node)
case float64(node.Fail)/float64(node.All) > float64(s.options.HealthCheck.Tolerance):
node.applied = rttFailed
if node.All-node.Fail == 0 {
// no good, put them after has-good nodes
node.applied = rttFailed
node.Deviation = rttFailed
node.Average = rttFailed
}
failed = append(failed, node)
default:
node.applied = time.Duration(s.costs.Apply(node.Outbound.Tag(), float64(node.Deviation)))
qualified = append(qualified, node)
}
} }
if len(qualified) > 0 {
leastloadSort(qualified)
others = append(others, unqualified...)
others = append(others, untested...)
others = append(others, failed...)
} else {
qualified = untested
others = append(others, unqualified...)
others = append(others, failed...)
}
return qualified, others
} }
func leastloadSort(nodes []*Node) { func leastloadSort(nodes []*Node) {
sort.Slice(nodes, func(i, j int) bool { sort.Slice(nodes, func(i, j int) bool {
left := nodes[i] left := nodes[i]
right := nodes[j] right := nodes[j]
if left.applied != right.applied { if left.Weighted != right.Weighted {
return left.applied < right.applied return left.Weighted < right.Weighted
}
if left.Deviation != right.Deviation {
return left.Deviation < right.Deviation
} }
if left.Average != right.Average { if left.Average != right.Average {
return left.Average < right.Average return left.Average < right.Average

View File

@ -33,66 +33,27 @@ func NewLeastPing(
// Select selects least ping node // Select selects least ping node
func (s *LeastPing) Select() *Node { func (s *LeastPing) Select() *Node {
qualified, _ := s.getNodes() nodes := s.HealthCheck.NodesByCategory()
if len(qualified) == 0 { var candidates []*Node
if len(nodes.Qualified) > 0 {
candidates := nodes.Qualified
leastPingSort(candidates)
} else {
candidates = nodes.Untested
shuffle(candidates)
}
if len(candidates) == 0 {
return nil return nil
} }
return qualified[0] return candidates[0]
}
func (s *LeastPing) getNodes() ([]*Node, []*Node) {
s.HealthCheck.Lock()
defer s.HealthCheck.Unlock()
qualified := make([]*Node, 0)
unqualified := make([]*Node, 0)
failed := make([]*Node, 0)
untested := make([]*Node, 0)
others := make([]*Node, 0)
for _, node := range s.nodes {
node.FetchStats(s.HealthCheck)
switch {
case node.All == 0:
node.applied = rttUntested
untested = append(untested, node)
case s.options.HealthCheck.MaxRTT > 0 && node.Average > time.Duration(s.options.HealthCheck.MaxRTT):
node.applied = rttUnqualified
unqualified = append(unqualified, node)
case float64(node.Fail)/float64(node.All) > float64(s.options.HealthCheck.Tolerance):
node.applied = rttFailed
if node.All-node.Fail == 0 {
// no good, put them after has-good nodes
node.applied = rttFailed
node.Deviation = rttFailed
node.Average = rttFailed
}
failed = append(failed, node)
default:
node.applied = node.Average
qualified = append(qualified, node)
}
}
if len(qualified) > 0 {
leastPingSort(qualified)
others = append(others, unqualified...)
others = append(others, untested...)
others = append(others, failed...)
} else {
// random node if not tested
shuffle(untested)
qualified = untested
others = append(others, unqualified...)
others = append(others, failed...)
}
return qualified, others
} }
func leastPingSort(nodes []*Node) { func leastPingSort(nodes []*Node) {
sort.Slice(nodes, func(i, j int) bool { sort.Slice(nodes, func(i, j int) bool {
left := nodes[i] left := nodes[i]
right := nodes[j] right := nodes[j]
if left.applied != right.applied { if left.Average != right.Average {
return left.applied < right.applied return left.Average < right.Average
} }
if left.Fail != right.Fail { if left.Fail != right.Fail {
return left.Fail < right.Fail return left.Fail < right.Fail

View File

@ -4,8 +4,13 @@ import (
"github.com/sagernet/sing-box/adapter" "github.com/sagernet/sing-box/adapter"
) )
var healthPingStatsZero = HealthCheckStats{ var healthPingStatsUntested = HealthCheckStats{
applied: rttUntested, All: 0,
Fail: 0,
Deviation: rttUntested,
Average: rttUntested,
Max: rttUntested,
Min: rttUntested,
} }
// Node is a banalcer node with health check result // Node is a banalcer node with health check result
@ -18,20 +23,6 @@ type Node struct {
func NewNode(outbound adapter.Outbound) *Node { func NewNode(outbound adapter.Outbound) *Node {
return &Node{ return &Node{
Outbound: outbound, Outbound: outbound,
HealthCheckStats: healthPingStatsZero, HealthCheckStats: healthPingStatsUntested,
} }
} }
// FetchStats fetches statistics from *HealthPing p
func (s *Node) FetchStats(p *HealthCheck) {
if p == nil || p.Results == nil {
s.HealthCheckStats = healthPingStatsZero
return
}
r, ok := p.Results[s.Outbound.Tag()]
if !ok {
s.HealthCheckStats = healthPingStatsZero
return
}
s.HealthCheckStats = *r.Get()
}