From a4fdc7b688c80744d167b493ef32a9345c0c16e8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=96=E7=95=8C?= Date: Sat, 27 Aug 2022 16:10:22 +0800 Subject: [PATCH] Remove redundant pipe --- test/v2ray_transport_test.go | 3 +++ transport/v2raygrpclite/client.go | 29 ++++++++--------------------- transport/v2raygrpclite/conn.go | 29 +++++++++++++++++++++++++---- transport/v2raygrpclite/server.go | 2 +- transport/v2rayhttp/client.go | 1 + 5 files changed, 38 insertions(+), 26 deletions(-) diff --git a/test/v2ray_transport_test.go b/test/v2ray_transport_test.go index 0632cec8..ec8237dc 100644 --- a/test/v2ray_transport_test.go +++ b/test/v2ray_transport_test.go @@ -94,6 +94,9 @@ func TestV2RayWebscoketSelf(t *testing.T) { func TestV2RayHTTPSelf(t *testing.T) { testV2RayTransportSelf(t, &option.V2RayTransportOptions{ Type: C.V2RayTransportTypeHTTP, + HTTPOptions: option.V2RayHTTPOptions{ + Method: "POST", + }, }) } diff --git a/transport/v2raygrpclite/client.go b/transport/v2raygrpclite/client.go index 3610f94c..a3319b0b 100644 --- a/transport/v2raygrpclite/client.go +++ b/transport/v2raygrpclite/client.go @@ -11,8 +11,6 @@ import ( "github.com/sagernet/sing-box/adapter" "github.com/sagernet/sing-box/option" - "github.com/sagernet/sing/common" - "github.com/sagernet/sing/common/bufio" M "github.com/sagernet/sing/common/metadata" N "github.com/sagernet/sing/common/network" ) @@ -58,35 +56,24 @@ func NewClient(ctx context.Context, dialer N.Dialer, serverAddr M.Socksaddr, opt } func (c *Client) DialContext(ctx context.Context) (net.Conn, error) { - requestPipeReader, requestPipeWriter := io.Pipe() + pipeInReader, pipeInWriter := io.Pipe() request := &http.Request{ Method: http.MethodPost, - Body: requestPipeReader, + Body: pipeInReader, URL: c.url, Proto: "HTTP/2", ProtoMajor: 2, - ProtoMinor: 0, Header: defaultClientHeader, } request = request.WithContext(ctx) - responsePipeReader, responsePipeWriter := io.Pipe() + conn := newLateGunConn(pipeInWriter) go func() { - defer responsePipeWriter.Close() response, err := c.client.Do(request) - if err != nil { - return + if err == nil { + conn.setup(response.Body, nil) + } else { + conn.setup(nil, err) } - bufio.Copy(responsePipeWriter, response.Body) }() - return newGunConn(responsePipeReader, requestPipeWriter, ChainedClosable{requestPipeReader, requestPipeWriter, responsePipeReader}), nil -} - -type ChainedClosable []io.Closer - -// Close implements io.Closer.Close(). -func (cc ChainedClosable) Close() error { - for _, c := range cc { - _ = common.Close(c) - } - return nil + return conn, nil } diff --git a/transport/v2raygrpclite/conn.go b/transport/v2raygrpclite/conn.go index 8f1d2354..db89bef3 100644 --- a/transport/v2raygrpclite/conn.go +++ b/transport/v2raygrpclite/conn.go @@ -12,6 +12,7 @@ import ( "os" "time" + "github.com/sagernet/sing/common" "github.com/sagernet/sing/common/buf" "github.com/sagernet/sing/common/bufio" E "github.com/sagernet/sing/common/exceptions" @@ -24,20 +25,40 @@ var _ net.Conn = (*GunConn)(nil) type GunConn struct { reader io.Reader writer io.Writer - closer io.Closer + create chan struct{} + err error cached []byte cachedIndex int } -func newGunConn(reader io.Reader, writer io.Writer, closer io.Closer) *GunConn { +func newGunConn(reader io.Reader, writer io.Writer) *GunConn { return &GunConn{ reader: reader, writer: writer, - closer: closer, } } +func newLateGunConn(writer io.Writer) *GunConn { + return &GunConn{ + create: make(chan struct{}), + writer: writer, + } +} + +func (c *GunConn) setup(reader io.Reader, err error) { + c.reader = reader + c.err = err + close(c.create) +} + func (c *GunConn) Read(b []byte) (n int, err error) { + if c.reader == nil { + <-c.create + if c.err != nil { + return 0, c.err + } + } + if c.cached != nil { n = copy(b, c.cached[c.cachedIndex:]) c.cachedIndex += n @@ -97,7 +118,7 @@ func (c *GunConn) WriteBuffer(buffer *buf.Buffer) error { }*/ func (c *GunConn) Close() error { - return c.closer.Close() + return common.Close(c.reader, c.writer) } func (c *GunConn) LocalAddr() net.Addr { diff --git a/transport/v2raygrpclite/server.go b/transport/v2raygrpclite/server.go index f9ad2b73..a792ae19 100644 --- a/transport/v2raygrpclite/server.go +++ b/transport/v2raygrpclite/server.go @@ -69,7 +69,7 @@ func (s *Server) ServeHTTP(writer http.ResponseWriter, request *http.Request) { writer.WriteHeader(http.StatusOK) var metadata M.Metadata metadata.Source = sHttp.SourceAddress(request) - conn := newGunConn(request.Body, writer, request.Body) + conn := newGunConn(request.Body, writer) s.handler.NewConnection(request.Context(), conn, metadata) } diff --git a/transport/v2rayhttp/client.go b/transport/v2rayhttp/client.go index 56616bd7..7e06e18e 100644 --- a/transport/v2rayhttp/client.go +++ b/transport/v2rayhttp/client.go @@ -141,6 +141,7 @@ func (c *Client) dialHTTP2(ctx context.Context) (net.Conn, error) { return nil, err } if response.StatusCode != 200 { + pipeInWriter.Close() return nil, E.New("unexpected status: ", response.StatusCode, " ", response.Status) } return &HTTPConn{