liuhaijun e94826ce29 add server
Change-Id: I0760f17f6a01c0121b59fcbfafc666032dbc30af
2024-09-19 09:44:15 +00:00

115 lines
3.3 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package socket
import (
"fmt"
"git.inspur.com/sbg-jszt/cfn/cfn-schedule/internal/model/app_manage/node"
"git.inspur.com/sbg-jszt/cfn/cfn-schedule/internal/socket/transport"
"git.inspur.com/sbg-jszt/cfn/cfn-schedule/pkg/log"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
"time"
)
// BaseProxyHandler is a websocket handler that relays a client session to a
// remote node over a second websocket connection held by the transport client.
type BaseProxyHandler struct {
// BaseHandler is embedded; its AfterConnectionEstablishedImpl is invoked
// before the proxy connection is set up.
BaseHandler
// nodeUrl is handed to transport.Websocket when the proxy connection to the
// node is created.
nodeUrl string
}
// NewBaseProxyHandler is the constructor for BaseProxyHandler. The returned
// handler stores nodeUrl and later passes it to transport.Websocket when it
// opens the proxy connection; the embedded BaseHandler keeps its zero value.
func NewBaseProxyHandler(nodeUrl string) *BaseProxyHandler {
	handler := new(BaseProxyHandler)
	handler.nodeUrl = nodeUrl
	return handler
}
// getParameters extracts parameters from the session attributes. The returned
// values are forwarded, in order, as the trailing variadic arguments of
// transport.Websocket when the proxy connection is created.
type getParameters func(attributes map[string]interface{}) []interface{}
// AfterConnectionEstablished is the entry point invoked once the websocket
// upgrade has completed.
//
// It reads the upgraded connection from the gin context key "wsConn" and the
// session attributes from the key "attributes". If either is missing or has an
// unexpected type the problem is logged and the call returns without touching
// the session, instead of panicking on a failed type assertion.
//
// When the attributes carry a non-empty "permissionMsg", that message is sent
// to the client, the handler waits two seconds and then destroys the session.
// Otherwise the proxy connection is initialised via
// AfterConnectionEstablishedImpl, with gp supplying the extra parameters for
// the proxy connection.
func (bh *BaseProxyHandler) AfterConnectionEstablished(c *gin.Context, gp getParameters) {
	conn, _ := c.Get("wsConn")
	session, ok := conn.(*websocket.Conn)
	if !ok || session == nil {
		log.Errorf("websocket session missing from context: wsConn has type %T", conn)
		return
	}

	value, _ := c.Get("attributes")
	attributes, ok := value.(map[string]interface{})
	if !ok {
		log.Errorf("%s session attributes missing from context: attributes has type %T", session.RemoteAddr(), value)
		return
	}

	bh.ShowHelloMsg(session, attributes)

	// A nil entry is treated as "no permission message". Any other non-string
	// value is rendered with fmt.Sprint so the session is still rejected
	// rather than crashing the handler.
	if raw, present := attributes["permissionMsg"]; present && raw != nil {
		msg, isString := raw.(string)
		if !isString {
			msg = fmt.Sprint(raw)
		}
		if msg != "" {
			bh.SendMsg(session, msg)
			// Pause before tearing the session down; presumably this gives the
			// client time to receive the message — confirm.
			time.Sleep(2 * time.Second)
			bh.Destroy(session, attributes)
			return
		}
	}

	bh.AfterConnectionEstablishedImpl(session, attributes, gp)
}
// AfterConnectionEstablishedImpl performs the post-connection work for a
// session: it first runs the embedded BaseHandler's implementation and then
// initialises the proxy connection to the node.
//
// An initialisation failure is logged here, because init reports some failures
// (missing machine id, machine node lookup failure) only through its returned
// error. The session is left open, as before.
func (bh *BaseProxyHandler) AfterConnectionEstablishedImpl(session *websocket.Conn, attributes map[string]interface{}, gp getParameters) {
	bh.BaseHandler.AfterConnectionEstablishedImpl(session, attributes)
	if err := bh.init(session, attributes, gp); err != nil {
		log.Errorf("proxy session init failed: %v", err)
	}
}
// init sets up the proxy connection for a freshly connected session.
//
// It does nothing when the attributes already contain an "init" entry. When
// the attributes carry "nodeInfo", it looks up the machine node by the node's
// MachineId, opens a websocket client to it through transport.Websocket,
// forwards every message received from the node to the client session, and
// stores the client under the "proxySession" attribute. On success the "init"
// attribute is set to true.
//
// It returns an error when nodeInfo has an unexpected type, when no machine id
// is set, when the machine node lookup fails, or when connecting to the node
// fails; in the last case the client session is also sent a failure message.
func (bh *BaseProxyHandler) init(session *websocket.Conn, attributes map[string]interface{}, gp getParameters) error {
	if _, done := attributes["init"]; done {
		return nil
	}

	// console and nodescript sessions carry nodeInfo; serverscript sessions do not.
	if nodeInfo, ok := attributes["nodeInfo"]; ok {
		n, isNode := nodeInfo.(*node.Node)
		if !isNode || n == nil {
			return fmt.Errorf("unexpected nodeInfo attribute of type %T", nodeInfo)
		}

		machineId := n.MachineId
		if machineId == "" {
			return fmt.Errorf("no machine id specified")
		}

		machineNode := &node.MachineNode{}
		machineNode.Id = machineId
		if err := machineNode.GetById(machineNode); err != nil {
			return fmt.Errorf("machine node not found: %s: %w", machineId, err)
		}

		// Tolerate a nil parameter extractor: no extra parameters are passed.
		var params []interface{}
		if gp != nil {
			params = gp(attributes)
		}

		webSocketClientHandler := transport.Websocket(machineNode, bh.nodeUrl, n.WorkspaceId, params...)
		if err := webSocketClientHandler.Connect(); err != nil {
			log.Errorf("与逻辑节点连接失败:%v", err)
			bh.SendMsg(session, "与逻辑节点连接失败")
			return err
		}

		// The client is deliberately not closed here: it must outlive this
		// call and is closed by Destroy via the "proxySession" attribute.

		// Relay every message coming from the node to the client session.
		webSocketClientHandler.SetConsumer(func(msg string) {
			bh.SendMsg(session, msg)
		})

		// Receive runs in its own goroutine; presumably it ends when the
		// client is closed — confirm against the transport package.
		go webSocketClientHandler.Receive()

		attributes["proxySession"] = webSocketClientHandler
	}

	attributes["init"] = true
	return nil
}
// HandleTransportError reports a transport-level failure on the session. It
// logs the remote address together with err and then destroys the session.
func (bh *BaseProxyHandler) HandleTransportError(session *websocket.Conn, attributes map[string]interface{}, err error) {
	remote := session.RemoteAddr()
	log.Errorf("%s WebSocket异常: %v\n", remote, err)

	bh.Destroy(session, attributes)
}
// AfterConnectionClosed handles the closing of a session by delegating to
// Destroy, which closes the stored proxy client (if any) and the session.
func (bh *BaseProxyHandler) AfterConnectionClosed(session *websocket.Conn, attributes map[string]interface{}) {
	bh.Destroy(session, attributes)
}
// Destroy tears a session down. If the attributes hold a proxy client under
// "proxySession" it is closed first, then the client session itself is closed.
//
// A "proxySession" entry of an unexpected type, a nil proxy client and a nil
// session are all skipped rather than causing a panic.
func (bh *BaseProxyHandler) Destroy(session *websocket.Conn, attributes map[string]interface{}) {
	// A missing key yields a nil interface, for which the assertion reports false.
	if client, ok := attributes["proxySession"].(*transport.WebSocketClientHandler); ok && client != nil {
		client.Close()
	}

	if session != nil {
		// Best-effort close: the session is being discarded, so a close error
		// is not actionable here.
		_ = session.Close()
	}
}