Merge "Replace ThreadPoolExecutor with GreenThreadPoolExecutor"
This commit is contained in:
commit
eb0efcf681
@ -32,8 +32,7 @@ fasteners==0.14.1
|
||||
fixtures==3.0.0
|
||||
flake8==2.5.5
|
||||
future==0.16.0
|
||||
futurist==1.6.0
|
||||
futures==3.0.0
|
||||
futurist==1.8.0
|
||||
gabbi==1.35.0
|
||||
gitdb2==2.0.3
|
||||
GitPython==2.1.8
|
||||
|
@ -27,9 +27,6 @@ terminating it.
|
||||
|
||||
import base64
|
||||
import binascii
|
||||
# If py2, concurrent.futures comes from the futures library otherwise it
|
||||
# comes from the py3 standard library.
|
||||
from concurrent import futures
|
||||
import contextlib
|
||||
import functools
|
||||
import inspect
|
||||
@ -43,6 +40,7 @@ import eventlet.event
|
||||
from eventlet import greenthread
|
||||
import eventlet.semaphore
|
||||
import eventlet.timeout
|
||||
import futurist
|
||||
from keystoneauth1 import exceptions as keystone_exception
|
||||
from oslo_log import log as logging
|
||||
import oslo_messaging as messaging
|
||||
@ -523,13 +521,10 @@ class ComputeManager(manager.Manager):
|
||||
else:
|
||||
self._build_semaphore = compute_utils.UnlimitedSemaphore()
|
||||
if max(CONF.max_concurrent_live_migrations, 0) != 0:
|
||||
self._live_migration_executor = futures.ThreadPoolExecutor(
|
||||
self._live_migration_executor = futurist.GreenThreadPoolExecutor(
|
||||
max_workers=CONF.max_concurrent_live_migrations)
|
||||
else:
|
||||
# Starting in python 3.5, this is technically bounded, but it's
|
||||
# ncpu * 5 which is probably much higher than anyone would sanely
|
||||
# use for concurrently running live migrations.
|
||||
self._live_migration_executor = futures.ThreadPoolExecutor()
|
||||
self._live_migration_executor = futurist.GreenThreadPoolExecutor()
|
||||
# This is a dict, keyed by instance uuid, to a two-item tuple of
|
||||
# migration object and Future for the queued live migration.
|
||||
self._waiting_live_migrations = {}
|
||||
@ -6357,8 +6352,9 @@ class ComputeManager(manager.Manager):
|
||||
block_migration, migration, migrate_data)
|
||||
self._waiting_live_migrations[instance.uuid] = (migration, future)
|
||||
except RuntimeError:
|
||||
# ThreadPoolExecutor.submit will raise RuntimeError if the pool
|
||||
# is shutdown, which happens in _cleanup_live_migrations_in_pool.
|
||||
# GreenThreadPoolExecutor.submit will raise RuntimeError if the
|
||||
# pool is shutdown, which happens in
|
||||
# _cleanup_live_migrations_in_pool.
|
||||
LOG.info('Migration %s failed to submit as the compute service '
|
||||
'is shutting down.', migration.uuid, instance=instance)
|
||||
self._set_migration_status(migration, 'error')
|
||||
|
@ -1065,7 +1065,7 @@ class SpawnIsSynchronousFixture(fixtures.Fixture):
|
||||
|
||||
|
||||
class SynchronousThreadPoolExecutorFixture(fixtures.Fixture):
|
||||
"""Make ThreadPoolExecutor.submit() synchronous.
|
||||
"""Make GreenThreadPoolExecutor.submit() synchronous.
|
||||
|
||||
The function passed to submit() will be executed and a mock.Mock
|
||||
object will be returned as the Future where Future.result() will
|
||||
@ -1076,11 +1076,11 @@ class SynchronousThreadPoolExecutorFixture(fixtures.Fixture):
|
||||
|
||||
def fake_submit(_self, fn, *args, **kwargs):
|
||||
result = fn(*args, **kwargs)
|
||||
future = mock.Mock(spec='concurrent.futures.Future')
|
||||
future = mock.Mock(spec='futurist.Future')
|
||||
future.return_value.result.return_value = result
|
||||
return future
|
||||
self.useFixture(fixtures.MonkeyPatch(
|
||||
'concurrent.futures.ThreadPoolExecutor.submit',
|
||||
'futurist.GreenThreadPoolExecutor.submit',
|
||||
fake_submit))
|
||||
|
||||
|
||||
|
@ -13,9 +13,9 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from concurrent import futures
|
||||
import datetime
|
||||
|
||||
import futurist
|
||||
import mock
|
||||
|
||||
from nova.conductor import manager as conductor_manager
|
||||
@ -241,7 +241,7 @@ class ServerMigrationsSampleJsonTestV2_65(ServerMigrationsSampleJsonTestV2_24):
|
||||
self._do_post('servers/%s/action' % self.uuid, 'live-migrate-server',
|
||||
{'hostname': self.compute.host})
|
||||
self.compute._waiting_live_migrations[self.uuid] = (
|
||||
self.migration, futures.Future())
|
||||
self.migration, futurist.Future())
|
||||
uri = 'servers/%s/migrations/%s' % (self.uuid, self.migration.id)
|
||||
response = self._do_delete(uri)
|
||||
self.assertEqual(202, response.status_code)
|
||||
|
@ -7214,7 +7214,7 @@ class ComputeManagerMigrationTestCase(test.NoDBTestCase):
|
||||
self.flags(max_concurrent_live_migrations=0)
|
||||
self._test_max_concurrent_live()
|
||||
|
||||
@mock.patch('concurrent.futures.ThreadPoolExecutor')
|
||||
@mock.patch('futurist.GreenThreadPoolExecutor')
|
||||
def test_max_concurrent_live_semaphore_limited(self, mock_executor):
|
||||
self.flags(max_concurrent_live_migrations=123)
|
||||
manager.ComputeManager()
|
||||
@ -7224,7 +7224,7 @@ class ComputeManagerMigrationTestCase(test.NoDBTestCase):
|
||||
def test_max_concurrent_live_semaphore_unlimited(self, max_concurrent):
|
||||
self.flags(max_concurrent_live_migrations=max_concurrent)
|
||||
with mock.patch(
|
||||
'concurrent.futures.ThreadPoolExecutor') as mock_executor:
|
||||
'futurist.GreenThreadPoolExecutor') as mock_executor:
|
||||
manager.ComputeManager()
|
||||
mock_executor.assert_called_once_with()
|
||||
|
||||
|
@ -69,4 +69,4 @@ os-service-types>=1.2.0 # Apache-2.0
|
||||
taskflow>=2.16.0 # Apache-2.0
|
||||
python-dateutil>=2.5.3 # BSD
|
||||
zVMCloudConnector>=1.1.1;sys_platform!='win32' # Apache 2.0 License
|
||||
futures>=3.0.0;python_version=='2.7' or python_version=='2.6' # PSF
|
||||
futurist>=1.8.0 # Apache-2.0
|
||||
|
Loading…
Reference in New Issue
Block a user