Rename self.driver to self.resource where applicable
We keep passing driver instances around as a 'driver' parameter and tracking them locally in the instance and state managers as self.driver. What is actually being passed is the encapsulated resource, so we should name it as such rather than hiding that behind an opaque name. This change renames it accordingly. It also removes some redundancy where resource_id was passed alongside a resource object that already carries the id.

Change-Id: I65490f01608fda1da3467455ee58ecb5fa6c7873
parent 3d96794fd7
commit e2eb7d4689
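For context, a minimal runnable sketch of the interface change, using toy stand-ins (FakeResource and the trimmed Automaton below are illustrative, not the astara classes): callers now hand the state machine a single resource object (a driver instance), and the resource ID is derived from it instead of being passed separately.

# Sketch only; names here are hypothetical stand-ins for the real astara types.
class FakeResource(object):
    """Stand-in for a driver instance; astara drivers expose attributes like .id and .log."""
    def __init__(self, resource_id):
        self.id = resource_id


class Automaton(object):
    def __init__(self, resource, tenant_id):
        self.resource = resource      # formerly self.driver
        self.tenant_id = tenant_id    # resource_id is no longer a constructor argument

    @property
    def resource_id(self):
        """Existing readers of .resource_id keep working; the id now lives on the resource."""
        return self.resource.id


sm = Automaton(resource=FakeResource('abc-123'), tenant_id='tenant-1')
assert sm.resource_id == 'abc-123'

The same pattern appears below in InstanceManager, which now reads self.resource.id where it previously stored self.id from a separate constructor argument.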
@@ -63,8 +63,7 @@ def debug_one_router(args=sys.argv[1:]):
     context = worker.WorkerContext()
     driver = drivers.get('router')(context, cfg.CONF.router_id)
     a = state.Automaton(
-        driver=driver,
-        resource_id=cfg.CONF.router_id,
+        resource=driver,
         tenant_id=driver._router.tenant_id,
         delete_callback=delete_callback,
         bandwidth_callback=bandwidth_callback,
@@ -47,7 +47,7 @@ def synchronize_driver_state(f):
     """Wrapper that triggers a driver's synchronize_state function"""
     def wrapper(self, *args, **kw):
         state = f(self, *args, **kw)
-        self.driver.synchronize_state(*args, state=state)
+        self.resource.synchronize_state(*args, state=state)
         return state
     return wrapper
 
@@ -65,9 +65,8 @@ def ensure_cache(f):
     def wrapper(self, worker_context, *args, **kw):
         # insure that self.instance_info is current before doing anything.
         self.instance_info = (
-            worker_context.nova_client.get_instance_info(self.driver.name)
+            worker_context.nova_client.get_instance_info(self.resource.name)
         )
 
         if self.instance_info:
             (
                 self.instance_info.management_port,
@@ -98,19 +97,18 @@ class BootAttemptCounter(object):
 
 class InstanceManager(object):
 
-    def __init__(self, driver, resource_id, worker_context):
+    def __init__(self, resource, worker_context):
         """The instance manager is your interface to the running instance.
         wether it be virtual, container or physical.
 
         Service specific code lives in the driver which is passed in here.
 
-        :param driver: driver object
+        :param resource: An driver instance for the managed resource
         :param resource_id: UUID of logical resource
         :param worker_context:
         """
-        self.driver = driver
-        self.id = resource_id
-        self.log = self.driver.log
+        self.resource = resource
+        self.log = self.resource.log
 
         self.state = states.DOWN
 
@@ -146,9 +144,9 @@ class InstanceManager(object):
         :param silent:
         :returns: state
         """
-        if self.driver.get_state(worker_context) == states.GONE:
+        if self.resource.get_state(worker_context) == states.GONE:
             self.log.debug('%s driver reported its state is %s',
-                           self.driver.RESOURCE_NAME, states.GONE)
+                           self.resource.RESOURCE_NAME, states.GONE)
             self.state = states.GONE
             return self.state
 
@@ -165,7 +163,7 @@ class InstanceManager(object):
             return self.state
 
         for i in six.moves.range(cfg.CONF.max_retries):
-            if self.driver.is_alive(self.instance_info.management_address):
+            if self.resource.is_alive(self.instance_info.management_address):
                 if self.state != states.CONFIGURED:
                     self.state = states.UP
                 break
@@ -179,7 +177,8 @@ class InstanceManager(object):
             self._check_boot_timeout()
 
             # If the instance isn't responding, make sure Nova knows about it
-            instance = worker_context.nova_client.get_instance_for_obj(self.id)
+            instance = worker_context.nova_client.get_instance_for_obj(
+                self.resource.id)
             if instance is None and self.state != states.ERROR:
                 self.log.info('No instance was found; rebooting')
                 self.state = states.DOWN
@@ -208,7 +207,7 @@ class InstanceManager(object):
         if not self._boot_logged:
             boot_time = self.instance_info.time_since_boot.total_seconds()
             self.log.info('%s booted in %s seconds after %s attempts',
-                          self.driver.RESOURCE_NAME, boot_time,
+                          self.resource.RESOURCE_NAME, boot_time,
                           self._boot_counter.count)
             self._boot_logged = True
 
@@ -224,22 +223,22 @@ class InstanceManager(object):
 
         :returns: None
         """
-        self.log.info('Booting %s' % self.driver.RESOURCE_NAME)
+        self.log.info('Booting %s' % self.resource.RESOURCE_NAME)
         self.state = states.DOWN
         self._boot_counter.start()
 
         # driver preboot hook
-        self.driver.pre_boot(worker_context)
+        self.resource.pre_boot(worker_context)
 
         # try to boot the instance
         try:
             instance_info = worker_context.nova_client.boot_instance(
-                resource_type=self.driver.RESOURCE_NAME,
+                resource_type=self.resource.RESOURCE_NAME,
                 prev_instance_info=self.instance_info,
-                name=self.driver.name,
-                image_uuid=self.driver.image_uuid,
-                flavor=self.driver.flavor,
-                make_ports_callback=self.driver.make_ports(worker_context)
+                name=self.resource.name,
+                image_uuid=self.resource.image_uuid,
+                flavor=self.resource.flavor,
+                make_ports_callback=self.resource.make_ports(worker_context)
             )
             if not instance_info:
                 self.log.info(_LI('Previous instance is still deleting'))
@@ -250,7 +249,7 @@ class InstanceManager(object):
                 return
         except:
             self.log.exception(_LE('Instance failed to start boot'))
-            self.driver.delete_ports(worker_context)
+            self.resource.delete_ports(worker_context)
         else:
             # We have successfully started a (re)boot attempt so
             # record the timestamp so we can report how long it takes.
@@ -258,7 +257,7 @@ class InstanceManager(object):
             self.instance_info = instance_info
 
             # driver post boot hook
-            self.driver.post_boot(worker_context)
+            self.resource.post_boot(worker_context)
 
     @synchronize_driver_state
     @ensure_cache
@@ -311,7 +310,7 @@ class InstanceManager(object):
         """
         self.log.info(_LI('Destroying instance'))
 
-        self.driver.delete_ports(worker_context)
+        self.resource.delete_ports(worker_context)
 
         if not self.instance_info:
             self.log.info(_LI('Instance already destroyed.'))
@@ -353,13 +352,15 @@ class InstanceManager(object):
         self.state = states.UP
         attempts = cfg.CONF.max_retries
 
-        if self.driver.get_state(worker_context) == states.GONE:
+        if self.resource.get_state(worker_context) == states.GONE:
             return states.GONE
 
-        interfaces = self.driver.get_interfaces(
+        interfaces = self.resource.get_interfaces(
             self.instance_info.management_address)
 
-        if not self._verify_interfaces(self.driver.ports, interfaces):
+        if not self._verify_interfaces(self.resource.ports, interfaces):
+            # FIXME: Need a states.REPLUG state when we support hot-plugging
+            # interfaces.
             self.log.debug("Interfaces aren't plugged as expected.")
             self.state = states.REPLUG
             return self.state
@@ -381,7 +382,7 @@ class InstanceManager(object):
         }
 
         # sending all the standard config over to the driver for final updates
-        config = self.driver.build_config(
+        config = self.resource.build_config(
             worker_context,
             mgt_port,
             iface_map
@@ -391,7 +392,7 @@ class InstanceManager(object):
 
         for i in six.moves.range(attempts):
             try:
-                self.driver.update_config(
+                self.resource.update_config(
                     self.instance_info.management_address,
                     config)
             except Exception:
@@ -421,9 +422,9 @@ class InstanceManager(object):
         """
         self.log.debug('Attempting to replug...')
 
-        self.driver.pre_plug(worker_context)
+        self.resource.pre_plug(worker_context)
 
-        interfaces = self.driver.get_interfaces(
+        interfaces = self.resource.get_interfaces(
             self.instance_info.management_address)
 
         actual_macs = set((iface['lladdr'] for iface in interfaces))
@@ -443,7 +444,7 @@ class InstanceManager(object):
         instance_ports = {p.network_id: p for p in self.instance_info.ports}
         instance_networks = set(instance_ports.keys())
 
-        logical_networks = set(p.network_id for p in self.driver.ports)
+        logical_networks = set(p.network_id for p in self.resource.ports)
 
         if logical_networks != instance_networks:
             instance = worker_context.nova_client.get_instance_by_id(
@@ -453,7 +454,7 @@ class InstanceManager(object):
         # For each port that doesn't have a mac address on the instance...
         for network_id in logical_networks - instance_networks:
             port = worker_context.neutron.create_vrrp_port(
-                self.driver.id,
+                self.resource.id,
                 network_id
             )
             self.log.debug(
@@ -494,10 +495,10 @@ class InstanceManager(object):
             self.log.debug(
                 "Waiting for interface attachments to take effect..."
            )
-            interfaces = self.driver.get_interfaces(
+            interfaces = self.resource.get_interfaces(
                 self.instance_info.management_address)
 
-            if self._verify_interfaces(self.driver.ports, interfaces):
+            if self._verify_interfaces(self.resource.ports, interfaces):
                 # replugging was successful
                 # TODO(mark) update port states
                 return
astara/state.py (102 changed lines)
@@ -26,7 +26,6 @@ import collections
 import itertools
 
 from astara.common.i18n import _LE, _LI, _LW
-from astara.event import Resource
 from astara.event import POLL, CREATE, READ, UPDATE, DELETE, REBUILD
 from astara import instance_manager
 from astara.drivers import states
@@ -35,7 +34,7 @@ from astara.drivers import states
 class StateParams(object):
     def __init__(self, driver, instance, queue, bandwidth_callback,
                  reboot_error_threshold):
-        self.driver = driver
+        self.resource = driver
         self.instance = instance
         self.log = driver.log
         self.queue = queue
@@ -83,11 +82,11 @@ class CalcAction(State):
     def execute(self, action, worker_context):
         queue = self.queue
         if DELETE in queue:
-            self.params.driver.log.debug('shortcutting to delete')
+            self.params.resource.log.debug('shortcutting to delete')
             return DELETE
 
         while queue:
-            self.params.driver.log.debug(
+            self.params.resource.log.debug(
                 'action = %s, len(queue) = %s, queue = %s',
                 action,
                 len(queue),
@@ -97,22 +96,23 @@ class CalcAction(State):
             if action == UPDATE and queue[0] == CREATE:
                 # upgrade to CREATE from UPDATE by taking the next
                 # item from the queue
-                self.params.driver.log.debug('upgrading from update to create')
+                self.params.resource.log.debug(
+                    'upgrading from update to create')
                 action = queue.popleft()
                 continue
 
             elif action in (CREATE, UPDATE) and queue[0] == REBUILD:
                 # upgrade to REBUILD from CREATE/UPDATE by taking the next
                 # item from the queue
-                self.params.driver.log.debug('upgrading from %s to rebuild',
+                self.params.resource.log.debug('upgrading from %s to rebuild',
                                              action)
                 action = queue.popleft()
                 continue
 
             elif action == CREATE and queue[0] == UPDATE:
                 # CREATE implies an UPDATE so eat the update event
                 # without changing the action
-                self.params.driver.log.debug('merging create and update')
+                self.params.resource.log.debug('merging create and update')
                 queue.popleft()
                 continue
 
@@ -120,9 +120,9 @@ class CalcAction(State):
             # Throw away a poll following any other valid action,
             # because a create or update will automatically handle
             # the poll and repeated polls are not needed.
-            self.params.driver.log.debug('discarding poll event following '
-                                         'action %s',
+            self.params.resource.log.debug(
+                'discarding poll event following action %s',
                 action)
             queue.popleft()
             continue
 
@@ -130,10 +130,10 @@ class CalcAction(State):
             # We are not polling and the next action is something
             # different from what we are doing, so just do the
             # current action.
-            self.params.driver.log.debug('done collapsing events')
+            self.params.resource.log.debug('done collapsing events')
             break
 
-            self.params.driver.log.debug('popping action from queue')
+            self.params.resource.log.debug('popping action from queue')
            action = queue.popleft()
 
         return action
@@ -159,7 +159,7 @@ class CalcAction(State):
             # here.
             next_action = self
         elif self.instance.error_cooldown:
-            self.params.driver.log.debug(
+            self.params.resource.log.debug(
                 'Resource is in ERROR cooldown, '
                 'ignoring event.'
             )
@@ -229,15 +229,15 @@ class CreateInstance(State):
         # Check for a loop where the resource keeps failing to boot or
         # accept the configuration.
         if self.instance.attempts >= self.params.reboot_error_threshold:
-            self.params.driver.log.info(_LI('Dropping out of boot loop after '
-                                            ' %s trials'),
+            self.params.resource.log.info(_LI(
+                'Dropping out of boot loop after %s trials'),
                 self.instance.attempts)
             self.instance.set_error(worker_context)
             return action
         self.instance.boot(worker_context)
-        self.params.driver.log.debug('CreateInstance attempt %s/%s',
+        self.params.resource.log.debug('CreateInstance attempt %s/%s',
                                      self.instance.attempts,
                                      self.params.reboot_error_threshold)
         return action
 
     def transition(self, action, worker_context):
@@ -253,7 +253,7 @@ class CreateInstance(State):
 class CheckBoot(State):
     def execute(self, action, worker_context):
         self.instance.update_state(worker_context)
-        self.params.driver.log.debug(
+        self.params.resource.log.debug(
             'Instance is %s' % self.instance.state.upper())
         # Put the action back on the front of the queue so that we can yield
         # and handle it in another state machine traversal (which will proceed
@@ -366,14 +366,12 @@ class ReadStats(State):
 
 
 class Automaton(object):
-    def __init__(self, driver, resource_id, tenant_id,
+    def __init__(self, resource, tenant_id,
                  delete_callback, bandwidth_callback,
                  worker_context, queue_warning_threshold,
                  reboot_error_threshold):
         """
-        :param driver: An instantiated driver object for the managed resource
-        :param resource_id: UUID of the resource being managed
-        :type resource_id: str
+        :param resource: An instantiated driver object for the managed resource
         :param tenant_id: UUID of the tenant being managed
         :type tenant_id: str
         :param delete_callback: Invoked when the Automaton decides
@@ -392,8 +390,7 @@ class Automaton(object):
             the router puts it into an error state.
         :type reboot_error_threshold: int
         """
-        self.driver = driver
-        self.resource_id = resource_id
+        self.resource = resource
         self.tenant_id = tenant_id
         self._delete_callback = delete_callback
         self._queue_warning_threshold = queue_warning_threshold
@@ -403,11 +400,10 @@ class Automaton(object):
         self._queue = collections.deque()
 
         self.action = POLL
-        self.instance = instance_manager.InstanceManager(self.driver,
-                                                          self.resource_id,
+        self.instance = instance_manager.InstanceManager(self.resource,
                                                           worker_context)
         self._state_params = StateParams(
-            self.driver,
+            self.resource,
            self.instance,
             self._queue,
             self.bandwidth_callback,
@@ -415,15 +411,17 @@ class Automaton(object):
         )
         self.state = CalcAction(self._state_params)
 
-        self.resource = Resource(
-            driver=self.driver, id=self.resource_id, tenant_id=self.tenant_id)
+    @property
+    def resource_id(self):
+        """Returns the ID of the managed resource"""
+        return self.resource.id
 
     def service_shutdown(self):
         "Called when the parent process is being stopped"
 
     def _do_delete(self):
         if self._delete_callback is not None:
-            self.driver.log.debug('calling delete callback')
+            self.resource.log.debug('calling delete callback')
             self._delete_callback()
             # Avoid calling the delete callback more than once.
             self._delete_callback = None
@@ -435,26 +433,28 @@ class Automaton(object):
         while self._queue:
             while True:
                 if self.deleted:
-                    self.driver.log.debug(
+                    self.resource.log.debug(
                         'skipping update because the router is being deleted'
                     )
                     return
 
                 try:
-                    self.driver.log.debug('%s.execute(%s) instance.state=%s',
-                                          self.state,
-                                          self.action,
-                                          self.instance.state)
+                    self.resource.log.debug(
+                        '%s.execute(%s) instance.state=%s',
+                        self.state,
+                        self.action,
+                        self.instance.state)
                     self.action = self.state.execute(
                         self.action,
                         worker_context,
                     )
-                    self.driver.log.debug('%s.execute -> %s instance.state=%s',
-                                          self.state,
-                                          self.action,
-                                          self.instance.state)
+                    self.resource.log.debug(
+                        '%s.execute -> %s instance.state=%s',
+                        self.state,
+                        self.action,
+                        self.instance.state)
                 except:
-                    self.driver.log.exception(
+                    self.resource.log.exception(
                         _LE('%s.execute() failed for action: %s'),
                         self.state,
                         self.action
@@ -465,7 +465,7 @@ class Automaton(object):
                     self.action,
                     worker_context,
                 )
-                self.driver.log.debug(
+                self.resource.log.debug(
                     '%s.transition(%s) -> %s instance.state=%s',
                     old_state,
                     self.action,
@@ -488,7 +488,7 @@ class Automaton(object):
         "Called when the worker put a message in the state machine queue"
         if self.deleted:
             # Ignore any more incoming messages
-            self.driver.log.debug(
+            self.resource.log.debug(
                 'deleted state machine, ignoring incoming message %s',
                 message)
             return False
@@ -501,7 +501,7 @@ class Automaton(object):
         # do any work.
         if message.crud == POLL and \
                 self.instance.state == states.ERROR:
-            self.driver.log.info(_LI(
+            self.resource.log.info(_LI(
                 'Resource status is ERROR, ignoring POLL message: %s'),
                 message,
             )
@@ -509,20 +509,20 @@ class Automaton(object):
 
         if message.crud == REBUILD:
             if message.body.get('image_uuid'):
-                self.driver.log.info(_LI(
+                self.resource.log.info(_LI(
                     'Resource is being REBUILT with custom image %s'),
                     message.body['image_uuid']
                 )
                 self.image_uuid = message.body['image_uuid']
             else:
-                self.image_uuid = self.driver.image_uuid
+                self.image_uuid = self.resource.image_uuid
 
         self._queue.append(message.crud)
         queue_len = len(self._queue)
         if queue_len > self._queue_warning_threshold:
-            logger = self.driver.log.warning
+            logger = self.resource.log.warning
         else:
-            logger = self.driver.log.debug
+            logger = self.resource.log.debug
         logger(_LW('incoming message brings queue length to %s'), queue_len)
         return True
 
@@ -547,6 +547,6 @@ class Automaton(object):
         This is used after a ring rebalance if this state machine no longer
         maps to the local Rug process.
         """
-        self.driver.log.debug(
+        self.resource.log.info(
             'Dropping %s pending actions from queue', len(self._queue))
         self._queue.clear()
@@ -196,11 +196,11 @@ class TenantResourceManager(object):
                 'a driver.'))
             return []
 
-        driver_obj = \
+        resource_obj = \
             drivers.get(message.resource.driver)(worker_context,
                                                  message.resource.id)
 
-        if not driver_obj:
+        if not resource_obj:
             # this means the driver didn't load for some reason..
             # this might not be needed at all.
             LOG.debug('for some reason loading the driver failed')
@@ -210,8 +210,7 @@ class TenantResourceManager(object):
             self._delete_resource(message.resource)
 
         new_state_machine = state.Automaton(
-            driver=driver_obj,
-            resource_id=message.resource.id,
+            resource=resource_obj,
            tenant_id=self.tenant_id,
             delete_callback=deleter,
             bandwidth_callback=self._report_bandwidth,
@@ -231,7 +230,7 @@ class TenantResourceManager(object):
             machine
             for machine in state_machines
             if (not machine.deleted and
-                not self.state_machines.has_been_deleted(machine.resource_id))
+                not self.state_machines.has_been_deleted(machine.resource.id))
         ]
 
     def get_state_machine_by_resource_id(self, resource_id):
@@ -45,8 +45,7 @@ class TestDebug(base.RugTestBase):
 
         assert set_trace.called
         automaton.assert_called_once_with(
-            driver=drivers_get.return_value.return_value,
-            resource_id='X',
+            resource=drivers_get.return_value.return_value,
             tenant_id=mock_router.tenant_id,
             delete_callback=debug.delete_callback,
             bandwidth_callback=debug.bandwidth_callback,
@@ -110,7 +110,6 @@ class TestInstanceManager(base.RugTestBase):
         self.mock_update_state = self.update_state_p.start()
         self.instance_mgr = instance_manager.InstanceManager(
             self.fake_driver,
-            'fake_resource_id',
             self.ctx
         )
         self.instance_mgr.instance_info = self.INSTANCE_INFO
@@ -387,7 +386,8 @@ class TestInstanceManager(base.RugTestBase):
             image_uuid=self.fake_driver.image_uuid,
             flavor=self.fake_driver.flavor,
             make_ports_callback='fake_ports_callback')
-        self.instance_mgr.driver.delete_ports.assert_called_once_with(self.ctx)
+        self.instance_mgr.resource.delete_ports.assert_called_once_with(
+            self.ctx)
 
     @mock.patch('time.sleep')
     def test_stop_success(self, sleep):
@@ -397,7 +397,8 @@ class TestInstanceManager(base.RugTestBase):
         self.ctx.nova_client.destroy_instance.assert_called_once_with(
             self.INSTANCE_INFO
         )
-        self.instance_mgr.driver.delete_ports.assert_called_once_with(self.ctx)
+        self.instance_mgr.resource.delete_ports.assert_called_once_with(
+            self.ctx)
         self.assertEqual(self.instance_mgr.state, states.DOWN)
 
     @mock.patch('time.time')
@@ -535,8 +535,7 @@ class TestAutomaton(unittest.TestCase):
         self.bandwidth_callback = mock.Mock()
 
         self.sm = state.Automaton(
-            driver=self.fake_driver,
-            resource_id=self.fake_driver.id,
+            resource=self.fake_driver,
             tenant_id='tenant-id',
             delete_callback=self.delete_callback,
             bandwidth_callback=self.bandwidth_callback,
@@ -548,7 +547,7 @@ class TestAutomaton(unittest.TestCase):
     def test_send_message(self):
         message = mock.Mock()
         message.crud = 'update'
-        with mock.patch.object(self.sm.driver, 'log') as logger:
+        with mock.patch.object(self.sm.resource, 'log') as logger:
             self.sm.send_message(message)
             self.assertEqual(len(self.sm._queue), 1)
             logger.debug.assert_called_with(
@@ -561,7 +560,7 @@ class TestAutomaton(unittest.TestCase):
         message.crud = 'update'
         for i in range(3):
             self.sm.send_message(message)
-        with mock.patch.object(self.sm.driver, 'log') as logger:
+        with mock.patch.object(self.sm.resource, 'log') as logger:
             self.sm.send_message(message)
             logger.warning.assert_called_with(
                 'incoming message brings queue length to %s',
@@ -587,7 +586,7 @@ class TestAutomaton(unittest.TestCase):
 
         # Non-POLL events should *not* be ignored for routers in ERROR state
         message.crud = 'create'
-        with mock.patch.object(self.sm.driver, 'log') as logger:
+        with mock.patch.object(self.sm.resource, 'log') as logger:
             self.sm.send_message(message)
             self.assertEqual(len(self.sm._queue), 1)
             logger.debug.assert_called_with(
@@ -646,7 +645,7 @@ class TestAutomaton(unittest.TestCase):
         self.sm.action = 'fake'
         self.sm.state = fake_state
 
-        with mock.patch.object(self.sm.driver, 'log') as log:
+        with mock.patch.object(self.sm.resource, 'log') as log:
             self.sm.update(self.ctx)
 
         log.exception.assert_called_once_with(mock.ANY, fake_state, 'fake')
@@ -83,9 +83,8 @@ class TestTenantResourceManager(unittest.TestCase):
         rid = str(uuid.uuid4())
         driver = fakes.fake_driver(rid)
         sm = state.Automaton(
-            driver=driver,
+            resource=driver,
             worker_context=self.ctx,
-            resource_id=driver.id,
             tenant_id=self.tenant_id,
             delete_callback=None,
             bandwidth_callback=None,
@@ -111,9 +110,8 @@ class TestTenantResourceManager(unittest.TestCase):
         rid = str(uuid.uuid4())
         driver = fakes.fake_driver(rid)
         sm = state.Automaton(
-            driver=driver,
+            resource=driver,
             worker_context=self.ctx,
-            resource_id=i,
             tenant_id=self.tenant_id,
             delete_callback=None,
             bandwidth_callback=None,
@@ -124,15 +122,16 @@ class TestTenantResourceManager(unittest.TestCase):
             # Replace the default mock with one that has 'state' set.
             if i == 2:
                 status = states.ERROR
+                err_id = sm.resource_id
             else:
                 status = states.UP
 
             sm.instance = mock.Mock(state=status)
-            self.trm.state_machines.state_machines[str(i)] = sm
+            self.trm.state_machines.state_machines[sm.resource_id] = sm
 
         r = event.Resource(
             tenant_id=self.tenant_id,
-            id='2',
+            id=err_id,
             driver=router.Router.RESOURCE_NAME,
         )
         msg = event.Event(
@@ -142,8 +141,8 @@ class TestTenantResourceManager(unittest.TestCase):
         )
         sms = self.trm.get_state_machines(msg, self.ctx)
         self.assertEqual(1, len(sms))
-        self.assertEqual(2, sms[0].resource_id)
-        self.assertIs(self.trm.state_machines.state_machines['2'], sms[0])
+        self.assertEqual(err_id, sms[0].resource_id)
+        self.assertIs(self.trm.state_machines.state_machines[err_id], sms[0])
 
     def test_existing_resource(self):
         r = event.Resource(
@@ -657,7 +657,7 @@ class Worker(object):
             return
         new_res = event.Resource(
             id=resource_id,
-            driver=sm.driver.RESOURCE_NAME,
+            driver=sm.resource.RESOURCE_NAME,
             tenant_id=sm.tenant_id)
         new_msg = event.Event(
             resource=new_res,