Remove RPC server and client related stuff

The RPC server and client code is not used by tacker, but the
notification support is kept.

Change-Id: I3af66841bc46fb6ece17395622b88324fad2a47b
Partial-Bug: 1515864
gong yong sheng
2015-12-10 04:45:56 -05:00
committed by Sridhar Ramaswamy
parent aa8f5d3e57
commit 7c24957eff
5 changed files with 1 addition and 467 deletions


@@ -63,10 +63,7 @@ class Controller(object):
         self._native_sorting = self._is_native_sorting_supported()
         self._policy_attrs = [name for (name, info) in self._attr_info.items()
                               if info.get('required_by_policy')]
-        self._notifier = n_rpc.get_notifier('network')
-        # if cfg.CONF.notify_nova_on_port_data_changes:
-        #     from tacker.notifiers import nova
-        #     self._nova_notifier = nova.Notifier()
+        self._notifier = n_rpc.get_notifier('nfv')
         self._member_actions = member_actions
         self._primary_key = self._get_primary_key()
         if self._allow_pagination and self._native_pagination:
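The notifier driver name here changes from 'network' to 'nfv', and the commented-out Nova notifier hook is dropped rather than revived. For orientation, a minimal standalone sketch of how such an oslo.messaging notifier is driven; the publisher id, event type and payload below are illustrative only and not taken from this commit, and an actual send needs a reachable message bus configured via transport_url:

from oslo_config import cfg
import oslo_messaging

# Build a notifier roughly the way n_rpc.get_notifier('nfv') would.
transport = oslo_messaging.get_notification_transport(cfg.CONF)
notifier = oslo_messaging.Notifier(transport,
                                   publisher_id='nfv.example-host',  # illustrative
                                   driver='messaging',
                                   topics=['notifications'])

# Rough equivalent of self._notifier.info(context, event_type, payload)
# in the controller; the event type and payload keys are hypothetical.
notifier.info({'user_id': 'demo', 'project_id': 'demo'},
              'vnf.create.end',
              {'vnf_id': 'example-uuid'})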


@@ -77,25 +77,6 @@ def get_allowed_exmods():
     return ALLOWED_EXMODS + EXTRA_EXMODS

-def get_client(target, version_cap=None, serializer=None):
-    assert TRANSPORT is not None
-    serializer = PluginRpcSerializer(serializer)
-    return oslo_messaging.RPCClient(TRANSPORT,
-                                    target,
-                                    version_cap=version_cap,
-                                    serializer=serializer)

-def get_server(target, endpoints, serializer=None):
-    assert TRANSPORT is not None
-    serializer = PluginRpcSerializer(serializer)
-    return oslo_messaging.get_rpc_server(TRANSPORT,
-                                         target,
-                                         endpoints,
-                                         executor='eventlet',
-                                         serializer=serializer)

 def get_notifier(service=None, host=None, publisher_id=None):
     assert NOTIFIER is not None
     if not publisher_id:
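After this hunk only get_notifier() is left in the module; tacker code can no longer obtain an RPC client or server through it. For reference, a hedged sketch of the bare oslo.messaging pattern the removed get_client() wrapped; the topic and method name are invented, and with no tacker-side server remaining such a call would simply time out:

from oslo_config import cfg
import oslo_messaging

transport = oslo_messaging.get_transport(cfg.CONF)
target = oslo_messaging.Target(topic='example-topic', version='1.0')  # hypothetical topic
client = oslo_messaging.RPCClient(transport, target)

# A blocking RPC call; 'do_something' is an invented method name and nothing
# in tacker would answer it after this commit, so it is left commented out.
# client.call({}, 'do_something', arg='value')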


@@ -1,205 +0,0 @@
# Copyright 2011 VMware, Inc
# All Rights Reserved.
#
# based on tacker.service and nova.service
# Copyright 2013, 2014 Intel Corporation.
# Copyright 2013, 2014 Isaku Yamahata <isaku.yamahata at intel com>
# <isaku.yamahata at gmail com>
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# @author: Isaku Yamahata, Intel Corporation.

import inspect
import os.path
import random

import oslo_messaging

from tacker import context
from tacker.openstack.common.gettextutils import _
from tacker.openstack.common import importutils
from tacker.openstack.common import log as logging
from tacker.openstack.common import loopingcall
from tacker.openstack.common import service
from tacker import service as tacker_service  # noqa # for service_opts

LOG = logging.getLogger(__name__)

TRANSPORT_ALIASES = {
    'tacker.openstack.common.rpc.impl_kombu': 'rabbit',
    'tacker.openstack.common.rpc.impl_qpid': 'qpid',
    'tacker.openstack.common.rpc.impl_zmq': 'zmq',
    'tacker.openstack.common.rpc.impl_fake': 'fake',
}


# replacement for tacker.openstack.common.rpc.service.Service
class RpcService(service.Service):
    """Service object for binaries running on hosts.

    A service enables rpc by listening to queues based on topic and host.
    """

    def __init__(self, conf, host, topic, manager=None, serializer=None):
        super(RpcService, self).__init__()
        self.conf = conf
        self.host = host
        self.topic = topic
        self.serializer = serializer
        if manager is None:
            self.manager = self
        else:
            self.manager = manager

    def start(self):
        super(RpcService, self).start()

        target = oslo_messaging.Target(topic=self.topic, server=self.host)
        endpoints = [self.manager]
        transport = oslo_messaging.get_transport(self.conf,
                                                 aliases=TRANSPORT_ALIASES)
        self.rpcserver = oslo_messaging.get_rpc_server(
            transport, target, endpoints, executor='eventlet',
            serializer=self.serializer)

        # Hook to allow the manager to do other initializations after
        # the rpc connection is created.
        if callable(getattr(self.manager, 'initialize_service_hook', None)):
            self.manager.initialize_service_hook(self)

        self.rpcserver.start()

    def stop(self):
        # Try to shut the connection down, but if we get any sort of
        # errors, go ahead and ignore them.. as we're shutting down anyway
        try:
            self.rpcserver.stop()
            self.rpcserver.wait()
        except Exception:
            pass
        super(RpcService, self).stop()


# replacement for tacker.service.Service
class TackerService(RpcService):
    def __init__(self, conf, host, binary, topic, manager,
                 report_interval=None,
                 periodic_interval=None, periodic_fuzzy_delay=None,
                 *args, **kwargs):
        self.binary = binary
        self.manager_class_name = manager
        manager_class = importutils.import_class(self.manager_class_name)
        self.manager = manager_class(conf=conf, host=host, *args, **kwargs)
        self.report_interval = report_interval
        self.periodic_interval = periodic_interval
        self.periodic_fuzzy_delay = periodic_fuzzy_delay
        self.saved_args, self.saved_kwargs = args, kwargs
        self.timers = []
        super(TackerService, self).__init__(conf, host, topic,
                                            manager=self.manager)

    def start(self):
        self.manager.init_host()
        super(TackerService, self).start()
        if self.report_interval:
            pulse = loopingcall.FixedIntervalLoopingCall(self.report_state)
            pulse.start(interval=self.report_interval,
                        initial_delay=self.report_interval)
            self.timers.append(pulse)

        if self.periodic_interval:
            if self.periodic_fuzzy_delay:
                initial_delay = random.randint(0, self.periodic_fuzzy_delay)
            else:
                initial_delay = None

            periodic = loopingcall.FixedIntervalLoopingCall(
                self.periodic_tasks)
            periodic.start(interval=self.periodic_interval,
                           initial_delay=initial_delay)
            self.timers.append(periodic)
        self.manager.after_start()

    def kill(self):
        """Destroy the service object."""
        self.stop()

    def stop(self):
        super(TackerService, self).stop()
        for x in self.timers:
            try:
                x.stop()
            except Exception:
                LOG.exception(_("Exception occurs when timer stops"))
                pass
        self.timers = []

    def wait(self):
        super(TackerService, self).wait()
        for x in self.timers:
            try:
                x.wait()
            except Exception:
                LOG.exception(_("Exception occurs when waiting for timer"))
                pass

    def periodic_tasks(self, raise_on_error=False):
        """Tasks to be run at a periodic interval."""
        ctxt = context.get_admin_context()
        self.manager.periodic_tasks(ctxt, raise_on_error=raise_on_error)

    def report_state(self):
        """Update the state of this service."""
        # Todo(gongysh) report state to tacker server
        pass

    def __getattr__(self, key):
        manager = self.__dict__.get('manager', None)
        return getattr(manager, key)

    @classmethod
    def create(cls, conf, host=None, binary=None, topic=None, manager=None,
               report_interval=None, periodic_interval=None,
               periodic_fuzzy_delay=None):
        """Instantiates class and passes back application object.

        :param host: defaults to conf.host
        :param binary: defaults to basename of executable
        :param topic: defaults to bin_name - 'nova-' part
        :param manager: defaults to conf.<topic>_manager
        :param report_interval: defaults to conf.report_interval
        :param periodic_interval: defaults to conf.periodic_interval
        :param periodic_fuzzy_delay: defaults to conf.periodic_fuzzy_delay
        """
        if not host:
            host = conf.host
        if not binary:
            binary = os.path.basename(inspect.stack()[-1][1])
        if not topic:
            topic = binary.rpartition('tacker-')[2]
            topic = topic.replace("-", "_")
        if not manager:
            manager = conf.get('%s_manager' % topic, None)
        if report_interval is None:
            report_interval = conf.AGENT.report_interval
        if periodic_interval is None:
            periodic_interval = conf.periodic_interval
        if periodic_fuzzy_delay is None:
            periodic_fuzzy_delay = conf.periodic_fuzzy_delay

        service_obj = cls(conf, host, binary, topic, manager,
                          report_interval=report_interval,
                          periodic_interval=periodic_interval,
                          periodic_fuzzy_delay=periodic_fuzzy_delay)

        return service_obj
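The RpcService class deleted above was a thin wrapper around the oslo.messaging RPC server lifecycle. A standalone sketch of that lifecycle for readers unfamiliar with it; the topic, server name and endpoint are invented, and the eventlet executor assumes eventlet is installed:

from oslo_config import cfg
import oslo_messaging


class ExampleEndpoint(object):
    # Any public method on an endpoint becomes callable over RPC;
    # this method name is purely illustrative.
    def ping(self, ctxt, payload):
        return payload


transport = oslo_messaging.get_transport(cfg.CONF)
target = oslo_messaging.Target(topic='example-topic', server='example-host')
server = oslo_messaging.get_rpc_server(transport, target, [ExampleEndpoint()],
                                       executor='eventlet')
server.start()   # what RpcService.start() did after building the server
server.stop()    # what RpcService.stop() did, followed by wait()
server.wait()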


@@ -13,20 +13,13 @@
 # License for the specific language governing permissions and limitations
 # under the License.

-import inspect
 import logging as std_logging
-import os
-import random

 from oslo_config import cfg

 from tacker.common import config
-from tacker.common import rpc_compat
-from tacker import context
 from tacker.openstack.common import excutils
-from tacker.openstack.common import importutils
 from tacker.openstack.common import log as logging
-from tacker.openstack.common import loopingcall
 from tacker import wsgi
@@ -115,122 +108,3 @@ def _run_wsgi(app_name):
              {'host': cfg.CONF.bind_host,
               'port': cfg.CONF.bind_port})
     return server
class Service(rpc_compat.Service):
    """Service object for binaries running on hosts.

    A service takes a manager and enables rpc by listening to queues based
    on topic. It also periodically runs tasks on the manager.
    """

    def __init__(self, host, binary, topic, manager, report_interval=None,
                 periodic_interval=None, periodic_fuzzy_delay=None,
                 *args, **kwargs):
        self.binary = binary
        self.manager_class_name = manager
        manager_class = importutils.import_class(self.manager_class_name)
        self.manager = manager_class(host=host, *args, **kwargs)
        self.report_interval = report_interval
        self.periodic_interval = periodic_interval
        self.periodic_fuzzy_delay = periodic_fuzzy_delay
        self.saved_args, self.saved_kwargs = args, kwargs
        self.timers = []
        super(Service, self).__init__(host, topic, manager=self.manager)

    def start(self):
        self.manager.init_host()
        super(Service, self).start()
        if self.report_interval:
            pulse = loopingcall.FixedIntervalLoopingCall(self.report_state)
            pulse.start(interval=self.report_interval,
                        initial_delay=self.report_interval)
            self.timers.append(pulse)

        if self.periodic_interval:
            if self.periodic_fuzzy_delay:
                initial_delay = random.randint(0, self.periodic_fuzzy_delay)
            else:
                initial_delay = None

            periodic = loopingcall.FixedIntervalLoopingCall(
                self.periodic_tasks)
            periodic.start(interval=self.periodic_interval,
                           initial_delay=initial_delay)
            self.timers.append(periodic)
        self.manager.after_start()

    def __getattr__(self, key):
        manager = self.__dict__.get('manager', None)
        return getattr(manager, key)

    @classmethod
    def create(cls, host=None, binary=None, topic=None, manager=None,
               report_interval=None, periodic_interval=None,
               periodic_fuzzy_delay=None):
        """Instantiates class and passes back application object.

        :param host: defaults to CONF.host
        :param binary: defaults to basename of executable
        :param topic: defaults to bin_name - 'nova-' part
        :param manager: defaults to CONF.<topic>_manager
        :param report_interval: defaults to CONF.report_interval
        :param periodic_interval: defaults to CONF.periodic_interval
        :param periodic_fuzzy_delay: defaults to CONF.periodic_fuzzy_delay
        """
        if not host:
            host = CONF.host
        if not binary:
            binary = os.path.basename(inspect.stack()[-1][1])
        if not topic:
            topic = binary.rpartition('tacker-')[2]
            topic = topic.replace("-", "_")
        if not manager:
            manager = CONF.get('%s_manager' % topic, None)
        if report_interval is None:
            report_interval = CONF.report_interval
        if periodic_interval is None:
            periodic_interval = CONF.periodic_interval
        if periodic_fuzzy_delay is None:
            periodic_fuzzy_delay = CONF.periodic_fuzzy_delay

        service_obj = cls(host, binary, topic, manager,
                          report_interval=report_interval,
                          periodic_interval=periodic_interval,
                          periodic_fuzzy_delay=periodic_fuzzy_delay)

        return service_obj

    def kill(self):
        """Destroy the service object."""
        self.stop()

    def stop(self):
        super(Service, self).stop()
        for x in self.timers:
            try:
                x.stop()
            except Exception:
                LOG.exception(_("Exception occurs when timer stops"))
                pass
        self.timers = []

    def wait(self):
        super(Service, self).wait()
        for x in self.timers:
            try:
                x.wait()
            except Exception:
                LOG.exception(_("Exception occurs when waiting for timer"))
                pass

    def periodic_tasks(self, raise_on_error=False):
        """Tasks to be run at a periodic interval."""
        ctxt = context.get_admin_context()
        self.manager.periodic_tasks(ctxt, raise_on_error=raise_on_error)

    def report_state(self):
        """Update the state of this service."""
        # Todo(gongysh) report state to tacker server
        pass
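Besides the RPC plumbing, the deleted Service class carried the periodic-task machinery built on FixedIntervalLoopingCall. A minimal sketch of that mechanism, shown here with oslo.service's loopingcall module rather than the tacker.openstack.common copy used above (same interface); the interval values and the task body are examples, and oslo.service's eventlet dependency is assumed to be available:

from oslo_service import loopingcall


def report_state():
    # Placeholder for the real periodic work (e.g. Service.report_state).
    print('reporting state')


pulse = loopingcall.FixedIntervalLoopingCall(report_state)
pulse.start(interval=10, initial_delay=10)   # mirrors Service.start()
# ... run for a while, then shut down, mirroring Service.stop()/wait():
pulse.stop()
pulse.wait()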


@@ -1,113 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4

# Copyright (c) 2012 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import mock

from tacker.agent import rpc
from tacker.openstack.common import context
from tacker.tests import base


class AgentRPCPluginApi(base.BaseTestCase):
    def _test_rpc_call(self, method):
        agent = rpc.PluginApi('fake_topic')
        ctxt = context.RequestContext('fake_user', 'fake_project')
        expect_val = 'foo'
        with mock.patch('tacker.common.rpc_compat.RpcProxy.call') as rpc_call:
            rpc_call.return_value = expect_val
            func_obj = getattr(agent, method)
            if method == 'tunnel_sync':
                actual_val = func_obj(ctxt, 'fake_tunnel_ip')
            else:
                actual_val = func_obj(ctxt, 'fake_device', 'fake_agent_id')
        self.assertEqual(actual_val, expect_val)

    def test_get_device_details(self):
        self._test_rpc_call('get_device_details')

    def test_update_device_down(self):
        self._test_rpc_call('update_device_down')

    def test_tunnel_sync(self):
        self._test_rpc_call('tunnel_sync')


class AgentPluginReportState(base.BaseTestCase):
    def test_plugin_report_state_use_call(self):
        topic = 'test'
        reportStateAPI = rpc.PluginReportStateAPI(topic)
        expected_agent_state = {'agent': 'test'}
        with mock.patch.object(reportStateAPI, 'call') as call:
            ctxt = context.RequestContext('fake_user', 'fake_project')
            reportStateAPI.report_state(ctxt, expected_agent_state,
                                        use_call=True)
            self.assertEqual(call.call_args[0][0], ctxt)
            self.assertEqual(call.call_args[0][1]['method'],
                             'report_state')
            self.assertEqual(call.call_args[0][1]['args']['agent_state'],
                             {'agent_state': expected_agent_state})
            self.assertIsInstance(call.call_args[0][1]['args']['time'],
                                  str)
            self.assertEqual(call.call_args[1]['topic'], topic)

    def test_plugin_report_state_cast(self):
        topic = 'test'
        reportStateAPI = rpc.PluginReportStateAPI(topic)
        expected_agent_state = {'agent': 'test'}
        with mock.patch.object(reportStateAPI, 'cast') as cast:
            ctxt = context.RequestContext('fake_user', 'fake_project')
            reportStateAPI.report_state(ctxt, expected_agent_state)
            self.assertEqual(cast.call_args[0][0], ctxt)
            self.assertEqual(cast.call_args[0][1]['method'],
                             'report_state')
            self.assertEqual(cast.call_args[0][1]['args']['agent_state'],
                             {'agent_state': expected_agent_state})
            self.assertIsInstance(cast.call_args[0][1]['args']['time'],
                                  str)
            self.assertEqual(cast.call_args[1]['topic'], topic)


class AgentRPCMethods(base.BaseTestCase):
    def test_create_consumers(self):
        endpoints = [mock.Mock()]
        expected = [
            mock.call(new=True),
            mock.call().create_consumer('foo-topic-op', endpoints,
                                        fanout=True),
            mock.call().consume_in_threads()
        ]

        call_to_patch = 'tacker.common.rpc_compat.create_connection'
        with mock.patch(call_to_patch) as create_connection:
            rpc.create_consumers(endpoints, 'foo', [('topic', 'op')])
            create_connection.assert_has_calls(expected)

    def test_create_consumers_with_node_name(self):
        endpoints = [mock.Mock()]
        expected = [
            mock.call(new=True),
            mock.call().create_consumer('foo-topic-op', endpoints,
                                        fanout=True),
            mock.call().create_consumer('foo-topic-op.node1', endpoints,
                                        fanout=False),
            mock.call().consume_in_threads()
        ]

        call_to_patch = 'tacker.common.rpc_compat.create_connection'
        with mock.patch(call_to_patch) as create_connection:
            rpc.create_consumers(endpoints, 'foo', [('topic', 'op', 'node1')])
            create_connection.assert_has_calls(expected)