kill collector

Change-Id: I7720d20eab345a7835d57fac573332eca0e7d11e
This commit is contained in:
gord chung 2017-09-14 21:23:37 +00:00 committed by Hanxi Liu
parent 83ffaffcb2
commit fad69e9603
14 changed files with 18 additions and 493 deletions

View File

@ -1,30 +0,0 @@
# -*- encoding: utf-8 -*-
#
# Copyright 2014 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import cotyledon
from cotyledon import oslo_config_glue
from ceilometer import collector
from ceilometer import service
def main():
    """Run the collector service under a cotyledon service manager.

    Prepares the service configuration, registers ``CollectorService``
    with as many workers as ``[collector] workers`` requests, wires the
    oslo.config glue to the manager, and then runs the manager.
    """
    conf = service.prepare_service()
    manager = cotyledon.ServiceManager()
    manager.add(
        collector.CollectorService,
        workers=conf.collector.workers,
        args=(conf,),
    )
    oslo_config_glue.setup(manager, conf)
    manager.run()

View File

@ -1,194 +0,0 @@
#
# Copyright 2012-2013 eNovance <licensing@enovance.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from itertools import chain
import select
import socket
import cotyledon
import msgpack
from oslo_config import cfg
from oslo_log import log
import oslo_messaging
from oslo_utils import netutils
from oslo_utils import units
from ceilometer import dispatcher
from ceilometer.i18n import _
from ceilometer import messaging
from ceilometer.publisher import utils as publisher_utils
from ceilometer import utils
# Configuration options consumed by the collector service (read in this
# module through ``conf.collector.<name>``).
OPTS = [
    # UDP listener endpoint; an empty address turns the UDP listener off.
    cfg.HostAddressOpt(
        'udp_address',
        default='0.0.0.0',
        help='Address to which the UDP socket is bound. '
             'Set to an empty string to disable.'),
    cfg.PortOpt(
        'udp_port',
        default=4952,
        help='Port to which the UDP socket is bound.'),
    # Batching knobs handed to the batch notification listeners.
    cfg.IntOpt(
        'batch_size',
        default=1,
        help='Number of notification messages to wait '
             'before dispatching them'),
    cfg.IntOpt(
        'batch_timeout',
        help='Number of seconds to wait before dispatching samples when '
             'batch_size is not reached (None means indefinitely)'),
    # Worker count; still readable from the old DEFAULT/collector_workers.
    cfg.IntOpt(
        'workers',
        default=1,
        min=1,
        deprecated_group='DEFAULT',
        deprecated_name='collector_workers',
        help='Number of workers for collector service. '
             'default value is 1.'),
]

LOG = log.getLogger(__name__)
class CollectorService(cotyledon.Service):
    """Listener for the collector service.

    Receives samples and events from the notification bus (through batch
    notification listeners) and, optionally, msgpack-encoded samples over
    UDP, and hands them to the configured dispatcher managers.
    """

    def __init__(self, worker_id, conf):
        """Load the dispatchers and emit the deprecation notice.

        :param worker_id: worker identifier passed through to cotyledon.
        :param conf: the service configuration object.
        """
        super(CollectorService, self).__init__(worker_id)
        self.conf = conf
        # ensure dispatcher is configured before starting other services
        dispatcher_managers = dispatcher.load_dispatcher_manager(conf)
        (self.meter_manager, self.event_manager) = dispatcher_managers
        self.sample_listener = None
        self.event_listener = None
        self.udp_thread = None
        # Loop flag for start_udp(). It is armed here instead of inside the
        # UDP thread so that a terminate() issued before the thread gets
        # scheduled cannot be overwritten, which would make join() hang.
        self.udp_run = True
        import debtcollector
        debtcollector.deprecate("Ceilometer collector service is deprecated. "
                                "Use publishers to push data instead",
                                version="9.0", removal_version="10.0")

    def _start_listener(self, transport, topic, endpoint):
        """Build and start one batch notification listener for ``topic``."""
        listener = messaging.get_batch_notification_listener(
            transport, [oslo_messaging.Target(topic=topic)], [endpoint],
            allow_requeue=True,
            batch_size=self.conf.collector.batch_size,
            batch_timeout=self.conf.collector.batch_timeout)
        listener.start()
        return listener

    def run(self):
        """Start the UDP thread and the notification listeners.

        The UDP thread is only spawned when ``[collector] udp_address`` is
        set. A listener is only created when a transport is available and
        the matching dispatcher manager is non-empty.
        """
        if self.conf.collector.udp_address:
            self.udp_thread = utils.spawn_thread(self.start_udp)

        transport = messaging.get_transport(self.conf, optional=True)
        if not transport:
            return

        secret = self.conf.publisher.telemetry_secret
        if list(self.meter_manager):
            self.sample_listener = self._start_listener(
                transport,
                self.conf.publisher_notifier.metering_topic,
                SampleEndpoint(secret, self.meter_manager))

        if list(self.event_manager):
            self.event_listener = self._start_listener(
                transport,
                self.conf.publisher_notifier.event_topic,
                EventEndpoint(secret, self.event_manager))

    def start_udp(self):
        """Receive, verify and dispatch samples from the UDP socket.

        Loops until ``self.udp_run`` becomes false; the socket is always
        closed on the way out, including when an error escapes the loop.
        """
        address_family = socket.AF_INET
        if netutils.is_valid_ipv6(self.conf.collector.udp_address):
            address_family = socket.AF_INET6
        udp = socket.socket(address_family, socket.SOCK_DGRAM)
        try:
            udp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            try:
                # NOTE(zhengwei): linux kernel >= 3.9
                udp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
            except Exception:
                LOG.warning("System does not support socket.SO_REUSEPORT "
                            "option. Only one worker will be able to process "
                            "incoming data.")
            udp.bind((self.conf.collector.udp_address,
                      self.conf.collector.udp_port))

            while self.udp_run:
                # NOTE(sileht): return every 10 seconds to allow
                # clear shutdown
                if not select.select([udp], [], [], 10.0)[0]:
                    continue
                # NOTE(jd) Arbitrary limit of 64K because that ought to be
                # enough for anybody.
                data, source = udp.recvfrom(64 * units.Ki)
                try:
                    # NOTE(review): newer msgpack releases reportedly no
                    # longer accept ``encoding`` - confirm against the
                    # pinned version before upgrading.
                    sample = msgpack.loads(data, encoding='utf-8')
                except Exception:
                    LOG.warning(_("UDP: Cannot decode data sent by %s"),
                                source)
                    continue

                if not publisher_utils.verify_signature(
                        sample, self.conf.publisher.telemetry_secret):
                    LOG.warning('sample signature invalid, '
                                'discarding: %s', sample)
                    continue

                try:
                    LOG.debug("UDP: Storing %s", sample)
                    self.meter_manager.map_method(
                        'record_metering_data', sample)
                except Exception:
                    # Best effort: a failing dispatcher must not stop the
                    # receive loop.
                    LOG.exception(_("UDP: Unable to store meter"))
        finally:
            udp.close()

    def terminate(self):
        """Stop the listeners and the UDP thread, then terminate."""
        if self.sample_listener:
            utils.kill_listeners([self.sample_listener])
        if self.event_listener:
            utils.kill_listeners([self.event_listener])
        if self.udp_thread:
            self.udp_run = False
            self.udp_thread.join()
        super(CollectorService, self).terminate()
class CollectorEndpoint(object):
    """Base notification endpoint that forwards payloads to dispatchers.

    Subclasses provide a ``method`` attribute naming the dispatcher method
    that receives the verified payload items.
    """

    def __init__(self, secret, dispatcher_manager):
        """Remember the signing secret and the dispatcher manager."""
        self.dispatcher_manager = dispatcher_manager
        self.secret = secret

    def sample(self, messages):
        """RPC endpoint for notification messages

        When another service sends a notification over the message
        bus, this method receives it.

        Payload items whose signature does not verify are logged and
        dropped; the remaining items are passed to the dispatchers in one
        call. Returns ``NotificationResult.REQUEUE`` when dispatching
        raises, and ``None`` otherwise.
        """
        verified = []
        for message in messages:
            for item in message["payload"]:
                if not publisher_utils.verify_signature(item, self.secret):
                    LOG.warning('notification signature invalid, '
                                'discarding: %s', item)
                    continue
                verified.append(item)

        # The dispatchers are invoked even when nothing was verified.
        try:
            self.dispatcher_manager.map_method(self.method, verified)
        except Exception:
            LOG.exception("Dispatcher failed to handle the notification, "
                          "re-queuing it.")
            return oslo_messaging.NotificationResult.REQUEUE
class SampleEndpoint(CollectorEndpoint):
    """Endpoint that dispatches through ``record_metering_data``."""

    method = 'record_metering_data'
class EventEndpoint(CollectorEndpoint):
    """Endpoint that dispatches through ``record_events``."""

    method = 'record_events'

View File

@ -50,7 +50,6 @@ OPTS = [
'Once set, lowering this value may result in lost data.'),
cfg.BoolOpt('ack_on_event_error',
default=True,
deprecated_group='collector',
help='Acknowledge message when event persistence fails.'),
cfg.BoolOpt('workload_partitioning',
default=False,

View File

@ -20,7 +20,6 @@ from oslo_config import cfg
import ceilometer.agent.manager
import ceilometer.api.app
import ceilometer.api.controllers.v2.root
import ceilometer.collector
import ceilometer.compute.discovery
import ceilometer.compute.virt.inspector
import ceilometer.compute.virt.libvirt.utils
@ -87,7 +86,6 @@ def list_opts():
OPTS)),
('api', itertools.chain(ceilometer.api.app.API_OPTS,
ceilometer.api.controllers.v2.root.API_OPTS)),
('collector', ceilometer.collector.OPTS),
('compute', ceilometer.compute.discovery.OPTS),
('coordination', [
cfg.StrOpt(

View File

@ -440,7 +440,7 @@ class GnocchiPublisher(publisher.ConfigPublisherBase):
with self._gnocchi_resource_lock[cache_key]:
# NOTE(luogangyi): there is a possibility that the
# resource was already built in cache by another
# ceilometer-collector when we get the lock here.
# ceilometer-notification-agent when we get the lock here.
attribute_hash = self._check_resource_cache(cache_key,
resource)
if attribute_hash:

View File

@ -33,8 +33,7 @@ class UDPPublisher(publisher.ConfigPublisherBase):
def __init__(self, conf, parsed_url):
super(UDPPublisher, self).__init__(conf, parsed_url)
self.host, self.port = netutils.parse_host_port(
parsed_url.netloc,
default_port=self.conf.collector.udp_port)
parsed_url.netloc, default_port=4952)
addrinfo = None
try:
addrinfo = socket.getaddrinfo(self.host, None, socket.AF_INET6,

View File

@ -1,244 +0,0 @@
#
# Copyright 2012 New Dream Network, LLC (DreamHost)
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import socket
import fixtures
import mock
import msgpack
import oslo_messaging
from oslo_utils import timeutils
from stevedore import extension
from ceilometer import collector
from ceilometer import dispatcher
from ceilometer.publisher import utils
from ceilometer import sample
from ceilometer import service
from ceilometer.tests import base as tests_base
class FakeException(Exception):
    """Placeholder exception type for the collector tests."""
class FakeConnection(object):
    """Connection stand-in whose ``create_worker`` does nothing."""

    def create_worker(self, topic, proxy, pool_name):
        """Accept the worker arguments and ignore them."""
        return None
class TestCollector(tests_base.BaseTestCase):
    """Tests for ``collector.CollectorService`` and its endpoints."""

    def setUp(self):
        """Build the configuration, two signed samples and the service."""
        super(TestCollector, self).setUp()
        self.CONF = service.prepare_service([], [])
        self.CONF.import_opt("connection", "oslo_db.options", group="database")
        self.CONF.set_override("connection", "log://", group='database')
        self.CONF.set_override('telemetry_secret', 'not-so-secret',
                               group='publisher')
        self._setup_messaging()

        plain_counter = sample.Sample(
            name='foobar',
            type='bad',
            unit='F',
            volume=1,
            user_id='jd',
            project_id='ceilometer',
            resource_id='cat',
            timestamp=timeutils.utcnow().isoformat(),
            resource_metadata={},
        )
        self.sample = utils.meter_message_from_counter(
            plain_counter, self.CONF.publisher.telemetry_secret)

        unicode_counter = sample.Sample(
            name=u'test',
            type=sample.TYPE_CUMULATIVE,
            unit=u'',
            volume=1,
            user_id=u'test',
            project_id=u'test',
            resource_id=u'test_run_tasks',
            timestamp=timeutils.utcnow().isoformat(),
            resource_metadata={u'name': [([u'TestPublish'])]},
            source=u'testsource',
        )
        self.utf8_msg = utils.meter_message_from_counter(
            unicode_counter, 'not-so-secret')

        self.mock_dispatcher = self._setup_fake_dispatcher()
        self.srv = collector.CollectorService(0, self.CONF)

    def _setup_messaging(self, enabled=True):
        """Enable test messaging, or make ``get_transport`` return None."""
        if not enabled:
            self.useFixture(fixtures.MockPatch(
                'ceilometer.messaging.get_transport',
                return_value=None))
            return
        self.setup_messaging(self.CONF)

    def _setup_fake_dispatcher(self):
        """Patch dispatcher loading and return the mocked plugin."""
        plugin = mock.MagicMock()
        manager = extension.ExtensionManager.make_test_instance([
            extension.Extension('test', None, None, plugin,),
        ], propagate_map_exceptions=True)
        # The same manager serves as both meter and event manager.
        self.useFixture(fixtures.MockPatch(
            'ceilometer.dispatcher.load_dispatcher_manager',
            return_value=(manager, manager)))
        return plugin

    def _make_fake_socket(self, sample):
        """Return a mock socket yielding ``sample`` once, then stopping."""
        def recvfrom(size):
            # Make the loop stop
            self.srv.udp_run = False
            return msgpack.dumps(sample), ('127.0.0.1', 12345)

        fake = mock.Mock()
        fake.recvfrom = recvfrom
        return fake

    def _verify_udp_socket(self, udp_socket):
        """Check the socket options and the bind address that were used."""
        conf = self.CONF.collector
        expected_opts = [
            mock.call.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1),
            mock.call.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1),
        ]
        udp_socket.setsockopt.assert_has_calls(expected_opts)
        udp_socket.bind.assert_called_once_with((conf.udp_address,
                                                 conf.udp_port))

    def _run_until_udp_stops(self):
        """Run the service and wait for its UDP thread to finish."""
        self.srv.run()
        self.addCleanup(self.srv.terminate)
        self.srv.udp_thread.join(5)
        self.assertFalse(self.srv.udp_thread.is_alive())

    def test_udp_receive_base(self):
        self._setup_messaging(False)
        udp_socket = self._make_fake_socket(self.sample)

        with mock.patch('select.select',
                        return_value=([udp_socket], [], [])), \
                mock.patch('socket.socket') as mock_socket:
            mock_socket.return_value = udp_socket
            self._run_until_udp_stops()
            mock_socket.assert_called_with(socket.AF_INET,
                                           socket.SOCK_DGRAM)

        self._verify_udp_socket(udp_socket)
        mock_record = self.mock_dispatcher.record_metering_data
        mock_record.assert_called_once_with(self.sample)

    def test_udp_socket_ipv6(self):
        self._setup_messaging(False)
        self.CONF.set_override('udp_address', '::1', group='collector')
        sock = self._make_fake_socket(self.sample)

        with mock.patch('select.select', return_value=([sock], [], [])), \
                mock.patch.object(socket, 'socket') as mock_socket:
            mock_socket.return_value = sock
            self._run_until_udp_stops()
            mock_socket.assert_called_with(socket.AF_INET6,
                                           socket.SOCK_DGRAM)

    def test_udp_receive_storage_error(self):
        self._setup_messaging(False)
        mock_record = self.mock_dispatcher.record_metering_data
        mock_record.side_effect = self._raise_error
        udp_socket = self._make_fake_socket(self.sample)

        with mock.patch('select.select',
                        return_value=([udp_socket], [], [])), \
                mock.patch('socket.socket', return_value=udp_socket):
            self._run_until_udp_stops()

        self._verify_udp_socket(udp_socket)
        mock_record.assert_called_once_with(self.sample)

    @staticmethod
    def _raise_error(*args, **kwargs):
        """Side effect that always raises a bare ``Exception``."""
        raise Exception

    @mock.patch.object(collector, 'LOG')
    def test_udp_receive_bad_decoding(self, log):
        self._setup_messaging(False)
        udp_socket = self._make_fake_socket(self.sample)

        with mock.patch('select.select',
                        return_value=([udp_socket], [], [])), \
                mock.patch('socket.socket', return_value=udp_socket), \
                mock.patch('msgpack.loads', self._raise_error):
            self._run_until_udp_stops()

        self._verify_udp_socket(udp_socket)
        log.warning.assert_called_once_with(
            "UDP: Cannot decode data sent by %s", mock.ANY)

    @mock.patch.object(collector.CollectorService, 'start_udp')
    def test_only_udp(self, udp_start):
        """Check that only UDP is started if messaging transport is unset."""
        self._setup_messaging(False)
        udp_socket = self._make_fake_socket(self.sample)
        real_start = oslo_messaging.MessageHandlingServer.start

        with mock.patch.object(oslo_messaging.MessageHandlingServer,
                               'start', side_effect=real_start) as rpc_start, \
                mock.patch('socket.socket', return_value=udp_socket):
            self._run_until_udp_stops()
            self.assertEqual(0, rpc_start.call_count)
            self.assertEqual(1, udp_start.call_count)

    def test_udp_receive_valid_encoding(self):
        self._setup_messaging(False)
        self.data_sent = []
        sock = self._make_fake_socket(self.utf8_msg)

        with mock.patch('select.select', return_value=([sock], [], [])), \
                mock.patch('socket.socket', return_value=sock):
            self._run_until_udp_stops()
            first_call_args = self.mock_dispatcher.method_calls[0][1]
            self.assertTrue(utils.verify_signature(
                first_call_args[0],
                "not-so-secret"))

    def _test_collector_requeue(self, listener, batch_listener=False):
        """Check that a failing dispatcher makes the endpoint requeue."""
        self.srv.dispatcher_manager = dispatcher.load_dispatcher_manager()
        mock_record = self.mock_dispatcher.record_metering_data
        mock_record.side_effect = Exception('boom')
        self.mock_dispatcher.record_events.side_effect = Exception('boom')

        self.srv.run()
        self.addCleanup(self.srv.terminate)

        endpoint = getattr(self.srv, listener).dispatcher.endpoints[0]
        notification = {'ctxt': {}, 'publisher_id': 'pub_id',
                        'event_type': 'event', 'payload': {},
                        'metadata': {}}
        result = endpoint.sample([notification])
        self.assertEqual(oslo_messaging.NotificationResult.REQUEUE,
                         result)

    @mock.patch.object(collector.CollectorService, 'start_udp', mock.Mock())
    def test_collector_sample_requeue(self):
        self._test_collector_requeue('sample_listener')

    @mock.patch.object(collector.CollectorService, 'start_udp', mock.Mock())
    def test_collector_event_requeue(self):
        self._test_collector_requeue('event_listener')

View File

@ -167,8 +167,7 @@ class TestUDPPublisher(base.BaseTestCase):
sent_counters.append(counter)
# Check destination
self.assertEqual(('somehost',
self.CONF.collector.udp_port), dest)
self.assertEqual(('somehost', 4952), dest)
# Check that counters are equal
def sort_func(counter):

View File

@ -296,9 +296,6 @@ function _ceilometer_configure_storage_backend {
elif [ "$CEILOMETER_BACKEND" = 'mongodb' ] ; then
iniset $CEILOMETER_CONF database metering_connection mongodb://localhost:27017/ceilometer
elif [ "$CEILOMETER_BACKEND" = 'gnocchi' ] ; then
# NOTE(gordc): set batching to better handle recording on a slow machine
iniset $CEILOMETER_CONF collector batch_size 50
iniset $CEILOMETER_CONF collector batch_timeout 5
sed -i "s/gnocchi:\/\//gnocchi:\/\/?archive_policy=${GNOCCHI_ARCHIVE_POLICY}\&filter_project=gnocchi_swift/" $CEILOMETER_CONF_DIR/event_pipeline.yaml $CEILOMETER_CONF_DIR/pipeline.yaml
else
die $LINENO "Unable to configure unknown CEILOMETER_BACKEND $CEILOMETER_BACKEND"
@ -379,10 +376,6 @@ function configure_ceilometer {
_ceilometer_configure_storage_backend
fi
if is_service_enabled ceilometer-collector; then
iniset $CEILOMETER_CONF collector workers $API_WORKERS
fi
if [[ "$VIRT_DRIVER" = 'vsphere' ]]; then
iniset $CEILOMETER_CONF DEFAULT hypervisor_inspector vsphere
iniset $CEILOMETER_CONF vmware host_ip "$VMWAREAPI_IP"
@ -481,12 +474,11 @@ function start_ceilometer {
tail_log ceilometer-api /var/log/$APACHE_NAME/ceilometer_access.log
fi
# run the notification agent/collector after restarting apache as it needs
# run the notification agent after restarting apache as it needs
# operational keystone if using gnocchi
run_process ceilometer-anotification "$CEILOMETER_BIN_DIR/ceilometer-agent-notification --config-file $CEILOMETER_CONF"
run_process ceilometer-collector "$CEILOMETER_BIN_DIR/ceilometer-collector --config-file $CEILOMETER_CONF"
# Start the compute agent late to allow time for the collector to
# Start the compute agent late to allow time for the notification agent to
# fully wake up and connect to the message bus. See bug #1355809
if [[ "$VIRT_DRIVER" = 'libvirt' ]]; then
run_process ceilometer-acompute "$CEILOMETER_BIN_DIR/ceilometer-polling --polling-namespaces compute --config-file $CEILOMETER_CONF" $LIBVIRT_GROUP
@ -508,7 +500,7 @@ function stop_ceilometer {
fi
# Kill the ceilometer screen windows
for serv in ceilometer-acompute ceilometer-acentral ceilometer-aipmi ceilometer-anotification ceilometer-collector; do
for serv in ceilometer-acompute ceilometer-acentral ceilometer-aipmi ceilometer-anotification; do
stop_process $serv
done
}

View File

@ -1,7 +1,7 @@
register_project_for_upgrade ceilometer
devstack_localrc base enable_plugin ceilometer git://git.openstack.org/openstack/ceilometer
devstack_localrc base enable_service ceilometer-acompute ceilometer-acentral ceilometer-aipmi ceilometer-anotification ceilometer-collector ceilometer-api tempest
devstack_localrc base enable_service ceilometer-acompute ceilometer-acentral ceilometer-aipmi ceilometer-anotification ceilometer-api tempest
devstack_localrc target enable_plugin ceilometer git://git.openstack.org/openstack/ceilometer
devstack_localrc target enable_service ceilometer-acompute ceilometer-acentral ceilometer-aipmi ceilometer-anotification ceilometer-collector ceilometer-api tempest
devstack_localrc target enable_service ceilometer-acompute ceilometer-acentral ceilometer-aipmi ceilometer-anotification ceilometer-api tempest

View File

@ -22,6 +22,6 @@ stop_ceilometer
# ensure everything is stopped
SERVICES_DOWN="ceilometer-acompute ceilometer-acentral ceilometer-aipmi ceilometer-anotification ceilometer-collector ceilometer-api"
SERVICES_DOWN="ceilometer-acompute ceilometer-acentral ceilometer-aipmi ceilometer-anotification ceilometer-api"
ensure_services_stopped $SERVICES_DOWN

View File

@ -80,8 +80,7 @@ start_ceilometer
ensure_services_started "ceilometer-polling --polling-namespaces compute" \
"ceilometer-polling --polling-namespaces central" \
ceilometer-agent-notification \
ceilometer-api \
ceilometer-collector
ceilometer-api
# Save mongodb state (replace with snapshot)
if grep -q 'connection *= *mongo' /etc/ceilometer/ceilometer.conf; then

View File

@ -0,0 +1,8 @@
---
upgrade:
- |
The collector service is removed. From Ocata, it's possible to edit the
pipeline.yaml and event_pipeline.yaml files and modify the publisher to
provide the same functionality as the collector dispatcher. You may change
the publisher to 'gnocchi', 'http', 'panko', or any combination of the
available publishers listed in the documentation.

View File

@ -281,7 +281,6 @@ console_scripts =
ceilometer-db-legacy-clean = ceilometer.cmd.storage:db_clean_legacy
ceilometer-expirer = ceilometer.cmd.storage:expirer
ceilometer-rootwrap = oslo_rootwrap.cmd:main
ceilometer-collector = ceilometer.cmd.collector:main
ceilometer.dispatcher.meter =
database = ceilometer.dispatcher.database:MeterDatabaseDispatcher