metal/mtce-common/cgts-mtce-common-1.0/guest/guestSvrHdlr.cpp
Jack Ding 94cdbb73d4 Rewrite virtio message handler for guest heartbeat
Due to race conditions, multiple messages might be received from a
single read by guestServer. guestServer in this case would only handle
the first message and discard the remaining ones.

In this particular issue, guestServer received a heartbeat challenge
response message and a vote notification response (reject) message from
a single read, and the latter message was discarded.

This fix rewrites the message handler for the virtio serial channel to
handle segmented and multiple messages. It uses the newline character to
delimit messages, so it assumes any newline characters in client log
messages are removed.

Change-Id: Ic6f0509c98fcedf3631f4d210f753c32c37aa442
Signed-off-by: Jack Ding <jack.ding@windriver.com>
2018-06-22 21:00:05 -04:00
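Illustration only (not part of this change): a minimal sketch of the newline
delimited framing described above, using a hypothetical extract_messages()
helper. A single read that returns several messages, possibly ending with a
partial one, is split so that no message is discarded.

    #include <string>
    #include <vector>

    /* Append newly read bytes to the per-channel partial buffer and return
     * every complete newline terminated message ; a trailing partial
     * message (if any) stays in the buffer for the next read. */
    static std::vector<std::string> extract_messages ( std::string & partial_buffer,
                                                       const char * data, size_t len )
    {
        std::vector<std::string> messages ;
        partial_buffer.append ( data, len );
        size_t pos ;
        while ( ( pos = partial_buffer.find ('\n') ) != std::string::npos )
        {
            messages.push_back ( partial_buffer.substr ( 0, pos ) );
            partial_buffer.erase ( 0, pos + 1 );
        }
        return messages ;
    }

Each extracted string is then parsed as its own JSON object and dispatched, so
a heartbeat challenge response and a vote notification response arriving in the
same read are both handled.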


/*
* Copyright (c) 2015-2018 Wind River Systems, Inc.
*
* SPDX-License-Identifier: Apache-2.0
*
*/
/****************************************************************************
* @file
* Wind River CGTS Platform Guest Services "Handlers" Implementation
*
* Description: This file contains the following FSM handlers,
* Interfaces:
*
* guestInstClass::timer_handler
* guestInstClass::monitor_handler
* guestInstClass::connect_handler
*
****************************************************************************/
#include <json-c/json.h>
#include "nodeBase.h"
#include "nodeUtil.h" /* for ... clean_bm_response_files */
#include "nodeTimers.h" /* for ... mtcTimer_start/stop */
#include "jsonUtil.h" /* for ... jsonApi_array_value */
#include "daemon_common.h"
#include "guestBase.h" /* for ... */
#include "guestUtil.h" /* for ... guestUtil_print_instance */
#include "guestSvrUtil.h" /* for ... hb_get_message_type_name */
#include "guestVirtio.h" /* for ... */
#include "guestSvrMsg.h" /* for ... */
#include "guestInstClass.h" /* for ... */
static int failure_reporting_count = 0 ;
void voteStateChange ( instInfo * instInfo_ptr , hb_state_t newState )
{
if ( instInfo_ptr->vnState == newState )
return ;
clog ("%s '%s' -> '%s'\n",
log_prefix(instInfo_ptr).c_str(),
hb_get_state_name(instInfo_ptr->vnState),
hb_get_state_name(newState));
instInfo_ptr->vnState = newState ;
}
void beatStateChange ( instInfo * instInfo_ptr , hb_state_t newState )
{
if ( instInfo_ptr->hbState == newState )
return ;
if ((( instInfo_ptr->hbState == hbs_server_waiting_challenge ) &&
( newState == hbs_server_waiting_response )) ||
(( instInfo_ptr->hbState == hbs_server_waiting_response ) &&
( newState == hbs_server_waiting_challenge )))
{
; /* don't print heartbeat state changes */
}
else if (( newState == hbs_server_waiting_init ) &&
( instInfo_ptr->hbState != hbs_server_waiting_init ))
{
ilog ("%s waiting for init ... \n", log_prefix(instInfo_ptr).c_str());
}
else
{
clog ("%s '%s' -> '%s'\n",
log_prefix(instInfo_ptr).c_str(),
hb_get_state_name(instInfo_ptr->hbState),
hb_get_state_name(newState));
}
instInfo_ptr->hbState = newState ;
}
void hbStatusChange ( instInfo * instInfo_ptr, bool status )
{
if ( instInfo_ptr->heartbeating != status )
{
instInfo_ptr->heartbeating = status ;
string payload = guestUtil_set_inst_info ( get_ctrl_ptr()->hostname , instInfo_ptr );
if ( status == true )
{
ilog ("%s is now heartbeating\n", log_prefix(instInfo_ptr).c_str());
send_to_guestAgent ( MTC_EVENT_HEARTBEAT_RUNNING, payload.data());
}
else
{
ilog ("%s is not heartbeating\n", log_prefix(instInfo_ptr).c_str());
send_to_guestAgent ( MTC_EVENT_HEARTBEAT_STOPPED, payload.data());
}
jlog ("%s Heartbeating State Change: %s\n", log_prefix(instInfo_ptr).c_str(), payload.c_str());
}
else
{
clog ("%s heartbeating is still %s\n",
log_prefix(instInfo_ptr).c_str(), status ? "enabled" : "disabled" );
}
}
void manage_heartbeat_failure ( instInfo * instInfo_ptr )
{
instInfo_ptr->heartbeat.failed = true ;
dlog ("%s calling hbStatusChange false\n", log_prefix(instInfo_ptr).c_str());
hbStatusChange ( instInfo_ptr, false) ; /* heartbeating is now false */
beatStateChange ( instInfo_ptr, hbs_server_waiting_init ) ;
}
/* Looks up the timer ID and asserts the corresponding node's ringer */
void guestInstClass::timer_handler ( int sig, siginfo_t *si, void *uc)
{
struct guestInstClass::inst * inst_ptr ;
timer_t * tid_ptr = (void**)si->si_value.sival_ptr ;
ctrl_type * ctrl_ptr = get_ctrl_ptr();
/* Avoid compiler errors/warnings for parms we must
* have but currently do nothing with */
sig=sig ; uc = uc ;
if ( !(*tid_ptr) )
{
return ;
}
else if ( *tid_ptr == search_timer.tid )
{
mtcTimer_stop_int_safe ( search_timer );
search_timer.ring = true ;
return ;
}
else if ( *tid_ptr == ctrl_ptr->timer.tid )
{
mtcTimer_stop_int_safe ( ctrl_ptr->timer );
ctrl_ptr->timer.ring = true ;
return ;
}
for ( int timer_id = INST_TIMER_MONITOR ; timer_id < INST_TIMER_MAX ; timer_id++ )
{
if ( ( inst_ptr = guestInstClass::getInst_timer ( *tid_ptr , timer_id ) ) != NULL )
{
switch ( timer_id )
{
case INST_TIMER_MONITOR:
{
if (( *tid_ptr == inst_ptr->monitor_timer.tid ) )
{
mtcTimer_stop_int_safe ( inst_ptr->monitor_timer );
inst_ptr->monitor_timer.ring = true ;
return ;
}
break ;
}
case INST_TIMER_CONNECT:
{
if (( *tid_ptr == inst_ptr->connect_timer.tid ) )
{
mtcTimer_stop_int_safe ( inst_ptr->connect_timer );
inst_ptr->connect_timer.ring = true ;
return ;
}
break ;
}
case INST_TIMER_RECONNECT:
{
if (( *tid_ptr == inst_ptr->reconnect_timer.tid ) )
{
mtcTimer_stop_int_safe ( inst_ptr->reconnect_timer );
inst_ptr->reconnect_timer.ring = true ;
return ;
}
break ;
}
case INST_TIMER_INIT:
{
if (( *tid_ptr == inst_ptr->init_timer.tid ) )
{
beatStateChange ( &inst_ptr->instance, hbs_server_waiting_init ) ;
mtcTimer_stop_int_safe ( inst_ptr->init_timer );
return ;
}
break ;
}
case INST_TIMER_VOTE:
{
if (( *tid_ptr == inst_ptr->vote_timer.tid ) )
{
mtcTimer_stop_int_safe ( inst_ptr->vote_timer );
inst_ptr->vote_timer.ring = true ;
return ;
}
break ;
}
default:
{
// slog ("unknown timer id (%d)\n", timer_id);
}
} /* end switch */
} /* end if */
} /* end for */
}
/* guest services timer object wrapper
* - does an instance lookup and calls the timer handler */
void guestTimer_handler ( int sig, siginfo_t *si, void *uc)
{
get_instInv_ptr()->timer_handler ( sig, si, uc );
}
void guestInstClass::start_monitor_timer ( struct guestInstClass::inst * inst_ptr )
{
if ( inst_ptr->monitor_timer.tid )
mtcTimer_stop ( inst_ptr->monitor_timer );
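/* split the heartbeat interval into whole seconds plus the remaining
* milliseconds ; e.g. a 2500 msec interval starts a 2 sec + 500 msec timer */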
mtcTimer_start_sec_msec ( &inst_ptr->monitor_timer,
guestTimer_handler,
(inst_ptr->instance.heartbeat_interval_ms/1000),
(inst_ptr->instance.heartbeat_interval_ms%1000));
}
void _schedule_init_timer ( string event_type , struct mtc_timer & timer )
{
if (( !event_type.compare(GUEST_HEARTBEAT_MSG_EVENT_SUSPEND) ) ||
( !event_type.compare(GUEST_HEARTBEAT_MSG_EVENT_LIVE_MIGRATE_BEGIN) ) ||
( !event_type.compare(GUEST_HEARTBEAT_MSG_EVENT_COLD_MIGRATE_BEGIN) ) ||
( !event_type.compare(GUEST_HEARTBEAT_MSG_EVENT_REBOOT)))
{
if ( timer.tid )
mtcTimer_stop ( timer );
mtcTimer_start ( timer, guestTimer_handler, WAIT_FOR_INIT_TIMEOUT );
ilog ("scheduling waiting_init transition in %d seconds\n", WAIT_FOR_INIT_TIMEOUT );
}
}
/* extend the reconnect time as the attempts pile up. max out at 1 minute. */
void manage_reconnect_timeout ( instInfo * instInfo_ptr )
{
/* extend the reconnect time as the attempts pile up. max out at 1 minute. */
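/* e.g. starting from the default 2 second wait : 2 -> 4 -> 8 -> 16 -> 32 -> 60 (MTC_MINS_1 cap) */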
if ( (instInfo_ptr->connect_wait_in_secs*2) > MTC_MINS_1 )
instInfo_ptr->connect_wait_in_secs = MTC_MINS_1 ;
else
instInfo_ptr->connect_wait_in_secs *= 2 ;
}
int connect_count = 0 ;
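/* Connect FSM summary : START arms the connect timer, WAIT attempts the
* virtio channel connect when that timer rings (backing off on failure),
* DONE clears the failure state and starts the monitor/reconnect timers. */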
int guestInstClass::connect_handler ( struct guestInstClass::inst * inst_ptr )
{
int rc = PASS ;
switch ( inst_ptr->connectStage )
{
case INST_CONNECT__START:
{
if ( inst_ptr->instance.connected == true )
{
inst_ptr->connectStage = INST_CONNECT__START ;
inst_ptr->action = FSM_ACTION__NONE ;
if (inst_ptr->connect_timer.tid)
mtcTimer_stop ( inst_ptr->connect_timer );
}
else
{
ilog ("%s connect attempt in %d seconds\n",
log_prefix(&inst_ptr->instance).c_str(), inst_ptr->instance.connect_wait_in_secs);
inst_ptr->instance.connecting = true ;
mtcTimer_start ( inst_ptr->connect_timer, guestTimer_handler, inst_ptr->instance.connect_wait_in_secs );
inst_ptr->connectStage = INST_CONNECT__WAIT ;
}
break ;
}
case INST_CONNECT__WAIT:
{
if ( inst_ptr->instance.connecting != true )
{
slog ("%s bad connect wait state ; auto correcting\n",
log_prefix(&inst_ptr->instance).c_str());
inst_ptr->connectStage = INST_CONNECT__START ;
inst_ptr->action = FSM_ACTION__NONE ;
}
else if ( inst_ptr->connect_timer.ring == true )
{
char buf[PATH_MAX];
inst_ptr->connect_timer.ring = false ;
/* if the socket is not there then don't try and connect to it */
snprintf(buf, sizeof(buf), "%s/cgcs.heartbeat.%s.sock", QEMU_CHANNEL_DIR, inst_ptr->instance.uuid.data());
if ( daemon_is_file_present ( buf ) )
{
/* Try to connect with virtio_channel_connect ...
* If that succeeds then go DONE.
* If that fails with ENOENT then the socket fd is gone ; close and delete the instance.
* Otherwise retry the connect.
*/
ilog ( "%s connect start\n", log_prefix(&inst_ptr->instance).c_str());
rc = virtio_channel_connect ( &inst_ptr->instance );
if ( rc == PASS )
{
inst_ptr->connectStage = INST_CONNECT__DONE ;
break ;
}
/* Abort connect if the instance channel is no longer there.
* (-1 and errno=2 : No such file or directory) */
else if (( rc == -1 ) && ( errno == ENOENT ))
{
ilog ("%s channel gone\n", log_prefix(&inst_ptr->instance).c_str() );
del_inst ( inst_ptr->instance.uuid );
return (RETRY);
}
else
{
wlog ("%s channel connect failed\n",
log_prefix(&inst_ptr->instance).c_str() );
manage_reconnect_timeout ( &inst_ptr->instance );
}
}
else
{
ilog ("%s does not exist\n", buf );
manage_reconnect_timeout ( &inst_ptr->instance );
}
inst_ptr->connectStage = INST_CONNECT__START ;
}
break ;
}
case INST_CONNECT__DONE:
{
inst_ptr->connectStage = INST_CONNECT__START ;
inst_ptr->action = FSM_ACTION__NONE ;
inst_ptr->instance.connecting = false ;
inst_ptr->instance.connected = true ;
failure_reporting_count = 0 ;
/* no longer failed */
inst_ptr->instance.heartbeat.failed = false ;
inst_ptr->instance.heartbeat.b2b_misses = 0 ;
/* waiting for init message */
beatStateChange ( &inst_ptr->instance, hbs_server_waiting_init ) ;
/* default back to the start 2 second reconnect time default */
inst_ptr->instance.connect_wait_in_secs = DEFAULT_CONNECT_WAIT ;
start_monitor_timer ( inst_ptr );
if ( inst_ptr->reconnect_timer.tid )
mtcTimer_stop ( inst_ptr->reconnect_timer );
mtcTimer_start ( inst_ptr->reconnect_timer, guestTimer_handler, HEARTBEAT_START_TIMEOUT );
ilog ("%s connect done\n", log_prefix(&inst_ptr->instance).c_str());
break ;
}
default:
{
slog ("Unsupported connect stage (%d) ... correcting\n", inst_ptr->connectStage );
inst_ptr->connectStage = INST_CONNECT__START ;
}
}
return(rc);
}
int guestInstClass::monitor_handler ( struct guestInstClass::inst * inst_ptr )
{
int rc = PASS ;
#ifdef WANT_THIS
clog ("%s in '%s:%s' state - stage %d - R:%c F:%c H:%c\n",
log_prefix(&inst_ptr->instance).c_str(),
hb_get_state_name(inst_ptr->instance.hbState),
hb_get_state_name(inst_ptr->instance.vnState),
inst_ptr->monitorStage,
inst_ptr->instance.heartbeat.reporting ? 'Y' : 'n',
inst_ptr->instance.heartbeat.failed ? 'Y' : 'n',
inst_ptr->instance.heartbeating ? 'Y' : 'n');
// inst_ptr->instance.heartbeat.waiting ? 'Y' : 'n');
#endif
switch ( inst_ptr->monitorStage )
{
case INST_MONITOR__STEADY:
{
/* Manage Reconnect Timer */
if ( inst_ptr->reconnect_timer.ring == true )
{
inst_ptr->reconnect_timer.ring = false ;
if (( inst_ptr->instance.heartbeating == false ) &&
( inst_ptr->instance.connecting == false ))
{
/* If this timer rings and heartbeating is not started
* then we need to close the connection and reopen it.
* Since the re-open is automatic all we need to do is
* close it here */
wlog ("%s issuing auto-reconnect ; no heartbeating\n",
log_prefix(&inst_ptr->instance).c_str() );
reconnect_start ( inst_ptr->instance.uuid.data() );
}
mtcTimer_start ( inst_ptr->reconnect_timer, guestTimer_handler, HEARTBEAT_START_TIMEOUT );
}
/* Manage Monitor Timer - expires in 4 cases
* 1. heartbeat miss - hbs_server_waiting_response
* 2. heartbeat done - hbs_server_waiting_challenge - interval is done and ready for the next one
* 3. heartbeat none - not heartbeating ; waiting for init
* 4. heartbeat fail - in wrong state
**/
if ( inst_ptr->monitor_timer.ring == true )
{
inst_ptr->monitor_timer.ring = false ;
/* Case 1: heartbeat miss while waiting for heartbeat response */
if ( inst_ptr->instance.hbState == hbs_server_waiting_response )
{
int threshold = daemon_get_cfg_ptr()->hbs_failure_threshold ;
if (( inst_ptr->instance.heartbeat.failed == true ) ||
( inst_ptr->instance.heartbeat.reporting == false ))
{
hbStatusChange ( &inst_ptr->instance, false );
beatStateChange ( &inst_ptr->instance, hbs_server_waiting_init) ;
}
else if ( ++inst_ptr->instance.heartbeat.b2b_misses > threshold )
{
inst_ptr->instance.message_count = 0 ;
inst_ptr->instance.heartbeat.b2b_misses = 0 ;
elog ("%s *** Heartbeat Loss *** (Timeout=%d msec)\n",
log_prefix(&inst_ptr->instance).c_str(),
inst_ptr->instance.heartbeat_interval_ms );
manage_heartbeat_failure ( &inst_ptr->instance );
inst_ptr->monitorStage = INST_MONITOR__FAILURE ;
}
else
{
wlog ("%s *** Heartbeat Miss *** %d of %d (Timeout=%d msec)\n",
log_prefix(&inst_ptr->instance).c_str(),
inst_ptr->instance.heartbeat.b2b_misses,
threshold,
inst_ptr->instance.heartbeat_interval_ms );
/* Send another challenge */
send_challenge ( inst_ptr ) ;
}
}
/* Case 2: Heartbeat done and the interval is expired.
* Just start another challenge request
*/
else if (( inst_ptr->instance.hbState != hbs_server_waiting_init ) &&
( inst_ptr->instance.hbState != hbs_server_waiting_response) &&
( inst_ptr->instance.heartbeat.waiting == false ))
{
// printf ("*");
/* Send another challenge */
inst_ptr->instance.heartbeat.b2b_misses = 0 ;
send_challenge ( inst_ptr ) ;
}
/* Case 3: The monitor timer still runs while we are in the
* waiting for init state so just make sure we are
* handling init stuff
*/
else if ( inst_ptr->instance.hbState == hbs_server_waiting_init )
{
clog ("%s is %s\n", log_prefix(&inst_ptr->instance).c_str(),
hb_get_state_name(inst_ptr->instance.hbState));
inst_ptr->messageStage = INST_MESSAGE__RECEIVE ;
inst_ptr->instance.message_count = 0 ;
inst_ptr->instance.heartbeat.b2b_misses = 0 ;
}
/* Case 4: Heartbeat has failed while we are in the wrong state */
else
{
int threshold = daemon_get_cfg_ptr()->hbs_failure_threshold ;
if ( inst_ptr->instance.heartbeat.failed == true )
{
; /* nothing to do while failed */
}
else if ( inst_ptr->instance.heartbeat.reporting == false )
{
/* Send a challenge to keep the heartbeat going */
send_challenge ( inst_ptr ) ;
}
else if ( ++inst_ptr->instance.heartbeat.b2b_misses > threshold )
{
inst_ptr->instance.message_count = 0 ;
inst_ptr->instance.heartbeat.b2b_misses = 0 ;
elog ("%s *** Heartbeat Loss *** (state:%s)\n",
log_prefix(&inst_ptr->instance).c_str(),
hb_get_state_name(inst_ptr->instance.hbState));
manage_heartbeat_failure ( &inst_ptr->instance );
inst_ptr->monitorStage = INST_MONITOR__FAILURE ;
}
else
{
wlog ("%s *** Heartbeat Miss *** (state:%s)\n",
log_prefix(&inst_ptr->instance).c_str(),
hb_get_state_name(inst_ptr->instance.hbState));
/* Send another challenge */
send_challenge ( inst_ptr ) ;
}
}
}
if ( inst_ptr->vote_timer.ring == true )
{
if ( inst_ptr->instance.vnState == hbs_client_waiting_shutdown_response )
{
// handle time out as silent agreement to accept
if ( !inst_ptr->instance.msg_type.compare(GUEST_HEARTBEAT_MSG_ACTION_NOTIFY) ||
!inst_ptr->instance.msg_type.compare(GUEST_HEARTBEAT_MSG_ACTION_RESPONSE) )
{
ilog ("%s response time out on '%s' message ; proceeding with action\n",
log_prefix(&inst_ptr->instance).c_str(),
inst_ptr->instance.msg_type.c_str());
string reject_reason = "";
string vote_result = GUEST_HEARTBEAT_MSG_VOTE_RESULT_UNKNOWN;
if (!inst_ptr->instance.notification_type.compare(GUEST_HEARTBEAT_MSG_NOTIFY_REVOCABLE))
{
vote_result = GUEST_HEARTBEAT_MSG_VOTE_RESULT_ACCEPT;
}
else if (!inst_ptr->instance.notification_type.compare(GUEST_HEARTBEAT_MSG_NOTIFY_IRREVOCABLE))
{
vote_result = GUEST_HEARTBEAT_MSG_VOTE_RESULT_COMPLETE;
}
else
{
wlog ("%s Unexpected '%s' notify timeout ; proceeding with action\n",
log_prefix(&inst_ptr->instance).c_str(),
inst_ptr->instance.notification_type.c_str());
}
send_vote_notify_resp (get_ctrl_ptr()->hostname,
inst_ptr->instance.uuid,
inst_ptr->instance.notification_type,
inst_ptr->instance.event_type,
vote_result, reject_reason);
}
_schedule_init_timer ( inst_ptr->instance.event_type ,
inst_ptr->init_timer) ;
voteStateChange ( &inst_ptr->instance, hbs_server_waiting_init );
}
inst_ptr->vote_timer.ring = false ;
}
break ;
}
case INST_MONITOR__DELAY:
{
if ( inst_ptr->monitor_timer.ring == true )
{
inst_ptr->monitorStage = INST_MONITOR__FAILURE ;
}
break ;
}
case INST_MONITOR__FAILURE:
{
if ( get_instInv_ptr()->reporting == false )
{
wlog_throttled (failure_reporting_count, 100, "host level reporting is disabled\n");
}
else if ( inst_ptr->instance.heartbeat.reporting == false )
{
wlog_throttled (failure_reporting_count, 100, "%s instance level reporting is disabled\n",
log_prefix(&inst_ptr->instance).c_str());
}
else
{
inst_ptr->instance.heartbeat.failures++ ;
wlog ("%s sending failure notification to guestAgent (failures:%d)\n",
log_prefix(&inst_ptr->instance).c_str(),
inst_ptr->instance.heartbeat.failures);
string payload = "" ;
payload.append ("{\"hostname\":\"");
payload.append (get_ctrl_ptr()->hostname);
payload.append ("\",\"uuid\":\"");
payload.append (inst_ptr->instance.uuid);
payload.append ("\"}");
jlog1 ("%s Failure Event Payload: %s\n",
log_prefix(&inst_ptr->instance).c_str(), payload.c_str());
send_to_guestAgent ( MTC_EVENT_HEARTBEAT_LOSS , payload.data());
failure_reporting_count = 0 ;
}
// inst_ptr->instance.heartbeat.failed = false ;
inst_ptr->monitorStage = INST_MONITOR__STEADY ;
break ;
}
default:
{
inst_ptr->monitorStage = INST_MONITOR__STEADY ;
break ;
}
}
/* This will try to reconnect failed channels */
if (( !inst_ptr->instance.connected ) ||
(( inst_ptr->instance.chan_fd > 0 ) && ( inst_ptr->instance.chan_ok != true )))
{
if ( inst_ptr->action == FSM_ACTION__NONE )
{
ilog ("%s enabling connect FSM\n", log_prefix(&inst_ptr->instance).c_str());
hbStatusChange ( &inst_ptr->instance, false) ;
inst_ptr->connectStage = INST_CONNECT__START ;
inst_ptr->action = FSM_ACTION__CONNECT ;
}
else if ( inst_ptr->action != FSM_ACTION__CONNECT )
{
wlog ("%s bypassing reconnect due to existing action (%d)\n",
log_prefix(&inst_ptr->instance).c_str(),
inst_ptr->action);
}
}
return (rc);
}
/*****************************************************************************
*
* Name : message_handler
*
* Purpose : Receive messages from the guest and trigger actions
* based on message content and type.
*
* Description: The only stage presently supported is INST_MESSAGE__RECEIVE
* for each connected socket. This FSM handler is not called
* unless there is a valid received message to be handled. If
* for some reason there are no enqueued messages then the FSM
* just returns having done nothing ; this should not happen
* though.
*
* Currently supported message types are:
*
* GUEST_HEARTBEAT_MSG_INIT - vm heartbeat init message.
* > Action is to send an init_ack message to start heartbeating
*
* GUEST_HEARTBEAT_MSG_CHALLENGE_RESPONSE - a challenge response message
* > Action is to change state to 'hbs_server_waiting_challenge'
* and allow the heartbeat interval timer to expire in the
* monitor_handler which will then send another challenge
* request setting state back to 'hbs_server_waiting_response'
*
* Note: Unsupported messages are popped off the queue and discarded with
* an error log containing the message type.
*
*****************************************************************************/
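/* Illustration only : each queued message is a single line JSON object. A
* challenge response, for example, carries the common header fields
* (version, revision, msg_type, sequence) plus the heartbeat_response,
* heartbeat_health, corrective_action and log_msg fields parsed below ;
* the exact key strings are the GUEST_HEARTBEAT_MSG_* constants. */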
int guestInstClass::message_handler ( struct guestInstClass::inst * inst_ptr )
{
int rc = PASS ;
switch ( inst_ptr->messageStage )
{
case INST_MESSAGE__RECEIVE:
{
/* Only process if there are messages */
if ( inst_ptr->message_list.size() )
{
struct json_object *jobj_msg = inst_ptr->message_list.front();
inst_ptr->message_list.pop_front();
if (jsonUtil_get_int(jobj_msg, GUEST_HEARTBEAT_MSG_VERSION, &inst_ptr->instance.version) != PASS)
{
handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_VERSION, jobj_msg);
return FAIL;
}
if (jsonUtil_get_int(jobj_msg, GUEST_HEARTBEAT_MSG_REVISION, &inst_ptr->instance.revision) != PASS)
{
handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_REVISION, jobj_msg);
return FAIL;
}
if (jsonUtil_get_string(jobj_msg, GUEST_HEARTBEAT_MSG_MSG_TYPE, &inst_ptr->instance.msg_type) != PASS)
{
handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_MSG_TYPE, jobj_msg);
return FAIL;
}
if (jsonUtil_get_int(jobj_msg, GUEST_HEARTBEAT_MSG_SEQUENCE, &inst_ptr->instance.sequence) != PASS)
{
handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_SEQUENCE, jobj_msg);
return FAIL;
}
mlog1 ("%s:%s message - Seq:%x Ver:%d.%d Fd:%d\n",
inst_ptr->instance.uuid.c_str(),
inst_ptr->instance.msg_type.c_str(),
inst_ptr->instance.sequence ,
inst_ptr->instance.version, inst_ptr->instance.revision,
inst_ptr->instance.chan_fd);
if ( !inst_ptr->instance.msg_type.compare(GUEST_HEARTBEAT_MSG_CHALLENGE_RESPONSE) )
{
if ( inst_ptr->instance.hbState == hbs_server_waiting_response )
{
uint32_t heartbeat_response;
string heartbeat_health;
string corrective_action;
string log_msg;
inst_ptr->instance.heartbeat.waiting = false ;
if ( daemon_get_cfg_ptr()->debug_work )
printf ("-");
inst_ptr->heartbeat_count++ ;
if (jsonUtil_get_int(jobj_msg, GUEST_HEARTBEAT_MSG_HEARTBEAT_RESPONSE, &heartbeat_response) != PASS)
{
handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_HEARTBEAT_RESPONSE, jobj_msg);
return FAIL;
}
if (jsonUtil_get_string(jobj_msg, GUEST_HEARTBEAT_MSG_HEARTBEAT_HEALTH, &heartbeat_health) != PASS)
{
handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_HEARTBEAT_HEALTH, jobj_msg);
return FAIL;
}
if (jsonUtil_get_string(jobj_msg, GUEST_HEARTBEAT_MSG_CORRECTIVE_ACTION, &corrective_action) != PASS)
{
handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_CORRECTIVE_ACTION, jobj_msg);
return FAIL;
}
if (jsonUtil_get_string(jobj_msg, GUEST_HEARTBEAT_MSG_LOG_MSG, &log_msg) != PASS)
{
handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_LOG_MSG, jobj_msg);
return FAIL;
}
if ( heartbeat_response != inst_ptr->instance.heartbeat_challenge)
{
inst_ptr->instance.health_count = 0 ;
wlog_throttled (inst_ptr->mismatch_count, 100, "%s challenge secret mismatch (%d:%d) (throttle:100)\n",
log_prefix(&inst_ptr->instance).c_str(),
inst_ptr->instance.heartbeat_challenge,
heartbeat_response);
}
else if (!heartbeat_health.compare(GUEST_HEARTBEAT_MSG_HEALTHY))
{
inst_ptr->mismatch_count = 0 ;
inst_ptr->instance.health_count = 0 ;
inst_ptr->instance.corrective_action_count = 0 ;
mlog ("%s recv '%s' (seq:%x) (health:%s)\n",
log_prefix(&inst_ptr->instance).c_str(),
inst_ptr->instance.msg_type.c_str(), inst_ptr->instance.sequence, heartbeat_health.c_str());
/* lets wait for the period timer to expire before
* sending another in the monitor_handler */
beatStateChange ( &inst_ptr->instance, hbs_server_waiting_challenge ) ;
if ( inst_ptr->instance.heartbeating != true )
{
hbStatusChange ( &inst_ptr->instance, true );
}
if (inst_ptr->instance.heartbeat.failed != false )
{
inst_ptr->instance.heartbeat.failed = false ;
}
ilog_throttled ( inst_ptr->instance.message_count, 1000, "%s is heartbeating ...(seq:%08x)\n",
log_prefix(&inst_ptr->instance).c_str(),
inst_ptr->instance.sequence );
}
else
{
const char *msg = json_object_to_json_string_ext(jobj_msg, JSON_C_TO_STRING_PLAIN);
ilog ("%s received unhealthy response message: %s\n",
log_prefix(&inst_ptr->instance).c_str(), msg );
inst_ptr->mismatch_count = 0 ;
/* lets wait for the period timer to expire before
* sending another in the monitor_handler */
beatStateChange ( &inst_ptr->instance, hbs_server_waiting_challenge ) ;
if ( inst_ptr->instance.health_count == 0 )
{
if ( heartbeat_health.compare(GUEST_HEARTBEAT_MSG_UNHEALTHY) != 0 )
{
wlog ("%s Invalid health reported (%s)\n",
log_prefix(&inst_ptr->instance).c_str(),
heartbeat_health.c_str() );
}
wlog_throttled ( inst_ptr->instance.health_count, 500,
"%s VM Unhealthy Message:\n",
log_prefix(&inst_ptr->instance).c_str());
wlog ("%s ... %s\n", log_prefix(&inst_ptr->instance).c_str(),
log_msg.c_str() );
}
inst_ptr->instance.unhealthy_corrective_action = corrective_action;
if (!inst_ptr->instance.unhealthy_corrective_action.compare(GUEST_HEARTBEAT_MSG_ACTION_NONE) ||
!inst_ptr->instance.unhealthy_corrective_action.compare(GUEST_HEARTBEAT_MSG_ACTION_UNKNOWN))
{
wlog_throttled ( inst_ptr->instance.corrective_action_count, 500,
"%s corrective action is %s ; not reporting\n",
log_prefix(&inst_ptr->instance).c_str(),
inst_ptr->instance.unhealthy_corrective_action.c_str());
} else {
inst_ptr->instance.unhealthy_failure = true ;
string payload = guestUtil_set_inst_info ( get_ctrl_ptr()->hostname , &inst_ptr->instance );
inst_ptr->instance.unhealthy_failure = false ;
ilog ("%s ill health notification\n", log_prefix(&inst_ptr->instance).c_str());
send_to_guestAgent ( MTC_EVENT_HEARTBEAT_ILLHEALTH, payload.data());
inst_ptr->instance.corrective_action_count = 0 ;
}
}
}
else if ( inst_ptr->instance.hbState == hbs_server_waiting_challenge )
{
wlog ("%s received late '%s' response (seq:%x)\n",
log_prefix(&inst_ptr->instance).c_str(),
inst_ptr->instance.msg_type.c_str(),
inst_ptr->instance.sequence);
}
else
{
dlog ("%s recv '%s' while in '%s' state (seq:%x)\n",
log_prefix(&inst_ptr->instance).c_str(),
inst_ptr->instance.msg_type.c_str(),
hb_get_state_name(inst_ptr->instance.hbState),
inst_ptr->instance.sequence);
}
}
else if ( !inst_ptr->instance.msg_type.compare(GUEST_HEARTBEAT_MSG_INIT) )
{
const char *msg = json_object_to_json_string_ext(jobj_msg, JSON_C_TO_STRING_PLAIN);
ilog ("%s received init message: %s\n",
log_prefix(&inst_ptr->instance).c_str(), msg );
if (inst_ptr->instance.hbState != hbs_server_waiting_init)
{
wlog("%s unexpected 'init' message ; currState: '%s' (%d)\n",
log_prefix(&inst_ptr->instance).c_str(),
hb_get_state_name(inst_ptr->instance.hbState),
inst_ptr->instance.hbState );
/* Allow the heartbeat challenge response message log */
inst_ptr->instance.message_count = 0 ;
beatStateChange ( &inst_ptr->instance, hbs_server_waiting_init ) ;
}
else
{
string instance_name;
string response;
if (jsonUtil_get_int(jobj_msg, GUEST_HEARTBEAT_MSG_INVOCATION_ID, &inst_ptr->instance.invocation_id) != PASS)
{
handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_INVOCATION_ID, jobj_msg);
return FAIL;
}
if (jsonUtil_get_string(jobj_msg, GUEST_HEARTBEAT_MSG_NAME, &instance_name) != PASS)
{
handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_NAME, jobj_msg);
return FAIL;
}
if (jsonUtil_get_string(jobj_msg, GUEST_HEARTBEAT_MSG_CORRECTIVE_ACTION, &inst_ptr->instance.corrective_action) != PASS)
{
handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_CORRECTIVE_ACTION, jobj_msg);
return FAIL;
}
if (jsonUtil_get_int(jobj_msg, GUEST_HEARTBEAT_MSG_HEARTBEAT_INTERVAL_MS, &inst_ptr->instance.heartbeat_interval_ms) != PASS)
{
handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_HEARTBEAT_INTERVAL_MS, jobj_msg);
return FAIL;
}
if (jsonUtil_get_int(jobj_msg, GUEST_HEARTBEAT_MSG_VOTE_SECS, &inst_ptr->instance.vote_secs) != PASS)
{
handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_VOTE_SECS, jobj_msg);
return FAIL;
}
if (jsonUtil_get_int(jobj_msg, GUEST_HEARTBEAT_MSG_SHUTDOWN_NOTICE_SECS, &inst_ptr->instance.shutdown_notice_secs) != PASS)
{
handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_SHUTDOWN_NOTICE_SECS, jobj_msg);
return FAIL;
}
if (jsonUtil_get_int(jobj_msg, GUEST_HEARTBEAT_MSG_SUSPEND_NOTICE_SECS, &inst_ptr->instance.suspend_notice_secs) != PASS)
{
handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_SUSPEND_NOTICE_SECS, jobj_msg);
return FAIL;
}
if (jsonUtil_get_int(jobj_msg, GUEST_HEARTBEAT_MSG_RESUME_NOTICE_SECS, &inst_ptr->instance.resume_notice_secs) != PASS)
{
handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_RESUME_NOTICE_SECS, jobj_msg);
return FAIL;
}
if (jsonUtil_get_int(jobj_msg, GUEST_HEARTBEAT_MSG_RESTART_SECS, &inst_ptr->instance.restart_secs) != PASS)
{
handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_RESTART_SECS, jobj_msg);
return FAIL;
}
inst_ptr->instance.name = instance_name;
/* Override the unused 'inst' name with an abbreviated version of the instance uuid
* e.g. cgcs.heartbeat.1f0bc3e3-efbe-48b8-9688-4821fc0ff83c.sock
*/
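/* substr(24,12) keeps the last uuid group ;
* e.g. 1f0bc3e3-efbe-48b8-9688-4821fc0ff83c -> inst '4821fc0ff83c' */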
if ( inst_ptr->instance.uuid.length() >= (24+12) )
inst_ptr->instance.inst = inst_ptr->instance.uuid.substr(24,12);
string name = log_prefix(&inst_ptr->instance).c_str() ;
ilog ("%s 'init' message ; sending 'init_ack' (ver:%d.%d)\n",
log_prefix(&inst_ptr->instance).c_str(),
inst_ptr->instance.version,
inst_ptr->instance.revision );
inst_ptr->instance.heartbeat_challenge = rand();
/* Set the unhealthy corrective action to unknown by default */
inst_ptr->instance.unhealthy_corrective_action = GUEST_HEARTBEAT_MSG_ACTION_UNKNOWN ;
ilog ("%s corrective_action = %s\n",
log_prefix(&inst_ptr->instance).c_str(),
inst_ptr->instance.corrective_action.c_str() );
ilog ("%s Interval : %4d msec\n",name.c_str(), inst_ptr->instance.heartbeat_interval_ms);
/* auto correct an interval that is too small */
if ( inst_ptr->instance.heartbeat_interval_ms < (uint32_t)daemon_get_cfg_ptr()->hbs_pulse_period )
{
wlog ("%s cannot have an interval of zero seconds\n",
log_prefix(&inst_ptr->instance).c_str());
wlog ("%s ... auto correcting to %d msecs\n",
log_prefix(&inst_ptr->instance).c_str(),
daemon_get_cfg_ptr()->hbs_pulse_period);
inst_ptr->instance.heartbeat_interval_ms = daemon_get_cfg_ptr()->hbs_pulse_period ;
}
ilog ("%s Vote TO : %4d secs\n",name.c_str(), inst_ptr->instance.vote_secs);
inst_ptr->instance.vote_to_str = time_in_secs_to_str(inst_ptr->instance.vote_secs) ;
ilog ("%s Shutdown : %4d secs\n", name.c_str(), inst_ptr->instance.shutdown_notice_secs);
inst_ptr->instance.shutdown_to_str = time_in_secs_to_str (inst_ptr->instance.shutdown_notice_secs);
ilog ("%s Suspend : %4d secs\n", name.c_str(), inst_ptr->instance.suspend_notice_secs);
inst_ptr->instance.suspend_to_str = time_in_secs_to_str (inst_ptr->instance.suspend_notice_secs);
ilog ("%s Resume : %4d secs\n", name.c_str(), inst_ptr->instance.resume_notice_secs);
inst_ptr->instance.resume_to_str = time_in_secs_to_str (inst_ptr->instance.resume_notice_secs);
ilog ("%s Restart : %4d secs\n", name.c_str(), inst_ptr->instance.restart_secs);
inst_ptr->instance.restart_to_str = time_in_secs_to_str(inst_ptr->instance.restart_secs);
/* cancel the init timer since we already got the init */
if ( inst_ptr->init_timer.tid )
mtcTimer_stop ( inst_ptr->init_timer ) ;
/*************************************************************
*
* Send INIT ACK right away followed by the first Challenge.
*
* Cannot allow the FSM to run or we might see a
* race condition with other INIT messages that come after.
*
*************************************************************/
response = guestSvrMsg_hdr_init(inst_ptr->instance.uuid , GUEST_HEARTBEAT_MSG_INIT_ACK);
response.append ("\"");
response.append (GUEST_HEARTBEAT_MSG_INVOCATION_ID);
response.append ("\":");
response.append (int_to_string(inst_ptr->instance.invocation_id));
response.append ("}\n");
inst_ptr->instance.message_count = 0 ;
/* Send message to the vm through the libvirt channel */
ilog("%s sending 'init_ack' invocation_id:%d, msg: %s\n", name.c_str(),
inst_ptr->instance.invocation_id, response.c_str());
get_instInv_ptr()->write_inst (&inst_ptr->instance, response.c_str(), response.length());
/* Send a challenge right away */
beatStateChange ( &inst_ptr->instance, hbs_server_waiting_response ) ;
inst_ptr->instance.heartbeat.b2b_misses = 0 ;
inst_ptr->instance.heartbeat.failed = false ;
send_challenge ( inst_ptr ) ;
inst_ptr->messageStage = INST_MESSAGE__RECEIVE ;
}
}
else if ( !inst_ptr->instance.msg_type.compare(GUEST_HEARTBEAT_MSG_ACTION_RESPONSE) )
{
uint32_t invocation_id;
const char *msg = json_object_to_json_string_ext(jobj_msg, JSON_C_TO_STRING_PLAIN);
ilog ("%s received action response message: %s\n",
log_prefix(&inst_ptr->instance).c_str(), msg );
if (jsonUtil_get_int(jobj_msg, GUEST_HEARTBEAT_MSG_INVOCATION_ID, &invocation_id) != PASS)
{
handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_INVOCATION_ID, jobj_msg);
return FAIL;
}
if ( invocation_id != inst_ptr->instance.invocation_id )
{
wlog ("%s invocation id mismatch (%x:%x) - dropping response\n",
log_prefix(&inst_ptr->instance).c_str(),
invocation_id,
inst_ptr->instance.invocation_id );
string log_err = "Invocation id mismatch. Received: ";
log_err.append(int_to_string(invocation_id));
log_err.append(" expect: ");
log_err.append(int_to_string(inst_ptr->instance.invocation_id));
send_client_msg_nack(&inst_ptr->instance, log_err);
}
else
{
string event_type;
string notification_type;
string vote_result;
string reject_reason;
if(jsonUtil_get_string(jobj_msg, GUEST_HEARTBEAT_MSG_EVENT_TYPE, &event_type) != PASS)
{
handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_EVENT_TYPE, jobj_msg);
return FAIL;
}
if (jsonUtil_get_string(jobj_msg, GUEST_HEARTBEAT_MSG_NOTIFICATION_TYPE, &notification_type) != PASS)
{
handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_NOTIFICATION_TYPE, jobj_msg);
return FAIL;
}
if (jsonUtil_get_string(jobj_msg, GUEST_HEARTBEAT_MSG_VOTE_RESULT, &vote_result) != PASS)
{
handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_VOTE_RESULT, jobj_msg);
return FAIL;
}
if (jsonUtil_get_string(jobj_msg, GUEST_HEARTBEAT_MSG_LOG_MSG, &reject_reason) != PASS)
{
handle_parse_failure(inst_ptr, GUEST_HEARTBEAT_MSG_LOG_MSG, jobj_msg);
return FAIL;
}
send_vote_notify_resp (get_ctrl_ptr()->hostname,
inst_ptr->instance.uuid,
notification_type,
event_type,
vote_result,
reject_reason);
inst_ptr->monitorStage = INST_MONITOR__STEADY ;
_schedule_init_timer ( event_type , inst_ptr->init_timer );
// if pause-accept or pause-complete)
if (!event_type.compare(GUEST_HEARTBEAT_MSG_EVENT_PAUSE) &&
(!vote_result.compare(GUEST_HEARTBEAT_MSG_VOTE_RESULT_ACCEPT) ||
!vote_result.compare(GUEST_HEARTBEAT_MSG_VOTE_RESULT_COMPLETE)) )
{
beatStateChange ( &inst_ptr->instance, hbs_server_waiting_init ) ;
}
voteStateChange ( &inst_ptr->instance, hbs_server_waiting_init ) ;
// cancel the vote timer
if ( inst_ptr->vote_timer.tid )
mtcTimer_stop ( inst_ptr->vote_timer );
inst_ptr->vote_timer.ring = false ;
}
}
else if ( !inst_ptr->instance.msg_type.compare(GUEST_HEARTBEAT_MSG_EXIT) )
{
const char *msg = json_object_to_json_string_ext(jobj_msg, JSON_C_TO_STRING_PLAIN);
ilog ("%s received client exit request: %s\n",
log_prefix(&inst_ptr->instance).c_str(), msg );
/* Prevent a heartbeat loss in the case of a graceful exit
* by moving into the waiting_init state */
beatStateChange ( &inst_ptr->instance, hbs_server_waiting_init ) ;
hbStatusChange ( &inst_ptr->instance, false );
}
else
{
elog ("%s unsupported message type: %s.\n",
log_prefix(&inst_ptr->instance).c_str(),
inst_ptr->instance.msg_type.c_str());
string log_err = "unsupported message type: ";
log_err.append(inst_ptr->instance.msg_type);
send_client_msg_nack(&inst_ptr->instance, log_err);
}
json_object_put(jobj_msg);
}
break ;
}
default:
{
elog ("Unsupported stage (%d)\n", inst_ptr->messageStage );
}
}
return (rc) ;
}
/*************************************************************************************
*
* Name : send_challenge
*
* Description: Transmit a heartbeat challenge to the specified VM
* and start the timeout timer.
*
**************************************************************************************/
int guestInstClass::send_challenge ( struct guestInstClass::inst * inst_ptr )
{
size_t bytes_sent ;
string message = guestSvrMsg_hdr_init(inst_ptr->instance.uuid , GUEST_HEARTBEAT_MSG_CHALLENGE);
beatStateChange ( &inst_ptr->instance, hbs_server_waiting_response );
inst_ptr->instance.heartbeat_challenge = rand();
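/* The random challenge value sent here is echoed back by the client in its
* heartbeat_response field and compared in the message_handler above. */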
message.append ("\"");
message.append (GUEST_HEARTBEAT_MSG_HEARTBEAT_CHALLENGE);
message.append ("\":");
message.append (int_to_string(inst_ptr->instance.heartbeat_challenge));
message.append ("}\n");
/* Send message to the vm through the libvirt channel */
bytes_sent = write_inst (&inst_ptr->instance, message.c_str(), message.length());
/* The write_inst will report an error log.
* This one is only to report a partial message send.
*/
if (( bytes_sent > 0) && ( bytes_sent != message.length()))
{
wlog ("%s only sent %ld of %ld bytes\n",
log_prefix(&inst_ptr->instance).c_str(),
bytes_sent, message.length() );
}
/* Waiting on a response now */
inst_ptr->instance.heartbeat.waiting = true ;
start_monitor_timer ( inst_ptr ) ;
if ( daemon_get_cfg_ptr()->debug_work )
printf ("_");
return (PASS);
}
/*************************************************************************************
*
* Name : send_vote_notify
*
* Description: Send a voting or notification message to GuestClient on VM
* and start the timeout timer.
*
**************************************************************************************/
int guestInstClass::send_vote_notify ( string uuid )
{
struct guestInstClass::inst * inst_ptr = getInst(uuid);
size_t bytes_sent ;
uint32_t timeout_ms;
string message = guestSvrMsg_hdr_init(inst_ptr->instance.uuid , GUEST_HEARTBEAT_MSG_ACTION_NOTIFY);
voteStateChange ( &inst_ptr->instance, hbs_client_waiting_shutdown_response );
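/* Select the client response timeout : a revocable vote uses the vote
* timeout ; an irrevocable notification uses the shutdown/suspend/resume
* notice time matching the event type (falling back to the vote timeout). */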
if ( !inst_ptr->instance.notification_type.compare(GUEST_HEARTBEAT_MSG_NOTIFY_REVOCABLE) )
{
timeout_ms = inst_ptr->instance.vote_secs * 1000;
}
else
{
timeout_ms = inst_ptr->instance.vote_secs ;
if (!inst_ptr->instance.event_type.compare(GUEST_HEARTBEAT_MSG_EVENT_STOP) ||
!inst_ptr->instance.event_type.compare(GUEST_HEARTBEAT_MSG_EVENT_REBOOT))
{
timeout_ms = inst_ptr->instance.shutdown_notice_secs * 1000 ;
} else if (!inst_ptr->instance.event_type.compare(GUEST_HEARTBEAT_MSG_EVENT_SUSPEND) ||
!inst_ptr->instance.event_type.compare(GUEST_HEARTBEAT_MSG_EVENT_PAUSE) ||
!inst_ptr->instance.event_type.compare(GUEST_HEARTBEAT_MSG_EVENT_RESIZE_BEGIN) ||
!inst_ptr->instance.event_type.compare(GUEST_HEARTBEAT_MSG_EVENT_LIVE_MIGRATE_BEGIN) ||
!inst_ptr->instance.event_type.compare(GUEST_HEARTBEAT_MSG_EVENT_COLD_MIGRATE_BEGIN) ) {
timeout_ms = inst_ptr->instance.suspend_notice_secs * 1000 ;
} else if (!inst_ptr->instance.event_type.compare(GUEST_HEARTBEAT_MSG_EVENT_UNPAUSE) ||
!inst_ptr->instance.event_type.compare(GUEST_HEARTBEAT_MSG_EVENT_RESUME) ||
!inst_ptr->instance.event_type.compare(GUEST_HEARTBEAT_MSG_EVENT_RESIZE_END) ||
!inst_ptr->instance.event_type.compare(GUEST_HEARTBEAT_MSG_EVENT_LIVE_MIGRATE_END) ||
!inst_ptr->instance.event_type.compare(GUEST_HEARTBEAT_MSG_EVENT_COLD_MIGRATE_END) ) {
timeout_ms = inst_ptr->instance.resume_notice_secs * 1000 ;
} else {
wlog ("%s unsupported event type (%s) defaulting to 'vote' timeout of %d secs\n",
log_prefix(&inst_ptr->instance).c_str(),
inst_ptr->instance.event_type.c_str(),
inst_ptr->instance.vote_secs);
}
}
dlog ("%s event_type:%s notification_type:%s invocation_id:%d timeout_ms:%d\n",
log_prefix(&inst_ptr->instance).c_str(),
inst_ptr->instance.event_type.c_str(),
inst_ptr->instance.notification_type.c_str(),
inst_ptr->instance.invocation_id,
timeout_ms);
message.append ("\"");
message.append (GUEST_HEARTBEAT_MSG_INVOCATION_ID);
message.append ("\":");
message.append (int_to_string(inst_ptr->instance.invocation_id));
message.append (",\"");
message.append (GUEST_HEARTBEAT_MSG_EVENT_TYPE);
message.append ("\":\"");
message.append (inst_ptr->instance.event_type);
message.append ("\",\"");
message.append (GUEST_HEARTBEAT_MSG_NOTIFICATION_TYPE);
message.append ("\":\"");
message.append (inst_ptr->instance.notification_type);
message.append ("\",\"");
message.append (GUEST_HEARTBEAT_MSG_TIMEOUT_MS);
message.append ("\":");
message.append (int_to_string(timeout_ms));
message.append ("}\n");
ilog("%s send_vote_notify message=%s\n",
log_prefix(&inst_ptr->instance).c_str(), message.c_str());
/* Send message to the vm through the libvirt channel */
bytes_sent = write_inst (&inst_ptr->instance, message.c_str(), message.length());
if ( bytes_sent != message.length() )
{
wlog ("%s only sent %ld of %ld bytes\n", inst_ptr->instance.inst.c_str(),
bytes_sent, message.length() );
}
if ( inst_ptr->vote_timer.tid )
mtcTimer_stop ( inst_ptr->vote_timer );
mtcTimer_start ( inst_ptr->vote_timer, guestTimer_handler, inst_ptr->instance.vote_secs );
dlog("%s timer started for %d seconds\n",
log_prefix(&inst_ptr->instance).c_str(),
inst_ptr->instance.vote_secs);
return (PASS);
}
/*************************************************************************************
*
* Name : send_vote_notify_resp
*
* Description: Send response for voting or notification to GuestAgent
*
**************************************************************************************/
int guestInstClass::send_vote_notify_resp ( char * hostname, string uuid,
string notification_type,
string event_type,
string vote_result,
string reject_reason)
{
instInfo * instInfo_ptr = get_inst ( uuid );
if ( !instInfo_ptr )
{
elog ("%s is unknown\n", uuid.c_str());
return FAIL;
}
if (!vote_result.compare(GUEST_HEARTBEAT_MSG_VOTE_RESULT_ACCEPT) ||
!vote_result.compare(GUEST_HEARTBEAT_MSG_VOTE_RESULT_COMPLETE))
{
// accept
ilog ("%s '%s' '%s' '%s'\n",
log_prefix(instInfo_ptr).c_str(),
notification_type.c_str(),
event_type.c_str(),
vote_result.c_str());
if (!vote_result.compare(GUEST_HEARTBEAT_MSG_VOTE_RESULT_COMPLETE) &&
!event_type.compare(GUEST_HEARTBEAT_MSG_EVENT_SUSPEND))
{
instInfo_ptr->connected = false ;
hbStatusChange ( instInfo_ptr , false );
}
} else if (!vote_result.compare(GUEST_HEARTBEAT_MSG_VOTE_RESULT_REJECT)) {
ilog ("%s '%s' '%s' '%s' reason: %s\n",
log_prefix(instInfo_ptr).c_str(),
notification_type.c_str(),
event_type.c_str(),
vote_result.c_str(),
reject_reason.c_str());
} else if (!vote_result.compare(GUEST_HEARTBEAT_MSG_VOTE_RESULT_TIMEOUT)) {
ilog ("%s '%s' '%s' '%s'\n",
log_prefix(instInfo_ptr).c_str(),
notification_type.c_str(),
event_type.c_str(),
vote_result.c_str());
} else if (!vote_result.compare(GUEST_HEARTBEAT_MSG_VOTE_RESULT_ERROR)) {
elog ("%s vote to '%s' returned error: %s\n",
log_prefix(instInfo_ptr).c_str(),
event_type.c_str(),
vote_result.c_str());
} else {
elog ("%s vote to '%s' unknown vote response %s\n",
log_prefix(instInfo_ptr).c_str(),
event_type.c_str(),
vote_result.c_str());
}
string payload = "" ;
payload.append ("{\"hostname\":\"");
payload.append (hostname);
payload.append ("\", \"uuid\": \"");
payload.append (uuid.c_str());
payload.append ("\", \"notification_type\": \"");
payload.append (notification_type);
payload.append ("\", \"event-type\": \"");
payload.append (event_type);
payload.append ("\", \"vote\": \"");
payload.append (vote_result);
payload.append ("\", \"reason\": \"");
payload.append (reject_reason);
payload.append ("\"}");
jlog ("%s Notification Event Payload: %s\n", log_prefix(instInfo_ptr).c_str(), payload.c_str());
send_to_guestAgent ( MTC_EVENT_VOTE_NOTIFY , payload.data());
return (PASS);
}
/*************************************************************************************
*
* Name : send_client_msg_nack
*
* Description: Send a failure response to the GuestClient when a client message cannot be processed
*
**************************************************************************************/
void guestInstClass::send_client_msg_nack ( instInfo * instInfo_ptr,
string log_err)
{
size_t bytes_sent ;
string message = guestSvrMsg_hdr_init(instInfo_ptr->uuid , GUEST_HEARTBEAT_MSG_NACK);
message.append ("\"");
message.append (GUEST_HEARTBEAT_MSG_INVOCATION_ID);
message.append ("\":");
message.append (int_to_string(instInfo_ptr->invocation_id));
message.append (",\"");
message.append (GUEST_HEARTBEAT_MSG_LOG_MSG);
message.append ("\":\"");
message.append (log_err.c_str());
message.append ("\"}\n");
ilog("%s send_client_msg_nack message=%s\n",
log_prefix(instInfo_ptr).c_str(), message.c_str());
/* Send message to the vm through the libvirt channel */
bytes_sent = write_inst (instInfo_ptr, message.c_str(), message.length());
if ( bytes_sent != message.length() )
{
wlog ("%s only sent %ld of %ld bytes\n", instInfo_ptr->inst.c_str(),
bytes_sent, message.length() );
}
}
/*************************************************************************************
*
* Name : handle_parse_failure
*
* Description: Handle JSON parse failure
*
**************************************************************************************/
void guestInstClass::handle_parse_failure ( struct guestInstClass::inst * inst_ptr,
const char *key,
struct json_object *jobj_msg)
{
string log_err = "failed to parse ";
log_err.append(key);
elog("%s %s\n", log_prefix(&inst_ptr->instance).c_str(), log_err.c_str());
send_client_msg_nack(&inst_ptr->instance, log_err);
/* pop_front() only deletes the internal copy of jobj_msg in the message_list.
The original object still needs to be released here */
json_object_put(jobj_msg);
}