Replace ThreadPoolExecutor with GreenThreadPoolExecutor

Based on eventlet issue [1] ThreadPoolExecutor doesn't play nice with
eventlet in python 3.7. We saw deadlocks in the functional-py37
execution in live migration tests due to live migration using
ThreadPoolExecutor.

The [1] suggests to replaces ThreadPoolExecutor with
futurist.GreenThreadPoolExecutor to avoid deadlocks. So this patch does
the replacement and adjusts the unit tests accordingly.

As the ThreadPoolExecutor was the last used class from the futures
module we remove that from the requirements and add the futurist module
instead.

[1] https://github.com/eventlet/eventlet/issues/508

Change-Id: Ia56ab43be739e677760bbad5c40caad924425fa5
This commit is contained in:
Balazs Gibizer 2018-12-12 16:13:55 +01:00 committed by Matt Riedemann
parent ae3064b7a8
commit 1f2a80c195
6 changed files with 15 additions and 20 deletions

@ -32,8 +32,7 @@ fasteners==0.14.1
fixtures==3.0.0 fixtures==3.0.0
flake8==2.5.5 flake8==2.5.5
future==0.16.0 future==0.16.0
futurist==1.6.0 futurist==1.8.0
futures==3.0.0
gabbi==1.35.0 gabbi==1.35.0
gitdb2==2.0.3 gitdb2==2.0.3
GitPython==2.1.8 GitPython==2.1.8

@ -27,9 +27,6 @@ terminating it.
import base64 import base64
import binascii import binascii
# If py2, concurrent.futures comes from the futures library otherwise it
# comes from the py3 standard library.
from concurrent import futures
import contextlib import contextlib
import functools import functools
import inspect import inspect
@ -43,6 +40,7 @@ import eventlet.event
from eventlet import greenthread from eventlet import greenthread
import eventlet.semaphore import eventlet.semaphore
import eventlet.timeout import eventlet.timeout
import futurist
from keystoneauth1 import exceptions as keystone_exception from keystoneauth1 import exceptions as keystone_exception
from oslo_log import log as logging from oslo_log import log as logging
import oslo_messaging as messaging import oslo_messaging as messaging
@ -523,13 +521,10 @@ class ComputeManager(manager.Manager):
else: else:
self._build_semaphore = compute_utils.UnlimitedSemaphore() self._build_semaphore = compute_utils.UnlimitedSemaphore()
if max(CONF.max_concurrent_live_migrations, 0) != 0: if max(CONF.max_concurrent_live_migrations, 0) != 0:
self._live_migration_executor = futures.ThreadPoolExecutor( self._live_migration_executor = futurist.GreenThreadPoolExecutor(
max_workers=CONF.max_concurrent_live_migrations) max_workers=CONF.max_concurrent_live_migrations)
else: else:
# Starting in python 3.5, this is technically bounded, but it's self._live_migration_executor = futurist.GreenThreadPoolExecutor()
# ncpu * 5 which is probably much higher than anyone would sanely
# use for concurrently running live migrations.
self._live_migration_executor = futures.ThreadPoolExecutor()
# This is a dict, keyed by instance uuid, to a two-item tuple of # This is a dict, keyed by instance uuid, to a two-item tuple of
# migration object and Future for the queued live migration. # migration object and Future for the queued live migration.
self._waiting_live_migrations = {} self._waiting_live_migrations = {}
@ -6353,8 +6348,9 @@ class ComputeManager(manager.Manager):
block_migration, migration, migrate_data) block_migration, migration, migrate_data)
self._waiting_live_migrations[instance.uuid] = (migration, future) self._waiting_live_migrations[instance.uuid] = (migration, future)
except RuntimeError: except RuntimeError:
# ThreadPoolExecutor.submit will raise RuntimeError if the pool # GreenThreadPoolExecutor.submit will raise RuntimeError if the
# is shutdown, which happens in _cleanup_live_migrations_in_pool. # pool is shutdown, which happens in
# _cleanup_live_migrations_in_pool.
LOG.info('Migration %s failed to submit as the compute service ' LOG.info('Migration %s failed to submit as the compute service '
'is shutting down.', migration.uuid, instance=instance) 'is shutting down.', migration.uuid, instance=instance)
self._set_migration_status(migration, 'error') self._set_migration_status(migration, 'error')

@ -1072,7 +1072,7 @@ class SpawnIsSynchronousFixture(fixtures.Fixture):
class SynchronousThreadPoolExecutorFixture(fixtures.Fixture): class SynchronousThreadPoolExecutorFixture(fixtures.Fixture):
"""Make ThreadPoolExecutor.submit() synchronous. """Make GreenThreadPoolExecutor.submit() synchronous.
The function passed to submit() will be executed and a mock.Mock The function passed to submit() will be executed and a mock.Mock
object will be returned as the Future where Future.result() will object will be returned as the Future where Future.result() will
@ -1083,11 +1083,11 @@ class SynchronousThreadPoolExecutorFixture(fixtures.Fixture):
def fake_submit(_self, fn, *args, **kwargs): def fake_submit(_self, fn, *args, **kwargs):
result = fn(*args, **kwargs) result = fn(*args, **kwargs)
future = mock.Mock(spec='concurrent.futures.Future') future = mock.Mock(spec='futurist.Future')
future.return_value.result.return_value = result future.return_value.result.return_value = result
return future return future
self.useFixture(fixtures.MonkeyPatch( self.useFixture(fixtures.MonkeyPatch(
'concurrent.futures.ThreadPoolExecutor.submit', 'futurist.GreenThreadPoolExecutor.submit',
fake_submit)) fake_submit))

@ -13,9 +13,9 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
from concurrent import futures
import datetime import datetime
import futurist
import mock import mock
from nova.conductor import manager as conductor_manager from nova.conductor import manager as conductor_manager
@ -241,7 +241,7 @@ class ServerMigrationsSampleJsonTestV2_65(ServerMigrationsSampleJsonTestV2_24):
self._do_post('servers/%s/action' % self.uuid, 'live-migrate-server', self._do_post('servers/%s/action' % self.uuid, 'live-migrate-server',
{'hostname': self.compute.host}) {'hostname': self.compute.host})
self.compute._waiting_live_migrations[self.uuid] = ( self.compute._waiting_live_migrations[self.uuid] = (
self.migration, futures.Future()) self.migration, futurist.Future())
uri = 'servers/%s/migrations/%s' % (self.uuid, self.migration.id) uri = 'servers/%s/migrations/%s' % (self.uuid, self.migration.id)
response = self._do_delete(uri) response = self._do_delete(uri)
self.assertEqual(202, response.status_code) self.assertEqual(202, response.status_code)

@ -7214,7 +7214,7 @@ class ComputeManagerMigrationTestCase(test.NoDBTestCase):
self.flags(max_concurrent_live_migrations=0) self.flags(max_concurrent_live_migrations=0)
self._test_max_concurrent_live() self._test_max_concurrent_live()
@mock.patch('concurrent.futures.ThreadPoolExecutor') @mock.patch('futurist.GreenThreadPoolExecutor')
def test_max_concurrent_live_semaphore_limited(self, mock_executor): def test_max_concurrent_live_semaphore_limited(self, mock_executor):
self.flags(max_concurrent_live_migrations=123) self.flags(max_concurrent_live_migrations=123)
manager.ComputeManager() manager.ComputeManager()
@ -7224,7 +7224,7 @@ class ComputeManagerMigrationTestCase(test.NoDBTestCase):
def test_max_concurrent_live_semaphore_unlimited(self, max_concurrent): def test_max_concurrent_live_semaphore_unlimited(self, max_concurrent):
self.flags(max_concurrent_live_migrations=max_concurrent) self.flags(max_concurrent_live_migrations=max_concurrent)
with mock.patch( with mock.patch(
'concurrent.futures.ThreadPoolExecutor') as mock_executor: 'futurist.GreenThreadPoolExecutor') as mock_executor:
manager.ComputeManager() manager.ComputeManager()
mock_executor.assert_called_once_with() mock_executor.assert_called_once_with()

@ -69,4 +69,4 @@ os-service-types>=1.2.0 # Apache-2.0
taskflow>=2.16.0 # Apache-2.0 taskflow>=2.16.0 # Apache-2.0
python-dateutil>=2.5.3 # BSD python-dateutil>=2.5.3 # BSD
zVMCloudConnector>=1.1.1;sys_platform!='win32' # Apache 2.0 License zVMCloudConnector>=1.1.1;sys_platform!='win32' # Apache 2.0 License
futures>=3.0.0;python_version=='2.7' or python_version=='2.6' # PSF futurist>=1.8.0 # Apache-2.0