这里先简单介绍一下 websocket,确实只是简单介绍一下。
1. 应用场景有些场景下,比如交易 K 线,我们需要前端对后端进行轮询来不断获取或者更新资源状态。轮询的问题毫无以为是一种笨重的方式,因为每一次 http 请求除了本身的资源信息传输外还有三次握手以及四次挥手。替代轮询的一种方案是复用一个 http 连接,更准确的复用同一个 tcp 连接。这种方式可以是 http 长连接,也可以是 websocket。
2. websocket 和 http 长连接的区别首先 websocket 和 http 是完全不同的两种协议,虽然底层都是 tcp/ip。http 长连接也是属于 http 协议。http 协议和 websocket 的最大区别就是 http 是基于 request/response 模式,而 websocket 的 client 和 server 端却可以随意发起 data push,比如服务端向 app 端的消息下发就比较适合使用 websocket(这种场景下使用 http 长连接也是可以,client 端定时向 server 端发送消息,比如 heatbeat,然后 server 端要 push 的消息以 response 的形式返回给 client)。
这里 https:// gist.github.com/legendt kl/1922db71553c849ef0029429f737aadb 我写一个 github gist 代码片段,给大家体验一下。
3. Golang 最佳实践这里先定义一下我们的使用场景:交易所有很多数据,比如 K 线,比如盘口数据都是在定时刷新的,这里就可以用 websocket 来做。简单来说,前端向后端请求特定的数据,比如 K 线数据,前端和后端建立 websocket 连接,后端持续不断返回信息给前端。
在我们编写 websocket 接口之前,需要略微考虑一下如何抽象,如何设计我们 websocket 框架从而保证代码的良好的扩展性。
3.1 Hub首先 hub 是什么东西,下图是 google image 查出来的结果。简单做个类比,图片中的 USB 3.0 口(蓝色)就相当于一个个 tcp 连接,上面汇总的接口就是我们 hub 的上流数据源。
在我第一时间想去定义 hub 的粒度想到的是使用 controller,也就是请求的 router。但是后来想了一下这样设计太复杂了,因为一个 router 的参数有很多种,不同参数可能就对应不同数据。
那么应该怎么去定义呢?不是从功能性上去定义,而是从数据源上定义。我们只要简单看一下需要提供多少类不停更新的数据,这里的每一类就对应一个 hub。
3.2 Broadcast通过 3.1 我们定义了 hub,下面要考虑的就是如何去做广播。
最简单的方式遍历一个 hub 上面所有的 conn 然后进行 conn.Write()。这种方法非常的简单粗暴,问题也很明显:每个 conn.Write() 都是一个网络 IO,我们这是在串行地处理多个网络 IO,低效。
串行改并行。我们还是遍历 hub 上面所有的 conn,然后每一个 conn.Write() 起一个 goroutine 去做,这样其实就是 IO 多路复用。
思考一下上面这种方式还有没有问题。其实是有的:扩展性的问题。如果 websocket 的接口参数比较多,我们要根据参数对不同的 conn 返回不同的结果,那么应该怎么做的?也很简单,对上面的 conn 进行一次封装,封装成一个 struct。我在很久以前一篇文章讨论函数的扩展性的时候也说过将函数形参设计成 struct 是一种不错的扩展方式。
3.3 Hub 数据感知接 3.2,broadcast 的数据怎么得到,主动去信息源拉,还是别人 push 过来?最简单的实现方式是构造生产者-消费者模型,而在 golang 中实现生产者-消费者模型尤其简单。结合到我们这里,我们只需要在 hub 中定一个 channel 即可。
我的理解,要广播的数据如何生存应该都是业务逻辑,不应该和基础框架耦合在一起。
4. talk is cheap, show me the code代码以下面两个 package 为例:
http:// github.com/astaxie/beeg o http:// github.com/gorilla/webs ocketController 处理。
type WsController struct { beego.Controller}var upgrader = websocket.Upgrader{ ReadBufferSize: maxMessageSize, WriteBufferSize: maxMessageSize,}func (this *WsController) WSTest() { defer this.ServeJSON() ws, err := upgrader.Upgrade(this.Ctx.ResponseWriter, this.Ctx.Request, nil) // 这里 ws 就是 websocket.Conn,是 websocket 对 net.Conn 的封装 if err != nil { this.Data['json'] = 'fail' return } // WsClient 是我们对 websocket.Conn 的再一层封装,后面细说 wsClient := &WsClient{ WsConn: ws, WsSend: make(chan []byte, maxMessageSize), HttpRequest: this.Ctx.Request, //记录请求参数 } service.ServeWsExample(wsClient)}
WsClient 结构。
type WsClient struct { WsConn *websocket.Conn WsSend chan []byte HttpRequest http.Request }
WsClient 有两个基本方法:对 client 端发送过来的数据进行处理,以及对 server 端下发的数据进行处理。这使用函数作为参数,也是为了实现最大的灵活性,但是函数参数的设计不一定是最合适的,如果大家有更合适的,欢迎指教。
func (client *WsClient) ReadMsg(fn func(c *WsClient, s string)) { for { _, msg, err := client.WsConn.ReadMessage() if err != nil { break } fn(client, string(msg)) }}func (client *WsClient) WriteMsg(fn func(s string) error) { for { select { case msg, ok := <-client.WsSend: if !ok { client.WsConn.WriteMessage(websocket.CloseMessage, []byte{}) return } if err := fn(string(msg)); err != nil { return } } }}
Hub。
type WsHub struct { Clients map[*WsClient]bool // clients to be broadcast Broadcast chan string Register chan *WsClient UnRegister chan *WsClient LastMsg string // 最近一次的广播内容。如果我们是 1 分钟广播一次,新来一个请求还没有到广播的时间,就返回最近一次广播的内容 Identity string //可以用作做标志}
Hub 包括一个 export 的 Run 方法和一个私有方法 broadCast()。
func (hub *WsHub) Run() { for { select { case c := <-hub.Register: hub.Clients[c] = true c.WsSend <- []byte(hub.LastMsg) break case c := <-hub.UnRegister: _, ok := hub.Clients[c] if ok { delete(hub.Clients, c) close(c.WsSend) } break case msg := <-hub.Broadcast: hub.LastMsg = msg hub.broadCast() break } }}func (hub *WsHub) broadCast() { for c := range hub.Clients { select { case c.WsSend <- []byte(hub.LastMsg): break default: close(c.WsSend) delete(hub.Clients, c) } }}
我们现在把 client 和 hub 串起来,也就是第一个例子中的 service.ServeWsExample(wsClient) 。
// 初始化func initWs() { WsHubs = make(map[string]*util.WsHub) hubList := []string{'hub1', 'hub2', 'hub2'} for _, hub := range hubList { WsHubs[hub] = &WsHub { Clients: make(map[*util.WsClient]bool), Broadcast: make(chan string), Register: make(chan *util.WsClient), UnRegister: make(chan *util.WsClient), //Identity: hub.String(), } go mockBroadCast(WsHubs[hub].Broadcast) go WsHubs[hub].Run() }}func mockBroadCast(broadCast chan string) { for { broadCast <- 'hello world' time.Sleep(time.Second * 10) }}// controller 请求路由到相应的 ServeWsExample 函数func ServeWsExample(c *util.WsClient, pair string) { defer func() { WsHubs[pair].UnRegister <- c c.WsConn.Close() }() WsHubs[pair].Register <- c go c.WriteMsg(func(string) error {}) c.ReadMsg(func(*WsClient, string){})}
还有一点需要说明的是,这里没有写出生成者(也就是向 Hub 发送数据的进程),因为生产者的写法比较灵活,这里还是简单写一个吧。
//initfunc init() { go Producer()}// 生产者func Producer() { for { // generate msg msg := 'hello, I am legendtkl' // select the proper hub to send the msg WsHubs['hub1'].Broadcast <- msg }} 5. 写在最后
工作之后一直思考的一个问题是,怎么衡量代码的扩展性以及如何写出高扩展性的代码?欢迎交流。
上面的 demo 代码如果有需要,我打包一下传到 github。
来自: https://zhuanlan.zhihu.com/p/35167916