Improve grpc lite conn

This commit is contained in:
世界 2022-08-27 16:40:00 +08:00
parent a4fdc7b688
commit 7301a2e69a
No known key found for this signature in database
GPG Key ID: CD109927C34A63C4
2 changed files with 58 additions and 41 deletions

View File

@ -4,6 +4,7 @@
package v2raygrpclite package v2raygrpclite
import ( import (
std_bufio "bufio"
"bytes" "bytes"
"encoding/binary" "encoding/binary"
"io" "io"
@ -16,6 +17,7 @@ import (
"github.com/sagernet/sing/common/buf" "github.com/sagernet/sing/common/buf"
"github.com/sagernet/sing/common/bufio" "github.com/sagernet/sing/common/bufio"
E "github.com/sagernet/sing/common/exceptions" E "github.com/sagernet/sing/common/exceptions"
"github.com/sagernet/sing/common/rw"
) )
var ErrInvalidLength = E.New("invalid length") var ErrInvalidLength = E.New("invalid length")
@ -23,18 +25,19 @@ var ErrInvalidLength = E.New("invalid length")
var _ net.Conn = (*GunConn)(nil) var _ net.Conn = (*GunConn)(nil)
type GunConn struct { type GunConn struct {
reader io.Reader reader *std_bufio.Reader
writer io.Writer writer io.Writer
flusher http.Flusher
create chan struct{} create chan struct{}
err error err error
cached []byte readRemaining int
cachedIndex int
} }
func newGunConn(reader io.Reader, writer io.Writer) *GunConn { func newGunConn(reader io.Reader, writer io.Writer, flusher http.Flusher) *GunConn {
return &GunConn{ return &GunConn{
reader: reader, reader: std_bufio.NewReader(reader),
writer: writer, writer: writer,
flusher: flusher,
} }
} }
@ -46,7 +49,7 @@ func newLateGunConn(writer io.Writer) *GunConn {
} }
func (c *GunConn) setup(reader io.Reader, err error) { func (c *GunConn) setup(reader io.Reader, err error) {
c.reader = reader c.reader = std_bufio.NewReader(reader)
c.err = err c.err = err
close(c.create) close(c.create)
} }
@ -59,42 +62,34 @@ func (c *GunConn) Read(b []byte) (n int, err error) {
} }
} }
if c.cached != nil { if c.readRemaining > 0 {
n = copy(b, c.cached[c.cachedIndex:]) if len(b) > c.readRemaining {
c.cachedIndex += n b = b[:c.readRemaining]
if c.cachedIndex == len(c.cached) {
buf.Put(c.cached)
c.cached = nil
} }
n, err = c.reader.Read(b)
c.readRemaining -= n
return return
} }
buffer := buf.Get(5)
_, err = io.ReadFull(c.reader, buffer)
if err != nil {
return 0, err
}
grpcPayloadLen := binary.BigEndian.Uint32(buffer[1:])
buf.Put(buffer)
buffer = buf.Get(int(grpcPayloadLen)) _, err = c.reader.Discard(6)
_, err = io.ReadFull(c.reader, buffer)
if err != nil { if err != nil {
return 0, io.ErrUnexpectedEOF return
} }
protobufPayloadLen, protobufLengthLen := binary.Uvarint(buffer[1:])
if protobufLengthLen == 0 { dataLen, err := binary.ReadUvarint(c.reader)
return 0, ErrInvalidLength if err != nil {
return
} }
if grpcPayloadLen != uint32(protobufPayloadLen)+uint32(protobufLengthLen)+1 {
return 0, ErrInvalidLength readLen := int(dataLen)
c.readRemaining = readLen
if len(b) > readLen {
b = b[:readLen]
} }
n = copy(b, buffer[1+protobufLengthLen:])
if n < int(protobufPayloadLen) { n, err = c.reader.Read(b)
c.cached = buffer c.readRemaining -= n
c.cachedIndex = 1 + int(protobufLengthLen) + n return
return n, nil
}
return n, nil
} }
func (c *GunConn) Write(b []byte) (n int, err error) { func (c *GunConn) Write(b []byte) (n int, err error) {
@ -111,11 +106,33 @@ func (c *GunConn) Write(b []byte) (n int, err error) {
return len(b), err return len(b), err
} }
/*func (c *GunConn) ReadBuffer(buffer *buf.Buffer) error { func uLen(x uint64) int {
i := 0
for x >= 0x80 {
x >>= 7
i++
}
return i + 1
} }
func (c *GunConn) WriteBuffer(buffer *buf.Buffer) error { func (c *GunConn) WriteBuffer(buffer *buf.Buffer) error {
}*/ defer buffer.Release()
dataLen := buffer.Len()
varLen := uLen(uint64(dataLen))
header := buffer.ExtendHeader(6 + varLen)
binary.BigEndian.PutUint32(header[1:5], uint32(1+varLen+dataLen))
header[5] = 0x0A
binary.PutUvarint(header[6:], uint64(dataLen))
err := rw.WriteBytes(c.writer, buffer.Bytes())
if c.flusher != nil {
c.flusher.Flush()
}
return err
}
func (c *GunConn) FrontHeadroom() int {
return 6 + binary.MaxVarintLen64
}
func (c *GunConn) Close() error { func (c *GunConn) Close() error {
return common.Close(c.reader, c.writer) return common.Close(c.reader, c.writer)

View File

@ -69,7 +69,7 @@ func (s *Server) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
writer.WriteHeader(http.StatusOK) writer.WriteHeader(http.StatusOK)
var metadata M.Metadata var metadata M.Metadata
metadata.Source = sHttp.SourceAddress(request) metadata.Source = sHttp.SourceAddress(request)
conn := newGunConn(request.Body, writer) conn := newGunConn(request.Body, writer, writer.(http.Flusher))
s.handler.NewConnection(request.Context(), conn, metadata) s.handler.NewConnection(request.Context(), conn, metadata)
} }