Merge "Update task state in context manager"

This commit is contained in:
Zuul 2019-04-26 08:27:36 +00:00 committed by Gerrit Code Review
commit 4b8311aa16
4 changed files with 200 additions and 184 deletions

View File

@ -30,14 +30,16 @@ TASK_STATES = (
CONTAINER_STARTING, CONTAINER_DELETING,
CONTAINER_STOPPING, CONTAINER_REBOOTING, CONTAINER_PAUSING,
CONTAINER_UNPAUSING, CONTAINER_KILLING, SG_ADDING,
SG_REMOVING, NETWORK_ATTACHING, NETWORK_DETACHING
SG_REMOVING, NETWORK_ATTACHING, NETWORK_DETACHING,
CONTAINER_REBUILDING,
) = (
'image_pulling', 'container_creating',
'container_starting', 'container_deleting',
'container_stopping', 'container_rebooting', 'container_pausing',
'container_unpausing', 'container_killing', 'sg_adding',
'sg_removing', 'network_attaching', 'network_detaching'
)
'sg_removing', 'network_attaching', 'network_detaching',
'container_rebuilding',
)
RESOURCE_CLASSES = (
VCPU, MEMORY_MB, DISK_GB, PCI_DEVICE, SRIOV_NET_VF,

View File

@ -12,6 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import itertools
import math
@ -99,6 +100,7 @@ class Manager(periodic_task.PeriodicTasks):
LOG.debug("Container %s in transitional state %s at start-up "
"retrying delete request",
container.uuid, container.task_state)
container.task_state = None
self.container_delete(context, container, force=True)
return
@ -106,6 +108,7 @@ class Manager(periodic_task.PeriodicTasks):
LOG.debug("Container %s in transitional state %s at start-up "
"retrying reboot request",
container.uuid, container.task_state)
container.task_state = None
self.container_reboot(context, container,
CONF.docker.default_timeout)
return
@ -114,6 +117,7 @@ class Manager(periodic_task.PeriodicTasks):
LOG.debug("Container %s in transitional state %s at start-up "
"retrying stop request",
container.uuid, container.task_state)
container.task_state = None
self.container_stop(context, container,
CONF.docker.default_timeout)
return
@ -122,18 +126,22 @@ class Manager(periodic_task.PeriodicTasks):
LOG.debug("Container %s in transitional state %s at start-up "
"retrying start request",
container.uuid, container.task_state)
container.task_state = None
self.container_start(context, container)
return
if container.task_state == consts.CONTAINER_PAUSING:
container.task_state = None
self.container_pause(context, container)
return
if container.task_state == consts.CONTAINER_UNPAUSING:
container.task_state = None
self.container_unpause(context, container)
return
if container.task_state == consts.CONTAINER_KILLING:
container.task_state = None
self.container_kill(context, container)
return
@ -146,7 +154,6 @@ class Manager(periodic_task.PeriodicTasks):
container.status = consts.ERROR
container.status_reason = error
container.task_state = None
if unset_host:
container.host = None
container.save(context)
@ -255,73 +262,88 @@ class Manager(periodic_task.PeriodicTasks):
utils.spawn_n(do_container_create)
@contextlib.contextmanager
def _update_task_state(self, context, container, task_state):
if container.task_state != task_state:
container.task_state = task_state
if container.task_state is not None:
LOG.debug('Skip updating container task state to %(task_state)s '
'because its current task state is: '
'%(current_task_state)s',
{'task_state': task_state,
'current_task_state': container.task_state})
yield
return
container.task_state = task_state
container.save(context)
try:
yield
finally:
container.task_state = None
container.save(context)
def _do_container_create_base(self, context, container, requested_networks,
requested_volumes,
limits=None):
self._update_task_state(context, container, consts.IMAGE_PULLING)
image_driver_name = container.image_driver
repo, tag = utils.parse_image_name(container.image, image_driver_name,
registry=container.registry)
image_pull_policy = utils.get_image_pull_policy(
container.image_pull_policy, tag)
try:
image, image_loaded = self.driver.pull_image(
context, repo, tag, image_pull_policy, image_driver_name,
registry=container.registry)
image['repo'], image['tag'] = repo, tag
if not image_loaded:
self.driver.load_image(image['path'])
except exception.ImageNotFound as e:
with excutils.save_and_reraise_exception():
LOG.error(six.text_type(e))
self._fail_container(context, container, six.text_type(e))
except exception.DockerError as e:
with excutils.save_and_reraise_exception():
LOG.error("Error occurred while calling Docker image API: %s",
six.text_type(e))
self._fail_container(context, container, six.text_type(e))
except Exception as e:
with excutils.save_and_reraise_exception():
LOG.exception("Unexpected exception: %s",
six.text_type(e))
self._fail_container(context, container, six.text_type(e))
with self._update_task_state(context, container,
consts.CONTAINER_CREATING):
image_driver_name = container.image_driver
repo, tag = utils.parse_image_name(container.image,
image_driver_name,
registry=container.registry)
image_pull_policy = utils.get_image_pull_policy(
container.image_pull_policy, tag)
try:
image, image_loaded = self.driver.pull_image(
context, repo, tag, image_pull_policy, image_driver_name,
registry=container.registry)
image['repo'], image['tag'] = repo, tag
if not image_loaded:
self.driver.load_image(image['path'])
except exception.ImageNotFound as e:
with excutils.save_and_reraise_exception():
LOG.error(six.text_type(e))
self._fail_container(context, container, six.text_type(e))
except exception.DockerError as e:
with excutils.save_and_reraise_exception():
LOG.error("Error occurred while calling Docker image "
"API: %s", six.text_type(e))
self._fail_container(context, container, six.text_type(e))
except Exception as e:
with excutils.save_and_reraise_exception():
LOG.exception("Unexpected exception: %s",
six.text_type(e))
self._fail_container(context, container, six.text_type(e))
container.task_state = consts.CONTAINER_CREATING
container.image_driver = image.get('driver')
container.save(context)
try:
if image['driver'] == 'glance':
self.driver.read_tar_image(image)
if image['tag'] != tag:
LOG.warning("The input tag is different from the tag in tar")
if isinstance(container, objects.Capsule):
container = self.driver.create_capsule(context, container,
image,
requested_networks,
requested_volumes)
elif isinstance(container, objects.Container):
container = self.driver.create(context, container, image,
requested_networks,
requested_volumes)
self._update_task_state(context, container, None)
return container
except exception.DockerError as e:
with excutils.save_and_reraise_exception():
LOG.error("Error occurred while calling Docker create API: %s",
six.text_type(e))
self._fail_container(context, container, six.text_type(e),
unset_host=True)
except Exception as e:
with excutils.save_and_reraise_exception():
LOG.exception("Unexpected exception: %s",
six.text_type(e))
self._fail_container(context, container, six.text_type(e),
unset_host=True)
container.image_driver = image.get('driver')
container.save(context)
try:
if image['driver'] == 'glance':
self.driver.read_tar_image(image)
if image['tag'] != tag:
LOG.warning("The input tag is different from the tag in "
"tar")
if isinstance(container, objects.Capsule):
container = self.driver.create_capsule(context, container,
image,
requested_networks,
requested_volumes)
elif isinstance(container, objects.Container):
container = self.driver.create(context, container, image,
requested_networks,
requested_volumes)
return container
except exception.DockerError as e:
with excutils.save_and_reraise_exception():
LOG.error("Error occurred while calling Docker create "
"API: %s", six.text_type(e))
self._fail_container(context, container, six.text_type(e),
unset_host=True)
except Exception as e:
with excutils.save_and_reraise_exception():
LOG.exception("Unexpected exception: %s",
six.text_type(e))
self._fail_container(context, container, six.text_type(e),
unset_host=True)
@wrap_container_event(prefix='compute')
def _do_container_create(self, context, container, requested_networks,
@ -447,23 +469,23 @@ class Manager(periodic_task.PeriodicTasks):
@wrap_container_event(prefix='compute')
def _do_container_start(self, context, container):
LOG.debug('Starting container: %s', container.uuid)
self._update_task_state(context, container, consts.CONTAINER_STARTING)
try:
container = self.driver.start(context, container)
container.task_state = None
container.started_at = timeutils.utcnow()
container.save(context)
return container
except exception.DockerError as e:
with excutils.save_and_reraise_exception():
LOG.error("Error occurred while calling Docker start API: %s",
six.text_type(e))
self._fail_container(context, container, six.text_type(e))
except Exception as e:
with excutils.save_and_reraise_exception():
LOG.exception("Unexpected exception: %s",
six.text_type(e))
self._fail_container(context, container, six.text_type(e))
with self._update_task_state(context, container,
consts.CONTAINER_STARTING):
try:
container = self.driver.start(context, container)
container.started_at = timeutils.utcnow()
container.save(context)
return container
except exception.DockerError as e:
with excutils.save_and_reraise_exception():
LOG.error("Error occurred while calling Docker start "
"API: %s", six.text_type(e))
self._fail_container(context, container, six.text_type(e))
except Exception as e:
with excutils.save_and_reraise_exception():
LOG.exception("Unexpected exception: %s",
six.text_type(e))
self._fail_container(context, container, six.text_type(e))
@translate_exception
def container_delete(self, context, container, force=False):
@ -475,26 +497,26 @@ class Manager(periodic_task.PeriodicTasks):
def _do_container_delete(self, context, container, force):
LOG.debug('Deleting container: %s', container.uuid)
self._update_task_state(context, container, consts.CONTAINER_DELETING)
reraise = not force
try:
if isinstance(container, objects.Capsule):
self.driver.delete_capsule(context, container, force)
elif isinstance(container, objects.Container):
self.driver.delete(context, container, force)
except exception.DockerError as e:
with excutils.save_and_reraise_exception(reraise=reraise):
LOG.error("Error occurred while calling Docker "
"delete API: %s", six.text_type(e))
self._fail_container(context, container, six.text_type(e))
except Exception as e:
with excutils.save_and_reraise_exception(reraise=reraise):
LOG.exception("Unexpected exception: %s", six.text_type(e))
self._fail_container(context, container, six.text_type(e))
with self._update_task_state(context, container,
consts.CONTAINER_DELETING):
reraise = not force
try:
if isinstance(container, objects.Capsule):
self.driver.delete_capsule(context, container, force)
elif isinstance(container, objects.Container):
self.driver.delete(context, container, force)
except exception.DockerError as e:
with excutils.save_and_reraise_exception(reraise=reraise):
LOG.error("Error occurred while calling Docker "
"delete API: %s", six.text_type(e))
self._fail_container(context, container, six.text_type(e))
except Exception as e:
with excutils.save_and_reraise_exception(reraise=reraise):
LOG.exception("Unexpected exception: %s", six.text_type(e))
self._fail_container(context, container, six.text_type(e))
self._detach_volumes(context, container, reraise=reraise)
self._detach_volumes(context, container, reraise=reraise)
self._update_task_state(context, container, None)
container.destroy(context)
self._get_resource_tracker()
@ -513,11 +535,10 @@ class Manager(periodic_task.PeriodicTasks):
@wrap_container_event(prefix='compute')
def _add_security_group(self, context, container, security_group):
LOG.debug('Adding security_group to container: %s', container.uuid)
self._update_task_state(context, container, consts.SG_ADDING)
self.driver.add_security_group(context, container, security_group)
self._update_task_state(context, container, None)
container.security_groups += [security_group]
container.save(context)
with self._update_task_state(context, container, consts.SG_ADDING):
self.driver.add_security_group(context, container, security_group)
container.security_groups += [security_group]
container.save(context)
def remove_security_group(self, context, container, security_group):
@utils.synchronized(container.uuid)
@ -530,13 +551,12 @@ class Manager(periodic_task.PeriodicTasks):
@wrap_container_event(prefix='compute')
def _remove_security_group(self, context, container, security_group):
LOG.debug('Removing security_group from container: %s', container.uuid)
self._update_task_state(context, container, consts.SG_REMOVING)
self.driver.remove_security_group(context, container,
security_group)
container.task_state = None
container.security_groups = list(set(container.security_groups)
- set([security_group]))
container.save(context)
with self._update_task_state(context, container, consts.SG_REMOVING):
self.driver.remove_security_group(context, container,
security_group)
container.security_groups = list(set(container.security_groups)
- set([security_group]))
container.save(context)
@translate_exception
def container_show(self, context, container):
@ -558,10 +578,10 @@ class Manager(periodic_task.PeriodicTasks):
@wrap_container_event(prefix='compute')
def _do_container_reboot(self, context, container, timeout):
LOG.debug('Rebooting container: %s', container.uuid)
self._update_task_state(context, container, consts.CONTAINER_REBOOTING)
container = self.driver.reboot(context, container, timeout)
self._update_task_state(context, container, None)
return container
with self._update_task_state(context, container,
consts.CONTAINER_REBOOTING):
container = self.driver.reboot(context, container, timeout)
return container
def container_reboot(self, context, container, timeout):
@utils.synchronized(container.uuid)
@ -574,10 +594,10 @@ class Manager(periodic_task.PeriodicTasks):
@wrap_container_event(prefix='compute')
def _do_container_stop(self, context, container, timeout):
LOG.debug('Stopping container: %s', container.uuid)
self._update_task_state(context, container, consts.CONTAINER_STOPPING)
container = self.driver.stop(context, container, timeout)
self._update_task_state(context, container, None)
return container
with self._update_task_state(context, container,
consts.CONTAINER_STOPPING):
container = self.driver.stop(context, container, timeout)
return container
def container_stop(self, context, container, timeout):
@utils.synchronized(container.uuid)
@ -601,49 +621,48 @@ class Manager(periodic_task.PeriodicTasks):
@wrap_container_event(prefix='compute')
def _do_container_rebuild(self, context, container, run):
LOG.info("start to rebuild container: %s", container.uuid)
vol_info = {container.uuid: self._get_vol_info(context, container)}
try:
network_info = self._get_network_info(context, container)
except Exception as e:
with excutils.save_and_reraise_exception():
self._fail_container(context, container, six.text_type(e))
if self.driver.check_container_exist(container):
for addr in container.addresses.values():
for port in addr:
port['preserve_on_delete'] = True
with self._update_task_state(context, container,
consts.CONTAINER_REBUILDING):
vol_info = {container.uuid: self._get_vol_info(context, container)}
try:
self._update_task_state(context, container,
consts.CONTAINER_DELETING)
if isinstance(container, objects.Capsule):
self.driver.delete_capsule(context, container)
elif isinstance(container, objects.Container):
self.driver.delete(context, container, True)
network_info = self._get_network_info(context, container)
except Exception as e:
with excutils.save_and_reraise_exception():
LOG.error("Rebuild container: %s failed, "
"reason of failure is: %s",
container.uuid,
six.text_type(e))
self._fail_container(context, container, six.text_type(e))
if self.driver.check_container_exist(container):
for addr in container.addresses.values():
for port in addr:
port['preserve_on_delete'] = True
try:
if isinstance(container, objects.Capsule):
self.driver.delete_capsule(context, container)
elif isinstance(container, objects.Container):
self.driver.delete(context, container, True)
except Exception as e:
with excutils.save_and_reraise_exception():
LOG.error("Rebuild container: %s failed, "
"reason of failure is: %s",
container.uuid,
six.text_type(e))
self._fail_container(context, container,
six.text_type(e))
try:
created_container = self._do_container_create_base(
context, container, network_info, vol_info)
created_container.status = consts.CREATED
created_container.status_reason = None
created_container.save(context)
except Exception as e:
with excutils.save_and_reraise_exception():
LOG.error("Rebuild container:%s failed, "
"reason of failure is: %s", container.uuid, e)
self._fail_container(context, container, six.text_type(e))
try:
self._update_task_state(context, container,
consts.CONTAINER_CREATING)
created_container = self._do_container_create_base(
context, container, network_info, vol_info)
created_container.status = consts.CREATED
created_container.status_reason = None
created_container.save(context)
except Exception as e:
with excutils.save_and_reraise_exception():
LOG.error("Rebuild container:%s failed, "
"reason of failure is: %s", container.uuid, e)
self._fail_container(context, container, six.text_type(e))
LOG.info("rebuild container: %s success", created_container.uuid)
if run:
self._do_container_start(context, created_container)
LOG.info("rebuild container: %s success", created_container.uuid)
if run:
self._do_container_start(context, created_container)
def _get_vol_info(self, context, container):
return objects.VolumeMapping.list_by_container(context,
@ -691,10 +710,10 @@ class Manager(periodic_task.PeriodicTasks):
@wrap_container_event(prefix='compute')
def _do_container_pause(self, context, container):
LOG.debug('Pausing container: %s', container.uuid)
self._update_task_state(context, container, consts.CONTAINER_PAUSING)
container = self.driver.pause(context, container)
self._update_task_state(context, container, None)
return container
with self._update_task_state(context, container,
consts.CONTAINER_PAUSING):
container = self.driver.pause(context, container)
return container
def container_pause(self, context, container):
@utils.synchronized(container.uuid)
@ -707,10 +726,10 @@ class Manager(periodic_task.PeriodicTasks):
@wrap_container_event(prefix='compute')
def _do_container_unpause(self, context, container):
LOG.debug('Unpausing container: %s', container.uuid)
self._update_task_state(context, container, consts.CONTAINER_UNPAUSING)
container = self.driver.unpause(context, container)
self._update_task_state(context, container, None)
return container
with self._update_task_state(context, container,
consts.CONTAINER_UNPAUSING):
container = self.driver.unpause(context, container)
return container
def container_unpause(self, context, container):
@utils.synchronized(container.uuid)
@ -784,10 +803,10 @@ class Manager(periodic_task.PeriodicTasks):
@wrap_container_event(prefix='compute')
def _do_container_kill(self, context, container, signal):
LOG.debug('Killing a container: %s', container.uuid)
self._update_task_state(context, container, consts.CONTAINER_KILLING)
container = self.driver.kill(context, container, signal)
self._update_task_state(context, container, None)
return container
with self._update_task_state(context, container,
consts.CONTAINER_KILLING):
container = self.driver.kill(context, container, signal)
return container
def container_kill(self, context, container, signal):
@utils.synchronized(container.uuid)
@ -1143,12 +1162,9 @@ class Manager(periodic_task.PeriodicTasks):
def _do_network_detach(self, context, container, network):
LOG.debug('Detach network: %(network)s from container: %(container)s.',
{'container': container, 'network': network})
self._update_task_state(context, container,
consts.NETWORK_DETACHING)
try:
with self._update_task_state(context, container,
consts.NETWORK_DETACHING):
self.driver.network_detach(context, container, network)
finally:
self._update_task_state(context, container, None)
def network_attach(self, context, container, requested_network):
@utils.synchronized(container.uuid)
@ -1162,12 +1178,9 @@ class Manager(periodic_task.PeriodicTasks):
def _do_network_attach(self, context, container, requested_network):
LOG.debug('Attach network: %(network)s to container: %(container)s.',
{'container': container, 'network': requested_network})
self._update_task_state(context, container,
consts.NETWORK_ATTACHING)
try:
with self._update_task_state(context, container,
consts.NETWORK_ATTACHING):
self.driver.network_attach(context, container, requested_network)
finally:
self._update_task_state(context, container, None)
def network_create(self, context, neutron_net_id):
LOG.debug('Create network')

View File

@ -811,12 +811,13 @@ class TestManager(base.TestCase):
@mock.patch.object(ContainerActionEvent, 'event_start')
@mock.patch.object(ContainerActionEvent, 'event_finish')
@mock.patch.object(Container, 'save')
@mock.patch('zun.compute.manager.Manager._get_vol_info')
@mock.patch('zun.compute.manager.Manager._get_network_info')
@mock.patch.object(manager.Manager, '_fail_container')
def test_container_rebuild_failed(
self, mock_fail, mock_get_network_info, mock_get_vol_info,
mock_event_finish, mock_event_start):
mock_save, mock_event_finish, mock_event_start):
mock_get_vol_info.return_value = []
fake_exc = exception.PortNotFound(port='fake-port')
mock_get_network_info.side_effect = fake_exc

View File

@ -344,10 +344,10 @@ class TestObject(test_base.TestCase, _TestObject):
# For more information on object version testing, read
# https://docs.openstack.org/zun/latest/
object_data = {
'Capsule': '1.0-f2adba2460c55f8bb6f8cdb548cf691d',
'CapsuleContainer': '1.0-64f11ebe03a0f7d5e74e4876b1f5a98f',
'CapsuleInitContainer': '1.0-64f11ebe03a0f7d5e74e4876b1f5a98f',
'Container': '1.39-dd75db8ef7f620f9781de1f484b9e5d0',
'Capsule': '1.0-fb200a8860db6d6fd9b2cf4b9efbf3a3',
'CapsuleContainer': '1.0-c7f84d8e07d854ddfc20140f2e6dd703',
'CapsuleInitContainer': '1.0-c7f84d8e07d854ddfc20140f2e6dd703',
'Container': '1.39-1ac92cdfe46067b990ed147038229a81',
'Cpuset': '1.0-06c4e6335683c18b87e2e54080f8c341',
'Volume': '1.0-034768f2f5c5e89acb5ee45c6d3f3403',
'VolumeMapping': '1.5-57febc66526185a75a744637e7a387c7',