package socket import ( "fmt" "git.inspur.com/sbg-jszt/cfn/cfn-schedule/internal/model/app_manage/node" "git.inspur.com/sbg-jszt/cfn/cfn-schedule/internal/socket/transport" "git.inspur.com/sbg-jszt/cfn/cfn-schedule/pkg/log" "github.com/gin-gonic/gin" "github.com/gorilla/websocket" "time" ) type BaseProxyHandler struct { BaseHandler nodeUrl string } // 构造函数 func NewBaseProxyHandler(nodeUrl string) *BaseProxyHandler { return &BaseProxyHandler{nodeUrl: nodeUrl} } // 获取参数 type getParameters func(attributes map[string]interface{}) []interface{} func (bh *BaseProxyHandler) AfterConnectionEstablished(c *gin.Context, gp getParameters) { conn, _ := c.Get("wsConn") session := conn.(*websocket.Conn) value, _ := c.Get("attributes") attributes := value.(map[string]interface{}) bh.ShowHelloMsg(session, attributes) permissionMsg, ok := attributes["permissionMsg"] if ok && permissionMsg != "" { bh.SendMsg(session, permissionMsg.(string)) time.Sleep(2 * time.Second) bh.Destroy(session, attributes) return } bh.AfterConnectionEstablishedImpl(session, attributes, gp) } // 建立会话后的处理 func (bh *BaseProxyHandler) AfterConnectionEstablishedImpl(session *websocket.Conn, attributes map[string]interface{}, gp getParameters) { bh.BaseHandler.AfterConnectionEstablishedImpl(session, attributes) bh.init(session, attributes, gp) } // 连接成功初始化 func (bh *BaseProxyHandler) init(session *websocket.Conn, attributes map[string]interface{}, gp getParameters) error { _, ok := attributes["init"] if ok { return nil } // console和nodescript的有,serverscript没有 nodeInfo, ok := attributes["nodeInfo"] if ok { n := nodeInfo.(*node.Node) machineId := n.MachineId if machineId == "" { return fmt.Errorf("no machine id specified") } machineNode := &node.MachineNode{} machineNode.Id = machineId err := machineNode.GetById(machineNode) if err != nil { return fmt.Errorf("machine node not found: %s", machineId) } webSocketClientHandler := transport.Websocket(machineNode, bh.nodeUrl, n.WorkspaceId, gp(attributes)...) err = webSocketClientHandler.Connect() if err != nil { log.Errorf("与逻辑节点连接失败:", err) bh.SendMsg(session, "与逻辑节点连接失败") return err } //defer webSocketClientHandler.Close() // 设置消息处理器 webSocketClientHandler.SetConsumer(func(msg string) { bh.SendMsg(session, msg) }) // 启动接收消息的goroutine go webSocketClientHandler.Receive() attributes["proxySession"] = webSocketClientHandler } attributes["init"] = true return nil } func (bh *BaseProxyHandler) HandleTransportError(session *websocket.Conn, attributes map[string]interface{}, err error) { log.Errorf("%s WebSocket异常: %v\n", session.RemoteAddr(), err) bh.Destroy(session, attributes) } // 连接关闭处理 func (bh *BaseProxyHandler) AfterConnectionClosed(session *websocket.Conn, attributes map[string]interface{}) { bh.Destroy(session, attributes) } // 销毁会话 func (bh *BaseProxyHandler) Destroy(session *websocket.Conn, attributes map[string]interface{}) { proxySession, ok := attributes["proxySession"] if ok { webSocketClientHandler := proxySession.(*transport.WebSocketClientHandler) webSocketClientHandler.Close() } session.Close() }