Merge "Remove RPC server and client related stuff"
This commit is contained in:
commit
21ea0312ac
|
@ -63,10 +63,7 @@ class Controller(object):
|
|||
self._native_sorting = self._is_native_sorting_supported()
|
||||
self._policy_attrs = [name for (name, info) in self._attr_info.items()
|
||||
if info.get('required_by_policy')]
|
||||
self._notifier = n_rpc.get_notifier('network')
|
||||
# if cfg.CONF.notify_nova_on_port_data_changes:
|
||||
# from tacker.notifiers import nova
|
||||
# self._nova_notifier = nova.Notifier()
|
||||
self._notifier = n_rpc.get_notifier('nfv')
|
||||
self._member_actions = member_actions
|
||||
self._primary_key = self._get_primary_key()
|
||||
if self._allow_pagination and self._native_pagination:
|
||||
|
|
|
@ -77,25 +77,6 @@ def get_allowed_exmods():
|
|||
return ALLOWED_EXMODS + EXTRA_EXMODS
|
||||
|
||||
|
||||
def get_client(target, version_cap=None, serializer=None):
|
||||
assert TRANSPORT is not None
|
||||
serializer = PluginRpcSerializer(serializer)
|
||||
return oslo_messaging.RPCClient(TRANSPORT,
|
||||
target,
|
||||
version_cap=version_cap,
|
||||
serializer=serializer)
|
||||
|
||||
|
||||
def get_server(target, endpoints, serializer=None):
|
||||
assert TRANSPORT is not None
|
||||
serializer = PluginRpcSerializer(serializer)
|
||||
return oslo_messaging.get_rpc_server(TRANSPORT,
|
||||
target,
|
||||
endpoints,
|
||||
executor='eventlet',
|
||||
serializer=serializer)
|
||||
|
||||
|
||||
def get_notifier(service=None, host=None, publisher_id=None):
|
||||
assert NOTIFIER is not None
|
||||
if not publisher_id:
|
||||
|
|
|
@ -1,205 +0,0 @@
|
|||
# Copyright 2011 VMware, Inc
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# based on tacker.service and nova.service
|
||||
# Copyright 2013, 2014 Intel Corporation.
|
||||
# Copyright 2013, 2014 Isaku Yamahata <isaku.yamahata at intel com>
|
||||
# <isaku.yamahata at gmail com>
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
# @author: Isaku Yamahata, Intel Corporation.
|
||||
|
||||
import inspect
|
||||
import os.path
|
||||
import random
|
||||
|
||||
import oslo_messaging
|
||||
|
||||
from tacker import context
|
||||
from tacker.openstack.common.gettextutils import _
|
||||
from tacker.openstack.common import importutils
|
||||
from tacker.openstack.common import log as logging
|
||||
from tacker.openstack.common import loopingcall
|
||||
from tacker.openstack.common import service
|
||||
from tacker import service as tacker_service # noqa # for service_opts
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
TRANSPORT_ALIASES = {
|
||||
'tacker.openstack.common.rpc.impl_kombu': 'rabbit',
|
||||
'tacker.openstack.common.rpc.impl_qpid': 'qpid',
|
||||
'tacker.openstack.common.rpc.impl_zmq': 'zmq',
|
||||
'tacker.openstack.common.rpc.impl_fake': 'fake',
|
||||
}
|
||||
|
||||
|
||||
# replacement for tacker.openstack.common.rpc.service.Service
|
||||
class RpcService(service.Service):
|
||||
"""Service object for binaries running on hosts.
|
||||
|
||||
A service enables rpc by listening to queues based on topic and host.
|
||||
"""
|
||||
def __init__(self, conf, host, topic, manager=None, serializer=None):
|
||||
super(RpcService, self).__init__()
|
||||
self.conf = conf
|
||||
self.host = host
|
||||
self.topic = topic
|
||||
self.serializer = serializer
|
||||
if manager is None:
|
||||
self.manager = self
|
||||
else:
|
||||
self.manager = manager
|
||||
|
||||
def start(self):
|
||||
super(RpcService, self).start()
|
||||
|
||||
target = oslo_messaging.Target(topic=self.topic, server=self.host)
|
||||
endpoints = [self.manager]
|
||||
transport = oslo_messaging.get_transport(self.conf,
|
||||
aliases=TRANSPORT_ALIASES)
|
||||
self.rpcserver = oslo_messaging.get_rpc_server(
|
||||
transport, target, endpoints, executor='eventlet',
|
||||
serializer=self.serializer)
|
||||
|
||||
# Hook to allow the manager to do other initializations after
|
||||
# the rpc connection is created.
|
||||
if callable(getattr(self.manager, 'initialize_service_hook', None)):
|
||||
self.manager.initialize_service_hook(self)
|
||||
|
||||
self.rpcserver.start()
|
||||
|
||||
def stop(self):
|
||||
# Try to shut the connection down, but if we get any sort of
|
||||
# errors, go ahead and ignore them.. as we're shutting down anyway
|
||||
try:
|
||||
self.rpcserver.stop()
|
||||
self.rpcserver.wait()
|
||||
except Exception:
|
||||
pass
|
||||
super(RpcService, self).stop()
|
||||
|
||||
|
||||
# replacement for tacker.service.Service
|
||||
class TackerService(RpcService):
|
||||
def __init__(self, conf, host, binary, topic, manager,
|
||||
report_interval=None,
|
||||
periodic_interval=None, periodic_fuzzy_delay=None,
|
||||
*args, **kwargs):
|
||||
self.binary = binary
|
||||
self.manager_class_name = manager
|
||||
manager_class = importutils.import_class(self.manager_class_name)
|
||||
self.manager = manager_class(conf=conf, host=host, *args, **kwargs)
|
||||
self.report_interval = report_interval
|
||||
self.periodic_interval = periodic_interval
|
||||
self.periodic_fuzzy_delay = periodic_fuzzy_delay
|
||||
self.saved_args, self.saved_kwargs = args, kwargs
|
||||
self.timers = []
|
||||
super(TackerService, self).__init__(conf, host, topic,
|
||||
manager=self.manager)
|
||||
|
||||
def start(self):
|
||||
self.manager.init_host()
|
||||
super(TackerService, self).start()
|
||||
if self.report_interval:
|
||||
pulse = loopingcall.FixedIntervalLoopingCall(self.report_state)
|
||||
pulse.start(interval=self.report_interval,
|
||||
initial_delay=self.report_interval)
|
||||
self.timers.append(pulse)
|
||||
|
||||
if self.periodic_interval:
|
||||
if self.periodic_fuzzy_delay:
|
||||
initial_delay = random.randint(0, self.periodic_fuzzy_delay)
|
||||
else:
|
||||
initial_delay = None
|
||||
|
||||
periodic = loopingcall.FixedIntervalLoopingCall(
|
||||
self.periodic_tasks)
|
||||
periodic.start(interval=self.periodic_interval,
|
||||
initial_delay=initial_delay)
|
||||
self.timers.append(periodic)
|
||||
self.manager.after_start()
|
||||
|
||||
def kill(self):
|
||||
"""Destroy the service object."""
|
||||
self.stop()
|
||||
|
||||
def stop(self):
|
||||
super(TackerService, self).stop()
|
||||
for x in self.timers:
|
||||
try:
|
||||
x.stop()
|
||||
except Exception:
|
||||
LOG.exception(_("Exception occurs when timer stops"))
|
||||
pass
|
||||
self.timers = []
|
||||
|
||||
def wait(self):
|
||||
super(TackerService, self).wait()
|
||||
for x in self.timers:
|
||||
try:
|
||||
x.wait()
|
||||
except Exception:
|
||||
LOG.exception(_("Exception occurs when waiting for timer"))
|
||||
pass
|
||||
|
||||
def periodic_tasks(self, raise_on_error=False):
|
||||
"""Tasks to be run at a periodic interval."""
|
||||
ctxt = context.get_admin_context()
|
||||
self.manager.periodic_tasks(ctxt, raise_on_error=raise_on_error)
|
||||
|
||||
def report_state(self):
|
||||
"""Update the state of this service."""
|
||||
# Todo(gongysh) report state to tacker server
|
||||
pass
|
||||
|
||||
def __getattr__(self, key):
|
||||
manager = self.__dict__.get('manager', None)
|
||||
return getattr(manager, key)
|
||||
|
||||
@classmethod
|
||||
def create(cls, conf, host=None, binary=None, topic=None, manager=None,
|
||||
report_interval=None, periodic_interval=None,
|
||||
periodic_fuzzy_delay=None):
|
||||
"""Instantiates class and passes back application object.
|
||||
|
||||
:param host: defaults to conf.host
|
||||
:param binary: defaults to basename of executable
|
||||
:param topic: defaults to bin_name - 'nova-' part
|
||||
:param manager: defaults to conf.<topic>_manager
|
||||
:param report_interval: defaults to conf.report_interval
|
||||
:param periodic_interval: defaults to conf.periodic_interval
|
||||
:param periodic_fuzzy_delay: defaults to conf.periodic_fuzzy_delay
|
||||
|
||||
"""
|
||||
if not host:
|
||||
host = conf.host
|
||||
if not binary:
|
||||
binary = os.path.basename(inspect.stack()[-1][1])
|
||||
if not topic:
|
||||
topic = binary.rpartition('tacker-')[2]
|
||||
topic = topic.replace("-", "_")
|
||||
if not manager:
|
||||
manager = conf.get('%s_manager' % topic, None)
|
||||
if report_interval is None:
|
||||
report_interval = conf.AGENT.report_interval
|
||||
if periodic_interval is None:
|
||||
periodic_interval = conf.periodic_interval
|
||||
if periodic_fuzzy_delay is None:
|
||||
periodic_fuzzy_delay = conf.periodic_fuzzy_delay
|
||||
service_obj = cls(conf, host, binary, topic, manager,
|
||||
report_interval=report_interval,
|
||||
periodic_interval=periodic_interval,
|
||||
periodic_fuzzy_delay=periodic_fuzzy_delay)
|
||||
|
||||
return service_obj
|
|
@ -13,20 +13,13 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import inspect
|
||||
import logging as std_logging
|
||||
import os
|
||||
import random
|
||||
|
||||
from oslo_config import cfg
|
||||
|
||||
from tacker.common import config
|
||||
from tacker.common import rpc_compat
|
||||
from tacker import context
|
||||
from tacker.openstack.common import excutils
|
||||
from tacker.openstack.common import importutils
|
||||
from tacker.openstack.common import log as logging
|
||||
from tacker.openstack.common import loopingcall
|
||||
from tacker import wsgi
|
||||
|
||||
|
||||
|
@ -115,122 +108,3 @@ def _run_wsgi(app_name):
|
|||
{'host': cfg.CONF.bind_host,
|
||||
'port': cfg.CONF.bind_port})
|
||||
return server
|
||||
|
||||
|
||||
class Service(rpc_compat.Service):
|
||||
"""Service object for binaries running on hosts.
|
||||
|
||||
A service takes a manager and enables rpc by listening to queues based
|
||||
on topic. It also periodically runs tasks on the manager.
|
||||
"""
|
||||
|
||||
def __init__(self, host, binary, topic, manager, report_interval=None,
|
||||
periodic_interval=None, periodic_fuzzy_delay=None,
|
||||
*args, **kwargs):
|
||||
|
||||
self.binary = binary
|
||||
self.manager_class_name = manager
|
||||
manager_class = importutils.import_class(self.manager_class_name)
|
||||
self.manager = manager_class(host=host, *args, **kwargs)
|
||||
self.report_interval = report_interval
|
||||
self.periodic_interval = periodic_interval
|
||||
self.periodic_fuzzy_delay = periodic_fuzzy_delay
|
||||
self.saved_args, self.saved_kwargs = args, kwargs
|
||||
self.timers = []
|
||||
super(Service, self).__init__(host, topic, manager=self.manager)
|
||||
|
||||
def start(self):
|
||||
self.manager.init_host()
|
||||
super(Service, self).start()
|
||||
if self.report_interval:
|
||||
pulse = loopingcall.FixedIntervalLoopingCall(self.report_state)
|
||||
pulse.start(interval=self.report_interval,
|
||||
initial_delay=self.report_interval)
|
||||
self.timers.append(pulse)
|
||||
|
||||
if self.periodic_interval:
|
||||
if self.periodic_fuzzy_delay:
|
||||
initial_delay = random.randint(0, self.periodic_fuzzy_delay)
|
||||
else:
|
||||
initial_delay = None
|
||||
|
||||
periodic = loopingcall.FixedIntervalLoopingCall(
|
||||
self.periodic_tasks)
|
||||
periodic.start(interval=self.periodic_interval,
|
||||
initial_delay=initial_delay)
|
||||
self.timers.append(periodic)
|
||||
self.manager.after_start()
|
||||
|
||||
def __getattr__(self, key):
|
||||
manager = self.__dict__.get('manager', None)
|
||||
return getattr(manager, key)
|
||||
|
||||
@classmethod
|
||||
def create(cls, host=None, binary=None, topic=None, manager=None,
|
||||
report_interval=None, periodic_interval=None,
|
||||
periodic_fuzzy_delay=None):
|
||||
"""Instantiates class and passes back application object.
|
||||
|
||||
:param host: defaults to CONF.host
|
||||
:param binary: defaults to basename of executable
|
||||
:param topic: defaults to bin_name - 'nova-' part
|
||||
:param manager: defaults to CONF.<topic>_manager
|
||||
:param report_interval: defaults to CONF.report_interval
|
||||
:param periodic_interval: defaults to CONF.periodic_interval
|
||||
:param periodic_fuzzy_delay: defaults to CONF.periodic_fuzzy_delay
|
||||
|
||||
"""
|
||||
if not host:
|
||||
host = CONF.host
|
||||
if not binary:
|
||||
binary = os.path.basename(inspect.stack()[-1][1])
|
||||
if not topic:
|
||||
topic = binary.rpartition('tacker-')[2]
|
||||
topic = topic.replace("-", "_")
|
||||
if not manager:
|
||||
manager = CONF.get('%s_manager' % topic, None)
|
||||
if report_interval is None:
|
||||
report_interval = CONF.report_interval
|
||||
if periodic_interval is None:
|
||||
periodic_interval = CONF.periodic_interval
|
||||
if periodic_fuzzy_delay is None:
|
||||
periodic_fuzzy_delay = CONF.periodic_fuzzy_delay
|
||||
service_obj = cls(host, binary, topic, manager,
|
||||
report_interval=report_interval,
|
||||
periodic_interval=periodic_interval,
|
||||
periodic_fuzzy_delay=periodic_fuzzy_delay)
|
||||
|
||||
return service_obj
|
||||
|
||||
def kill(self):
|
||||
"""Destroy the service object."""
|
||||
self.stop()
|
||||
|
||||
def stop(self):
|
||||
super(Service, self).stop()
|
||||
for x in self.timers:
|
||||
try:
|
||||
x.stop()
|
||||
except Exception:
|
||||
LOG.exception(_("Exception occurs when timer stops"))
|
||||
pass
|
||||
self.timers = []
|
||||
|
||||
def wait(self):
|
||||
super(Service, self).wait()
|
||||
for x in self.timers:
|
||||
try:
|
||||
x.wait()
|
||||
except Exception:
|
||||
LOG.exception(_("Exception occurs when waiting for timer"))
|
||||
pass
|
||||
|
||||
def periodic_tasks(self, raise_on_error=False):
|
||||
"""Tasks to be run at a periodic interval."""
|
||||
ctxt = context.get_admin_context()
|
||||
self.manager.periodic_tasks(ctxt, raise_on_error=raise_on_error)
|
||||
|
||||
def report_state(self):
|
||||
"""Update the state of this service."""
|
||||
# Todo(gongysh) report state to tacker server
|
||||
pass
|
||||
|
|
|
@ -1,113 +0,0 @@
|
|||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright (c) 2012 OpenStack Foundation.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import mock
|
||||
|
||||
from tacker.agent import rpc
|
||||
from tacker.openstack.common import context
|
||||
from tacker.tests import base
|
||||
|
||||
|
||||
class AgentRPCPluginApi(base.BaseTestCase):
|
||||
def _test_rpc_call(self, method):
|
||||
agent = rpc.PluginApi('fake_topic')
|
||||
ctxt = context.RequestContext('fake_user', 'fake_project')
|
||||
expect_val = 'foo'
|
||||
with mock.patch('tacker.common.rpc_compat.RpcProxy.call') as rpc_call:
|
||||
rpc_call.return_value = expect_val
|
||||
func_obj = getattr(agent, method)
|
||||
if method == 'tunnel_sync':
|
||||
actual_val = func_obj(ctxt, 'fake_tunnel_ip')
|
||||
else:
|
||||
actual_val = func_obj(ctxt, 'fake_device', 'fake_agent_id')
|
||||
self.assertEqual(actual_val, expect_val)
|
||||
|
||||
def test_get_device_details(self):
|
||||
self._test_rpc_call('get_device_details')
|
||||
|
||||
def test_update_device_down(self):
|
||||
self._test_rpc_call('update_device_down')
|
||||
|
||||
def test_tunnel_sync(self):
|
||||
self._test_rpc_call('tunnel_sync')
|
||||
|
||||
|
||||
class AgentPluginReportState(base.BaseTestCase):
|
||||
def test_plugin_report_state_use_call(self):
|
||||
topic = 'test'
|
||||
reportStateAPI = rpc.PluginReportStateAPI(topic)
|
||||
expected_agent_state = {'agent': 'test'}
|
||||
with mock.patch.object(reportStateAPI, 'call') as call:
|
||||
ctxt = context.RequestContext('fake_user', 'fake_project')
|
||||
reportStateAPI.report_state(ctxt, expected_agent_state,
|
||||
use_call=True)
|
||||
self.assertEqual(call.call_args[0][0], ctxt)
|
||||
self.assertEqual(call.call_args[0][1]['method'],
|
||||
'report_state')
|
||||
self.assertEqual(call.call_args[0][1]['args']['agent_state'],
|
||||
{'agent_state': expected_agent_state})
|
||||
self.assertIsInstance(call.call_args[0][1]['args']['time'],
|
||||
str)
|
||||
self.assertEqual(call.call_args[1]['topic'], topic)
|
||||
|
||||
def test_plugin_report_state_cast(self):
|
||||
topic = 'test'
|
||||
reportStateAPI = rpc.PluginReportStateAPI(topic)
|
||||
expected_agent_state = {'agent': 'test'}
|
||||
with mock.patch.object(reportStateAPI, 'cast') as cast:
|
||||
ctxt = context.RequestContext('fake_user', 'fake_project')
|
||||
reportStateAPI.report_state(ctxt, expected_agent_state)
|
||||
self.assertEqual(cast.call_args[0][0], ctxt)
|
||||
self.assertEqual(cast.call_args[0][1]['method'],
|
||||
'report_state')
|
||||
self.assertEqual(cast.call_args[0][1]['args']['agent_state'],
|
||||
{'agent_state': expected_agent_state})
|
||||
self.assertIsInstance(cast.call_args[0][1]['args']['time'],
|
||||
str)
|
||||
self.assertEqual(cast.call_args[1]['topic'], topic)
|
||||
|
||||
|
||||
class AgentRPCMethods(base.BaseTestCase):
|
||||
def test_create_consumers(self):
|
||||
endpoints = [mock.Mock()]
|
||||
expected = [
|
||||
mock.call(new=True),
|
||||
mock.call().create_consumer('foo-topic-op', endpoints,
|
||||
fanout=True),
|
||||
mock.call().consume_in_threads()
|
||||
]
|
||||
|
||||
call_to_patch = 'tacker.common.rpc_compat.create_connection'
|
||||
with mock.patch(call_to_patch) as create_connection:
|
||||
rpc.create_consumers(endpoints, 'foo', [('topic', 'op')])
|
||||
create_connection.assert_has_calls(expected)
|
||||
|
||||
def test_create_consumers_with_node_name(self):
|
||||
endpoints = [mock.Mock()]
|
||||
expected = [
|
||||
mock.call(new=True),
|
||||
mock.call().create_consumer('foo-topic-op', endpoints,
|
||||
fanout=True),
|
||||
mock.call().create_consumer('foo-topic-op.node1', endpoints,
|
||||
fanout=False),
|
||||
mock.call().consume_in_threads()
|
||||
]
|
||||
|
||||
call_to_patch = 'tacker.common.rpc_compat.create_connection'
|
||||
with mock.patch(call_to_patch) as create_connection:
|
||||
rpc.create_consumers(endpoints, 'foo', [('topic', 'op', 'node1')])
|
||||
create_connection.assert_has_calls(expected)
|
Loading…
Reference in New Issue