115 lines
3.3 KiB
Go
Raw Permalink Normal View History

package socket
import (
"fmt"
"git.inspur.com/sbg-jszt/cfn/cfn-schedule/internal/model/app_manage/node"
"git.inspur.com/sbg-jszt/cfn/cfn-schedule/internal/socket/transport"
"git.inspur.com/sbg-jszt/cfn/cfn-schedule/pkg/log"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
"time"
)
type BaseProxyHandler struct {
BaseHandler
nodeUrl string
}
// 构造函数
func NewBaseProxyHandler(nodeUrl string) *BaseProxyHandler {
return &BaseProxyHandler{nodeUrl: nodeUrl}
}
// 获取参数
type getParameters func(attributes map[string]interface{}) []interface{}
func (bh *BaseProxyHandler) AfterConnectionEstablished(c *gin.Context, gp getParameters) {
conn, _ := c.Get("wsConn")
session := conn.(*websocket.Conn)
value, _ := c.Get("attributes")
attributes := value.(map[string]interface{})
bh.ShowHelloMsg(session, attributes)
permissionMsg, ok := attributes["permissionMsg"]
if ok && permissionMsg != "" {
bh.SendMsg(session, permissionMsg.(string))
time.Sleep(2 * time.Second)
bh.Destroy(session, attributes)
return
}
bh.AfterConnectionEstablishedImpl(session, attributes, gp)
}
// 建立会话后的处理
func (bh *BaseProxyHandler) AfterConnectionEstablishedImpl(session *websocket.Conn, attributes map[string]interface{}, gp getParameters) {
bh.BaseHandler.AfterConnectionEstablishedImpl(session, attributes)
bh.init(session, attributes, gp)
}
// 连接成功初始化
func (bh *BaseProxyHandler) init(session *websocket.Conn, attributes map[string]interface{}, gp getParameters) error {
_, ok := attributes["init"]
if ok {
return nil
}
// console和nodescript的有serverscript没有
nodeInfo, ok := attributes["nodeInfo"]
if ok {
n := nodeInfo.(*node.Node)
machineId := n.MachineId
if machineId == "" {
return fmt.Errorf("no machine id specified")
}
machineNode := &node.MachineNode{}
machineNode.Id = machineId
err := machineNode.GetById(machineNode)
if err != nil {
return fmt.Errorf("machine node not found: %s", machineId)
}
webSocketClientHandler := transport.Websocket(machineNode, bh.nodeUrl, n.WorkspaceId, gp(attributes)...)
err = webSocketClientHandler.Connect()
if err != nil {
log.Errorf("与逻辑节点连接失败:", err)
bh.SendMsg(session, "与逻辑节点连接失败")
return err
}
//defer webSocketClientHandler.Close()
// 设置消息处理器
webSocketClientHandler.SetConsumer(func(msg string) {
bh.SendMsg(session, msg)
})
// 启动接收消息的goroutine
go webSocketClientHandler.Receive()
attributes["proxySession"] = webSocketClientHandler
}
attributes["init"] = true
return nil
}
func (bh *BaseProxyHandler) HandleTransportError(session *websocket.Conn, attributes map[string]interface{}, err error) {
log.Errorf("%s WebSocket异常: %v\n", session.RemoteAddr(), err)
bh.Destroy(session, attributes)
}
// 连接关闭处理
func (bh *BaseProxyHandler) AfterConnectionClosed(session *websocket.Conn, attributes map[string]interface{}) {
bh.Destroy(session, attributes)
}
// 销毁会话
func (bh *BaseProxyHandler) Destroy(session *websocket.Conn, attributes map[string]interface{}) {
proxySession, ok := attributes["proxySession"]
if ok {
webSocketClientHandler := proxySession.(*transport.WebSocketClientHandler)
webSocketClientHandler.Close()
}
session.Close()
}