Merge "Add a fake system driver"

This commit is contained in:
Zuul 2022-05-10 22:16:06 +00:00 committed by Gerrit Code Review
commit c5c890287f
4 changed files with 272 additions and 1 deletions

View File

@ -0,0 +1,9 @@
---
features:
- |
Adds a fake system driver (actived using the ``--fake`` argument) that
does not have an actual backend and works by storing all values in
the cache.
It is currently functional enough for the Ironic's ``ramdisk`` deploy
(and undeploy) to finish successfully.

View File

@ -33,6 +33,7 @@ from sushy_tools.emulator.resources import drives as drvdriver
from sushy_tools.emulator.resources import indicators as inddriver
from sushy_tools.emulator.resources import managers as mgrdriver
from sushy_tools.emulator.resources import storage as stgdriver
from sushy_tools.emulator.resources.systems import fakedriver
from sushy_tools.emulator.resources.systems import libvirtdriver
from sushy_tools.emulator.resources.systems import novadriver
from sushy_tools.emulator.resources import vmedia as vmddriver
@ -95,9 +96,14 @@ class Application(flask.Flask):
@property
@memoize.memoize()
def systems(self):
fake = self.config.get('SUSHY_EMULATOR_FAKE_DRIVER')
os_cloud = self.config.get('SUSHY_EMULATOR_OS_CLOUD')
if os_cloud:
if fake:
result = fakedriver.FakeDriver.initialize(
self.config, self.logger)()
elif os_cloud:
if not novadriver.is_loaded:
self.logger.error('Nova driver not loaded')
sys.exit(1)
@ -769,6 +775,10 @@ def parse_args():
'environment variable '
'SUSHY_EMULATOR_LIBVIRT_URI. '
'Default is qemu:///system')
backend_group.add_argument('--fake', action='store_true',
help='Use the fake driver. Can also be set '
'via environmnet variable '
'SUSHY_EMULATOR_FAKE_DRIVER.')
return parser.parse_args()
@ -787,6 +797,9 @@ def main():
if args.libvirt_uri:
app.config['SUSHY_EMULATOR_LIBVIRT_URI'] = args.libvirt_uri
if args.fake:
app.config['SUSHY_EMULATOR_FAKE_DRIVER'] = True
else:
for envvar in ('SUSHY_EMULATOR_LIBVIRT_URL', # backward compatibility
'SUSHY_EMULATOR_LIBVIRT_URI'):

View File

@ -0,0 +1,155 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
import random
import time
from sushy_tools.emulator import memoize
from sushy_tools.emulator.resources.systems.base import AbstractSystemsDriver
from sushy_tools import error
DEFAULT_UUID = '27946b59-9e44-4fa7-8e91-f3527a1ef094'
class FakeDriver(AbstractSystemsDriver):
"""Fake driver"""
@classmethod
def initialize(cls, config, logger):
config.setdefault('SUSHY_EMULATOR_FAKE_SYSTEMS', [
{
'uuid': DEFAULT_UUID,
'name': 'fake',
'power_state': 'Off',
}
])
cls._config = config
cls._logger = logger
return cls
def __init__(self):
super().__init__()
self._systems = memoize.PersistentDict()
if hasattr(self._systems, 'make_permanent'):
self._systems.make_permanent(
self._config.get('SUSHY_EMULATOR_STATE_DIR'), 'fakedriver')
for system in self._config['SUSHY_EMULATOR_FAKE_SYSTEMS']:
# Be careful to reduce racing with other processes
if system['uuid'] not in self._systems:
self._systems[system['uuid']] = copy.deepcopy(system)
self._by_name = {
system['name']: uuid
for uuid, system in self._systems.items()
}
def _update_if_needed(self, system):
pending_power = system.get('pending_power')
if pending_power and time.time() >= pending_power['apply_time']:
self._update(system,
power_state=pending_power['power_state'],
pending_power=None)
return system
def _get(self, identity):
try:
result = self._systems[identity]
except KeyError:
try:
uuid = self._by_name[identity]
except KeyError:
raise error.NotFound(f'Fake system {identity} was not found')
else:
raise error.AliasAccessError(uuid)
# NOTE(dtantsur): since the state change can only be observed after
# a _get() call, we can cheat a bit and update it on reading.
return self._update_if_needed(result)
def _update(self, system, **changes):
if isinstance(system, str):
system = self._get(system)
system.update(changes)
self._systems[system['uuid']] = system
@property
def driver(self):
return '<fake>'
@property
def systems(self):
return list(self._systems)
def uuid(self, identity):
try:
return self._get(identity)['uuid']
except error.AliasAccessError as exc:
return str(exc)
def name(self, identity):
try:
return self._get(identity)['name']
except error.AliasAccessError:
return identity
def get_power_state(self, identity):
return self._get(identity)['power_state']
def set_power_state(self, identity, state):
# hardware actions are not immediate
apply_time = int(time.time()) + random.randint(1, 11)
system = self._get(identity)
if 'On' in state:
pending_state = 'On'
elif state in ('ForceOff', 'GracefulShutdown'):
pending_state = 'Off'
elif 'Restart' in state:
system['power_state'] = 'Off'
pending_state = 'On'
else:
raise error.NotSupportedError(
f'Power state {state} is not supported')
if system['power_state'] != pending_state:
self._update(system, pending_power={
'power_state': pending_state,
'apply_time': apply_time,
})
def get_boot_device(self, identity):
return self._get(identity).get('boot_device', 'Hdd')
def set_boot_device(self, identity, boot_source):
self._update(identity, boot_device=boot_source)
def get_boot_mode(self, identity):
return self._get(identity).get('boot_mode', 'UEFI')
def set_boot_mode(self, identity, boot_mode):
self._update(identity, boot_mode=boot_mode)
def get_boot_image(self, identity, device):
devinfo = self._get(identity).get('boot_image') or {}
return devinfo.get(device) or (None, False, False)
def set_boot_image(self, identity, device, boot_image=None,
write_protected=True):
system = self._get(identity)
devinfo = system.get('boot_image') or {}
devinfo[device] = (boot_image, write_protected, bool(boot_image))
self._update(system, boot_image=devinfo)

View File

@ -0,0 +1,94 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import time
from unittest import mock
from oslotest import base
from sushy_tools.emulator.resources.systems import fakedriver
from sushy_tools import error
UUID = fakedriver.DEFAULT_UUID
class FakeDriverTestCase(base.BaseTestCase):
def setUp(self):
super().setUp()
test_driver_class = fakedriver.FakeDriver.initialize(
{}, mock.MagicMock())
self.cache = {}
with mock.patch('sushy_tools.emulator.memoize.PersistentDict',
return_value=self.cache, autospec=True):
self.test_driver = test_driver_class()
def test_systems(self):
self.assertEqual([UUID], self.test_driver.systems)
self.assertEqual('fake', self.test_driver.name(UUID))
self.assertEqual(UUID, self.test_driver.uuid('fake'))
self.assertEqual(UUID, self.test_driver.uuid(UUID))
self.assertEqual('fake', self.test_driver.name('fake'))
self.assertRaises(error.NotFound, self.test_driver.uuid, 'foo')
self.assertRaises(error.NotFound, self.test_driver.name, 'foo')
@mock.patch('random.randint', autospec=True, return_value=0)
def test_power_state(self, mock_rand):
self.assertEqual('Off', self.test_driver.get_power_state(UUID))
self.test_driver.set_power_state(UUID, 'On')
self.assertEqual('On', self.test_driver.get_power_state(UUID))
self.test_driver.set_power_state(UUID, 'ForceOff')
self.assertEqual('Off', self.test_driver.get_power_state(UUID))
@mock.patch('random.randint', autospec=True, return_value=1000)
def test_power_state_delay(self, mock_rand):
self.assertEqual('Off', self.test_driver.get_power_state(UUID))
self.test_driver.set_power_state(UUID, 'On')
self.assertEqual('Off', self.test_driver.get_power_state(UUID))
new_time = time.time() + 2000
with mock.patch.object(time, 'time', autospec=True,
return_value=new_time):
self.assertEqual('On', self.test_driver.get_power_state(UUID))
@mock.patch('random.randint', autospec=True, return_value=1000)
def test_reboot_delay(self, mock_rand):
self.cache[UUID]['power_state'] = 'On'
self.assertEqual('On', self.test_driver.get_power_state(UUID))
self.test_driver.set_power_state(UUID, 'ForceRestart')
self.assertEqual('Off', self.test_driver.get_power_state(UUID))
new_time = time.time() + 2000
with mock.patch.object(time, 'time', autospec=True,
return_value=new_time):
self.assertEqual('On', self.test_driver.get_power_state(UUID))
def test_boot_mode(self):
self.assertEqual('UEFI', self.test_driver.get_boot_mode(UUID))
self.test_driver.set_boot_mode(UUID, 'legacy')
self.assertEqual('legacy', self.test_driver.get_boot_mode(UUID))
def test_boot_device(self):
self.assertEqual('Hdd', self.test_driver.get_boot_device(UUID))
self.test_driver.set_boot_device(UUID, 'Cd')
self.assertEqual('Cd', self.test_driver.get_boot_device(UUID))
def test_boot_image(self):
self.assertEqual((None, False, False),
self.test_driver.get_boot_image(UUID, 'Cd'))
self.test_driver.set_boot_image(UUID, 'Cd', 'http://example')
self.assertEqual(('http://example', True, True),
self.test_driver.get_boot_image(UUID, 'Cd'))
self.assertEqual((None, False, False),
self.test_driver.get_boot_image(UUID, 'Hdd'))