extract balancer.rttBasedBalancer

* leastping shares the same balancer pick settings
This commit is contained in:
jebbs 2022-10-11 14:08:08 +08:00
parent b7cc51f1af
commit fe4fa2ab0c
8 changed files with 179 additions and 212 deletions

View File

@ -1,6 +1,151 @@
package balancer
import (
"math"
"math/rand"
"sort"
"time"
"github.com/sagernet/sing-box/log"
"github.com/sagernet/sing-box/option"
)
var _ Balancer = (*rttBasedBalancer)(nil)
// Balancer is interface for load balancers
type Balancer interface {
Select() *Node
}
type rttBasedBalancer struct {
nodes []*Node
rttFunc rttFunc
options *option.BalancerOutboundOptions
*HealthCheck
costs *WeightManager
}
type rttFunc func(node *Node) time.Duration
// newRTTBasedLoad creates a new rtt based load balancer
func newRTTBasedBalancer(
nodes []*Node, logger log.ContextLogger,
options option.BalancerOutboundOptions,
rttFunc rttFunc,
) (Balancer, error) {
return &rttBasedBalancer{
nodes: nodes,
rttFunc: rttFunc,
options: &options,
HealthCheck: NewHealthCheck(nodes, logger, &options.Check),
costs: NewWeightManager(
logger, options.Pick.Costs, 1,
func(value, cost float64) float64 {
return value * math.Pow(cost, 0.5)
},
),
}, nil
}
// Select selects qualified nodes
func (s *rttBasedBalancer) Select() *Node {
nodes := s.HealthCheck.NodesByCategory()
var candidates []*Node
if len(nodes.Qualified) > 0 {
candidates = nodes.Qualified
for _, node := range candidates {
node.Weighted = time.Duration(s.costs.Apply(node.Outbound.Tag(), float64(s.rttFunc(node))))
}
sortNodes(candidates)
} else {
candidates = nodes.Untested
shuffleNodes(candidates)
}
selects := pickNodes(
candidates, s.logger,
int(s.options.Pick.Expected), s.options.Pick.Baselines,
)
count := len(selects)
if count == 0 {
return nil
}
return selects[rand.Intn(count)]
}
// pickNodes selects nodes according to Baselines and Expected Count.
//
// The strategy always improves network response speed, not matter which mode below is configurated.
// But they can still have different priorities.
//
// 1. Bandwidth priority: no Baseline + Expected Count > 0.: selects `Expected Count` of nodes.
// (one if Expected Count <= 0)
//
// 2. Bandwidth priority advanced: Baselines + Expected Count > 0.
// Select `Expected Count` amount of nodes, and also those near them according to baselines.
// In other words, it selects according to different Baselines, until one of them matches
// the Expected Count, if no Baseline matches, Expected Count applied.
//
// 3. Speed priority: Baselines + `Expected Count <= 0`.
// go through all baselines until find selects, if not, select none. Used in combination
// with 'balancer.fallbackTag', it means: selects qualified nodes or use the fallback.
func pickNodes(nodes []*Node, logger log.Logger, expected int, baselines []option.Duration) []*Node {
if len(nodes) == 0 {
// s.logger.Debug("no qualified nodes")
return nil
}
expected2 := int(expected)
availableCount := len(nodes)
if expected2 > availableCount {
return nodes
}
if expected2 <= 0 {
expected2 = 1
}
if len(baselines) == 0 {
return nodes[:expected2]
}
count := 0
// go through all base line until find expected selects
for _, b := range baselines {
baseline := time.Duration(b)
for i := 0; i < availableCount; i++ {
if nodes[i].Weighted > baseline {
break
}
count = i + 1
}
// don't continue if find expected selects
if count >= expected2 {
logger.Debug("applied baseline: ", baseline)
break
}
}
if expected > 0 && count < expected2 {
count = expected2
}
return nodes[:count]
}
func sortNodes(nodes []*Node) {
sort.Slice(nodes, func(i, j int) bool {
left := nodes[i]
right := nodes[j]
if left.Weighted != right.Weighted {
return left.Weighted < right.Weighted
}
if left.Fail != right.Fail {
return left.Fail < right.Fail
}
return left.All > right.All
})
}
func shuffleNodes(nodes []*Node) {
rand.Seed(time.Now().Unix())
rand.Shuffle(len(nodes), func(i, j int) {
nodes[i], nodes[j] = nodes[j], nodes[i]
})
}

View File

@ -1,142 +1,21 @@
package balancer
import (
"math"
"math/rand"
"sort"
"time"
"github.com/sagernet/sing-box/log"
"github.com/sagernet/sing-box/option"
)
var _ Balancer = (*LeastLoad)(nil)
// LeastLoad is leastload balancer
type LeastLoad struct {
nodes []*Node
options *option.LeastLoadOutboundOptions
*HealthCheck
costs *WeightManager
}
// NewLeastLoad creates a new LeastLoad outbound
func NewLeastLoad(
nodes []*Node, logger log.ContextLogger,
options option.LeastLoadOutboundOptions,
options option.BalancerOutboundOptions,
) (Balancer, error) {
return &LeastLoad{
nodes: nodes,
options: &options,
HealthCheck: NewHealthCheck(nodes, logger, &options.HealthCheck),
costs: NewWeightManager(
logger, options.Costs, 1,
func(value, cost float64) float64 {
return value * math.Pow(cost, 0.5)
},
),
}, nil
}
// Select selects qualified nodes
func (s *LeastLoad) Select() *Node {
nodes := s.HealthCheck.NodesByCategory()
var candidates []*Node
if len(nodes.Qualified) > 0 {
candidates := nodes.Qualified
appliyCost(candidates, s.costs)
leastPingSort(candidates)
} else {
candidates = nodes.Untested
shuffle(candidates)
}
selects := s.selectLeastLoad(candidates)
count := len(selects)
if count == 0 {
return nil
}
return selects[rand.Intn(count)]
}
// selectLeastLoad selects nodes according to Baselines and Expected Count.
//
// The strategy always improves network response speed, not matter which mode below is configurated.
// But they can still have different priorities.
//
// 1. Bandwidth priority: no Baseline + Expected Count > 0.: selects `Expected Count` of nodes.
// (one if Expected Count <= 0)
//
// 2. Bandwidth priority advanced: Baselines + Expected Count > 0.
// Select `Expected Count` amount of nodes, and also those near them according to baselines.
// In other words, it selects according to different Baselines, until one of them matches
// the Expected Count, if no Baseline matches, Expected Count applied.
//
// 3. Speed priority: Baselines + `Expected Count <= 0`.
// go through all baselines until find selects, if not, select none. Used in combination
// with 'balancer.fallbackTag', it means: selects qualified nodes or use the fallback.
func (s *LeastLoad) selectLeastLoad(nodes []*Node) []*Node {
if len(nodes) == 0 {
// s.logger.Debug("LeastLoad: no qualified nodes")
return nil
}
expected := int(s.options.Expected)
availableCount := len(nodes)
if expected > availableCount {
return nodes
}
if expected <= 0 {
expected = 1
}
if len(s.options.Baselines) == 0 {
return nodes[:expected]
}
count := 0
// go through all base line until find expected selects
for _, b := range s.options.Baselines {
baseline := time.Duration(b)
for i := 0; i < availableCount; i++ {
if nodes[i].Weighted > baseline {
break
}
count = i + 1
}
// don't continue if find expected selects
if count >= expected {
s.logger.Debug("applied baseline: ", baseline)
break
}
}
if s.options.Expected > 0 && count < expected {
count = expected
}
return nodes[:count]
}
func appliyCost(nodes []*Node, costs *WeightManager) {
for _, node := range nodes {
node.Weighted = time.Duration(costs.Apply(node.Outbound.Tag(), float64(node.Deviation)))
}
}
func leastloadSort(nodes []*Node) {
sort.Slice(nodes, func(i, j int) bool {
left := nodes[i]
right := nodes[j]
if left.Weighted != right.Weighted {
return left.Weighted < right.Weighted
}
if left.Deviation != right.Deviation {
return left.Deviation < right.Deviation
}
if left.Average != right.Average {
return left.Average < right.Average
}
if left.Fail != right.Fail {
return left.Fail < right.Fail
}
return left.All > right.All
})
return newRTTBasedBalancer(
nodes, logger, options,
func(node *Node) time.Duration {
return node.Deviation
},
)
}

View File

@ -1,70 +1,21 @@
package balancer
import (
"math/rand"
"sort"
"time"
"github.com/sagernet/sing-box/log"
"github.com/sagernet/sing-box/option"
)
var _ Balancer = (*LeastPing)(nil)
// LeastPing is least ping balancer
type LeastPing struct {
nodes []*Node
options *option.LeastPingOutboundOptions
*HealthCheck
}
// NewLeastPing creates a new LeastPing outbound
func NewLeastPing(
nodes []*Node, logger log.ContextLogger,
options option.LeastPingOutboundOptions,
options option.BalancerOutboundOptions,
) (Balancer, error) {
return &LeastPing{
nodes: nodes,
options: &options,
HealthCheck: NewHealthCheck(nodes, logger, &options.HealthCheck),
}, nil
}
// Select selects least ping node
func (s *LeastPing) Select() *Node {
nodes := s.HealthCheck.NodesByCategory()
var candidates []*Node
if len(nodes.Qualified) > 0 {
candidates := nodes.Qualified
leastPingSort(candidates)
} else {
candidates = nodes.Untested
shuffle(candidates)
}
if len(candidates) == 0 {
return nil
}
return candidates[0]
}
func leastPingSort(nodes []*Node) {
sort.Slice(nodes, func(i, j int) bool {
left := nodes[i]
right := nodes[j]
if left.Average != right.Average {
return left.Average < right.Average
}
if left.Fail != right.Fail {
return left.Fail < right.Fail
}
return left.All > right.All
})
}
func shuffle(nodes []*Node) {
rand.Seed(time.Now().Unix())
rand.Shuffle(len(nodes), func(i, j int) {
nodes[i], nodes[j] = nodes[j], nodes[i]
})
return newRTTBasedBalancer(
nodes, logger, options,
func(node *Node) time.Duration {
return node.Average
},
)
}

View File

@ -1,17 +1,16 @@
package option
// LeastPingOutboundOptions is the options for leastping outbound
type LeastPingOutboundOptions struct {
BalancerOutboundOptions
// health check settings
HealthCheck HealthCheckSettings `json:"health_check,omitempty"`
// BalancerOutboundOptions is the options for balancer outbound
type BalancerOutboundOptions struct {
Outbounds []string `json:"outbounds"`
Fallback string `json:"fallback,omitempty"`
Check HealthCheckSettings `json:"check,omitempty"`
Pick BalancerPickOptions `json:"pick,omitempty"`
}
// LeastLoadOutboundOptions is the options for leastload outbound
type LeastLoadOutboundOptions struct {
BalancerOutboundOptions
// health check settings
HealthCheck HealthCheckSettings `json:"health_check,omitempty"`
// BalancerPickOptions is the options for balancer outbound picking
type BalancerPickOptions struct {
// expected nodes count to select
Expected int32 `json:"expected,omitempty"`
// ping rtt baselines (ms)
@ -20,12 +19,6 @@ type LeastLoadOutboundOptions struct {
Costs []*StrategyWeight `json:"costs,omitempty"`
}
// BalancerOutboundOptions is the options for balancer outbound
type BalancerOutboundOptions struct {
Outbounds []string `json:"outbounds"`
Fallback string `json:"fallback,omitempty"`
}
// HealthCheckSettings is the settings for health check
type HealthCheckSettings struct {
Destination string `json:"destination"`

View File

@ -25,8 +25,7 @@ type _Outbound struct {
VLESSOptions VLESSOutboundOptions `json:"-"`
SelectorOptions SelectorOutboundOptions `json:"-"`
URLTestOptions URLTestOutboundOptions `json:"-"`
LeastLoadOptions LeastLoadOutboundOptions `json:"-"`
LeastPingOptions LeastPingOutboundOptions `json:"-"`
BalancerOptions BalancerOutboundOptions `json:"-"`
}
type Outbound _Outbound
@ -67,9 +66,9 @@ func (h Outbound) MarshalJSON() ([]byte, error) {
case C.TypeURLTest:
v = h.URLTestOptions
case C.TypeLeastLoad:
v = h.LeastLoadOptions
v = h.BalancerOptions
case C.TypeLeastPing:
v = h.LeastPingOptions
v = h.BalancerOptions
default:
return nil, E.New("unknown outbound type: ", h.Type)
}
@ -116,9 +115,9 @@ func (h *Outbound) UnmarshalJSON(bytes []byte) error {
case C.TypeURLTest:
v = &h.URLTestOptions
case C.TypeLeastLoad:
v = &h.LeastLoadOptions
v = &h.BalancerOptions
case C.TypeLeastPing:
v = &h.LeastPingOptions
v = &h.BalancerOptions
default:
return E.New("unknown outbound type: ", h.Type)
}

View File

@ -50,9 +50,9 @@ func New(ctx context.Context, router adapter.Router, logger log.ContextLogger, o
case C.TypeURLTest:
return NewURLTest(router, logger, options.Tag, options.URLTestOptions)
case C.TypeLeastLoad:
return NewLeastLoad(router, logger, options.Tag, options.LeastLoadOptions)
return NewLeastLoad(router, logger, options.Tag, options.BalancerOptions)
case C.TypeLeastPing:
return NewLeastPing(router, logger, options.Tag, options.LeastPingOptions)
return NewLeastPing(router, logger, options.Tag, options.BalancerOptions)
default:
return nil, E.New("unknown outbound type: ", options.Type)
}

View File

@ -18,11 +18,11 @@ var (
type LeastLoad struct {
*Balancer
options option.LeastLoadOutboundOptions
options option.BalancerOutboundOptions
}
// NewLeastLoad creates a new LeastLoad outbound
func NewLeastLoad(router adapter.Router, logger log.ContextLogger, tag string, options option.LeastLoadOutboundOptions) (*LeastLoad, error) {
func NewLeastLoad(router adapter.Router, logger log.ContextLogger, tag string, options option.BalancerOutboundOptions) (*LeastLoad, error) {
if len(options.Outbounds) == 0 {
return nil, E.New("missing tags")
}

View File

@ -18,11 +18,11 @@ var (
type LeastPing struct {
*Balancer
options option.LeastPingOutboundOptions
options option.BalancerOutboundOptions
}
// NewLeastPing creates a new LeastPing outbound
func NewLeastPing(router adapter.Router, logger log.ContextLogger, tag string, options option.LeastPingOutboundOptions) (*LeastPing, error) {
func NewLeastPing(router adapter.Router, logger log.ContextLogger, tag string, options option.BalancerOutboundOptions) (*LeastPing, error) {
if len(options.Outbounds) == 0 {
return nil, E.New("missing tags")
}