diff --git a/api/version1/webStocetmsg/server.go b/api/version1/webStocetmsg/server.go new file mode 100644 index 0000000..595bb3b --- /dev/null +++ b/api/version1/webStocetmsg/server.go @@ -0,0 +1,116 @@ +package webstocetmsg + +import ( + "fmt" + "sync" + "time" + + "github.com/golang-module/carbon" +) + +var GatewayClients, GatewayUser, GatewayGroup sync.Map + +/** + * @description: 客户端心跳检测,超时即断开连接(主要是为了降低服务端承载压力) + * @param {string} clientID + * @return {*} + */ + +func clientHeartbeatCheck(clientID string) { + + for { + + time.Sleep(5 * time.Second) + + clientInterface, exists := GatewayClients.Load(clientID) + + if !exists { + + break + } + + client, _ := clientInterface.(*WebSocketClientBase) + + if (carbon.Now().Timestamp() - client.LastHeartbeat) > int64(HeartbeatTime) { + + fmt.Println("Client", clientID, "heartbeat timeout") + + client.Conn.Close() + GatewayClients.Delete(clientID) + break + } + } +} + +/** + * @description: 客户端断线时自动踢出Uid绑定列表 + * @param {string} clientID + * @param {string} uid + * @return {*} + */ +func clientUnBindUid(clientID string, uid string) { + + value, ok := GatewayUser.Load(uid) + + if ok { + + users := value.(*WebSocketUserBase) + + for k, v := range users.ClientID { + + if v == clientID { + + users.ClientID = append(users.ClientID[:k], users.ClientID[k+1:]...) + } + } + + if len(users.ClientID) == 0 { + + GatewayUser.Delete(uid) + } + + } +} + +/** + * @description: 客户端断线时自动踢出已加入的群组 + * @param {string} clientID + * @return {*} + */ +func clientLeaveGroup(clientID string) { + // 使用 Load 方法获取值 + value, ok := GatewayClients.Load(clientID) + if !ok { + // 如果没有找到对应的值,处理相应的逻辑 + return + } + + client := value.(*WebSocketClientBase) + + // 遍历 JoinGroup + for _, v := range client.JoinGroup { + // 使用 Load 方法获取值 + groupValue, groupOK := GatewayGroup.Load(v) + if !groupOK { + // 如果没有找到对应的值,处理相应的逻辑 + continue + } + + group := groupValue.(*WebSocketGroupBase) + + // 在群组中找到对应的 clientID,并删除 + for j, id := range group.ClientID { + if id == clientID { + copy(group.ClientID[j:], group.ClientID[j+1:]) + group.ClientID = group.ClientID[:len(group.ClientID)-1] + + // 如果群组中没有成员了,删除群组 + if len(group.ClientID) == 0 { + GatewayGroup.Delete(v) + } + + break + } + } + } +} diff --git a/api/version1/webStocetmsg/webStocketConfig/config.ini b/api/version1/webStocetmsg/webStocketConfig/config.ini new file mode 100644 index 0000000..7c1fd07 --- /dev/null +++ b/api/version1/webStocetmsg/webStocketConfig/config.ini @@ -0,0 +1,25 @@ +[gateway] +# gateway服务运行占用端口 (默认占用端口:20818) +gatewayServicePort = 8282 + +# gRPC服务运行占用端口(默认占用端口:20819) +gRPCServicePort = 8182 + +# 消息发送缓冲区大小,如果这个值设置得太小,可能会导致服务端在发送大型消息时遇到问题 (默认大小:1024byte) +writeBufferSize = 1024 + +# websocket消息内容格式(默认:text,可选,text = 文本,binary = 二进制) +messageFormat = text + +# 心跳超时时间,超时未发送ping包服务端自动断线(默认时长:180秒) +heartbeatTimeout = 360 + +[server] + +# 限制服务端的来源(默认为空即不限制服务端来源,多个ip使用逗号分割即可) +ip = [] + +[cluster] + +# 集群名称 +clusterName = "gateway" \ No newline at end of file diff --git a/api/version1/webStocetmsg/webStocketConfig/setConfig.go b/api/version1/webStocetmsg/webStocketConfig/setConfig.go new file mode 100644 index 0000000..3d5efd2 --- /dev/null +++ b/api/version1/webStocetmsg/webStocketConfig/setConfig.go @@ -0,0 +1,131 @@ +package webstocketconfig + +import ( + "encoding/json" + "net" + "os" + "strconv" + + "github.com/gorilla/websocket" + "gopkg.in/ini.v1" +) + +var GatewayConfig = make(map[string]interface{}) + +/** + * @description: 初始化进程 + * @return {*} + */ +func init() { + + /** + * 初始化时定义服务的配置项 + */ + + GatewayConfig["GatewayServicePort"] = 20818 + GatewayConfig["WriteBufferSize"] = 1024 + GatewayConfig["HeartbeatTimeout"] = 180 + GatewayConfig["GRPCServicePort"] = 20819 + GatewayConfig["ClientIP"] = []string{} + GatewayConfig["MessageFormat"] = websocket.TextMessage + + configPath := "./webstocketconfig/config.ini" + + checkPath := func(path string) bool { + + _, err := os.Stat(path) + + return err == nil || os.IsExist(err) + } + + if !checkPath(configPath) { + + return + } + + cnf, err := ini.Load(configPath) + + if err != nil { + + return + } + + // 设置gRPC服务端运行端口 + if cnf.Section("gateway").Key("gRPCServicePort").String() != "" { + + gRPCServicePort, err := strconv.Atoi(cnf.Section("gateway").Key("gRPCServicePort").String()) + + if err == nil { + + GatewayConfig["GRPCServicePort"] = gRPCServicePort + } + } + + // 设置ws服务端运行端口 + if cnf.Section("gateway").Key("gatewayServicePort").String() != "" { + + gatewayServicePort, err := strconv.Atoi(cnf.Section("gateway").Key("gatewayServicePort").String()) + + if err == nil { + + GatewayConfig["GatewayServicePort"] = gatewayServicePort + } + } + + // 消息发送缓冲区大小 + if cnf.Section("gateway").Key("writeBufferSize").String() != "" { + + writeBufferSize, err := strconv.Atoi(cnf.Section("gateway").Key("writeBufferSize").String()) + + if err == nil { + + GatewayConfig["WriteBufferSize"] = writeBufferSize + } + } + + // 设置websocket消息体格式 + if cnf.Section("gateway").Key("messageFormat").String() != "" { + + if cnf.Section("gateway").Key("messageFormat").String() == "binary" { + + GatewayConfig["MessageFormat"] = websocket.BinaryMessage + + } else { + + GatewayConfig["MessageFormat"] = websocket.TextMessage + } + } + + // 心跳超时时间 + if cnf.Section("gateway").Key("heartbeatTimeout").String() != "" { + + heartbeatTimeout, err := strconv.Atoi(cnf.Section("gateway").Key("heartbeatTimeout").String()) + + if err == nil { + + GatewayConfig["HeartbeatTimeout"] = heartbeatTimeout + } + } + + if cnf.Section("server").Key("ip").String() != "" { + + var ips []string + + if err := json.Unmarshal([]byte(cnf.Section("server").Key("ip").String()), &ips); err != nil { + + return + } + + var validIPs []string + + for _, ip := range ips { + + if net.ParseIP(ip) != nil { + + validIPs = append(validIPs, ip) + } + } + + GatewayConfig["ServerIP"] = validIPs + } +}