Transport context to all threads
The nova.utils.spawn and spawn_n methods transport the context (and profiling information) to the newly created threads. But the same isn't done when submitting work to thread-pools in the ComputeManager. The code doing that for spawn and spawn_n is extracted to a new function and called to submit the work to the thread-pools. Closes-Bug: #1962574 Change-Id: I9085deaa8cf0b167d87db68e4afc4a463c00569c
This commit is contained in:
parent
3b4378c189
commit
646fc51732
@ -8621,7 +8621,8 @@ class ComputeManager(manager.Manager):
|
|||||||
# in order to be able to track and abort it in the future.
|
# in order to be able to track and abort it in the future.
|
||||||
self._waiting_live_migrations[instance.uuid] = (None, None)
|
self._waiting_live_migrations[instance.uuid] = (None, None)
|
||||||
try:
|
try:
|
||||||
future = self._live_migration_executor.submit(
|
future = nova.utils.pass_context(
|
||||||
|
self._live_migration_executor.submit,
|
||||||
self._do_live_migration, context, dest, instance,
|
self._do_live_migration, context, dest, instance,
|
||||||
block_migration, migration, migrate_data)
|
block_migration, migration, migrate_data)
|
||||||
self._waiting_live_migrations[instance.uuid] = (migration, future)
|
self._waiting_live_migrations[instance.uuid] = (migration, future)
|
||||||
@ -9866,7 +9867,9 @@ class ComputeManager(manager.Manager):
|
|||||||
else:
|
else:
|
||||||
LOG.debug('Triggering sync for uuid %s', uuid)
|
LOG.debug('Triggering sync for uuid %s', uuid)
|
||||||
self._syncs_in_progress[uuid] = True
|
self._syncs_in_progress[uuid] = True
|
||||||
self._sync_power_pool.spawn_n(_sync, db_instance)
|
nova.utils.pass_context(self._sync_power_pool.spawn_n,
|
||||||
|
_sync,
|
||||||
|
db_instance)
|
||||||
|
|
||||||
def _query_driver_power_state_and_sync(self, context, db_instance):
|
def _query_driver_power_state_and_sync(self, context, db_instance):
|
||||||
if db_instance.task_state is not None:
|
if db_instance.task_state is not None:
|
||||||
|
@ -2049,8 +2049,8 @@ class ComputeTaskManager:
|
|||||||
skipped_host(target_ctxt, host, image_ids)
|
skipped_host(target_ctxt, host, image_ids)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
fetch_pool.spawn_n(wrap_cache_images, target_ctxt, host,
|
utils.pass_context(fetch_pool.spawn_n, wrap_cache_images,
|
||||||
image_ids)
|
target_ctxt, host, image_ids)
|
||||||
|
|
||||||
# Wait until all those things finish
|
# Wait until all those things finish
|
||||||
fetch_pool.waitall()
|
fetch_pool.waitall()
|
||||||
|
@ -9154,9 +9154,15 @@ class ComputeManagerMigrationTestCase(test.NoDBTestCase,
|
|||||||
self.assertEqual(driver_console.get_connection_info.return_value,
|
self.assertEqual(driver_console.get_connection_info.return_value,
|
||||||
console)
|
console)
|
||||||
|
|
||||||
|
@mock.patch('nova.utils.pass_context')
|
||||||
@mock.patch('nova.compute.manager.ComputeManager.'
|
@mock.patch('nova.compute.manager.ComputeManager.'
|
||||||
'_do_live_migration')
|
'_do_live_migration')
|
||||||
def _test_max_concurrent_live(self, mock_lm):
|
def _test_max_concurrent_live(self, mock_lm, mock_pass_context):
|
||||||
|
# pass_context wraps the function, which doesn't work with a mock
|
||||||
|
# So we simply mock it too
|
||||||
|
def _mock_pass_context(runner, func, *args, **kwargs):
|
||||||
|
return runner(func, *args, **kwargs)
|
||||||
|
mock_pass_context.side_effect = _mock_pass_context
|
||||||
|
|
||||||
@mock.patch('nova.objects.Migration.save')
|
@mock.patch('nova.objects.Migration.save')
|
||||||
def _do_it(mock_mig_save):
|
def _do_it(mock_mig_save):
|
||||||
|
@ -632,15 +632,13 @@ def _serialize_profile_info():
|
|||||||
return trace_info
|
return trace_info
|
||||||
|
|
||||||
|
|
||||||
def spawn(func, *args, **kwargs):
|
def pass_context(runner, func, *args, **kwargs):
|
||||||
"""Passthrough method for eventlet.spawn.
|
"""Generalised passthrough method
|
||||||
|
|
||||||
This utility exists so that it can be stubbed for testing without
|
It will grab the context from the threadlocal store and add it to
|
||||||
interfering with the service spawns.
|
the store on the runner. This allows for continuity in logging the
|
||||||
|
context when using this method to spawn a new thread through the
|
||||||
It will also grab the context from the threadlocal store and add it to
|
runner function
|
||||||
the store on the new thread. This allows for continuity in logging the
|
|
||||||
context when using this method to spawn a new thread.
|
|
||||||
"""
|
"""
|
||||||
_context = common_context.get_current()
|
_context = common_context.get_current()
|
||||||
profiler_info = _serialize_profile_info()
|
profiler_info = _serialize_profile_info()
|
||||||
@ -655,7 +653,21 @@ def spawn(func, *args, **kwargs):
|
|||||||
profiler.init(**profiler_info)
|
profiler.init(**profiler_info)
|
||||||
return func(*args, **kwargs)
|
return func(*args, **kwargs)
|
||||||
|
|
||||||
return eventlet.spawn(context_wrapper, *args, **kwargs)
|
return runner(context_wrapper, *args, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
|
def spawn(func, *args, **kwargs):
|
||||||
|
"""Passthrough method for eventlet.spawn.
|
||||||
|
|
||||||
|
This utility exists so that it can be stubbed for testing without
|
||||||
|
interfering with the service spawns.
|
||||||
|
|
||||||
|
It will also grab the context from the threadlocal store and add it to
|
||||||
|
the store on the new thread. This allows for continuity in logging the
|
||||||
|
context when using this method to spawn a new thread.
|
||||||
|
"""
|
||||||
|
|
||||||
|
return pass_context(eventlet.spawn, func, *args, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
def spawn_n(func, *args, **kwargs):
|
def spawn_n(func, *args, **kwargs):
|
||||||
@ -668,25 +680,12 @@ def spawn_n(func, *args, **kwargs):
|
|||||||
the store on the new thread. This allows for continuity in logging the
|
the store on the new thread. This allows for continuity in logging the
|
||||||
context when using this method to spawn a new thread.
|
context when using this method to spawn a new thread.
|
||||||
"""
|
"""
|
||||||
_context = common_context.get_current()
|
pass_context(eventlet.spawn_n, func, *args, **kwargs)
|
||||||
profiler_info = _serialize_profile_info()
|
|
||||||
|
|
||||||
@functools.wraps(func)
|
|
||||||
def context_wrapper(*args, **kwargs):
|
|
||||||
# NOTE: If update_store is not called after spawn_n it won't be
|
|
||||||
# available for the logger to pull from threadlocal storage.
|
|
||||||
if _context is not None:
|
|
||||||
_context.update_store()
|
|
||||||
if profiler_info and profiler:
|
|
||||||
profiler.init(**profiler_info)
|
|
||||||
func(*args, **kwargs)
|
|
||||||
|
|
||||||
eventlet.spawn_n(context_wrapper, *args, **kwargs)
|
|
||||||
|
|
||||||
|
|
||||||
def tpool_execute(func, *args, **kwargs):
|
def tpool_execute(func, *args, **kwargs):
|
||||||
"""Run func in a native thread"""
|
"""Run func in a native thread"""
|
||||||
tpool.execute(func, *args, **kwargs)
|
return pass_context(tpool.execute, func, *args, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
def is_none_string(val):
|
def is_none_string(val):
|
||||||
|
Loading…
x
Reference in New Issue
Block a user