Skip to content

Commit

Permalink
+limit+test code
Browse files Browse the repository at this point in the history
  • Loading branch information
guonaihong committed May 21, 2024
1 parent dc87e87 commit b18a660
Show file tree
Hide file tree
Showing 5 changed files with 333 additions and 11 deletions.
182 changes: 181 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
* 支持 epoll/kqueue
* 低内存占用
* 高tps
* 对websocket的兼容性较高,完整实现rfc6455, rfc7692

# 暂不支持

Expand All @@ -26,10 +27,33 @@

早期阶段,暂时不建议生产使用

## 内容
* [安装](#Installation)
* [例子](#example)
* [net/http升级到websocket服务端](#net-http升级到websocket服务端)
* [gin升级到websocket服务端](#gin升级到websocket服务端)
* [客户端](#客户端)
* [配置函数](#配置函数)
* [客户端配置参数](#客户端配置)
* [配置header](#配置header)
* [配置握手时的超时时间](#配置握手时的超时时间)
* [配置自动回复ping消息](#配置自动回复ping消息)
* [配置客户端最大读取message](#配置客户端最大读message)
* [服务配置参数](#服务端配置)
* [配置服务自动回复ping消息](#配置服务自动回复ping消息)
* [配置服务端最大读取message](#配置服务端最大读message)
# 例子-服务端

### net http升级到websocket服务端
```go

package main

import (
"fmt"

"github.com/antlabs/greatws"
)

type echoHandler struct{}

func (e *echoHandler) OnOpen(c *greatws.Conn) {
Expand Down Expand Up @@ -92,7 +116,163 @@ func main() {
log.Println("non-tls server exit:", http.Serve(rawTCP, mux))
}
```
[返回](#内容)

### gin升级到websocket服务端
```go
package main

import (
"fmt"

"github.com/antlabs/greatws"
"github.com/gin-gonic/gin"
)

type handler struct{
m *greatws.MultiEventLoop
}

func (h *handler) OnOpen(c *greatws.Conn) {
fmt.Printf("服务端收到一个新的连接")
}

func (h *handler) OnMessage(c *greatws.Conn, op greatws.Opcode, msg []byte) {
// 如果msg的生命周期不是在OnMessage中结束,需要拷贝一份
// newMsg := make([]byte, len(msg))
// copy(newMsg, msg)

fmt.Printf("收到客户端消息:%s\n", msg)
c.WriteMessage(op, msg)
// os.Stdout.Write(msg)
}

func (h *handler) OnClose(c *greatws.Conn, err error) {
fmt.Printf("服务端连接关闭:%v\n", err)
}

func main() {
r := gin.Default()
var h handler
h.m = greatws.NewMultiEventLoopMust(greatws.WithEventLoops(0), greatws.WithMaxEventNum(256), greatws.WithLogLevel(slog.LevelError)) // epoll, kqueue
h.m.Start()

r.GET("/", func(c *gin.Context) {
con, err := greatws.Upgrade(c.Writer, c.Request, greatws.WithServerCallback(h.m), greatws.WithServerMultiEventLoop(h.m))
if err != nil {
return
}
con.StartReadLoop()
})
r.Run()
}
```
[返回](#内容)

### 客户端
```go
package main

import (
"fmt"
"time"

"github.com/antlabs/greatws"
)

var m *greatws.MultiEventLoop
type handler struct{}

func (h *handler) OnOpen(c *greatws.Conn) {
fmt.Printf("客户端连接成功\n")
}

func (h *handler) OnMessage(c *greatws.Conn, op greatws.Opcode, msg []byte) {
// 如果msg的生命周期不是在OnMessage中结束,需要拷贝一份
// newMsg := make([]byte, len(msg))
// copy(newMsg, msg)

fmt.Printf("收到服务端消息:%s\n", msg)
c.WriteMessage(op, msg)
time.Sleep(time.Second)
}

func (h *handler) OnClose(c *greatws.Conn, err error) {
fmt.Printf("客户端端连接关闭:%v\n", err)
}

func main() {
m = greatws.NewMultiEventLoopMust(greatws.WithEventLoops(0), greatws.WithMaxEventNum(256), greatws.WithLogLevel(slog.LevelError)) // epoll, kqueue
m.Start()
c, err := greatws.Dial("ws://127.0.0.1:8080/", greatws.WithClientCallback(&handler{}), greatws.WithServerMultiEventLoop(h.m))
if err != nil {
fmt.Printf("连接失败:%v\n", err)
return
}

c.WriteMessage(opcode.Text, []byte("hello"))
time.Sleep(time.Hour) //demo里面等待下OnMessage 看下执行效果,因为greatws.Dial和WriteMessage都是非阻塞的函数调用,不会卡住主go程
}
```
[返回](#内容)
## 配置函数
### 客户端配置参数
#### 配置header
```go
func main() {
greatws.Dial("ws://127.0.0.1:12345/test", greatws.WithClientHTTPHeader(http.Header{
"h1": "v1",
"h2":"v2",
}))
}
```
[返回](#内容)
#### 配置握手时的超时时间
```go
func main() {
greatws.Dial("ws://127.0.0.1:12345/test", greatws.WithClientDialTimeout(2 * time.Second))
}
```
[返回](#内容)

#### 配置自动回复ping消息
```go
func main() {
greatws.Dial("ws://127.0.0.1:12345/test", greatws.WithClientReplyPing())
}
```
[返回](#内容)
#### 配置客户端最大读message
```go
// 限制客户端最大服务返回返回的最大包是1024,如果超过这个大小报错
greatws.Dial("ws://127.0.0.1:12345/test", greatws.WithClientReadMaxMessage(1024))
```
[返回](#内容)
### 服务端配置参数
#### 配置服务自动回复ping消息
```go
func main() {
c, err := greatws.Upgrade(w, r, greatws.WithServerReplyPing())
if err != nil {
fmt.Println("Upgrade fail:", err)
return
}
}
```
[返回](#内容)

#### 配置服务端最大读message
```go
func main() {
// 配置服务端读取客户端最大的包是1024大小, 超过该值报错
c, err := greatws.Upgrade(w, r, greatws.WithServerReadMaxMessage(1024))
if err != nil {
fmt.Println("Upgrade fail:", err)
return
}
}
```
[返回](#内容)
## 100w websocket长链接测试

### e5 洋垃圾机器
Expand Down
113 changes: 113 additions & 0 deletions common_options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1728,4 +1728,117 @@ func Test_CommonOption(t *testing.T) {
t.Error("not run server:method fail")
}
})

t.Run("22.1.WithServerReadMaxMessage:local-Upgrade", func(t *testing.T) {
var tsort testServerOptionReadTimeout

tsort.err = make(chan error, 1)
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
c, err := Upgrade(w, r, WithServerCallback(&tsort), WithServerReadMaxMessage(1<<10), WithServerMultiEventLoop(m))
if err != nil {
t.Error(err)
}
c.StartReadLoop()
}))

defer ts.Close()

url := strings.ReplaceAll(ts.URL, "http", "ws")
con, err := Dial(url, WithClientMultiEventLoop(m), WithClientOnMessageFunc(func(c *Conn, mt Opcode, payload []byte) {
}))
if err != nil {
t.Error(err)
return
}
defer con.Close()

con.WriteMessage(Text, bytes.Repeat([]byte("1"), 1025))
select {
case d := <-tsort.err:
if d == nil {
t.Errorf("got:nil, need:error\n")
}
case <-time.After(1000 * time.Millisecond):
t.Errorf(" Test_ServerOption:WithServerReadMaxMessage timeout\n")
}
if atomic.LoadInt32(&tsort.run) != 1 {
t.Error("not run server:method fail")
}
})

t.Run("22.2.WithServerReadMaxMessage", func(t *testing.T) {
var tsort testServerOptionReadTimeout

upgrade := NewUpgrade(WithServerCallback(&tsort), WithServerReadMaxMessage(1<<10), WithServerMultiEventLoop(m))
tsort.err = make(chan error, 1)
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
c, err := upgrade.Upgrade(w, r)
if err != nil {
t.Error(err)
}
c.StartReadLoop()
}))

defer ts.Close()

url := strings.ReplaceAll(ts.URL, "http", "ws")
con, err := Dial(url, WithClientMultiEventLoop(m), WithClientOnMessageFunc(func(c *Conn, mt Opcode, payload []byte) {
}))
if err != nil {
t.Error(err)
return
}
defer con.Close()

con.WriteMessage(Text, bytes.Repeat([]byte("1"), 1025))
select {
case d := <-tsort.err:
if d == nil {
t.Errorf("got:nil, need:error\n")
}
case <-time.After(100 * time.Millisecond):
t.Errorf(" Test_ServerOption:WithServerReadTimeout timeout\n")
}
if atomic.LoadInt32(&tsort.run) != 1 {
t.Error("not run server:method fail")
}
})

t.Run("22.3.WithClientReadMaxMessage", func(t *testing.T) {
var tsort testServerOptionReadTimeout

upgrade := NewUpgrade(WithServerCallback(&tsort), WithServerReadTimeout(time.Millisecond*60), WithServerMultiEventLoop(m))
tsort.err = make(chan error, 1)
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
c, err := upgrade.Upgrade(w, r)
if err != nil {
t.Error(err)
}
c.WriteMessage(Binary, bytes.Repeat([]byte("1"), 1025))
c.StartReadLoop()
}))

defer ts.Close()

url := strings.ReplaceAll(ts.URL, "http", "ws")
con, err := Dial(url, WithClientMultiEventLoop(m), WithClientReadMaxMessage(1<<10), WithClientOnMessageFunc(func(c *Conn, mt Opcode, payload []byte) {
}))
if err != nil {
t.Error(err)
return
}
defer con.Close()

select {
case d := <-tsort.err:
if d == nil {
t.Errorf("got:nil, need:error\n")
}
case <-time.After(100 * time.Millisecond):
t.Errorf(" Test_ServerOption:WithServerReadTimeout timeout\n")
}
if atomic.LoadInt32(&tsort.run) != 1 {
t.Error("not run server:method fail")
}
})
}
26 changes: 23 additions & 3 deletions conn_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package greatws
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"log/slog"
"math/rand"
Expand Down Expand Up @@ -159,6 +160,10 @@ func (c *Conn) readHeader() (sucess bool, err error) {
head = head[8:]
}

if c.readMaxMessage > 0 && c.rh.PayloadLen > c.readMaxMessage {
return false, TooBigMessage
}

if c.rh.Mask {
c.rh.MaskKey = binary.LittleEndian.Uint32(head[:4])
}
Expand Down Expand Up @@ -493,14 +498,29 @@ func (c *Conn) processCallbackData(f frame.Frame2, payload *[]byte, rsv1 bool, d
return false
}

func (c *Conn) writeAndMaybeOnClose(err error) error {
var sc *StatusCode
defer func() {
c.onCloseOnce.Do(&c.mu2, func() {
c.Callback.OnClose(c, err)
})
}()

if errors.As(err, &sc) {
if err := c.WriteTimeout(opcode.Close, sc.toBytes(), 2*time.Second); err != nil {
return err
}
}
return nil
}

func (c *Conn) writeErrAndOnClose(code StatusCode, userErr error) error {
defer func() {
c.onCloseOnce.Do(&c.mu2, func() {
c.Callback.OnClose(c, userErr)
})
}()

if err := c.WriteTimeout(opcode.Close, statusCodeToBytes(code), 2*time.Second); err != nil {
if err := c.WriteTimeout(opcode.Close, code.toBytes(), 2*time.Second); err != nil {
return err
}

Expand Down Expand Up @@ -638,7 +658,7 @@ func (c *Conn) WriteControl(op Opcode, data []byte) (err error) {
}

func (c *Conn) WriteCloseTimeout(sc StatusCode, t time.Duration) (err error) {
buf := statusCodeToBytes(sc)
buf := sc.toBytes()
return c.WriteTimeout(opcode.Close, buf, t)
}

Expand Down
5 changes: 5 additions & 0 deletions conn_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,11 @@ fail:
bytespool.PutBytes(c.rbuf)
c.rbuf = nil
}

if err != nil {
// 如果是status code类型,要回写符合rfc的close包
c.writeAndMaybeOnClose(err)
}
return err
}

Expand Down
Loading

0 comments on commit b18a660

Please sign in to comment.