超级管理员 1 year ago
parent
commit
2e3824f481
  1. 116
      api/version1/webStocetmsg/server.go
  2. 25
      api/version1/webStocetmsg/webStocketConfig/config.ini
  3. 131
      api/version1/webStocetmsg/webStocketConfig/setConfig.go

116
api/version1/webStocetmsg/server.go

@ -0,0 +1,116 @@
package webstocetmsg
import (
"fmt"
"sync"
"time"
"github.com/golang-module/carbon"
)
var GatewayClients, GatewayUser, GatewayGroup sync.Map
/**
* @description: 客户端心跳检测超时即断开连接主要是为了降低服务端承载压力
* @param {string} clientID
* @return {*}
*/
func clientHeartbeatCheck(clientID string) {
for {
time.Sleep(5 * time.Second)
clientInterface, exists := GatewayClients.Load(clientID)
if !exists {
break
}
client, _ := clientInterface.(*WebSocketClientBase)
if (carbon.Now().Timestamp() - client.LastHeartbeat) > int64(HeartbeatTime) {
fmt.Println("Client", clientID, "heartbeat timeout")
client.Conn.Close()
GatewayClients.Delete(clientID)
break
}
}
}
/**
* @description: 客户端断线时自动踢出Uid绑定列表
* @param {string} clientID
* @param {string} uid
* @return {*}
*/
func clientUnBindUid(clientID string, uid string) {
value, ok := GatewayUser.Load(uid)
if ok {
users := value.(*WebSocketUserBase)
for k, v := range users.ClientID {
if v == clientID {
users.ClientID = append(users.ClientID[:k], users.ClientID[k+1:]...)
}
}
if len(users.ClientID) == 0 {
GatewayUser.Delete(uid)
}
}
}
/**
* @description: 客户端断线时自动踢出已加入的群组
* @param {string} clientID
* @return {*}
*/
func clientLeaveGroup(clientID string) {
// 使用 Load 方法获取值
value, ok := GatewayClients.Load(clientID)
if !ok {
// 如果没有找到对应的值,处理相应的逻辑
return
}
client := value.(*WebSocketClientBase)
// 遍历 JoinGroup
for _, v := range client.JoinGroup {
// 使用 Load 方法获取值
groupValue, groupOK := GatewayGroup.Load(v)
if !groupOK {
// 如果没有找到对应的值,处理相应的逻辑
continue
}
group := groupValue.(*WebSocketGroupBase)
// 在群组中找到对应的 clientID,并删除
for j, id := range group.ClientID {
if id == clientID {
copy(group.ClientID[j:], group.ClientID[j+1:])
group.ClientID = group.ClientID[:len(group.ClientID)-1]
// 如果群组中没有成员了,删除群组
if len(group.ClientID) == 0 {
GatewayGroup.Delete(v)
}
break
}
}
}
}

25
api/version1/webStocetmsg/webStocketConfig/config.ini

@ -0,0 +1,25 @@
[gateway]
# gateway服务运行占用端口 (默认占用端口:20818)
gatewayServicePort = 8282
# gRPC服务运行占用端口(默认占用端口:20819)
gRPCServicePort = 8182
# 消息发送缓冲区大小,如果这个值设置得太小,可能会导致服务端在发送大型消息时遇到问题 (默认大小:1024byte)
writeBufferSize = 1024
# websocket消息内容格式(默认:text,可选,text = 文本,binary = 二进制)
messageFormat = text
# 心跳超时时间,超时未发送ping包服务端自动断线(默认时长:180秒)
heartbeatTimeout = 360
[server]
# 限制服务端的来源(默认为空即不限制服务端来源,多个ip使用逗号分割即可)
ip = []
[cluster]
# 集群名称
clusterName = "gateway"

131
api/version1/webStocetmsg/webStocketConfig/setConfig.go

@ -0,0 +1,131 @@
package webstocketconfig
import (
"encoding/json"
"net"
"os"
"strconv"
"github.com/gorilla/websocket"
"gopkg.in/ini.v1"
)
var GatewayConfig = make(map[string]interface{})
/**
* @description: 初始化进程
* @return {*}
*/
func init() {
/**
* 初始化时定义服务的配置项
*/
GatewayConfig["GatewayServicePort"] = 20818
GatewayConfig["WriteBufferSize"] = 1024
GatewayConfig["HeartbeatTimeout"] = 180
GatewayConfig["GRPCServicePort"] = 20819
GatewayConfig["ClientIP"] = []string{}
GatewayConfig["MessageFormat"] = websocket.TextMessage
configPath := "./webstocketconfig/config.ini"
checkPath := func(path string) bool {
_, err := os.Stat(path)
return err == nil || os.IsExist(err)
}
if !checkPath(configPath) {
return
}
cnf, err := ini.Load(configPath)
if err != nil {
return
}
// 设置gRPC服务端运行端口
if cnf.Section("gateway").Key("gRPCServicePort").String() != "" {
gRPCServicePort, err := strconv.Atoi(cnf.Section("gateway").Key("gRPCServicePort").String())
if err == nil {
GatewayConfig["GRPCServicePort"] = gRPCServicePort
}
}
// 设置ws服务端运行端口
if cnf.Section("gateway").Key("gatewayServicePort").String() != "" {
gatewayServicePort, err := strconv.Atoi(cnf.Section("gateway").Key("gatewayServicePort").String())
if err == nil {
GatewayConfig["GatewayServicePort"] = gatewayServicePort
}
}
// 消息发送缓冲区大小
if cnf.Section("gateway").Key("writeBufferSize").String() != "" {
writeBufferSize, err := strconv.Atoi(cnf.Section("gateway").Key("writeBufferSize").String())
if err == nil {
GatewayConfig["WriteBufferSize"] = writeBufferSize
}
}
// 设置websocket消息体格式
if cnf.Section("gateway").Key("messageFormat").String() != "" {
if cnf.Section("gateway").Key("messageFormat").String() == "binary" {
GatewayConfig["MessageFormat"] = websocket.BinaryMessage
} else {
GatewayConfig["MessageFormat"] = websocket.TextMessage
}
}
// 心跳超时时间
if cnf.Section("gateway").Key("heartbeatTimeout").String() != "" {
heartbeatTimeout, err := strconv.Atoi(cnf.Section("gateway").Key("heartbeatTimeout").String())
if err == nil {
GatewayConfig["HeartbeatTimeout"] = heartbeatTimeout
}
}
if cnf.Section("server").Key("ip").String() != "" {
var ips []string
if err := json.Unmarshal([]byte(cnf.Section("server").Key("ip").String()), &ips); err != nil {
return
}
var validIPs []string
for _, ip := range ips {
if net.ParseIP(ip) != nil {
validIPs = append(validIPs, ip)
}
}
GatewayConfig["ServerIP"] = validIPs
}
}
Loading…
Cancel
Save