cleanup after fetch

This commit is contained in:
jebbs 2022-10-15 21:12:50 +08:00
parent ac3655693c
commit 22b79b297f

View File

@ -6,6 +6,7 @@ import (
"net" "net"
"net/http" "net/http"
"regexp" "regexp"
"strings"
"time" "time"
"github.com/sagernet/sing-box/adapter" "github.com/sagernet/sing-box/adapter"
@ -47,6 +48,10 @@ type subscriptionProvider struct {
// NewSubscription creates a new subscription service. // NewSubscription creates a new subscription service.
func NewSubscription(ctx context.Context, router adapter.Router, logger log.ContextLogger, logFactory log.Factory, options option.Service) (*Subscription, error) { func NewSubscription(ctx context.Context, router adapter.Router, logger log.ContextLogger, logFactory log.Factory, options option.Service) (*Subscription, error) {
if options.Tag == "" {
// required for outbounds clean up
return nil, E.New("subscription tag is required")
}
nproviders := len(options.SubscriptionOptions.Providers) nproviders := len(options.SubscriptionOptions.Providers)
if nproviders == 0 { if nproviders == 0 {
return nil, E.New("missing subscription providers") return nil, E.New("missing subscription providers")
@ -128,27 +133,64 @@ func (s *Subscription) fetchLoop() {
ticker := time.NewTicker(s.interval) ticker := time.NewTicker(s.interval)
defer ticker.Stop() defer ticker.Stop()
if err := s.fetch(); err != nil { s.refresh()
s.logger.Error("fetch subscription: ", err)
}
L: L:
for { for {
select { select {
case <-s.ctx.Done(): case <-s.ctx.Done():
break L break L
case <-ticker.C: case <-ticker.C:
if err := s.fetch(); err != nil { s.refresh()
s.logger.Error("fetch subscription: ", err)
}
} }
} }
} }
func (s *Subscription) fetch() error { func (s *Subscription) refresh() {
opts, err := s.fetch()
if err != nil {
s.logger.Error("fetch subscription: ", err)
}
s.updateOutbounds(opts)
}
func (s *Subscription) updateOutbounds(opts []*option.Outbound) {
outbounds := s.router.Outbounds()
knownOutbounds := make(map[string]struct{})
for _, opt := range opts {
tag := opt.Tag
knownOutbounds[tag] = struct{}{}
outbound, err := outbound.New(
s.parentCtx,
s.router,
s.logFactory.NewLogger(F.ToString("outbound/", opt.Type, "[", tag, "]")),
*opt,
)
if err != nil {
s.logger.Warn("create outbound [", tag, "]: ", err)
}
s.router.AddOutbound(outbound)
s.logger.Info("created outbound [", tag, "]")
}
// remove outbounds that are not in the latest list
tagPrefix := s.tag + "."
for _, outbound := range outbounds {
tag := outbound.Tag()
if !strings.HasPrefix(tag, tagPrefix) {
continue
}
if _, ok := knownOutbounds[tag]; ok {
continue
}
s.router.RemoveOutbound(tag)
}
}
func (s *Subscription) fetch() ([]*option.Outbound, error) {
client, err := s.client() client, err := s.client()
if err != nil { if err != nil {
return err return nil, err
} }
opts := make([]*option.Outbound, 0)
for i, provider := range s.providers { for i, provider := range s.providers {
var tag string var tag string
if provider.tag != "" { if provider.tag != "" {
@ -168,20 +210,10 @@ func (s *Subscription) fetch() error {
continue continue
} }
s.applyOptions(opt, provider) s.applyOptions(opt, provider)
outbound, err := outbound.New( opts = append(opts, opt)
s.parentCtx,
s.router,
s.logFactory.NewLogger(F.ToString("outbound/", opt.Type, "[", opt.Tag, "]")),
*opt,
)
if err != nil {
s.logger.Warn("create outbound [", opt.Tag, "]: ", err)
}
s.router.AddOutbound(outbound)
s.logger.Info("created outbound [", opt.Tag, "]")
} }
} }
return nil return opts, nil
} }
func selectedByTag(tag string, provider *subscriptionProvider) bool { func selectedByTag(tag string, provider *subscriptionProvider) bool {