Save node resources to DB

This saves the available node resources to the DB, but we still
use the node cache for scheduling; follow-up patches will finish
the whole refactor.

Note: This also changes to use node_type from the ironic node
instead of instance_type.

Change-Id: Ic43180a8c85e36ef8e04edb2b99044b1d715cbd1
Author: Zhenguo Niu
Date: 2017-03-15 16:18:46 +08:00
Parent: f9d5d2ec76
Commit: 2d26f7d39e
11 changed files with 166 additions and 40 deletions
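
For context, a sketch of the per-node resource dict that the driver now
reports and the engine persists (field names taken from _node_resource()
in the ironic driver change below; the values mirror the defaults in
get_test_compute_node() further down and are only illustrative):

    # Hypothetical entry in the dict returned by
    # IronicDriver.get_available_resources(), keyed by the ironic node UUID.
    {
        'cpus': 16,
        'memory_mb': 10240,
        'hypervisor_type': 'ironic',
        'availability_zone': 'test_az',
        'node_type': 'gold',                    # read from the ironic node's properties
        'extra_specs': {'cpu_arch': 'x86_64'},  # parsed from the 'capabilities' property
        'node_uuid': 'f978ef48-d4af-4dad-beec-e6174309bc71',
        'ports': [],
    }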

@@ -184,7 +184,7 @@ function create_instance_type {
function update_ironic_node_type {
ironic_nodes=$(openstack baremetal node list -c UUID -f value)
for node in ${ironic_nodes};do
openstack baremetal node set --property instance_type=${MOGAN_DEFAULT_INSTANCE_TYPE} ${node}
openstack baremetal node set --property node_type=${MOGAN_DEFAULT_INSTANCE_TYPE} ${node}
done
}
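
After DevStack applies this, the renamed property can be checked with a
standard openstackclient call (not part of this change), for example:

    openstack baremetal node show <node-uuid> -f value -c properties
    # expect the output to include: 'node_type': '<MOGAN_DEFAULT_INSTANCE_TYPE value>'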

@@ -30,10 +30,10 @@ opts = [
help=_('Maximum number of worker threads that can be started '
'simultaneously by a periodic task. Should be less '
'than RPC thread pool size.')),
cfg.IntOpt('sync_node_resource_interval',
cfg.IntOpt('update_resources_interval',
default=60,
help=_('Interval between syncing the node resources from '
'ironic, in seconds.')),
help=_('Interval between syncing the resources from underlying '
'hypervisor, in seconds.')),
cfg.StrOpt('scheduler_driver',
default='mogan.engine.scheduler.filter_scheduler.'
'FilterScheduler',
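
For operators, the renamed option is read from the [engine] group (the group
name is inferred from the CONF.engine.* lookups in the engine manager change
below), so a mogan.conf entry would look roughly like:

    [engine]
    # How often, in seconds, to sync resources from the underlying hypervisor.
    update_resources_interval = 60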

@@ -102,10 +102,10 @@ def upgrade():
sa.Column('cpus', sa.Integer(), nullable=False),
sa.Column('memory_mb', sa.Integer(), nullable=False),
sa.Column('hypervisor_type', sa.String(length=255), nullable=False),
sa.Column('node_type', sa.String(length=255), nullable=False),
sa.Column('availability_zone', sa.String(length=255), nullable=True),
sa.Column('node_uuid', sa.String(length=36), nullable=False),
sa.Column('capabilities', sa.Text(), nullable=True),
sa.Column('extra', sa.Text(), nullable=True),
sa.Column('extra_specs', sa.Text(), nullable=True),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('node_uuid', name='uniq_compute_nodes0node_uuid'),
mysql_ENGINE='InnoDB',

@@ -105,10 +105,10 @@ class ComputeNode(Base):
cpus = Column(Integer, nullable=False)
memory_mb = Column(Integer, nullable=False)
hypervisor_type = Column(String(255), nullable=False)
node_type = Column(String(255), nullable=False)
availability_zone = Column(String(255), nullable=True)
node_uuid = Column(String(36), nullable=False)
capabilities = Column(db_types.JsonEncodedDict)
extra = Column(db_types.JsonEncodedDict)
extra_specs = Column(db_types.JsonEncodedDict)
class InstanceNic(Base):

@@ -102,6 +102,63 @@ class IronicDriver(base_driver.BaseEngineDriver):
except ironic_exc.NotFound:
raise exception.InstanceNotFound(instance_id=instance.uuid)
def _parse_node_properties(self, node):
"""Helper method to parse the node's properties."""
properties = {}
for prop in ('cpus', 'memory_mb', 'local_gb'):
try:
properties[prop] = int(node.properties.get(prop, 0))
except (TypeError, ValueError):
LOG.warning(_LW('Node %(uuid)s has a malformed "%(prop)s". '
'It should be an integer.'),
{'uuid': node.uuid, 'prop': prop})
properties[prop] = 0
properties['capabilities'] = node.properties.get('capabilities')
properties['availability_zone'] = \
node.properties.get('availability_zone')
properties['node_type'] = node.properties.get('node_type')
return properties
def _node_resource(self, node):
"""Helper method to create resource dict from node stats."""
properties = self._parse_node_properties(node)
cpus = properties['cpus']
memory_mb = properties['memory_mb']
availability_zone = properties['availability_zone']
node_type = properties['node_type']
nodes_extra_specs = {}
# NOTE(gilliard): To assist with more precise scheduling, if the
# node.properties contains a key 'capabilities', we expect the value
# to be of the form "k1:v1,k2:v2,etc.." which we add directly as
# key/value pairs into the node_extra_specs to be used by the
# ComputeCapabilitiesFilter
capabilities = properties['capabilities']
if capabilities:
for capability in str(capabilities).split(','):
parts = capability.split(':')
if len(parts) == 2 and parts[0] and parts[1]:
nodes_extra_specs[parts[0].strip()] = parts[1]
else:
LOG.warning(_LW("Ignoring malformed capability '%s'. "
"Format should be 'key:val'."), capability)
dic = {
'cpus': cpus,
'memory_mb': memory_mb,
'hypervisor_type': self._get_hypervisor_type(),
'availability_zone': str(availability_zone),
'node_type': str(node_type),
'extra_specs': nodes_extra_specs,
'node_uuid': str(node.uuid),
'ports': node.ports,
}
return dic
def _add_instance_info_to_node(self, node, instance):
patch = list()
@@ -182,6 +239,10 @@ class IronicDriver(base_driver.BaseEngineDriver):
_log_ironic_polling(message, node, instance)
def _get_hypervisor_type(self):
"""Get hypervisor type."""
return 'ironic'
def get_ports_from_node(self, node_uuid, detail=True):
"""List the MAC addresses and the port types from a node."""
ports = self.ironicclient.call("node.list_ports",
@@ -388,7 +449,7 @@ class IronicDriver(base_driver.BaseEngineDriver):
# Add ports to the associated node
node.ports = [port for port in port_list
if node.uuid == port.node_uuid]
node_resources[node.uuid] = node
node_resources[node.uuid] = self._node_resource(node)
return node_resources
def get_maintenance_node_list(self):
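
As a worked example of the capability parsing described in the NOTE inside
_node_resource() above (a standalone sketch, not part of the commit), a node
property such as capabilities='boot_mode:uefi,cpu_arch:x86_64' ends up in the
resource dict as an extra_specs mapping:

    # Hypothetical illustration of the capability-string parsing logic above.
    capabilities = 'boot_mode:uefi,cpu_arch:x86_64'
    extra_specs = {}
    for capability in capabilities.split(','):
        parts = capability.split(':')
        if len(parts) == 2 and parts[0] and parts[1]:
            extra_specs[parts[0].strip()] = parts[1]
    # extra_specs == {'boot_mode': 'uefi', 'cpu_arch': 'x86_64'}
    # A malformed entry such as 'secure_boot' would be logged and skipped.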

@@ -47,17 +47,70 @@ class EngineManager(base_manager.BaseEngineManager):
target = messaging.Target(version=RPC_API_VERSION)
_lock = threading.Lock()
def _refresh_cache(self):
nodes = self.driver.get_available_resources()
def _refresh_cache(self, nodes):
with self._lock:
self.node_cache = nodes
def _get_compute_node(self, context, node_uuid):
"""Gets compute node by the uuid."""
try:
return objects.ComputeNode.get(context, node_uuid)
except exception.NotFound:
LOG.warning(_LW("No compute node record for %(node)s"),
{'node': node_uuid})
def _init_compute_node(self, context, node):
"""Initialize the compute node if it does not already exist.
:param context: security context
:param node: initial values
"""
# now try to get the compute node record from the
# database. If we get one we use resources to initialize
cn = self._get_compute_node(context, node['node_uuid'])
if cn:
cn.update_from_driver(node)
cn.save()
return
# there was no compute node in the database so we need to create
# a new compute node. This needs to be initialized with node values.
cn = objects.ComputeNode(context)
cn.update_from_driver(node)
cn.create()
@periodic_task.periodic_task(
spacing=CONF.engine.sync_node_resource_interval,
spacing=CONF.engine.update_resources_interval,
run_immediately=True)
def _sync_node_resources(self, context):
self._refresh_cache()
def _update_available_resources(self, context):
"""See driver.get_available_resource()
Periodic process that keeps that the engine's understanding of
resource availability in sync with the underlying hypervisor.
:param context: security context
"""
nodes = self.driver.get_available_resources()
# TODO(zhenguo): Keep using cache until we finished the refactor to
# save resources to db.
self._refresh_cache(nodes)
compute_nodes_in_db = objects.ComputeNode.list(context)
# Record compute nodes to db
for uuid, node in nodes.items():
# initialize the compute node object, creating it
# if it does not already exist.
self._init_compute_node(context, node)
# Delete orphan compute node not reported by driver but still in db
for cn in compute_nodes_in_db:
if cn.node_uuid not in nodes:
LOG.info(_LI("Deleting orphan compute node %(id)s)"),
{'id': cn.node_uuid})
cn.destroy()
@periodic_task.periodic_task(spacing=CONF.engine.sync_power_state_interval,
run_immediately=True)

@@ -33,12 +33,12 @@ class NodeState(object):
"""Mutable and immutable information tracked for a Ironic node."""
def __init__(self, node):
self.node = node.uuid
self.capabilities = node.properties.get('capabilities')
self.availability_zone = node.properties.get('availability_zone') \
self.node = node['node_uuid']
self.capabilities = node['extra_specs']
self.availability_zone = node['availability_zone'] \
or CONF.engine.default_availability_zone
self.instance_type = node.properties.get('instance_type')
self.ports = node.ports
self.instance_type = node['node_type']
self.ports = node['ports']
class NodeManager(object):

@@ -33,10 +33,10 @@ class ComputeNode(base.MoganObject, object_base.VersionedObjectDictCompat):
'cpus': object_fields.IntegerField(),
'memory_mb': object_fields.IntegerField(),
'hypervisor_type': object_fields.StringField(),
'node_type': object_fields.StringField(),
'availability_zone': object_fields.StringField(nullable=True),
'node_uuid': object_fields.UUIDField(read_only=True),
'capabilities': object_fields.FlexibleDictField(nullable=True),
'extra': object_fields.FlexibleDictField(nullable=True),
'extra_specs': object_fields.FlexibleDictField(nullable=True),
}
@classmethod
@@ -73,3 +73,10 @@ class ComputeNode(base.MoganObject, object_base.VersionedObjectDictCompat):
"""Refresh the object by re-fetching from the DB."""
current = self.__class__.get(context, self.node_uuid)
self.obj_refresh(current)
def update_from_driver(self, node):
keys = ["cpus", "memory_mb", "hypervisor_type", "node_type",
"availability_zone", "node_uuid", "extra_specs"]
for key in keys:
if key in node:
setattr(self, key, node[key])

@@ -93,11 +93,11 @@ def get_test_compute_node(**kw):
'cpus': kw.get('cpus', 16),
'memory_mb': kw.get('memory_mb', 10240),
'hypervisor_type': kw.get('hypervisor_type', 'ironic'),
'node_type': kw.get('node_type', 'gold'),
'availability_zone': kw.get('availability_zone', 'test_az'),
'node_uuid': kw.get('node_uuid',
'f978ef48-d4af-4dad-beec-e6174309bc71'),
'capabilities': kw.get('capabilities', {}),
'extra': kw.get('extra', {}),
'extra_specs': kw.get('extra_specs', {}),
'updated_at': kw.get('updated_at'),
'created_at': kw.get('created_at'),
}

@@ -36,26 +36,31 @@ class FakeFilterScheduler(filter_scheduler.FilterScheduler):
class FakeNode(base.MoganObject, object_base.VersionedObjectDictCompat):
fields = {
'id': object_fields.IntegerField(),
'uuid': object_fields.UUIDField(nullable=True),
'properties': object_fields.FlexibleDictField(nullable=True),
'node_uuid': object_fields.UUIDField(),
'node_type': object_fields.StringField(nullable=True),
'availability_zone': object_fields.StringField(nullable=True),
'extra_specs': object_fields.FlexibleDictField(nullable=True),
'ports': fields.ListOfDictOfNullableStringsField(nullable=True),
}
fakenode1 = FakeNode(id=1, uuid='1a617131-cdbc-45dc-afff-f21f17ae054e',
properties={'capabilities': '',
'availability_zone': 'az1',
'instance_type': 'type1'},
fakenode1 = FakeNode(id=1,
node_uuid='1a617131-cdbc-45dc-afff-f21f17ae054e',
extra_specs={},
availability_zone='az1',
node_type='type1',
ports=[])
fakenode2 = FakeNode(id=2, uuid='2a617131-cdbc-45dc-afff-f21f17ae054e',
properties={'capabilities': '',
'availability_zone': 'az2',
'instance_type': 'type2'},
fakenode2 = FakeNode(id=2,
node_uuid='2a617131-cdbc-45dc-afff-f21f17ae054e',
extra_specs={},
availability_zone='az1',
node_type='type1',
ports=[])
fakenode3 = FakeNode(id=3, uuid='3a617131-cdbc-45dc-afff-f21f17ae054e',
properties={'capabilities': '',
'availability_zone': 'az3',
'instance_type': 'type3'},
fakenode3 = FakeNode(id=3,
node_uuid='3a617131-cdbc-45dc-afff-f21f17ae054e',
extra_specs={},
availability_zone='az1',
node_type='type1',
ports=[])

@@ -383,12 +383,12 @@ class _TestObject(object):
# The fingerprint values should only be changed if there is a version bump.
expected_object_fingerprints = {
'Instance': '1.0-18d0ffc894a0f6b52df73a29919c035b',
'ComputeNode': '1.0-de876259e1ad6d214e25aeeb478079ad',
'ComputeNode': '1.0-9dd029c83e37adc7e01ff759e76cdda1',
'InstanceFault': '1.0-6b5b01b2cc7b6b547837acb168ec6eb9',
'InstanceFaultList': '1.0-43e8aad0258652921f929934e9e048fd',
'InstanceType': '1.0-589b096651fcdb30898ff50f748dd948',
'MyObj': '1.1-aad62eedc5a5cc8bcaf2982c285e753f',
'FakeNode': '1.0-07813a70fee67557d8a71ad96f31cee7',
'FakeNode': '1.0-f367d3a6d123084a60ef73696cd2964b',
'InstanceNic': '1.0-78744332fe105f9c1796dc5295713d9f',
'InstanceNics': '1.0-33a2e1bb91ad4082f9f63429b77c1244',
'Quota': '1.0-c8caa082f4d726cb63fdc5943f7cd186',