collector: remove deprecated RPC code

Change-Id: I995398ee239754a4d333460112700caeec516eb5
This commit is contained in:
Julien Danjou 2015-09-25 17:42:56 +02:00
parent 512b7f0678
commit 38af0f6b64
8 changed files with 4 additions and 203 deletions

View File

@ -45,16 +45,9 @@ OPTS = [
default=False, default=False,
help='Requeue the event on the collector event queue ' help='Requeue the event on the collector event queue '
'when the collector fails to dispatch it.'), 'when the collector fails to dispatch it.'),
cfg.BoolOpt('enable_rpc',
default=False,
help='Enable the RPC functionality of collector. This '
'functionality is now deprecated in favour of notifier '
'publisher and queues.')
] ]
cfg.CONF.register_opts(OPTS, group="collector") cfg.CONF.register_opts(OPTS, group="collector")
cfg.CONF.import_opt('metering_topic', 'ceilometer.publisher.messaging',
group='publisher_rpc')
cfg.CONF.import_opt('metering_topic', 'ceilometer.publisher.messaging', cfg.CONF.import_opt('metering_topic', 'ceilometer.publisher.messaging',
group='publisher_notifier') group='publisher_notifier')
cfg.CONF.import_opt('event_topic', 'ceilometer.publisher.messaging', cfg.CONF.import_opt('event_topic', 'ceilometer.publisher.messaging',
@ -73,7 +66,6 @@ class CollectorService(os_service.Service):
# ensure dispatcher is configured before starting other services # ensure dispatcher is configured before starting other services
dispatcher_managers = dispatcher.load_dispatcher_manager() dispatcher_managers = dispatcher.load_dispatcher_manager()
(self.meter_manager, self.event_manager) = dispatcher_managers (self.meter_manager, self.event_manager) = dispatcher_managers
self.rpc_server = None
self.sample_listener = None self.sample_listener = None
self.event_listener = None self.event_listener = None
super(CollectorService, self).start() super(CollectorService, self).start()
@ -83,12 +75,6 @@ class CollectorService(os_service.Service):
transport = messaging.get_transport(optional=True) transport = messaging.get_transport(optional=True)
if transport: if transport:
if cfg.CONF.collector.enable_rpc:
LOG.warning('RPC collector is deprecated in favour of queues. '
'Please switch to notifier publisher.')
self.rpc_server = messaging.get_rpc_server(
transport, cfg.CONF.publisher_rpc.metering_topic, self)
if list(self.meter_manager): if list(self.meter_manager):
sample_target = oslo_messaging.Target( sample_target = oslo_messaging.Target(
topic=cfg.CONF.publisher_notifier.metering_topic) topic=cfg.CONF.publisher_notifier.metering_topic)
@ -109,9 +95,6 @@ class CollectorService(os_service.Service):
requeue_event_on_dispatcher_error)) requeue_event_on_dispatcher_error))
self.event_listener.start() self.event_listener.start()
if cfg.CONF.collector.enable_rpc:
self.rpc_server.start()
if not cfg.CONF.collector.udp_address: if not cfg.CONF.collector.udp_address:
# Add a dummy thread to have wait() working # Add a dummy thread to have wait() working
self.tg.add_timer(604800, lambda: None) self.tg.add_timer(604800, lambda: None)
@ -144,8 +127,6 @@ class CollectorService(os_service.Service):
def stop(self): def stop(self):
self.udp_run = False self.udp_run = False
if cfg.CONF.collector.enable_rpc and self.rpc_server:
self.rpc_server.stop()
if self.sample_listener: if self.sample_listener:
utils.kill_listeners([self.sample_listener]) utils.kill_listeners([self.sample_listener])
if self.event_listener: if self.event_listener:

View File

@ -81,23 +81,6 @@ _SERIALIZER = RequestContextSerializer(
oslo_serializer.JsonPayloadSerializer()) oslo_serializer.JsonPayloadSerializer())
def get_rpc_server(transport, topic, endpoint):
"""Return a configured oslo_messaging rpc server."""
cfg.CONF.import_opt('host', 'ceilometer.service')
target = oslo_messaging.Target(server=cfg.CONF.host, topic=topic)
return oslo_messaging.get_rpc_server(transport, target,
[endpoint], executor='threading',
serializer=_SERIALIZER)
def get_rpc_client(transport, retry=None, **kwargs):
"""Return a configured oslo_messaging RPCClient."""
target = oslo_messaging.Target(**kwargs)
return oslo_messaging.RPCClient(transport, target,
serializer=_SERIALIZER,
retry=retry)
def get_notification_listener(transport, targets, endpoints, def get_notification_listener(transport, targets, endpoints,
allow_requeue=False): allow_requeue=False):
"""Return a configured oslo_messaging notification listener.""" """Return a configured oslo_messaging notification listener."""

View File

@ -104,7 +104,6 @@ def list_opts():
('polling', ceilometer.agent.manager.OPTS), ('polling', ceilometer.agent.manager.OPTS),
('publisher', ceilometer.publisher.utils.OPTS), ('publisher', ceilometer.publisher.utils.OPTS),
('publisher_notifier', ceilometer.publisher.messaging.NOTIFIER_OPTS), ('publisher_notifier', ceilometer.publisher.messaging.NOTIFIER_OPTS),
('publisher_rpc', ceilometer.publisher.messaging.RPC_OPTS),
('rgw_admin_credentials', ceilometer.objectstore.rgw.CREDENTIAL_OPTS), ('rgw_admin_credentials', ceilometer.objectstore.rgw.CREDENTIAL_OPTS),
# NOTE(sileht): the configuration file contains only the options # NOTE(sileht): the configuration file contains only the options
# for the password plugin that handles keystone v2 and v3 API # for the password plugin that handles keystone v2 and v3 API

View File

@ -35,15 +35,6 @@ from ceilometer.publisher import utils
LOG = log.getLogger(__name__) LOG = log.getLogger(__name__)
RPC_OPTS = [
cfg.StrOpt('metering_topic',
default='metering',
deprecated_for_removal=True,
help='The topic that ceilometer uses for metering messages.',
deprecated_group="DEFAULT",
),
]
NOTIFIER_OPTS = [ NOTIFIER_OPTS = [
cfg.StrOpt('metering_topic', cfg.StrOpt('metering_topic',
default='metering', default='metering',
@ -63,8 +54,6 @@ NOTIFIER_OPTS = [
) )
] ]
cfg.CONF.register_opts(RPC_OPTS,
group="publisher_rpc")
cfg.CONF.register_opts(NOTIFIER_OPTS, cfg.CONF.register_opts(NOTIFIER_OPTS,
group="publisher_notifier") group="publisher_notifier")
cfg.CONF.import_opt('host', 'ceilometer.service') cfg.CONF.import_opt('host', 'ceilometer.service')
@ -122,7 +111,7 @@ class MessagingPublisher(publisher.PublisherBase):
sample, cfg.CONF.publisher.telemetry_secret) sample, cfg.CONF.publisher.telemetry_secret)
for sample in samples for sample in samples
] ]
topic = cfg.CONF.publisher_rpc.metering_topic topic = cfg.CONF.publisher_notifier.metering_topic
self.local_queue.append((context, topic, meters)) self.local_queue.append((context, topic, meters))
if self.per_meter_topic: if self.per_meter_topic:
@ -201,26 +190,6 @@ class MessagingPublisher(publisher.PublisherBase):
"""Send the meters to the messaging topic.""" """Send the meters to the messaging topic."""
class RPCPublisher(MessagingPublisher):
def __init__(self, parsed_url):
super(RPCPublisher, self).__init__(parsed_url)
options = urlparse.parse_qs(parsed_url.query)
self.target = options.get('target', ['record_metering_data'])[0]
self.rpc_client = messaging.get_rpc_client(
messaging.get_transport(),
retry=self.retry, version='1.0'
)
def _send(self, context, topic, meters):
try:
self.rpc_client.prepare(topic=topic).cast(context, self.target,
data=meters)
except oslo_messaging.MessageDeliveryFailure as e:
raise_delivery_failure(e)
class NotifierPublisher(MessagingPublisher): class NotifierPublisher(MessagingPublisher):
def __init__(self, parsed_url, default_topic): def __init__(self, parsed_url, default_topic):
super(NotifierPublisher, self).__init__(parsed_url) super(NotifierPublisher, self).__init__(parsed_url)

View File

@ -17,7 +17,6 @@ import socket
import mock import mock
import msgpack import msgpack
from oslo_config import fixture as fixture_config from oslo_config import fixture as fixture_config
from oslo_context import context
import oslo_messaging import oslo_messaging
from oslo_utils import timeutils from oslo_utils import timeutils
from oslotest import mockpatch from oslotest import mockpatch
@ -25,7 +24,6 @@ from stevedore import extension
from ceilometer import collector from ceilometer import collector
from ceilometer import dispatcher from ceilometer import dispatcher
from ceilometer import messaging
from ceilometer.publisher import utils from ceilometer.publisher import utils
from ceilometer import sample from ceilometer import sample
from ceilometer.tests import base as tests_base from ceilometer.tests import base as tests_base
@ -208,18 +206,6 @@ class TestCollector(tests_base.BaseTestCase):
self.assertEqual(0, rpc_start.call_count) self.assertEqual(0, rpc_start.call_count)
self.assertEqual(1, udp_start.call_count) self.assertEqual(1, udp_start.call_count)
@mock.patch.object(oslo_messaging.MessageHandlingServer, 'start')
@mock.patch.object(collector.CollectorService, 'start_udp')
def test_only_rpc(self, udp_start, rpc_start):
"""Check that only RPC is started if udp_address is empty."""
self.CONF.set_override('enable_rpc', True, group='collector')
self.CONF.set_override('udp_address', '', group='collector')
self._setup_fake_dispatcher()
self.srv.start()
# two calls because two servers (notification and rpc)
self.assertEqual(2, rpc_start.call_count)
self.assertEqual(0, udp_start.call_count)
def test_udp_receive_valid_encoding(self): def test_udp_receive_valid_encoding(self):
self._setup_messaging(False) self._setup_messaging(False)
mock_dispatcher = self._setup_fake_dispatcher() mock_dispatcher = self._setup_fake_dispatcher()
@ -231,21 +217,6 @@ class TestCollector(tests_base.BaseTestCase):
mock_dispatcher.method_calls[0][1][0], mock_dispatcher.method_calls[0][1][0],
"not-so-secret")) "not-so-secret"))
@mock.patch('ceilometer.storage.impl_log.LOG')
def test_collector_no_mock(self, mylog):
self.CONF.set_override('enable_rpc', True, group='collector')
self.CONF.set_override('udp_address', '', group='collector')
mylog.info.side_effect = lambda *args: self.srv.stop()
self.srv.start()
client = messaging.get_rpc_client(self.transport, version='1.0')
cclient = client.prepare(topic='metering')
cclient.cast(context.RequestContext(),
'record_metering_data', data=[self.utf8_msg])
self.srv.rpc_server.wait()
mylog.info.assert_called_once_with(
'metering data test for test_run_tasks: 1')
def _test_collector_requeue(self, listener): def _test_collector_requeue(self, listener):
mock_dispatcher = self._setup_fake_dispatcher() mock_dispatcher = self._setup_fake_dispatcher()

View File

@ -19,12 +19,10 @@ import uuid
import mock import mock
from oslo_config import fixture as fixture_config from oslo_config import fixture as fixture_config
from oslo_context import context
from oslo_utils import netutils from oslo_utils import netutils
import testscenarios.testcase import testscenarios.testcase
from ceilometer.event.storage import models as event from ceilometer.event.storage import models as event
from ceilometer import messaging
from ceilometer.publisher import messaging as msg_publisher from ceilometer.publisher import messaging as msg_publisher
from ceilometer import sample from ceilometer import sample
from ceilometer.tests import base as tests_base from ceilometer.tests import base as tests_base
@ -103,76 +101,6 @@ class BasePublisherTestCase(tests_base.BaseTestCase):
self.setup_messaging(self.CONF) self.setup_messaging(self.CONF)
class RpcOnlyPublisherTest(BasePublisherTestCase):
def test_published_no_mock(self):
publisher = msg_publisher.RPCPublisher(
netutils.urlsplit('rpc://'))
endpoint = mock.MagicMock(['record_metering_data'])
collector = messaging.get_rpc_server(
self.transport, self.CONF.publisher_rpc.metering_topic, endpoint)
endpoint.record_metering_data.side_effect = (lambda *args, **kwds:
collector.stop())
collector.start()
publisher.publish_samples(context.RequestContext(),
self.test_sample_data)
collector.wait()
class Matcher(object):
@staticmethod
def __eq__(data):
for i, sample_item in enumerate(data):
if (sample_item['counter_name'] !=
self.test_sample_data[i].name):
return False
return True
endpoint.record_metering_data.assert_called_once_with(
mock.ANY, data=Matcher())
def test_publish_target(self):
publisher = msg_publisher.RPCPublisher(
netutils.urlsplit('rpc://?target=custom_procedure_call'))
cast_context = mock.MagicMock()
with mock.patch.object(publisher.rpc_client, 'prepare') as prepare:
prepare.return_value = cast_context
publisher.publish_samples(mock.MagicMock(),
self.test_sample_data)
prepare.assert_called_once_with(
topic=self.CONF.publisher_rpc.metering_topic)
cast_context.cast.assert_called_once_with(
mock.ANY, 'custom_procedure_call', data=mock.ANY)
def test_published_with_per_meter_topic(self):
publisher = msg_publisher.RPCPublisher(
netutils.urlsplit('rpc://?per_meter_topic=1'))
with mock.patch.object(publisher.rpc_client, 'prepare') as prepare:
publisher.publish_samples(mock.MagicMock(),
self.test_sample_data)
class MeterGroupMatcher(object):
def __eq__(self, meters):
return len(set(meter['counter_name']
for meter in meters)) == 1
topic = self.CONF.publisher_rpc.metering_topic
expected = [mock.call(topic=topic),
mock.call().cast(mock.ANY, 'record_metering_data',
data=mock.ANY),
mock.call(topic=topic + '.test'),
mock.call().cast(mock.ANY, 'record_metering_data',
data=MeterGroupMatcher()),
mock.call(topic=topic + '.test2'),
mock.call().cast(mock.ANY, 'record_metering_data',
data=MeterGroupMatcher()),
mock.call(topic=topic + '.test3'),
mock.call().cast(mock.ANY, 'record_metering_data',
data=MeterGroupMatcher())]
self.assertEqual(expected, prepare.mock_calls)
class NotifierOnlyPublisherTest(BasePublisherTestCase): class NotifierOnlyPublisherTest(BasePublisherTestCase):
@mock.patch('oslo_messaging.Notifier') @mock.patch('oslo_messaging.Notifier')
@ -203,17 +131,13 @@ class TestPublisher(testscenarios.testcase.WithScenarios,
publisher_cls=msg_publisher.EventNotifierPublisher, publisher_cls=msg_publisher.EventNotifierPublisher,
test_data=BasePublisherTestCase.test_event_data, test_data=BasePublisherTestCase.test_event_data,
pub_func='publish_events', attr='event_type')), pub_func='publish_events', attr='event_type')),
('rpc', dict(protocol="rpc",
publisher_cls=msg_publisher.RPCPublisher,
test_data=BasePublisherTestCase.test_sample_data,
pub_func='publish_samples', attr='source')),
] ]
def setUp(self): def setUp(self):
super(TestPublisher, self).setUp() super(TestPublisher, self).setUp()
self.topic = (self.CONF.publisher_notifier.event_topic self.topic = (self.CONF.publisher_notifier.event_topic
if self.pub_func == 'publish_events' else if self.pub_func == 'publish_events' else
self.CONF.publisher_rpc.metering_topic) self.CONF.publisher_notifier.metering_topic)
class TestPublisherPolicy(TestPublisher): class TestPublisherPolicy(TestPublisher):

View File

@ -218,9 +218,6 @@ ceilometer.transformer =
ceilometer.publisher = ceilometer.publisher =
test = ceilometer.publisher.test:TestPublisher test = ceilometer.publisher.test:TestPublisher
meter_publisher = ceilometer.publisher.messaging:RPCPublisher
meter = ceilometer.publisher.messaging:RPCPublisher
rpc = ceilometer.publisher.messaging:RPCPublisher
notifier = ceilometer.publisher.messaging:SampleNotifierPublisher notifier = ceilometer.publisher.messaging:SampleNotifierPublisher
udp = ceilometer.publisher.udp:UDPPublisher udp = ceilometer.publisher.udp:UDPPublisher
file = ceilometer.publisher.file:FilePublisher file = ceilometer.publisher.file:FilePublisher

View File

@ -30,7 +30,6 @@ import uuid
import make_test_data import make_test_data
from oslo_config import cfg from oslo_config import cfg
from oslo_context import context
import oslo_messaging import oslo_messaging
from six import moves from six import moves
@ -39,11 +38,6 @@ from ceilometer.publisher import utils
from ceilometer import service from ceilometer import service
def send_batch_rpc(rpc_client, topic, batch):
rpc_client.prepare(topic=topic).cast(context.RequestContext(),
'record_metering_data', data=batch)
def send_batch_notifier(notifier, topic, batch): def send_batch_notifier(notifier, topic, batch):
notifier.sample({}, event_type=topic, payload=batch) notifier.sample({}, event_type=topic, payload=batch)
@ -58,13 +52,6 @@ def get_notifier(config_file):
) )
def get_rpc_client(config_file):
service.prepare_service(argv=['/', '--config-file', config_file])
transport = messaging.get_transport()
rpc_client = messaging.get_rpc_client(transport, version='1.0')
return rpc_client
def generate_data(send_batch, make_data_args, samples_count, def generate_data(send_batch, make_data_args, samples_count,
batch_size, resources_count, topic): batch_size, resources_count, topic):
make_data_args.interval = 1 make_data_args.interval = 1
@ -104,12 +91,6 @@ def generate_data(send_batch, make_data_args, samples_count,
def get_parser(): def get_parser():
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument(
'--notify',
dest='notify',
type=bool,
default=True
)
parser.add_argument( parser.add_argument(
'--batch-size', '--batch-size',
@ -148,12 +129,8 @@ def get_parser():
def main(): def main():
args = get_parser().parse_known_args()[0] args = get_parser().parse_known_args()[0]
make_data_args = make_test_data.get_parser().parse_known_args()[0] make_data_args = make_test_data.get_parser().parse_known_args()[0]
if args.notify: notifier = get_notifier(args.config_file)
notifier = get_notifier(args.config_file) send_batch = functools.partial(send_batch_notifier, notifier)
send_batch = functools.partial(send_batch_notifier, notifier)
else:
rpc_client = get_rpc_client(args.config_file)
send_batch = functools.partial(send_batch_rpc, rpc_client)
result_dir = args.result_dir result_dir = args.result_dir
del args.notify del args.notify
del args.config_file del args.config_file