Add cache_images() to conductor

This adds the bulk of the image pre-caching logic to the conductor task
manager. It takes an aggregate and a list of image IDs from the API service
and handles calling the relevant compute nodes to initiate the image
downloads, honoring the (new) config knob for overall task parallelism.

Related to blueprint image-precache-support

Change-Id: Id7c0ab7ae0586d49d88ff2afae149e25e59a3489

parent: ac165112b7
commit: 11d909c2cb
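
For orientation, a minimal sketch of how the API service is expected to drive this path. The aggregate lookup and the function name below are assumptions for illustration; only the cache_images() conductor call is part of this change.

    # Hypothetical caller in the API service (sketch only; the aggregate
    # lookup and function name are assumptions, not part of this change).
    from nova import conductor
    from nova import context as nova_context
    from nova import objects


    def request_precache(aggregate_id, image_ids):
        ctxt = nova_context.get_admin_context()
        aggregate = objects.Aggregate.get_by_id(ctxt, aggregate_id)
        # Conductor validates each image ID, then casts to every up compute
        # host in the aggregate (see ComputeTaskAPI.cache_images below).
        conductor.ComputeTaskAPI().cache_images(ctxt, aggregate, image_ids)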
@@ -20,6 +20,7 @@ import oslo_messaging as messaging

from nova import baserpc
from nova.conductor import rpcapi
import nova.conf
from nova import image

CONF = nova.conf.CONF

@@ -83,6 +84,7 @@ class ComputeTaskAPI(object):

    def __init__(self):
        self.conductor_compute_rpcapi = rpcapi.ComputeTaskAPI()
        self.image_api = image.API()

    # TODO(stephenfin): Remove the 'reservations' parameter since we don't use
    # reservations anymore

@@ -155,3 +157,20 @@ class ComputeTaskAPI(object):
                preserve_ephemeral=preserve_ephemeral,
                host=host,
                request_spec=request_spec)

    def cache_images(self, context, aggregate, image_ids):
        """Request images be pre-cached on hosts within an aggregate.

        :param context: The RequestContext
        :param aggregate: The objects.Aggregate representing the hosts to
                          contact
        :param image_ids: A list of image ID strings to send to the hosts
        """
        for image_id in image_ids:
            # Validate that we can get the image by id before we go
            # ask a bunch of hosts to do the same. We let this bubble
            # up to the API, which catches NovaException for the 4xx and
            # otherwise 500s if this fails in some unexpected way.
            self.image_api.get(context, image_id)
        self.conductor_compute_rpcapi.cache_images(context, aggregate,
                                                   image_ids)
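
A hedged sketch of the error contract described in the comment above; the handler name and status codes are illustrative, not Nova API code.

    # Hypothetical API-side handler; names and status codes are illustrative.
    from nova import exception


    def handle_precache_request(task_api, ctxt, aggregate, image_ids):
        try:
            # A bad image ID surfaces here as a NovaException (e.g.
            # ImageNotFound) before any compute host is contacted.
            task_api.cache_images(ctxt, aggregate, image_ids)
        except exception.NovaException as e:
            return 400, str(e)  # known Nova errors map to a 4xx
        # Anything else propagates and becomes a 500 at the API layer.
        return 202, 'image pre-cache requested'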
@@ -16,6 +16,7 @@

import contextlib
import copy
import eventlet
import functools
import sys

@@ -230,7 +231,7 @@ class ComputeTaskManager(base.Base):
    may involve coordinating activities on multiple compute nodes.
    """

-    target = messaging.Target(namespace='compute_task', version='1.20')
+    target = messaging.Target(namespace='compute_task', version='1.21')

    def __init__(self):
        super(ComputeTaskManager, self).__init__()

@@ -1629,3 +1630,70 @@ class ComputeTaskManager(base.Base):
                pass
            return False
        return True

    def cache_images(self, context, aggregate, image_ids):
        """Cache a set of images on the set of hosts in an aggregate.

        :param context: The RequestContext
        :param aggregate: The Aggregate object from the request to constrain
                          the host list
        :param image_ids: The IDs of the images to cache
        """

        # TODO(danms): Fix notification sample for IMAGE_CACHE action
        compute_utils.notify_about_aggregate_action(
            context, aggregate,
            fields.NotificationAction.IMAGE_CACHE,
            fields.NotificationPhase.START)

        clock = timeutils.StopWatch()
        threads = CONF.image_cache.precache_concurrency
        fetch_pool = eventlet.GreenPool(size=threads)

        hosts_by_cell = {}
        cells_by_uuid = {}
        # TODO(danms): Make this a much more efficient bulk query
        for hostname in aggregate.hosts:
            hmap = objects.HostMapping.get_by_host(context, hostname)
            cells_by_uuid.setdefault(hmap.cell_mapping.uuid, hmap.cell_mapping)
            hosts_by_cell.setdefault(hmap.cell_mapping.uuid, [])
            hosts_by_cell[hmap.cell_mapping.uuid].append(hostname)

        LOG.info('Preparing to request pre-caching of image(s) %(image_ids)s '
                 'on %(hosts)i hosts across %(cells)s cells.',
                 {'image_ids': ','.join(image_ids),
                  'hosts': len(aggregate.hosts),
                  'cells': len(hosts_by_cell)})
        clock.start()

        for cell_uuid, hosts in hosts_by_cell.items():
            cell = cells_by_uuid[cell_uuid]
            with nova_context.target_cell(context, cell) as target_ctxt:
                for host in hosts:
                    service = objects.Service.get_by_compute_host(target_ctxt,
                                                                  host)
                    if not self.servicegroup_api.service_is_up(service):
                        LOG.info(
                            'Skipping image pre-cache request to compute '
                            '%(host)r because it is not up',
                            {'host': host})
                        continue

                    fetch_pool.spawn_n(self.compute_rpcapi.cache_images,
                                       target_ctxt,
                                       host=host,
                                       image_ids=image_ids)

        # Wait until all those things finish
        fetch_pool.waitall()

        clock.stop()
        LOG.info('Image pre-cache operation for image(s) %(image_ids)s '
                 'completed in %(time).2f seconds',
                 {'image_ids': ','.join(image_ids),
                  'time': clock.elapsed()})

        compute_utils.notify_about_aggregate_action(
            context, aggregate,
            fields.NotificationAction.IMAGE_CACHE,
            fields.NotificationPhase.END)
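
The fan-out above is bounded by the new [image_cache]/precache_concurrency option via the GreenPool. A standalone sketch of that pattern, using only eventlet; the host names, pool size, and sleep are made up for the example.

    # Standalone illustration of the bounded fan-out pattern used above: a
    # GreenPool of size N never runs more than N casts at once.
    import eventlet
    eventlet.monkey_patch()


    def cast_to_host(host, image_ids):
        # Stand-in for compute_rpcapi.cache_images(); just sleeps briefly.
        eventlet.sleep(0.1)
        print('requested pre-cache of %s on %s' % (image_ids, host))


    pool = eventlet.GreenPool(size=2)  # cf. CONF.image_cache.precache_concurrency
    for host in ['compute1', 'compute2', 'compute3', 'compute4']:
        pool.spawn_n(cast_to_host, host, ['an-image'])
    pool.waitall()  # returns once every request has been issued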
@@ -20,6 +20,7 @@ from oslo_serialization import jsonutils
from oslo_versionedobjects import base as ovo_base

import nova.conf
from nova import exception
from nova.objects import base as objects_base
from nova import profiler
from nova import rpc

@@ -281,6 +282,7 @@ class ComputeTaskAPI(object):
          instance.
    1.20 - migrate_server() now gets a 'host_list' parameter that represents
           potential alternate hosts for retries within a cell.
    1.21 - Added cache_images()
    """

    def __init__(self):

@@ -436,3 +438,12 @@ class ComputeTaskAPI(object):
            del kw['request_spec']
        cctxt = self.client.prepare(version=version)
        cctxt.cast(ctxt, 'rebuild_instance', **kw)

    def cache_images(self, ctxt, aggregate, image_ids):
        version = '1.21'
        if not self.client.can_send_version(version):
            raise exception.NovaException('Conductor RPC version pin does not '
                                          'allow cache_images() to be called')
        cctxt = self.client.prepare(version=version)
        cctxt.cast(ctxt, 'cache_images', aggregate=aggregate,
                   image_ids=image_ids)
@@ -35,6 +35,7 @@ from nova.conf import ephemeral_storage
from nova.conf import glance
from nova.conf import guestfs
from nova.conf import hyperv
from nova.conf import imagecache
from nova.conf import ironic
from nova.conf import key_manager
from nova.conf import keystone

@@ -88,6 +89,7 @@ glance.register_opts(CONF)
guestfs.register_opts(CONF)
hyperv.register_opts(CONF)
mks.register_opts(CONF)
imagecache.register_opts(CONF)
ironic.register_opts(CONF)
key_manager.register_opts(CONF)
keystone.register_opts(CONF)
nova/conf/imagecache.py (new file, 47 lines)

@@ -0,0 +1,47 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

from oslo_config import cfg

imagecache_group = cfg.OptGroup(
    'image_cache',
    title='Image Cache Options',
    help="""
A collection of options specific to image caching.
""")
imagecache_opts = [
    cfg.IntOpt('precache_concurrency',
               default=1,
               min=1,
               help="""
Maximum number of compute hosts to trigger image precaching in parallel.

When an image precache request is made, compute nodes will be contacted
to initiate the download. This number constrains the number of those that
will happen in parallel. Higher numbers will cause more computes to work
in parallel and may result in reduced time to complete the operation, but
may also DDoS the image service. Lower numbers will result in more sequential
operation, lower image service load, but likely longer runtime to completion.
"""),
]


ALL_OPTS = (imagecache_opts,)


def register_opts(conf):
    conf.register_group(imagecache_group)
    conf.register_opts(imagecache_opts, group=imagecache_group)


def list_opts():
    return {imagecache_group: imagecache_opts}
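
To illustrate how this option is consumed, a minimal sketch using only oslo.config; the override value 8 is just an example, and in a deployment the same effect comes from setting it in nova.conf on the conductor host.

    # Minimal sketch of reading the new option. Equivalent deployment config:
    #     [image_cache]
    #     precache_concurrency = 8
    from oslo_config import cfg

    from nova.conf import imagecache

    CONF = cfg.ConfigOpts()
    imagecache.register_opts(CONF)
    CONF.set_override('precache_concurrency', 8, group='image_cache')
    print(CONF.image_cache.precache_concurrency)  # -> 8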
@@ -71,7 +71,8 @@ class EventType(NotificationObject):
    # NotificationActionField enum
    # Version 1.19: SELECT_DESTINATIONS is added to the NotificationActionField
    # enum
-    VERSION = '1.19'
+    # Version 1.20: IMAGE_CACHE is added to the NotificationActionField enum
+    VERSION = '1.20'

    fields = {
        'object': fields.StringField(nullable=False),
@@ -864,6 +864,7 @@ class NotificationAction(BaseNovaEnum):
    BUILD_INSTANCES = 'build_instances'
    MIGRATE_SERVER = 'migrate_server'
    REBUILD_SERVER = 'rebuild_server'
    IMAGE_CACHE = 'cache_images'

    ALL = (UPDATE, EXCEPTION, DELETE, PAUSE, UNPAUSE, RESIZE, VOLUME_SWAP,
           SUSPEND, POWER_ON, REBOOT, SHUTDOWN, SNAPSHOT, INTERFACE_ATTACH,

@@ -877,7 +878,7 @@ class NotificationAction(BaseNovaEnum):
           REMOVE_HOST, ADD_MEMBER, UPDATE_METADATA, LOCK, UNLOCK,
           REBUILD_SCHEDULED, UPDATE_PROP, LIVE_MIGRATION_FORCE_COMPLETE,
           CONNECT, USAGE, BUILD_INSTANCES, MIGRATE_SERVER, REBUILD_SERVER,
-           SELECT_DESTINATIONS)
+           SELECT_DESTINATIONS, IMAGE_CACHE)


# TODO(rlrossit): These should be changed over to be a StateMachine enum from
nova/tests/functional/compute/test_cache_image.py (new file, 76 lines)

@@ -0,0 +1,76 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

from oslo_utils.fixture import uuidsentinel as uuids

from nova import context
from nova import objects
from nova import test
from nova.tests.unit import fake_notifier


class ImageCacheTest(test.TestCase):
    NUMBER_OF_CELLS = 2

    def setUp(self):
        super(ImageCacheTest, self).setUp()

        self.flags(compute_driver='fake.FakeDriverWithCaching')

        fake_notifier.stub_notifier(self)
        self.addCleanup(fake_notifier.reset)
        self.context = context.get_admin_context()

        self.conductor = self.start_service('conductor')
        self.compute1 = self.start_service('compute', host='compute1')
        self.compute2 = self.start_service('compute', host='compute2')
        self.compute3 = self.start_service('compute', host='compute3',
                                           cell='cell2')
        self.compute4 = self.start_service('compute', host='compute4',
                                           cell='cell2')
        self.compute5 = self.start_service('compute', host='compute5',
                                           cell='cell2')

        cell2 = self.cell_mappings['cell2']
        with context.target_cell(self.context, cell2) as cctxt:
            srv = objects.Service.get_by_compute_host(cctxt, 'compute5')
            srv.forced_down = True
            srv.save()

    def test_cache_image(self):
        """Test caching images by injecting the request directly to
        the conductor service and making sure it fans out and calls
        the expected nodes.
        """

        aggregate = objects.Aggregate(name='test',
                                      uuid=uuids.aggregate,
                                      id=1,
                                      hosts=['compute1', 'compute3',
                                             'compute4', 'compute5'])
        self.conductor.compute_task_mgr.cache_images(
            self.context, aggregate, ['an-image'])

        # NOTE(danms): We expect only three image cache attempts because
        # compute5 is marked as forced-down and compute2 is not in the
        # requested aggregate.
        for host in ['compute1', 'compute3', 'compute4']:
            mgr = getattr(self, host)
            self.assertEqual(set(['an-image']), mgr.driver.cached_images)
        for host in ['compute2', 'compute5']:
            mgr = getattr(self, host)
            self.assertEqual(set(), mgr.driver.cached_images)

        fake_notifier.wait_for_versioned_notifications(
            'aggregate.cache_images.start')
        fake_notifier.wait_for_versioned_notifications(
            'aggregate.cache_images.end')
@@ -3567,6 +3567,23 @@ class ConductorTaskRPCAPITestCase(_BaseTaskTestCase,
                self.context, 'build_instances', **kw)
        _test()

    def test_cache_images(self):
        with mock.patch.object(self.conductor, 'client') as client:
            self.conductor.cache_images(self.context, mock.sentinel.aggregate,
                                        [mock.sentinel.image])
            client.prepare.return_value.cast.assert_called_once_with(
                self.context, 'cache_images',
                aggregate=mock.sentinel.aggregate,
                image_ids=[mock.sentinel.image])
            client.prepare.assert_called_once_with(version='1.21')

        with mock.patch.object(self.conductor.client, 'can_send_version') as v:
            v.return_value = False
            self.assertRaises(exc.NovaException,
                              self.conductor.cache_images,
                              self.context, mock.sentinel.aggregate,
                              [mock.sentinel.image])


class ConductorTaskAPITestCase(_BaseTaskTestCase, test_compute.BaseTestCase):
    """Compute task API Tests."""

@@ -3591,3 +3608,56 @@ class ConductorTaskAPITestCase(_BaseTaskTestCase, test_compute.BaseTestCase):
            self.context, inst_obj, {'host': 'destination'}, True, False,
            None, 'block_migration', 'disk_over_commit', None,
            request_spec=None)

    def test_cache_images(self):
        @mock.patch.object(self.conductor.conductor_compute_rpcapi,
                           'cache_images')
        @mock.patch.object(self.conductor.image_api, 'get')
        def _test(mock_image, mock_cache):
            self.conductor.cache_images(self.context,
                                        mock.sentinel.aggregate,
                                        [mock.sentinel.image1,
                                         mock.sentinel.image2])
            mock_image.assert_has_calls([mock.call(self.context,
                                                   mock.sentinel.image1),
                                         mock.call(self.context,
                                                   mock.sentinel.image2)])
            mock_cache.assert_called_once_with(
                self.context, mock.sentinel.aggregate,
                [mock.sentinel.image1, mock.sentinel.image2])

        _test()

    def test_cache_images_fail(self):
        @mock.patch.object(self.conductor.conductor_compute_rpcapi,
                           'cache_images')
        @mock.patch.object(self.conductor.image_api, 'get')
        def _test(mock_image, mock_cache):
            mock_image.side_effect = test.TestingException()
            # We should expect to see non-NovaException errors
            # raised directly so the API can 500 for them.
            self.assertRaises(test.TestingException,
                              self.conductor.cache_images,
                              self.context,
                              mock.sentinel.aggregate,
                              [mock.sentinel.image1,
                               mock.sentinel.image2])
            mock_cache.assert_not_called()

        _test()

    def test_cache_images_missing(self):
        @mock.patch.object(self.conductor.conductor_compute_rpcapi,
                           'cache_images')
        @mock.patch.object(self.conductor.image_api, 'get')
        def _test(mock_image, mock_cache):
            mock_image.side_effect = exc.ImageNotFound('foo')
            self.assertRaises(exc.ImageNotFound,
                              self.conductor.cache_images,
                              self.context,
                              mock.sentinel.aggregate,
                              [mock.sentinel.image1,
                               mock.sentinel.image2])
            mock_cache.assert_not_called()

        _test()
@@ -375,7 +375,7 @@ notification_object_data = {
    'ComputeTaskNotification': '1.0-a73147b93b520ff0061865849d3dfa56',
    'ComputeTaskPayload': '1.0-e3d34762c14d131c98337b72e8c600e1',
    'DestinationPayload': '1.0-4ccf26318dd18c4377dada2b1e74ec2e',
-    'EventType': '1.19-000a76e83b06a9de11d365465a755a5e',
+    'EventType': '1.20-4e02a676d3a18cab99579cacd1c91453',
    'ExceptionNotification': '1.0-a73147b93b520ff0061865849d3dfa56',
    'ExceptionPayload': '1.1-6c43008bd81885a63bc7f7c629f0793b',
    'FlavorNotification': '1.0-a73147b93b520ff0061865849d3dfa56',
@@ -1013,3 +1013,16 @@ class FakeDriverWithPciResources(SmallFakeDriver):
            },
        ])
        return host_status


class FakeDriverWithCaching(FakeDriver):
    def __init__(self, *a, **k):
        super(FakeDriverWithCaching, self).__init__(*a, **k)
        self.cached_images = set()

    def cache_image(self, context, image_id):
        if image_id in self.cached_images:
            return False
        else:
            self.cached_images.add(image_id)
            return True