Merge "Propagate profiler info into BatchNotifier threads"
This commit is contained in:
commit
9d4161f955
|
@ -84,7 +84,7 @@ oslo.upgradecheck==0.1.0
|
|||
oslo.utils==3.33.0
|
||||
oslo.versionedobjects==1.35.1
|
||||
oslotest==3.2.0
|
||||
osprofiler==1.4.0
|
||||
osprofiler==2.3.0
|
||||
ovs==2.8.0
|
||||
ovsdbapp==0.9.1
|
||||
Paste==2.0.2
|
||||
|
|
|
@ -43,6 +43,7 @@ from oslo_config import cfg
|
|||
from oslo_db import exception as db_exc
|
||||
from oslo_log import log as logging
|
||||
from oslo_utils import excutils
|
||||
from osprofiler import profiler
|
||||
import pkg_resources
|
||||
|
||||
import neutron
|
||||
|
@ -917,3 +918,47 @@ class Timer(object):
|
|||
@property
|
||||
def delta_time_sec(self):
|
||||
return (datetime.datetime.now() - self.start).total_seconds()
|
||||
|
||||
|
||||
def _collect_profiler_info():
|
||||
p = profiler.get()
|
||||
if p:
|
||||
return {
|
||||
"hmac_key": p.hmac_key,
|
||||
"base_id": p.get_base_id(),
|
||||
"parent_id": p.get_id(),
|
||||
}
|
||||
|
||||
|
||||
def spawn(func, *args, **kwargs):
|
||||
"""As eventlet.spawn() but with osprofiler initialized in the new threads
|
||||
|
||||
osprofiler stores the profiler instance in thread local storage, therefore
|
||||
in new threads (including eventlet threads) osprofiler comes uninitialized
|
||||
by default. This spawn() is a stand-in replacement for eventlet.spawn()
|
||||
but we re-initialize osprofiler in threads spawn()-ed.
|
||||
"""
|
||||
|
||||
profiler_info = _collect_profiler_info()
|
||||
|
||||
@functools.wraps(func)
|
||||
def wrapper(*args, **kwargs):
|
||||
if profiler_info:
|
||||
profiler.init(**profiler_info)
|
||||
return func(*args, **kwargs)
|
||||
|
||||
return eventlet.spawn(wrapper, *args, **kwargs)
|
||||
|
||||
|
||||
def spawn_n(func, *args, **kwargs):
|
||||
"""See spawn() above"""
|
||||
|
||||
profiler_info = _collect_profiler_info()
|
||||
|
||||
@functools.wraps(func)
|
||||
def wrapper(*args, **kwargs):
|
||||
if profiler_info:
|
||||
profiler.init(**profiler_info)
|
||||
return func(*args, **kwargs)
|
||||
|
||||
return eventlet.spawn_n(wrapper, *args, **kwargs)
|
||||
|
|
|
@ -14,6 +14,8 @@ import eventlet
|
|||
from neutron_lib.utils import runtime
|
||||
from oslo_utils import uuidutils
|
||||
|
||||
from neutron.common import utils
|
||||
|
||||
|
||||
class BatchNotifier(object):
|
||||
def __init__(self, batch_interval, callback):
|
||||
|
@ -53,7 +55,7 @@ class BatchNotifier(object):
|
|||
# events to batch up
|
||||
eventlet.sleep(self.batch_interval)
|
||||
|
||||
eventlet.spawn_n(synced_send)
|
||||
utils.spawn_n(synced_send)
|
||||
|
||||
def _notify(self):
|
||||
if not self.pending_events:
|
||||
|
|
|
@ -40,6 +40,7 @@ from oslo_utils import excutils
|
|||
from oslo_utils import fileutils
|
||||
from oslo_utils import strutils
|
||||
from oslotest import base
|
||||
from osprofiler import profiler
|
||||
import six
|
||||
from sqlalchemy import exc as sqlalchemy_exc
|
||||
import testtools
|
||||
|
@ -380,6 +381,7 @@ class BaseTestCase(DietTestCase):
|
|||
self.addCleanup(db_api.sqla_remove_all)
|
||||
self.addCleanup(rpc_consumer_reg.clear)
|
||||
self.addCleanup(rpc_producer_reg.clear)
|
||||
self.addCleanup(profiler.clean)
|
||||
|
||||
def get_new_temp_dir(self):
|
||||
"""Create a new temporary directory.
|
||||
|
|
|
@ -20,10 +20,12 @@ import time
|
|||
|
||||
import ddt
|
||||
import eventlet
|
||||
from eventlet import queue
|
||||
import mock
|
||||
import netaddr
|
||||
from neutron_lib import constants
|
||||
from oslo_log import log as logging
|
||||
from osprofiler import profiler
|
||||
import six
|
||||
import testscenarios
|
||||
import testtools
|
||||
|
@ -570,3 +572,55 @@ class TimerTestCase(base.BaseTestCase):
|
|||
def test_delta_time_sec(self):
|
||||
with utils.Timer() as timer:
|
||||
self.assertIsInstance(timer.delta_time_sec, float)
|
||||
|
||||
|
||||
class SpawnWithOrWithoutProfilerTestCase(
|
||||
testscenarios.WithScenarios, base.BaseTestCase):
|
||||
|
||||
scenarios = [
|
||||
('spawn', {'spawn_variant': utils.spawn}),
|
||||
('spawn_n', {'spawn_variant': utils.spawn_n}),
|
||||
]
|
||||
|
||||
def _compare_profilers_in_parent_and_in_child(self, init_profiler):
|
||||
|
||||
q = queue.Queue()
|
||||
|
||||
def is_profiler_initialized(where):
|
||||
# Instead of returning a single boolean add information so we can
|
||||
# identify which thread produced the result without depending on
|
||||
# queue order.
|
||||
return {where: bool(profiler.get())}
|
||||
|
||||
def thread_with_no_leaked_profiler():
|
||||
if init_profiler:
|
||||
profiler.init(hmac_key='fake secret')
|
||||
|
||||
self.spawn_variant(
|
||||
lambda: q.put(is_profiler_initialized('in-child')))
|
||||
q.put(is_profiler_initialized('in-parent'))
|
||||
|
||||
# Make sure in parent we start with an uninitialized profiler by
|
||||
# eventlet.spawn()-ing a new thread. Otherwise the unit test runner
|
||||
# thread may leak an initialized profiler from one test to another.
|
||||
eventlet.spawn(thread_with_no_leaked_profiler)
|
||||
|
||||
# In order to have some global protection against leaking initialized
|
||||
# profilers neutron.test.base.BaseTestCase.setup() also calls
|
||||
# addCleanup(profiler.clean)
|
||||
|
||||
# Merge the results independently of queue order.
|
||||
results = {}
|
||||
results.update(q.get())
|
||||
results.update(q.get())
|
||||
|
||||
self.assertEqual(
|
||||
{'in-parent': init_profiler,
|
||||
'in-child': init_profiler},
|
||||
results)
|
||||
|
||||
def test_spawn_with_profiler(self):
|
||||
self._compare_profilers_in_parent_and_in_child(init_profiler=True)
|
||||
|
||||
def test_spawn_without_profiler(self):
|
||||
self._compare_profilers_in_parent_and_in_child(init_profiler=False)
|
||||
|
|
|
@ -43,7 +43,7 @@ oslo.service!=1.28.1,>=1.24.0 # Apache-2.0
|
|||
oslo.upgradecheck>=0.1.0 # Apache-2.0
|
||||
oslo.utils>=3.33.0 # Apache-2.0
|
||||
oslo.versionedobjects>=1.35.1 # Apache-2.0
|
||||
osprofiler>=1.4.0 # Apache-2.0
|
||||
osprofiler>=2.3.0 # Apache-2.0
|
||||
os-ken >= 0.3.0 # Apache-2.0
|
||||
ovs>=2.8.0 # Apache-2.0
|
||||
ovsdbapp>=0.9.1 # Apache-2.0
|
||||
|
|
Loading…
Reference in New Issue