Merge "Allow PUT to /pools/<id>/members to batch update members"
This commit is contained in:
commit
859bb3bae3
1
api-ref/source/v2/examples/member-batch-update-curl
Normal file
1
api-ref/source/v2/examples/member-batch-update-curl
Normal file
@ -0,0 +1 @@
|
||||
curl -X PUT -H "Content-Type: application/json" -H "X-Auth-Token: <token>" -d '{"members":[{"name":"web-server-1","weight":"20","admin_state_up":true,"subnet_id":"bbb35f84-35cc-4b2f-84c2-a6a29bba68aa","address":"192.0.2.16","protocol_port":"80","monitor_port":8080},{"name":"web-server-2","weight":"10","admin_state_up":true,"subnet_id":"bbb35f84-35cc-4b2f-84c2-a6a29bba68aa","address":"192.0.2.17","protocol_port":"80","monitor_port":8080}]}' http://198.51.100.10:9876/v2.0/lbaas/pools/4029d267-3983-4224-a3d0-afb3fe16a2cd/members
|
22
api-ref/source/v2/examples/member-batch-update-request.json
Normal file
22
api-ref/source/v2/examples/member-batch-update-request.json
Normal file
@ -0,0 +1,22 @@
|
||||
{
|
||||
"members": [
|
||||
{
|
||||
"name": "web-server-1",
|
||||
"weight": 20,
|
||||
"admin_state_up": true,
|
||||
"subnet_id": "bbb35f84-35cc-4b2f-84c2-a6a29bba68aa",
|
||||
"address": "192.0.2.16",
|
||||
"protocol_port": 80,
|
||||
"monitor_port": 8080
|
||||
},
|
||||
{
|
||||
"name": "web-server-2",
|
||||
"weight": 10,
|
||||
"admin_state_up": true,
|
||||
"subnet_id": "bbb35f84-35cc-4b2f-84c2-a6a29bba68aa",
|
||||
"address": "192.0.2.17",
|
||||
"protocol_port": 80,
|
||||
"monitor_port": 8080
|
||||
}
|
||||
]
|
||||
}
|
@ -340,8 +340,79 @@ Response Example
|
||||
.. literalinclude:: examples/member-update-response.json
|
||||
:language: javascript
|
||||
|
||||
Batch Update Members
|
||||
====================
|
||||
|
||||
.. rest_method:: PUT /v2.0/lbaas/pools/{pool_id}/members
|
||||
|
||||
Set the state of members for a pool in one API call. This may include
|
||||
creating new members, deleting old members, and updating existing members.
|
||||
Existing members are matched based on address/port combination.
|
||||
|
||||
For example, assume a pool currently has two members. These members have the
|
||||
following address/port combinations: '192.0.2.15:80' and '192.0.2.16:80'.
|
||||
Now assume a PUT request is made that includes members with address/port
|
||||
combinations: '192.0.2.16:80' and '192.0.2.17:80'.
|
||||
The member '192.0.2.15:80' will be deleted, because it was not in the request.
|
||||
The member '192.0.2.16:80' will be updated to match the request data for that
|
||||
member, because it was matched.
|
||||
The member '192.0.2.17:80' will be created, because no such member existed.
|
||||
|
||||
If the request is valid, the service returns the ``Accepted (202)``
|
||||
response code. To confirm the updates, check that the member provisioning
|
||||
statuses are ``ACTIVE`` for new or updated members, and that any unspecified
|
||||
members were correctly deleted. If the statuses are ``PENDING_UPDATE`` or
|
||||
``PENDING_DELETE``, use GET to poll the member objects for changes.
|
||||
|
||||
.. rest_status_code:: success ../http-status.yaml
|
||||
|
||||
- 202
|
||||
|
||||
.. rest_status_code:: error ../http-status.yaml
|
||||
|
||||
- 400
|
||||
- 401
|
||||
- 403
|
||||
- 404
|
||||
- 409
|
||||
- 500
|
||||
- 503
|
||||
|
||||
Request
|
||||
-------
|
||||
|
||||
.. rest_parameters:: ../parameters.yaml
|
||||
|
||||
- admin_state_up: admin_state_up-default-optional
|
||||
- address: address
|
||||
- monitor_address: monitor_address-optional
|
||||
- monitor_port: monitor_port-optional
|
||||
- name: name-optional
|
||||
- pool_id: path-pool-id
|
||||
- project_id: project_id-optional-deprecated
|
||||
- protocol_port: protocol_port
|
||||
- subnet_id: subnet_id-optional
|
||||
- weight: weight-optional
|
||||
|
||||
Request Example
|
||||
---------------
|
||||
|
||||
.. literalinclude:: examples/member-batch-update-request.json
|
||||
:language: javascript
|
||||
|
||||
Curl Example
|
||||
------------
|
||||
|
||||
.. literalinclude:: examples/member-batch-update-curl
|
||||
:language: bash
|
||||
|
||||
Response
|
||||
--------
|
||||
|
||||
There is no body content for the response of a successful PUT request.
|
||||
|
||||
Remove a Member
|
||||
=================
|
||||
===============
|
||||
|
||||
.. rest_method:: DELETE /v2.0/lbaas/pools/{pool_id}/members/{member_id}
|
||||
|
||||
|
@ -39,7 +39,8 @@ def validate_input(expected, actual):
|
||||
raise InvalidHandlerInputObject(obj_type=actual.__class__)
|
||||
|
||||
|
||||
def simulate_controller(data_model, delete=False, update=False, create=False):
|
||||
def simulate_controller(data_model, delete=False, update=False, create=False,
|
||||
batch_update=False):
|
||||
"""Simulates a successful controller operator for a data model.
|
||||
|
||||
:param data_model: data model to simulate controller operation
|
||||
@ -47,7 +48,8 @@ def simulate_controller(data_model, delete=False, update=False, create=False):
|
||||
"""
|
||||
repo = repos.Repositories()
|
||||
|
||||
def member_controller(member, delete=False, update=False, create=False):
|
||||
def member_controller(member, delete=False, update=False, create=False,
|
||||
batch_update=False):
|
||||
time.sleep(ASYNC_TIME)
|
||||
LOG.info("Simulating controller operation for member...")
|
||||
|
||||
@ -63,6 +65,11 @@ def simulate_controller(data_model, delete=False, update=False, create=False):
|
||||
elif create:
|
||||
repo.member.update(db_api.get_session(), member.id,
|
||||
operating_status=constants.ONLINE)
|
||||
elif batch_update:
|
||||
members = member
|
||||
for member in members:
|
||||
repo.member.update(db_api.get_session(), member.id,
|
||||
operating_status=constants.ONLINE)
|
||||
listeners = []
|
||||
if db_mem:
|
||||
for listener in db_mem.pool.listeners:
|
||||
@ -395,6 +402,25 @@ class MemberHandler(abstract_handler.BaseObjectHandler):
|
||||
member.id = old_member.id
|
||||
simulate_controller(member, update=True)
|
||||
|
||||
def batch_update(self, old_member_ids, new_member_ids, updated_members):
|
||||
for m in updated_members:
|
||||
validate_input(data_models.Member, m)
|
||||
LOG.info("%(entity)s handling the batch update of members: "
|
||||
"old=%(old)s, new=%(new)s",
|
||||
{"entity": self.__class__.__name__, "old": old_member_ids,
|
||||
"new": new_member_ids})
|
||||
|
||||
repo = repos.Repositories()
|
||||
old_members = [repo.member.get(db_api.get_session(), mid)
|
||||
for mid in old_member_ids]
|
||||
new_members = [repo.member.get(db_api.get_session(), mid)
|
||||
for mid in new_member_ids]
|
||||
all_members = []
|
||||
all_members.extend(old_members)
|
||||
all_members.extend(new_members)
|
||||
all_members.extend(updated_members)
|
||||
simulate_controller(all_members, batch_update=True)
|
||||
|
||||
def delete(self, member_id):
|
||||
LOG.info("%(entity)s handling the deletion of member %(id)s",
|
||||
{"entity": self.__class__.__name__, "id": member_id})
|
||||
|
@ -163,6 +163,21 @@ class MemberProducer(BaseProducer):
|
||||
def payload_class(self):
|
||||
return self.PAYLOAD_CLASS
|
||||
|
||||
def batch_update(self, old_ids, new_ids, updated_models):
|
||||
"""sends an update message to the controller via oslo.messaging
|
||||
|
||||
:param old_ids: list of member ids that are being deleted
|
||||
:param new_ids: list of member ids that are being created
|
||||
:param updated_models: list of member model objects to update
|
||||
"""
|
||||
updated_dicts = [m.to_dict(render_unsets=False)
|
||||
for m in updated_models]
|
||||
kw = {"old_{0}_ids".format(self.payload_class): old_ids,
|
||||
"new_{0}_ids".format(self.payload_class): new_ids,
|
||||
"updated_{0}s".format(self.payload_class): updated_dicts}
|
||||
method_name = "batch_update_{0}s".format(self.payload_class)
|
||||
self.client.cast({}, method_name, **kw)
|
||||
|
||||
|
||||
class L7PolicyProducer(BaseProducer):
|
||||
"""Sends updates,deletes and creates to the RPC end of the queue consumer
|
||||
|
@ -34,11 +34,11 @@ from octavia.db import prepare as db_prepare
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class MembersController(base.BaseController):
|
||||
class MemberController(base.BaseController):
|
||||
RBAC_TYPE = constants.RBAC_MEMBER
|
||||
|
||||
def __init__(self, pool_id):
|
||||
super(MembersController, self).__init__()
|
||||
super(MemberController, self).__init__()
|
||||
self.pool_id = pool_id
|
||||
self.handler = self.handler.member
|
||||
|
||||
@ -265,3 +265,80 @@ class MembersController(base.BaseController):
|
||||
self.repositories.member.update(
|
||||
lock_session, db_member.id,
|
||||
provisioning_status=constants.ERROR)
|
||||
|
||||
|
||||
class MembersController(MemberController):
|
||||
|
||||
def __init__(self, pool_id):
|
||||
super(MembersController, self).__init__(pool_id)
|
||||
|
||||
@wsme_pecan.wsexpose(None, wtypes.text,
|
||||
body=member_types.MembersRootPUT, status_code=202)
|
||||
def put(self, members_):
|
||||
"""Updates all members."""
|
||||
members = members_.members
|
||||
context = pecan.request.context.get('octavia_context')
|
||||
|
||||
db_pool = self._get_db_pool(context.session, self.pool_id)
|
||||
old_members = db_pool.members
|
||||
|
||||
# Check POST+PUT+DELETE since this operation is all of 'CUD'
|
||||
self._auth_validate_action(context, db_pool.project_id,
|
||||
constants.RBAC_POST)
|
||||
self._auth_validate_action(context, db_pool.project_id,
|
||||
constants.RBAC_PUT)
|
||||
self._auth_validate_action(context, db_pool.project_id,
|
||||
constants.RBAC_DELETE)
|
||||
|
||||
with db_api.get_lock_session() as lock_session:
|
||||
self._test_lb_and_listener_and_pool_statuses(lock_session)
|
||||
|
||||
member_count_diff = len(members) - len(old_members)
|
||||
if member_count_diff > 0 and self.repositories.check_quota_met(
|
||||
context.session, lock_session, data_models.Member,
|
||||
db_pool.project_id, count=member_count_diff):
|
||||
raise exceptions.QuotaException
|
||||
|
||||
old_member_uniques = {
|
||||
(m.ip_address, m.protocol_port): m.id for m in old_members}
|
||||
new_member_uniques = [
|
||||
(m.address, m.protocol_port) for m in members]
|
||||
|
||||
# Find members that are brand new or updated
|
||||
new_members = []
|
||||
updated_members = []
|
||||
for m in members:
|
||||
if (m.address, m.protocol_port) not in old_member_uniques:
|
||||
new_members.append(m)
|
||||
else:
|
||||
m.id = old_member_uniques[(m.address, m.protocol_port)]
|
||||
updated_members.append(m)
|
||||
|
||||
# Find members that are deleted
|
||||
deleted_members = []
|
||||
for m in old_members:
|
||||
if (m.ip_address, m.protocol_port) not in new_member_uniques:
|
||||
deleted_members.append(m)
|
||||
|
||||
# Create new members
|
||||
new_members_created = []
|
||||
for m in new_members:
|
||||
m = m.to_dict(render_unsets=False)
|
||||
m['project_id'] = db_pool.project_id
|
||||
new_members_created.append(self._graph_create(lock_session, m))
|
||||
# Update old members
|
||||
for m in updated_members:
|
||||
self.repositories.member.update(
|
||||
lock_session, m.id,
|
||||
provisioning_status=constants.PENDING_UPDATE)
|
||||
# Delete old members
|
||||
for m in deleted_members:
|
||||
self.repositories.member.update(
|
||||
lock_session, m.id,
|
||||
provisioning_status=constants.PENDING_DELETE)
|
||||
|
||||
LOG.info("Sending Full Member Update to handler")
|
||||
new_member_ids = [m.id for m in new_members_created]
|
||||
old_member_ids = [m.id for m in deleted_members]
|
||||
self.handler.batch_update(
|
||||
old_member_ids, new_member_ids, updated_members)
|
||||
|
@ -319,19 +319,14 @@ class PoolsController(base.BaseController):
|
||||
which controller, if any, should control be passed.
|
||||
"""
|
||||
context = pecan.request.context.get('octavia_context')
|
||||
if pool_id and len(remainder) and (remainder[0] == 'members' or
|
||||
remainder[0] == 'healthmonitor'):
|
||||
controller = remainder[0]
|
||||
if pool_id and len(remainder) and remainder[0] == 'members':
|
||||
remainder = remainder[1:]
|
||||
db_pool = self.repositories.pool.get(context.session, id=pool_id)
|
||||
if not db_pool:
|
||||
LOG.info("Pool %s not found.", pool_id)
|
||||
raise exceptions.NotFound(resource=data_models.Pool._name(),
|
||||
id=pool_id)
|
||||
if controller == 'members':
|
||||
return member.MembersController(
|
||||
pool_id=db_pool.id), remainder
|
||||
elif controller == 'healthmonitor':
|
||||
return health_monitor.HealthMonitorController(
|
||||
load_balancer_id=db_pool.load_balancer_id,
|
||||
pool_id=db_pool.id), remainder
|
||||
if remainder:
|
||||
return member.MemberController(pool_id=db_pool.id), remainder
|
||||
else:
|
||||
return member.MembersController(pool_id=db_pool.id), remainder
|
||||
|
@ -101,6 +101,10 @@ class MemberRootPUT(types.BaseType):
|
||||
member = wtypes.wsattr(MemberPUT)
|
||||
|
||||
|
||||
class MembersRootPUT(types.BaseType):
|
||||
members = wtypes.wsattr([MemberPOST])
|
||||
|
||||
|
||||
class MemberSingleCreate(BaseMemberType):
|
||||
"""Defines mandatory and optional attributes of a POST request."""
|
||||
name = wtypes.wsattr(wtypes.StringType(max_length=255))
|
||||
@ -112,6 +116,9 @@ class MemberSingleCreate(BaseMemberType):
|
||||
weight = wtypes.wsattr(wtypes.IntegerType(
|
||||
minimum=constants.MIN_WEIGHT, maximum=constants.MAX_WEIGHT), default=1)
|
||||
subnet_id = wtypes.wsattr(wtypes.UuidType())
|
||||
monitor_port = wtypes.wsattr(wtypes.IntegerType(
|
||||
minimum=constants.MIN_PORT_NUMBER, maximum=constants.MAX_PORT_NUMBER))
|
||||
monitor_address = wtypes.wsattr(types.IPAddressType())
|
||||
|
||||
|
||||
class MemberStatusResponse(BaseMemberType):
|
||||
|
@ -278,6 +278,19 @@ CREATE_VRRP_SECURITY_RULES = 'octavia-create-vrrp-security-rules'
|
||||
|
||||
GENERATE_SERVER_PEM_TASK = 'GenerateServerPEMTask'
|
||||
|
||||
# Batch Member Update constants
|
||||
MEMBERS = 'members'
|
||||
UNORDERED_MEMBER_UPDATES_FLOW = 'octavia-unordered-member-updates-flow'
|
||||
UNORDERED_MEMBER_ACTIVE_FLOW = 'octavia-unordered-member-active-flow'
|
||||
UPDATE_ATTRIBUTES_FLOW = 'octavia-update-attributes-flow'
|
||||
DELETE_MODEL_OBJECT_FLOW = 'octavia-delete-model-object-flow'
|
||||
BATCH_UPDATE_MEMBERS_FLOW = 'octavia-batch-update-members-flow'
|
||||
MEMBER_TO_ERROR_ON_REVERT_FLOW = 'octavia-member-to-error-on-revert-flow'
|
||||
DECREMENT_MEMBER_QUOTA_FLOW = 'octavia-decrement-member-quota-flow'
|
||||
MARK_MEMBER_ACTIVE_INDB = 'octavia-mark-member-active-indb'
|
||||
UPDATE_MEMBER_INDB = 'octavia-update-member-indb'
|
||||
DELETE_MEMBER_INDB = 'octavia-delete-member-indb'
|
||||
|
||||
# Task Names
|
||||
RELOAD_LB_AFTER_AMP_ASSOC = 'reload-lb-after-amp-assoc'
|
||||
RELOAD_LB_AFTER_AMP_ASSOC_FULL_GRAPH = 'reload-lb-after-amp-assoc-full-graph'
|
||||
|
@ -104,6 +104,17 @@ class Endpoint(object):
|
||||
LOG.info('Updating member \'%s\'...', member_id)
|
||||
self.worker.update_member(member_id, member_updates)
|
||||
|
||||
def batch_update_members(self, context, old_member_ids, new_member_ids,
|
||||
updated_members):
|
||||
updated_member_ids = [m.get('id') for m in updated_members]
|
||||
LOG.info(
|
||||
'Batch updating members: old=\'%(old)s\', new=\'%(new)s\', '
|
||||
'updated=\'%(updated)s\'...',
|
||||
{'old': old_member_ids, 'new': new_member_ids,
|
||||
'updated': updated_member_ids})
|
||||
self.worker.batch_update_members(
|
||||
old_member_ids, new_member_ids, updated_members)
|
||||
|
||||
def delete_member(self, context, member_id):
|
||||
LOG.info('Deleting member \'%s\'...', member_id)
|
||||
self.worker.delete_member(member_id)
|
||||
|
@ -376,6 +376,34 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
|
||||
log=LOG):
|
||||
delete_member_tf.run()
|
||||
|
||||
def batch_update_members(self, old_member_ids, new_member_ids,
|
||||
updated_members):
|
||||
old_members = [self._member_repo.get(db_apis.get_session(), id=mid)
|
||||
for mid in old_member_ids]
|
||||
new_members = [self._member_repo.get(db_apis.get_session(), id=mid)
|
||||
for mid in new_member_ids]
|
||||
updated_members = [
|
||||
(self._member_repo.get(db_apis.get_session(), id=m.get('id')), m)
|
||||
for m in updated_members]
|
||||
if old_members:
|
||||
pool = old_members[0].pool
|
||||
elif new_members:
|
||||
pool = new_members[0].pool
|
||||
else:
|
||||
pool = updated_members[0][0].pool
|
||||
listeners = pool.listeners
|
||||
load_balancer = pool.load_balancer
|
||||
|
||||
batch_update_members_tf = self._taskflow_load(
|
||||
self._member_flows.get_batch_update_members_flow(
|
||||
old_members, new_members, updated_members),
|
||||
store={constants.LISTENERS: listeners,
|
||||
constants.LOADBALANCER: load_balancer,
|
||||
constants.POOL: pool})
|
||||
with tf_logging.DynamicLoggingListener(batch_update_members_tf,
|
||||
log=LOG):
|
||||
batch_update_members_tf.run()
|
||||
|
||||
def update_member(self, member_id, member_updates):
|
||||
"""Updates a pool member.
|
||||
|
||||
|
@ -14,6 +14,7 @@
|
||||
#
|
||||
|
||||
from taskflow.patterns import linear_flow
|
||||
from taskflow.patterns import unordered_flow
|
||||
|
||||
from octavia.common import constants
|
||||
from octavia.controller.worker.tasks import amphora_driver_tasks
|
||||
@ -113,11 +114,108 @@ class MemberFlows(object):
|
||||
requires=[constants.MEMBER, constants.UPDATE_DICT]))
|
||||
update_member_flow.add(database_tasks.MarkMemberActiveInDB(
|
||||
requires=constants.MEMBER))
|
||||
update_member_flow.add(database_tasks.MarkPoolActiveInDB(
|
||||
requires=constants.POOL))
|
||||
update_member_flow.add(database_tasks.
|
||||
MarkLBAndListenersActiveInDB(
|
||||
requires=[constants.LOADBALANCER,
|
||||
constants.LISTENERS]))
|
||||
update_member_flow.add(database_tasks.MarkPoolActiveInDB(
|
||||
requires=constants.POOL))
|
||||
|
||||
return update_member_flow
|
||||
|
||||
def get_batch_update_members_flow(self, old_members, new_members,
|
||||
updated_members):
|
||||
"""Create a flow to batch update members
|
||||
|
||||
:returns: The flow for batch updating members
|
||||
"""
|
||||
batch_update_members_flow = linear_flow.Flow(
|
||||
constants.BATCH_UPDATE_MEMBERS_FLOW)
|
||||
unordered_members_flow = unordered_flow.Flow(
|
||||
constants.UNORDERED_MEMBER_UPDATES_FLOW)
|
||||
unordered_members_active_flow = unordered_flow.Flow(
|
||||
constants.UNORDERED_MEMBER_ACTIVE_FLOW)
|
||||
|
||||
# Delete old members
|
||||
unordered_members_flow.add(
|
||||
lifecycle_tasks.MembersToErrorOnRevertTask(
|
||||
inject={constants.MEMBERS: old_members},
|
||||
name='{flow}-deleted'.format(
|
||||
flow=constants.MEMBER_TO_ERROR_ON_REVERT_FLOW)))
|
||||
for m in old_members:
|
||||
unordered_members_flow.add(
|
||||
model_tasks.DeleteModelObject(
|
||||
inject={constants.OBJECT: m},
|
||||
name='{flow}-{id}'.format(
|
||||
id=m.id, flow=constants.DELETE_MODEL_OBJECT_FLOW)))
|
||||
unordered_members_flow.add(database_tasks.DeleteMemberInDB(
|
||||
inject={constants.MEMBER: m},
|
||||
name='{flow}-{id}'.format(
|
||||
id=m.id, flow=constants.DELETE_MEMBER_INDB)))
|
||||
unordered_members_flow.add(database_tasks.DecrementMemberQuota(
|
||||
inject={constants.MEMBER: m},
|
||||
name='{flow}-{id}'.format(
|
||||
id=m.id, flow=constants.DECREMENT_MEMBER_QUOTA_FLOW)))
|
||||
|
||||
# Create new members
|
||||
unordered_members_flow.add(
|
||||
lifecycle_tasks.MembersToErrorOnRevertTask(
|
||||
inject={constants.MEMBERS: new_members},
|
||||
name='{flow}-created'.format(
|
||||
flow=constants.MEMBER_TO_ERROR_ON_REVERT_FLOW)))
|
||||
for m in new_members:
|
||||
unordered_members_active_flow.add(
|
||||
database_tasks.MarkMemberActiveInDB(
|
||||
inject={constants.MEMBER: m},
|
||||
name='{flow}-{id}'.format(
|
||||
id=m.id, flow=constants.MARK_MEMBER_ACTIVE_INDB)))
|
||||
|
||||
# Update existing members
|
||||
unordered_members_flow.add(
|
||||
lifecycle_tasks.MembersToErrorOnRevertTask(
|
||||
inject={constants.MEMBERS: updated_members},
|
||||
name='{flow}-updated'.format(
|
||||
flow=constants.MEMBER_TO_ERROR_ON_REVERT_FLOW)))
|
||||
for m, um in updated_members:
|
||||
um.pop('id', None)
|
||||
unordered_members_flow.add(
|
||||
model_tasks.UpdateAttributes(
|
||||
inject={constants.OBJECT: m, constants.UPDATE_DICT: um},
|
||||
name='{flow}-{id}'.format(
|
||||
id=m.id, flow=constants.UPDATE_ATTRIBUTES_FLOW)))
|
||||
unordered_members_flow.add(database_tasks.UpdateMemberInDB(
|
||||
inject={constants.MEMBER: m, constants.UPDATE_DICT: um},
|
||||
name='{flow}-{id}'.format(
|
||||
id=m.id, flow=constants.UPDATE_MEMBER_INDB)))
|
||||
unordered_members_active_flow.add(
|
||||
database_tasks.MarkMemberActiveInDB(
|
||||
inject={constants.MEMBER: m},
|
||||
name='{flow}-{id}'.format(
|
||||
id=m.id, flow=constants.MARK_MEMBER_ACTIVE_INDB)))
|
||||
|
||||
batch_update_members_flow.add(unordered_members_flow)
|
||||
|
||||
# Done, do real updates
|
||||
batch_update_members_flow.add(network_tasks.CalculateDelta(
|
||||
requires=constants.LOADBALANCER,
|
||||
provides=constants.DELTAS))
|
||||
batch_update_members_flow.add(network_tasks.HandleNetworkDeltas(
|
||||
requires=constants.DELTAS, provides=constants.ADDED_PORTS))
|
||||
batch_update_members_flow.add(
|
||||
amphora_driver_tasks.AmphoraePostNetworkPlug(
|
||||
requires=(constants.LOADBALANCER, constants.ADDED_PORTS)))
|
||||
|
||||
# Update the Listener (this makes the changes active on the Amp)
|
||||
batch_update_members_flow.add(amphora_driver_tasks.ListenersUpdate(
|
||||
requires=(constants.LOADBALANCER, constants.LISTENERS)))
|
||||
|
||||
# Mark all the members ACTIVE here, then pool then LB/Listeners
|
||||
batch_update_members_flow.add(unordered_members_active_flow)
|
||||
batch_update_members_flow.add(database_tasks.MarkPoolActiveInDB(
|
||||
requires=constants.POOL))
|
||||
batch_update_members_flow.add(
|
||||
database_tasks.MarkLBAndListenersActiveInDB(
|
||||
requires=(constants.LOADBALANCER,
|
||||
constants.LISTENERS)))
|
||||
|
||||
return batch_update_members_flow
|
||||
|
@ -138,10 +138,25 @@ class MemberToErrorOnRevertTask(BaseLifecycleTask):
|
||||
|
||||
def revert(self, member, listeners, loadbalancer, pool, *args, **kwargs):
|
||||
self.task_utils.mark_member_prov_status_error(member.id)
|
||||
self.task_utils.mark_loadbalancer_prov_status_active(loadbalancer.id)
|
||||
for listener in listeners:
|
||||
self.task_utils.mark_listener_prov_status_active(listener.id)
|
||||
self.task_utils.mark_pool_prov_status_active(pool.id)
|
||||
self.task_utils.mark_loadbalancer_prov_status_active(loadbalancer.id)
|
||||
|
||||
|
||||
class MembersToErrorOnRevertTask(BaseLifecycleTask):
|
||||
"""Task to set members to ERROR on revert."""
|
||||
|
||||
def execute(self, members, listeners, loadbalancer, pool):
|
||||
pass
|
||||
|
||||
def revert(self, members, listeners, loadbalancer, pool, *args, **kwargs):
|
||||
for m in members:
|
||||
self.task_utils.mark_member_prov_status_error(m.id)
|
||||
for listener in listeners:
|
||||
self.task_utils.mark_listener_prov_status_active(listener.id)
|
||||
self.task_utils.mark_pool_prov_status_active(pool.id)
|
||||
self.task_utils.mark_loadbalancer_prov_status_active(loadbalancer.id)
|
||||
|
||||
|
||||
class PoolToErrorOnRevertTask(BaseLifecycleTask):
|
||||
|
@ -427,6 +427,160 @@ class TestMember(base.BaseAPITest):
|
||||
member_prov_status=constants.ERROR,
|
||||
member_op_status=constants.NO_MONITOR)
|
||||
|
||||
def test_full_batch_members(self):
|
||||
member1 = {'address': '10.0.0.1', 'protocol_port': 80}
|
||||
member2 = {'address': '10.0.0.2', 'protocol_port': 80}
|
||||
member3 = {'address': '10.0.0.3', 'protocol_port': 80}
|
||||
member4 = {'address': '10.0.0.4', 'protocol_port': 80}
|
||||
member5 = {'address': '10.0.0.5', 'protocol_port': 80}
|
||||
member6 = {'address': '10.0.0.6', 'protocol_port': 80}
|
||||
members = [member1, member2, member3, member4]
|
||||
for m in members:
|
||||
self.create_member(pool_id=self.pool_id, **m)
|
||||
self.set_lb_status(self.lb_id)
|
||||
|
||||
req_dict = [member1, member2, member5, member6]
|
||||
body = {self.root_tag_list: req_dict}
|
||||
path = self.MEMBERS_PATH.format(pool_id=self.pool_id)
|
||||
self.put(path, body, status=202)
|
||||
returned_members = self.get(
|
||||
self.MEMBERS_PATH.format(pool_id=self.pool_id)
|
||||
).json.get(self.root_tag_list)
|
||||
|
||||
expected_members = [
|
||||
('10.0.0.1', 80, 'PENDING_UPDATE'),
|
||||
('10.0.0.2', 80, 'PENDING_UPDATE'),
|
||||
('10.0.0.3', 80, 'PENDING_DELETE'),
|
||||
('10.0.0.4', 80, 'PENDING_DELETE'),
|
||||
('10.0.0.5', 80, 'PENDING_CREATE'),
|
||||
('10.0.0.6', 80, 'PENDING_CREATE'),
|
||||
]
|
||||
|
||||
member_ids = {}
|
||||
for rm in returned_members:
|
||||
self.assertIn(
|
||||
(rm['address'],
|
||||
rm['protocol_port'],
|
||||
rm['provisioning_status']), expected_members)
|
||||
member_ids[(rm['address'], rm['protocol_port'])] = rm['id']
|
||||
handler_args = self.handler_mock().member.batch_update.call_args[0]
|
||||
self.assertEqual(
|
||||
[member_ids[('10.0.0.3', 80)], member_ids[('10.0.0.4', 80)]],
|
||||
handler_args[0])
|
||||
self.assertEqual(
|
||||
[member_ids[('10.0.0.5', 80)], member_ids[('10.0.0.6', 80)]],
|
||||
handler_args[1])
|
||||
self.assertEqual(2, len(handler_args[2]))
|
||||
updated_members = [
|
||||
(handler_args[2][0].address, handler_args[2][0].protocol_port),
|
||||
(handler_args[2][1].address, handler_args[2][1].protocol_port)
|
||||
]
|
||||
self.assertEqual([('10.0.0.1', 80), ('10.0.0.2', 80)], updated_members)
|
||||
|
||||
def test_create_batch_members(self):
|
||||
member5 = {'address': '10.0.0.5', 'protocol_port': 80}
|
||||
member6 = {'address': '10.0.0.6', 'protocol_port': 80}
|
||||
|
||||
req_dict = [member5, member6]
|
||||
body = {self.root_tag_list: req_dict}
|
||||
path = self.MEMBERS_PATH.format(pool_id=self.pool_id)
|
||||
self.put(path, body, status=202)
|
||||
returned_members = self.get(
|
||||
self.MEMBERS_PATH.format(pool_id=self.pool_id)
|
||||
).json.get(self.root_tag_list)
|
||||
|
||||
expected_members = [
|
||||
('10.0.0.5', 80, 'PENDING_CREATE'),
|
||||
('10.0.0.6', 80, 'PENDING_CREATE'),
|
||||
]
|
||||
|
||||
member_ids = {}
|
||||
for rm in returned_members:
|
||||
self.assertIn(
|
||||
(rm['address'],
|
||||
rm['protocol_port'],
|
||||
rm['provisioning_status']), expected_members)
|
||||
member_ids[(rm['address'], rm['protocol_port'])] = rm['id']
|
||||
handler_args = self.handler_mock().member.batch_update.call_args[0]
|
||||
self.assertEqual(0, len(handler_args[0]))
|
||||
self.assertEqual(
|
||||
[member_ids[('10.0.0.5', 80)], member_ids[('10.0.0.6', 80)]],
|
||||
handler_args[1])
|
||||
self.assertEqual(0, len(handler_args[2]))
|
||||
|
||||
def test_update_batch_members(self):
|
||||
member1 = {'address': '10.0.0.1', 'protocol_port': 80}
|
||||
member2 = {'address': '10.0.0.2', 'protocol_port': 80}
|
||||
members = [member1, member2]
|
||||
for m in members:
|
||||
self.create_member(pool_id=self.pool_id, **m)
|
||||
self.set_lb_status(self.lb_id)
|
||||
|
||||
req_dict = [member1, member2]
|
||||
body = {self.root_tag_list: req_dict}
|
||||
path = self.MEMBERS_PATH.format(pool_id=self.pool_id)
|
||||
self.put(path, body, status=202)
|
||||
returned_members = self.get(
|
||||
self.MEMBERS_PATH.format(pool_id=self.pool_id)
|
||||
).json.get(self.root_tag_list)
|
||||
|
||||
expected_members = [
|
||||
('10.0.0.1', 80, 'PENDING_UPDATE'),
|
||||
('10.0.0.2', 80, 'PENDING_UPDATE'),
|
||||
]
|
||||
|
||||
member_ids = {}
|
||||
for rm in returned_members:
|
||||
self.assertIn(
|
||||
(rm['address'],
|
||||
rm['protocol_port'],
|
||||
rm['provisioning_status']), expected_members)
|
||||
member_ids[(rm['address'], rm['protocol_port'])] = rm['id']
|
||||
handler_args = self.handler_mock().member.batch_update.call_args[0]
|
||||
self.assertEqual(0, len(handler_args[0]))
|
||||
self.assertEqual(0, len(handler_args[1]))
|
||||
self.assertEqual(2, len(handler_args[2]))
|
||||
updated_members = [
|
||||
(handler_args[2][0].address, handler_args[2][0].protocol_port),
|
||||
(handler_args[2][1].address, handler_args[2][1].protocol_port)
|
||||
]
|
||||
self.assertEqual([('10.0.0.1', 80), ('10.0.0.2', 80)], updated_members)
|
||||
|
||||
def test_delete_batch_members(self):
|
||||
member3 = {'address': '10.0.0.3', 'protocol_port': 80}
|
||||
member4 = {'address': '10.0.0.4', 'protocol_port': 80}
|
||||
members = [member3, member4]
|
||||
for m in members:
|
||||
self.create_member(pool_id=self.pool_id, **m)
|
||||
self.set_lb_status(self.lb_id)
|
||||
|
||||
req_dict = []
|
||||
body = {self.root_tag_list: req_dict}
|
||||
path = self.MEMBERS_PATH.format(pool_id=self.pool_id)
|
||||
self.put(path, body, status=202)
|
||||
returned_members = self.get(
|
||||
self.MEMBERS_PATH.format(pool_id=self.pool_id)
|
||||
).json.get(self.root_tag_list)
|
||||
|
||||
expected_members = [
|
||||
('10.0.0.3', 80, 'PENDING_DELETE'),
|
||||
('10.0.0.4', 80, 'PENDING_DELETE'),
|
||||
]
|
||||
|
||||
member_ids = {}
|
||||
for rm in returned_members:
|
||||
self.assertIn(
|
||||
(rm['address'],
|
||||
rm['protocol_port'],
|
||||
rm['provisioning_status']), expected_members)
|
||||
member_ids[(rm['address'], rm['protocol_port'])] = rm['id']
|
||||
handler_args = self.handler_mock().member.batch_update.call_args[0]
|
||||
self.assertEqual(
|
||||
[member_ids[('10.0.0.3', 80)], member_ids[('10.0.0.4', 80)]],
|
||||
handler_args[0])
|
||||
self.assertEqual(0, len(handler_args[1]))
|
||||
self.assertEqual(0, len(handler_args[2]))
|
||||
|
||||
def test_create_with_attached_listener(self):
|
||||
api_member = self.create_member(
|
||||
self.pool_with_listener_id, '10.0.0.1', 80).get(self.root_tag)
|
||||
|
@ -186,6 +186,18 @@ class TestProducer(base.TestCase):
|
||||
self.mck_client.cast.assert_called_once_with(
|
||||
{}, 'update_member', **kw)
|
||||
|
||||
def test_batch_update_members(self):
|
||||
p = producer.MemberProducer()
|
||||
member_model = data_models.Member(id=10)
|
||||
p.batch_update(old_ids=[9],
|
||||
new_ids=[11],
|
||||
updated_models=[member_model])
|
||||
kw = {'old_member_ids': [9],
|
||||
'new_member_ids': [11],
|
||||
'updated_members': [member_model.to_dict()]}
|
||||
self.mck_client.cast.assert_called_once_with(
|
||||
{}, 'batch_update_members', **kw)
|
||||
|
||||
def test_create_l7policy(self):
|
||||
p = producer.L7PolicyProducer()
|
||||
p.create(self.mck_model)
|
||||
|
@ -120,6 +120,12 @@ class TestEndpoint(base.TestCase):
|
||||
self.ep.worker.update_member.assert_called_once_with(
|
||||
self.resource_id, self.resource_updates)
|
||||
|
||||
def test_batch_update_members(self):
|
||||
self.ep.batch_update_members(
|
||||
self.context, [9], [11], [self.resource_updates])
|
||||
self.ep.worker.batch_update_members.assert_called_once_with(
|
||||
[9], [11], [self.resource_updates])
|
||||
|
||||
def test_delete_member(self):
|
||||
self.ep.delete_member(self.context, self.resource_id)
|
||||
self.ep.worker.delete_member.assert_called_once_with(
|
||||
|
@ -72,3 +72,17 @@ class TestMemberFlows(base.TestCase):
|
||||
|
||||
self.assertEqual(5, len(member_flow.requires))
|
||||
self.assertEqual(0, len(member_flow.provides))
|
||||
|
||||
def test_get_batch_update_members_flow(self, mock_get_net_driver):
|
||||
|
||||
member_flow = self.MemberFlow.get_batch_update_members_flow(
|
||||
[], [], [])
|
||||
|
||||
self.assertIsInstance(member_flow, flow.Flow)
|
||||
|
||||
self.assertIn(constants.LISTENERS, member_flow.requires)
|
||||
self.assertIn(constants.LOADBALANCER, member_flow.requires)
|
||||
self.assertIn(constants.POOL, member_flow.requires)
|
||||
|
||||
self.assertEqual(3, len(member_flow.requires))
|
||||
self.assertEqual(2, len(member_flow.provides))
|
||||
|
@ -46,6 +46,7 @@ class TestLifecycleTasks(base.TestCase):
|
||||
self.MEMBER = mock.MagicMock()
|
||||
self.MEMBER_ID = uuidutils.generate_uuid()
|
||||
self.MEMBER.id = self.MEMBER_ID
|
||||
self.MEMBERS = [self.MEMBER]
|
||||
self.POOL = mock.MagicMock()
|
||||
self.POOL_ID = uuidutils.generate_uuid()
|
||||
self.POOL.id = self.POOL_ID
|
||||
@ -301,7 +302,6 @@ class TestLifecycleTasks(base.TestCase):
|
||||
mock_listener_prov_status_active,
|
||||
mock_loadbalancer_prov_status_active,
|
||||
mock_member_prov_status_error):
|
||||
|
||||
member_to_error_on_revert = lifecycle_tasks.MemberToErrorOnRevertTask()
|
||||
|
||||
# Execute
|
||||
@ -327,6 +327,46 @@ class TestLifecycleTasks(base.TestCase):
|
||||
mock_pool_prov_status_active.assert_called_once_with(
|
||||
self.POOL_ID)
|
||||
|
||||
@mock.patch('octavia.controller.worker.task_utils.TaskUtils.'
|
||||
'mark_member_prov_status_error')
|
||||
@mock.patch('octavia.controller.worker.task_utils.TaskUtils.'
|
||||
'mark_loadbalancer_prov_status_active')
|
||||
@mock.patch('octavia.controller.worker.task_utils.TaskUtils.'
|
||||
'mark_listener_prov_status_active')
|
||||
@mock.patch('octavia.controller.worker.task_utils.TaskUtils.'
|
||||
'mark_pool_prov_status_active')
|
||||
def test_MembersToErrorOnRevertTask(
|
||||
self,
|
||||
mock_pool_prov_status_active,
|
||||
mock_listener_prov_status_active,
|
||||
mock_loadbalancer_prov_status_active,
|
||||
mock_member_prov_status_error):
|
||||
members_to_error_on_revert = (
|
||||
lifecycle_tasks.MembersToErrorOnRevertTask())
|
||||
|
||||
# Execute
|
||||
members_to_error_on_revert.execute(self.MEMBERS,
|
||||
self.LISTENERS,
|
||||
self.LOADBALANCER,
|
||||
self.POOL)
|
||||
|
||||
self.assertFalse(mock_member_prov_status_error.called)
|
||||
|
||||
# Revert
|
||||
members_to_error_on_revert.revert(self.MEMBERS,
|
||||
self.LISTENERS,
|
||||
self.LOADBALANCER,
|
||||
self.POOL)
|
||||
|
||||
mock_member_prov_status_error.assert_called_once_with(
|
||||
self.MEMBER_ID)
|
||||
mock_loadbalancer_prov_status_active.assert_called_once_with(
|
||||
self.LOADBALANCER_ID)
|
||||
mock_listener_prov_status_active.assert_called_once_with(
|
||||
self.LISTENER_ID)
|
||||
mock_pool_prov_status_active.assert_called_once_with(
|
||||
self.POOL_ID)
|
||||
|
||||
@mock.patch('octavia.controller.worker.task_utils.TaskUtils.'
|
||||
'mark_pool_prov_status_error')
|
||||
@mock.patch('octavia.controller.worker.task_utils.TaskUtils.'
|
||||
|
@ -773,6 +773,38 @@ class TestControllerWorker(base.TestCase):
|
||||
|
||||
_flow_mock.run.assert_called_once_with()
|
||||
|
||||
@mock.patch('octavia.controller.worker.flows.'
|
||||
'member_flows.MemberFlows.get_batch_update_members_flow',
|
||||
return_value=_flow_mock)
|
||||
def test_batch_update_members(self,
|
||||
mock_get_batch_update_members_flow,
|
||||
mock_api_get_session,
|
||||
mock_dyn_log_listener,
|
||||
mock_taskflow_load,
|
||||
mock_pool_repo_get,
|
||||
mock_member_repo_get,
|
||||
mock_l7rule_repo_get,
|
||||
mock_l7policy_repo_get,
|
||||
mock_listener_repo_get,
|
||||
mock_lb_repo_get,
|
||||
mock_health_mon_repo_get,
|
||||
mock_amp_repo_get):
|
||||
|
||||
_flow_mock.reset_mock()
|
||||
|
||||
cw = controller_worker.ControllerWorker()
|
||||
cw.batch_update_members([9], [11], [MEMBER_UPDATE_DICT])
|
||||
|
||||
(base_taskflow.BaseTaskFlowEngine._taskflow_load.
|
||||
assert_called_once_with(_flow_mock,
|
||||
store={
|
||||
constants.LISTENERS: [_listener_mock],
|
||||
constants.LOADBALANCER:
|
||||
_load_balancer_mock,
|
||||
constants.POOL: _pool_mock}))
|
||||
|
||||
_flow_mock.run.assert_called_once_with()
|
||||
|
||||
@mock.patch('octavia.controller.worker.flows.'
|
||||
'pool_flows.PoolFlows.get_create_pool_flow',
|
||||
return_value=_flow_mock)
|
||||
|
@ -0,0 +1,7 @@
|
||||
---
|
||||
features:
|
||||
- |
|
||||
It is now possible to completely update a pool's member list as a batch
|
||||
operation. Using a PUT request on the base member endpoint of a pool, you
|
||||
can specify a list of member objects and the service will perform any
|
||||
necessary creates/deletes/updates as a single operation.
|
@ -71,6 +71,10 @@ def generate(flow_list, output_directory):
|
||||
lb = dmh.generate_load_balancer()
|
||||
delete_flow, store = get_flow_method(lb)
|
||||
current_engine = engines.load(delete_flow)
|
||||
elif (current_tuple[1] == 'MemberFlows' and
|
||||
current_tuple[2] == 'get_batch_update_members_flow'):
|
||||
current_engine = engines.load(
|
||||
get_flow_method([], [], []))
|
||||
else:
|
||||
current_engine = engines.load(get_flow_method())
|
||||
current_engine.compile()
|
||||
|
@ -19,6 +19,7 @@ octavia.controller.worker.flows.pool_flows PoolFlows get_update_pool_flow
|
||||
octavia.controller.worker.flows.member_flows MemberFlows get_create_member_flow
|
||||
octavia.controller.worker.flows.member_flows MemberFlows get_delete_member_flow
|
||||
octavia.controller.worker.flows.member_flows MemberFlows get_update_member_flow
|
||||
octavia.controller.worker.flows.member_flows MemberFlows get_batch_update_members_flow
|
||||
octavia.controller.worker.flows.health_monitor_flows HealthMonitorFlows get_create_health_monitor_flow
|
||||
octavia.controller.worker.flows.health_monitor_flows HealthMonitorFlows get_delete_health_monitor_flow
|
||||
octavia.controller.worker.flows.health_monitor_flows HealthMonitorFlows get_update_health_monitor_flow
|
||||
|
Loading…
Reference in New Issue
Block a user