85cd488bef
Signed-off-by: Dean Troyer <dtroyer@gmail.com>
498 lines
17 KiB
C
498 lines
17 KiB
C
/*
|
|
* Copyright (c) 2013-2016, Wind River Systems, Inc.
|
|
*
|
|
* Redistribution and use in source and binary forms, with or without modification, are
|
|
* permitted provided that the following conditions are met:
|
|
*
|
|
* 1) Redistributions of source code must retain the above copyright notice,
|
|
* this list of conditions and the following disclaimer.
|
|
*
|
|
* 2) Redistributions in binary form must reproduce the above copyright notice,
|
|
* this list of conditions and the following disclaimer in the documentation and/or
|
|
* other materials provided with the distribution.
|
|
*
|
|
* 3) Neither the name of Wind River Systems nor the names of its contributors may be
|
|
* used to endorse or promote products derived from this software without specific
|
|
* prior written permission.
|
|
*
|
|
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
|
|
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
|
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
|
|
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
|
|
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
|
|
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
|
|
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
|
|
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
|
|
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
|
|
* USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
|
*/
|
|
|
|
|
|
/* This implements a library to be used for guest clients to interface with
|
|
* non-standard functionality on the host using the backchannel
|
|
* communications pathway.
|
|
*
|
|
* The general idea is that everything that goes through this is multiplexed
|
|
* over a single unix socket so that the guest app only needs to monitor one
|
|
* socket for activity.
|
|
*/
|
|
|
|
#include <sys/types.h>
|
|
#include <sys/socket.h>
|
|
#include <string.h>
|
|
#include <unistd.h>
|
|
#include <fcntl.h>
|
|
#include <stdio.h>
|
|
#include <sys/un.h>
|
|
#include <errno.h>
|
|
#include <stdlib.h>
|
|
#include <json-c/json.h>
|
|
|
|
#include "server_group.h"
|
|
#include "host_guest_msg_type.h"
|
|
|
|
/* unix socket abstract namespace address of guest agent */
|
|
#define GUEST_CLIENT_ADDR "cgcs.server_grp"
|
|
|
|
#define HOST_GUEST_BUFSIZE 4096 // max size of message that can be received
|
|
|
|
|
|
/* Server Group message is encoded in JSON format.
|
|
The message sent out to UNIX socket is a null-terminated JSON format string
|
|
without embedded newlines.
|
|
|
|
Format:
|
|
{key:value,key:value,..., key:value}
|
|
|
|
Key/value pairs:
|
|
"version": <integer> - version of the interface
|
|
"type": < string> - one of these types “broadcast", "notification",
|
|
"status_query", "status_response",
|
|
"status_response_done”
|
|
|
|
"req": <integer> - sequence number for status_query/status_response/
|
|
status_response_done messages
|
|
|
|
"source_instance“: <string> - source instance that send out the broadcast
|
|
"data": <string> - message content for broadcast, notification,
|
|
status_query and status_response messages.
|
|
Must be a null-terminated string without embedded newlines.
|
|
|
|
|
|
Message Types:
|
|
broadcast - incoming or outgoing
|
|
notification - incoming, state change of other servers within the
|
|
server group
|
|
status_query - outgoing, query the current state of all servers
|
|
within the server group
|
|
status_response - incoming, one or more responses to the status_query
|
|
status_response_done - incoming, last response to the status query
|
|
|
|
*/
|
|
|
|
// server group message type
|
|
#define GRP_BROADCAST "broadcast" // broadcast message from another server
|
|
#define GRP_NOTIFICATION "notification" // notification of server state change
|
|
#define GRP_STATUS_QUERY "status_query" // query of status of all servers in group
|
|
#define GRP_STATUS_RESP "status_response" // query response msg (could be several)
|
|
#define GRP_STATUS_RESP_DONE "status_response_done" // query response done msg (no data)
|
|
#define GRP_NACK "nack" // nack msg indicating parse or protocol error from host
|
|
|
|
|
|
/* Header for incoming server group messages. The exact contents will differ
|
|
* depending on message type.
|
|
*
|
|
* "len" is the overall length including header.
|
|
*
|
|
* For messages with a TYPE of GRP_BROADCAST only, the "sinstance" field
|
|
* will contain the instance name of the instance that sent the broadcast, and
|
|
* DATA will contain the message that was sent.
|
|
*
|
|
*/
|
|
static int sock; // socket for talking to guest agent
|
|
static struct sockaddr_un svaddr; // address of guest agent
|
|
static int svaddrlen; // length of guest agent address
|
|
|
|
static unsigned int status_seqnum; // status query sequence number
|
|
static char *status_buf; // status query reassembly buffer
|
|
static unsigned long status_size; // current status buffer size
|
|
static unsigned long status_len; // currently used buffer length
|
|
|
|
#define ERRORSIZE 400
|
|
static char errorbuf[ERRORSIZE];
|
|
|
|
static sg_broadcast_msg_handler_t sg_broadcast_msg_callback;
|
|
static sg_notification_msg_handler_t sg_notification_msg_callback;
|
|
static sg_status_msg_handler_t sg_status_msg_callback;
|
|
|
|
|
|
/* Generic routine to send a server group message down to the host. */
|
|
int sg_send_host_msg(const char *msg_type, int seq, const char *data)
|
|
{
|
|
int rc;
|
|
|
|
struct json_object *jobj_data = json_object_new_object();
|
|
if (jobj_data == NULL) {
|
|
snprintf(errorbuf, ERRORSIZE-1, "failed to allocate json object for data");
|
|
return -1;
|
|
}
|
|
|
|
json_object_object_add(jobj_data, VERSION, json_object_new_int(CUR_VERSION));
|
|
json_object_object_add(jobj_data, MSG_TYPE, json_object_new_string(msg_type));
|
|
if (!strcmp(msg_type, GRP_STATUS_QUERY)) {
|
|
json_object_object_add(jobj_data, SEQ, json_object_new_int(seq));
|
|
}
|
|
|
|
if (data) {
|
|
json_object_object_add(jobj_data, DATA, json_object_new_string(data));
|
|
}
|
|
|
|
struct json_object *jobj_outmsg = json_object_new_object();
|
|
if (jobj_outmsg == NULL) {
|
|
snprintf(errorbuf, ERRORSIZE-1, "failed to allocate json object for outmsg");
|
|
json_object_put(jobj_data);
|
|
return -1;
|
|
}
|
|
|
|
json_object_object_add(jobj_outmsg, DATA, jobj_data);
|
|
json_object_object_add(jobj_outmsg, VERSION, json_object_new_int(CUR_VERSION));
|
|
/* This is a known address that nova-compute is listening on */
|
|
json_object_object_add(jobj_outmsg, DEST_ADDR, json_object_new_string(GUEST_CLIENT_ADDR));
|
|
|
|
const char *outmsg = json_object_to_json_string_ext(jobj_outmsg, JSON_C_TO_STRING_PLAIN);
|
|
int msglen = strlen(outmsg);
|
|
|
|
rc = sendto(sock, outmsg, msglen, 0, (struct sockaddr *) &svaddr, svaddrlen);
|
|
if (rc != msglen) {
|
|
if (rc > 0) {
|
|
snprintf(errorbuf, ERRORSIZE-1, "sendto returned %d, expected %d",
|
|
rc, msglen);
|
|
} else
|
|
snprintf(errorbuf, ERRORSIZE-1, "sendto: %m");
|
|
json_object_put(jobj_outmsg);
|
|
return -1;
|
|
}
|
|
|
|
json_object_put(jobj_outmsg);
|
|
return 0;
|
|
}
|
|
|
|
/* Send a message to all other servers in our server group. */
|
|
int sg_msg_broadcast(void *msg, unsigned int len)
|
|
{
|
|
return sg_send_host_msg(GRP_BROADCAST, 0, (const char*)msg);
|
|
}
|
|
|
|
/* Ensure the status buffer is at least as big as the specified size.
|
|
* Call realloc() if necessary to grow the buffer.
|
|
*/
|
|
int ensure_status_buf_size(unsigned int size)
|
|
{
|
|
if (size > status_size) {
|
|
/* need to grow the buffer */
|
|
void *ptr = realloc(status_buf, size);
|
|
if (!ptr) {
|
|
/* hopefully shouldn't happen */
|
|
snprintf(errorbuf, ERRORSIZE-1,
|
|
"unable to realloc buffer to size %u", size);
|
|
|
|
/* give up on current status query */
|
|
status_seqnum++;
|
|
status_len = 0;
|
|
return -1;
|
|
}
|
|
status_buf = ptr;
|
|
status_size = size;
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
/* Request current status of all servers in server group. Due to limitations in
|
|
* the current implementation of host-guest comm channel agents we receive
|
|
* information on one server per response, then a final "done" message. This
|
|
* could be changed if we fix the host-guest comm channel to handle arbitrarily
|
|
* large messages.
|
|
*
|
|
* Yes, the current design is not the most robust...might want to consider adding
|
|
* an indication of how many servers we expect data for and which one we're on
|
|
* in case we lose a message or something. Better fix might be to just fix the
|
|
* host-guest comm channel to handle arbitrarily large messages.
|
|
*/
|
|
int sg_request_status()
|
|
{
|
|
int rc;
|
|
/* If we were still receiving status updates from a previous query this
|
|
* will cause them to get dropped.
|
|
*/
|
|
status_seqnum++;
|
|
status_len = 0;
|
|
|
|
/* Ensure we have room for an empty list otherwise give up.
|
|
* Start with a decent size buffer to minimize reallocs.
|
|
*/
|
|
if (ensure_status_buf_size(4096) != 0)
|
|
return -1;
|
|
|
|
rc = sg_send_host_msg(GRP_STATUS_QUERY, status_seqnum, NULL);
|
|
if (rc == 0) {
|
|
/* start a list in the buffer */
|
|
status_buf[0] = '[';
|
|
status_len = 1;
|
|
}
|
|
return rc;
|
|
}
|
|
|
|
/* Handle a response to the status query. This should contain the current
|
|
* status of a single server. We add it to the data accumulating in the buffer.
|
|
*/
|
|
int handle_status_resp(unsigned int seqnum, const char *msg, unsigned int len)
|
|
{
|
|
if (seqnum != status_seqnum) {
|
|
snprintf(errorbuf, ERRORSIZE-1,
|
|
"status resp seqnum %u doesn't match expected %u",
|
|
seqnum, status_seqnum);
|
|
return -1;
|
|
}
|
|
|
|
/* Ensure we have room for new data otherwise give up.
|
|
* Add an extra byte for comma between server data.
|
|
*/
|
|
if (ensure_status_buf_size(status_len + len + 1) != 0)
|
|
return -1;
|
|
|
|
/* comma-separate the status for each instance */
|
|
if (status_len != 1) {
|
|
status_buf[status_len] = ',';
|
|
status_len++;
|
|
}
|
|
|
|
/* Now copy the server status into the buffer */
|
|
memcpy((status_buf + status_len), msg, len);
|
|
status_len += len;
|
|
return 0;
|
|
}
|
|
|
|
/* This tells us that we've received all the server status messages
|
|
* so we can call the callback and then reset things for the next one.
|
|
*/
|
|
int handle_status_resp_done(unsigned int seqnum)
|
|
{
|
|
if (seqnum != status_seqnum) {
|
|
snprintf(errorbuf, ERRORSIZE-1,
|
|
"status resp done seqnum %u doesn't match expected %u",
|
|
seqnum, status_seqnum);
|
|
return -1;
|
|
}
|
|
|
|
/* Ensure we have room for list terminator otherwise give up */
|
|
if (ensure_status_buf_size(status_len + 1) != 0)
|
|
return -1;
|
|
|
|
if (status_buf) {
|
|
/* terminate the list */
|
|
status_buf[status_len] = ']';
|
|
status_len++;
|
|
|
|
if (sg_status_msg_callback)
|
|
sg_status_msg_callback(status_buf, status_len);
|
|
}
|
|
|
|
/* reset the buffer */
|
|
status_len = 0;
|
|
/* bump seqnum just in case */
|
|
status_seqnum++;
|
|
return 0;
|
|
}
|
|
|
|
|
|
int dispatch_sg_msg(json_object *job_msg)
|
|
{
|
|
int rc = 0;
|
|
|
|
struct json_object *jobj_msg_type;
|
|
if (!json_object_object_get_ex(job_msg, MSG_TYPE, &jobj_msg_type)) {
|
|
snprintf(errorbuf, ERRORSIZE-1, "failed to parse msg_type");
|
|
return -1;
|
|
}
|
|
const char *msg_type = json_object_get_string(jobj_msg_type);
|
|
|
|
struct json_object *jobj_data;
|
|
const char *data;
|
|
|
|
// type GRP_STATUS_RESP_DONE message does not have a data field
|
|
if (!strcmp(msg_type, GRP_BROADCAST) ||
|
|
!strcmp(msg_type, GRP_NOTIFICATION) ||
|
|
!strcmp(msg_type, GRP_STATUS_RESP)) {
|
|
if (!json_object_object_get_ex(job_msg, DATA, &jobj_data)) {
|
|
snprintf(errorbuf, ERRORSIZE-1, "failed to parse data for type %s", msg_type);
|
|
return -1;
|
|
}
|
|
data = json_object_get_string(jobj_data);
|
|
}
|
|
|
|
if (!strcmp(msg_type, GRP_BROADCAST)) {
|
|
struct json_object *jobj_source_instance;
|
|
if (!json_object_object_get_ex(job_msg, SOURCE_INSTANCE, &jobj_source_instance)) {
|
|
snprintf(errorbuf, ERRORSIZE-1, "failed to parse source_instance for type %s", msg_type);
|
|
return -1;
|
|
}
|
|
const char *source_instance = json_object_get_string(jobj_source_instance);
|
|
|
|
if (sg_broadcast_msg_callback)
|
|
sg_broadcast_msg_callback((char *)source_instance, (void *)data, strlen(data));
|
|
}
|
|
else if (!strcmp(msg_type, GRP_NOTIFICATION)) {
|
|
if (sg_notification_msg_callback)
|
|
sg_notification_msg_callback((void *)data, strlen(data));
|
|
}
|
|
else if ((!strcmp(msg_type, GRP_STATUS_RESP)) || (!strcmp(msg_type, GRP_STATUS_RESP_DONE))) {
|
|
struct json_object *jobj_seq;
|
|
if (!json_object_object_get_ex(job_msg, SEQ, &jobj_seq)) {
|
|
snprintf(errorbuf, ERRORSIZE-1, "failed to parse seq for type %s", msg_type);
|
|
return -1;
|
|
}
|
|
int seq = json_object_get_int(jobj_seq);
|
|
|
|
if (!strcmp(msg_type, GRP_STATUS_RESP)) {
|
|
rc = handle_status_resp(seq, data, strlen(data));
|
|
}
|
|
else if (!strcmp(msg_type, GRP_STATUS_RESP_DONE)) {
|
|
rc = handle_status_resp_done(seq);
|
|
}
|
|
}
|
|
else if (!strcmp(msg_type, GRP_NACK)) {
|
|
struct json_object *jobj_log_msg;
|
|
if (!json_object_object_get_ex(job_msg, LOG_MSG, &jobj_log_msg)) {
|
|
snprintf(errorbuf, ERRORSIZE-1, "Nack: failed to parse log_msg");
|
|
}
|
|
const char *log_msg = json_object_get_string(jobj_log_msg);
|
|
snprintf(errorbuf, ERRORSIZE-1, "Nack received, error message from host: %s", log_msg);
|
|
return -1;
|
|
} else {
|
|
snprintf(errorbuf, ERRORSIZE-1, "unknown server group message type %s", msg_type);
|
|
return -1;
|
|
}
|
|
return rc;
|
|
}
|
|
|
|
|
|
/* Read a message from the socket and process it. */
|
|
int process_sg_msg()
|
|
{
|
|
char buf[HOST_GUEST_BUFSIZE];
|
|
int len;
|
|
int rc = -1;
|
|
|
|
len = recv(sock, buf, sizeof(buf), 0);
|
|
if (len == -1) {
|
|
if (errno == EAGAIN)
|
|
return 0;
|
|
else {
|
|
snprintf(errorbuf, ERRORSIZE-1, "error receiving msg: %m");
|
|
return -1;
|
|
}
|
|
}
|
|
|
|
struct json_object *jobj_msg = json_tokener_parse(buf);
|
|
if (jobj_msg == NULL) {
|
|
snprintf(errorbuf, ERRORSIZE-1, "failed to parse msg");
|
|
return -1;
|
|
}
|
|
|
|
// parse version
|
|
struct json_object *jobj_version;
|
|
if (!json_object_object_get_ex(jobj_msg, VERSION, &jobj_version)) {
|
|
snprintf(errorbuf, ERRORSIZE-1, "failed to parse version");
|
|
goto done;
|
|
}
|
|
int version = json_object_get_int(jobj_version);
|
|
|
|
if (version != CUR_VERSION) {
|
|
snprintf(errorbuf, ERRORSIZE-1, "invalid version");
|
|
goto done;
|
|
}
|
|
|
|
// parse source address
|
|
struct json_object *jobj_source_addr;
|
|
if (!json_object_object_get_ex(jobj_msg, SOURCE_ADDR, &jobj_source_addr)) {
|
|
snprintf(errorbuf, ERRORSIZE-1, "failed to parse source_addr");
|
|
goto done;
|
|
}
|
|
const char *source_addr = json_object_get_string(jobj_source_addr);
|
|
|
|
/* check the host sender */
|
|
if (strcmp(source_addr, GUEST_CLIENT_ADDR) != 0) {
|
|
snprintf(errorbuf, ERRORSIZE-1, "unknown sender address %s", source_addr);
|
|
goto done;
|
|
}
|
|
|
|
// parse data. data is a json object that is nested inside the msg
|
|
struct json_object *jobj_data;
|
|
if (!json_object_object_get_ex(jobj_msg, DATA, &jobj_data)) {
|
|
snprintf(errorbuf, ERRORSIZE-1, "failed to parse data");
|
|
goto done;
|
|
}
|
|
|
|
rc = dispatch_sg_msg(jobj_data);
|
|
|
|
done:
|
|
json_object_put(jobj_msg);
|
|
return rc;
|
|
}
|
|
|
|
|
|
/* This needs to be called first to initialize sockets, buffers,
|
|
* callback pointers, etc.
|
|
*/
|
|
int init_sg(sg_broadcast_msg_handler_t broadcast_handler,
|
|
sg_notification_msg_handler_t notification_handler,
|
|
sg_status_msg_handler_t status_handler)
|
|
{
|
|
int flags;
|
|
int addrlen;
|
|
struct sockaddr_un cliaddr;
|
|
|
|
// socket for talking to guest agent
|
|
sock = socket(AF_UNIX, SOCK_DGRAM, 0);
|
|
if (sock == -1) {
|
|
snprintf(errorbuf, ERRORSIZE-1, "unable to open socket: %m");
|
|
return -1;
|
|
}
|
|
|
|
flags = fcntl(sock, F_GETFL, 0);
|
|
fcntl(sock, F_SETFL, flags | O_NONBLOCK);
|
|
|
|
/* our address */
|
|
memset(&cliaddr, 0, sizeof(struct sockaddr_un));
|
|
cliaddr.sun_family = AF_UNIX;
|
|
cliaddr.sun_path[0] = '\0';
|
|
strncpy(cliaddr.sun_path+1, GUEST_CLIENT_ADDR,
|
|
sizeof(cliaddr.sun_path) - 2);
|
|
addrlen = sizeof(sa_family_t) + strlen(GUEST_CLIENT_ADDR) + 1;
|
|
|
|
if (bind(sock, (struct sockaddr *) &cliaddr, addrlen) == -1) {
|
|
snprintf(errorbuf, ERRORSIZE-1, "unable to bind socket: %m");
|
|
return -1;
|
|
}
|
|
|
|
/* guest agent address */
|
|
memset(&svaddr, 0, sizeof(struct sockaddr_un));
|
|
svaddr.sun_family = AF_UNIX;
|
|
svaddr.sun_path[0] = '\0';
|
|
strncpy(svaddr.sun_path+1, AGENT_ADDR, sizeof(svaddr.sun_path) - 2);
|
|
svaddrlen = sizeof(sa_family_t) + strlen(AGENT_ADDR) + 1;
|
|
|
|
/* set up callback pointers */
|
|
sg_broadcast_msg_callback = broadcast_handler;
|
|
sg_notification_msg_callback = notification_handler;
|
|
sg_status_msg_callback = status_handler;
|
|
|
|
return sock;
|
|
}
|
|
|
|
/* Provide access to the error message if the most recent call failed. */
|
|
char *sg_get_error()
|
|
{
|
|
return errorbuf;
|
|
}
|