impl wsc connection

This commit is contained in:
Mobin 2025-08-30 20:46:51 +03:30
parent ab8f540e0c
commit d1ded94681
2 changed files with 136 additions and 114 deletions

View File

@ -6,8 +6,9 @@ type WSCInboundOptions struct {
type WSCOutboundOptions struct {
DialerOptions
ServerOptions
OutboundTLSOptionsContainer
Network NetworkList `json:"network,omitempty"`
Auth string `json:"auth"`
Host string `json:"host"`
Path string `json:"path"`
}

View File

@ -2,14 +2,13 @@ package outbound
import (
"context"
"errors"
"fmt"
"io"
"net"
"net/url"
"time"
"sync"
"unsafe"
"github.com/itsabgr/ge"
"github.com/sagernet/sing-box/adapter"
"github.com/sagernet/sing-box/common/dialer"
C "github.com/sagernet/sing-box/constant"
@ -25,12 +24,20 @@ import (
var _ adapter.Outbound = &WSC{}
var _ net.Conn = &wscConn{}
type WSC struct {
myOutboundAdapter
dialer N.Dialer
auth string
host string
path string
dialer N.Dialer
serverAddr metadata.Socksaddr
auth string
path string
}
type wscConn struct {
net.Conn
reader *wsutil.Reader
mu sync.Mutex
}
func NewWSC(ctx context.Context, router adapter.Router, logger log.ContextLogger, tag string, options option.WSCOutboundOptions) (*WSC, error) {
@ -48,16 +55,16 @@ func NewWSC(ctx context.Context, router adapter.Router, logger log.ContextLogger
tag: tag,
dependencies: withDialerDependency(options.DialerOptions),
},
dialer: outboundDialer,
auth: options.Auth,
host: options.Host,
path: options.Path,
dialer: outboundDialer,
serverAddr: options.ServerOptions.Build(),
auth: options.Auth,
path: options.Path,
}
if len(outbound.auth) == 0 {
if outbound.auth == "" {
return nil, exceptions.New("Invalid Auth to use in authentications")
}
if len(outbound.host) == 0 {
return nil, exceptions.New("Invalid Host to connect websocket")
if !outbound.serverAddr.IsValid() {
return nil, exceptions.New("Invalid server address")
}
if len(outbound.path) == 0 {
outbound.path = "/"
@ -67,117 +74,59 @@ func NewWSC(ctx context.Context, router adapter.Router, logger log.ContextLogger
}
func (wsc *WSC) DialContext(ctx context.Context, network string, destination metadata.Socksaddr) (net.Conn, error) {
ctx, meta := adapter.ExtendContext(ctx)
meta.Outbound = wsc.tag
meta.Destination = destination
if N.NetworkName(network) != N.NetworkTCP {
return nil, exceptions.Extend(N.ErrUnknownNetwork, network)
}
wsc.logger.InfoContext(ctx, "WSC outbound connection to ", destination)
return wsc.dialer.DialContext(ctx, N.NetworkName(network), destination)
conn, err := wsc.newWscConn(ctx, wsc.auth, wsc.serverAddr, wsc.path, destination)
if err != nil {
return nil, err
}
return conn, nil
}
func (wsc *WSC) ListenPacket(ctx context.Context, destination metadata.Socksaddr) (net.PacketConn, error) {
ctx, meta := adapter.ExtendContext(ctx)
meta.Outbound = wsc.tag
meta.Destination = destination
wsc.logger.InfoContext(ctx, "WSC outbound packet to ", destination)
return wsc.dialer.ListenPacket(ctx, destination)
}
func (wsc *WSC) NewConnection(ctx context.Context, conn net.Conn, metadata adapter.InboundContext) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
defer conn.Close()
pURL := url.URL{
Scheme: "ws",
Host: wsc.host,
Path: wsc.path,
RawQuery: "",
}
pQuery := pURL.Query()
pQuery.Set("auth", wsc.auth)
pQuery.Set("ep", metadata.Destination.String())
pURL.RawQuery = pQuery.Encode()
wsConn, _, _, err := ws.Dial(ctx, pURL.String())
fmt.Println("new conn : ", metadata.Destination, " | ", metadata.Network, unsafe.Pointer(&conn))
ctx = adapter.WithContext(ctx, &metadata)
wsConn, err := wsc.DialContext(ctx, N.NetworkTCP, metadata.Destination)
if err != nil {
return N.ReportHandshakeFailure(conn, err)
}
if err = N.ReportHandshakeSuccess(conn); err != nil {
wsConn.Close()
return err
}
defer wsConn.Close()
go func() {
pack := make([]byte, 2048)
for {
if ctx.Err() != nil {
return
}
if err := conn.SetReadDeadline(time.Now().Add(time.Second)); err != nil {
return
}
n, err := conn.Read(pack)
if err != nil {
if errors.Is(err, io.EOF) {
return
}
if isTimeoutErr(err) {
continue
}
return
}
if wErr := wsutil.WriteClientBinary(wsConn, pack[:n]); wErr != nil {
return
}
}
}()
wsReader := wsutil.NewReader(wsConn, ws.StateClientSide)
pack := make([]byte, 2048)
for {
if ctx.Err() != nil {
return nil
}
if err := wsConn.SetReadDeadline(time.Now().Add(time.Second)); err != nil {
return nil
}
header, err := wsReader.NextFrame()
if err != nil {
if errors.Is(err, io.EOF) {
return nil
}
if isTimeoutErr(err) {
continue
}
}
switch header.OpCode {
case ws.OpPing:
wsutil.WriteClientMessage(wsConn, ws.OpPong, nil)
continue
case ws.OpPong:
continue
case ws.OpClose:
wsutil.WriteClientMessage(wsConn, ws.OpClose, nil)
cancel()
return exceptions.New("wsc websocket connection closed")
}
for {
n, err := wsReader.Read(pack)
if n > 0 {
if _, wErr := conn.Write(pack[:n]); wErr != nil {
return wErr
}
}
if err != nil {
if errors.Is(err, io.EOF) {
break
}
return err
}
}
}
return CopyEarlyConn(ctx, conn, wsConn)
}
func (wsc *WSC) NewPacketConnection(ctx context.Context, conn network.PacketConn, metadata adapter.InboundContext) error {
fmt.Println("wsc packet conn: ", metadata, " | ", conn)
fmt.Println("new packet conn: ", metadata)
// fmt.Println("wsc packet conn: ", metadata, " | ", conn)
// buffer := buf.NewPacket()
// defer buffer.Release()
// dest, err := conn.ReadPacket(buffer)
// if err != nil {
// fmt.Println("error wsc packet conn: ", err)
// return err
// }
// fmt.Println("wsc packet conn data is : ", dest, " | ", dest.Network(), " | ", buffer.Len())
// time.Sleep(time.Second * 10)
return NewPacketConnection(ctx, wsc.dialer, conn, metadata)
}
@ -185,9 +134,81 @@ func (wsc *WSC) Close() error {
return nil
}
func isTimeoutErr(err error) bool {
if nErr, ok := ge.As[net.Error](err); ok && nErr.Timeout() {
return true
func (wsc *WSC) newWscConn(ctx context.Context, auth string, serverAddr metadata.Socksaddr, path string, endpoint metadata.Socksaddr) (*wscConn, error) {
pURL := url.URL{
Scheme: "ws",
Host: serverAddr.String(),
Path: path,
RawQuery: "",
}
return false
pQuery := pURL.Query()
pQuery.Set("auth", auth)
pQuery.Set("ep", endpoint.String())
pURL.RawQuery = pQuery.Encode()
dialer := ws.Dialer{
NetDial: func(ctx context.Context, network, addr string) (net.Conn, error) {
return wsc.dialer.DialContext(ctx, N.NetworkTCP, metadata.ParseSocksaddr(addr))
},
}
wsConn, _, _, err := dialer.Dial(ctx, pURL.String())
if err != nil {
return nil, err
}
// wsConn, _, _, err := ws.Dial(ctx, pURL.String())
// if err != nil {
// return nil, err
// }
reader := wsutil.NewReader(wsConn, ws.StateClientSide)
return &wscConn{
Conn: wsConn,
reader: reader,
}, nil
}
func (cli *wscConn) Close() error {
cli.mu.Lock()
defer cli.mu.Unlock()
_ = wsutil.WriteClientMessage(cli.Conn, ws.OpClose, nil)
return cli.Conn.Close()
}
func (cli *wscConn) Read(b []byte) (n int, err error) {
for {
header, err := cli.reader.NextFrame()
if err != nil {
return 0, err
}
switch header.OpCode {
case ws.OpBinary, ws.OpText, ws.OpContinuation:
n, err := cli.reader.Read(b)
if n > 0 {
return n, nil
}
if err == io.EOF {
continue
}
return n, err
case ws.OpPing:
wsutil.WriteClientMessage(cli.Conn, ws.OpPong, nil)
case ws.OpPong:
continue
case ws.OpClose:
return 0, io.EOF
default:
continue
}
}
}
func (cli *wscConn) Write(b []byte) (n int, err error) {
cli.mu.Lock()
defer cli.mu.Unlock()
if err := wsutil.WriteClientBinary(cli.Conn, b); err != nil {
return 0, err
}
return len(b), nil
}