From 3d44156b672648fa3758a6e605fb7912a4d0c6e0 Mon Sep 17 00:00:00 2001 From: jebbs Date: Tue, 11 Oct 2022 14:08:08 +0800 Subject: [PATCH] extract balancer.rttBasedBalancer * leastping shares the same balancer pick settings --- balancer/balancer.go | 145 ++++++++++++++++++++++++++++++++++++++++++ balancer/leastload.go | 135 ++------------------------------------- balancer/leastping.go | 63 ++---------------- option/balancer.go | 25 +++----- option/outbound.go | 15 ++--- outbound/builder.go | 4 +- outbound/leastload.go | 4 +- outbound/leastping.go | 4 +- 8 files changed, 179 insertions(+), 216 deletions(-) diff --git a/balancer/balancer.go b/balancer/balancer.go index ac21dd4e..dff1e3c7 100644 --- a/balancer/balancer.go +++ b/balancer/balancer.go @@ -1,6 +1,151 @@ package balancer +import ( + "math" + "math/rand" + "sort" + "time" + + "github.com/sagernet/sing-box/log" + "github.com/sagernet/sing-box/option" +) + +var _ Balancer = (*rttBasedBalancer)(nil) + // Balancer is interface for load balancers type Balancer interface { Select() *Node } + +type rttBasedBalancer struct { + nodes []*Node + rttFunc rttFunc + options *option.BalancerOutboundOptions + + *HealthCheck + costs *WeightManager +} + +type rttFunc func(node *Node) time.Duration + +// newRTTBasedLoad creates a new rtt based load balancer +func newRTTBasedBalancer( + nodes []*Node, logger log.ContextLogger, + options option.BalancerOutboundOptions, + rttFunc rttFunc, +) (Balancer, error) { + return &rttBasedBalancer{ + nodes: nodes, + rttFunc: rttFunc, + options: &options, + HealthCheck: NewHealthCheck(nodes, logger, &options.Check), + costs: NewWeightManager( + logger, options.Pick.Costs, 1, + func(value, cost float64) float64 { + return value * math.Pow(cost, 0.5) + }, + ), + }, nil +} + +// Select selects qualified nodes +func (s *rttBasedBalancer) Select() *Node { + nodes := s.HealthCheck.NodesByCategory() + var candidates []*Node + if len(nodes.Qualified) > 0 { + candidates = nodes.Qualified + for _, node := range candidates { + node.Weighted = time.Duration(s.costs.Apply(node.Outbound.Tag(), float64(s.rttFunc(node)))) + } + sortNodes(candidates) + } else { + candidates = nodes.Untested + shuffleNodes(candidates) + } + selects := pickNodes( + candidates, s.logger, + int(s.options.Pick.Expected), s.options.Pick.Baselines, + ) + count := len(selects) + if count == 0 { + return nil + } + return selects[rand.Intn(count)] +} + +// pickNodes selects nodes according to Baselines and Expected Count. +// +// The strategy always improves network response speed, not matter which mode below is configurated. +// But they can still have different priorities. +// +// 1. Bandwidth priority: no Baseline + Expected Count > 0.: selects `Expected Count` of nodes. +// (one if Expected Count <= 0) +// +// 2. Bandwidth priority advanced: Baselines + Expected Count > 0. +// Select `Expected Count` amount of nodes, and also those near them according to baselines. +// In other words, it selects according to different Baselines, until one of them matches +// the Expected Count, if no Baseline matches, Expected Count applied. +// +// 3. Speed priority: Baselines + `Expected Count <= 0`. +// go through all baselines until find selects, if not, select none. Used in combination +// with 'balancer.fallbackTag', it means: selects qualified nodes or use the fallback. +func pickNodes(nodes []*Node, logger log.Logger, expected int, baselines []option.Duration) []*Node { + if len(nodes) == 0 { + // s.logger.Debug("no qualified nodes") + return nil + } + expected2 := int(expected) + availableCount := len(nodes) + if expected2 > availableCount { + return nodes + } + + if expected2 <= 0 { + expected2 = 1 + } + if len(baselines) == 0 { + return nodes[:expected2] + } + + count := 0 + // go through all base line until find expected selects + for _, b := range baselines { + baseline := time.Duration(b) + for i := 0; i < availableCount; i++ { + if nodes[i].Weighted > baseline { + break + } + count = i + 1 + } + // don't continue if find expected selects + if count >= expected2 { + logger.Debug("applied baseline: ", baseline) + break + } + } + if expected > 0 && count < expected2 { + count = expected2 + } + return nodes[:count] +} + +func sortNodes(nodes []*Node) { + sort.Slice(nodes, func(i, j int) bool { + left := nodes[i] + right := nodes[j] + if left.Weighted != right.Weighted { + return left.Weighted < right.Weighted + } + if left.Fail != right.Fail { + return left.Fail < right.Fail + } + return left.All > right.All + }) +} + +func shuffleNodes(nodes []*Node) { + rand.Seed(time.Now().Unix()) + rand.Shuffle(len(nodes), func(i, j int) { + nodes[i], nodes[j] = nodes[j], nodes[i] + }) +} diff --git a/balancer/leastload.go b/balancer/leastload.go index 7514eacf..0a7937e0 100644 --- a/balancer/leastload.go +++ b/balancer/leastload.go @@ -1,142 +1,21 @@ package balancer import ( - "math" - "math/rand" - "sort" "time" "github.com/sagernet/sing-box/log" "github.com/sagernet/sing-box/option" ) -var _ Balancer = (*LeastLoad)(nil) - -// LeastLoad is leastload balancer -type LeastLoad struct { - nodes []*Node - options *option.LeastLoadOutboundOptions - - *HealthCheck - costs *WeightManager -} - // NewLeastLoad creates a new LeastLoad outbound func NewLeastLoad( nodes []*Node, logger log.ContextLogger, - options option.LeastLoadOutboundOptions, + options option.BalancerOutboundOptions, ) (Balancer, error) { - return &LeastLoad{ - nodes: nodes, - options: &options, - HealthCheck: NewHealthCheck(nodes, logger, &options.HealthCheck), - costs: NewWeightManager( - logger, options.Costs, 1, - func(value, cost float64) float64 { - return value * math.Pow(cost, 0.5) - }, - ), - }, nil -} - -// Select selects qualified nodes -func (s *LeastLoad) Select() *Node { - nodes := s.HealthCheck.NodesByCategory() - var candidates []*Node - if len(nodes.Qualified) > 0 { - candidates := nodes.Qualified - appliyCost(candidates, s.costs) - leastPingSort(candidates) - } else { - candidates = nodes.Untested - shuffle(candidates) - } - selects := s.selectLeastLoad(candidates) - count := len(selects) - if count == 0 { - return nil - } - return selects[rand.Intn(count)] -} - -// selectLeastLoad selects nodes according to Baselines and Expected Count. -// -// The strategy always improves network response speed, not matter which mode below is configurated. -// But they can still have different priorities. -// -// 1. Bandwidth priority: no Baseline + Expected Count > 0.: selects `Expected Count` of nodes. -// (one if Expected Count <= 0) -// -// 2. Bandwidth priority advanced: Baselines + Expected Count > 0. -// Select `Expected Count` amount of nodes, and also those near them according to baselines. -// In other words, it selects according to different Baselines, until one of them matches -// the Expected Count, if no Baseline matches, Expected Count applied. -// -// 3. Speed priority: Baselines + `Expected Count <= 0`. -// go through all baselines until find selects, if not, select none. Used in combination -// with 'balancer.fallbackTag', it means: selects qualified nodes or use the fallback. -func (s *LeastLoad) selectLeastLoad(nodes []*Node) []*Node { - if len(nodes) == 0 { - // s.logger.Debug("LeastLoad: no qualified nodes") - return nil - } - expected := int(s.options.Expected) - availableCount := len(nodes) - if expected > availableCount { - return nodes - } - - if expected <= 0 { - expected = 1 - } - if len(s.options.Baselines) == 0 { - return nodes[:expected] - } - - count := 0 - // go through all base line until find expected selects - for _, b := range s.options.Baselines { - baseline := time.Duration(b) - for i := 0; i < availableCount; i++ { - if nodes[i].Weighted > baseline { - break - } - count = i + 1 - } - // don't continue if find expected selects - if count >= expected { - s.logger.Debug("applied baseline: ", baseline) - break - } - } - if s.options.Expected > 0 && count < expected { - count = expected - } - return nodes[:count] -} - -func appliyCost(nodes []*Node, costs *WeightManager) { - for _, node := range nodes { - node.Weighted = time.Duration(costs.Apply(node.Outbound.Tag(), float64(node.Deviation))) - } -} - -func leastloadSort(nodes []*Node) { - sort.Slice(nodes, func(i, j int) bool { - left := nodes[i] - right := nodes[j] - if left.Weighted != right.Weighted { - return left.Weighted < right.Weighted - } - if left.Deviation != right.Deviation { - return left.Deviation < right.Deviation - } - if left.Average != right.Average { - return left.Average < right.Average - } - if left.Fail != right.Fail { - return left.Fail < right.Fail - } - return left.All > right.All - }) + return newRTTBasedBalancer( + nodes, logger, options, + func(node *Node) time.Duration { + return node.Deviation + }, + ) } diff --git a/balancer/leastping.go b/balancer/leastping.go index 708eabef..86596544 100644 --- a/balancer/leastping.go +++ b/balancer/leastping.go @@ -1,70 +1,21 @@ package balancer import ( - "math/rand" - "sort" "time" "github.com/sagernet/sing-box/log" "github.com/sagernet/sing-box/option" ) -var _ Balancer = (*LeastPing)(nil) - -// LeastPing is least ping balancer -type LeastPing struct { - nodes []*Node - options *option.LeastPingOutboundOptions - - *HealthCheck -} - // NewLeastPing creates a new LeastPing outbound func NewLeastPing( nodes []*Node, logger log.ContextLogger, - options option.LeastPingOutboundOptions, + options option.BalancerOutboundOptions, ) (Balancer, error) { - return &LeastPing{ - nodes: nodes, - options: &options, - HealthCheck: NewHealthCheck(nodes, logger, &options.HealthCheck), - }, nil -} - -// Select selects least ping node -func (s *LeastPing) Select() *Node { - nodes := s.HealthCheck.NodesByCategory() - var candidates []*Node - if len(nodes.Qualified) > 0 { - candidates := nodes.Qualified - leastPingSort(candidates) - } else { - candidates = nodes.Untested - shuffle(candidates) - } - if len(candidates) == 0 { - return nil - } - return candidates[0] -} - -func leastPingSort(nodes []*Node) { - sort.Slice(nodes, func(i, j int) bool { - left := nodes[i] - right := nodes[j] - if left.Average != right.Average { - return left.Average < right.Average - } - if left.Fail != right.Fail { - return left.Fail < right.Fail - } - return left.All > right.All - }) -} - -func shuffle(nodes []*Node) { - rand.Seed(time.Now().Unix()) - rand.Shuffle(len(nodes), func(i, j int) { - nodes[i], nodes[j] = nodes[j], nodes[i] - }) + return newRTTBasedBalancer( + nodes, logger, options, + func(node *Node) time.Duration { + return node.Average + }, + ) } diff --git a/option/balancer.go b/option/balancer.go index bc59839d..18e50de6 100644 --- a/option/balancer.go +++ b/option/balancer.go @@ -1,17 +1,16 @@ package option -// LeastPingOutboundOptions is the options for leastping outbound -type LeastPingOutboundOptions struct { - BalancerOutboundOptions - // health check settings - HealthCheck HealthCheckSettings `json:"health_check,omitempty"` +// BalancerOutboundOptions is the options for balancer outbound +type BalancerOutboundOptions struct { + Outbounds []string `json:"outbounds"` + Fallback string `json:"fallback,omitempty"` + + Check HealthCheckSettings `json:"check,omitempty"` + Pick BalancerPickOptions `json:"pick,omitempty"` } -// LeastLoadOutboundOptions is the options for leastload outbound -type LeastLoadOutboundOptions struct { - BalancerOutboundOptions - // health check settings - HealthCheck HealthCheckSettings `json:"health_check,omitempty"` +// BalancerPickOptions is the options for balancer outbound picking +type BalancerPickOptions struct { // expected nodes count to select Expected int32 `json:"expected,omitempty"` // ping rtt baselines (ms) @@ -20,12 +19,6 @@ type LeastLoadOutboundOptions struct { Costs []*StrategyWeight `json:"costs,omitempty"` } -// BalancerOutboundOptions is the options for balancer outbound -type BalancerOutboundOptions struct { - Outbounds []string `json:"outbounds"` - Fallback string `json:"fallback,omitempty"` -} - // HealthCheckSettings is the settings for health check type HealthCheckSettings struct { Destination string `json:"destination"` diff --git a/option/outbound.go b/option/outbound.go index 4b5003e6..e37b24fc 100644 --- a/option/outbound.go +++ b/option/outbound.go @@ -22,8 +22,7 @@ type _Outbound struct { SSHOptions SSHOutboundOptions `json:"-"` ShadowTLSOptions ShadowTLSOutboundOptions `json:"-"` SelectorOptions SelectorOutboundOptions `json:"-"` - LeastLoadOptions LeastLoadOutboundOptions `json:"-"` - LeastPingOptions LeastPingOutboundOptions `json:"-"` + BalancerOptions BalancerOutboundOptions `json:"-"` } type Outbound _Outbound @@ -57,10 +56,8 @@ func (h Outbound) MarshalJSON() ([]byte, error) { v = h.ShadowTLSOptions case C.TypeSelector: v = h.SelectorOptions - case C.TypeLeastLoad: - v = h.LeastLoadOptions - case C.TypeLeastPing: - v = h.LeastPingOptions + case C.TypeLeastLoad, C.TypeLeastPing: + v = h.BalancerOptions default: return nil, E.New("unknown outbound type: ", h.Type) } @@ -100,10 +97,8 @@ func (h *Outbound) UnmarshalJSON(bytes []byte) error { v = &h.ShadowTLSOptions case C.TypeSelector: v = &h.SelectorOptions - case C.TypeLeastLoad: - v = &h.LeastLoadOptions - case C.TypeLeastPing: - v = &h.LeastPingOptions + case C.TypeLeastLoad, C.TypeLeastPing: + v = &h.BalancerOptions default: return E.New("unknown outbound type: ", h.Type) } diff --git a/outbound/builder.go b/outbound/builder.go index ab748ed2..ca908cec 100644 --- a/outbound/builder.go +++ b/outbound/builder.go @@ -44,9 +44,9 @@ func New(ctx context.Context, router adapter.Router, logger log.ContextLogger, o case C.TypeSelector: return NewSelector(router, logger, options.Tag, options.SelectorOptions) case C.TypeLeastLoad: - return NewLeastLoad(router, logger, options.Tag, options.LeastLoadOptions) + return NewLeastLoad(router, logger, options.Tag, options.BalancerOptions) case C.TypeLeastPing: - return NewLeastPing(router, logger, options.Tag, options.LeastPingOptions) + return NewLeastPing(router, logger, options.Tag, options.BalancerOptions) default: return nil, E.New("unknown outbound type: ", options.Type) } diff --git a/outbound/leastload.go b/outbound/leastload.go index ce1e0478..ed6153bd 100644 --- a/outbound/leastload.go +++ b/outbound/leastload.go @@ -18,11 +18,11 @@ var ( type LeastLoad struct { *Balancer - options option.LeastLoadOutboundOptions + options option.BalancerOutboundOptions } // NewLeastLoad creates a new LeastLoad outbound -func NewLeastLoad(router adapter.Router, logger log.ContextLogger, tag string, options option.LeastLoadOutboundOptions) (*LeastLoad, error) { +func NewLeastLoad(router adapter.Router, logger log.ContextLogger, tag string, options option.BalancerOutboundOptions) (*LeastLoad, error) { if len(options.Outbounds) == 0 { return nil, E.New("missing tags") } diff --git a/outbound/leastping.go b/outbound/leastping.go index f3172a68..1e53bc0b 100644 --- a/outbound/leastping.go +++ b/outbound/leastping.go @@ -18,11 +18,11 @@ var ( type LeastPing struct { *Balancer - options option.LeastPingOutboundOptions + options option.BalancerOutboundOptions } // NewLeastPing creates a new LeastPing outbound -func NewLeastPing(router adapter.Router, logger log.ContextLogger, tag string, options option.LeastPingOutboundOptions) (*LeastPing, error) { +func NewLeastPing(router adapter.Router, logger log.ContextLogger, tag string, options option.BalancerOutboundOptions) (*LeastPing, error) { if len(options.Outbounds) == 0 { return nil, E.New("missing tags") }