herenshan112 5 months ago
parent
commit
9a230f057d
  1. 13
      .vscode/settings.json
  2. 62
      controller/common/websocketApi.go
  3. 100
      controller/common/websocketpack/api.go
  4. 17
      controller/common/websocketpack/type.go
  5. 22
      controller/stock/entry.go
  6. 10
      controller/stock/stockApi.go

13
.vscode/settings.json

@ -0,0 +1,13 @@
{
"editor.gotoLocation.alternativeDeclarationCommand": "editor.action.revealDefinition",
"editor.gotoLocation.alternativeDefinitionCommand": "editor.action.revealDefinition",
"editor.gotoLocation.alternativeTypeDefinitionCommand": "editor.action.revealDefinition",
"editor.selectionHighlight": false,
"files.autoSave": "onFocusChange",
"editor.suggest.snippetsPreventQuickSuggestions": false,
"editor.quickSuggestions": {
"other": "on",
"comments": "off",
"strings": "on"
}
}

62
controller/common/websocketApi.go

@ -1,10 +1,12 @@
package common
import (
"appNewPlatform/controller/common/websocketpack"
"appNewPlatform/utils/formatoutput"
"errors"
"fmt"
"net/http"
"time"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
@ -178,3 +180,63 @@ func SendMsgToClient(clientId string, message []byte) error {
}
return nil
}
/*
========================================================================================================
使用webstock
========================================================================================================
*/
// var (
// upgrader = websocket.Upgrader{
// // 允许跨域
// CheckOrigin: func(r *http.Request) bool {
// return true
// },
// }
// )
func wsHandler(w http.ResponseWriter, r *http.Request) {
// w.Write([]byte("hello"))
var (
wsConn *websocket.Conn
err error
conn *websocketpack.Connection
data []byte
)
// 完成ws协议的握手操作
// Upgrade:websocket
if wsConn, err = upgrader.Upgrade(w, r, nil); err != nil {
return
}
if conn, err = websocketpack.InitConnection(wsConn); err != nil {
goto ERR
}
// 启动线程,不断发消息
go func() {
var (
err error
)
for {
if err = conn.WriteMessage([]byte("heartbeat")); err != nil {
return
}
time.Sleep(1 * time.Second)
}
}()
for {
if data, err = conn.ReadMessage(); err != nil {
goto ERR
}
if err = conn.WriteMessage(data); err != nil {
goto ERR
}
}
ERR:
conn.Close()
}

100
controller/common/websocketpack/api.go

@ -0,0 +1,100 @@
package websocketpack
import (
"errors"
"github.com/gorilla/websocket"
)
/*
*
@ 作者: 秦东
@ 时间: 2025-06-12 16:19:04
@ 功能: 初始化设置
*/
func InitConnection(wsConn *websocket.Conn) (conn *Connection, err error) {
conn = &Connection{
WsConnect: wsConn,
InChan: make(chan []byte, 1000),
OutChan: make(chan []byte, 1000),
CloseChan: make(chan byte, 1),
}
go conn.readLoop()
// 启动写协程
go conn.writeLoop()
return
}
/**
@ 作者: 秦东
@ 时间: 2025-06-12 16:21:26
@ 功能: 读取消息
*/
func (conn *Connection) ReadMessage() (data []byte, err error) {
select {
case data = <-conn.InChan:
case <-conn.CloseChan:
err = errors.New("connection is closeed")
}
return
}
func (conn *Connection) WriteMessage(data []byte) (err error) {
select {
case conn.OutChan <- data:
case <-conn.CloseChan:
err = errors.New("connection is closeed")
}
return
}
func (conn *Connection) Close() {
// 线程安全,可多次调用
conn.WsConnect.Close()
// 利用标记,让closeChan只关闭一次
conn.Mutex.Lock()
if !conn.IsClosed {
close(conn.CloseChan)
conn.IsClosed = true
}
conn.Mutex.Unlock()
}
// 内部实现
func (conn *Connection) readLoop() {
var (
data []byte
err error
)
for {
if _, data, err = conn.WsConnect.ReadMessage(); err != nil {
goto ERR
}
//阻塞在这里,等待inChan有空闲位置
select {
case conn.InChan <- data:
case <-conn.CloseChan: // closeChan 感知 conn断开
goto ERR
}
}
ERR:
conn.Close()
}
func (conn *Connection) writeLoop() {
var (
data []byte
err error
)
for {
select {
case data = <-conn.OutChan:
case <-conn.CloseChan:
goto ERR
}
if err = conn.WsConnect.WriteMessage(websocket.TextMessage, data); err != nil {
goto ERR
}
}
ERR:
conn.Close()
}

17
controller/common/websocketpack/type.go

@ -0,0 +1,17 @@
package websocketpack
import (
"sync"
"github.com/gorilla/websocket"
)
// 基础配置
type Connection struct {
WsConnect *websocket.Conn //ws指针对象
InChan chan []byte
OutChan chan []byte
CloseChan chan byte
Mutex sync.Mutex
IsClosed bool
}

22
controller/stock/entry.go

@ -1,5 +1,10 @@
package stock
import (
"net"
"sync"
)
type ApiMethod struct{}
// 获取天干地址
@ -7,3 +12,20 @@ type GainHeavenlyEarthly struct {
HeavenlyStems string `json:"heavenlyStems"` //天干
EarthlyBranches string `json:"earthlyBranches"` //地支
}
//心跳结构体
// 这是每个连接对象 每次接收到消息就会更新times为当前时间戳
type Session struct {
Id uint32
Con net.Conn
times int64
lock sync.Mutex
}
type SessionM struct {
sessions map[uint32]*Session
num uint32
lock sync.RWMutex
isWebSocket bool
// ser
}

10
controller/stock/stockApi.go

@ -0,0 +1,10 @@
package stock
import "time"
//这是更新时间函数
func (this *Session) UpdateTime() {
this.times = time.Now().Unix()
}
//--SESSION管理类-
Loading…
Cancel
Save