Merge "collector: Don't use eventlet thread"
This commit is contained in:
commit
cc18736c7b
|
@ -15,6 +15,7 @@
|
|||
|
||||
from itertools import chain
|
||||
import socket
|
||||
import threading
|
||||
|
||||
import msgpack
|
||||
from oslo_config import cfg
|
||||
|
@ -68,10 +69,13 @@ class CollectorService(os_service.Service):
|
|||
(self.meter_manager, self.event_manager) = dispatcher_managers
|
||||
self.sample_listener = None
|
||||
self.event_listener = None
|
||||
self.udp_thread = None
|
||||
super(CollectorService, self).start()
|
||||
|
||||
if cfg.CONF.collector.udp_address:
|
||||
self.tg.add_thread(self.start_udp)
|
||||
self.udp_thread = threading.Thread(target=self.start_udp)
|
||||
self.udp_thread.daemon = True
|
||||
self.udp_thread.start()
|
||||
|
||||
transport = messaging.get_transport(optional=True)
|
||||
if transport:
|
||||
|
@ -100,6 +104,8 @@ class CollectorService(os_service.Service):
|
|||
self.event_listener.start()
|
||||
|
||||
if not cfg.CONF.collector.udp_address:
|
||||
# NOTE(sileht): We have to drop oslo.service to remove this
|
||||
# last eventlet thread
|
||||
# Add a dummy thread to have wait() working
|
||||
self.tg.add_timer(604800, lambda: None)
|
||||
|
||||
|
@ -130,11 +136,13 @@ class CollectorService(os_service.Service):
|
|||
LOG.exception(_("UDP: Unable to store meter"))
|
||||
|
||||
def stop(self):
|
||||
self.udp_run = False
|
||||
if self.sample_listener:
|
||||
utils.kill_listeners([self.sample_listener])
|
||||
if self.event_listener:
|
||||
utils.kill_listeners([self.event_listener])
|
||||
if self.udp_thread:
|
||||
self.udp_run = False
|
||||
self.udp_thread.join()
|
||||
super(CollectorService, self).stop()
|
||||
|
||||
def record_metering_data(self, context, data):
|
||||
|
|
|
@ -38,6 +38,17 @@ class FakeConnection(object):
|
|||
pass
|
||||
|
||||
|
||||
class FakeThread(object):
|
||||
def __init__(self, target):
|
||||
self.target = target
|
||||
|
||||
def start(self):
|
||||
self.target()
|
||||
|
||||
def join(self):
|
||||
pass
|
||||
|
||||
|
||||
class TestCollector(tests_base.BaseTestCase):
|
||||
def setUp(self):
|
||||
super(TestCollector, self).setUp()
|
||||
|
@ -77,13 +88,8 @@ class TestCollector(tests_base.BaseTestCase):
|
|||
|
||||
self.srv = collector.CollectorService()
|
||||
|
||||
self.useFixture(mockpatch.PatchObject(
|
||||
self.srv.tg, 'add_thread',
|
||||
side_effect=self._dummy_thread_group_add_thread))
|
||||
|
||||
@staticmethod
|
||||
def _dummy_thread_group_add_thread(method):
|
||||
method()
|
||||
self.useFixture(mockpatch.Patch(
|
||||
"threading.Thread", side_effect=FakeThread))
|
||||
|
||||
def _setup_messaging(self, enabled=True):
|
||||
if enabled:
|
||||
|
|
Loading…
Reference in New Issue