115 lines
3.3 KiB
Go
115 lines
3.3 KiB
Go
![]() |
package socket
|
|||
|
|
|||
|
import (
|
|||
|
"fmt"
|
|||
|
"git.inspur.com/sbg-jszt/cfn/cfn-schedule/internal/model/app_manage/node"
|
|||
|
"git.inspur.com/sbg-jszt/cfn/cfn-schedule/internal/socket/transport"
|
|||
|
"git.inspur.com/sbg-jszt/cfn/cfn-schedule/pkg/log"
|
|||
|
"github.com/gin-gonic/gin"
|
|||
|
"github.com/gorilla/websocket"
|
|||
|
"time"
|
|||
|
)
|
|||
|
|
|||
|
type BaseProxyHandler struct {
|
|||
|
BaseHandler
|
|||
|
nodeUrl string
|
|||
|
}
|
|||
|
|
|||
|
// 构造函数
|
|||
|
func NewBaseProxyHandler(nodeUrl string) *BaseProxyHandler {
|
|||
|
return &BaseProxyHandler{nodeUrl: nodeUrl}
|
|||
|
}
|
|||
|
|
|||
|
// 获取参数
|
|||
|
type getParameters func(attributes map[string]interface{}) []interface{}
|
|||
|
|
|||
|
func (bh *BaseProxyHandler) AfterConnectionEstablished(c *gin.Context, gp getParameters) {
|
|||
|
conn, _ := c.Get("wsConn")
|
|||
|
session := conn.(*websocket.Conn)
|
|||
|
|
|||
|
value, _ := c.Get("attributes")
|
|||
|
attributes := value.(map[string]interface{})
|
|||
|
|
|||
|
bh.ShowHelloMsg(session, attributes)
|
|||
|
permissionMsg, ok := attributes["permissionMsg"]
|
|||
|
if ok && permissionMsg != "" {
|
|||
|
bh.SendMsg(session, permissionMsg.(string))
|
|||
|
time.Sleep(2 * time.Second)
|
|||
|
bh.Destroy(session, attributes)
|
|||
|
return
|
|||
|
}
|
|||
|
bh.AfterConnectionEstablishedImpl(session, attributes, gp)
|
|||
|
}
|
|||
|
|
|||
|
// 建立会话后的处理
|
|||
|
func (bh *BaseProxyHandler) AfterConnectionEstablishedImpl(session *websocket.Conn, attributes map[string]interface{}, gp getParameters) {
|
|||
|
bh.BaseHandler.AfterConnectionEstablishedImpl(session, attributes)
|
|||
|
bh.init(session, attributes, gp)
|
|||
|
}
|
|||
|
|
|||
|
// 连接成功初始化
|
|||
|
func (bh *BaseProxyHandler) init(session *websocket.Conn, attributes map[string]interface{}, gp getParameters) error {
|
|||
|
_, ok := attributes["init"]
|
|||
|
if ok {
|
|||
|
return nil
|
|||
|
}
|
|||
|
|
|||
|
// console和nodescript的有,serverscript没有
|
|||
|
nodeInfo, ok := attributes["nodeInfo"]
|
|||
|
if ok {
|
|||
|
n := nodeInfo.(*node.Node)
|
|||
|
machineId := n.MachineId
|
|||
|
if machineId == "" {
|
|||
|
return fmt.Errorf("no machine id specified")
|
|||
|
}
|
|||
|
machineNode := &node.MachineNode{}
|
|||
|
machineNode.Id = machineId
|
|||
|
err := machineNode.GetById(machineNode)
|
|||
|
if err != nil {
|
|||
|
return fmt.Errorf("machine node not found: %s", machineId)
|
|||
|
}
|
|||
|
|
|||
|
webSocketClientHandler := transport.Websocket(machineNode, bh.nodeUrl, n.WorkspaceId, gp(attributes)...)
|
|||
|
|
|||
|
err = webSocketClientHandler.Connect()
|
|||
|
if err != nil {
|
|||
|
log.Errorf("与逻辑节点连接失败:", err)
|
|||
|
bh.SendMsg(session, "与逻辑节点连接失败")
|
|||
|
return err
|
|||
|
}
|
|||
|
//defer webSocketClientHandler.Close()
|
|||
|
|
|||
|
// 设置消息处理器
|
|||
|
webSocketClientHandler.SetConsumer(func(msg string) {
|
|||
|
bh.SendMsg(session, msg)
|
|||
|
})
|
|||
|
|
|||
|
// 启动接收消息的goroutine
|
|||
|
go webSocketClientHandler.Receive()
|
|||
|
|
|||
|
attributes["proxySession"] = webSocketClientHandler
|
|||
|
}
|
|||
|
attributes["init"] = true
|
|||
|
return nil
|
|||
|
}
|
|||
|
|
|||
|
func (bh *BaseProxyHandler) HandleTransportError(session *websocket.Conn, attributes map[string]interface{}, err error) {
|
|||
|
log.Errorf("%s WebSocket异常: %v\n", session.RemoteAddr(), err)
|
|||
|
bh.Destroy(session, attributes)
|
|||
|
}
|
|||
|
|
|||
|
// 连接关闭处理
|
|||
|
func (bh *BaseProxyHandler) AfterConnectionClosed(session *websocket.Conn, attributes map[string]interface{}) {
|
|||
|
bh.Destroy(session, attributes)
|
|||
|
}
|
|||
|
|
|||
|
// 销毁会话
|
|||
|
func (bh *BaseProxyHandler) Destroy(session *websocket.Conn, attributes map[string]interface{}) {
|
|||
|
proxySession, ok := attributes["proxySession"]
|
|||
|
if ok {
|
|||
|
webSocketClientHandler := proxySession.(*transport.WebSocketClientHandler)
|
|||
|
webSocketClientHandler.Close()
|
|||
|
}
|
|||
|
session.Close()
|
|||
|
}
|