/* * Copyright (c) 2013-2016 Wind River Systems, Inc. * * SPDX-License-Identifier: Apache-2.0 * */ /** * @file * Wind River CGCS Platform Process Monitor Service Messaging */ #include #include #include "pmon.h" #include "nodeMacro.h" #include "pmonAlarm.h" /* for ... this module's alarm utilities / defs */ #include "hostwMsg.h" #include "jsonUtil.h" /* Where to send events */ string mtcAgent_ip = "" ; static int pulse_log_threshold ; /** * Messaging Socket Control Struct - The allocated struct */ static pmon_socket_type pmon_sock; pmon_socket_type * pmon_getSock_ptr ( void ) { return ( &pmon_sock ); } msgSock_type * get_mtclogd_sockPtr ( void ) { return (&pmon_sock.mtclogd); } /*********************************************************************/ /* Setup pmon broadcast transmit port/socket */ /*********************************************************************/ int event_port_init ( const char * iface , int port ) { int rc = PASS ; /* Load the event port number */ pmon_sock.event_port = port ; mtcAgent_ip = getipbyname ( CONTROLLER ); ilog ("ControllerIP: %s\n", mtcAgent_ip.c_str()); pmon_sock.event_sock = new msgClassTx(mtcAgent_ip.c_str(), port, IPPROTO_UDP, iface); if (pmon_sock.event_sock->return_status!=PASS) { elog ("Failed to setup pmond to mtce transmit event port %d\n", port ); return (pmon_sock.event_sock->return_status) ; } return rc; } /* Setup the Unix Domain Transmit Pulse Socket */ int pulse_port_init ( void ) { int rc = PASS ; int port = daemon_get_cfg_ptr()->pmon_pulse_port; pmon_sock.pulse_sock = new msgClassTx(LOOPBACK_IP, port, IPPROTO_UDP); if (pmon_sock.pulse_sock->return_status!=PASS) return (pmon_sock.pulse_sock->return_status) ; snprintf (&pmon_sock.pulse.hdr[0], MSG_HEADER_SIZE, "%s", get_pmond_pulse_header()); pmon_sock.msg_len = ((sizeof(mtc_message_type))-(BUF_SIZE)); return (rc); } /* Setup the Unix Host Watchdog Socket */ int hostwd_port_init ( void ) { memset(&pmon_sock.hostwd_addr, 0, sizeof(pmon_sock.hostwd_addr)); pmon_sock.hostwd_sock = socket(AF_UNIX, SOCK_DGRAM, 0); if (pmon_sock.hostwd_sock <= 0) { wlog("Could not connect to create hostwd socket - will retry\n"); pmon_sock.hostwd_sock = 0 ; return (FAIL_SOCKET_CREATE); } /* Set up the socket address */ memset (&pmon_sock.hostwd_addr, 0, sizeof(pmon_sock.hostwd_addr)); pmon_sock.hostwd_addr.sun_family = AF_UNIX; /* Unix abstract namespace takes a string that starts with a NULL * as the identifier. Thus, we need a pointer to byte[1] of the * sockaddr_un.sun_path (a char array) */ strncpy( &(pmon_sock.hostwd_addr.sun_path[1]), HOSTW_UNIX_SOCKNAME, UNIX_PATH_MAX-1); int len = sizeof(pmon_sock.hostwd_addr); int connected = connect( pmon_sock.hostwd_sock, (sockaddr*) &pmon_sock.hostwd_addr, len); if (connected == -1) { wlog("Could not connect to hostwd port - will retry\n"); if ( pmon_sock.hostwd_sock ) close(pmon_sock.hostwd_sock); pmon_sock.hostwd_sock = 0; return (FAIL_CONNECT); } ilog ("connected to host watchdog\n"); return (PASS); } /* Build a message for host watchdog, and send it */ int pmon_send_hostwd ( void ) { if (pmon_sock.hostwd_sock) { mtc_message_type msg; int bytes; int i; memset(&msg, 0, sizeof(msg)); memcpy (&msg.hdr, get_cmd_req_msg_header(), MSG_HEADER_SIZE); msg.ver = MTC_CMD_VERSION; msg.rev = MTC_CMD_REVISION; msg.cmd = MTC_CMD_NONE; /* All good - take no action */ for (i = 0; i < get_ctrl_ptr()->processes; i++) { process_config_type * pProcess = get_process_config_ptr (i); if (pProcess->quorum && pProcess->quorum_unrecoverable) { ilog ("%s unrecoverable ; reporting this to host watchdog\n", pProcess->process); snprintf ( (char*) &(msg.buf), BUF_SIZE - 1, "PMON detected %s failure", pProcess->process); msg.cmd = MTC_EVENT_PMON_CRIT; /* things are bad */ } } bytes = sendto( pmon_sock.hostwd_sock, (char*)&msg, sizeof(msg), 0, /* flags */ (struct sockaddr *) &pmon_sock.hostwd_addr, sizeof(pmon_sock.hostwd_addr) ); if (bytes == sizeof(msg)) { return (PASS); } else { elog("Error sending message to host watchdog -- error %d (%s)\n", errno, strerror(errno)); if ( pmon_sock.hostwd_sock ) { close(pmon_sock.hostwd_sock); pmon_sock.hostwd_sock = 0; } return (FAIL); } } return (FAIL); } /****************************/ /* Initialization Utilities */ /****************************/ /* Init the messaging socket control structure * The following messaging interfaces use this structure and * are initialized separately * * pulse_port_init - port that pmon I'm alive messages are transmitted on * event_port_init - port that pmon sends events to mtce on * amon_port_init - aggrigated active process monitor receive port * * */ void pmon_msg_init ( void ) { memset(&pmon_sock, 0, sizeof(pmon_sock)); pulse_log_threshold = 0 ; } void pmon_msg_fini ( void ) { /* Close the pmond pulse socket */ if ( pmon_sock.cmd_sock ) delete (pmon_sock.cmd_sock ); if ( pmon_sock.event_sock ) delete (pmon_sock.event_sock); if (pmon_sock.pulse_sock) delete pmon_sock.pulse_sock; if ( pmon_sock.amon_sock ) close (pmon_sock.amon_sock); if ( pmon_sock.hostwd_sock ) close (pmon_sock.hostwd_sock); } /* Initialize the command receive port * Its a LO interface only */ int pmon_inbox_init ( void ) { pmon_sock.cmd_sock = new msgClassRx(LOOPBACK_IP,daemon_get_cfg_ptr()->pmon_cmd_port,IPPROTO_UDP); if ( pmon_sock.cmd_sock ) { pmon_sock.cmd_port = daemon_get_cfg_ptr()->pmon_cmd_port ; pmon_sock.cmd_sock->sock_ok(true); return (PASS); } return (FAIL); } int pmon_send_pulse ( void ) { #define LOG_THROTTLE 1000 int bytes = pmon_sock.pulse_sock->write((char*)&pmon_sock.pulse, pmon_sock.msg_len); if ( bytes <= 0 ) { /* Force reconnect attempt on next go around */ elog ("Cannot sendto hbsClient (bytes=%d) (%d:%s)\n", bytes , errno, strerror(errno)); } else { ilog_throttled ( pulse_log_threshold, LOG_THROTTLE, "sent health pulse - %d bytes (throttled:%d)\n", bytes, LOG_THROTTLE ); dlog3 ( "Pulse: %s (%d:%d)\n", &pmon_sock.pulse.hdr[0], pmon_sock.pulse_sock->get_dst_addr()->getSockLen(), bytes ); } return (PASS) ; } int pmon_send_event ( unsigned int event_cmd , process_config_type * ptr ) { mtc_message_type event ; int rc = PASS ; /* Don't report events while we are in reset mode */ if ( daemon_is_file_present ( NODE_RESET_FILE ) ) return ( PASS ); pmon_ctrl_type * ctrl_ptr = get_ctrl_ptr () ; memset (&event, 0 , sizeof(mtc_message_type)); snprintf ( &event.hdr[0], MSG_HEADER_SIZE, "%s", get_mtce_event_header()); event.cmd = event_cmd ; event.ver = MTC_CMD_FEATURE_VER__KEYVALUE_IN_BUF ; event.num = 1 ; event.parm[0] = ctrl_ptr->nodetype ; /* default to node type */ string event_info = "{\"" ; event_info.append(MTC_JSON_INV_NAME); event_info.append("\":\""); event_info.append(ctrl_ptr->my_hostname); event_info.append("\",\""); event_info.append(MTC_JSON_SERVICE); event_info.append("\":\""); event_info.append(MTC_SERVICE_PMOND_NAME ); if ( ( ptr != NULL ) && ( ptr->process ) ) { event_info.append("\",\""); event_info.append(MTC_JSON_PROCESS); event_info.append("\":\""); event_info.append(ptr->process); } event_info.append( "\"}"); size_t len = event_info.length()+1 ; snprintf ( &event.buf[0], len, "%s", event_info.data()); int bytes = ((sizeof(mtc_message_type))-(BUF_SIZE-len)); /* override with subfunction case */ if (( ptr != NULL ) && ( ctrl_ptr->subfunction != 0 ) && ( ctrl_ptr->subfunction != ctrl_ptr->function )) { if ( ptr->subfunction != NULL ) { string temp = ptr->subfunction ; event.parm[0]= get_host_function_mask (temp) ; if ( ( event_cmd == MTC_EVENT_PMON_MINOR) || ( event_cmd == MTC_EVENT_PMON_MAJOR) || ( event_cmd == MTC_EVENT_PMON_LOG) || ( event_cmd == MTC_EVENT_PMON_CRIT ) ) { mlog ("%s process failed\n", ptr->process ); } else if (( event_cmd == MTC_EVENT_PMON_CLEAR ) && ( ptr->was_failed == true )) { ilog ("%s process recovered\n", ptr->process ); ptr->was_failed = false ; } } } /* Send the event */ if ((rc = pmon_sock.event_sock->write((char*)&event.hdr[0], bytes)) != bytes ) { elog ("event message send failed (%d) (%d) (%d:%m) (%s:%d)\n", bytes, rc, errno, pmon_sock.event_sock->get_dst_addr()->toString(), pmon_sock.event_sock->get_dst_addr()->getPort()); } else { string severity = get_event_str ( event.cmd ); if ( ptr ) { /* Only log the clear event for a specified process. * Avoid logging the periodic degrade clear event. */ ilog ("%s %s sent", ctrl_ptr->my_hostname, get_mtcNodeCommand_str(event_cmd)); } rc = PASS ; } print_mtc_message ( ctrl_ptr->my_hostname, MTC_CMD_TX, event, get_iface_name_str(MGMNT_INTERFACE), rc ); return rc ; } /************************************************************************** * * ********** A C T I V E M O N I T O R I N G ********** * *************************************************************************/ int amon_port_init ( int port ) { int val = 1 ; int rc = FAIL ; if ( port ) { pmon_sock.amon_port = port ; pmon_sock.amon_sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); if ( 0 >= pmon_sock.amon_sock ) return (-errno); if ( setsockopt ( pmon_sock.amon_sock , SOL_SOCKET, SO_REUSEADDR, &val, sizeof(int)) == -1 ) { wlog ( "amon: failed to set active monitor socket as re-useable (%d:%s)\n", errno, strerror(errno)); } /* Set socket to be non-blocking. */ rc = ioctl(pmon_sock.amon_sock, FIONBIO, (char *)&val); if ( 0 > rc ) { elog ("Failed to set amon socket non-blocking\n"); } /* Setup with localhost ip */ memset(&pmon_sock.amon_addr, 0, sizeof(struct sockaddr_in)); pmon_sock.amon_addr.sin_family = AF_INET ; // pmon_sock.amon_addr.sin_addr.s_addr = htonl(INADDR_ANY); pmon_sock.amon_addr.sin_addr.s_addr = inet_addr("127.0.0.1"); pmon_sock.amon_addr.sin_port = htons(pmon_sock.amon_port) ; /* bind socket to the receive addr */ if ( bind ( pmon_sock.amon_sock, (const struct sockaddr *)&pmon_sock.amon_addr, sizeof(struct sockaddr_in)) == -1 ) { elog ( "failed to bind to rx socket with port %d (%d:%s)\n", port, errno, strerror(errno) ); close (pmon_sock.amon_sock); pmon_sock.amon_sock = 0 ; return (-errno); } else { rc = PASS ; } } else { elog ("No port specified\n"); } return (rc) ; } int open_process_socket ( process_config_type * ptr ) { int rc = FAIL ; /* Prop the port numnber into the message struct */ if ( ptr->port ) ptr->msg.tx_port = ptr->port ; if ( ptr->msg.tx_port ) { /* if the sock is already open then close it first */ if ( ptr->msg.tx_sock ) { wlog ("%s open on already open socket %d, closing first\n", ptr->process, ptr->msg.tx_sock ); close (ptr->msg.tx_sock); } ptr->msg.tx_sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); if ( 0 >= ptr->msg.tx_sock ) return (-errno); /* Setup with localhost ip */ memset(&ptr->msg.tx_addr, 0, sizeof(struct sockaddr_in)); ptr->msg.tx_addr.sin_family = AF_INET ; ptr->msg.tx_addr.sin_addr.s_addr = inet_addr(LOOPBACK_IP); ptr->msg.tx_addr.sin_port = htons(ptr->msg.tx_port) ; rc = PASS ; } else { elog ("%s has no port specified\n", ptr->process ); } return (rc) ; } void close_process_socket ( process_config_type * ptr ) { if ( ptr->msg.tx_sock ) close ( ptr->msg.tx_sock ); } int amon_service_inbox ( int processes ) { #define MAX_T 100 int count = 0 ; int bytes = 0 ; char buf[AMON_MAX_LEN] ; socklen_t len = sizeof(struct sockaddr_in) ; do { char str[AMON_MAX_LEN] ; unsigned int magic = 0 ; int seq = 0 ; memset ( buf,0,sizeof(buf)); memset ( str,0,sizeof(str)); bytes = recvfrom( pmon_sock.amon_sock, buf, AMON_MAX_LEN, 0, (struct sockaddr *)&pmon_sock.amon_addr, &len); if ( bytes > 0 ) { sscanf ( buf, "%99s %8x %d", str, &magic, &seq ); if ( AMON_MAGIC_NUM == (magic ^ -1) ) { if ( str[0] != '\0' ) { bool found = false ; for ( int i = 0 ; i < processes ; i++ ) { process_config_type * ptr = get_process_config_ptr (i); if ( ptr != NULL ) { if ( ! strcmp ( str, ptr->process ) ) { found = true ; alog ( "%s %x %d (found)\n", ptr->process, magic, seq ); if ( ptr->waiting == true ) { ptr->rx_sequence = seq ; } else { wlog_throttled ( count , MAX_T, "%s unexpected monitor pulse\n", ptr->process ); } break ; } } } if ( !found ) { wlog_throttled (count, MAX_T, "Unexpected message (%s)\n", buf ); } } else { wlog ("Null string !\n"); } } else { wlog_throttled ( count, MAX_T, "message with invalid magic number (%x)\n", magic ); } } else if (( 0 > bytes ) && ( errno != EINTR ) && ( errno != EAGAIN )) { wlog_throttled ( count , MAX_T, "receive error (%d:%s)\n", errno, strerror(errno)); } } while ( bytes > 0 ) ; /* Put the messages in he structs */ return (PASS); } int amon_send_request ( process_config_type * ptr ) { int rc ; ptr->rx_sequence = 0 ; memset ( ptr->msg.tx_buf, 0, sizeof(ptr->msg.tx_buf)); sprintf ( ptr->msg.tx_buf, "%s %x %u", ptr->process, AMON_MAGIC_NUM, ++ptr->tx_sequence ) ; rc = sendto ( ptr->msg.tx_sock, ptr->msg.tx_buf , strlen ( ptr->msg.tx_buf), 0, (struct sockaddr *) &ptr->msg.tx_addr, sizeof(struct sockaddr_in)); if ( 0 >= rc ) { elog ("%s sendto error (%d:%s) (%s) (%s)\n", ptr->process, errno , strerror(errno), ptr->msg.tx_buf, inet_ntoa(ptr->msg.tx_addr.sin_addr)); } else { mlog3 ("%s\n", &ptr->msg.tx_buf[0] ); rc = PASS ; } return (rc); } #define MAX_COMMANDS (4) /****************************************************************** * * Handle pmon command request messages * * Supports a max MAX_COMMANDS number of queued messages per call * Checks for valid command string and content before action * * message format : json string { key:value , key:value } * * supported requests: * * { "command":"restart", "process":"" } * *****************************************************************/ void pmon_service_inbox ( void ) { int bytes = 0 ; int retries = 0 ; do { /* Receive command messages */ char cmd_buf[MAX_COMMAND_LEN] ; memset ( &cmd_buf, 0, sizeof(cmd_buf)); bytes = pmon_sock.cmd_sock->read((char*)&cmd_buf, MAX_COMMAND_LEN ); if ( bytes > 0 ) { if ( bytes <= MAX_COMMAND_LEN ) { bool string_is_terminated = false ; for ( int i = 0 ; i < MAX_COMMAND_LEN ; i++ ) { if ( cmd_buf[i] == '\0' ) { string_is_terminated = true ; break ; } } if ( string_is_terminated ) { string command ; string process ; mlog1 ("rx <- %s\n", cmd_buf ); int rc1 = jsonUtil_get_key_val ( &cmd_buf[0], "command", command ); int rc2 = jsonUtil_get_key_val ( &cmd_buf[0], "process", process ); mlog ("cmd:%s process:%s\n", command.c_str(), process.c_str()); if ( rc1 || rc2 ) { ilog ("failed to parse command request.\n"); ilog ("... expecting: command:, process:\n"); wlog ("... received : %s\n", cmd_buf ); } else { if ( (!command.compare("none")) || (!process.compare("none"))) { wlog ("one or more invalid command request key:value pairs\n"); wlog ("... command:%s process:%s\n", command.c_str(), process.c_str()); } /* handle start command * - get the pointer to the specified process * - if its in the stopped state then take it out of that state * - inject it into the respawn phase of the passive monitor FSM */ else if ( !command.compare("start")) { process_config_type * ptr = get_process_config_ptr ( process ); ilog ("%s process 'start' request\n", process.c_str()); if ( ptr != NULL ) { if ( strcmp ( ptr->mode, "status" ) == 0 ) { wlog ("%s process-start rejected\n", process.c_str()); wlog ("%s ... status monitoring mode 'start' not supported\n", process.c_str()); } else if ( ptr->stopped == true ) { mtcTimer_reset ( ptr->pt_ptr ); ptr->failed = true ; // so get_events will ignore it till process respawn is complete ptr->stopped = false ; // take the process out of the stopped state ptr->ignore = false ; // have the fsm stop ignoriing the process ; start respawn passiveStageChange ( ptr, PMON_STAGE__RESPAWN ); } else { wlog ("%s process is not in the stopped state ; start request ignored\n", ptr->process ); } } } /* handle stop command * - get the pointer to the specified process * - unregister the process to avoid a kernel notification * - kill the process * - put it in the ignored state * - put it in the stopped state * - reinitialize its active monitoring states and stats * - start the auto recovery timer */ else if ( !command.compare("stop")) { process_config_type * ptr = get_process_config_ptr ( process ); ilog ("%s process 'stop' request\n", process.c_str()); if ( ptr != NULL ) { if ( strcmp ( ptr->mode, "status" ) == 0 ) { wlog ("%s process-stop rejected\n", process.c_str()); wlog ("%s ... status monitoring mode 'stop' not supported\n", process.c_str()); } else if ( ptr->stopped == true ) { wlog ("%s process is already stopped ; stop request ignored\n", ptr->process ); } else { int auto_recovery_timeout = MTC_MINS_30 ; unregister_process ( ptr ); kill_running_process ( ptr->pid ); ptr->stopped = true ; ptr->ignore = true ; passiveStageChange ( ptr, PMON_STAGE__IGNORE ) ; /* as a backup */ if ( !strcmp ( ptr->mode, "active" ) ) { activeStageChange ( ptr , ACTIVE_STAGE__IDLE ); } mtcTimer_reset ( ptr->pt_ptr ); /* Start a recovery timer */ mtcTimer_start ( ptr->pt_ptr, pmon_timer_handler, auto_recovery_timeout ); ilog ("%s process 'stopped' by request ; auto restart in %d seconds\n", ptr->process, auto_recovery_timeout); } } } /* handle restart command */ else if ( !command.compare("restart")) { /* handle this 'pmond' process restart request */ if ( !process.compare("pmond") ) { ilog ("%s self-restart ; by request\n", process.c_str()); /* process is auto restarted by systemd in centos or inittab in WRL */ daemon_exit (); } else { process_config_type * ptr = get_process_config_ptr ( process ); dlog ("%s process 'restart' request\n", process.c_str()); if ( ptr != NULL ) { if ( strcmp ( ptr->mode, "status" ) == 0 ) { wlog ("%s process-restart rejected\n", process.c_str()); wlog ("%s ... status monitoring mode restart not supported\n", process.c_str()); } else if ( ptr->restart == false ) { ilog ("%s process-restart ; by request\n", process.c_str()); ptr->restart = true ; if ( ptr->stopped ) { ptr->stopped = false ; ptr->ignore = false ; } passiveStageChange ( ptr, PMON_STAGE__MANAGE ); } else { ilog ("%s process-restart ; in progress\n", process.c_str()); } } else { wlog ("%s process-restart ; cannot execute, process not found\n", process.c_str()); } } } else { wlog ("unsupported command:%s for process:%s\n", command.c_str(), process.c_str()); } } /* end else */ } else { wlog ("badly formed command request (%d) (not null terminated)\n", bytes ); } } else { ; /* message to big ; do not log to protect against DOS attack */ } } else if ( bytes < 0 ) { if ( errno == EAGAIN ) { return ; } else { wlog ("commnd socket read error (%d:%d:%m)\n", bytes, errno ); } } retries++ ; } while ( ( bytes != 0 ) && ( retries < MAX_COMMANDS ) ) ; }