Add aggregate nodes API

* list nodes for the specified aggregates
* add node to an aggregate
* remove node from an aggregate

Change-Id: I78ce523c7ba26860e02dcde84bcb84a6cc0c60ee
This commit is contained in:
Zhenguo Niu 2017-07-26 13:29:46 +08:00
parent 8a8dc20660
commit a637bfd38a
14 changed files with 293 additions and 11 deletions

View File

@ -180,3 +180,91 @@ Response
--------
No body content is returned on a successful DELETE.
List Aggregate Nodes
====================
.. rest_method:: GET /aggregates/{aggregate_uuid}/nodes
Lists nodes for the specified aggregate.
Normal response codes: 200
Error response codes: unauthorized(401), forbidden(403)
Request
-------
.. rest_parameters:: parameters.yaml
- aggregate_uuid: aggregate_uuid_path
Response
--------
.. rest_parameters:: parameters.yaml
- nodes: aggregate_nodes
**Example List aggregates: JSON response**
.. literalinclude:: samples/aggregates/aggregates-list-nodes-resp.json
:language: javascript
Add Aggregate Node
==================
.. rest_method:: POST /aggregates/{aggregate_uuid}/nodes
Adds a node to an aggregate.
Normal response codes: 204
Error response codes: badRequest(400), unauthorized(401), forbidden(403),
conflict(409)
Request
-------
.. rest_parameters:: parameters.yaml
- aggregate_uuid: aggregate_uuid_path
- node: aggregate_node
**Example Add Aggregate Node: JSON request**
.. literalinclude:: samples/aggregates/aggregate-add-node-req.json
:language: javascript
Response
--------
If successful, this method does not return content in the response body.
Remove Aggregate Node
=====================
.. rest_method:: DELETE /aggregates/{aggregate_uuid}/nodes/{node}
Removes a node to an aggregate.
Normal response codes: 204
Error response codes: badRequest(400), unauthorized(401), forbidden(403),
conflict(409)
Request
-------
.. rest_parameters:: parameters.yaml
- aggregate_uuid: aggregate_uuid_path
- node: aggregate_node_path
Response
--------
If successful, this method does not return content in the response body.

View File

@ -15,6 +15,12 @@ address_path:
in: path
required: true
type: string
aggregate_node_path:
description: |
The name of the node.
in: path
required: true
type: string
aggregate_uuid_path:
description: |
The UUID of the aggregate.
@ -142,6 +148,18 @@ aggregate_name:
in: body
required: true
type: string
aggregate_node:
description: |
The name of the node.
in: body
required: true
type: string
aggregate_nodes:
description: |
An array of node information.
in: body
required: true
type: array
aggregate_uuid:
description: |
The UUID of the node aggregate.

View File

@ -0,0 +1,3 @@
{
"node": "fake_node"
}

View File

@ -0,0 +1,7 @@
{
"nodes": [
"node1",
"node2",
"node3"
]
}

View File

@ -94,9 +94,50 @@ class AggregateCollection(base.APIBase):
return collection
class AggregateNodeController(rest.RestController):
"""REST controller for aggregate nodes."""
@policy.authorize_wsgi("mogan:aggregate_node", "get_all")
@expose.expose(wtypes.text, types.uuid)
def get_all(self, aggregate_uuid):
"""Retrieve a list of nodes of the queried aggregate."""
# check whether the aggregate exists
objects.Aggregate.get(pecan.request.context, aggregate_uuid)
nodes = pecan.request.engine_api.list_aggregate_nodes(
pecan.request.context, aggregate_uuid)
return nodes
@policy.authorize_wsgi("mogan:aggregate_node", "create")
@expose.expose(None, types.uuid, body=types.jsontype,
status_code=http_client.NO_CONTENT)
def post(self, aggregate_uuid, node):
"""Add node to the given aggregate."""
validation.check_schema(node, agg_schema.add_aggregate_node)
# check whether the aggregate exists
objects.Aggregate.get(pecan.request.context, aggregate_uuid)
pecan.request.engine_api.add_aggregate_node(
pecan.request.context, aggregate_uuid, node['node'])
@policy.authorize_wsgi("mogan:aggregate_node", "delete")
@expose.expose(None, types.uuid, wtypes.text,
status_code=http_client.NO_CONTENT)
def delete(self, aggregate_uuid, node):
"""Remove node from the given aggregate."""
# check whether the aggregate exists
objects.Aggregate.get(pecan.request.context, aggregate_uuid)
pecan.request.engine_api.remove_aggregate_node(
pecan.request.context, aggregate_uuid, node)
class AggregateController(rest.RestController):
"""REST controller for Aggregates."""
nodes = AggregateNodeController()
@policy.authorize_wsgi("mogan:aggregate", "get_all")
@expose.expose(AggregateCollection)
def get_all(self):

View File

@ -26,3 +26,12 @@ create_aggregate = {
'required': ['name'],
'additionalProperties': False,
}
add_aggregate_node = {
'type': 'object',
'properties': {
'node': parameter_types.name,
},
'required': ['node'],
'additionalProperties': False,
}

View File

@ -208,8 +208,7 @@ class AggregateMetadataNotFound(NotFound):
class NodeNotFound(NotFound):
_msg_fmt = _("Node associated with server %(server)s "
"could not be found.")
_msg_fmt = _("Node %(node)s could not be found.")
class InvalidActionParameterValue(Invalid):

View File

@ -159,6 +159,15 @@ server_policies = [
policy.RuleDefault('mogan:aggregate:get_one',
'rule:admin_api',
description='Show aggregate details'),
policy.RuleDefault('mogan:aggregate_node:get_all',
'rule:admin_api',
description='Get the aggregate nodes list'),
policy.RuleDefault('mogan:aggregate_node:create',
'rule:admin_api',
description='Add node to an aggregate'),
policy.RuleDefault('mogan:aggregate_node:delete',
'rule:admin_api',
description='Remove node from an aggregate'),
policy.RuleDefault('mogan:node:get_all',
'rule:admin_api',
description='Get the nodes list'),

View File

@ -520,3 +520,20 @@ class API(object):
def list_compute_nodes(self, context):
"""Get compute node list."""
return self.engine_rpcapi.list_compute_nodes(context)
def list_aggregate_nodes(self, context, aggregate_uuid):
"""Get aggregate node list."""
return self.engine_rpcapi.list_aggregate_nodes(context,
aggregate_uuid)
def add_aggregate_node(self, context, aggregate_uuid, node):
"""Add a node to the aggregate."""
return self.engine_rpcapi.add_aggregate_node(context,
aggregate_uuid,
node)
def remove_aggregate_node(self, context, aggregate_uuid, node):
"""Remove a node to the aggregate."""
return self.engine_rpcapi.remove_aggregate_node(context,
aggregate_uuid,
node)

View File

@ -585,3 +585,16 @@ class EngineManager(base_manager.BaseEngineManager):
nodes = self.scheduler_client.reportclient \
.get_nodes_from_resource_providers()
return nodes
def list_aggregate_nodes(self, context, aggregate_uuid):
nodes = self.scheduler_client.reportclient \
.get_nodes_from_aggregate(aggregate_uuid)
return nodes
def add_aggregate_node(self, context, aggregate_uuid, node):
self.scheduler_client.reportclient \
.update_aggregate_node(aggregate_uuid, node, 'add')
def remove_aggregate_node(self, context, aggregate_uuid, node):
self.scheduler_client.reportclient \
.update_aggregate_node(aggregate_uuid, node, 'remove')

View File

@ -96,3 +96,18 @@ class EngineAPI(object):
def list_compute_nodes(self, context):
cctxt = self.client.prepare(topic=self.topic, server=CONF.host)
return cctxt.call(context, 'list_compute_nodes')
def list_aggregate_nodes(self, context, aggregate_uuid):
cctxt = self.client.prepare(topic=self.topic, server=CONF.host)
return cctxt.call(context, 'list_aggregate_nodes',
aggregate_uuid=aggregate_uuid)
def add_aggregate_node(self, context, aggregate_uuid, node):
cctxt = self.client.prepare(topic=self.topic, server=CONF.host)
return cctxt.call(context, 'add_aggregate_node',
aggregate_uuid=aggregate_uuid, node=node)
def remove_aggregate_node(self, context, aggregate_uuid, node):
cctxt = self.client.prepare(topic=self.topic, server=CONF.host)
return cctxt.call(context, 'remove_aggregate_node',
aggregate_uuid=aggregate_uuid, node=node)

View File

@ -21,10 +21,6 @@ from mogan.objects import base
from mogan.objects import fields as object_fields
def _get_nodes_from_cache(aggregate_id):
return []
@base.MoganObjectRegistry.register
class Aggregate(base.MoganObject, object_base.VersionedObjectDictCompat):
# Version 1.0: Initial version
@ -36,7 +32,6 @@ class Aggregate(base.MoganObject, object_base.VersionedObjectDictCompat):
'id': object_fields.IntegerField(read_only=True),
'uuid': object_fields.UUIDField(read_only=True),
'name': object_fields.StringField(),
'nodes': object_fields.ListOfStringsField(nullable=True),
'metadata': object_fields.FlexibleDictField(nullable=True),
}
@ -50,8 +45,6 @@ class Aggregate(base.MoganObject, object_base.VersionedObjectDictCompat):
for field in aggregate.fields:
if field == 'metadata':
aggregate[field] = db_aggregate['metadetails']
elif field == 'nodes':
aggregate[field] = _get_nodes_from_cache(aggregate['uuid'])
else:
aggregate[field] = db_aggregate[field]
aggregate.obj_reset_changes()

View File

@ -156,7 +156,7 @@ class SchedulerReportClient(object):
'OpenStack-API-Version': 'placement %s' % version
},
}
if data:
if data is not None:
kwargs['json'] = data
return self._client.put(
url, endpoint_filter=self.ks_filter, raise_exc=False,
@ -233,6 +233,43 @@ class SchedulerReportClient(object):
}
LOG.error(msg, args)
@safe_connect
def _put_provider_aggregates(self, rp_uuid, aggs):
"""Associate a list of aggregates with the resource provider.
:param aggs: a list of UUID of the aggregates.
:param rp_uuid: UUID of the resource provider.
"""
url = "/resource_providers/%s/aggregates" % rp_uuid
payload = list(aggs)
resp = self.put(url, payload, version='1.1')
if resp.status_code == 200:
self._provider_aggregate_map[rp_uuid] = set(aggs)
data = resp.json()
return set(data['aggregates'])
placement_req_id = get_placement_request_id(resp)
if resp.status_code == 404:
msg = "[%(placement_req_id)s] Tried to put a provider's "
"aggregates; however the provider %(uuid)s does not "
"exist."
args = {
'uuid': rp_uuid,
'placement_req_id': placement_req_id,
}
LOG.warning(msg, args)
else:
msg = ("[%(placement_req_id)s] Failed to set aggregates "
"from placement API for resource provider with UUID "
"%(uuid)s. Got %(status_code)d: %(err_text)s.")
args = {
'placement_req_id': placement_req_id,
'uuid': rp_uuid,
'status_code': resp.status_code,
'err_text': resp.text,
}
LOG.error(msg, args)
@safe_connect
def _get_resource_provider(self, uuid):
"""Queries the placement API for a resource provider record with the
@ -714,3 +751,36 @@ class SchedulerReportClient(object):
# Use the rps we cached
rps = self._resource_providers
return {'nodes': [rp['name'] for id, rp in rps.items()]}
def get_nodes_from_aggregate(self, aggregate_uuid):
# Use the aggregates we cached
rps = self._resource_providers
rp_aggs = self._provider_aggregate_map
rp_uuids = []
for rp, aggs in rp_aggs.items():
if aggregate_uuid in aggs:
rp_uuids.append(rp)
return {'nodes': [rps[id]['name'] for id in rp_uuids]}
def update_aggregate_node(self, aggregate_uuid, node, action):
rps = self._resource_providers
for id, rp in rps.items():
if node == rp['name']:
rp_uuid = id
break
else:
raise exception.NodeNotFound(node=node)
aggs = self._provider_aggregate_map[rp_uuid]
if action == 'add':
new_aggs = aggs | set([aggregate_uuid])
elif action == 'remove':
if aggregate_uuid in aggs:
new_aggs = aggs - set([aggregate_uuid])
else:
return
else:
LOG.info('Bad action parameter for update_aggregate_node() %s',
action)
return
self._put_provider_aggregates(rp_uuid, list(new_aggs))

View File

@ -392,7 +392,7 @@ expected_object_fingerprints = {
'Quota': '1.0-c8caa082f4d726cb63fdc5943f7cd186',
'KeyPair': '1.0-c6820166e307676c5900f7801831b84c',
'KeyPairList': '1.0-33a2e1bb91ad4082f9f63429b77c1244',
'Aggregate': '1.0-e2f3060705a18e1de8daae3f1af6b104',
'Aggregate': '1.0-b62b178679d1b69bc2643909d8874af1',
'AggregateList': '1.0-33a2e1bb91ad4082f9f63429b77c1244'
}