115 lines
3.3 KiB
Go
115 lines
3.3 KiB
Go
package socket
|
||
|
||
import (
|
||
"fmt"
|
||
"git.inspur.com/sbg-jszt/cfn/cfn-schedule/internal/model/app_manage/node"
|
||
"git.inspur.com/sbg-jszt/cfn/cfn-schedule/internal/socket/transport"
|
||
"git.inspur.com/sbg-jszt/cfn/cfn-schedule/pkg/log"
|
||
"github.com/gin-gonic/gin"
|
||
"github.com/gorilla/websocket"
|
||
"time"
|
||
)
|
||
|
||
type BaseProxyHandler struct {
|
||
BaseHandler
|
||
nodeUrl string
|
||
}
|
||
|
||
// 构造函数
|
||
func NewBaseProxyHandler(nodeUrl string) *BaseProxyHandler {
|
||
return &BaseProxyHandler{nodeUrl: nodeUrl}
|
||
}
|
||
|
||
// getParameters is a callback that derives, from the session attributes, the
// extra arguments that are forwarded (variadically) to transport.Websocket
// when the proxy connection is opened.
type getParameters func(attributes map[string]interface{}) []interface{}
|
||
|
||
func (bh *BaseProxyHandler) AfterConnectionEstablished(c *gin.Context, gp getParameters) {
|
||
conn, _ := c.Get("wsConn")
|
||
session := conn.(*websocket.Conn)
|
||
|
||
value, _ := c.Get("attributes")
|
||
attributes := value.(map[string]interface{})
|
||
|
||
bh.ShowHelloMsg(session, attributes)
|
||
permissionMsg, ok := attributes["permissionMsg"]
|
||
if ok && permissionMsg != "" {
|
||
bh.SendMsg(session, permissionMsg.(string))
|
||
time.Sleep(2 * time.Second)
|
||
bh.Destroy(session, attributes)
|
||
return
|
||
}
|
||
bh.AfterConnectionEstablishedImpl(session, attributes, gp)
|
||
}
|
||
|
||
// 建立会话后的处理
|
||
func (bh *BaseProxyHandler) AfterConnectionEstablishedImpl(session *websocket.Conn, attributes map[string]interface{}, gp getParameters) {
|
||
bh.BaseHandler.AfterConnectionEstablishedImpl(session, attributes)
|
||
bh.init(session, attributes, gp)
|
||
}
|
||
|
||
// 连接成功初始化
|
||
func (bh *BaseProxyHandler) init(session *websocket.Conn, attributes map[string]interface{}, gp getParameters) error {
|
||
_, ok := attributes["init"]
|
||
if ok {
|
||
return nil
|
||
}
|
||
|
||
// console和nodescript的有,serverscript没有
|
||
nodeInfo, ok := attributes["nodeInfo"]
|
||
if ok {
|
||
n := nodeInfo.(*node.Node)
|
||
machineId := n.MachineId
|
||
if machineId == "" {
|
||
return fmt.Errorf("no machine id specified")
|
||
}
|
||
machineNode := &node.MachineNode{}
|
||
machineNode.Id = machineId
|
||
err := machineNode.GetById(machineNode)
|
||
if err != nil {
|
||
return fmt.Errorf("machine node not found: %s", machineId)
|
||
}
|
||
|
||
webSocketClientHandler := transport.Websocket(machineNode, bh.nodeUrl, n.WorkspaceId, gp(attributes)...)
|
||
|
||
err = webSocketClientHandler.Connect()
|
||
if err != nil {
|
||
log.Errorf("与逻辑节点连接失败:", err)
|
||
bh.SendMsg(session, "与逻辑节点连接失败")
|
||
return err
|
||
}
|
||
//defer webSocketClientHandler.Close()
|
||
|
||
// 设置消息处理器
|
||
webSocketClientHandler.SetConsumer(func(msg string) {
|
||
bh.SendMsg(session, msg)
|
||
})
|
||
|
||
// 启动接收消息的goroutine
|
||
go webSocketClientHandler.Receive()
|
||
|
||
attributes["proxySession"] = webSocketClientHandler
|
||
}
|
||
attributes["init"] = true
|
||
return nil
|
||
}
|
||
|
||
func (bh *BaseProxyHandler) HandleTransportError(session *websocket.Conn, attributes map[string]interface{}, err error) {
|
||
log.Errorf("%s WebSocket异常: %v\n", session.RemoteAddr(), err)
|
||
bh.Destroy(session, attributes)
|
||
}
|
||
|
||
// 连接关闭处理
|
||
func (bh *BaseProxyHandler) AfterConnectionClosed(session *websocket.Conn, attributes map[string]interface{}) {
|
||
bh.Destroy(session, attributes)
|
||
}
|
||
|
||
// 销毁会话
|
||
func (bh *BaseProxyHandler) Destroy(session *websocket.Conn, attributes map[string]interface{}) {
|
||
proxySession, ok := attributes["proxySession"]
|
||
if ok {
|
||
webSocketClientHandler := proxySession.(*transport.WebSocketClientHandler)
|
||
webSocketClientHandler.Close()
|
||
}
|
||
session.Close()
|
||
}
|