Delete subcloud VIM strategies when DC fpga strategy deleted

As part of an fpga upgrade strategy in a subcloud by the VIM,
that strategy needs to be deleted.  Previously it was being
deleted as part of a the final steps of a successful orchestration,
but there can be scenarios where that orchestration is incomplete,
and therefore these VIM strategies need to be handled.

A new DC orchestration cannot be created until the old one is deleted,
so this change ensures fpga vim strategies are deleted if possible
when the DC orchestration strategy is deleted.

A bug in applying vim strategy has been fixed, and the section removed
since a vim auth exception cannot be raised since a new vim client
is constructed per loop.

Unit tests to invoke the delete strategy have been added.

Tox coverage now generates an html report.

Change-Id: I4f3d7bc3550e12c430d1b1c0ce3dffb298fe31b4
Closes-Bug: 1891521
Signed-off-by: albailey <Al.Bailey@windriver.com>
This commit is contained in:
albailey
2020-09-01 08:02:57 -05:00
parent f0a03ba131
commit 12b81ff452
15 changed files with 329 additions and 149 deletions

View File

@@ -23,8 +23,11 @@ import datetime
import threading
import time
from keystoneauth1 import exceptions as keystone_exceptions
from oslo_log import log as logging
from dccommon.drivers.openstack.sdk_platform import OpenStackDriver
from dccommon.drivers.openstack import vim
from dcmanager.common import consts
from dcmanager.common import context
from dcmanager.common import exceptions
@@ -101,6 +104,22 @@ class FwUpdateOrchThread(threading.Thread):
self.thread_group_manager.stop()
LOG.info("FwUpdateOrchThread Stopped")
@staticmethod
def get_ks_client(region_name=consts.DEFAULT_REGION_NAME):
"""This will get a cached keystone client (and token)"""
try:
os_client = OpenStackDriver(
region_name=region_name,
region_clients=None)
return os_client.keystone_client
except Exception:
LOG.warn('Failure initializing KeystoneClient')
raise
def get_vim_client(self, region_name=consts.DEFAULT_REGION_NAME):
ks_client = self.get_ks_client(region_name)
return vim.VimClient(region_name, ks_client.session)
@staticmethod
def get_region_name(strategy_step):
"""Get the region name for a strategy step"""
@@ -352,8 +371,14 @@ class FwUpdateOrchThread(threading.Thread):
LOG.info("Deleting fw update strategy")
# todo(abailey): determine if we should validate the strategy_steps
# before allowing the delete
strategy_steps = db_api.strategy_step_get_all(self.context)
for strategy_step in strategy_steps:
self.delete_subcloud_strategy(strategy_step)
if self.stopped():
LOG.info("Exiting because task is stopped")
return
# Remove the strategy from the database
try:
@@ -363,6 +388,50 @@ class FwUpdateOrchThread(threading.Thread):
LOG.exception(e)
raise e
# todo(abailey): refactor delete to reuse patch orch code
def delete_subcloud_strategy(self, strategy_step):
"""Delete the vim strategy in this subcloud"""
strategy_name = vim.STRATEGY_NAME_FW_UPDATE
region = self.get_region_name(strategy_step)
LOG.info("Deleting vim strategy %s for %s" % (strategy_name, region))
# First check if the strategy has been created.
try:
subcloud_strategy = self.get_vim_client(region).get_strategy(
strategy_name=strategy_name)
except (keystone_exceptions.EndpointNotFound, IndexError):
message = ("Endpoint for subcloud: %s not found." %
region)
LOG.error(message)
self.strategy_step_update(
strategy_step.subcloud_id,
state=consts.STRATEGY_STATE_FAILED,
details=message)
return
except Exception:
# Strategy doesn't exist so there is nothing to do
return
if subcloud_strategy.state in [vim.STATE_BUILDING,
vim.STATE_APPLYING,
vim.STATE_ABORTING]:
# Can't delete a strategy in these states
message = ("Strategy for %s in wrong state (%s)for delete" %
(region, subcloud_strategy.state))
LOG.warn(message)
raise Exception(message)
# If we are here, we need to delete the strategy
try:
self.get_vim_client(region).delete_strategy(
strategy_name=strategy_name)
except Exception:
message = "Strategy delete failed for %s" % region
LOG.warn(message)
raise
def process_update_step(self, region, strategy_step, log_error=False):
"""manage the green thread for calling perform_state_action"""
if region in self.subcloud_workers:

View File

@@ -80,7 +80,6 @@ class ApplyingVIMStrategyState(BaseState):
wait_count = 0
get_fail_count = 0
last_details = ""
auth_failure = False
while True:
# todo(abailey): combine the sleep and stop check into one method
# which would allow the longer 60 second sleep to be broken into
@@ -101,37 +100,21 @@ class ApplyingVIMStrategyState(BaseState):
subcloud_strategy = self.get_vim_client(region).get_strategy(
strategy_name=vim.STRATEGY_NAME_FW_UPDATE,
raise_error_if_missing=False)
auth_failure = False
get_fail_count = 0
except Exception as e:
if e.message == vim.VIM_AUTHORIZATION_FAILED:
# Since it can take hours to apply a strategy, there is a
# chance our keystone token will expire. Attempt to get
# a new token (by re-creating the client) and re-try the
# request, but only once.
if not auth_failure:
auth_failure = True
self.log_info(strategy_step,
"Authorization failure getting strategy."
" Retrying...")
continue
else:
raise Exception("Repeated authorization failure "
"getting firmware update strategy")
else:
# When applying the strategy to a subcloud, the VIM can
# be unreachable for a significant period of time when
# there is a controller swact, or in the case of AIO-SX,
# when the controller reboots.
get_fail_count += 1
if get_fail_count >= self.max_failed_queries:
# We have waited too long.
raise Exception("Timeout during recovery of apply "
"firmware strategy.")
self.debug_log(strategy_step,
"Unable to get firmware strategy - "
"attempt %d" % get_fail_count)
continue
except Exception:
# When applying the strategy to a subcloud, the VIM can
# be unreachable for a significant period of time when
# there is a controller swact, or in the case of AIO-SX,
# when the controller reboots.
get_fail_count += 1
if get_fail_count >= self.max_failed_queries:
# We have waited too long.
raise Exception("Timeout during recovery of apply "
"firmware strategy.")
self.debug_log(strategy_step,
"Unable to get firmware strategy - "
"attempt %d" % get_fail_count)
continue
# The loop gets here if the API is able to respond
# Check if the strategy no longer exists. This should not happen.
if subcloud_strategy is None:

View File

@@ -73,26 +73,26 @@ class CreatingVIMStrategyState(BaseState):
region)
else:
self.info_log(strategy_step,
"FW VIM strategy exists with state: %s"
"FW VIM strategy already exists with state: %s"
% subcloud_strategy.state)
# if a strategy exists in any type of failed state or aborted
# state it should be deleted.
# applied state should also be deleted from previous success runs.
if subcloud_strategy.state in [vim.STATE_BUILD_FAILED,
vim.STATE_BUILD_TIMEOUT,
vim.STATE_APPLY_FAILED,
vim.STATE_APPLY_TIMEOUT,
vim.STATE_ABORTED,
vim.STATE_ABORT_FAILED,
vim.STATE_ABORT_TIMEOUT,
vim.STATE_APPLIED]:
self.info_log(strategy_step,
"Deleting existing FW VIM strategy")
self.get_vim_client(region).delete_strategy(
strategy_name=vim.STRATEGY_NAME_FW_UPDATE)
# re-create it
subcloud_strategy = self._create_vim_strategy(strategy_step,
region)
# if a strategy exists in building/applying/aborting do not delete
# it and instead raise an exception
if subcloud_strategy.state in [vim.STATE_BUILDING,
vim.STATE_APPLYING,
vim.STATE_ABORTING]:
# Can't delete a strategy in these states
message = ("Failed to create a VIM strategy for %s. "
"There already is an existing strategy in %s state"
% (region, subcloud_strategy.state))
self.warn_log(strategy_step, message)
raise Exception(message)
# if strategy exists in any other type of state, delete and create
self.info_log(strategy_step, "Deleting existing FW VIM strategy")
self.get_vim_client(region).delete_strategy(
strategy_name=vim.STRATEGY_NAME_FW_UPDATE)
subcloud_strategy = self._create_vim_strategy(strategy_step,
region)
# A strategy already exists, or is being built
# Loop until the strategy is done building Repeatedly query the API

View File

@@ -3,7 +3,6 @@
#
# SPDX-License-Identifier: Apache-2.0
#
from dccommon.drivers.openstack import vim
from dcmanager.common import consts
from dcmanager.orchestrator.states.base import BaseState
from dcmanager.orchestrator.states.firmware import utils
@@ -41,24 +40,11 @@ class FinishingFwUpdateState(BaseState):
Returns the next state for the state machine if successful.
"""
# Possible things that need to be done in this state:
# - delete the vim fw update strategy
# - clean up files
# - report information about the firmware on the subcloud
region = self.get_region_name(strategy_step)
# Get the existing firmware strategy, which may be None
subcloud_strategy = self.get_vim_client(region).get_strategy(
strategy_name=vim.STRATEGY_NAME_FW_UPDATE,
raise_error_if_missing=False)
if subcloud_strategy is not None:
self.info_log(strategy_step,
"Deleting FW VIM strategy that has state: %s"
% subcloud_strategy.state)
self.get_vim_client(region).delete_strategy(
strategy_name=vim.STRATEGY_NAME_FW_UPDATE)
# FINAL CHECK
# if any of the device images are in failed state, fail this state
# only check for enabled devices matching images with applied labels

View File

@@ -0,0 +1,85 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# Copyright (c) 2017 Wind River Systems, Inc.
#
# The right to copy, distribute, modify, or otherwise make use
# of this software may be licensed only pursuant to the terms
# of an applicable Wind River license agreement.
#
import uuid
from oslo_utils import timeutils
# VIM constants for Strategy
APPLY_TYPE_SERIAL = 'serial'
INSTANCE_ACTION_STOP_START = 'stop-start'
ALARM_RESTRICTIONS_STRICT = 'strict'
class FakeVimClient(object):
def __init__(self):
pass
class FakeVimStrategy(object):
"""Represents a VIM Strategy object defined in:
starlingx/nfv/nfv-client/nfv_client/openstack/sw_update.py
"""
def __init__(self,
name="VIM Strategy",
controller_apply_type=APPLY_TYPE_SERIAL,
storage_apply_type=APPLY_TYPE_SERIAL,
swift_apply_type=APPLY_TYPE_SERIAL,
worker_apply_type=APPLY_TYPE_SERIAL,
max_parallel_worker_hosts=2,
default_instance_action=INSTANCE_ACTION_STOP_START,
alarm_restrictions=ALARM_RESTRICTIONS_STRICT,
current_phase=None,
current_phase_completion_percentage=0,
state=None,
build_phase=None,
apply_phase=None,
abort_phase=None):
self.uuid = str(uuid.uuid4())
self.name = name
self.controller_apply_type = controller_apply_type
self.storage_apply_type = storage_apply_type
self.swift_apply_type = swift_apply_type
self.worker_apply_type = worker_apply_type
self.max_parallel_worker_hosts = max_parallel_worker_hosts
self.default_instance_action = default_instance_action
self.alarm_restrictions = alarm_restrictions
self.current_phase = current_phase
self.current_phase_completion_percentage =\
current_phase_completion_percentage
self.state = state
self.build_phase = build_phase
self.apply_phase = apply_phase
self.abort_phase = abort_phase
class SwUpdateStrategy(object):
def __init__(self, id, data):
self.id = id
self.type = data['type']
self.subcloud_apply_type = data['subcloud-apply-type']
self.max_parallel_subclouds = int(data['max-parallel-subclouds'])
if data['stop-on-failure'] == 'true':
self.stop_on_failure = True
else:
self.stop_on_failure = False
self.state = data['state']
self.created_at = timeutils.utcnow()
self.updated_at = timeutils.utcnow()

View File

@@ -19,11 +19,6 @@ UPGRADED_VERSION = '56.78'
FAKE_VENDOR = '8086'
FAKE_DEVICE = '0b30'
# VIM constants for Strategy
APPLY_TYPE_SERIAL = 'serial'
INSTANCE_ACTION_STOP_START = 'stop-start'
ALARM_RESTRICTIONS_STRICT = 'strict'
class FakeController(object):
def __init__(self,
@@ -186,47 +181,3 @@ class FakeUpgrade(object):
self.from_release = from_release
self.to_release = to_release
self.links = []
class FakeVimClient(object):
def __init__(self):
pass
class FakeVimStrategy(object):
"""Represents a VIM Strategy object defined in:
starlingx/nfv/nfv-client/nfv_client/openstack/sw_update.py
"""
def __init__(self,
name="VIM Strategy",
controller_apply_type=APPLY_TYPE_SERIAL,
storage_apply_type=APPLY_TYPE_SERIAL,
swift_apply_type=APPLY_TYPE_SERIAL,
worker_apply_type=APPLY_TYPE_SERIAL,
max_parallel_worker_hosts=2,
default_instance_action=INSTANCE_ACTION_STOP_START,
alarm_restrictions=ALARM_RESTRICTIONS_STRICT,
current_phase=None,
current_phase_completion_percentage=0,
state=None,
build_phase=None,
apply_phase=None,
abort_phase=None):
self.uuid = str(uuid.uuid4())
self.name = name
self.controller_apply_type = controller_apply_type
self.storage_apply_type = storage_apply_type
self.swift_apply_type = swift_apply_type
self.worker_apply_type = worker_apply_type
self.max_parallel_worker_hosts = max_parallel_worker_hosts
self.default_instance_action = default_instance_action
self.alarm_restrictions = alarm_restrictions
self.current_phase = current_phase
self.current_phase_completion_percentage =\
current_phase_completion_percentage
self.state = state
self.build_phase = build_phase
self.apply_phase = apply_phase
self.abort_phase = abort_phase

View File

@@ -10,7 +10,7 @@ from dccommon.drivers.openstack import vim
from dcmanager.common import consts
from dcmanager.orchestrator.states.firmware import applying_vim_strategy
from dcmanager.tests.unit.orchestrator.states.fakes import FakeVimStrategy
from dcmanager.tests.unit.fakes import FakeVimStrategy
from dcmanager.tests.unit.orchestrator.states.firmware.test_base \
import TestFwUpdateState

View File

@@ -4,7 +4,7 @@
# SPDX-License-Identifier: Apache-2.0
#
from dcmanager.common import consts
from dcmanager.tests.unit.orchestrator.states.test_base import TestSwUpdate
from dcmanager.tests.unit.orchestrator.test_base import TestSwUpdate
class TestFwUpdateState(TestSwUpdate):

View File

@@ -11,7 +11,7 @@ from dccommon.drivers.openstack import vim
from dcmanager.common import consts
from dcmanager.orchestrator.states.firmware import creating_vim_strategy
from dcmanager.tests.unit.orchestrator.states.fakes import FakeVimStrategy
from dcmanager.tests.unit.fakes import FakeVimStrategy
from dcmanager.tests.unit.orchestrator.states.firmware.test_base \
import TestFwUpdateState
@@ -39,6 +39,7 @@ class TestFwUpdateCreatingVIMStrategyStage(TestFwUpdateState):
# Add mock API endpoints for sysinv client calls invcked by this state
self.vim_client.create_strategy = mock.MagicMock()
self.vim_client.delete_strategy = mock.MagicMock()
self.vim_client.get_strategy = mock.MagicMock()
def test_creating_vim_strategy_success(self):
@@ -139,19 +140,25 @@ class TestFwUpdateCreatingVIMStrategyStage(TestFwUpdateState):
def test_creating_vim_strategy_already_exists_and_completes(self):
"""Test creating a VIM strategy while one already exists"""
# first api query is what already exists
# first api query is what already exists.
# If it is not building,aborting or applying it should be deleted
# and a new one recreated
# remainder are during the loop
self.vim_client.get_strategy.side_effect = [
STRATEGY_BUILDING,
STRATEGY_DONE_BUILDING,
STRATEGY_FAILED_BUILDING, # old strategy that gets deleted
STRATEGY_BUILDING, # new strategy gets built
STRATEGY_DONE_BUILDING, # new strategy succeeds during while loop
]
# The strategy should be deleted and then created
self.vim_client.create_strategy.return_value = STRATEGY_BUILDING
# invoke the strategy state operation on the orch thread
self.worker.perform_state_action(self.strategy_step)
# create API call should never be invoked
self.vim_client.create_strategy.assert_not_called()
# delete API should have been invoked
self.assertEqual(1, self.vim_client.delete_strategy.call_count)
# create API call should be invoked
self.assertEqual(1, self.vim_client.create_strategy.call_count)
# SUCCESS case
self.assert_step_updated(self.strategy_step.subcloud_id,
self.on_success_state)
@@ -159,10 +166,11 @@ class TestFwUpdateCreatingVIMStrategyStage(TestFwUpdateState):
def test_creating_vim_strategy_already_exists_and_is_broken(self):
"""Test creating a VIM strategy while a broken strategy exists"""
# first api query is what already exists
# remainder are during the loop
# first api query is what already exists.
# If it is building,aborting or applying it does not get deleted
# and the strategy goes to failed state
self.vim_client.get_strategy.side_effect = [
STRATEGY_FAILED_BUILDING,
STRATEGY_BUILDING,
]
# invoke the strategy state operation on the orch thread

View File

@@ -9,7 +9,7 @@ from dccommon.drivers.openstack import vim
from dcmanager.common import consts
from dcmanager.orchestrator.states.firmware.finishing_fw_update import FinishingFwUpdateState
from dcmanager.tests.unit.orchestrator.states.fakes import FakeVimStrategy
from dcmanager.tests.unit.fakes import FakeVimStrategy
from dcmanager.tests.unit.orchestrator.states.firmware.test_base \
import TestFwUpdateState
@@ -48,9 +48,6 @@ class TestFwUpdateFinishingFwUpdateStage(TestFwUpdateState):
# invoke the strategy state operation on the orch thread
self.worker.perform_state_action(self.strategy_step)
# ensure that the delete was called
self.vim_client.delete_strategy.assert_called_once()
# Successful promotion to next state
self.assert_step_updated(self.strategy_step.subcloud_id,
self.on_success_state)

View File

@@ -4,7 +4,7 @@
# SPDX-License-Identifier: Apache-2.0
#
from dcmanager.common import consts
from dcmanager.tests.unit.orchestrator.states.test_base import TestSwUpdate
from dcmanager.tests.unit.orchestrator.test_base import TestSwUpdate
class TestSwUpgradeState(TestSwUpdate):

View File

@@ -30,9 +30,9 @@ from dcmanager.orchestrator import sw_update_manager
from dcmanager.orchestrator import sw_upgrade_orch_thread
from dcmanager.tests import base
from dcmanager.tests.unit.fakes import FakeVimClient
from dcmanager.tests.unit.orchestrator.states.fakes import FakeKeystoneClient
from dcmanager.tests.unit.orchestrator.states.fakes import FakeSysinvClient
from dcmanager.tests.unit.orchestrator.states.fakes import FakeVimClient
from dcmanager.tests.unit.orchestrator.test_sw_update_manager import FakeOrchThread
from dcmanager.tests.unit.orchestrator.test_sw_update_manager \
import StrategyStep

View File

@@ -0,0 +1,116 @@
#
# Copyright (c) 2020 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
import copy
import mock
from dccommon.drivers.openstack import vim
from dcmanager.common import consts
from dcmanager.orchestrator.fw_update_orch_thread import FwUpdateOrchThread
from dcmanager.tests.unit.fakes import FakeVimClient
from dcmanager.tests.unit.fakes import FakeVimStrategy
from dcmanager.tests.unit.fakes import SwUpdateStrategy
from dcmanager.tests.unit.orchestrator.test_base import TestSwUpdate
FAKE_SW_UPDATE_DATA = {
"type": consts.SW_UPDATE_TYPE_FIRMWARE,
"subcloud-apply-type": consts.SUBCLOUD_APPLY_TYPE_PARALLEL,
"max-parallel-subclouds": "2",
"stop-on-failure": "true",
"force": "false",
"state": consts.SW_UPDATE_STATE_INITIAL
}
class TestFwOrchThread(TestSwUpdate):
# Setting DEFAULT_STRATEGY_TYPE to firmware will setup the firmware
# orchestration worker, and will mock away the other orch threads
DEFAULT_STRATEGY_TYPE = consts.SW_UPDATE_TYPE_FIRMWARE
def setUp(self):
super(TestFwOrchThread, self).setUp()
# Mock the vim client defined in the orch thread
self.worker_vim_client = FakeVimClient()
p = mock.patch.object(FwUpdateOrchThread, 'get_vim_client')
self.mock_worker_vim_client = p.start()
self.mock_worker_vim_client.return_value = self.worker_vim_client
self.addCleanup(p.stop)
self.worker_vim_client.create_strategy = mock.MagicMock()
self.worker_vim_client.delete_strategy = mock.MagicMock()
self.worker_vim_client.get_strategy = mock.MagicMock()
def setup_strategy(self, state):
data = copy.copy(FAKE_SW_UPDATE_DATA)
data['state'] = state
return SwUpdateStrategy(1, data=data)
def test_delete_strategy_no_steps(self):
# The 'strategy'should be 'deleting'
self.strategy = self.setup_strategy(
state=consts.SW_UPDATE_STATE_DELETING)
self.mock_db_api.strategy_step_get_all.return_value = []
# invoke the strategy (not strategy step) operation on the orch thread
self.worker.delete(self.strategy)
# Delete should get the steps and call delete_subcloud_strategy on each
self.mock_db_api.strategy_step_get_all.assert_called()
# There are no strategy steps, so no vim api calls should be invoked
self.worker_vim_client.get_strategy.assert_not_called()
self.mock_db_api.strategy_step_destroy_all.assert_called()
self.mock_db_api.sw_update_strategy_destroy.assert_called()
def test_delete_strategy_single_step_no_vim_strategy(self):
# The 'strategy' needs to be in 'deleting'
self.strategy = self.setup_strategy(
state=consts.SW_UPDATE_STATE_DELETING)
strategy_step = self.setup_strategy_step(
consts.STRATEGY_STATE_CREATING_FW_UPDATE_STRATEGY)
self.mock_db_api.strategy_step_get_all.return_value = [strategy_step, ]
# If the subcloud does not have a vim strategy, it raises an exception
self.worker_vim_client.get_strategy.side_effect = Exception
# invoke the strategy (not strategy step) operation on the orch thread
self.worker.delete(self.strategy)
# Delete should get the steps and call delete_subcloud_strategy on each
self.mock_db_api.strategy_step_get_all.assert_called()
# There is a step, so the vim strategy should be queried
self.worker_vim_client.get_strategy.assert_called()
# delete should delete the streps from the DB and the strategy
self.mock_db_api.strategy_step_destroy_all.assert_called()
self.mock_db_api.sw_update_strategy_destroy.assert_called()
def test_delete_strategy_single_step_with_vim_strategy(self):
# The 'strategy' needs to be in 'deleting'
self.strategy = self.setup_strategy(
state=consts.SW_UPDATE_STATE_DELETING)
strategy_step = self.setup_strategy_step(
consts.STRATEGY_STATE_CREATING_FW_UPDATE_STRATEGY)
self.mock_db_api.strategy_step_get_all.return_value = [strategy_step, ]
# the subcloud returns a vim strategy
vim_strategy = FakeVimStrategy(state=vim.STATE_APPLIED)
self.worker_vim_client.get_strategy.return_value = vim_strategy
# invoke the strategy (not strategy step) operation on the orch thread
self.worker.delete(self.strategy)
# Delete should get the steps and call delete_subcloud_strategy on each
self.mock_db_api.strategy_step_get_all.assert_called()
# There is a step, so the vim strategy should be queried and deleted
self.worker_vim_client.get_strategy.assert_called()
self.worker_vim_client.delete_strategy.assert_called()
# delete should delete the streps from the DB and the strategy
self.mock_db_api.strategy_step_destroy_all.assert_called()
self.mock_db_api.sw_update_strategy_destroy.assert_called()

View File

@@ -22,7 +22,6 @@ from os import path as os_path
import threading
from oslo_config import cfg
from oslo_utils import timeutils
from dcmanager.common import consts
from dcmanager.common import context
@@ -316,21 +315,6 @@ class Controller(object):
self.hostname = hostname
class SwUpdateStrategy(object):
def __init__(self, id, data):
self.id = id
self.type = data['type']
self.subcloud_apply_type = data['subcloud-apply-type']
self.max_parallel_subclouds = int(data['max-parallel-subclouds'])
if data['stop-on-failure'] == 'true':
self.stop_on_failure = True
else:
self.stop_on_failure = False
self.state = data['state']
self.created_at = timeutils.utcnow()
self.updated_at = timeutils.utcnow()
# All orch_threads can be mocked the same way
class FakeOrchThread(object):
def __init__(self):

View File

@@ -114,6 +114,7 @@ commands =
rm -f coverage.xml
find {toxinidir} -not -path '{toxinidir}/.tox/*' -name '*.py[c|o]' -delete
python setup.py testr --coverage --testr-args='{posargs}'
coverage html -d cover
coverage xml --rcfile=.coveragerc_xml
coverage report