remove deprecated

This commit is contained in:
Mobin 2025-09-07 20:23:10 +03:30
parent 7caca8a5c9
commit 5f1a087a6b
6 changed files with 0 additions and 1034 deletions

View File

@ -1,168 +0,0 @@
package wsc
import (
"context"
"net"
"net/http"
"net/url"
"github.com/sagernet/sing-box/adapter"
"github.com/sagernet/sing-box/common/tls"
"github.com/sagernet/sing-box/option"
"github.com/sagernet/sing/common/metadata"
N "github.com/sagernet/sing/common/network"
"github.com/sagernet/ws"
)
var _ adapter.WSCClientTransport = &Client{}
type Client struct {
auth string
host string
path string
tls tls.Config
dialer N.Dialer
endpointReplace map[string]string
ruleApplicator *WSCRuleApplicator
}
type ClientConfig struct {
Auth string
Host string
Path string
TLS tls.Config
Dialer N.Dialer
EndpointReplace map[string]string
Rules []option.WSCRule
}
func NewClient(params ClientConfig) (*Client, error) {
ruleApplicator, err := NewRuleApplicator(params.Rules)
if err != nil {
return nil, err
}
cli := &Client{
auth: params.Auth,
host: params.Host,
path: params.Path,
tls: params.TLS,
dialer: params.Dialer,
endpointReplace: params.EndpointReplace,
ruleApplicator: ruleApplicator,
}
return cli, nil
}
func (cli *Client) DialContext(ctx context.Context, network string, endpoint string) (net.Conn, error) {
return cli.newConn(ctx, network, endpoint)
}
func (cli *Client) ListenPacket(ctx context.Context, network string, endpoint string) (net.PacketConn, error) {
return cli.newPacketConn(ctx, cli.ruleApplicator, network, endpoint)
}
func (cli *Client) Close(ctx context.Context) error {
return cli.cleanup(ctx)
}
func (cli *Client) newWSConn(ctx context.Context, network string, endpoint string) (net.Conn, error) {
pURL, _, err := cli.newURL("ws", "", endpoint, network)
if err != nil {
return nil, err
}
dialer := ws.Dialer{
NetDial: func(ctx context.Context, network, addr string) (net.Conn, error) {
conn, err := cli.dialer.DialContext(ctx, N.NetworkTCP, metadata.ParseSocksaddr(addr))
if err != nil {
return nil, err
}
if cli.tls != nil {
conn, err = tls.ClientHandshake(ctx, conn, cli.tls)
if err != nil {
return nil, err
}
}
return conn, nil
},
}
conn, _, _, err := dialer.Dial(ctx, pURL.String())
if err != nil {
return nil, err
}
return conn, nil
}
func (cli *Client) cleanup(ctx context.Context) error {
pURL, tlsConfig, err := cli.newURL("http", "/cleanup", "", "")
if err != nil {
return err
}
client := &http.Client{
Transport: &http.Transport{
TLSClientConfig: tlsConfig,
DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
return cli.dialer.DialContext(ctx, network, metadata.ParseSocksaddr(addr))
},
},
}
request, err := http.NewRequestWithContext(ctx, http.MethodPost, pURL.String(), nil)
if err != nil {
return err
}
response, err := client.Do(request)
if err != nil {
return err
}
defer response.Body.Close()
return nil
}
func (cli *Client) newURL(scheme string, path string, endpoint string, network string) (url.URL, *tls.STDConfig, error) {
var tlsConfig *tls.STDConfig = nil
if cli.tls != nil {
scheme += "s"
var err error
tlsConfig, err = cli.tls.Config()
if err != nil {
return url.URL{}, nil, err
}
}
if path == "" {
path = cli.path
}
if with, exists := cli.endpointReplace[endpoint]; exists {
endpoint = with
}
endpoint, network = cli.ruleApplicator.ApplyEndpointReplace(endpoint, network)
pURL := url.URL{
Scheme: scheme,
Host: cli.host,
Path: path,
RawQuery: "",
}
pQuery := pURL.Query()
pQuery.Set("auth", cli.auth)
if endpoint != "" {
pQuery.Set("ep", endpoint)
}
if network != "" {
pQuery.Set("net", network)
}
pURL.RawQuery = pQuery.Encode()
return pURL, tlsConfig, nil
}

View File

@ -1,109 +0,0 @@
package wsc
import (
"context"
"errors"
"io"
"net"
"sync"
"github.com/sagernet/sing/common/buf"
"github.com/sagernet/sing/common/exceptions"
"github.com/sagernet/sing/common/network"
"github.com/sagernet/ws"
"github.com/sagernet/ws/wsutil"
)
var _ network.ExtendedConn = &clientConn{}
type clientConn struct {
net.Conn
reader *wsutil.Reader
buf [2048]byte
mu sync.Mutex
}
func (cli *Client) newConn(ctx context.Context, network string, endpoint string) (*clientConn, error) {
conn, err := cli.newWSConn(ctx, network, endpoint)
if err != nil {
return nil, err
}
reader := wsutil.NewReader(conn, ws.StateClientSide)
return &clientConn{
Conn: conn,
reader: reader,
}, nil
}
func (conn *clientConn) Close() error {
conn.mu.Lock()
defer conn.mu.Unlock()
_ = wsutil.WriteClientMessage(conn.Conn, ws.OpClose, nil)
return conn.Conn.Close()
}
func (conn *clientConn) ReadBuffer(buffer *buf.Buffer) error {
if buffer == nil {
return errors.New("buffer is nil")
}
n, err := conn.Read(conn.buf[:])
if _, wErr := buffer.Write(conn.buf[:n]); wErr != nil {
return wErr
}
if errors.Is(err, io.EOF) {
return nil
}
return err
}
func (conn *clientConn) WriteBuffer(buffer *buf.Buffer) error {
if buffer == nil {
return errors.New("buffer is nil")
}
conn.mu.Lock()
defer conn.mu.Unlock()
return wsutil.WriteClientBinary(conn.Conn, buffer.Bytes())
}
func (conn *clientConn) Read(b []byte) (n int, err error) {
err = nil
var header ws.Header
for {
n, err = conn.reader.Read(b)
if n > 0 {
return
}
if !exceptions.IsMulti(err, io.EOF, wsutil.ErrNoFrameAdvance) {
return
}
header, err = conn.reader.NextFrame()
if err != nil {
return
}
switch header.OpCode {
case ws.OpBinary, ws.OpText, ws.OpContinuation:
continue
case ws.OpPing:
wsutil.WriteClientMessage(conn.Conn, ws.OpPong, nil)
case ws.OpPong:
continue
case ws.OpClose:
err = io.EOF
return
default:
continue
}
}
}
func (conn *clientConn) Write(b []byte) (n int, err error) {
conn.mu.Lock()
defer conn.mu.Unlock()
if err := wsutil.WriteClientBinary(conn.Conn, b); err != nil {
return 0, err
}
return len(b), nil
}

View File

@ -1,175 +0,0 @@
package wsc
import (
"bytes"
"context"
"errors"
"io"
"net"
"sync"
"github.com/sagernet/sing/common/buf"
"github.com/sagernet/sing/common/metadata"
"github.com/sagernet/sing/common/network"
"github.com/sagernet/ws"
"github.com/sagernet/ws/wsutil"
)
var _ network.NetPacketReader = &clientPacketConn{}
var _ network.NetPacketWriter = &clientPacketConn{}
type readerCache struct {
reader *bytes.Reader
addr metadata.Socksaddr
}
type clientPacketConn struct {
net.Conn
reader *wsutil.Reader
cache *readerCache
mu sync.Mutex
ruleApplicator *WSCRuleApplicator
}
func (cli *Client) newPacketConn(ctx context.Context, ruleApplicator *WSCRuleApplicator, network string, endpoint string) (*clientPacketConn, error) {
conn, err := cli.newWSConn(ctx, network, endpoint)
if err != nil {
return nil, err
}
reader := wsutil.NewReader(conn, ws.StateClientSide)
return &clientPacketConn{
Conn: conn,
reader: reader,
cache: nil,
ruleApplicator: ruleApplicator,
}, nil
}
func (packetConn *clientPacketConn) ReadPacket(buffer *buf.Buffer) (destination metadata.Socksaddr, err error) {
if buffer == nil {
return metadata.Socksaddr{}, errors.New("buffer is nil")
}
buf, err := wsutil.ReadServerBinary(packetConn.Conn)
if err != nil {
var cerr wsutil.ClosedError
if errors.Is(err, &cerr) {
return metadata.Socksaddr{}, err
}
return metadata.Socksaddr{}, err
}
payload := packetConnPayload{}
if err := payload.UnmarshalBinaryUnsafe(buf); err != nil {
return metadata.Socksaddr{}, err
}
destination = metadata.SocksaddrFromNetIP(payload.addrPort)
ep, _ := packetConn.ruleApplicator.ApplyEndpointReplace(destination.String(), network.NetworkUDP)
if _, err := buffer.Write(payload.payload); err != nil {
return metadata.Socksaddr{}, err
}
return metadata.ParseSocksaddr(ep), nil
}
func (packetConn *clientPacketConn) WritePacket(buffer *buf.Buffer, destination metadata.Socksaddr) error {
if buffer == nil {
return errors.New("buffer is nil")
}
ep, _ := packetConn.ruleApplicator.ApplyEndpointReplace(destination.String(), network.NetworkUDP)
payload := packetConnPayload{
addrPort: metadata.ParseSocksaddr(ep).AddrPort(),
payload: buffer.Bytes(),
}
payloadBytes, err := payload.MarshalBinary()
if err != nil {
return err
}
packetConn.mu.Lock()
defer packetConn.mu.Unlock()
if err := wsutil.WriteClientBinary(packetConn.Conn, payloadBytes); err != nil {
return err
}
return nil
}
func (packetConn *clientPacketConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) {
err = nil
if packetConn.cache != nil {
n, err = packetConn.cache.reader.Read(p)
addr = packetConn.cache.addr
if err == io.EOF {
err = nil
packetConn.cache = nil
} else {
return
}
}
buf, err := wsutil.ReadServerBinary(packetConn.Conn)
if err != nil {
var cerr wsutil.ClosedError
if errors.Is(err, &cerr) {
return 0, nil, io.EOF
}
return 0, nil, err
}
payload := packetConnPayload{}
if err := payload.UnmarshalBinaryUnsafe(buf); err != nil {
return 0, nil, err
}
ep, _ := packetConn.ruleApplicator.ApplyEndpointReplace(payload.addrPort.String(), network.NetworkUDP)
packetConn.cache = &readerCache{
reader: bytes.NewReader(payload.payload),
// addr: metadata.SocksaddrFromNetIP(payload.addrPort),
addr: metadata.ParseSocksaddr(ep),
}
n, err = packetConn.cache.reader.Read(p)
addr = packetConn.cache.addr
if err == io.EOF {
packetConn.cache = nil
}
return
}
func (packetConn *clientPacketConn) WriteTo(p []byte, addr net.Addr) (n int, err error) {
ep, _ := packetConn.ruleApplicator.ApplyEndpointReplace(addr.String(), network.NetworkUDP)
payload := packetConnPayload{
// addrPort: metadata.SocksaddrFromNet(addr).AddrPort(),
addrPort: metadata.ParseSocksaddr(ep).AddrPort(),
payload: p,
}
payloadBytes, err := payload.MarshalBinary()
if err != nil {
return 0, err
}
packetConn.mu.Lock()
defer packetConn.mu.Unlock()
if err := wsutil.WriteClientBinary(packetConn.Conn, payloadBytes); err != nil {
return 0, err
}
return len(payloadBytes), nil
}
func (packetConn *clientPacketConn) Close() error {
packetConn.mu.Lock()
defer packetConn.mu.Unlock()
_ = wsutil.WriteClientMessage(packetConn.Conn, ws.OpClose, nil)
return packetConn.Conn.Close()
}

View File

@ -1,226 +0,0 @@
package wsc
import (
"context"
"errors"
"net"
"net/http"
"os"
"strconv"
"time"
"github.com/sagernet/sing-box/adapter"
"github.com/sagernet/sing-box/constant"
"github.com/sagernet/sing/common"
"github.com/sagernet/sing/common/exceptions"
"github.com/sagernet/sing/common/logger"
"github.com/sagernet/sing/common/metadata"
"github.com/sagernet/sing/common/network"
"github.com/sagernet/ws"
)
var _ adapter.WSCServerTransport = &Server{}
var _ http.Handler = &Server{}
type Server struct {
ctx context.Context
handler adapter.WSCServerTransportHandler
httpServer *http.Server
logger logger.ContextLogger
authenticator Authenticator
userManager *wscUserManager
router adapter.Router
dialer network.Dialer
}
type ServerConfig struct {
Ctx context.Context
Logger logger.ContextLogger
Handler adapter.WSCServerTransportHandler
Authenticator Authenticator
Router adapter.Router
Dialer network.Dialer
MaxConnectionPerUser int
UsageReportTrafficInterval int64
UsageReportTimeInterval time.Duration
}
/*TODO: Support TLS servers*/
func NewServer(config ServerConfig) (*Server, error) {
if config.Authenticator == nil {
return nil, errors.New("authenticator required")
}
server := &Server{
ctx: config.Ctx,
handler: config.Handler,
logger: config.Logger,
authenticator: config.Authenticator,
router: config.Router,
dialer: config.Dialer,
userManager: &wscUserManager{
users: map[int64]*wscUser{},
authenticator: config.Authenticator,
maxConnPerUser: config.MaxConnectionPerUser,
usageReportTrafficInterval: config.UsageReportTrafficInterval,
usageReportTimeInterval: config.UsageReportTimeInterval,
},
}
server.httpServer = &http.Server{
Handler: server,
ReadHeaderTimeout: constant.TCPTimeout,
MaxHeaderBytes: http.DefaultMaxHeaderBytes,
BaseContext: func(l net.Listener) context.Context {
return config.Ctx
},
}
return server, nil
}
func (server *Server) ServeHTTP(res http.ResponseWriter, req *http.Request) {
ctx := req.Context()
auth := req.URL.Query().Get("auth")
if auth == "" {
server.failRequest(res, req, "Authentication required", http.StatusBadRequest, 0, "", &metadata.Socksaddr{})
return
}
account, err := server.authenticator.Authenticate(ctx, AuthenticateParams{
Auth: auth,
MaxConn: server.userManager.maxConnPerUser,
})
if err != nil {
if account.ID != 0 {
if err := server.userManager.cleanupUser(ctx, account.ID, false); err != nil {
server.logger.Debug("Request failed. Couldn't cleanup user: ", err.Error(), " (Client: ", req.RemoteAddr, ", User-ID: ", account.ID, ")")
}
}
server.failRequest(res, req, "Authentication failed: "+err.Error(), http.StatusBadRequest, account.ID, "", &metadata.Socksaddr{})
return
}
if req.Method == http.MethodPost && req.URL.Path == "/cleanup" {
if err := server.userManager.cleanupUser(ctx, account.ID, true); err != nil {
server.failRequest(res, req, "Failed to cleanup user: "+err.Error(), http.StatusInternalServerError, account.ID, "", &metadata.Socksaddr{})
return
}
res.WriteHeader(http.StatusOK)
return
}
user := server.userManager.findOrCreateUser(ctx, account.ID, account.Rate, account.MaxConn)
netW := req.URL.Query().Get("net")
if netW == "" {
netW = network.NetworkTCP
}
endpoint := req.URL.Query().Get("ep")
addr, err := server.resolveDestination(ctx, metadata.ParseSocksaddr(endpoint))
if err != nil {
server.failRequest(res, req, "Failed to parse and resolve endpoint: "+err.Error(), http.StatusBadRequest, account.ID, netW, &addr)
return
}
server.logger.Debug("New request (Client: ", req.RemoteAddr, ", Auth: ", auth, ", User-ID: ", account.ID, ", ", netW+"-Addr: ", addr.String(), ")")
conn, _, _, err := ws.UpgradeHTTP(req, res)
if err != nil {
server.failRequest(res, req, "Websocket upgrade failed: "+err.Error(), http.StatusBadRequest, account.ID, netW, &addr)
return
}
defer func() {
if err := server.userManager.cleanupUserConn(ctx, user, conn); err != nil {
server.logger.Error("Failed to cleanup user connection: "+err.Error(), "(Client: ", req.RemoteAddr, ", User-ID: ", account.ID, ")")
}
if err := conn.Close(); err != nil {
server.logger.Debug("Failed to close connection: "+err.Error(), "(Client: ", req.RemoteAddr, ", User-ID: ", account.ID, ")")
}
}()
if err := server.pipeConn(ctx, user, conn, netW, &addr); err != nil {
server.failRequest(res, req, "Failed to pipe connection: "+err.Error(), http.StatusInternalServerError, account.ID, netW, &addr)
}
}
func (server *Server) pipeConn(ctx context.Context, user *wscUser, conn net.Conn, netW string, addr *metadata.Socksaddr) error {
if poppedConn, err := user.addConn(conn); err != nil {
return err
} else {
if poppedConn != nil {
poppedConn.Close()
}
}
switch netW {
case network.NetworkTCP:
{
piper := serverTCPPiper{
conn: conn,
user: user,
addr: addr,
dialer: server.dialer,
}
return piper.pipe(ctx)
}
case network.NetworkUDP:
{
piper := serverUDPPiper{
conn: conn,
user: user,
addr: addr,
dialer: server.dialer,
}
return piper.pipe(ctx)
}
default:
return errors.New("network " + netW + " not supported")
}
}
func (server *Server) Close() error {
return common.Close(common.PtrOrNil(server.httpServer))
}
func (server *Server) Network() []string {
return []string{network.NetworkTCP}
}
func (server *Server) Serve(listener net.Listener) error {
return server.httpServer.Serve(listener)
}
func (server *Server) ServePacket(listener net.PacketConn) error {
return os.ErrInvalid
}
func (server *Server) resolveDestination(ctx context.Context, dest metadata.Socksaddr) (metadata.Socksaddr, error) {
if dest.IsFqdn() {
addrs, err := server.router.LookupDefault(ctx, dest.Fqdn)
if err != nil {
return metadata.Socksaddr{}, err
}
if len(addrs) == 0 {
return metadata.Socksaddr{}, exceptions.New("no addresses found for endpoint domina: ", dest.Fqdn)
}
return metadata.Socksaddr{
Addr: addrs[0],
Port: dest.Port,
}, nil
}
return dest, nil
}
func (server *Server) failRequest(res http.ResponseWriter, request *http.Request, msg string, code int, uid int64, network string, addr *metadata.Socksaddr) {
http.Error(res, msg, code)
info := "(Client: " + request.RemoteAddr
info += ", User-ID: " + strconv.Itoa(int(uid))
info += ", Network: " + network
info += ", " + network + "-Address: " + addr.String()
info += ")"
server.logger.Debug(msg, " ", info)
}

View File

@ -1,170 +0,0 @@
package wsc
import (
"context"
"errors"
"io"
"net"
"sync"
"time"
"github.com/sagernet/sing/common/metadata"
"github.com/sagernet/sing/common/network"
"github.com/sagernet/ws"
"github.com/sagernet/ws/wsutil"
)
type serverTCPPiper struct {
conn net.Conn
user *wscUser
addr *metadata.Socksaddr
dialer network.Dialer
}
func (piper *serverTCPPiper) pipe(ctx context.Context) error {
remote, err := piper.prepare(ctx)
if err != nil {
return err
}
defer remote.Close()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
var mu sync.Mutex
var wg sync.WaitGroup
var gErr error = nil
collectErr := func(err error) {
mu.Lock()
defer mu.Unlock()
gErr = errors.Join(gErr, err)
}
wg.Add(1)
go func() {
defer cancel()
defer wg.Done()
if err := piper.pipeInbound(ctx, remote); err != nil {
collectErr(err)
}
}()
if err := piper.pipeOutbount(ctx, remote); err != nil {
collectErr(err)
}
cancel()
wg.Wait()
return gErr
}
func (piper *serverTCPPiper) pipeInbound(ctx context.Context, remote net.Conn) error {
clientInReader, err := piper.user.connReader(piper.conn)
if err != nil {
return err
}
clientOut, err := piper.user.connWriter(piper.conn)
if err != nil {
return err
}
clientIn := wsutil.NewReader(clientInReader, ws.StateServerSide)
buf := piper.user.inBuffer(piper.conn)
for {
if ctx.Err() != nil {
return nil
}
if err := piper.conn.SetReadDeadline(time.Now().Add(time.Millisecond * 300)); err != nil {
return err
}
header, err := clientIn.NextFrame()
if err != nil {
if errors.Is(err, io.EOF) {
return nil
}
if isTimeoutErr(err) {
continue
}
return err
}
pass := false
switch header.OpCode {
case ws.OpPing:
wsutil.WriteServerMessage(clientOut, ws.OpPong, nil)
pass = true
case ws.OpPong:
pass = true
case ws.OpClose:
wsutil.WriteServerMessage(clientOut, ws.OpClose, nil)
return nil
}
if pass {
continue
}
for {
n, err := clientIn.Read(buf)
if n > 0 {
if _, wErr := remote.Write(buf[:n]); wErr != nil {
return wErr
} else {
piper.user.usedTrafficBytes.Add(int64(n))
}
}
if err != nil {
if errors.Is(err, io.EOF) {
break
}
return err
}
}
}
}
func (piper *serverTCPPiper) pipeOutbount(ctx context.Context, remote net.Conn) error {
clientOut, err := piper.user.connWriter(piper.conn)
if err != nil {
return err
}
buf := piper.user.outBuffer(piper.conn)
for {
if ctx.Err() != nil {
return nil
}
if err := remote.SetReadDeadline(time.Now().Add(time.Millisecond * 300)); err != nil {
return err
}
n, err := remote.Read(buf)
if err != nil {
if errors.Is(err, io.EOF) {
return nil
}
if isTimeoutErr(err) {
continue
}
return err
}
piper.user.usedTrafficBytes.Add(int64(n))
if err := wsutil.WriteServerBinary(clientOut, buf[:n]); err != nil {
return err
}
}
}
func (piper *serverTCPPiper) prepare(ctx context.Context) (net.Conn, error) {
remote, err := piper.dialer.DialContext(ctx, network.NetworkTCP, *piper.addr)
if err != nil {
return nil, err
}
return remote, nil
}

View File

@ -1,186 +0,0 @@
package wsc
import (
"context"
"errors"
"io"
"net"
"sync"
"time"
"github.com/sagernet/sing/common/metadata"
"github.com/sagernet/sing/common/network"
"github.com/sagernet/ws"
"github.com/sagernet/ws/wsutil"
)
type serverUDPPiper struct {
conn net.Conn
user *wscUser
addr *metadata.Socksaddr
dialer network.Dialer
}
func (piper *serverUDPPiper) pipe(ctx context.Context) error {
remote, err := piper.prepare(ctx)
if err != nil {
return err
}
defer remote.Close()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
var mu sync.Mutex
var wg sync.WaitGroup
var gErr error = nil
collectErr := func(err error) {
mu.Lock()
defer mu.Unlock()
gErr = errors.Join(gErr, err)
}
wg.Add(1)
go func() {
defer cancel()
defer wg.Done()
if err := piper.pipeInbound(ctx, remote); err != nil {
collectErr(err)
}
}()
if err := piper.pipeOutbount(ctx, remote); err != nil {
collectErr(err)
}
cancel()
wg.Wait()
return gErr
}
func (piper *serverUDPPiper) pipeInbound(ctx context.Context, remote net.PacketConn) error {
clientInReader, err := piper.user.connReader(piper.conn)
if err != nil {
return err
}
clientOut, err := piper.user.connWriter(piper.conn)
if err != nil {
return err
}
clientIn := wsutil.NewReader(clientInReader, ws.StateServerSide)
buf := piper.user.inBuffer(piper.conn)
payload := packetConnPayload{}
for {
if ctx.Err() != nil {
return nil
}
if err := piper.conn.SetReadDeadline(time.Now().Add(time.Millisecond * 300)); err != nil {
return err
}
header, err := clientIn.NextFrame()
if err != nil {
if errors.Is(err, io.EOF) {
return nil
}
if isTimeoutErr(err) {
continue
}
return err
}
pass := false
switch header.OpCode {
case ws.OpPing:
wsutil.WriteServerMessage(clientOut, ws.OpPong, nil)
pass = true
case ws.OpPong:
pass = true
case ws.OpClose:
wsutil.WriteServerMessage(clientOut, ws.OpClose, nil)
return nil
}
if pass {
continue
}
for {
n, err := clientIn.Read(buf)
if n > 0 {
if err := payload.UnmarshalBinaryUnsafe(buf[:n]); err != nil {
return err
}
if _, wErr := remote.WriteTo(payload.payload, net.UDPAddrFromAddrPort(payload.addrPort)); wErr != nil {
return wErr
} else {
piper.user.usedTrafficBytes.Add(int64(n))
}
}
if err != nil {
if errors.Is(err, io.EOF) {
break
}
return err
}
}
}
}
func (piper *serverUDPPiper) pipeOutbount(ctx context.Context, remote net.PacketConn) error {
clientOut, err := piper.user.connWriter(piper.conn)
if err != nil {
return err
}
buf := piper.user.outBuffer(piper.conn)
payload := packetConnPayload{}
for {
if ctx.Err() != nil {
return nil
}
if err := remote.SetReadDeadline(time.Now().Add(time.Millisecond * 300)); err != nil {
return err
}
n, netAddr, err := remote.ReadFrom(buf)
if err != nil {
if errors.Is(err, io.EOF) {
return nil
}
if isTimeoutErr(err) {
continue
}
return err
}
if ua, ok := netAddr.(*net.UDPAddr); ok {
payload.addrPort = ua.AddrPort()
} else {
return errors.New("unexpected addr type")
}
payload.payload = buf[:n]
payloadBytes, err := payload.MarshalBinary()
if err != nil {
return err
}
piper.user.usedTrafficBytes.Add(int64(n))
if err := wsutil.WriteServerBinary(clientOut, payloadBytes); err != nil {
return err
}
}
}
func (piper *serverUDPPiper) prepare(ctx context.Context) (net.PacketConn, error) {
remote, err := piper.dialer.ListenPacket(ctx, *piper.addr)
if err != nil {
return nil, err
}
return remote, nil
}