From 867ad8d6ab0b8b5392c458732265ccb86b039d88 Mon Sep 17 00:00:00 2001 From: Julien Danjou Date: Mon, 2 Nov 2015 18:51:13 +0100 Subject: [PATCH] Remove eventlet usage This removes entirely our usage of eventlet and its ugly monkey-patching in favor of a threaded approach. Implements: remove-eventlet Change-Id: Ib5f623e2d1ff9e9254601ad091bf5b53ab32000d --- ceilometer/__init__.py | 9 -- .../cmd/{eventlet => }/agent_notification.py | 0 ceilometer/cmd/{eventlet => }/collector.py | 0 ceilometer/cmd/eventlet/__init__.py | 22 ----- ceilometer/cmd/{eventlet => }/polling.py | 0 ceilometer/cmd/{eventlet => }/sample.py | 0 ceilometer/cmd/{eventlet => }/storage.py | 0 ceilometer/compute/virt/xenapi/inspector.py | 9 +- ceilometer/messaging.py | 4 +- ceilometer/opts.py | 4 +- ceilometer/publisher/messaging.py | 1 - ceilometer/tests/base.py | 5 - .../tests/functional/test_notification.py | 93 ++----------------- ceilometer/tests/unit/agent/test_manager.py | 29 +----- .../publisher/test_messaging_publisher.py | 28 ------ requirements.txt | 1 - setup.cfg | 12 +-- tox.ini | 4 - 18 files changed, 25 insertions(+), 196 deletions(-) rename ceilometer/cmd/{eventlet => }/agent_notification.py (100%) rename ceilometer/cmd/{eventlet => }/collector.py (100%) delete mode 100644 ceilometer/cmd/eventlet/__init__.py rename ceilometer/cmd/{eventlet => }/polling.py (100%) rename ceilometer/cmd/{eventlet => }/sample.py (100%) rename ceilometer/cmd/{eventlet => }/storage.py (100%) diff --git a/ceilometer/__init__.py b/ceilometer/__init__.py index 8753fef0..676c802f 100644 --- a/ceilometer/__init__.py +++ b/ceilometer/__init__.py @@ -1,7 +1,5 @@ # Copyright 2014 eNovance # -# Authors: Julien Danjou -# # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at @@ -14,13 +12,6 @@ # License for the specific language governing permissions and limitations # under the License. -# This must be set before the initial import of eventlet because if -# dnspython is present in your environment then eventlet monkeypatches -# socket.getaddrinfo() with an implementation which doesn't work for IPv6. -import os - -os.environ['EVENTLET_NO_GREENDNS'] = 'yes' - class NotImplementedError(NotImplementedError): # FIXME(jd) This is used by WSME to return a correct HTTP code. We should diff --git a/ceilometer/cmd/eventlet/agent_notification.py b/ceilometer/cmd/agent_notification.py similarity index 100% rename from ceilometer/cmd/eventlet/agent_notification.py rename to ceilometer/cmd/agent_notification.py diff --git a/ceilometer/cmd/eventlet/collector.py b/ceilometer/cmd/collector.py similarity index 100% rename from ceilometer/cmd/eventlet/collector.py rename to ceilometer/cmd/collector.py diff --git a/ceilometer/cmd/eventlet/__init__.py b/ceilometer/cmd/eventlet/__init__.py deleted file mode 100644 index 99efcc46..00000000 --- a/ceilometer/cmd/eventlet/__init__.py +++ /dev/null @@ -1,22 +0,0 @@ -# -*- encoding: utf-8 -*- -# -# Copyright 2014 OpenStack Foundation -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import eventlet -# NOTE(jd) We need to monkey patch the socket and select module for, -# at least, oslo.messaging, otherwise everything's blocked on its -# first read() or select(), thread need to be patched too, because -# oslo.messaging use threading.local -eventlet.monkey_patch(socket=True, select=True, thread=True, time=True) diff --git a/ceilometer/cmd/eventlet/polling.py b/ceilometer/cmd/polling.py similarity index 100% rename from ceilometer/cmd/eventlet/polling.py rename to ceilometer/cmd/polling.py diff --git a/ceilometer/cmd/eventlet/sample.py b/ceilometer/cmd/sample.py similarity index 100% rename from ceilometer/cmd/eventlet/sample.py rename to ceilometer/cmd/sample.py diff --git a/ceilometer/cmd/eventlet/storage.py b/ceilometer/cmd/storage.py similarity index 100% rename from ceilometer/cmd/eventlet/storage.py rename to ceilometer/cmd/storage.py diff --git a/ceilometer/compute/virt/xenapi/inspector.py b/ceilometer/compute/virt/xenapi/inspector.py index 28e5afef..02d69ed0 100644 --- a/ceilometer/compute/virt/xenapi/inspector.py +++ b/ceilometer/compute/virt/xenapi/inspector.py @@ -13,7 +13,6 @@ # under the License. """Implementation of Inspector abstraction for XenAPI.""" -from eventlet import timeout from oslo_config import cfg from oslo_utils import units try: @@ -38,9 +37,6 @@ OPTS = [ cfg.StrOpt('connection_password', help='Password for connection to XenServer/Xen Cloud Platform.', secret=True), - cfg.IntOpt('login_timeout', - default=10, - help='Timeout in seconds for XenAPI login.'), ] CONF = cfg.CONF @@ -63,13 +59,10 @@ def get_api_session(): raise XenapiException(_('Must specify connection_url, and ' 'connection_password to use')) - exception = api.Failure(_("Unable to log in to XenAPI " - "(is the Dom0 disk full?)")) try: session = (api.xapi_local() if url == 'unix://local' else api.Session(url)) - with timeout.Timeout(CONF.xenapi.login_timeout, exception): - session.login_with_password(username, password) + session.login_with_password(username, password) except api.Failure as e: msg = _("Could not connect to XenAPI: %s") % e.details[0] raise XenapiException(msg) diff --git a/ceilometer/messaging.py b/ceilometer/messaging.py index 6f5dce5d..5e7ea318 100644 --- a/ceilometer/messaging.py +++ b/ceilometer/messaging.py @@ -61,7 +61,7 @@ def get_rpc_server(transport, topic, endpoint): serializer = oslo_serializer.RequestContextSerializer( oslo_serializer.JsonPayloadSerializer()) return oslo_messaging.get_rpc_server(transport, target, - [endpoint], executor='eventlet', + [endpoint], executor='threading', serializer=serializer) @@ -79,7 +79,7 @@ def get_notification_listener(transport, targets, endpoints, allow_requeue=False): """Return a configured oslo_messaging notification listener.""" return oslo_messaging.get_notification_listener( - transport, targets, endpoints, executor='eventlet', + transport, targets, endpoints, executor='threading', allow_requeue=allow_requeue) diff --git a/ceilometer/opts.py b/ceilometer/opts.py index 971ea83e..7eb94da3 100644 --- a/ceilometer/opts.py +++ b/ceilometer/opts.py @@ -16,7 +16,7 @@ import itertools import ceilometer.agent.manager import ceilometer.api import ceilometer.api.app -import ceilometer.cmd.eventlet.polling +import ceilometer.cmd.polling import ceilometer.collector import ceilometer.compute.discovery import ceilometer.compute.notifications @@ -57,7 +57,7 @@ def list_opts(): ('DEFAULT', itertools.chain(ceilometer.agent.manager.OPTS, ceilometer.api.app.OPTS, - ceilometer.cmd.eventlet.polling.CLI_OPTS, + ceilometer.cmd.polling.CLI_OPTS, ceilometer.compute.notifications.OPTS, ceilometer.compute.util.OPTS, ceilometer.compute.virt.inspector.OPTS, diff --git a/ceilometer/publisher/messaging.py b/ceilometer/publisher/messaging.py index a44b6ad1..5ca95d98 100644 --- a/ceilometer/publisher/messaging.py +++ b/ceilometer/publisher/messaging.py @@ -139,7 +139,6 @@ class MessagingPublisher(publisher.PublisherBase): def flush(self): # NOTE(sileht): - # IO of the rpc stuff in handled by eventlet, # this is why the self.local_queue, is emptied before processing the # queue and the remaining messages in the queue are added to # self.local_queue after in case of a other call have already added diff --git a/ceilometer/tests/base.py b/ceilometer/tests/base.py index 72f45500..4def20be 100644 --- a/ceilometer/tests/base.py +++ b/ceilometer/tests/base.py @@ -18,7 +18,6 @@ import functools import os.path -import eventlet import oslo_messaging.conffixture from oslo_utils import timeutils from oslotest import base @@ -39,10 +38,6 @@ class BaseTestCase(base.BaseTestCase): exchange = 'ceilometer' conf.set_override("control_exchange", exchange) - # oslo.messaging fake driver needs time and thread - # to be patched, otherwise there are chances of deadlocks - eventlet.monkey_patch(time=True, thread=True) - # NOTE(sileht): Ensure a new oslo.messaging driver is loaded # between each tests self.transport = messaging.get_transport("fake://", cache=False) diff --git a/ceilometer/tests/functional/test_notification.py b/ceilometer/tests/functional/test_notification.py index c0ebe6b5..52e0244a 100644 --- a/ceilometer/tests/functional/test_notification.py +++ b/ceilometer/tests/functional/test_notification.py @@ -16,7 +16,6 @@ import shutil -import eventlet import mock from oslo_config import fixture as fixture_config from oslo_context import context @@ -255,7 +254,6 @@ class BaseRealNotification(tests_base.BaseTestCase): if (len(self.publisher.samples) >= self.expected_samples and len(self.publisher.events) >= self.expected_events): break - eventlet.sleep(0) self.assertNotEqual(self.srv.listeners, self.srv.pipeline_listeners) self.srv.stop() @@ -284,114 +282,45 @@ class TestRealNotificationReloadablePipeline(BaseRealNotification): self.assertIn(pipeline_poller_call, self.srv.tg.add_timer.call_args_list) - @mock.patch('ceilometer.publisher.test.TestPublisher') - def test_notification_reloaded_pipeline(self, fake_publisher_cls): - fake_publisher_cls.return_value = self.publisher - + def test_notification_reloaded_pipeline(self): pipeline_cfg_file = self.setup_pipeline(['instance']) self.CONF.set_override("pipeline_cfg_file", pipeline_cfg_file) - self.expected_samples = 1 self.srv.start() - notifier = messaging.get_notifier(self.transport, - "compute.vagrant-precise") - notifier.info(context.RequestContext(), 'compute.instance.create.end', - TEST_NOTICE_PAYLOAD) + pipeline = self.srv.pipe_manager - start = timeutils.utcnow() - while timeutils.delta_seconds(start, timeutils.utcnow()) < 600: - if (len(self.publisher.samples) >= self.expected_samples and - len(self.publisher.events) >= self.expected_events): - break - eventlet.sleep(0) - - self.assertEqual(self.expected_samples, len(self.publisher.samples)) - - # Flush publisher samples to test reloading - self.publisher.samples = [] # Modify the collection targets updated_pipeline_cfg_file = self.setup_pipeline(['vcpus', 'disk.root.size']) # Move/re-name the updated pipeline file to the original pipeline # file path as recorded in oslo config shutil.move(updated_pipeline_cfg_file, pipeline_cfg_file) + self.srv.refresh_pipeline() - self.expected_samples = 2 - # Random sleep to let the pipeline poller complete the reloading - eventlet.sleep(3) - # Send message again to verify the reload works - notifier = messaging.get_notifier(self.transport, - "compute.vagrant-precise") - notifier.info(context.RequestContext(), 'compute.instance.create.end', - TEST_NOTICE_PAYLOAD) - - start = timeutils.utcnow() - while timeutils.delta_seconds(start, timeutils.utcnow()) < 600: - if (len(self.publisher.samples) >= self.expected_samples and - len(self.publisher.events) >= self.expected_events): - break - eventlet.sleep(0) - - self.assertEqual(self.expected_samples, len(self.publisher.samples)) - - (self.assertIn(sample.name, ['disk.root.size', 'vcpus']) - for sample in self.publisher.samples) - - @mock.patch('ceilometer.publisher.test.TestPublisher') - def test_notification_reloaded_event_pipeline(self, fake_publisher_cls): - fake_publisher_cls.return_value = self.publisher + self.assertNotEqual(pipeline, self.srv.pipe_manager) + def test_notification_reloaded_event_pipeline(self): ev_pipeline_cfg_file = self.setup_event_pipeline( ['compute.instance.create.start']) self.CONF.set_override("event_pipeline_cfg_file", ev_pipeline_cfg_file) self.CONF.set_override("store_events", True, group="notification") - self.expected_events = 1 + self.srv.start() - notifier = messaging.get_notifier(self.transport, - "compute.vagrant-precise") - notifier.info(context.RequestContext(), - 'compute.instance.create.start', - TEST_NOTICE_PAYLOAD) + pipeline = self.srv.event_pipe_manager - start = timeutils.utcnow() - while timeutils.delta_seconds(start, timeutils.utcnow()) < 600: - if len(self.publisher.events) >= self.expected_events: - break - eventlet.sleep(0) - - self.assertEqual(self.expected_events, len(self.publisher.events)) - - # Flush publisher events to test reloading - self.publisher.events = [] # Modify the collection targets updated_ev_pipeline_cfg_file = self.setup_event_pipeline( ['compute.instance.*']) + # Move/re-name the updated pipeline file to the original pipeline # file path as recorded in oslo config shutil.move(updated_ev_pipeline_cfg_file, ev_pipeline_cfg_file) + self.srv.refresh_pipeline() - self.expected_events = 1 - # Random sleep to let the pipeline poller complete the reloading - eventlet.sleep(3) - # Send message again to verify the reload works - notifier = messaging.get_notifier(self.transport, - "compute.vagrant-precise") - notifier.info(context.RequestContext(), 'compute.instance.create.end', - TEST_NOTICE_PAYLOAD) - - start = timeutils.utcnow() - while timeutils.delta_seconds(start, timeutils.utcnow()) < 600: - if len(self.publisher.events) >= self.expected_events: - break - eventlet.sleep(0) - - self.assertEqual(self.expected_events, len(self.publisher.events)) - - self.assertEqual(self.publisher.events[0].event_type, - 'compute.instance.create.end') + self.assertNotEqual(pipeline, self.srv.pipe_manager) class TestRealNotification(BaseRealNotification): @@ -417,7 +346,6 @@ class TestRealNotification(BaseRealNotification): while timeutils.delta_seconds(start, timeutils.utcnow()) < 600: if len(self.publisher.events) >= self.expected_events: break - eventlet.sleep(0) self.srv.stop() self.assertEqual(self.expected_events, len(self.publisher.events)) @@ -582,7 +510,6 @@ class TestRealNotificationMultipleAgents(tests_base.BaseTestCase): if (len(self.publisher.samples + self.publisher2.samples) >= self.expected_samples): break - eventlet.sleep(0) self.srv.stop() self.srv2.stop() diff --git a/ceilometer/tests/unit/agent/test_manager.py b/ceilometer/tests/unit/agent/test_manager.py index eba5baa0..505def76 100644 --- a/ceilometer/tests/unit/agent/test_manager.py +++ b/ceilometer/tests/unit/agent/test_manager.py @@ -12,18 +12,15 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. -"""Tests for ceilometer/central/manager.py -""" +"""Tests for ceilometer agent manager""" import shutil -import eventlet from keystoneclient import exceptions as ks_exceptions import mock from novaclient import client as novaclient from oslo_service import service as os_service from oslo_utils import fileutils -from oslo_utils import timeutils from oslotest import base from oslotest import mockpatch import requests @@ -408,11 +405,9 @@ class TestRunTasks(agentbase.BaseAgentManagerTestCase): self.mgr.tg = os_service.threadgroup.ThreadGroup(1000) self.mgr.start() - start = timeutils.utcnow() - while timeutils.delta_seconds(start, timeutils.utcnow()) < 600: - if len(self.notified_samples) >= expected_samples: - break - eventlet.sleep(0) + # Manually executes callbacks + for timer in self.mgr.pollster_timers: + timer.f(*timer.args, **timer.kw) samples = self.notified_samples self.assertEqual(expected_samples, len(samples)) @@ -442,12 +437,6 @@ class TestRunTasks(agentbase.BaseAgentManagerTestCase): self.CONF.set_override("pipeline_cfg_file", pipeline_cfg_file) self.mgr.tg = os_service.threadgroup.ThreadGroup(1000) self.mgr.start() - expected_samples = 1 - start = timeutils.utcnow() - while timeutils.delta_seconds(start, timeutils.utcnow()) < 600: - if len(self.notified_samples) >= expected_samples: - break - eventlet.sleep(0) # we only got the old name of meters for sample in self.notified_samples: @@ -475,20 +464,10 @@ class TestRunTasks(agentbase.BaseAgentManagerTestCase): # file path as recorded in oslo config shutil.move(updated_pipeline_cfg_file, pipeline_cfg_file) - # Random sleep to let the pipeline poller complete the reloading - eventlet.sleep(3) - # Flush notified samples to test only new, nothing latent on # fake message bus. self.notified_samples = [] - expected_samples = 1 - start = timeutils.utcnow() - while timeutils.delta_seconds(start, timeutils.utcnow()) < 600: - if len(self.notified_samples) >= expected_samples: - break - eventlet.sleep(0) - # we only got the new name of meters for sample in self.notified_samples: self.assertEqual('testanother', sample['counter_name']) diff --git a/ceilometer/tests/unit/publisher/test_messaging_publisher.py b/ceilometer/tests/unit/publisher/test_messaging_publisher.py index 3bd26487..10b15e2d 100644 --- a/ceilometer/tests/unit/publisher/test_messaging_publisher.py +++ b/ceilometer/tests/unit/publisher/test_messaging_publisher.py @@ -17,7 +17,6 @@ import datetime import uuid -import eventlet import mock from oslo_config import fixture as fixture_config from oslo_context import context @@ -116,7 +115,6 @@ class RpcOnlyPublisherTest(BasePublisherTestCase): collector.stop()) collector.start() - eventlet.sleep() publisher.publish_samples(context.RequestContext(), self.test_sample_data) collector.wait() @@ -219,32 +217,6 @@ class TestPublisher(testscenarios.testcase.WithScenarios, class TestPublisherPolicy(TestPublisher): - def test_published_concurrency(self): - """Test concurrent access to the local queue of the rpc publisher.""" - - publisher = self.publisher_cls( - netutils.urlsplit('%s://' % self.protocol)) - - with mock.patch.object(publisher, '_send') as fake_send: - def fake_send_wait(ctxt, topic, meters): - fake_send.side_effect = mock.Mock() - # Sleep to simulate concurrency and allow other threads to work - eventlet.sleep(0) - - fake_send.side_effect = fake_send_wait - - job1 = eventlet.spawn(getattr(publisher, self.pub_func), - mock.MagicMock(), self.test_data) - job2 = eventlet.spawn(getattr(publisher, self.pub_func), - mock.MagicMock(), self.test_data) - - job1.wait() - job2.wait() - - self.assertEqual('default', publisher.policy) - self.assertEqual(2, len(fake_send.mock_calls)) - self.assertEqual(0, len(publisher.local_queue)) - @mock.patch('ceilometer.publisher.messaging.LOG') def test_published_with_no_policy(self, mylog): publisher = self.publisher_cls( diff --git a/requirements.txt b/requirements.txt index d0c83bd1..374779f1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,7 +3,6 @@ # process, which may cause wedges in the gate later. retrying!=1.3.0,>=1.2.3 # Apache-2.0 -eventlet>=0.17.4 jsonpath-rw-ext>=0.1.9 jsonschema!=2.5.0,<3.0.0,>=2.0.0 kafka-python>=0.9.2 # Apache-2.0 diff --git a/setup.cfg b/setup.cfg index 95553013..04682287 100644 --- a/setup.cfg +++ b/setup.cfg @@ -240,13 +240,13 @@ ceilometer.event.trait_plugin = console_scripts = ceilometer-api = ceilometer.cmd.api:main - ceilometer-polling = ceilometer.cmd.eventlet.polling:main - ceilometer-agent-notification = ceilometer.cmd.eventlet.agent_notification:main - ceilometer-send-sample = ceilometer.cmd.eventlet.sample:send_sample - ceilometer-dbsync = ceilometer.cmd.eventlet.storage:dbsync - ceilometer-expirer = ceilometer.cmd.eventlet.storage:expirer + ceilometer-polling = ceilometer.cmd.polling:main + ceilometer-agent-notification = ceilometer.cmd.agent_notification:main + ceilometer-send-sample = ceilometer.cmd.sample:send_sample + ceilometer-dbsync = ceilometer.cmd.storage:dbsync + ceilometer-expirer = ceilometer.cmd.storage:expirer ceilometer-rootwrap = oslo_rootwrap.cmd:main - ceilometer-collector = ceilometer.cmd.eventlet.collector:main + ceilometer-collector = ceilometer.cmd.collector:main ceilometer.dispatcher.meter = database = ceilometer.dispatcher.database:DatabaseDispatcher diff --git a/tox.ini b/tox.ini index 608940b9..e5f19523 100644 --- a/tox.ini +++ b/tox.ini @@ -9,7 +9,6 @@ deps = -r{toxinidir}/requirements.txt install_command = pip install -U {opts} {packages} usedevelop = True setenv = VIRTUAL_ENV={envdir} - EVENTLET_NO_GREENDNS=yes OS_TEST_PATH=ceilometer/tests/unit passenv = OS_TEST_TIMEOUT OS_STDOUT_CAPTURE OS_STDERR_CAPTURE OS_LOG_CAPTURE commands = @@ -44,7 +43,6 @@ commands = [testenv:functional] setenv = VIRTUAL_ENV={envdir} - EVENTLET_NO_GREENDNS=yes OS_TEST_PATH=ceilometer/tests/functional/ passenv = CEILOMETER_* commands = @@ -52,7 +50,6 @@ commands = [testenv:py34-functional] setenv = VIRTUAL_ENV={envdir} - EVENTLET_NO_GREENDNS=yes OS_TEST_PATH=ceilometer/tests/functional/ basepython = python3.4 passenv = CEILOMETER_* @@ -61,7 +58,6 @@ commands = [testenv:integration] setenv = VIRTUAL_ENV={envdir} - EVENTLET_NO_GREENDNS=yes OS_TEST_PATH=./ceilometer/tests/integration OS_TEST_TIMEOUT=2400 GABBI_LIVE_FAIL_IF_NO_TEST=1