From c6a41c20a96d38e4e24596fc5939cb727d174db4 Mon Sep 17 00:00:00 2001 From: Alyson Deives Pereira Date: Wed, 28 Sep 2022 09:05:14 -0300 Subject: [PATCH] Add ZeroMQ RPC backend This feature adds a new RPC backend for communication between sysinv-api, sysinv-conductor and sysinv-agent processes. This backend is implemented using a patched zerorpc library [1], which is built on top of ZeroMQ and message-pack. The motivation behind this change is to decouple sysinv from RabbitMQ, and use a brokerless solution for RPC instead. The key points are: - All imports of rpcapi.py are replaced by rpcapiproxy.py, which decides the backend to use (rabbitmq or zeromq) according to configuration. - During an upgrade process the rpc service listens to both rabbitmq and zeromq. For communication between hosts, the client backend api is chosen according to host software version. - In future versions, the usage of RabbitMQ will no longer be necessary and its usage can be removed. I have marked these parts of code with "TODO(RPCHybridMode)" to easily track it. 
[1] https://review.opendev.org/c/starlingx/integ/+/864310 TEST PLAN: PASS: Bootstrap and host-unlock on AIO-SX, AIO-Duplex, Standard PASS: Bootstrap and host-unlock on DC system-controller and subcloud PASS: Verify sysinv.log and confirm no error occurs in RPC communication PASS: Perform system cli commands that interacts with sysinv RPCs: - system host-cpu-max-frequency-modify - system license-install - system storage-backend-add ceph-external - system host-swact PASS: Backup & Restore on AIO-SX PASS: Bootstrap replay (updating mgmt and cluster subnet) on AIO-SX PASS: Platform upgrade on AIO-DX (22.06 -> 22.12) PASS: Platform upgrade on AIO-DX+ (22.06 -> 22.12) PASS: Platform upgrade on AIO-SX (22.06 -> 22.12) Depends-On: https://review.opendev.org/c/starlingx/tools/+/859576 Depends-On: https://review.opendev.org/c/starlingx/stx-puppet/+/859575 Depends-On: https://review.opendev.org/c/starlingx/ansible-playbooks/+/862609 Story: 2010087 Task: 46444 Change-Id: I5cd61b541a6d8c62628a0f99db0e35af1eae5961 Signed-off-by: Alyson Deives Pereira Signed-off-by: Eduardo Juliano Alberti --- .../controllerconfig/upgrades/controller.py | 6 + .../scripts/sysinv-service-restart.sh | 20 ++ controllerconfig/debian/deb_folder/rules | 1 + sysinv/sysinv/debian/deb_folder/control | 2 + sysinv/sysinv/sysinv/requirements.txt | 3 +- .../sysinv/sysinv/scripts/manage-partitions | 2 +- sysinv/sysinv/sysinv/sysinv/agent/disk.py | 2 +- sysinv/sysinv/sysinv/sysinv/agent/manager.py | 63 ++++- sysinv/sysinv/sysinv/sysinv/agent/rpcapi.py | 16 ++ .../sysinv/sysinv/sysinv/agent/rpcapiproxy.py | 26 ++ .../sysinv/sysinv/sysinv/agent/rpcapizmq.py | 62 +++++ .../sysinv/sysinv/api/controllers/v1/disk.py | 2 +- sysinv/sysinv/sysinv/sysinv/api/hooks.py | 2 +- .../sysinv/sysinv/cmd/dnsmasq_lease_update.py | 2 +- sysinv/sysinv/sysinv/sysinv/cmd/utils.py | 2 +- sysinv/sysinv/sysinv/sysinv/common/context.py | 4 + sysinv/sysinv/sysinv/sysinv/common/service.py | 11 +- .../sysinv/sysinv/sysinv/conductor/manager.py 
| 63 ++++- .../sysinv/sysinv/conductor/rpcapiproxy.py | 47 ++++ .../sysinv/sysinv/conductor/rpcapizmq.py | 45 ++++ sysinv/sysinv/sysinv/sysinv/helm/utils.py | 2 +- .../sysinv/sysinv/openstack/common/context.py | 6 +- .../sysinv/openstack/common/rpc/__init__.py | 15 ++ .../sysinv/openstack/common/rpc/common.py | 22 ++ .../sysinv/sysinv/sysinv/puppet/inventory.py | 11 +- .../sysinv/tests/api/test_certificate.py | 2 +- .../sysinv/tests/api/test_controller_fs.py | 2 +- .../sysinv/tests/api/test_device_image.py | 2 +- .../sysinv/sysinv/tests/api/test_dns.py | 2 +- .../sysinv/tests/api/test_helm_charts.py | 2 +- .../sysinv/sysinv/tests/api/test_host.py | 2 +- .../sysinv/sysinv/tests/api/test_host_fs.py | 2 +- .../sysinv/sysinv/tests/api/test_interface.py | 4 +- .../tests/api/test_kube_rootca_update.py | 4 +- .../sysinv/tests/api/test_kube_upgrade.py | 2 +- .../sysinv/sysinv/tests/api/test_ntp.py | 2 +- .../sysinv/sysinv/tests/api/test_partition.py | 2 +- .../sysinv/sysinv/tests/api/test_upgrade.py | 2 +- .../sysinv/sysinv/tests/conf_fixture.py | 1 + .../sysinv/sysinv/sysinv/zmq_rpc/__init__.py | 3 + .../sysinv/sysinv/zmq_rpc/client_provider.py | 46 ++++ .../sysinv/sysinv/zmq_rpc/serializer.py | 65 +++++ .../sysinv/sysinv/sysinv/zmq_rpc/zmq_rpc.py | 237 ++++++++++++++++++ tsconfig/tsconfig/tsconfig/tsconfig.py | 4 + 44 files changed, 789 insertions(+), 34 deletions(-) create mode 100644 controllerconfig/controllerconfig/scripts/sysinv-service-restart.sh create mode 100644 sysinv/sysinv/sysinv/sysinv/agent/rpcapiproxy.py create mode 100644 sysinv/sysinv/sysinv/sysinv/agent/rpcapizmq.py create mode 100644 sysinv/sysinv/sysinv/sysinv/conductor/rpcapiproxy.py create mode 100644 sysinv/sysinv/sysinv/sysinv/conductor/rpcapizmq.py create mode 100644 sysinv/sysinv/sysinv/sysinv/zmq_rpc/__init__.py create mode 100644 sysinv/sysinv/sysinv/sysinv/zmq_rpc/client_provider.py create mode 100644 sysinv/sysinv/sysinv/sysinv/zmq_rpc/serializer.py create mode 100644 
sysinv/sysinv/sysinv/sysinv/zmq_rpc/zmq_rpc.py diff --git a/controllerconfig/controllerconfig/controllerconfig/upgrades/controller.py b/controllerconfig/controllerconfig/controllerconfig/upgrades/controller.py index 673411bd86..3a78a304b2 100644 --- a/controllerconfig/controllerconfig/controllerconfig/upgrades/controller.py +++ b/controllerconfig/controllerconfig/controllerconfig/upgrades/controller.py @@ -40,6 +40,7 @@ from tsconfig.tsconfig import CONTROLLER_UPGRADE_FLAG from tsconfig.tsconfig import CONTROLLER_UPGRADE_COMPLETE_FLAG from tsconfig.tsconfig import CONTROLLER_UPGRADE_FAIL_FLAG from tsconfig.tsconfig import CONTROLLER_UPGRADE_STARTED_FLAG +from tsconfig.tsconfig import SYSINV_HYBRID_RPC_FLAG from controllerconfig.common import constants from controllerconfig import utils as cutils @@ -893,6 +894,11 @@ def upgrade_controller(from_release, to_release): LOG.error("Failed to stop %s service" % "sysinv-agent") raise + # Creating Sysinv Hybrid Mode flag + # TODO(RPCHybridMode): This is only required for 21.12 -> 22.12 upgrades. + # Remove in future release. + open(SYSINV_HYBRID_RPC_FLAG, "w").close() + # Mount required filesystems from mate controller LOG.info("Mounting filesystems") nfs_mount_filesystem(PLATFORM_PATH) diff --git a/controllerconfig/controllerconfig/scripts/sysinv-service-restart.sh b/controllerconfig/controllerconfig/scripts/sysinv-service-restart.sh new file mode 100644 index 0000000000..11892434d1 --- /dev/null +++ b/controllerconfig/controllerconfig/scripts/sysinv-service-restart.sh @@ -0,0 +1,20 @@ +#!/bin/bash +# +# Copyright (c) 2022 Wind River Systems, Inc. 
+# +# SPDX-License-Identifier: Apache-2.0 +# + +NAME=$(basename $0) + +function log { + logger -p local1.info $1 +} + +log "$NAME: restarting sysinv services" + +sm-restart service sysinv-conductor +sleep 2 +pmon-restart sysinv-agent + +exit 0 diff --git a/controllerconfig/debian/deb_folder/rules b/controllerconfig/debian/deb_folder/rules index 364ac1bb70..159afb8202 100755 --- a/controllerconfig/debian/deb_folder/rules +++ b/controllerconfig/debian/deb_folder/rules @@ -15,6 +15,7 @@ override_dh_install: install -p -D -m 700 scripts/openstack_update_admin_password $(ROOT)/usr/bin/openstack_update_admin_password install -p -D -m 700 scripts/upgrade_swact_migration.py $(ROOT)/usr/bin/upgrade_swact_migration.py install -p -D -m 755 scripts/image-backup.sh $(ROOT)/usr/bin/image-backup.sh + install -p -D -m 755 scripts/sysinv-service-restart.sh $(ROOT)/usr/bin/sysinv-service-restart.sh install -d -m 755 $(ROOT)/etc/goenabled.d/ install -p -D -m 700 scripts/config_goenabled_check.sh $(ROOT)/etc/goenabled.d/config_goenabled_check.sh.controller install -d -m 755 $(ROOT)/etc/init.d diff --git a/sysinv/sysinv/debian/deb_folder/control b/sysinv/sysinv/debian/deb_folder/control index 3bf7c2653a..94e7372d57 100644 --- a/sysinv/sysinv/debian/deb_folder/control +++ b/sysinv/sysinv/debian/deb_folder/control @@ -75,6 +75,7 @@ Build-Depends-Indep: python3-fm-api, python3-platform-util, python3-cephclient, + zerorpc-python, cgts-client, controllerconfig Standards-Version: 4.5.1 @@ -141,6 +142,7 @@ Depends: ${python3:Depends}, ${misc:Depends}, python3-cgcs-patch, platform-util, python3-cephclient, + zerorpc-python, cgts-client Description: Starlingx system inventory - daemon Starlingx system inventory diff --git a/sysinv/sysinv/sysinv/requirements.txt b/sysinv/sysinv/sysinv/requirements.txt index d904e1f6a2..8051434a7b 100644 --- a/sysinv/sysinv/sysinv/requirements.txt +++ b/sysinv/sysinv/sysinv/requirements.txt @@ -52,4 +52,5 @@ python-barbicanclient rfc3986 
importlib-metadata>=3.3.0;python_version=="3.6" importlib-resources==5.2.2;python_version=="3.6" -oslo.policy # Apache-2.0 \ No newline at end of file +oslo.policy # Apache-2.0 +zerorpc @ git+https://github.com/0rpc/zerorpc-python.git@99ee6e47c8baf909b97eec94f184a19405f392a2 # MIT diff --git a/sysinv/sysinv/sysinv/scripts/manage-partitions b/sysinv/sysinv/sysinv/scripts/manage-partitions index e999bc5a09..56144bfe4c 100755 --- a/sysinv/sysinv/sysinv/scripts/manage-partitions +++ b/sysinv/sysinv/sysinv/scripts/manage-partitions @@ -35,7 +35,7 @@ from sysinv.common import service as sysinv_service from sysinv.common import utils from sysinv.common import disk_utils -from sysinv.conductor import rpcapi as conductor_rpcapi +from sysinv.conductor import rpcapiproxy as conductor_rpcapi from sysinv.openstack.common import context from functools import cmp_to_key diff --git a/sysinv/sysinv/sysinv/sysinv/agent/disk.py b/sysinv/sysinv/sysinv/sysinv/agent/disk.py index 5cdcfa1e1a..8b19fadde6 100644 --- a/sysinv/sysinv/sysinv/sysinv/agent/disk.py +++ b/sysinv/sysinv/sysinv/sysinv/agent/disk.py @@ -22,7 +22,7 @@ from oslo_log import log as logging from sysinv.common import disk_utils from sysinv.common import constants from sysinv.common import utils -from sysinv.conductor import rpcapi as conductor_rpcapi +from sysinv.conductor import rpcapiproxy as conductor_rpcapi from sysinv.openstack.common import context LOG = logging.getLogger(__name__) diff --git a/sysinv/sysinv/sysinv/sysinv/agent/manager.py b/sysinv/sysinv/sysinv/sysinv/agent/manager.py index f9a8b49234..0ef447cdfc 100644 --- a/sysinv/sysinv/sysinv/sysinv/agent/manager.py +++ b/sysinv/sysinv/sysinv/sysinv/agent/manager.py @@ -38,6 +38,7 @@ from eventlet.green import subprocess import fileinput import os import retrying +import six import shutil import sys import tempfile @@ -66,12 +67,15 @@ from sysinv.common import service from sysinv.common import utils from sysinv.objects import base as objects_base from 
sysinv.puppet import common as puppet -from sysinv.conductor import rpcapi as conductor_rpcapi +from sysinv.conductor import rpcapiproxy as conductor_rpcapi from sysinv.openstack.common import context as mycontext from sysinv.openstack.common import periodic_task from sysinv.openstack.common.rpc.common import Timeout from sysinv.openstack.common.rpc.common import serialize_remote_exception +from sysinv.openstack.common.rpc import service as rpc_service from sysinv.openstack.common.rpc.common import RemoteError +from sysinv.zmq_rpc.zmq_rpc import ZmqRpcServer +from sysinv.zmq_rpc.zmq_rpc import is_rpc_hybrid_mode_active import tsconfig.tsconfig as tsc @@ -157,8 +161,28 @@ class AgentManager(service.PeriodicService): HOST_FILESYSTEMS} def __init__(self, host, topic): + self.host = host + self.topic = topic serializer = objects_base.SysinvObjectSerializer() - super(AgentManager, self).__init__(host, topic, serializer=serializer) + super(AgentManager, self).__init__() + self._rpc_service = None + self._zmq_rpc_service = None + + # TODO(RPCHybridMode): Usage of RabbitMQ RPC is only required for + # 21.12 -> 22.12 upgrades. 
+ # Remove this in new releases, when it's no longer necessary do the + # migration work through RabbitMQ and ZeroMQ + # NOTE: If more switches are necessary before RabbitMQ removal, + # refactor this into an RPC layer + if not CONF.rpc_backend_zeromq or is_rpc_hybrid_mode_active(): + self._rpc_service = rpc_service.Service(self.host, self.topic, + manager=self, + serializer=serializer) + if CONF.rpc_backend_zeromq: + self._zmq_rpc_service = ZmqRpcServer( + self, + CONF.rpc_zeromq_bind_ip, + CONF.rpc_zeromq_agent_bind_port) self._report_to_conductor_iplatform_avail_flag = False self._report_to_conductor_fpga_info = True @@ -191,7 +215,10 @@ class AgentManager(service.PeriodicService): def start(self): super(AgentManager, self).start() - + if self._rpc_service: + self._rpc_service.start() + if self._zmq_rpc_service: + self._zmq_rpc_service.run() # Do not collect inventory and report to conductor at startup in # order to eliminate two inventory reports # (one from here and one from audit) being sent to the conductor @@ -203,6 +230,13 @@ class AgentManager(service.PeriodicService): if tsc.system_mode == constants.SYSTEM_MODE_SIMPLEX: utils.touch(SYSINV_READY_FLAG) + def stop(self): + if self._rpc_service: + self._rpc_service.stop() + if self._zmq_rpc_service: + self._zmq_rpc_service.stop() + super(AgentManager, self).stop() + def _report_to_conductor(self): """ Initial inventory report to conductor required @@ -1576,7 +1610,8 @@ class AgentManager(service.PeriodicService): basename = os.path.basename(file_name) fd, tmppath = tempfile.mkstemp(dir=dirname, prefix=basename) with os.fdopen(fd, 'wb') as f: - f.write(f_content.encode()) + f_content = six.ensure_binary(f_content) + f.write(f_content) if os.path.islink(file_name): os.unlink(file_name) os.rename(tmppath, file_name) @@ -2129,3 +2164,23 @@ class AgentManager(service.PeriodicService): except exception.SysinvException: LOG.exception("Sysinv Agent exception updating ipv" "conductor.") + + # TODO(RPCHybridMode): 
This is only useful for 21.12 -> 22.12 upgrades. + # Remove this method in new releases, when it's no longer necessary to + # perform upgrade through hybrid mode messaging system + def delete_sysinv_hybrid_state(self, context, host_uuid): + """Delete the Sysinv flag of Hybrid Mode. + + :param host_uuid: ihost uuid unique id + :return: None + """ + + if self._ihost_uuid and self._ihost_uuid == host_uuid: + if os.path.exists(tsc.SYSINV_HYBRID_RPC_FLAG): + utils.delete_if_exists(tsc.SYSINV_HYBRID_RPC_FLAG) + LOG.info("Sysinv Hybrid Mode deleted.") + LOG.info("Sysinv services will be restarted") + # pylint: disable=not-callable + subprocess.call(['/usr/bin/sysinv-service-restart.sh']) + else: + LOG.info("Hybrid flag doesn't exist. Ignoring delete.") diff --git a/sysinv/sysinv/sysinv/sysinv/agent/rpcapi.py b/sysinv/sysinv/sysinv/sysinv/agent/rpcapi.py index 24a263c908..7626739c41 100644 --- a/sysinv/sysinv/sysinv/sysinv/agent/rpcapi.py +++ b/sysinv/sysinv/sysinv/sysinv/agent/rpcapi.py @@ -294,3 +294,19 @@ class AgentAPI(sysinv.openstack.common.rpc.proxy.RpcProxy): transaction_id=transaction_id, retimer_included=retimer_included), topic=topic) + + # TODO(RPCHybridMode): This is only useful for 21.12 -> 22.12 upgrades. + # Remove this method in new releases, when it's no longer necessary to + # perform upgrade through hybrid mode messaging system + def delete_sysinv_hybrid_state(self, context, host_uuid): + """Asynchronously, have the agent to delete sysinv hybrid + mode flag + + :param context: request context. + :param host_uuid: ihost uuid unique id + :returns: pass or fail + """ + + return self.cast(context, + self.make_msg('delete_sysinv_hybrid_state', + host_uuid=host_uuid)) diff --git a/sysinv/sysinv/sysinv/sysinv/agent/rpcapiproxy.py b/sysinv/sysinv/sysinv/sysinv/agent/rpcapiproxy.py new file mode 100644 index 0000000000..74911eee4f --- /dev/null +++ b/sysinv/sysinv/sysinv/sysinv/agent/rpcapiproxy.py @@ -0,0 +1,26 @@ +# Copyright (c) 2022 Wind River Systems, Inc. 
+# +# SPDX-License-Identifier: Apache-2.0 + +from oslo_config import cfg +from oslo_log import log +import sysinv.agent.rpcapi as rpcapi +from sysinv.agent.rpcapizmq import AgentAPI as ZMQAgentAPI +from sysinv.agent.rpcapi import AgentAPI as AMQPAgentAPI +from sysinv.zmq_rpc.zmq_rpc import is_rpc_hybrid_mode_active + +LOG = log.getLogger(__name__) +MANAGER_TOPIC = rpcapi.MANAGER_TOPIC + + +def AgentAPI(topic=None): + rpc_backend = cfg.CONF.rpc_backend + rpc_backend_zeromq = cfg.CONF.rpc_backend_zeromq + rpc_backend_hybrid_mode = is_rpc_hybrid_mode_active() + LOG.debug("Current agent rpc_backend: {} " + "use_zeromq: {} hybrid_mode: {}".format(rpc_backend, + rpc_backend_zeromq, + rpc_backend_hybrid_mode)) + if rpc_backend_zeromq: + return ZMQAgentAPI(topic) + return AMQPAgentAPI(topic) diff --git a/sysinv/sysinv/sysinv/sysinv/agent/rpcapizmq.py b/sysinv/sysinv/sysinv/sysinv/agent/rpcapizmq.py new file mode 100644 index 0000000000..f96e1ab573 --- /dev/null +++ b/sysinv/sysinv/sysinv/sysinv/agent/rpcapizmq.py @@ -0,0 +1,62 @@ +# Copyright (c) 2022 Wind River Systems, Inc. +# +# SPDX-License-Identifier: Apache-2.0 + +""" +Client side of the agent RPC API using ZeroMQ backend. 
+""" + +from oslo_config import cfg +from oslo_log import log +from sysinv.agent.rpcapi import AgentAPI as BaseAgentAPI +from sysinv.agent.rpcapi import MANAGER_TOPIC +from sysinv.zmq_rpc.zmq_rpc import ZmqRpcClient +from sysinv.zmq_rpc.zmq_rpc import is_rpc_hybrid_mode_active +from sysinv.zmq_rpc.zmq_rpc import is_zmq_backend_available + +CONF = cfg.CONF +LOG = log.getLogger(__name__) + + +class AgentAPI(ZmqRpcClient, BaseAgentAPI): + def __init__(self, topic=None): + if topic is None: + topic = MANAGER_TOPIC + host = None + port = CONF.rpc_zeromq_agent_bind_port + super(AgentAPI, self).__init__(host, port, topic) + + def call(self, context, msg, topic=None, version=None, timeout=None): + if is_rpc_hybrid_mode_active(): + host_uuid = msg['args']['host_uuid'] + if not is_zmq_backend_available(host_uuid): + LOG.debug("RPC hybrid mode is active and agent zmq backend is " + "not yet available in host {}. Calling RPC call " + "method {} through rabbitmq".format(host_uuid, + msg['method'])) + rpcapi = BaseAgentAPI() + return rpcapi.call(context, msg, topic, version, timeout) + + return super(AgentAPI, self).call(context, msg, timeout) + + def cast(self, context, msg, topic=None, version=None): + if is_rpc_hybrid_mode_active(): + host_uuid = msg['args']['host_uuid'] + if not is_zmq_backend_available(host_uuid): + LOG.debug("RPC hybrid mode is active and agent zmq backend is " + "not yet available in host {}. Calling RPC cast " + "method {} through rabbitmq".format(host_uuid, + msg['method'])) + rpcapi = BaseAgentAPI() + return rpcapi.cast(context, msg, topic, version) + + return super(AgentAPI, self).cast(context, msg) + + def fanout_cast(self, context, msg, topic=None, version=None): + if is_rpc_hybrid_mode_active(): + method = msg['method'] + LOG.debug("RPC hybrid mode is active. 
Calling RPC fanout_cast " + "method {} through rabbitmq and zmq".format(method)) + rpcapi = BaseAgentAPI() + rpcapi.fanout_cast(context, msg, topic, version) + return super(AgentAPI, self).fanout_cast(context, msg) diff --git a/sysinv/sysinv/sysinv/sysinv/api/controllers/v1/disk.py b/sysinv/sysinv/sysinv/sysinv/api/controllers/v1/disk.py index ec2f9ec05a..a0018628e9 100644 --- a/sysinv/sysinv/sysinv/sysinv/api/controllers/v1/disk.py +++ b/sysinv/sysinv/sysinv/sysinv/api/controllers/v1/disk.py @@ -33,7 +33,7 @@ from sysinv.api.controllers.v1 import link from sysinv.api.controllers.v1 import partition from sysinv.api.controllers.v1 import types from sysinv.api.controllers.v1 import utils -from sysinv.agent import rpcapi as agent_rpcapi +from sysinv.agent import rpcapiproxy as agent_rpcapi from sysinv.common import exception from sysinv.common import constants from sysinv.common import utils as cutils diff --git a/sysinv/sysinv/sysinv/sysinv/api/hooks.py b/sysinv/sysinv/sysinv/sysinv/api/hooks.py index 7d59bb9534..46fe9790e2 100644 --- a/sysinv/sysinv/sysinv/sysinv/api/hooks.py +++ b/sysinv/sysinv/sysinv/sysinv/api/hooks.py @@ -34,7 +34,7 @@ from sysinv._i18n import _ from sysinv.api.policies import base as base_policy from sysinv.common import context from sysinv.common import utils -from sysinv.conductor import rpcapi +from sysinv.conductor import rpcapiproxy as rpcapi from sysinv.db import api as dbapi from sysinv.common import policy from webob import exc diff --git a/sysinv/sysinv/sysinv/sysinv/cmd/dnsmasq_lease_update.py b/sysinv/sysinv/sysinv/sysinv/cmd/dnsmasq_lease_update.py index 66603d0feb..4b5ec3a501 100755 --- a/sysinv/sysinv/sysinv/sysinv/cmd/dnsmasq_lease_update.py +++ b/sysinv/sysinv/sysinv/sysinv/cmd/dnsmasq_lease_update.py @@ -37,7 +37,7 @@ from oslo_config import cfg from oslo_log import log from sysinv._i18n import _ from sysinv.common import service as sysinv_service -from sysinv.conductor import rpcapi as conductor_rpcapi +from sysinv.conductor 
import rpcapiproxy as conductor_rpcapi from sysinv.openstack.common import context CONF = cfg.CONF diff --git a/sysinv/sysinv/sysinv/sysinv/cmd/utils.py b/sysinv/sysinv/sysinv/sysinv/cmd/utils.py index 5a3843b95a..0fef27948b 100644 --- a/sysinv/sysinv/sysinv/sysinv/cmd/utils.py +++ b/sysinv/sysinv/sysinv/sysinv/cmd/utils.py @@ -10,7 +10,7 @@ import yaml from sysinv.common import constants from sysinv.common import service -from sysinv.conductor import rpcapi as conductor_rpcapi +from sysinv.conductor import rpcapiproxy as conductor_rpcapi from sysinv.db import api from sysinv.openstack.common import context diff --git a/sysinv/sysinv/sysinv/sysinv/common/context.py b/sysinv/sysinv/sysinv/sysinv/common/context.py index 29ea4fb7e5..0011e1ebcd 100644 --- a/sysinv/sysinv/sysinv/sysinv/common/context.py +++ b/sysinv/sysinv/sysinv/sysinv/common/context.py @@ -75,3 +75,7 @@ class RequestContext(context.RequestContext): result.update(super(RequestContext, self).to_dict()) return result + + @classmethod + def from_dict(cls, values): + return cls(**values) diff --git a/sysinv/sysinv/sysinv/sysinv/common/service.py b/sysinv/sysinv/sysinv/sysinv/common/service.py index 1732c3b7e4..d794067613 100644 --- a/sysinv/sysinv/sysinv/sysinv/common/service.py +++ b/sysinv/sysinv/sysinv/sysinv/common/service.py @@ -25,7 +25,7 @@ from oslo_service import service from sysinv.openstack.common import context from sysinv.openstack.common import periodic_task from sysinv.openstack.common import rpc -from sysinv.openstack.common.rpc import service as rpc_service +from sysinv.openstack.common import service as base_service from sysinv import version @@ -47,7 +47,14 @@ cfg.CONF.register_opts([ CONF = cfg.CONF -class PeriodicService(rpc_service.Service, periodic_task.PeriodicTasks): +class PeriodicService(base_service.Service, periodic_task.PeriodicTasks): + + def __init__(self, manager=None): + super(PeriodicService, self).__init__() + if manager is None: + self.manager = self + else: + 
self.manager = manager def start(self): super(PeriodicService, self).start() diff --git a/sysinv/sysinv/sysinv/sysinv/conductor/manager.py b/sysinv/sysinv/sysinv/sysinv/conductor/manager.py index ac22397cf9..5a9891c283 100644 --- a/sysinv/sysinv/sysinv/sysinv/conductor/manager.py +++ b/sysinv/sysinv/sysinv/sysinv/conductor/manager.py @@ -39,6 +39,7 @@ import re import requests import ruamel.yaml as yaml import shutil +import six import socket import tempfile import time @@ -77,7 +78,7 @@ from platform_util.license import license from sqlalchemy.orm import exc from six.moves import http_client as httplib from sysinv._i18n import _ -from sysinv.agent import rpcapi as agent_rpcapi +from sysinv.agent import rpcapiproxy as agent_rpcapi from sysinv.api.controllers.v1 import address_pool from sysinv.api.controllers.v1 import cpu_utils from sysinv.api.controllers.v1 import kube_app as kube_api @@ -112,11 +113,14 @@ from sysinv.objects import base as objects_base from sysinv.objects import kube_app as kubeapp_obj from sysinv.openstack.common import context as ctx from sysinv.openstack.common import periodic_task +from sysinv.openstack.common.rpc import service as rpc_service from sysinv.puppet import common as puppet_common from sysinv.puppet import puppet from sysinv.helm import helm from sysinv.helm.lifecycle_constants import LifecycleConstants from sysinv.helm.lifecycle_hook import LifecycleHookInfo +from sysinv.zmq_rpc.zmq_rpc import ZmqRpcServer +from sysinv.zmq_rpc.zmq_rpc import is_rpc_hybrid_mode_active MANAGER_TOPIC = 'sysinv.conductor_manager' @@ -222,9 +226,29 @@ class ConductorManager(service.PeriodicService): my_host_id = None def __init__(self, host, topic): + self.host = host + self.topic = topic serializer = objects_base.SysinvObjectSerializer() - super(ConductorManager, self).__init__(host, topic, - serializer=serializer) + super(ConductorManager, self).__init__() + self._rpc_service = None + self._zmq_rpc_service = None + + # TODO(RPCHybridMode): Usage of 
RabbitMQ RPC is only required for + # 21.12 -> 22.12 upgrades. + # Remove this in new releases, when it's no longer necessary do the + # migration work through RabbitMQ and ZeroMQ + # NOTE: If more switches are necessary before RabbitMQ removal, + # refactor this into an RPC layer + if not CONF.rpc_backend_zeromq or is_rpc_hybrid_mode_active(): + self._rpc_service = rpc_service.Service(self.host, self.topic, + manager=self, + serializer=serializer) + if CONF.rpc_backend_zeromq: + self._zmq_rpc_service = ZmqRpcServer( + self, + CONF.rpc_zeromq_conductor_bind_ip, + CONF.rpc_zeromq_conductor_bind_port) + self.dbapi = None self.fm_api = None self.fm_log = None @@ -282,6 +306,12 @@ class ConductorManager(service.PeriodicService): self._start() # accept API calls and run periodic tasks after # initializing conductor manager service + if self._rpc_service: + self._rpc_service.start() + + if self._zmq_rpc_service: + self._zmq_rpc_service.run() + super(ConductorManager, self).start() # greenthreads must be called after super.start for it to work properly @@ -393,6 +423,13 @@ class ConductorManager(service.PeriodicService): """ Periodic tasks are run at pre-specified intervals. """ return self.run_periodic_tasks(context, raise_on_error=raise_on_error) + def stop(self): + if self._rpc_service: + self._rpc_service.stop() + if self._zmq_rpc_service: + self._zmq_rpc_service.stop() + super(ConductorManager, self).stop() + @contextmanager def session(self): session = dbapi.get_instance().get_session(autocommit=True) @@ -12187,6 +12224,16 @@ class ConductorManager(service.PeriodicService): # Delete upgrade record self.dbapi.software_upgrade_destroy(upgrade.uuid) + # TODO(RPCHybridMode): This is only useful for 21.12 -> 22.12 upgrades. 
+ # Remove this in new releases, when it's no longer necessary + # do the migration work through RabbitMQ and ZeroMQ + if (tsc.system_mode is not constants.SYSTEM_MODE_SIMPLEX): + rpcapi = agent_rpcapi.AgentAPI() + controller_1 = self.dbapi.ihost_get_by_hostname( + constants.CONTROLLER_1_HOSTNAME) + LOG.info("Deleting Sysinv Hybrid state") + rpcapi.delete_sysinv_hybrid_state(context, controller_1['uuid']) + # Clear upgrades alarm entity_instance_id = "%s=%s" % (fm_constants.FM_ENTITY_TYPE_HOST, constants.CONTROLLER_HOSTNAME) @@ -12778,6 +12825,9 @@ class ConductorManager(service.PeriodicService): LOG.info("Overwriting file %s in %s " % (ceph_conf_filename, tsc.PLATFORM_CEPH_CONF_PATH)) + # contents might be bytes, make sure it is str + contents = six.ensure_str(contents) + try: with open(opt_ceph_conf_file, 'w+') as f: f.write(contents) @@ -12794,6 +12844,10 @@ class ConductorManager(service.PeriodicService): """ LOG.info("Install license file.") + + # contents might be bytes, make sure it is str + contents = six.ensure_str(contents) + license_file = os.path.join(tsc.PLATFORM_CONF_PATH, constants.LICENSE_FILE) temp_license_file = license_file + '.temp' @@ -13018,6 +13072,9 @@ class ConductorManager(service.PeriodicService): LOG.info("config_certificate mode=%s" % mode) + # pem_contents might be bytes, make sure it is str + pem_contents = six.ensure_str(pem_contents) + cert_list, private_key = \ self._extract_keys_from_pem(mode, pem_contents, serialization.PrivateFormat.PKCS8, diff --git a/sysinv/sysinv/sysinv/sysinv/conductor/rpcapiproxy.py b/sysinv/sysinv/sysinv/sysinv/conductor/rpcapiproxy.py new file mode 100644 index 0000000000..c1040828fc --- /dev/null +++ b/sysinv/sysinv/sysinv/sysinv/conductor/rpcapiproxy.py @@ -0,0 +1,47 @@ +# Copyright (c) 2022 Wind River Systems, Inc. 
+# +# SPDX-License-Identifier: Apache-2.0 + +import os +from oslo_config import cfg +from oslo_log import log + +import sysinv.conductor.rpcapi as rpcapi +from sysinv.conductor.rpcapi import ConductorAPI as AMQPConductorAPI +from sysinv.conductor.rpcapizmq import ConductorAPI as ZMQConductorAPI +from sysinv.zmq_rpc.zmq_rpc import is_rpc_hybrid_mode_active +from sysinv.zmq_rpc.zmq_rpc import check_connection + +LOG = log.getLogger(__name__) +MANAGER_TOPIC = rpcapi.MANAGER_TOPIC + + +def ConductorAPI(topic=None): + rpc_backend_zeromq = cfg.CONF.rpc_backend_zeromq + rpc_backend_hybrid_mode = is_rpc_hybrid_mode_active() + rpc_backend = cfg.CONF.rpc_backend + LOG.debug("Current conductor rpc_backend: {} " + "use_zeromq: {} hybrid_mode: {}".format(rpc_backend, + rpc_backend_zeromq, + rpc_backend_hybrid_mode)) + # Hybrid mode is expected to be defined for controller-1 only during upgrade + # all other nodes should be running ZeroMQ exclusively + if rpc_backend_hybrid_mode: + # in controller-1 agent, we need to know if conductor + # is able to listen to ZeroRPC. + # If conductor is running on same host, we know it is running in + # hybrid mode, and we assume ZeroMQ is preferred. 
+ # Otherwise, it can be conductor running on controller-0 before + # migrate to ZeroMQ, so we verify before send the RPC call + # if ZeroMQ is running and if yes, use it, otherwise use RabbitMQ + if os.path.isfile("/var/run/sysinv-conductor.pid"): + return ZMQConductorAPI(topic) + else: + if check_connection(cfg.CONF.rpc_zeromq_conductor_bind_ip, + cfg.CONF.rpc_zeromq_conductor_bind_port): + return ZMQConductorAPI(topic) + else: + return AMQPConductorAPI(topic) + if rpc_backend_zeromq: + return ZMQConductorAPI(topic) + return AMQPConductorAPI(topic) diff --git a/sysinv/sysinv/sysinv/sysinv/conductor/rpcapizmq.py b/sysinv/sysinv/sysinv/sysinv/conductor/rpcapizmq.py new file mode 100644 index 0000000000..2edacb5cbf --- /dev/null +++ b/sysinv/sysinv/sysinv/sysinv/conductor/rpcapizmq.py @@ -0,0 +1,45 @@ +# Copyright (c) 2022 Wind River Systems, Inc. +# +# SPDX-License-Identifier: Apache-2.0 + +""" +Client side of the conductor RPC API using ZeroMQ backend. +""" + +import os +from oslo_config import cfg +from oslo_log import log +from sysinv.common import constants +from sysinv.conductor.rpcapi import ConductorAPI as BaseConductorAPI +from sysinv.conductor.rpcapi import MANAGER_TOPIC +from sysinv.zmq_rpc.zmq_rpc import ZmqRpcClient + +CONF = cfg.CONF +LOG = log.getLogger(__name__) + + +class ConductorAPI(ZmqRpcClient, BaseConductorAPI): + def __init__(self, topic=None): + if topic is None: + topic = MANAGER_TOPIC + host = CONF.rpc_zeromq_conductor_bind_ip + + # It is expected to have a value assigned + # if we are using default value, puppet was not executed or + # there was an issue. 
+ # We can still use it in case conductor is running locally + # otherwise we try to communicate using controller hostname + if host == "::" and not os.path.isfile("/var/run/sysinv-conductor.pid"): + host = constants.CONTROLLER_HOSTNAME + + port = CONF.rpc_zeromq_conductor_bind_port + super(ConductorAPI, self).__init__(host, port, topic) + + def call(self, context, msg, topic=None, version=None, timeout=None): + return super(ConductorAPI, self).call(context, msg, timeout) + + def cast(self, context, msg, topic=None, version=None): + return super(ConductorAPI, self).cast(context, msg) + + def fanout_cast(self, context, msg, topic=None, version=None): + return super(ConductorAPI, self).fanout_cast(context, msg) diff --git a/sysinv/sysinv/sysinv/sysinv/helm/utils.py b/sysinv/sysinv/sysinv/sysinv/helm/utils.py index c18fffb785..441054f411 100644 --- a/sysinv/sysinv/sysinv/sysinv/helm/utils.py +++ b/sysinv/sysinv/sysinv/sysinv/helm/utils.py @@ -20,7 +20,7 @@ import zlib from eventlet.green import subprocess from oslo_log import log as logging -from sysinv.agent import rpcapi as agent_rpcapi +from sysinv.agent import rpcapiproxy as agent_rpcapi from sysinv.common import exception from sysinv.common import kubernetes from sysinv.openstack.common import context diff --git a/sysinv/sysinv/sysinv/sysinv/openstack/common/context.py b/sysinv/sysinv/sysinv/sysinv/openstack/common/context.py index 4203eaff57..3e206ee5a6 100644 --- a/sysinv/sysinv/sysinv/sysinv/openstack/common/context.py +++ b/sysinv/sysinv/sysinv/sysinv/openstack/common/context.py @@ -59,8 +59,12 @@ class RequestContext(object): 'auth_token': self.auth_token, 'request_id': self.request_id} + @classmethod + def from_dict(cls, values): + return cls(**values) -def get_admin_context(show_deleted="no"): + +def get_admin_context(show_deleted=False): context = RequestContext(None, tenant=None, is_admin=True, diff --git a/sysinv/sysinv/sysinv/sysinv/openstack/common/rpc/__init__.py 
b/sysinv/sysinv/sysinv/sysinv/openstack/common/rpc/__init__.py index 677ba24607..8f50cb84c9 100644 --- a/sysinv/sysinv/sysinv/sysinv/openstack/common/rpc/__init__.py +++ b/sysinv/sysinv/sysinv/sysinv/openstack/common/rpc/__init__.py @@ -40,6 +40,21 @@ rpc_opts = [ cfg.StrOpt('rpc_backend', default='%s.impl_kombu' % __package__, help="The messaging module to use, defaults to kombu."), + cfg.BoolOpt('rpc_backend_zeromq', + default=True, + help='Use ZeroMQ for RPC communication'), + cfg.StrOpt('rpc_zeromq_bind_ip', + default='::', + help='Bind IP address for ZeroMQ RPC backend'), + cfg.StrOpt('rpc_zeromq_conductor_bind_ip', + default='::', + help='Bind IP address for sysinv-conductor ZeroMQ RPC backend'), + cfg.StrOpt('rpc_zeromq_agent_bind_port', + default=9502, + help='Bind port for sysinv-agent ZeroMQ RPC server'), + cfg.StrOpt('rpc_zeromq_conductor_bind_port', + default=9501, + help='Bind port for sysinv-conductor ZeroMQ RPC server'), cfg.IntOpt('rpc_thread_pool_size', default=64, help='Size of RPC thread pool'), diff --git a/sysinv/sysinv/sysinv/sysinv/openstack/common/rpc/common.py b/sysinv/sysinv/sysinv/sysinv/openstack/common/rpc/common.py index 9d7c424330..552275fb37 100644 --- a/sysinv/sysinv/sysinv/sysinv/openstack/common/rpc/common.py +++ b/sysinv/sysinv/sysinv/sysinv/openstack/common/rpc/common.py @@ -138,6 +138,28 @@ class Timeout(RPCException): method=method or _('')) +class LostRemote(RPCException): + """Signifies that a heartbeat has failed for zerorpc rpc backend. + + This exception is raised if a heartbeat is not received while + waiting for a response from the remote side. 
+ """ + message = _('%(lost_remote_msg)s - ' + 'topic: "%(topic)s", RPC method: "%(method)s" ') + + def __init__(self, lost_remote_msg=None, topic=None, method=None): + self.lost_remote_msg = _('Lost remote after waiting for heartbeat') + self.topic = topic + self.method = method + if lost_remote_msg: + self.lost_remote_msg = lost_remote_msg + super(LostRemote, self).__init__( + None, + lost_remote_msg=self.lost_remote_msg, + topic=topic or _(''), + method=method or _('')) + + class DuplicateMessageError(RPCException): message = _("Found duplicate message(%(msg_id)s). Skipping it.") diff --git a/sysinv/sysinv/sysinv/sysinv/puppet/inventory.py b/sysinv/sysinv/sysinv/sysinv/puppet/inventory.py index f74340d220..f12dab6c9e 100644 --- a/sysinv/sysinv/sysinv/sysinv/puppet/inventory.py +++ b/sysinv/sysinv/sysinv/sysinv/puppet/inventory.py @@ -93,7 +93,16 @@ class SystemInventoryPuppet(openstack.OpenstackBasePuppet): 'sysinv::api::openstack_keystone_tenant': self._operator.keystone.get_admin_project_name(), 'sysinv::api::openstack_keyring_service': - self.OPENSTACK_KEYRING_SERVICE + self.OPENSTACK_KEYRING_SERVICE, + + 'sysinv::rpc_zeromq_conductor_bind_ip': self._get_management_address() + } + + def get_host_config(self, host): + node_ip = self._get_address_by_name( + host.hostname, constants.NETWORK_TYPE_MGMT).address + return { + 'sysinv::rpc_zeromq_bind_ip': node_ip } def get_secure_system_config(self): diff --git a/sysinv/sysinv/sysinv/sysinv/tests/api/test_certificate.py b/sysinv/sysinv/sysinv/sysinv/tests/api/test_certificate.py index 7331941f1a..2e47b30eb8 100644 --- a/sysinv/sysinv/sysinv/sysinv/tests/api/test_certificate.py +++ b/sysinv/sysinv/sysinv/sysinv/tests/api/test_certificate.py @@ -206,7 +206,7 @@ class ApiCertificateTestCaseMixin(object): super(ApiCertificateTestCaseMixin, self).setUp() self.fake_conductor_api = FakeConductorAPI() - p = mock.patch('sysinv.conductor.rpcapi.ConductorAPI') + p = mock.patch('sysinv.conductor.rpcapiproxy.ConductorAPI') 
self.mock_conductor_api = p.start() self.mock_conductor_api.return_value = self.fake_conductor_api self.addCleanup(p.stop) diff --git a/sysinv/sysinv/sysinv/sysinv/tests/api/test_controller_fs.py b/sysinv/sysinv/sysinv/sysinv/tests/api/test_controller_fs.py index b58d0d9d61..198af9d361 100644 --- a/sysinv/sysinv/sysinv/sysinv/tests/api/test_controller_fs.py +++ b/sysinv/sysinv/sysinv/sysinv/tests/api/test_controller_fs.py @@ -68,7 +68,7 @@ class ApiControllerFSTestCaseMixin(base.FunctionalTest, 1, 'extension-lv') self.fake_conductor_api = FakeConductorAPI() - p = mock.patch('sysinv.conductor.rpcapi.ConductorAPI') + p = mock.patch('sysinv.conductor.rpcapiproxy.ConductorAPI') self.mock_conductor_api = p.start() self.mock_conductor_api.return_value = self.fake_conductor_api self.addCleanup(p.stop) diff --git a/sysinv/sysinv/sysinv/sysinv/tests/api/test_device_image.py b/sysinv/sysinv/sysinv/sysinv/tests/api/test_device_image.py index c51309ad5d..24c0ec94e1 100644 --- a/sysinv/sysinv/sysinv/sysinv/tests/api/test_device_image.py +++ b/sysinv/sysinv/sysinv/sysinv/tests/api/test_device_image.py @@ -62,7 +62,7 @@ class TestDeviceImage(base.FunctionalTest, dbbase.BaseHostTestCase): # Mock the Conductor API self.fake_conductor_api = FakeConductorAPI() - p = mock.patch('sysinv.conductor.rpcapi.ConductorAPI') + p = mock.patch('sysinv.conductor.rpcapiproxy.ConductorAPI') self.mock_conductor_api = p.start() self.mock_conductor_api.return_value = self.fake_conductor_api self.addCleanup(p.stop) diff --git a/sysinv/sysinv/sysinv/sysinv/tests/api/test_dns.py b/sysinv/sysinv/sysinv/sysinv/tests/api/test_dns.py index c0a79f9f4c..5f88b40e8f 100644 --- a/sysinv/sysinv/sysinv/sysinv/tests/api/test_dns.py +++ b/sysinv/sysinv/sysinv/sysinv/tests/api/test_dns.py @@ -52,7 +52,7 @@ class ApiDNSTestCaseMixin(object): def setUp(self): super(ApiDNSTestCaseMixin, self).setUp() self.fake_conductor_api = FakeConductorAPI() - p = mock.patch('sysinv.conductor.rpcapi.ConductorAPI') + p = 
mock.patch('sysinv.conductor.rpcapiproxy.ConductorAPI') self.mock_conductor_api = p.start() self.mock_conductor_api.return_value = self.fake_conductor_api self.addCleanup(p.stop) diff --git a/sysinv/sysinv/sysinv/sysinv/tests/api/test_helm_charts.py b/sysinv/sysinv/sysinv/sysinv/tests/api/test_helm_charts.py index c62f4dfabd..016a5a8e35 100644 --- a/sysinv/sysinv/sysinv/sysinv/tests/api/test_helm_charts.py +++ b/sysinv/sysinv/sysinv/sysinv/tests/api/test_helm_charts.py @@ -56,7 +56,7 @@ class ApiHelmChartTestCaseMixin(base.FunctionalTest, def setUp(self): super(ApiHelmChartTestCaseMixin, self).setUp() self.fake_conductor_api = FakeConductorAPI() - p = mock.patch('sysinv.conductor.rpcapi.ConductorAPI') + p = mock.patch('sysinv.conductor.rpcapiproxy.ConductorAPI') self.mock_conductor_api = p.start() self.mock_conductor_api.return_value = self.fake_conductor_api self.addCleanup(p.stop) diff --git a/sysinv/sysinv/sysinv/sysinv/tests/api/test_host.py b/sysinv/sysinv/sysinv/sysinv/tests/api/test_host.py index 29a4bf4c02..3fbaec1352 100644 --- a/sysinv/sysinv/sysinv/sysinv/tests/api/test_host.py +++ b/sysinv/sysinv/sysinv/sysinv/tests/api/test_host.py @@ -61,7 +61,7 @@ class TestHost(base.FunctionalTest, dbbase.BaseHostTestCase): # Mock the conductor API self.fake_conductor_api = FakeConductorAPI(self.dbapi) - p = mock.patch('sysinv.conductor.rpcapi.ConductorAPI') + p = mock.patch('sysinv.conductor.rpcapiproxy.ConductorAPI') self.mock_conductor_api = p.start() self.mock_conductor_api.return_value = self.fake_conductor_api self.addCleanup(p.stop) diff --git a/sysinv/sysinv/sysinv/sysinv/tests/api/test_host_fs.py b/sysinv/sysinv/sysinv/sysinv/tests/api/test_host_fs.py index d169aa0271..9aaa8b9f4c 100644 --- a/sysinv/sysinv/sysinv/sysinv/tests/api/test_host_fs.py +++ b/sysinv/sysinv/sysinv/sysinv/tests/api/test_host_fs.py @@ -52,7 +52,7 @@ class ApiHostFSTestCaseMixin(base.FunctionalTest, 30, 'docker-lv') self.fake_conductor_api = FakeConductorAPI() - p = 
mock.patch('sysinv.conductor.rpcapi.ConductorAPI') + p = mock.patch('sysinv.conductor.rpcapiproxy.ConductorAPI') self.mock_conductor_api = p.start() self.mock_conductor_api.return_value = self.fake_conductor_api self.addCleanup(p.stop) diff --git a/sysinv/sysinv/sysinv/sysinv/tests/api/test_interface.py b/sysinv/sysinv/sysinv/sysinv/tests/api/test_interface.py index 796290d206..8bf2d8b77d 100644 --- a/sysinv/sysinv/sysinv/sysinv/tests/api/test_interface.py +++ b/sysinv/sysinv/sysinv/sysinv/tests/api/test_interface.py @@ -2249,7 +2249,7 @@ class TestAIOUnlockedPost(InterfaceTestCase): # Mock the conductor API self.fake_conductor_api = FakeConductorAPI() - p = mock.patch('sysinv.conductor.rpcapi.ConductorAPI') + p = mock.patch('sysinv.conductor.rpcapiproxy.ConductorAPI') self.mock_conductor_api = p.start() self.mock_conductor_api.return_value = self.fake_conductor_api self.addCleanup(p.stop) @@ -2361,7 +2361,7 @@ class TestAIOUnlockedPatch(InterfaceTestCase): # Mock the conductor API self.fake_conductor_api = FakeConductorAPI() - p = mock.patch('sysinv.conductor.rpcapi.ConductorAPI') + p = mock.patch('sysinv.conductor.rpcapiproxy.ConductorAPI') self.mock_conductor_api = p.start() self.mock_conductor_api.return_value = self.fake_conductor_api self.addCleanup(p.stop) diff --git a/sysinv/sysinv/sysinv/sysinv/tests/api/test_kube_rootca_update.py b/sysinv/sysinv/sysinv/sysinv/tests/api/test_kube_rootca_update.py index 1caffb1555..cd017c49bc 100644 --- a/sysinv/sysinv/sysinv/sysinv/tests/api/test_kube_rootca_update.py +++ b/sysinv/sysinv/sysinv/sysinv/tests/api/test_kube_rootca_update.py @@ -94,7 +94,7 @@ class TestKubeRootCAUpdate(base.FunctionalTest): self.fake_conductor_api = FakeConductorAPI() # rather than start the fake_conductor_api.service, we stage its dbapi self.fake_conductor_api.service.dbapi = self.dbapi - p = mock.patch('sysinv.conductor.rpcapi.ConductorAPI') + p = mock.patch('sysinv.conductor.rpcapiproxy.ConductorAPI') self.mock_conductor_api = p.start() 
self.mock_conductor_api.return_value = self.fake_conductor_api self.headers = API_HEADERS @@ -720,7 +720,7 @@ class TestKubeRootCAHostUpdate(base.FunctionalTest): self.fake_conductor_api = FakeConductorAPI() # rather than start the fake_conductor_api.service, we stage its dbapi self.fake_conductor_api.service.dbapi = self.dbapi - p = mock.patch('sysinv.conductor.rpcapi.ConductorAPI') + p = mock.patch('sysinv.conductor.rpcapiproxy.ConductorAPI') self.mock_conductor_api = p.start() self.mock_conductor_api.return_value = self.fake_conductor_api self.addCleanup(p.stop) diff --git a/sysinv/sysinv/sysinv/sysinv/tests/api/test_kube_upgrade.py b/sysinv/sysinv/sysinv/sysinv/tests/api/test_kube_upgrade.py index eedfd790e6..fc0b111a7e 100644 --- a/sysinv/sysinv/sysinv/sysinv/tests/api/test_kube_upgrade.py +++ b/sysinv/sysinv/sysinv/sysinv/tests/api/test_kube_upgrade.py @@ -95,7 +95,7 @@ class TestKubeUpgrade(base.FunctionalTest): self.fake_conductor_api = FakeConductorAPI() # rather than start the fake_conductor_api.service, we stage its dbapi self.fake_conductor_api.service.dbapi = self.dbapi - p = mock.patch('sysinv.conductor.rpcapi.ConductorAPI') + p = mock.patch('sysinv.conductor.rpcapiproxy.ConductorAPI') self.mock_conductor_api = p.start() self.mock_conductor_api.return_value = self.fake_conductor_api self.addCleanup(p.stop) diff --git a/sysinv/sysinv/sysinv/sysinv/tests/api/test_ntp.py b/sysinv/sysinv/sysinv/sysinv/tests/api/test_ntp.py index d56684a7fd..f0f2b65958 100644 --- a/sysinv/sysinv/sysinv/sysinv/tests/api/test_ntp.py +++ b/sysinv/sysinv/sysinv/sysinv/tests/api/test_ntp.py @@ -52,7 +52,7 @@ class ApiNTPTestCaseMixin(object): def setUp(self): super(ApiNTPTestCaseMixin, self).setUp() self.fake_conductor_api = FakeConductorAPI() - p = mock.patch('sysinv.conductor.rpcapi.ConductorAPI') + p = mock.patch('sysinv.conductor.rpcapiproxy.ConductorAPI') self.mock_conductor_api = p.start() self.mock_conductor_api.return_value = self.fake_conductor_api 
self.addCleanup(p.stop) diff --git a/sysinv/sysinv/sysinv/sysinv/tests/api/test_partition.py b/sysinv/sysinv/sysinv/sysinv/tests/api/test_partition.py index 2b22700bbf..1d6397c056 100644 --- a/sysinv/sysinv/sysinv/sysinv/tests/api/test_partition.py +++ b/sysinv/sysinv/sysinv/sysinv/tests/api/test_partition.py @@ -57,7 +57,7 @@ class TestPartition(base.FunctionalTest): # Mock the conductor API self.fake_conductor_api = FakeConductorAPI(self.dbapi) - p = mock.patch('sysinv.conductor.rpcapi.ConductorAPI') + p = mock.patch('sysinv.conductor.rpcapiproxy.ConductorAPI') self.mock_conductor_api = p.start() self.mock_conductor_api.return_value = self.fake_conductor_api self.addCleanup(p.stop) diff --git a/sysinv/sysinv/sysinv/sysinv/tests/api/test_upgrade.py b/sysinv/sysinv/sysinv/sysinv/tests/api/test_upgrade.py index e6da09bf4b..5834cf2109 100644 --- a/sysinv/sysinv/sysinv/sysinv/tests/api/test_upgrade.py +++ b/sysinv/sysinv/sysinv/sysinv/tests/api/test_upgrade.py @@ -40,7 +40,7 @@ class TestUpgrade(base.FunctionalTest, dbbase.BaseSystemTestCase): # Mock the Conductor API self.fake_conductor_api = FakeConductorAPI() - p = mock.patch('sysinv.conductor.rpcapi.ConductorAPI') + p = mock.patch('sysinv.conductor.rpcapiproxy.ConductorAPI') self.mock_conductor_api = p.start() self.mock_conductor_api.return_value = self.fake_conductor_api self.addCleanup(p.stop) diff --git a/sysinv/sysinv/sysinv/sysinv/tests/conf_fixture.py b/sysinv/sysinv/sysinv/sysinv/tests/conf_fixture.py index 4ee3cbac1b..99f9ff41ac 100644 --- a/sysinv/sysinv/sysinv/sysinv/tests/conf_fixture.py +++ b/sysinv/sysinv/sysinv/sysinv/tests/conf_fixture.py @@ -38,6 +38,7 @@ class ConfFixture(config_fixture.Config): self.conf.set_default('host', 'fake-mini') self.conf.set_default('rpc_backend', 'sysinv.openstack.common.rpc.impl_fake') + self.conf.set_default('rpc_backend_zeromq', False) self.conf.set_default('rpc_cast_timeout', 5) self.conf.set_default('rpc_response_timeout', 5) self.conf.set_default('connection', 
"sqlite://", group='database') diff --git a/sysinv/sysinv/sysinv/sysinv/zmq_rpc/__init__.py b/sysinv/sysinv/sysinv/sysinv/zmq_rpc/__init__.py new file mode 100644 index 0000000000..104a101689 --- /dev/null +++ b/sysinv/sysinv/sysinv/sysinv/zmq_rpc/__init__.py @@ -0,0 +1,3 @@ +# Copyright (c) 2022 Wind River Systems, Inc. +# +# SPDX-License-Identifier: Apache-2.0 diff --git a/sysinv/sysinv/sysinv/sysinv/zmq_rpc/client_provider.py b/sysinv/sysinv/sysinv/sysinv/zmq_rpc/client_provider.py new file mode 100644 index 0000000000..f384636130 --- /dev/null +++ b/sysinv/sysinv/sysinv/sysinv/zmq_rpc/client_provider.py @@ -0,0 +1,46 @@ +# Copyright (c) 2022 Wind River Systems, Inc. +# +# SPDX-License-Identifier: Apache-2.0 + +import zerorpc +from oslo_config import cfg +from sysinv.zmq_rpc.serializer import decode +from sysinv.zmq_rpc.serializer import encode + + +CONF = cfg.CONF + + +class ClientProvider(object): + def __init__(self): + self.clients = {} + + def _create_client(self, endpoint): + # pylint: disable=unexpected-keyword-arg + return zerorpc.Client( + connect_to=endpoint, + encoder=encode, + decoder=decode, + # TODO: with the default of 5s we get heartbeat timeouts when + # executing some RPCs that take longer than that to finish. 
+ # We need to understand why this is happening because this scenario + # should be supported by zerorpc + heartbeat=None, + # TODO: we need to determine the correct timeout value here based on + # the max time an RPC can take to execute + timeout=CONF.rpc_response_timeout) + + def get_client_for_endpoint(self, endpoint): + client = self.clients.get(endpoint, None) + if client is None: + client = self._create_client(endpoint) + self.clients[endpoint] = client + return client + + def cleanup(self): + for endpoint, client in self.clients.items(): + try: + client.close() + except Exception: + pass + self.clients.clear() diff --git a/sysinv/sysinv/sysinv/sysinv/zmq_rpc/serializer.py b/sysinv/sysinv/sysinv/sysinv/zmq_rpc/serializer.py new file mode 100644 index 0000000000..a082bea873 --- /dev/null +++ b/sysinv/sysinv/sysinv/sysinv/zmq_rpc/serializer.py @@ -0,0 +1,65 @@ +# Copyright (c) 2022 Wind River Systems, Inc. +# +# SPDX-License-Identifier: Apache-2.0 + +import datetime +import ipaddress +import netaddr +import uuid + +from oslo_utils import timeutils +from sysinv.objects.base import SysinvObject +from sysinv.common.context import RequestContext +from sysinv.openstack.common.context import RequestContext as BaseRequestContext +from sysinv.openstack.common.rpc.amqp import RpcContext +from sysinv.openstack.common.rpc.common import CommonRpcContext + + +def encode(obj, chain=None): + if isinstance(obj, (RequestContext, BaseRequestContext, + RpcContext, CommonRpcContext)): + if isinstance(obj, RequestContext): + context_type = b'request' + elif isinstance(obj, BaseRequestContext): + context_type = b'base_request' + elif isinstance(obj, RpcContext): + context_type = b'rpc' + else: + context_type = b'common_rpc' + return {b'context': True, + b'context_type': context_type, + b'data': obj.to_dict()} + if hasattr(obj, 'obj_to_primitive') and callable(obj.obj_to_primitive): + return obj.obj_to_primitive() + if isinstance(obj, datetime.datetime): + return 
obj.strftime(timeutils.PERFECT_TIME_FORMAT) + if isinstance(obj, uuid.UUID): + return str(obj) + if netaddr and isinstance(obj, (netaddr.IPAddress, netaddr.IPNetwork)): + return str(obj) + if ipaddress and isinstance(obj, + (ipaddress.IPv4Address, + ipaddress.IPv6Address)): + return str(obj) + if isinstance(obj, Exception): + return repr(obj) + return obj if chain is None else chain(obj) + + +def decode(obj, chain=None): + try: + if b'context' in obj: + context_dict = obj[b'data'] + context_type = obj[b'context_type'] + if context_type == b'request': + return RequestContext.from_dict(context_dict) + if context_type == b'base_request': + return BaseRequestContext.from_dict(context_dict) + if context_type == b'rpc': + return RpcContext.from_dict(context_dict) + return CommonRpcContext.from_dict(context_dict) + if isinstance(obj, dict) and 'sysinv_object.name' in obj: + return SysinvObject.obj_from_primitive(obj) + return obj if chain is None else chain(obj) + except KeyError: + return obj if chain is None else chain(obj) diff --git a/sysinv/sysinv/sysinv/sysinv/zmq_rpc/zmq_rpc.py b/sysinv/sysinv/sysinv/sysinv/zmq_rpc/zmq_rpc.py new file mode 100644 index 0000000000..0f8d7bfcef --- /dev/null +++ b/sysinv/sysinv/sysinv/sysinv/zmq_rpc/zmq_rpc.py @@ -0,0 +1,237 @@ +# Copyright (c) 2022 Wind River Systems, Inc. 
+# +# SPDX-License-Identifier: Apache-2.0 + +import zerorpc +import eventlet +import os + +from eventlet import greenthread +from oslo_log import log +from zerorpc import exceptions + +from sysinv.db import api +from sysinv.objects.base import SysinvObject +from sysinv.zmq_rpc.client_provider import ClientProvider +from sysinv.zmq_rpc.serializer import decode +from sysinv.zmq_rpc.serializer import encode +import sysinv.openstack.common.rpc.common as rpc_common +import tsconfig.tsconfig as tsc + +LOG = log.getLogger(__name__) + +client_provider = ClientProvider() + + +class RpcWrapper(object): + def __init__(self, target): + self.target = target + self.target_methods = [f for f in dir(self.target) if + not f.startswith('_')] + + def __getattr__(self, func): + def method(context, **kwargs): + if func in self.target_methods: + # hydrate any sysinv object passed as argument with the context + kwargs = self._inject_context(context, kwargs) + LOG.debug("Calling RPC server method {} with context {} args {}" + .format(func, context, kwargs)) + retval = getattr(self.target, func)(context, **kwargs) + LOG.debug("Finished RPC server method {} with context {} args {}" + .format(func, context, kwargs)) + return retval + else: + raise AttributeError + + return method + + def __dir__(self): + return dir(self.target) + + def _process_iterable(self, context, action_fn, values): + """Process an iterable, taking an action on each value. + :param:context: Request context + :param:action_fn: Action to take on each item in values + :param:values: Iterable container of things to take action on + :returns: A new container of the same type (except set) with + items from values having had action applied. + """ + iterable = values.__class__ + if iterable == set: + # NOTE(danms): A set can't have an unhashable value inside, such as + # a dict. Convert sets to tuples, which is fine, since we can't + # send them over RPC anyway. 
+ iterable = tuple + return iterable([action_fn(context, value) for value in values]) + + def _inject_context_to_arg(self, ctx, arg): + if isinstance(arg, SysinvObject): + arg._context = ctx + elif isinstance(arg, (tuple, list, set)): + arg = self._process_iterable(ctx, self._inject_context_to_arg, arg) + return arg + + def _inject_context(self, context, kwargs): + new_kwargs = dict() + for argname, arg in kwargs.items(): + new_kwargs[argname] = self._inject_context_to_arg(context, arg) + return new_kwargs + + +class ZmqRpcServer(object): + def __init__(self, target, host, port): + self.target = target + self.endpoint = get_tcp_endpoint(host, port) + self.server = None + + def run(self): + def _run_in_thread(): + try: + LOG.info("Starting zmq server at {}".format(self.endpoint)) + # pylint: disable=unexpected-keyword-arg + # TODO: with the default 5s heartbeat we get LostRemote + # exceptions when executing some RPCs that take longer than + # that to finish. We need to understand why this happens + # because this scenario should be supported by zerorpc + self.server = zerorpc.Server(RpcWrapper(self.target), + heartbeat=None, + encoder=encode, + decoder=decode) + self.server.bind(self.endpoint) + self.server.run() + except eventlet.greenlet.GreenletExit: + return + except Exception as e: + LOG.error("Error while running zmq rpc server at {}: " + "{}".format(self.endpoint, str(e))) + return + + return greenthread.spawn(_run_in_thread) + + def stop(self): + if self.server: + self.server.close() + client_provider.cleanup() + + +class ZmqRpcClient(object): + def __init__(self, host, port, topic): + try: + self.host = host + self.port = port + self.topic = topic + self.client = None + if host is not None: + endpoint = get_tcp_endpoint(host, port) + self.client = client_provider.get_client_for_endpoint(endpoint) + + LOG.debug("Started zmq rpc client to [{}]:{}".format( + self.host, self.port)) + except Exception as e: + LOG.error("Error while running zmq client to {}:{}: 
{}".format( + self.host, self.port, str(e))) + + def _exec(self, client, context, method, **kwargs): + if not client: + host_uuid = kwargs.get('host_uuid', None) + if host_uuid is None: + raise Exception("Missing host_uuid parameter for rpc endpoint") + dbapi = api.get_instance() + host = dbapi.ihost_get(host_uuid) + endpoint = get_tcp_endpoint(host.mgmt_ip, self.port) + client = client_provider.get_client_for_endpoint(endpoint) + + try: + LOG.debug( + "Calling RPC client method {} with context {} args {}".format( + method, context, kwargs)) + return getattr(client, method)(context, **kwargs) + except exceptions.TimeoutExpired: + raise rpc_common.Timeout(topic=self.topic, + method=method) + except exceptions.RemoteError as e: + raise rpc_common.RemoteError(exc_type=e.name, + value=e.msg, + traceback=e.traceback) + except exceptions.LostRemote as e: + raise rpc_common.LostRemote(lost_remote_msg=str(e), + topic=self.topic, + method=method) + + def call(self, context, msg, timeout=None): + method = msg['method'] + args = msg['args'] + if timeout is not None: + args['timeout_'] = timeout + return self._exec(self.client, context, method, **args) + + def cast(self, context, msg): + method = msg['method'] + args = msg['args'] + args['async_'] = True + return self._exec(self.client, context, method, **args) + + def fanout_cast(self, context, msg): + method = msg['method'] + args = msg['args'] + args['async_'] = True + endpoints = self.get_fanout_endpoints() + for endpoint in endpoints: + client = client_provider.get_client_for_endpoint(endpoint) + LOG.debug("Calling fanout method {} to endpoint {}".format( + method, endpoint)) + self._exec(client, context, method, **args) + + def get_fanout_endpoints(self): + endpoints = [] + dbapi = api.get_instance() + hosts = dbapi.ihost_get_list() + for host in hosts: + LOG.debug( + "Evaluating host {} to add as endpoint (" + "availability={}, operational={}, " + "personality={}, subfunctions={})".format( + host.hostname, 
host.availability, host.operational, + host.personality, host.subfunctions)) + endpoint = get_tcp_endpoint(host.mgmt_ip, self.port) + endpoints.append(endpoint) + LOG.debug("Add host {} with endpoint {} to fanout request".format( + host.hostname, endpoint)) + if not endpoints: + endpoint = get_tcp_endpoint("::", self.port) + LOG.warning("No host available. Add localhost with endpoint {} " + "to fanout request.".format(endpoint)) + endpoints.append(endpoint) + return endpoints + + +# TODO(RPCHybridMode): This function is only useful for 21.12 -> 22.12 upgrades. +# Remove in future release. +def is_rpc_hybrid_mode_active(): + return os.path.isfile(tsc.SYSINV_HYBRID_RPC_FLAG) + + +# TODO(RPCHybridMode): This function is only useful for 21.12 -> 22.12 upgrades. +# Remove in future release. +def is_zmq_backend_available(host_uuid): + dbapi = api.get_instance() + host = dbapi.ihost_get(host_uuid) + host_upgrade = dbapi.host_upgrade_get_by_host(host.id) + target_load = dbapi.load_get(host_upgrade.target_load) + return target_load.software_version >= tsc.SW_VERSION_22_12 + + +def get_tcp_endpoint(host, port): + return "tcp://[{}]:{}".format(host, port) + + +def check_connection(host, port): + ret = True + client = zerorpc.Client(heartbeat=None) + try: + client.connect(get_tcp_endpoint(host, port)) + client._zerorpc_list() + except (zerorpc.TimeoutExpired, zerorpc.RemoteError): + ret = False + client.close() + return ret diff --git a/tsconfig/tsconfig/tsconfig/tsconfig.py b/tsconfig/tsconfig/tsconfig/tsconfig.py index 180f69cda1..e5cceb78bd 100644 --- a/tsconfig/tsconfig/tsconfig/tsconfig.py +++ b/tsconfig/tsconfig/tsconfig/tsconfig.py @@ -249,6 +249,10 @@ UPGRADE_ABORT_FLAG = os.path.join( PTP_UPDATE_PARAMETERS_DONE = '.update_ptp_parameters_done' PTP_UPDATE_PARAMETERS_FLAG = os.path.join(CONFIG_PATH, PTP_UPDATE_PARAMETERS_DONE) +# TODO(RPCHybridMode): This is required only for 21.12 -> 22.12 upgrades. +# Remove in future release. 
+SYSINV_HYBRID_RPC_FLAG = os.path.join( + PLATFORM_CONF_PATH, '.sysinv_hybrid_rpc') # Set on controller-0 (by controller-1) to indicate that data migration has # started