Skip to content

Commit 2b5e11a

Browse files
committed
chore: pool and some
1 parent f3eb19f commit 2b5e11a

6 files changed

Lines changed: 37 additions & 37 deletions

File tree

pkg/arch/dispatchers/dispatchers.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ type Dispatcher struct {
2020
ctx context.Context
2121
ctxCancel context.CancelFunc
2222
forwarders *structure.IndexMap[*arch.ForwarderWithValues]
23-
totalWeights map[uint32]uint32 // priority -> totalWeight
23+
totalWeights map[uint64]uint64 // priority -> totalWeight
2424
currentIndex int
2525
senderPacket *channel.SafeSender[packet.IPacket]
2626
connsMap *concurrent.ConcurrentMap[string, string] // conn -> forwarder
@@ -33,7 +33,7 @@ func NewDispatcher(parentCtx context.Context) *Dispatcher {
3333
ctx: ctx,
3434
ctxCancel: cancel,
3535
forwarders: structure.NewIndexMap[*arch.ForwarderWithValues](),
36-
totalWeights: make(map[uint32]uint32),
36+
totalWeights: make(map[uint64]uint64),
3737
currentIndex: 0,
3838
MetaConcurrentStructure: *concurrent.NewMetaSyncStructure[Dispatcher](),
3939
senderPacket: channel.NewSafeSenderWithParentCtxAndSize[packet.IPacket](ctx, 16),
@@ -97,7 +97,7 @@ func (dispatcher *Dispatcher) Next() (uuid string, forwarder arch.IForwarder, ok
9797
return
9898
}
9999

100-
totalWeight, _ok := hof.NewStreamWithMap(dispatcher.totalWeights).Max(func(bigger container.Entry[uint32, uint32], smaller container.Entry[uint32, uint32]) bool {
100+
totalWeight, _ok := hof.NewStreamWithMap(dispatcher.totalWeights).Max(func(bigger container.Entry[uint64, uint64], smaller container.Entry[uint64, uint64]) bool {
101101
return bigger.GetKey() > smaller.GetKey()
102102
})
103103

pkg/arch/forwarders/forwarders.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -40,17 +40,6 @@ func (forwarder *Forwarder) HandlePacket(p packet.IPacket) bool {
4040
return true
4141
}
4242

43-
func (forwarder *Forwarder) receivePacket() packet.IPacket {
44-
r, err := comm.ReceivePacket(forwarder.conn)
45-
if err != nil {
46-
if forwarder.GetCtx().Err() != nil {
47-
return nil
48-
}
49-
r = &packet.PacketUnknown{Err: err}
50-
}
51-
return r
52-
}
53-
5443
func (forwarder *Forwarder) GetCtx() context.Context {
5544
return forwarder.conn.GetCtx()
5645
}
@@ -69,6 +58,17 @@ func (forwarder *Forwarder) GetChanSendPacket() <-chan packet.IPacket {
6958

7059
// ---------------------------------------------------------------------
7160

61+
func (forwarder *Forwarder) receivePacket() packet.IPacket {
62+
r, err := comm.ReceivePacket(forwarder.conn)
63+
if err != nil {
64+
if forwarder.GetCtx().Err() != nil {
65+
return nil
66+
}
67+
r = &packet.PacketUnknown{Err: err}
68+
}
69+
return r
70+
}
71+
7272
func (forwarder *Forwarder) routineRead() {
7373
pattern.NewConfigSelectContextAndChannel[packet.IPacket]().
7474
WithCtx(forwarder.GetCtx()).

pkg/base/network/udp_listener.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import (
1616

1717
var logger = logging.GetLogger()
1818

19-
var udpListenerBufSize uint32 = 10 * 1024
19+
var udpListenerBufSize uint64 = 10 * 1024
2020
var udpListenerBufPool = concurrent.NewPool(func() *[]byte {
2121
buf := make([]byte, udpListenerBufSize)
2222
return &buf
@@ -218,7 +218,7 @@ func (ul *UDPListener) dispatch() {
218218
continue
219219
}
220220

221-
data := make([]byte, n)
221+
data := make([]byte, n) //TODO 这里可以用pool
222222
copy(data, buf[:n])
223223
ok := c.receiver.TryPush(data)
224224
if !ok {

pkg/comm/comm.go

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@ import (
1010

1111
// var logger = logging.GetLogger()
1212

13-
var commDataSize uint32 = 128 * 1024
14-
var commBufSize uint32 = 4 + commDataSize
13+
var commDataSize uint64 = 64 * 1024
14+
var commBufSize uint64 = 8 + 1024 + commDataSize
1515

1616
var commBufPool = concurrent.NewPool(func() *[]byte {
1717
buf := make([]byte, commBufSize)
@@ -28,37 +28,37 @@ func SendPacket(dst io.Writer, p packet.IPacket) (int, error) {
2828
return 0, err
2929
}
3030

31-
bufPtr := commBufPool.Get()
32-
defer commBufPool.Put(bufPtr)
33-
buf := *bufPtr
34-
35-
length := uint32(len(data))
31+
length := uint64(len(data))
3632
if length > commBufSize {
3733
return 0, io.ErrShortBuffer
3834
}
3935

40-
buf = buf[:4+length]
41-
binary.BigEndian.PutUint32(buf[:4], length)
42-
copy(buf[4:], data)
43-
return dst.Write(buf)
44-
}
45-
46-
func ReceivePacket(src io.Reader) (packet.IPacket, error) {
4736
bufPtr := commBufPool.Get()
4837
defer commBufPool.Put(bufPtr)
4938
buf := *bufPtr
5039

51-
buf = buf[:4]
52-
_, err := io.ReadFull(src, buf)
40+
buf = buf[:8+length]
41+
binary.BigEndian.PutUint64(buf[:8], length)
42+
copy(buf[8:], data)
43+
return dst.Write(buf)
44+
}
45+
46+
func ReceivePacket(src io.Reader) (packet.IPacket, error) {
47+
lenBuf := make([]byte, 8)
48+
_, err := io.ReadFull(src, lenBuf)
5349
if err != nil {
5450
return nil, err
5551
}
5652

57-
length := binary.BigEndian.Uint32(buf)
53+
length := binary.BigEndian.Uint64(lenBuf)
5854
if length > commBufSize {
5955
return nil, io.ErrShortBuffer
6056
}
6157

58+
bufPtr := commBufPool.Get()
59+
defer commBufPool.Put(bufPtr)
60+
buf := *bufPtr
61+
6262
buf = buf[:length]
6363
n, err := io.ReadFull(src, buf)
6464
if err != nil {

pkg/config/client.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ type ConfigItemProxy struct {
1212
Remotes []string `toml:"remotes"`
1313
Backend string `toml:"backend"`
1414
Frontend string `toml:"frontend"`
15-
Priority uint32 `toml:"priority"`
16-
Weight uint32 `toml:"weight"`
15+
Priority uint64 `toml:"priority"`
16+
Weight uint64 `toml:"weight"`
1717
}
1818

1919
type ConfigClient struct {

pkg/packet/packets.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@ type PacketProxyNegotiationRequest struct {
1111
Name string
1212
Proto string
1313
FrontendAddr string
14-
Priority uint32
15-
Weight uint32
14+
Priority uint64
15+
Weight uint64
1616
Token string
1717
}
1818

0 commit comments

Comments
 (0)