Use server group in scheduler
This change will allow the scheduler to select nodes with specified server group in schedule_hints. Partially Implements: bp support-schedule-hints Change-Id: Idecd5e5f470eee12de0088b2332874b43f9dae52
This commit is contained in:
parent
7228ff5c15
commit
639aeceafb
@ -277,3 +277,8 @@ class Connection(object):
|
||||
def server_group_get_all(self, context, project_id=None):
|
||||
"""Get server groups."""
|
||||
return IMPL.server_group_get_all(context, project_id)
|
||||
|
||||
@abc.abstractmethod
|
||||
def server_group_members_add(self, context, group_uuid, members):
|
||||
"""Add a list of members to a server group"""
|
||||
return IMPL.server_group_members_add(context, group_uuid, members)
|
||||
|
@ -273,6 +273,10 @@ class Connection(api.Connection):
|
||||
context,
|
||||
models.ServerFault).filter_by(server_uuid=server_id)
|
||||
faults_query.delete()
|
||||
group_member_query = model_query(
|
||||
context, models.ServerGroupMember).filter_by(
|
||||
server_uuid=server_id)
|
||||
group_member_query.delete()
|
||||
count = query.delete()
|
||||
if count != 1:
|
||||
raise exception.ServerNotFound(server=server_id)
|
||||
@ -1057,6 +1061,13 @@ class Connection(api.Connection):
|
||||
query = query.filter_by(project_id=project_id)
|
||||
return query.all()
|
||||
|
||||
def server_group_members_add(self, context, group_uuid, members):
|
||||
group = model_query(context, models.ServerGroup).filter_by(
|
||||
uuid=group_uuid).first()
|
||||
if not group:
|
||||
raise exception.ServerGroupNotFound(group_uuid=group_uuid)
|
||||
self._server_group_members_add(context, group.id, members)
|
||||
|
||||
|
||||
def _get_id_from_flavor_query(context, type_id):
|
||||
return model_query(context, models.Flavors). \
|
||||
|
@ -213,7 +213,7 @@ class API(object):
|
||||
return [_decode(f) for f in injected_files]
|
||||
|
||||
def _provision_servers(self, context, base_options,
|
||||
min_count, max_count):
|
||||
min_count, max_count, server_group):
|
||||
# Return num_servers according quota
|
||||
num_servers = self._check_num_servers_quota(
|
||||
context, min_count, max_count)
|
||||
@ -235,6 +235,9 @@ class API(object):
|
||||
|
||||
server.create()
|
||||
servers.append(server)
|
||||
if server_group:
|
||||
objects.ServerGroup.add_members(
|
||||
context, server_group.uuid, [server.uuid])
|
||||
except Exception:
|
||||
with excutils.save_and_reraise_exception():
|
||||
try:
|
||||
@ -262,6 +265,15 @@ class API(object):
|
||||
return self.network_api.validate_networks(context, requested_networks,
|
||||
max_count)
|
||||
|
||||
@staticmethod
|
||||
def _get_requested_server_group(context, scheduler_hints):
|
||||
if not scheduler_hints:
|
||||
return
|
||||
group_hint = scheduler_hints.get('group')
|
||||
if not group_hint:
|
||||
return
|
||||
return objects.ServerGroup.get_by_uuid(context, group_hint)
|
||||
|
||||
def _create_server(self, context, flavor, image_uuid,
|
||||
name, description, availability_zone, metadata,
|
||||
requested_networks, user_data, injected_files,
|
||||
@ -296,9 +308,10 @@ class API(object):
|
||||
# TODO(zhenguo): Check injected file quota
|
||||
# b64 decode the files to inject:
|
||||
decoded_files = self._decode_files(injected_files)
|
||||
|
||||
server_group = self._get_requested_server_group(context,
|
||||
scheduler_hints)
|
||||
servers = self._provision_servers(context, base_options,
|
||||
min_count, max_count)
|
||||
min_count, max_count, server_group)
|
||||
request_spec = {
|
||||
'server_properties': {
|
||||
'flavor_uuid': servers[0].flavor_uuid,
|
||||
|
@ -74,6 +74,10 @@ class ServerGroup(base.MoganObject, object_base.VersionedObjectDictCompat):
|
||||
self.dbapi.server_group_delete(self._context, self.uuid)
|
||||
self.obj_reset_changes()
|
||||
|
||||
@classmethod
|
||||
def add_members(cls, context, group_uuid, members):
|
||||
cls.dbapi.server_group_members_add(context, group_uuid, members)
|
||||
|
||||
def save(self, context=None):
|
||||
updates = self.obj_get_changes()
|
||||
self.dbapi.server_group_update(context, self.uuid, updates)
|
||||
|
@ -15,6 +15,8 @@
|
||||
You can customize this scheduler by specifying your own node Filters and
|
||||
Weighing Functions.
|
||||
"""
|
||||
import itertools
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
|
||||
@ -110,10 +112,127 @@ class FilterScheduler(driver.Scheduler):
|
||||
# if no aggregates match with the key/value,
|
||||
# fail the scheduling.
|
||||
return None
|
||||
filters.append([agg.uuid for agg in aggregates])
|
||||
filters.append(aggregates)
|
||||
|
||||
return filters
|
||||
|
||||
@staticmethod
|
||||
def _get_server_group_obj(context, request_spec):
|
||||
server_group = request_spec.get('scheduler_hints', {}).get('group')
|
||||
if not server_group:
|
||||
return
|
||||
server_group = objects.ServerGroup.get_by_uuid(context, server_group)
|
||||
return server_group
|
||||
|
||||
def _get_nodes_of_aggregates(self, aggregates):
|
||||
agg_uuids = [agg.uuid for agg in aggregates]
|
||||
query_filters = {'member_of': 'in:' + ','.join(agg_uuids)}
|
||||
rps = self.reportclient.get_filtered_resource_providers(query_filters)
|
||||
return [rp['uuid'] for rp in rps]
|
||||
|
||||
def _get_filtered_affzs_nodes(self, context, server_group, filtered_nodes,
|
||||
num_servers):
|
||||
"""Get the filtered affinity zone and nodes.
|
||||
|
||||
If affinity specified in request, this method will return a tuple
|
||||
including filtered affinity zone and filtered nodes. e.g.
|
||||
[(zone1, [node-1, node-2, node-3])].
|
||||
If anti-affinity specified, this will return a list of tuples of
|
||||
affinity zone and nodes list. e.g.
|
||||
[(zone1, node-1]), (zone2, node-2), (zone3, node-3)]
|
||||
|
||||
"""
|
||||
|
||||
def _log_and_raise_error(policy):
|
||||
LOG.error("No enough nodes filtered, request %(num_svr)s "
|
||||
"server(s) with server group %(group)s with %(policy)s "
|
||||
"policy specified.",
|
||||
{"num_svr": num_servers, "group": server_group.name,
|
||||
"policy": policy})
|
||||
msg = (_("No enough nodes filtered, request %(num_svr)s server(s) "
|
||||
"with %(policy)s policy specified.") %
|
||||
{"num_svr": num_servers, "policy": policy})
|
||||
raise exception.NoValidNode(msg)
|
||||
|
||||
if 'affinity' in server_group.policies:
|
||||
selected_affz = None
|
||||
if server_group.members:
|
||||
for member in server_group.members:
|
||||
server = objects.Server.get(context, member)
|
||||
if server.affinity_zone:
|
||||
selected_affz = server.affinity_zone
|
||||
break
|
||||
if selected_affz:
|
||||
aggs = objects.AggregateList.get_by_metadata(
|
||||
context, 'affinity_zone', selected_affz)
|
||||
affz_nodes = self._get_nodes_of_aggregates(aggs)
|
||||
selected_nodes = list(set(filtered_nodes) & set(affz_nodes))
|
||||
if len(selected_nodes) < num_servers:
|
||||
_log_and_raise_error('affinity')
|
||||
return selected_affz, selected_nodes[:num_servers]
|
||||
|
||||
all_aggs = objects.AggregateList.get_all(context)
|
||||
all_aggs = sorted(all_aggs, key=lambda a: a.metadata.get(
|
||||
'affinity_zone'))
|
||||
grouped_aggs = itertools.groupby(
|
||||
all_aggs, lambda a: a.metadata.get('affinity_zone'))
|
||||
|
||||
if 'affinity' in server_group.policies:
|
||||
for affz, aggs in grouped_aggs:
|
||||
affz_nodes = self._get_nodes_of_aggregates(aggs)
|
||||
affz_nodes = list(set(filtered_nodes) & set(affz_nodes))
|
||||
if len(affz_nodes) >= num_servers:
|
||||
return affz, affz_nodes[:num_servers]
|
||||
_log_and_raise_error('affinity')
|
||||
|
||||
elif 'anti-affinity' in server_group.policies:
|
||||
affinity_zones = []
|
||||
for member in server_group.members:
|
||||
server = objects.Server.get(context, member)
|
||||
affinity_zone = server.affinity_zone
|
||||
affinity_zones.append(affinity_zone)
|
||||
selected_affz_nodes = []
|
||||
for affz, aggs in grouped_aggs:
|
||||
if affz in affinity_zones:
|
||||
continue
|
||||
affz_nodes = self._get_nodes_of_aggregates(aggs)
|
||||
affz_nodes = list(set(filtered_nodes) & set(affz_nodes))
|
||||
if affz_nodes:
|
||||
selected_affz_nodes.append((affz, affz_nodes[0]))
|
||||
if len(selected_affz_nodes) >= num_servers:
|
||||
return selected_affz_nodes
|
||||
_log_and_raise_error('anti-affinity')
|
||||
|
||||
def _consume_per_server(self, context, request_spec, node, server_id,
|
||||
affinity_zone=None):
|
||||
server_obj = objects.Server.get(context, server_id)
|
||||
if affinity_zone:
|
||||
server_obj.affinity_zone = affinity_zone
|
||||
server_obj.save(context)
|
||||
alloc_data = self._get_res_cls_filters(request_spec)
|
||||
self.reportclient.put_allocations(
|
||||
node, server_obj.uuid, alloc_data,
|
||||
server_obj.project_id, server_obj.user_id)
|
||||
|
||||
def _consume_nodes_with_server_group(self, context, request_spec,
|
||||
filtered_affzs_nodes, server_group):
|
||||
if 'affinity' in server_group.policies:
|
||||
affinity_zone, dest_nodes = filtered_affzs_nodes
|
||||
for server_id, node in zip(request_spec['server_ids'],
|
||||
dest_nodes):
|
||||
self._consume_per_server(
|
||||
context, request_spec, node, server_id, affinity_zone)
|
||||
return dest_nodes
|
||||
elif 'anti-affinity' in server_group.policies:
|
||||
dest_nodes = []
|
||||
for server_id, affz_node in zip(request_spec['server_ids'],
|
||||
filtered_affzs_nodes):
|
||||
affinity_zone, node = affz_node
|
||||
dest_nodes.append(node)
|
||||
self._consume_per_server(
|
||||
context, request_spec, node, server_id, affinity_zone)
|
||||
return dest_nodes
|
||||
|
||||
def _get_filtered_nodes(self, context, request_spec):
|
||||
resources_filter = self._get_res_cls_filters(request_spec)
|
||||
aggs_filters = self._get_res_aggregates_filters(context, request_spec)
|
||||
@ -125,14 +244,10 @@ class FilterScheduler(driver.Scheduler):
|
||||
if aggs_filters:
|
||||
filtered_nodes = set()
|
||||
for agg_filter in aggs_filters:
|
||||
query_filters = {'resources': resources_filter,
|
||||
'member_of': 'in:' + ','.join(agg_filter)}
|
||||
filtered_rps = self.reportclient.\
|
||||
get_filtered_resource_providers(query_filters)
|
||||
filtered_rps = set(self._get_nodes_of_aggregates(agg_filter))
|
||||
if not filtered_rps:
|
||||
# if got empty, just break here.
|
||||
return []
|
||||
filtered_rps = set([rp['uuid'] for rp in filtered_rps])
|
||||
if not filtered_nodes:
|
||||
# initialize the filtered_nodes
|
||||
filtered_nodes = filtered_rps
|
||||
@ -160,29 +275,36 @@ class FilterScheduler(driver.Scheduler):
|
||||
def _schedule(self, context, request_spec, filter_properties):
|
||||
self._populate_retry(filter_properties, request_spec)
|
||||
filtered_nodes = self._get_filtered_nodes(context, request_spec)
|
||||
|
||||
if not filtered_nodes:
|
||||
LOG.warning('No filtered nodes found for server '
|
||||
'with properties: %s',
|
||||
request_spec.get('flavor'))
|
||||
raise exception.NoValidNode(_("No filtered nodes available"))
|
||||
raise exception.NoValidNode(
|
||||
_("No filtered nodes available"))
|
||||
dest_nodes = self._choose_nodes(filtered_nodes, request_spec)
|
||||
for server_id, node in zip(request_spec['server_ids'], dest_nodes):
|
||||
server_obj = objects.Server.get(
|
||||
context, server_id)
|
||||
alloc_data = self._get_res_cls_filters(request_spec)
|
||||
self.reportclient.put_allocations(
|
||||
node, server_obj.uuid, alloc_data,
|
||||
server_obj.project_id, server_obj.user_id)
|
||||
return dest_nodes
|
||||
server_group = self._get_server_group_obj(context, request_spec)
|
||||
if not server_group:
|
||||
for server_id, node in zip(request_spec['server_ids'],
|
||||
dest_nodes):
|
||||
self._consume_per_server(context, request_spec, node,
|
||||
server_id)
|
||||
return dest_nodes
|
||||
else:
|
||||
filtered_affzs_nodes = self._get_filtered_affzs_nodes(
|
||||
context, server_group, filtered_nodes,
|
||||
request_spec['num_servers'])
|
||||
return self._consume_nodes_with_server_group(
|
||||
context, request_spec, filtered_affzs_nodes, server_group)
|
||||
|
||||
return _schedule(self, context, request_spec, filter_properties)
|
||||
|
||||
def _choose_nodes(self, weighed_nodes, request_spec):
|
||||
def _choose_nodes(self, filtered_nodes, request_spec):
|
||||
num_servers = request_spec['num_servers']
|
||||
if num_servers > len(weighed_nodes):
|
||||
if num_servers > len(filtered_nodes):
|
||||
msg = 'Not enough nodes found for servers, request ' \
|
||||
'servers: %s, but only available nodes: %s' \
|
||||
% (str(num_servers), str(len(weighed_nodes)))
|
||||
% (str(num_servers), str(len(filtered_nodes)))
|
||||
raise exception.NoValidNode(_("Choose Node: %s") % msg)
|
||||
|
||||
return weighed_nodes[:num_servers]
|
||||
return filtered_nodes[:num_servers]
|
||||
|
@ -148,6 +148,10 @@ class TestServers(v1_test.APITestV1):
|
||||
mock_create):
|
||||
mocked_uuid.return_value = self.SERVER_UUIDS[0]
|
||||
mock_create.return_value = mock.MagicMock()
|
||||
headers = self.gen_headers(self.context, roles="admin")
|
||||
group_body = {"name": "group1", "policies": ["affinity"]}
|
||||
group = self.post_json('/server_groups', group_body, headers=headers,
|
||||
status=201).json
|
||||
body = {
|
||||
"server": {
|
||||
"name": "test_server_with_hints",
|
||||
@ -159,7 +163,7 @@ class TestServers(v1_test.APITestV1):
|
||||
'port_type': 'Ethernet'}],
|
||||
'metadata': {'fake_key': 'fake_value'}
|
||||
},
|
||||
"scheduler_hints": {"group": 'group1'}
|
||||
"scheduler_hints": {"group": group['uuid']}
|
||||
}
|
||||
headers = self.gen_headers(self.context)
|
||||
response = self.post_json('/servers', body, headers=headers,
|
||||
|
@ -107,7 +107,8 @@ class ComputeAPIUnitTest(base.DbTestCase):
|
||||
min_count = 1
|
||||
max_count = 2
|
||||
self.engine_api._provision_servers(self.context, base_options,
|
||||
min_count, max_count)
|
||||
min_count, max_count,
|
||||
server_group=None)
|
||||
calls = [mock.call() for i in range(max_count)]
|
||||
mock_server_create.assert_has_calls(calls)
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user