vim monitor using rpc

This patch sets up the rpc queue named KILL_ACTION.<vim_id>
in mistral action class. When vim is deleted, the tacker server
will send(cast) kill message to the mistral task, which will exit.

To test it:
1. python setup.py develop
2. mistral-db-manage --config-file /etc/mistral/mistral.conf
populate
3. restart mistral related service and tacker service
4. tacker vim-register to register a vim
5. mistral workflow-list to check if the monitor workflow is
started
6. tacker vim-delete to check if the workflow and task is deleted

DocImpact
Implements: blueprint refactor-vim-monitor

Change-Id: I078917af65e57305c06b4605835c9d0d3dc1cf68
This commit is contained in:
jing.liuqing 2017-06-19 11:19:00 +08:00
parent d0256bc238
commit 60187643b5
16 changed files with 333 additions and 120 deletions

View File

@ -79,7 +79,8 @@ oslo.config.opts =
tacker.vnfm.monitor_drivers.ping.ping = tacker.vnfm.monitor_drivers.ping.ping:config_opts
tacker.vnfm.monitor_drivers.ceilometer.ceilometer = tacker.vnfm.monitor_drivers.ceilometer.ceilometer:config_opts
tacker.alarm_receiver = tacker.alarm_receiver:config_opts
mistral.actions =
tacker.vim_ping_action = tacker.nfvo.workflows.vim_monitor.vim_ping_action:PingVimAction
[build_sphinx]

View File

@ -48,6 +48,11 @@ EXTRA_EXMODS = []
RPC_DISABLED = False
def init_action_rpc(conf):
global TRANSPORT
TRANSPORT = oslo_messaging.get_transport(conf)
def init(conf):
global TRANSPORT, NOTIFICATION_TRANSPORT, NOTIFIER
exmods = get_allowed_exmods()
@ -288,9 +293,11 @@ class Connection(object):
super(Connection, self).__init__()
self.servers = []
def create_consumer(self, topic, endpoints, fanout=False):
def create_consumer(self, topic, endpoints, fanout=False,
exchange='tacker', host=None):
target = oslo_messaging.Target(
topic=topic, server=cfg.CONF.host, fanout=fanout)
topic=topic, server=host or cfg.CONF.host, fanout=fanout,
exchange=exchange)
server = get_server(target, endpoints)
self.servers.append(server)
@ -308,7 +315,8 @@ class Connection(object):
class VoidConnection(object):
def create_consumer(self, topic, endpoints, fanout=False):
def create_consumer(self, topic, endpoints, fanout=False,
exchange='tacker', host=None):
pass
def consume_in_threads(self):

View File

@ -1,39 +1,14 @@
# Copyright (c) 2012 OpenStack Foundation.
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
CREATE = 'create'
DELETE = 'delete'
UPDATE = 'update'
def get_topic_name(prefix, table, operation, host=None):
"""Create a topic name.
The topic name needs to be synced between the agent and the
plugin. The plugin will send a fanout message to all of the
listening agents so that the agents in turn can perform their
updates accordingly.
:param prefix: Common prefix for the plugin/agent message queues.
:param table: The table in question (NETWORK, SUBNET, PORT).
:param operation: The operation that invokes notification (CREATE,
DELETE, UPDATE)
:param host: Add host to the topic
:returns: The topic name.
"""
if host:
return '%s-%s-%s.%s' % (prefix, table, operation, host)
return '%s-%s-%s' % (prefix, table, operation)
TOPIC_ACTION_KILL = 'KILL_ACTION'

View File

View File

@ -0,0 +1,27 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import oslo_messaging
from tacker.common import topics
class MistralActionKillRPC(object):
target = oslo_messaging.Target(
exchange='tacker',
topic=topics.TOPIC_ACTION_KILL,
fanout=False,
version='1.0')
def killAction(self, context, **kwargs):
pass

View File

@ -83,14 +83,6 @@ class VimAbstractDriver(extensions.PluginInterface):
"""
pass
@abc.abstractmethod
def vim_status(self, auth_url):
"""Health check for VIM
Checks the health status of VIM and return a boolean value
"""
pass
@abc.abstractmethod
def get_vim_resource_id(self, vim_obj, resource_type, resource_name):
"""Parses a VIM resource ID from a given type and name

View File

@ -29,7 +29,6 @@ from oslo_config import cfg
from oslo_log import log as logging
from tacker._i18n import _
from tacker.agent.linux import utils as linux_utils
from tacker.common import log
from tacker.extensions import nfvo
from tacker.mistral import mistral_client
@ -188,11 +187,7 @@ class OpenStack_Driver(abstract_vim_driver.VimAbstractDriver,
@log.log
def register_vim(self, vim_obj):
"""Validate and register VIM
Store VIM information in Tacker for
VNF placements
"""
"""Validate and set VIM placements."""
ks_client = self.authenticate_vim(vim_obj)
self.discover_placement_attr(vim_obj, ks_client)
self.encode_vim_auth(vim_obj['id'], vim_obj['auth_cred'])
@ -240,23 +235,6 @@ class OpenStack_Driver(abstract_vim_driver.VimAbstractDriver,
except IOError:
raise nfvo.VimKeyNotFoundException(vim_id=vim_id)
@log.log
def vim_status(self, auth_url):
"""Checks the VIM health status"""
vim_ip = auth_url.split("//")[-1].split(":")[0].split("/")[0]
ping_cmd = ['ping',
'-c', cfg.CONF.vim_monitor.count,
'-W', cfg.CONF.vim_monitor.timeout,
'-i', cfg.CONF.vim_monitor.interval,
vim_ip]
try:
linux_utils.execute(ping_cmd, check_exit_code=True)
return True
except RuntimeError:
LOG.warning("Cannot ping ip address: %s", vim_ip)
return False
@log.log
def get_vim_resource_id(self, vim_obj, resource_type, resource_name):
"""Locates openstack resource by type/name and returns ID

View File

@ -15,7 +15,6 @@
# under the License.
import os
import threading
import time
import uuid
import yaml
@ -34,13 +33,13 @@ from tacker._i18n import _
from tacker.common import driver_manager
from tacker.common import log
from tacker.common import utils
from tacker import context as t_context
from tacker.db.nfvo import nfvo_db
from tacker.db.nfvo import ns_db
from tacker.db.nfvo import vnffg_db
from tacker.extensions import common_services as cs
from tacker.extensions import nfvo
from tacker import manager
from tacker.nfvo.workflows.vim_monitor import vim_monitor_utils
from tacker.plugins.common import constants
from tacker.vnfm import vim_client
@ -67,7 +66,6 @@ class NfvoPlugin(nfvo_db.NfvoPluginDb, vnffg_db.VnffgPluginDbMixin,
extension for providing the specified VIM information
"""
supported_extension_aliases = ['nfvo']
_lock = threading.RLock()
OPTS = [
cfg.ListOpt(
@ -85,20 +83,7 @@ class NfvoPlugin(nfvo_db.NfvoPluginDb, vnffg_db.VnffgPluginDbMixin,
self._vim_drivers = driver_manager.DriverManager(
'tacker.nfvo.vim.drivers',
cfg.CONF.nfvo_vim.vim_drivers)
self._created_vims = dict()
self.vim_client = vim_client.VimClient()
context = t_context.get_admin_context()
vims = self.get_vims(context)
for vim in vims:
self._created_vims[vim["id"]] = vim
self._monitor_interval = cfg.CONF.nfvo_vim.monitor_interval
threading.Thread(target=self.__run__).start()
def __run__(self):
while(1):
time.sleep(self._monitor_interval)
for created_vim in self._created_vims.values():
self.monitor_vim(created_vim)
def get_auth_dict(self, context):
auth = CONF.keystone_authtoken
@ -123,16 +108,17 @@ class NfvoPlugin(nfvo_db.NfvoPluginDb, vnffg_db.VnffgPluginDbMixin,
try:
self._vim_drivers.invoke(vim_type, 'register_vim', vim_obj=vim_obj)
res = super(NfvoPlugin, self).create_vim(context, vim_obj)
vim_obj["status"] = "REGISTERING"
with self._lock:
self._created_vims[res["id"]] = res
self.monitor_vim(vim_obj)
return res
except Exception:
with excutils.save_and_reraise_exception():
self._vim_drivers.invoke(vim_type, 'delete_vim_auth',
vim_id=vim_obj['id'])
try:
self.monitor_vim(context, vim_obj)
except Exception:
LOG.warning("Failed to set up vim monitoring")
return res
def _get_vim(self, context, vim_id):
if not self.is_vim_still_in_use(context, vim_id):
return self.get_vim(context, vim_id)
@ -166,31 +152,17 @@ class NfvoPlugin(nfvo_db.NfvoPluginDb, vnffg_db.VnffgPluginDbMixin,
vim_obj = self._get_vim(context, vim_id)
self._vim_drivers.invoke(vim_obj['type'], 'deregister_vim',
vim_id=vim_id)
with self._lock:
self._created_vims.pop(vim_id, None)
try:
auth_dict = self.get_auth_dict(context)
vim_monitor_utils.delete_vim_monitor(context, auth_dict, vim_obj)
except Exception:
LOG.exception("Failed to remove vim monitor")
super(NfvoPlugin, self).delete_vim(context, vim_id)
@log.log
def monitor_vim(self, vim_obj):
vim_id = vim_obj["id"]
auth_url = vim_obj["auth_url"]
vim_status = self._vim_drivers.invoke(vim_obj['type'],
'vim_status',
auth_url=auth_url)
current_status = "REACHABLE" if vim_status else "UNREACHABLE"
if current_status != vim_obj["status"]:
status = current_status
with self._lock:
context = t_context.get_admin_context()
res = super(NfvoPlugin, self).update_vim_status(context,
vim_id, status)
self._created_vims[vim_id]["status"] = status
self._cos_db_plg.create_event(
context, res_id=res['id'],
res_type=constants.RES_TYPE_VIM,
res_state=res['status'],
evt_type=constants.RES_EVT_MONITOR,
tstamp=res[constants.RES_EVT_UPDATED_FLD])
def monitor_vim(self, context, vim_obj):
auth_dict = self.get_auth_dict(context)
vim_monitor_utils.monitor_vim(auth_dict, vim_obj)
@log.log
def validate_tosca(self, template):

View File

View File

@ -0,0 +1,15 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
RESOURCE_NAME = 'ping_vim'
PING_VIM_TASK_NAME = 'PingVIMTASK'

View File

@ -0,0 +1,89 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import yaml
from oslo_config import cfg
from oslo_log import log as logging
from tacker.common import rpc
from tacker.mistral.actionrpc import kill_action as killaction
from tacker.mistral import mistral_client
from tacker.nfvo.workflows.vim_monitor import workflow_generator
from tacker.vnfm import keystone
LOG = logging.getLogger(__name__)
def get_mistral_client(auth_dict):
return mistral_client.MistralClient(
keystone.Keystone().initialize_client('2', **auth_dict),
auth_dict['token']).get_client()
def prepare_and_create_workflow(mistral_client, vim_id, action,
kwargs):
wg = workflow_generator.WorkflowGenerator(vim_id, action)
wg.task(**kwargs)
definition_yaml = yaml.safe_dump(wg.definition, default_flow_style=False)
LOG.debug('vim monitor workflow: %s', definition_yaml)
workflow = mistral_client.workflows.create(definition_yaml)
return {'id': workflow[0].id, 'input': wg.get_input_dict()}
def execute_workflow(mistral_client, workflow):
return mistral_client.executions.create(
workflow_identifier=workflow['id'],
workflow_input=workflow['input'],
wf_params={})
def delete_executions(mistral_client, vim_id):
executions = mistral_client.executions.list(
workflow_name='vim_id_' + vim_id)
for execution in executions:
mistral_client.executions.delete(execution.id)
def delete_workflow(mistral_client, vim_id):
return mistral_client.workflows.delete('vim_id_' + vim_id)
def monitor_vim(auth_dict, vim_obj):
mc = get_mistral_client(auth_dict)
auth_url = vim_obj["auth_url"]
vim_ip = auth_url.split("//")[-1].split(":")[0].split("/")[0]
workflow_input_dict = {
'vim_id': vim_obj['id'],
'count': cfg.CONF.vim_monitor.count,
'timeout': cfg.CONF.vim_monitor.timeout,
'interval': cfg.CONF.vim_monitor.interval,
'targetip': vim_ip}
workflow = prepare_and_create_workflow(
mc, vim_obj['id'], 'monitor',
workflow_input_dict)
execute_workflow(mc, workflow)
def kill_action(context, vim_obj):
target = killaction.MistralActionKillRPC.target
rpc_client = rpc.get_client(target)
cctxt = rpc_client.prepare(server=vim_obj['id'])
cctxt.cast(context, 'killAction')
def delete_vim_monitor(context, auth_dict, vim_obj):
mc = get_mistral_client(auth_dict)
delete_executions(mc, vim_obj['id'])
delete_workflow(mc, vim_obj['id'])
kill_action(context, vim_obj)

View File

@ -0,0 +1,104 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import random
from mistral.actions import base
from oslo_config import cfg
from oslo_log import log as logging
from tacker.agent.linux import utils as linux_utils
from tacker.common import rpc
from tacker.common import topics
LOG = logging.getLogger(__name__)
class PingVimAction(base.Action):
def __init__(self, count, targetip, vim_id,
interval, timeout):
self.killed = False
self.count = count
self.timeout = timeout
self.interval = interval
self.targetip = targetip
self.vim_id = vim_id
self.current_status = "PENDING"
def start_rpc_listeners(self):
"""Start the RPC loop to let the server communicate with actions."""
self.endpoints = [self]
self.conn = rpc.create_connection()
self.conn.create_consumer(topics.TOPIC_ACTION_KILL,
self.endpoints, fanout=False,
host=self.vim_id)
return self.conn.consume_in_threads()
def killAction(self, context, **kwargs):
self.killed = True
def _ping(self):
ping_cmd = ['ping', '-c', self.count,
'-W', self.timeout,
'-i', self.interval,
self.targetip]
try:
linux_utils.execute(ping_cmd, check_exit_code=True)
return 'REACHABLE'
except RuntimeError:
LOG.warning(("Cannot ping ip address: %s"), self.targetip)
return 'UNREACHABLE'
def _update(self, status):
# TODO(liuqing) call tacker conductor
LOG.info("VIM %s changed to status %s", self.vim_id, status)
x = random.randint(1, 10)
if 0 == (x % 2):
return 'UNREACHABLE'
else:
return 'REACHABLE'
return status
def run(self):
servers = []
try:
rpc.init_action_rpc(cfg.CONF)
servers = self.start_rpc_listeners()
except Exception:
LOG.exception('failed to start rpc in vim action')
return 'FAILED'
try:
while True:
if self.killed:
break
status = self._ping()
if self.current_status != status:
self.current_status = self._update(status)
except Exception:
LOG.exception('failed to run mistral action for vim %s',
self.vim_id)
return 'FAILED'
# to stop rpc connection
for server in servers:
try:
server.stop()
except Exception:
LOG.exception(
'failed to stop rpc connection for vim %s',
self.vim_id)
return 'KILLED'
def test(self):
return 'REACHABLE'

View File

@ -0,0 +1,59 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log as logging
from tacker.mistral import workflow_generator
from tacker.nfvo.workflows import vim_monitor
LOG = logging.getLogger(__name__)
class WorkflowGenerator(workflow_generator.WorkflowGeneratorBase):
def __init__(self, vim_id, action):
super(WorkflowGenerator, self).__init__(
vim_monitor.RESOURCE_NAME, action)
self.wf_identifier = 'vim_id_' + vim_id
self._build_basic_workflow()
def _add_ping_vim_tasks(self):
task_dict = dict()
task = self.wf_name + vim_monitor.PING_VIM_TASK_NAME
task_dict[task] = {
'action': 'tacker.vim_ping_action',
'input': {'count': self.input_dict_data['count'],
'targetip': self.input_dict_data['targetip'],
'vim_id': self.input_dict_data['vim_id'],
'interval': self.input_dict_data['interval'],
'timeout': self.input_dict_data['timeout']},
}
return task_dict
def get_input_dict(self):
return self.input_dict
def _build_input(self, vim_id, count, timeout,
interval, targetip):
self.input_dict_data = {'vim_id': vim_id,
'count': count,
'timeout': timeout,
'interval': interval,
'targetip': targetip}
self.input_dict[self.resource] = self.input_dict_data
def monitor_ping_vim(self, vim_id=None, count=1, timeout=1,
interval=1, targetip="127.0.0.1"):
self._build_input(vim_id, count, timeout,
interval, targetip)
self.definition[self.wf_identifier]['tasks'] = dict()
self.definition[self.wf_identifier]['tasks'].update(
self._add_ping_vim_tasks())

View File

@ -197,7 +197,6 @@ class TestNfvoPlugin(db_base.SqlTestCase):
self.addCleanup(mock.patch.stopall)
self.context = context.get_admin_context()
self._mock_driver_manager()
mock.patch('tacker.nfvo.nfvo_plugin.NfvoPlugin.__run__').start()
mock.patch('tacker.nfvo.nfvo_plugin.NfvoPlugin._get_vim_from_vnf',
side_effect=dummy_get_vim).start()
self.nfvo_plugin = nfvo_plugin.NfvoPlugin()
@ -244,14 +243,8 @@ class TestNfvoPlugin(db_base.SqlTestCase):
self.context, evt_type=constants.RES_EVT_CREATE, res_id=mock.ANY,
res_state=mock.ANY, res_type=constants.RES_TYPE_VIM,
tstamp=mock.ANY)
self._cos_db_plugin.create_event.assert_any_call(
mock.ANY, evt_type=constants.RES_EVT_MONITOR, res_id=mock.ANY,
res_state=mock.ANY, res_type=constants.RES_TYPE_VIM,
tstamp=mock.ANY)
self._driver_manager.invoke.assert_any_call(vim_type,
'register_vim', vim_obj=vim_dict['vim'])
self._driver_manager.invoke.assert_any_call('openstack', 'vim_status',
auth_url='http://localhost:5000')
self.assertIsNotNone(res)
self.assertEqual(SECRET_PASSWORD, res['auth_cred']['password'])
self.assertIn('id', res)