computing-offload/qtfs/ipc/uds_event.c
Yikun Jiang a68570b5d9 Add computing offloading code
1. Add computing offloading code
2. Add script.md
3. Add virsh_demo.xml

Change-Id: Id9ef883e2f0eb727eb5448b9d1c47767f46b1021
Signed-off-by: Yikun Jiang <yikunkero@gmail.com>
2023-10-23 19:29:57 +08:00

1097 lines
33 KiB
C
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/******************************************************************************
* Copyright (c) Huawei Technologies Co., Ltd. 2023. All rights reserved.
* qtfs licensed under the Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
* PURPOSE.
* See the Mulan PSL v2 for more details.
* Author: Liqiang
* Create: 2023-03-20
* Description:
*******************************************************************************/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <signal.h>
#include <errno.h>
#include <time.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <glib.h>
#include "dirent.h"
#include "uds_main.h"
#include "uds_event.h"
int uds_event_build_step2(void *arg, int epfd, struct uds_event_global_var *p_event_var);
int uds_event_remote_build(void *arg, int epfd, struct uds_event_global_var *p_event_var);
int uds_event_build_step3(void *arg, int epfd, struct uds_event_global_var *p_event_var);
int uds_event_uds2tcp(void *arg, int epfd, struct uds_event_global_var *p_event_var);
int uds_event_tcp2uds(void *arg, int epfd, struct uds_event_global_var *p_event_var);
int uds_event_build_step4(void *arg, int epfd, struct uds_event_global_var *p_event_var);
int uds_event_tcp2pipe(void *arg, int epfd, struct uds_event_global_var *p_event_var);
int uds_event_pipe2tcp(void *arg, int epfd, struct uds_event_global_var *p_event_var);
int uds_event_module_init(struct uds_event_global_var *p)
{
p->msg_controllen = UDS_EVENT_BUFLEN;
p->msg_control = (char *)malloc(p->msg_controllen);
if (p->msg_control == NULL) {
uds_err("malloc msg control buf failed.");
return EVENT_ERR;
}
p->msg_controlsendlen = UDS_EVENT_BUFLEN;
p->msg_control_send = (char *)malloc(p->msg_controlsendlen);
if (p->msg_control_send == NULL) {
goto free1;
}
p->iov_len = UDS_EVENT_BUFLEN;
p->iov_base = (char *)malloc(p->iov_len);
if (p->iov_base == NULL) {
uds_err("malloc iov base failed.");
goto free2;
}
p->iov_sendlen = UDS_EVENT_BUFLEN;
p->iov_base_send = (char *)malloc(p->iov_sendlen);
if (p->iov_base_send == NULL) {
goto free3;
}
p->buflen = UDS_EVENT_BUFLEN;
p->buf = (char *)malloc(p->buflen);
if (p->buf == NULL) {
uds_err("malloc buf failed.");
goto free4;
}
return EVENT_OK;
free4:
free(p->iov_base_send);
p->iov_base_send = NULL;
free3:
free(p->iov_base);
p->iov_base = NULL;
free2:
free(p->msg_control_send);
p->msg_control_send = NULL;
free1:
free(p->msg_control);
p->msg_control = NULL;
p->msg_controllen = 0;
p->msg_controlsendlen = 0;
p->iov_len = 0;
p->iov_sendlen = 0;
p->buflen = 0;
return EVENT_ERR;
}
void uds_event_module_fini(struct uds_event_global_var *p)
{
if (p->msg_control != NULL) {
free(p->msg_control);
p->msg_control = NULL;
p->msg_controllen = 0;
}
if (p->msg_control_send != NULL) {
free(p->msg_control_send);
p->msg_control_send = NULL;
p->msg_controlsendlen = 0;
}
if (p->iov_base != NULL) {
free(p->iov_base);
p->iov_base = NULL;
p->iov_len = 0;
}
if (p->iov_base_send != NULL) {
free(p->iov_base_send);
p->iov_base_send = NULL;
p->iov_sendlen = 0;
}
if (p->buf != NULL) {
free(p->buf);
p->buf = NULL;
p->buflen = 0;
}
return;
}
int uds_event_pre_hook(struct uds_event_global_var *p_event_var)
{
p_event_var->cur = 0;
memset(p_event_var->tofree, 0, sizeof(struct uds_event *) * UDS_EPOLL_MAX_EVENTS);
return 0;
}
int uds_event_post_hook(struct uds_event_global_var *p_event_var)
{
for (int i = 0; i < p_event_var->cur; i++) {
uds_log("event:%d fd:%d free by its peer", i, p_event_var->tofree[i]->fd);
uds_del_event(p_event_var->tofree[i]);
}
return 0;
}
int uds_event_add_to_free(struct uds_event_global_var *p_event_var, struct uds_event *evt)
{
if (evt->pipe == 1) {
uds_log("pipe event:%d no need to free peer", evt->fd);
return 0;
}
struct uds_event *peerevt = evt->peer;
if (peerevt == NULL || p_event_var->cur >= UDS_EPOLL_MAX_EVENTS) {
uds_err("peer event add to free is NULL, my fd:%d", evt->fd);
return -1;
}
peerevt->tofree = 1;
uds_log("event fd:%d addr add to free", peerevt->fd);
p_event_var->tofree[p_event_var->cur] = peerevt;
p_event_var->cur++;
return 0;
}
int uds_event_pre_handler(struct uds_event *evt)
{
if (evt->tofree == 1) {
uds_log("event fd:%d marked by peer as pending deletion", evt->fd);
return EVENT_ERR;
}
return EVENT_OK;
}
/*
* 1. accept local uds connect request
* 2. set new connection's event to build link step2
* 3. add new connection event to epoll list
*/
int uds_event_uds_listener(void *arg, int epfd, struct uds_event_global_var *p_event_var)
{
int connfd;
struct uds_event *evt = (struct uds_event *)arg;
if (evt == NULL) {
uds_err("param is invalid.");
return EVENT_ERR;
}
connfd = uds_sock_step_accept(evt->fd, AF_UNIX);
if (connfd <= 0) {
uds_err("conn fd error:%d", connfd);
return EVENT_ERR;
}
uds_log("accept an new connection, fd:%d", connfd);
if (uds_add_event(connfd, NULL, uds_event_build_step2, NULL) == NULL) {
uds_err("failed to add event,connfd:%d", connfd);
return EVENT_ERR;
}
return EVENT_OK;
}
int uds_event_build_step2(void *arg, int epfd, struct uds_event_global_var *p_event_var)
{
struct uds_event *evt = (struct uds_event *)arg;
if (evt == NULL) {
uds_err("param is invalid.");
return EVENT_ERR;
}
char buf[sizeof(struct uds_tcp2tcp) + sizeof(struct uds_proxy_remote_conn_req)] = {0};
struct uds_tcp2tcp *bdmsg = (struct uds_tcp2tcp *)buf;
struct uds_proxy_remote_conn_req *msg = (struct uds_proxy_remote_conn_req *)bdmsg->data;
struct uds_proxy_remote_conn_rsp rsp;
int len;
memset(buf, 0, sizeof(buf));
len = uds_recv_with_timeout(evt->fd, msg, sizeof(struct uds_proxy_remote_conn_req));
if (len == 0) {
uds_err("recv err msg:%d errno:%d", len, errno);
return EVENT_DEL;
}
if (len < 0) {
uds_err("read msg error:%d errno:%d", len, errno);
goto end;
}
if (strlen(msg->sun_path) >= (UDS_SUN_PATH_LEN - strlen(UDS_PROXY_SUFFIX))) {
uds_err("sun_path:<%s> len:%d is too large to add suffex:<%s>, so can't build uds proxy server.",
msg->sun_path, strlen(msg->sun_path), UDS_PROXY_SUFFIX);
goto end;
}
if (msg->type != SOCK_STREAM && msg->type != SOCK_DGRAM) {
uds_err("uds type:%d invalid", msg->type);
goto end;
}
struct uds_proxy_remote_conn_req *priv = (void *)malloc(sizeof(struct uds_proxy_remote_conn_req));
if (priv == NULL) {
uds_err("malloc failed");
goto end;
}
struct uds_conn_arg tcp = {
.cs = UDS_SOCKET_CLIENT,
};
int ret;
if ((ret = uds_build_tcp_connection(&tcp)) < 0) {
uds_err("step2 build tcp connection failed, return:%d", ret);
free(priv);
goto end;
}
bdmsg->msgtype = MSGCNTL_UDS;
bdmsg->msglen = sizeof(struct uds_proxy_remote_conn_req);
if (write(tcp.connfd, bdmsg, sizeof(struct uds_tcp2tcp) + sizeof(struct uds_proxy_remote_conn_req)) < 0) {
uds_err("send msg to tcp failed");
free(priv);
close(tcp.connfd);
goto end;
}
uds_log("step2 recv sun path:%s, add step3 event fd:%d", msg->sun_path, tcp.connfd);
memcpy(priv, msg, sizeof(struct uds_proxy_remote_conn_req));
if (uds_add_event(tcp.connfd, evt, uds_event_build_step3, priv) == NULL) {
uds_err("failed to add event, fd:%d", tcp.connfd);
// 新事件添加失败,本事件也要删除,否则残留在中间状态
free(priv);
close(tcp.connfd);
return EVENT_DEL;
}
return EVENT_OK;
end:
rsp.ret = 0;
write(evt->fd, &rsp, sizeof(struct uds_proxy_remote_conn_rsp));
return EVENT_OK;
}
int uds_event_build_step3(void *arg, int epfd, struct uds_event_global_var *p_event_var)
{
struct uds_event *evt = (struct uds_event *)arg;
struct uds_proxy_remote_conn_rsp msg;
int len;
memset(&msg, 0, sizeof(struct uds_proxy_remote_conn_rsp));
len = read(evt->fd, &msg, sizeof(struct uds_proxy_remote_conn_rsp));
if (len <= 0) {
uds_err("read error len:%d", len);
if (len == 0)
goto event_del;
return EVENT_ERR;
}
if (msg.ret == EVENT_ERR) {
uds_log("get build ack:%d, failed", msg.ret);
goto event_del;
}
struct uds_proxy_remote_conn_req *udsmsg = (struct uds_proxy_remote_conn_req *)evt->priv;
struct uds_conn_arg uds;
memset(&uds, 0, sizeof(struct uds_conn_arg));
uds.cs = UDS_SOCKET_SERVER;
uds.udstype = udsmsg->type;
strncpy(uds.sun_path, udsmsg->sun_path, sizeof(uds.sun_path));
if (strlen(uds.sun_path) + strlen(UDS_PROXY_SUFFIX) >= sizeof(uds.sun_path)) {
uds_err("invalid sunpath:%s cant add proxy suffix:%s", uds.sun_path, UDS_PROXY_SUFFIX);
goto event_del;
}
strcat(uds.sun_path, UDS_PROXY_SUFFIX);
if (uds_build_unix_connection(&uds) < 0) {
uds_err("failed to build uds server sunpath:%s", uds.sun_path);
goto event_del;
}
uds_log("remote conn build success, build uds server type:%d sunpath:%s fd:%d OK this event suspend,",
uds.udstype, uds.sun_path, uds.sockfd);
uds_event_suspend(epfd, evt);
struct uds_event *newevt = uds_add_event(uds.sockfd, evt, uds_event_build_step4, NULL);
if (newevt == NULL) {
close(uds.sockfd);
uds_err("uds_add_event uds_event_build_step4 is failed\n");
goto event_del;
}
evt->tmout = UDS_EVENT_WAIT_TMOUT;
newevt->tmout = UDS_EVENT_WAIT_TMOUT;
if (uds_hash_insert_dirct(event_tmout_hash, evt->fd, evt) != 0 ||
uds_hash_insert_dirct(event_tmout_hash, newevt->fd, newevt) != 0) {
uds_err("add time out hash failed fd:%d %d", evt->fd, newevt->fd);
}
uds_log("Add hash key:%d-->value and key:%d-->value", evt->fd, newevt->fd);
msg.ret = 1;
write(evt->peer->fd, &msg, sizeof(struct uds_proxy_remote_conn_rsp));
free(evt->priv);
evt->priv = NULL;
return EVENT_OK;
event_del:
msg.ret = 0;
write(evt->peer->fd, &msg, sizeof(struct uds_proxy_remote_conn_rsp));
free(evt->priv);
evt->priv = NULL;
return EVENT_DEL;
}
int uds_event_build_step4(void *arg, int epfd, struct uds_event_global_var *p_event_var)
{
struct uds_event *evt = (struct uds_event *)arg;
int connfd = uds_sock_step_accept(evt->fd, AF_UNIX);
if (connfd < 0) {
uds_err("accept connection failed fd:%d", connfd);
return EVENT_ERR;
}
if (uds_hash_remove_dirct(event_tmout_hash, evt->fd) != 0 ||
uds_hash_remove_dirct(event_tmout_hash, evt->peer->fd) != 0) {
uds_err("failed to remove time out hash fd:%d %d", evt->fd, evt->peer->fd);
}
evt->tmout = 0;
evt->peer->tmout = 0;
struct uds_event *peerevt = (struct uds_event *)evt->peer;
peerevt->handler = uds_event_tcp2uds;
peerevt->peer = uds_add_event(connfd, peerevt, uds_event_uds2tcp, NULL);
if (peerevt->peer == NULL) {
uds_err("failed to add new event fd:%d", connfd);
uds_event_add_to_free(p_event_var, peerevt);
return EVENT_DEL;
}
uds_log("accept new connection fd:%d, peerfd:%d frontfd:%d peerfd:%d, peerevt(fd:%d) active now",
connfd, evt->peer->fd, peerevt->fd, peerevt->peer->fd, peerevt->fd);
uds_event_insert(epfd, peerevt);
return EVENT_DEL;
}
int uds_event_tcp_listener(void *arg, int epfd, struct uds_event_global_var *p_event_var)
{
struct uds_event *evt = (struct uds_event *)arg;
int connfd = uds_sock_step_accept(evt->fd, AF_INET);
if (connfd <= 0) {
uds_err("tcp conn fd error:%d", connfd);
return EVENT_ERR;
}
uds_log("tcp listener event enter, new connection fd:%d.", connfd);
if (uds_add_event(connfd, NULL, uds_event_remote_build, NULL) == NULL) {
uds_err("failed to add new event fd:%d", connfd);
return EVENT_ERR;
}
return EVENT_OK;
}
int uds_build_connect2uds(struct uds_event *evt, struct uds_proxy_remote_conn_req *msg)
{
struct uds_conn_arg targ;
int len = uds_recv_with_timeout(evt->fd, msg, sizeof(struct uds_proxy_remote_conn_req));
if (len <= 0) {
uds_err("recv failed, len:%d errno:%d", len, errno);
return EVENT_ERR;
}
targ.cs = UDS_SOCKET_CLIENT;
targ.udstype = msg->type;
memset(targ.sun_path, 0, sizeof(targ.sun_path));
strncpy(targ.sun_path, msg->sun_path, sizeof(targ.sun_path));
if (uds_build_unix_connection(&targ) < 0) {
uds_err("can't connect to sun_path:%s", targ.sun_path);
goto err_ack;
}
evt->peer = uds_add_event(targ.connfd, evt, uds_event_uds2tcp, NULL);
if (evt->peer == NULL) {
uds_err("failed to add new event fd:%d", targ.connfd);
close(targ.connfd);
goto err_ack;
}
evt->handler = uds_event_tcp2uds;
uds_log("build link req from tcp, sunpath:%s, type:%d, eventfd:%d peerfd:%d",
msg->sun_path, msg->type, targ.connfd, evt->fd);
struct uds_proxy_remote_conn_rsp ack;
ack.ret = EVENT_OK;
int ret = write(evt->fd, &ack, sizeof(struct uds_proxy_remote_conn_rsp));
if (ret <= 0) {
uds_err("reply ack failed, ret:%d", ret);
return EVENT_DEL;
}
return EVENT_OK;
err_ack:
do {
int ret;
struct uds_proxy_remote_conn_rsp ack;
ack.ret = EVENT_ERR;
ret = write(evt->fd, &ack, sizeof(struct uds_proxy_remote_conn_rsp));
if (ret <= 0) {
uds_err("reply ack failed, ret:%d", ret);
}
} while (0);
return EVENT_DEL;
}
int uds_build_pipe_proxy(int efd, struct uds_event *evt, struct uds_stru_scm_pipe *msg)
{
int len = uds_recv_with_timeout(evt->fd, msg, sizeof(struct uds_stru_scm_pipe));
if (len <= 0) {
uds_err("recv failed, len:%d errno:%d", len, errno);
return EVENT_ERR;
}
if (msg->srcfd < 0) {
uds_err("recv failed, srcid:%d", msg->srcfd);
return EVENT_ERR;
}
if (msg->dir != SCM_PIPE_READ && msg->dir != SCM_PIPE_WRITE) {
uds_err("invalid pipe dir:%d", msg->dir);
return EVENT_ERR;
}
uds_log("pipe proxy event fd:%d pipe fd:%d dir:%d", evt->fd, msg->srcfd, msg->dir);
if (msg->dir == SCM_PIPE_READ) {
uds_add_pipe_event(msg->srcfd, evt->fd, uds_event_pipe2tcp, NULL);
// 此处必须保留evt->fd只删除对他的监听以及释放evt内存即可
uds_event_suspend(efd, evt);
free(evt);
} else {
evt->pipe = 1;
evt->peerfd = msg->srcfd;
evt->handler = uds_event_tcp2pipe;
}
return EVENT_OK;
}
int uds_event_remote_build(void *arg, int epfd, struct uds_event_global_var *p_event_var)
{
struct uds_event *evt = (struct uds_event *)arg;
struct uds_tcp2tcp *bdmsg = (struct uds_tcp2tcp *)p_event_var->iov_base;
struct uds_proxy_remote_conn_req *msg = (struct uds_proxy_remote_conn_req *)bdmsg->data;
int len;
int ret = EVENT_OK;
memset(p_event_var->iov_base, 0, p_event_var->iov_len);
len = uds_recv_with_timeout(evt->fd, bdmsg, sizeof(struct uds_tcp2tcp));
if (len <= 0) {
uds_err("read no msg from sock:%d, len:%d", evt->fd, len);
return EVENT_DEL;
}
switch (bdmsg->msgtype) {
case MSGCNTL_UDS:
ret = uds_build_connect2uds(evt, msg);
break;
case MSGCNTL_PIPE:
ret = uds_build_pipe_proxy(epfd, evt, (struct uds_stru_scm_pipe *)bdmsg->data);
break;
default:
ret = EVENT_DEL;
uds_err("remote build not support msgtype %d now", bdmsg->msgtype);
break;
}
return ret;
}
static inline mode_t uds_msg_file_mode(int fd)
{
struct stat st;
char path[32] = {0};
if (fstat(fd, &st) != 0) {
uds_err("get fd:%d fstat failed, errno:%d", fd, errno);
}
if (S_ISFIFO(st.st_mode)) {
uds_log("fd:%d is fifo", fd);
}
return st.st_mode;
}
static int uds_msg_scm_regular_file(int scmfd, int tcpfd, struct uds_event_global_var *p_event_var)
{
int ret;
struct uds_tcp2tcp *p_msg = (struct uds_tcp2tcp *)p_event_var->buf;
struct uds_msg_scmrights *p_scmr = (struct uds_msg_scmrights *)&p_msg->data;
char *fdproc = calloc(1, UDS_PATH_MAX);
if (fdproc == NULL) {
uds_err("failed to calloc memory");
return EVENT_ERR;
}
sprintf(fdproc, "/proc/self/fd/%d", scmfd);
ret = readlink(fdproc, p_scmr->path, UDS_PATH_MAX);
if (ret < 0) {
uds_err("readlink:%s error, ret:%d, errno:%d", fdproc, ret, errno);
free(fdproc);
close(scmfd);
return EVENT_ERR;
}
free(fdproc);
p_scmr->flags = fcntl(scmfd, F_GETFL, 0);
if (p_scmr->flags < 0 || strlen(p_scmr->path) >= sizeof(p_scmr->path)) {
uds_err("fcntl get flags failed:%d len:%d errno:%d", p_scmr->flags, strlen(p_scmr->path), errno);
close(scmfd);
return EVENT_ERR;
}
close(scmfd);
p_msg->msgtype = MSG_SCM_RIGHTS;
p_msg->msglen = sizeof(struct uds_msg_scmrights) - sizeof(p_scmr->path) + strlen(p_scmr->path) + 1;
ret = write(tcpfd, p_msg, sizeof(struct uds_tcp2tcp) + p_msg->msglen);
if (ret <= 0) {
uds_err("send scm rights msg to tcp failed, ret:%d", ret);
return EVENT_ERR;
}
uds_log("scm rights msg send to tcp, fd:%d path:%s flags:%d", scmfd, p_scmr->path, p_scmr->flags);
return EVENT_OK;
}
static int uds_msg_scm_fifo_file(int scmfd, int tcpfd, struct uds_event_global_var *p_event_var)
{
#define FDPATH_LEN 32
int ret;
struct uds_tcp2tcp *p_get = (struct uds_tcp2tcp *)p_event_var->buf;
struct uds_stru_scm_pipe *p_pipe = (struct uds_stru_scm_pipe *)p_get->data;
char path[FDPATH_LEN] = {0};
struct stat st;
p_get->msgtype = MSG_SCM_PIPE;
p_get->msglen = sizeof(struct uds_stru_scm_pipe);
sprintf(path, "/proc/self/fd/%d", scmfd);
lstat(path, &st);
if (st.st_mode & S_IRUSR) {
p_pipe->dir = SCM_PIPE_READ;
uds_log("scm rights recv read pipe fd:%d, mode:%o", scmfd, st.st_mode);
} else if (st.st_mode & S_IWUSR) {
p_pipe->dir = SCM_PIPE_WRITE;
uds_log("scm rights recv write pipe fd:%d, mode:%o", scmfd, st.st_mode);
} else {
uds_err("scm rights recv invalid pipe, mode:%o fd:%d", st.st_mode, scmfd);
return EVENT_ERR;
}
p_pipe->srcfd = scmfd;
ret = send(tcpfd, p_get, sizeof(struct uds_tcp2tcp) + sizeof(struct uds_stru_scm_pipe), 0);
if (ret <= 0) {
uds_err("send tar get msg failed, ret:%d errno:%d", ret, errno);
return EVENT_ERR;
}
return EVENT_OK;
}
static int uds_msg_scmrights2tcp(struct cmsghdr *cmsg, int tcpfd, struct uds_event_global_var *p_event_var)
{
int scmfd;
mode_t mode;
memset(p_event_var->buf, 0, p_event_var->buflen);
memcpy(&scmfd, CMSG_DATA(cmsg), sizeof(scmfd));
if (scmfd <= 0) {
uds_err("recv invalid scm fd:%d", scmfd);
return EVENT_ERR;
}
mode = uds_msg_file_mode(scmfd);
switch (mode & S_IFMT) {
case S_IFREG:
uds_log("recv scmfd:%d from uds, is regular file", scmfd);
uds_msg_scm_regular_file(scmfd, tcpfd, p_event_var);
break;
case S_IFIFO:
uds_log("recv scmfd:%d from uds, is fifo", scmfd);
uds_msg_scm_fifo_file(scmfd, tcpfd, p_event_var);
break;
default:
uds_err("scm rights not support file mode:%o", mode);
break;
}
return EVENT_OK;
}
static int uds_msg_cmsg2tcp(struct msghdr *msg, struct uds_event *evt, struct uds_event_global_var *p_event_var)
{
int cnt = 0;
struct cmsghdr *cmsg = CMSG_FIRSTHDR(msg);
while (cmsg != NULL) {
cnt ++;
uds_log("cmsg type:%d len:%d level:%d, tcpfd:%d", cmsg->cmsg_type,
cmsg->cmsg_len, cmsg->cmsg_level, evt->peer->fd);
switch (cmsg->cmsg_type) {
case SCM_RIGHTS:
uds_msg_scmrights2tcp(cmsg, evt->peer->fd, p_event_var);
break;
default:
uds_err("cmsg type:%d not support now", cmsg->cmsg_type);
break;
}
cmsg = CMSG_NXTHDR(msg, cmsg);
}
return cnt;
}
static int uds_msg_scmfd_combine_msg(struct msghdr *msg, struct cmsghdr **cmsg, int *controllen, int fd)
{
struct cmsghdr *cnxt = NULL;
if (*cmsg == NULL) {
cnxt = CMSG_FIRSTHDR(msg);
} else {
cnxt = CMSG_NXTHDR(msg, *cmsg);
}
*cmsg = cnxt;
cnxt->cmsg_level = SOL_SOCKET;
cnxt->cmsg_type = SCM_RIGHTS;
cnxt->cmsg_len = CMSG_LEN(sizeof(fd));
memcpy(CMSG_DATA(cnxt), &fd, sizeof(fd));
*controllen = *controllen + cnxt->cmsg_len;
return EVENT_OK;
}
static int uds_msg_scmright_send_fd(int sock, int fd)
{
char byte = 0;
struct iovec iov;
struct msghdr msg;
struct cmsghdr *cmsg;
char buf[CMSG_SPACE(sizeof(fd))];
// send at least one char
memset(&msg, 0, sizeof(msg));
iov.iov_base = &byte;
iov.iov_len = 1;
msg.msg_iov = &iov;
msg.msg_iovlen = 1;
msg.msg_name = NULL;
msg.msg_namelen = 0;
msg.msg_control = buf;
msg.msg_controllen = sizeof(buf);
cmsg = CMSG_FIRSTHDR(&msg);
cmsg->cmsg_level = SOL_SOCKET;
cmsg->cmsg_type = SCM_RIGHTS;
cmsg->cmsg_len = CMSG_LEN(sizeof(fd));
// Initialize the payload
memcpy(CMSG_DATA(cmsg), &fd, sizeof(fd));
msg.msg_controllen = cmsg->cmsg_len;
if (sendmsg(sock, &msg, 0) != iov.iov_len)
return -1;
return 0;
}
static int uds_msg_cmsg2uds(struct uds_tcp2tcp *msg, struct uds_event *evt)
{
int scmfd = -1;
switch (msg->msgtype) {
case MSG_SCM_RIGHTS: {
struct uds_msg_scmrights *p_scmr = (struct uds_msg_scmrights *)&msg->data;
int ret;
scmfd = open(p_scmr->path, p_scmr->flags);
if (scmfd < 0) {
uds_err("scm rights send fd failed, scmfd:%d path:%s flags:%d", scmfd, p_scmr->path, p_scmr->flags);
return -1;
}
uds_log("scm send fd:%d path:%s flags:%d", scmfd, p_scmr->path, p_scmr->flags);
break;
}
default:
uds_err("msg type:%d not support.", msg->msgtype);
return -1;
}
return scmfd;
}
// drop is 1, drop this msg
int uds_msg_tcp2uds_scm_pipe(struct uds_tcp2tcp *p_msg, struct uds_event *evt, int drop)
{
int scmfd;
int fd[SCM_PIPE_NUM];
struct uds_stru_scm_pipe *p_pipe = (struct uds_stru_scm_pipe *)p_msg->data;
int len = uds_recv_with_timeout(evt->fd, p_pipe, p_msg->msglen);
if (len <= 0) {
uds_err("recv data failed, len:%d", len);
return EVENT_DEL;
}
if (drop) {
uds_err("just drop this msg.");
return EVENT_ERR;
}
if (p_pipe->dir != SCM_PIPE_READ && p_pipe->dir != SCM_PIPE_WRITE) {
uds_err("scm pipe recv invalid pipe dir:%d, srcfd:%d", p_pipe->dir, p_pipe->srcfd);
return EVENT_ERR;
}
struct uds_conn_arg tcp = {
.cs = UDS_SOCKET_CLIENT,
};
int ret;
if ((ret = uds_build_tcp_connection(&tcp)) < 0) {
uds_err("build tcp connection failed, return:%d", ret);
return EVENT_ERR;
}
if (pipe(fd) == -1) {
uds_err("pipe syscall error, errno:%d", errno);
close(tcp.connfd);
return EVENT_ERR;
}
if (p_pipe->dir == SCM_PIPE_READ) {
uds_log("send read pipe:%d to peer:%d", fd[SCM_PIPE_READ], evt->peer->fd);
scmfd = fd[SCM_PIPE_READ];
// read方向proxy读取消息并转发此代码处是远端所以监听tcp换发给pipe write
uds_add_pipe_event(tcp.connfd, fd[SCM_PIPE_WRITE], uds_event_tcp2pipe, NULL);
} else {
uds_log("send write pipe:%d to peer:%d", fd[SCM_PIPE_WRITE], evt->peer->fd);
scmfd = fd[SCM_PIPE_WRITE];
// write方向proxy读取远端代理pipe消息并转发此处是远端所以监听pipe read并转发给tcp
uds_add_pipe_event(fd[SCM_PIPE_READ], tcp.connfd, uds_event_pipe2tcp, NULL);
}
p_msg->msgtype = MSGCNTL_PIPE;
p_msg->msglen = sizeof(struct uds_stru_scm_pipe);
len = write(tcp.connfd, p_msg, sizeof(struct uds_tcp2tcp) + sizeof(struct uds_stru_scm_pipe));
if (len <= 0) {
uds_err("send pipe msg failed, len:%d", len);
return EVENT_ERR;
}
uds_log("success to build pipe fd map, dir:%d srcfd:%d tcpfd:%d readfd:%d writefd:%d",
p_pipe->dir, p_pipe->srcfd, tcp.connfd, fd[SCM_PIPE_READ], fd[SCM_PIPE_WRITE]);
return scmfd;
}
int uds_event_tcp2pipe(void *arg, int epfd, struct uds_event_global_var *p_event_var)
{
struct uds_event *evt = (struct uds_event *)arg;
memset(p_event_var->iov_base, 0, p_event_var->iov_len);
int len = read(evt->fd, p_event_var->iov_base, p_event_var->iov_len);
if (len <= 0) {
uds_err("read from tcp failed, len:%d errno:%d", len, errno);
return EVENT_DEL;
}
uds_log("tcp:%d to pipe:%d len:%d, buf:\n>>>>>>>\n%.*s\n<<<<<<<\n", evt->fd, evt->peerfd, len, len, p_event_var->iov_base);
int ret = write(evt->peerfd, p_event_var->iov_base, len);
if (ret <= 0) {
uds_err("write to pipe failed, fd:%d errno:%d", evt->peerfd, errno);
return EVENT_DEL;
}
return EVENT_OK;
}
int uds_event_pipe2tcp(void *arg, int epfd, struct uds_event_global_var *p_event_var)
{
struct uds_event *evt = (struct uds_event *)arg;
memset(p_event_var->iov_base, 0, p_event_var->iov_len);
int len = read(evt->fd, p_event_var->iov_base, p_event_var->iov_len);
if (len <= 0) {
uds_err("read from pipe failed, len:%d errno:%d", len, errno);
return EVENT_DEL;
}
uds_log("pipe:%d to tcp:%d len:%d, buf:\n>>>>>>>\n%.*s\n<<<<<<<\n", evt->fd, evt->peerfd, len, len, p_event_var->iov_base);
int ret = write(evt->peerfd, p_event_var->iov_base, len);
if (ret <= 0) {
uds_err("write to tcp failed, fd:%d errno:%d", evt->peerfd, errno);
return EVENT_DEL;
}
return EVENT_OK;
}
int uds_msg_tcp_end_msg(int sock)
{
struct uds_tcp2tcp end = {.msgtype = MSG_END, .msglen = 0,};
int ret = write(sock, &end, sizeof(struct uds_tcp2tcp));
if (ret <= 0) {
uds_err("write end msg failed, ret:%d fd:%d", ret, sock);
return EVENT_DEL;
}
return EVENT_OK;
}
void uds_msg_init_event_buf(struct uds_event_global_var *p)
{
memset(p->iov_base, 0, p->iov_len);
memset(p->iov_base_send, 0, p->iov_sendlen);
memset(p->msg_control, 0, p->msg_controllen);
memset(p->msg_control_send, 0, p->msg_controlsendlen);
memset(p->buf, 0, p->buflen);
return;
}
#define TEST_BUFLEN 256
int uds_event_uds2tcp(void *arg, int epfd, struct uds_event_global_var *p_event_var)
{
struct uds_event *evt = (struct uds_event *)arg;
struct iovec iov;
struct msghdr msg;
struct cmsghdr *cmsg;
int cmsgcnt = 0;
int len;
memset(&msg, 0, sizeof(msg));
iov.iov_base = p_event_var->iov_base + sizeof(struct uds_tcp2tcp);
iov.iov_len = p_event_var->iov_len - sizeof(struct uds_tcp2tcp);
msg.msg_iov = &iov;
msg.msg_iovlen = 1;
msg.msg_name = NULL;
msg.msg_namelen = 0;
msg.msg_control = p_event_var->msg_control;
msg.msg_controllen = p_event_var->msg_controllen;
cmsg = CMSG_FIRSTHDR(&msg);
cmsg->cmsg_level = SOL_SOCKET;
cmsg->cmsg_len = p_event_var->msg_controllen;
len = recvmsg(evt->fd, &msg, 0);
if (len == 0) {
uds_err("recvmsg error, return:%d", len);
uds_event_add_to_free(p_event_var, evt);
return EVENT_DEL;
}
if (len < 0) {
uds_err("recvmsg error return val:%d", len);
return EVENT_ERR;
}
cmsg = CMSG_FIRSTHDR(&msg);
if (cmsg != NULL) {
uds_log("recvmsg cmsg len:%d cmsglen:%d iovlen:%d iov:%s cmsglevel:%d cmsgtype:%d",
len, cmsg->cmsg_len, iov.iov_len, iov.iov_base, cmsg->cmsg_level, cmsg->cmsg_type);
cmsgcnt = uds_msg_cmsg2tcp(&msg, evt, p_event_var);
if (len - cmsgcnt == 0)
goto endmsg;
}
struct uds_tcp2tcp *p_msg = (struct uds_tcp2tcp *)p_event_var->iov_base;
p_msg->msgtype = MSG_NORMAL;
p_msg->msglen = len;
int ret = write(evt->peer->fd, (void *)p_msg, p_msg->msglen + sizeof(struct uds_tcp2tcp));
if (ret <= 0) {
uds_err("write to peer:%d failed, retcode:%d len:%d", evt->peer->fd, ret, len);
return EVENT_ERR;
}
uds_log("write iov msg to tcp success, msgtype:%d ret:%d iovlen:%d recvlen:%d udsheadlen:%d msglen:%d msg:\n>>>>>>>\n%.*s\n<<<<<<<\n",
p_msg->msgtype, ret, iov.iov_len, len, sizeof(struct uds_tcp2tcp), p_msg->msglen, p_msg->msglen, p_msg->data);
endmsg:
return uds_msg_tcp_end_msg(evt->peer->fd);
}
static inline void uds_close_fds(int *fds, int fdnum)
{
for (int i = 0; i < fdnum; i++) {
close(fds[i]);
}
return;
}
int uds_event_tcp2uds(void *arg, int epfd, struct uds_event_global_var *p_event_var)
{
#define MAX_FDS 64
int fds[MAX_FDS] = {0};
int fdnum = 0;
struct uds_event *evt = (struct uds_event *)arg;
struct uds_tcp2tcp *p_msg = (struct uds_tcp2tcp *)p_event_var->iov_base;
int ret;
int normal_msg_len = 0;
struct msghdr msg;
struct cmsghdr *cmsg = NULL;
struct iovec iov;
int msg_controllen = 0;
memset(&msg, 0, sizeof(msg));
iov.iov_base = p_event_var->iov_base_send;
iov.iov_len = 0;
msg.msg_iov = &iov;
msg.msg_iovlen = 1;
msg.msg_name = NULL;
msg.msg_namelen = 0;
msg.msg_control = p_event_var->msg_control_send;
msg.msg_controllen = p_event_var->msg_controlsendlen;
while (1) {
int len = uds_recv_with_timeout(evt->fd, p_msg, sizeof(struct uds_tcp2tcp));
if (len <= 0) {
uds_err("recv no msg maybe sock is closed, delete this tcp2uds event, len:%d.", len);
goto close_event;
}
uds_log(" type:%d len:%d len:%d", p_msg->msgtype, p_msg->msglen, len);
if (p_msg->msgtype == MSG_END) {
break;
}
if (p_msg->msglen > p_event_var->iov_len - sizeof(struct uds_tcp2tcp) || p_msg->msglen <= 0) {
uds_err("pmsg len:%d is invalid, fd:%d peerfd:%d", p_msg->msglen, evt->fd, evt->peer->fd);
continue;
}
switch(p_msg->msgtype) {
case MSG_NORMAL:
if (normal_msg_len != 0) {
uds_err("normal msg repeat recv fd:%d", evt->fd);
goto err;
}
normal_msg_len = uds_recv_with_timeout(evt->fd, p_event_var->iov_base_send, p_msg->msglen);
if (normal_msg_len <= 0) {
uds_err("recv msg error:%d fd:%d", len, evt->fd);
goto close_event;
}
iov.iov_len = normal_msg_len;
uds_log("recv normal msg len:%d str: \n>>>>>>>\n%.*s\n<<<<<<<", iov.iov_len, iov.iov_len, iov.iov_base);
break;
case MSG_SCM_RIGHTS: {
int len;
int scmfd;
struct uds_msg_scmrights *p_scm = (struct uds_msg_scmrights *) p_msg->data;
if (p_msg->msglen >= sizeof(p_scm->path)) {
uds_err("recv msg len invalid:%d", p_msg->msglen);
goto err;
}
memset(p_scm->path, 0, sizeof(p_scm->path));
// SCM RIGHTS msg proc
len = uds_recv_with_timeout(evt->fd, p_msg->data, p_msg->msglen);
if (len <= 0) {
uds_err("recv data failed len:%d", p_msg->msglen);
return EVENT_DEL;
}
if (fdnum >= MAX_FDS) {
uds_err("Too many fds scm.");
continue;
}
scmfd = uds_msg_cmsg2uds(p_msg, evt);
if (scmfd == -1) {
goto err;
}
fds[fdnum++] = scmfd;
uds_msg_scmfd_combine_msg(&msg, &cmsg, &msg_controllen, scmfd);
break;
}
case MSG_SCM_PIPE: {
int scmfd;
scmfd = uds_msg_tcp2uds_scm_pipe(p_msg, evt, (fdnum >= MAX_FDS) ? 1 : 0);
if (scmfd == EVENT_DEL)
goto close_event;
if (scmfd < 0)
goto err;
if (fdnum >= MAX_FDS) {
uds_err("fdnum >= MAX_FDS\n");
continue;
}
fds[fdnum++] = scmfd;
uds_msg_scmfd_combine_msg(&msg, &cmsg, &msg_controllen, scmfd);
break;
}
default:
uds_err("recv unsupport msg type:%d event fd:%d", p_msg->msgtype, evt->fd);
break;
}
}
if (msg_controllen == 0 && iov.iov_len == 0)
goto err;
msg.msg_controllen = msg_controllen;
if (iov.iov_len == 0) iov.iov_len = 1;
ret = sendmsg(evt->peer->fd, &msg, 0);
uds_log("evt:%d sendmsg len:%d, controllen:%d errno:%d", evt->fd, ret, msg_controllen, errno);
uds_close_fds(fds, fdnum);
return EVENT_OK;
err:
uds_close_fds(fds, fdnum);
return EVENT_ERR;
close_event:
uds_close_fds(fds, fdnum);
uds_event_add_to_free(p_event_var, evt);
return EVENT_DEL;
}
int uds_diag_is_epoll_fd(int fd)
{
for (int i = 0; i < p_uds_var->work_thread_num; i++) {
if (fd == p_uds_var->efd[i])
return 1;
}
return 0;
}
int uds_diag_string(char *buf, int len)
{
int pos = 0;
memset(buf, 0, len);
len--;
pos = snprintf(buf, len - pos, "+-----------------------------Unix Proxy Diagnostic information-------------------------+\n");
pos += snprintf(&buf[pos], len - pos, "+ Thread nums:%d\n", p_uds_var->work_thread_num);
for (int i = 0; i < p_uds_var->work_thread_num; i++) {
pos += snprintf(&buf[pos], len - pos, "+ Thread %d events count:%d\n", i+1, p_uds_var->work_thread[i].info.events);
}
pos += snprintf(&buf[pos], len - pos, "+ Log level:%s\n", p_uds_var->logstr[p_uds_var->loglevel]);
pos += snprintf(&buf[pos], len - pos, "+---------------------------------------------------------------------------------------+\n");
return strlen(buf);
}
// DIAG INFO
int uds_event_diag_info(void *arg, int epfd, struct uds_event_global_var *p_event_var)
{
int connfd;
int len;
int ret;
struct uds_event *evt = (struct uds_event *)arg;
if (evt == NULL) {
uds_err("diag info param is invalid.");
return EVENT_ERR;
}
connfd = uds_sock_step_accept(evt->fd, AF_UNIX);
if (connfd <= 0) {
uds_err("diag info conn fd error:%d", connfd);
return EVENT_ERR;
}
uds_log("diag accept an new connection to send diag info, fd:%d", connfd);
len = uds_diag_string(p_event_var->iov_base, p_event_var->iov_len);
ret = send(connfd, p_event_var->iov_base, len, 0);
if (ret <= 0) {
uds_err("send diag info error, ret:%d len:%d", ret, len);
}
close(connfd);
return EVENT_OK;
}
#define UDS_LOG_STR(level) (level < 0 || level >= UDS_LOG_MAX) ? p_uds_var->logstr[UDS_LOG_MAX] : p_uds_var->logstr[level]
int uds_event_debug_level(void *arg, int epfd, struct uds_event_global_var *p_event_var)
{
int connfd;
int len;
int ret;
int cur;
struct uds_event *evt = (struct uds_event *)arg;
if (evt == NULL) {
uds_err("debug level set param is invalid.");
return EVENT_ERR;
}
connfd = uds_sock_step_accept(evt->fd, AF_UNIX);
if (connfd <= 0) {
uds_err("debug level set conn fd error:%d", connfd);
return EVENT_ERR;
}
cur = p_uds_var->loglevel;
if (cur + 1 < UDS_LOG_MAX) {
p_uds_var->loglevel += 1;
} else {
p_uds_var->loglevel = UDS_LOG_NONE;
}
uds_log("debug level accept a new connection, current level:%s change to:%s", UDS_LOG_STR(cur), UDS_LOG_STR(p_uds_var->loglevel));
len = sprintf(p_event_var->iov_base, "+---------------UDS LOG LEVEL UPDATE--------------+\n"
"+ Log level is:%s before, now change to :%s.\n"
"+-------------------------------------------------+\n", UDS_LOG_STR(cur), UDS_LOG_STR(p_uds_var->loglevel));
ret = send(connfd, p_event_var->iov_base, len, 0);
if (ret <= 0) {
uds_err("send debug level info error, ret:%d len:%d", ret, len);
}
close(connfd);
return EVENT_OK;
}