diff --git a/cue/api/controllers/v1/cluster.py b/cue/api/controllers/v1/cluster.py
index ba235ba1..9e30c985 100644
--- a/cue/api/controllers/v1/cluster.py
+++ b/cue/api/controllers/v1/cluster.py
@@ -178,14 +178,18 @@ class ClusterController(rest.RestController):
         # create list with node id's for create cluster flow
         node_ids = [node.id for node in nodes]
 
+        # retrieve cluster record
+        cluster = objects.Cluster.get_cluster_by_id(context, cluster_id)
+
         # prepare and post cluster delete job to backend
         flow_kwargs = {
             'cluster_id': cluster_id,
             'node_ids': node_ids,
+            'group_id': cluster.group_id,
         }
 
         job_args = {
-            "context": context.to_dict(),
+            'context': context.to_dict(),
         }
 
         job_client = task_flow_client.get_client_instance()
diff --git a/cue/db/sqlalchemy/alembic/versions/17c428e0479e_add_cluster_error_detail_column.py b/cue/db/sqlalchemy/alembic/versions/17c428e0479e_add_error_detail_group_id_columns.py
similarity index 75%
rename from cue/db/sqlalchemy/alembic/versions/17c428e0479e_add_cluster_error_detail_column.py
rename to cue/db/sqlalchemy/alembic/versions/17c428e0479e_add_error_detail_group_id_columns.py
index 0e33dcd3..47f164b0 100644
--- a/cue/db/sqlalchemy/alembic/versions/17c428e0479e_add_cluster_error_detail_column.py
+++ b/cue/db/sqlalchemy/alembic/versions/17c428e0479e_add_error_detail_group_id_columns.py
@@ -14,7 +14,7 @@
 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 # License for the specific language governing permissions and limitations
 # under the License.
-"""add error_detail column in clusters
+"""add error_detail and group_id columns in clusters
 
 Revision ID: 17c428e0479e
 Revises: 244aa473e595
@@ -26,16 +26,22 @@ Create Date: 2015-11-11 12:01:10.769280
 revision = '17c428e0479e'
 down_revision = '244aa473e595'
 
+from cue.db.sqlalchemy import types
+
 from alembic import op
 from oslo_config import cfg
 import sqlalchemy as sa
 
 
 def upgrade():
-    op.add_column('clusters', sa.Column('error_detail', sa.Text))
+    op.add_column('clusters', sa.Column('error_detail', sa.Text(),
+                                        nullable=True))
+    op.add_column('clusters', sa.Column('group_id', types.UUID(),
+                                        nullable=True))
 
 
 def downgrade():
     db_connection = cfg.CONF.database.connection
     if db_connection != "sqlite://":  # pragma: nocover
-        op.drop_column('clusters', 'error_detail')
\ No newline at end of file
+        op.drop_column('clusters', 'error_detail')
+        op.drop_column('clusters', 'group_id')
\ No newline at end of file
diff --git a/cue/db/sqlalchemy/models.py b/cue/db/sqlalchemy/models.py
index 0456f8a7..ade34894 100644
--- a/cue/db/sqlalchemy/models.py
+++ b/cue/db/sqlalchemy/models.py
@@ -74,6 +74,7 @@ class Cluster(base.BASE, base.IdMixin, models.TimestampMixin,
     size = sa.Column(sa.Integer(), default=1, nullable=False)
     volume_size = sa.Column(sa.Integer(), nullable=True)
     error_detail = sa.Column(sa.Text(), nullable=True)
+    group_id = sa.Column(types.UUID(), nullable=True)
 
     sa.Index("clusters_cluster_id_idx", "cluster_id", unique=True)
 
diff --git a/cue/objects/cluster.py b/cue/objects/cluster.py
index a3f3db8e..85d27d71 100644
--- a/cue/objects/cluster.py
+++ b/cue/objects/cluster.py
@@ -39,6 +39,7 @@ class Cluster(base.CueObject):
         'updated_at': obj_utils.datetime_or_str_or_none,
         'deleted_at': obj_utils.datetime_or_str_or_none,
         'error_detail': obj_utils.str_or_none,
+        'group_id': obj_utils.str_or_none,
     }
 
     @staticmethod
diff --git a/cue/taskflow/__init__.py b/cue/taskflow/__init__.py
index 4f03769f..f6347b5d 100644
--- a/cue/taskflow/__init__.py
+++ b/cue/taskflow/__init__.py
@@ -59,6 +59,10 @@ TF_OPTS = [
                help="Number of times to check a node for status before "
                     "declaring it FAULTED",
                default=30),
+
+    cfg.BoolOpt('cluster_node_anti_affinity',
+                help="Anti-affinity policy for cue cluster nodes",
+                default=False),
 ]
 
 opt_group = cfg.OptGroup(
diff --git a/cue/taskflow/flow/create_cluster.py b/cue/taskflow/flow/create_cluster.py
index b6bbf270..cbfa78b7 100644
--- a/cue/taskflow/flow/create_cluster.py
+++ b/cue/taskflow/flow/create_cluster.py
@@ -18,10 +18,12 @@ import taskflow.patterns.graph_flow as graph_flow
 import taskflow.patterns.linear_flow as linear_flow
 import taskflow.retry as retry
 
+import cue.client as client
 from cue.db.sqlalchemy import models
 from cue.taskflow.flow import create_cluster_node
 import cue.taskflow.task as cue_tasks
 import os_tasklib.common as os_common
+import os_tasklib.nova as nova
 
 
 def create_cluster(cluster_id, node_ids, user_network_id,
@@ -42,63 +44,102 @@
     :return: A flow instance that represents the workflow for creating a
              cluster
     """
+    cluster_name = "cue[%s]" % cluster_id
     flow = graph_flow.Flow("creating cluster %s" % cluster_id)
-    start_flow_status = {'cluster_id': cluster_id,
-                         'cluster_values': {'status': models.Status.BUILDING}}
-    end_flow_status = {'cluster_id': cluster_id,
-                       'cluster_values': {'status': models.Status.ACTIVE}}
+    start_flow_cluster_update = {
+        'cluster_id': cluster_id,
+        'cluster_values': {'status': models.Status.BUILDING}}
 
-    start_task = cue_tasks.UpdateClusterRecord(
-        name="update cluster status start "
-             "%s" % cluster_id,
-        inject=start_flow_status)
-    flow.add(start_task)
+    extract_scheduler_hints = lambda vm_group: {'group': str(vm_group['id'])}
+    end_flow_cluster_update = lambda vm_group: {
+        'status': models.Status.ACTIVE,
+        'group_id': str(vm_group['id'])}
 
-    end_task = cue_tasks.UpdateClusterRecord(
-        name="update cluster status end "
-             "%s" % cluster_id,
-        inject=end_flow_status)
-    flow.add(end_task)
+    create_cluster_start_task = cue_tasks.UpdateClusterRecord(
+        name="update cluster status start %s" % cluster_name,
+        inject=start_flow_cluster_update)
+    flow.add(create_cluster_start_task)
+
+    cluster_anti_affinity = cfg.CONF.taskflow.cluster_node_anti_affinity
+    if cluster_anti_affinity:
+        create_vm_group = nova.CreateVmGroup(
+            name="create cluster group %s" % cluster_name,
+            os_client=client.nova_client(),
+            requires=('name', 'policies'),
+            inject={'name': "cue_group_%s" % cluster_id,
+                    'policies': ['anti-affinity']},
+            provides="cluster_group")
+        flow.add(create_vm_group)
+
+        get_scheduler_hints = os_common.Lambda(
+            extract_scheduler_hints,
+            name="extract scheduler hints %s" % cluster_name,
+            rebind={'vm_group': "cluster_group"},
+            provides="scheduler_hints")
+        flow.add(get_scheduler_hints)
+
+        build_cluster_info = os_common.Lambda(
+            end_flow_cluster_update,
+            name="build new cluster update values %s" % cluster_name,
+            rebind={'vm_group': "cluster_group"},
+            provides="cluster_values")
+        flow.add(build_cluster_info)
+
+        flow.link(create_cluster_start_task, create_vm_group)
+        flow.link(create_vm_group, get_scheduler_hints)
+        flow.link(get_scheduler_hints, build_cluster_info)
+        create_node_start_task = build_cluster_info
+        create_cluster_end_task = cue_tasks.UpdateClusterRecord(
+            name="update cluster status end %s" % cluster_name,
+            inject={'cluster_id': cluster_id})
+    else:
+        create_node_start_task = create_cluster_start_task
+        end_flow_cluster_update = {
+            'cluster_id': cluster_id,
+            'cluster_values': {'status': models.Status.ACTIVE}}
+        create_cluster_end_task = cue_tasks.UpdateClusterRecord(
+            name="update cluster status end %s" % cluster_name,
+            inject=end_flow_cluster_update)
+
+    flow.add(create_cluster_end_task)
 
     node_check_timeout = cfg.CONF.taskflow.cluster_node_check_timeout
     node_check_max_count = cfg.CONF.taskflow.cluster_node_check_max_count
 
     check_rabbit_online = linear_flow.Flow(
-        name="wait for RabbitMQ ready state",
+        name="wait for RabbitMQ ready state %s" % cluster_name,
         retry=retry.Times(node_check_max_count, revert_all=True))
     check_rabbit_online.add(
         cue_tasks.GetRabbitClusterStatus(
-            name="get RabbitMQ status",
+            name="get RabbitMQ status %s" % cluster_name,
             rebind={'vm_ip': "vm_management_ip_0"},
             provides="clustering_status",
             inject={'proto': 'http'}),
         os_common.CheckFor(
-            name="check cluster status",
+            name="check cluster status %s" % cluster_name,
            details="waiting for RabbitMQ clustered status",
            rebind={'check_var': "clustering_status"},
            check_value='OK',
-            retry_delay_seconds=10),
+            retry_delay_seconds=node_check_timeout),
     )
     flow.add(check_rabbit_online)
-    flow.link(check_rabbit_online, end_task)
+    flow.link(check_rabbit_online, create_cluster_end_task)
 
     #todo(dagnello): verify node_ids is a list and not a string
     for i, node_id in enumerate(node_ids):
         generate_userdata = cue_tasks.ClusterNodeUserData(
-            "userdata_%d" % i,
-            len(node_ids),
-            "vm_management_ip_",
+            name="generate userdata %s_%d" % (cluster_name, i),
+            node_count=len(node_ids),
+            node_ip_prefix="vm_management_ip_",
             inject={'node_name': "rabbit-node-%d" % i,
                     'cluster_id': cluster_id})
         flow.add(generate_userdata)
 
-        create_cluster_node.create_cluster_node(cluster_id, i, node_id, flow,
-                                                generate_userdata, start_task,
-                                                check_rabbit_online,
-                                                node_check_timeout,
-                                                node_check_max_count,
-                                                user_network_id,
-                                                management_network_id)
-
+        create_cluster_node.create_cluster_node(
+            cluster_id=cluster_id, node_number=i, node_id=node_id,
+            graph_flow=flow, generate_userdata=generate_userdata,
+            start_task=create_node_start_task, post_task=check_rabbit_online,
+            user_network_id=user_network_id,
+            management_network_id=management_network_id)
     return flow
diff --git a/cue/taskflow/flow/create_cluster_node.py b/cue/taskflow/flow/create_cluster_node.py
index adfeee5e..a38c92bd 100644
--- a/cue/taskflow/flow/create_cluster_node.py
+++ b/cue/taskflow/flow/create_cluster_node.py
@@ -37,7 +37,6 @@ CONF.register_opts(FLOW_OPTS, group='flow_options')
 
 def create_cluster_node(cluster_id, node_number, node_id, graph_flow,
                         generate_userdata, start_task, post_task,
-                        node_check_timeout, node_check_max_count,
                         user_network_id, management_network_id):
     """Create Cluster Node factory function
 
@@ -57,10 +56,6 @@
     :type post_task: taskflow task or flow
     :param generate_userdata: generate user data task
     :type generate_userdata: cue.taskflow.task.ClusterNodeUserData
-    :param node_check_timeout: seconds wait between node status checks
-    :type node_check_timeout: int
-    :param node_check_max_count: times to check for updated node status
-    :type node_check_max_count: int
     :param user_network_id: The user's network id
     :type user_network_id: string
     :param management_network_id: The management network id
@@ -141,7 +136,6 @@ def create_cluster_node(cluster_id, node_number, node_id, graph_flow,
     graph_flow.link(create_vm, get_vm_id)
 
     retry_count = CONF.flow_options.create_cluster_node_vm_active_retry_count
-    #todo(dagnello): make retry times configurable
     check_vm_active = linear_flow.Flow(
         name="wait for VM active state %s" % node_name,
         retry=retry.Times(retry_count, revert_all=True))
diff --git a/cue/taskflow/flow/delete_cluster.py b/cue/taskflow/flow/delete_cluster.py
index 6238278d..5f639fa2 100644
--- a/cue/taskflow/flow/delete_cluster.py
+++ b/cue/taskflow/flow/delete_cluster.py
@@ -16,12 +16,14 @@
 import taskflow.patterns.linear_flow as linear_flow
 import taskflow.patterns.unordered_flow as unordered_flow
 
+import cue.client as client
 from cue.db.sqlalchemy import models
 from cue.taskflow.flow import delete_cluster_node
 import cue.taskflow.task as cue_tasks
+import os_tasklib.nova as nova
 
 
-def delete_cluster(cluster_id, node_ids):
+def delete_cluster(cluster_id, node_ids, group_id):
     """Delete Cluster flow factory function
 
     This factory function uses :func:`cue.taskflow.flow.delete_cluster_node` to
@@ -31,9 +33,12 @@
     :type cluster_id: string
     :param node_ids: The Cue Node id's associated with each node in the cluster
     :type node_ids: list of uuid's
+    :param group_id: The group id associated with the cluster
+    :type group_id: uuid
     :return: A flow instance that represents the workflow for deleting a
              cluster
     """
+    cluster_name = "cue[%s]" % cluster_id
     flow = linear_flow.Flow("deleting cluster %s" % cluster_id)
     sub_flow = unordered_flow.Flow("delete VMs")
     start_flow_status = {'cluster_id': cluster_id,
@@ -48,11 +53,16 @@
                                                              node_id))
 
     flow.add(cue_tasks.UpdateClusterRecord(name="update cluster status start "
-                                                "%s" % cluster_id,
-                                           inject=start_flow_status))
+                                                "%s" % cluster_name,
+                                           inject=start_flow_status))
+    if group_id is not None:
+        flow.add(nova.DeleteVmGroup(name="delete nova user group %s %s" %
+                                         (group_id, cluster_name),
+                                    os_client=client.nova_client(),
+                                    inject={'group': group_id}))
     flow.add(sub_flow)
     flow.add(cue_tasks.UpdateClusterRecord(name="update cluster status end "
-                                                "%s" % cluster_id,
-                                           inject=end_flow_status))
+                                                "%s" % cluster_name,
+                                           inject=end_flow_status))
 
     return flow
diff --git a/cue/tests/functional/db/test_models.py b/cue/tests/functional/db/test_models.py
index 4fc4b3da..02a73624 100644
--- a/cue/tests/functional/db/test_models.py
+++ b/cue/tests/functional/db/test_models.py
@@ -26,6 +26,7 @@ from oslo_utils import timeutils
 UUID1 = str(uuid.uuid4())
 UUID2 = str(uuid.uuid4())
 UUID3 = str(uuid.uuid4())
+UUID4 = str(uuid.uuid4())
 
 
 class ModelsTests(base.FunctionalTestCase):
@@ -47,6 +48,7 @@ class ModelsTests(base.FunctionalTestCase):
             "updated_at": timeutils.utcnow(),
             "deleted_at": timeutils.utcnow(),
             "error_detail": "My cluster's error(s) detail",
+            "group_id": UUID4,
         }
 
         cluster = models.Cluster()
@@ -79,6 +81,9 @@ class ModelsTests(base.FunctionalTestCase):
         self.assertEqual(cluster_values["error_detail"],
                          cluster.error_detail,
                          "Invalid error_detail value")
+        self.assertEqual(cluster_values["group_id"],
+                         cluster.group_id,
+                         "Invalid group_id value")
 
         db_session = sql_api.get_session()
         cluster.save(db_session)
@@ -114,6 +119,9 @@ class ModelsTests(base.FunctionalTestCase):
         self.assertEqual(cluster_values["error_detail"],
                          cluster_db.error_detail,
                          "Invalid error_detail value")
+        self.assertEqual(cluster_values["group_id"],
+                         cluster_db.group_id,
+                         "Invalid group_id value")
 
     def test_create_node_model(self):
         """Verifies a new cluster record is created in DB."""
diff --git a/cue/tests/functional/fixtures/nova.py b/cue/tests/functional/fixtures/nova.py
index 40dc1fb2..ac5480d1 100644
--- a/cue/tests/functional/fixtures/nova.py
+++ b/cue/tests/functional/fixtures/nova.py
@@ -24,13 +24,14 @@ import cue.tests.functional.fixtures.base as base
 
 class VmDetails(object):
     def __init__(self, vm_id, name, flavor, image,
-                 port_list=None, status=None):
+                 port_list=None, status=None, host_id=None):
         self.id = vm_id
         self.name = name
         self.flavor = flavor
         self.image = image
         self.status = status if status else 'ACTIVE'
         self.port_list = port_list
+        self.host_id = host_id
 
     def to_dict(self):
         return {
@@ -105,6 +106,20 @@
         return status
 
 
+class VmGroupDetails(object):
+    def __init__(self, vm_group_id, name, policies=None):
+        self.id = vm_group_id or str(uuid.uuid4())
+        self.name = name or 'cue_group'
+        self.policies = policies or ['anti-affinity']
+
+    def to_dict(self):
+        return {
+            'id': self.id,
+            'name': self.name,
+            'policies': self.policies
+        }
+
+
 class NovaClient(base.BaseFixture):
     """A test fixture to simulate a Nova Client connection
 
@@ -114,11 +129,12 @@
 
     def __init__(self, image_list=None, flavor_list=None,
                  vm_limit=None, security_group_list=None,
-                 *args, **kwargs):
+                 vm_group_list=None, *args, **kwargs):
         super(NovaClient, self).__init__(*args, **kwargs)
         self._vm_list = dict()
         self._image_list = dict()
         self._flavor_list = dict()
+        self._vm_group_list = dict()
 
         if not image_list:
             image_list = ['cirros-0.3.2-x86_64-uec-kernel']
@@ -158,9 +174,12 @@
         v2_client.images.find = self.find_images
         v2_client.images.list = self.list_images
         v2_client.flavors.find = self.find_flavors
+        v2_client.server_groups.create = self.create_vm_group
+        v2_client.server_groups.delete = self.delete_vm_group
+        v2_client.server_groups.get = self.get_vm_group
 
     def create_vm(self, name, image, flavor, nics=None, security_groups=None,
-                  **kwargs):
+                  scheduler_hints=None, **kwargs):
         """Mock'd version of novaclient...create_vm().
 
         Create a Nova VM.
@@ -225,6 +244,17 @@
                             port_list=port_list,
                             status='BUILDING')
 
+        if scheduler_hints is not None:
+            try:
+                group_id = scheduler_hints['group']
+            except AttributeError:
+                group_id = scheduler_hints
+
+            if group_id not in self._vm_group_list:
+                raise nova_exc.BadRequest(400)
+
+            newVm.host_id = str(uuid.uuid4())
+
         self._vm_list[str(newVm.id)] = newVm
 
         return newVm
@@ -330,3 +360,42 @@
             raise nova_exc.NotFound(404)
 
         return server.port_list
+
+    def create_vm_group(self, name, policies, **kwargs):
+        """Mock'd version of novaclient...server_group_create().
+
+        Create a Nova server group.
+
+        :param name: Server group name.
+        :param policies: Server group policy list.
+        :return: A VmGroupDetails object representing the newly created
+                 server group.
+        """
+        newVmGroup = VmGroupDetails(vm_group_id=str(uuid.uuid4()), name=name,
+                                    policies=policies)
+
+        self._vm_group_list[newVmGroup.id] = newVmGroup
+        return newVmGroup
+
+    def get_vm_group(self, id, **kwargs):
+        """Mock'd version of novaclient...server_group_get()
+
+        :param id: vm server group id
+        :return: current server group object for the specified group id
+        """
+        try:
+            vm_group = self._vm_group_list[str(id)]
+        except KeyError:
+            raise nova_exc.NotFound(404)
+
+        return vm_group
+
+    def delete_vm_group(self, id, **kwargs):
+        """Mock'd version of novaclient...server_group_delete()
+
+        :param id: vm server group id
+        """
+        try:
+            del self._vm_group_list[str(id)]
+        except KeyError:
+            raise nova_exc.NotFound(404)
diff --git a/cue/tests/functional/taskflow/flow/test_create_cluster.py b/cue/tests/functional/taskflow/flow/test_create_cluster.py
index 7dee71f3..13a8a5b4 100644
--- a/cue/tests/functional/taskflow/flow/test_create_cluster.py
+++ b/cue/tests/functional/taskflow/flow/test_create_cluster.py
@@ -240,6 +240,54 @@ class CreateClusterTests(base.FunctionalTestCase):
         else:
             self.fail("Expected taskflow_exc.WrappedFailure exception.")
 
+    def test_create_cluster_anti_affinity(self):
+        self.flags(cluster_node_anti_affinity=True, group="taskflow")
+
+        flow_store = {
+            'image': self.valid_image.id,
+            'flavor': self.valid_flavor.id,
+            "port": self.port,
+            "context": self.context.to_dict(),
+            "erlang_cookie": str(uuid.uuid4()),
+            "default_rabbit_user": 'rabbit',
+            "default_rabbit_pass": str(uuid.uuid4()),
+        }
+
+        cluster_values = {
+            "project_id": self.context.tenant_id,
+            "name": "RabbitCluster",
+            "network_id": str(uuid.uuid4()),
+            "flavor": "1",
+            "size": 3,
+        }
+
+        new_cluster = objects.Cluster(**cluster_values)
+        new_cluster.create(self.context)
+
+        nodes = objects.Node.get_nodes_by_cluster_id(self.context,
+                                                     new_cluster.id)
+
+        node_ids = []
+        for node in nodes:
+            node_ids.append(node.id)
+
+        flow = create_cluster(new_cluster.id,
+                              node_ids,
+                              self.valid_network['id'],
+                              self.management_network['id'])
+
+        engines.run(flow, store=flow_store)
+
+        nodes_after = objects.Node.get_nodes_by_cluster_id(self.context,
+                                                           new_cluster.id)
+
+        # check if the host_ids are different for cluster nodes
+        host_ids = []
+        for node in nodes_after:
+            host_id = self.nova_client.servers.get(node.instance_id).host_id
+            self.assertNotIn(host_id, host_ids)
+            host_ids.append(host_id)
+
     def tearDown(self):
         for vm_id in self.new_vm_list:
             self.nova_client.servers.delete(vm_id)
diff --git a/cue/tests/functional/taskflow/flow/test_delete_cluster.py b/cue/tests/functional/taskflow/flow/test_delete_cluster.py
index ed0f7288..81fb8ffa 100644
--- a/cue/tests/functional/taskflow/flow/test_delete_cluster.py
+++ b/cue/tests/functional/taskflow/flow/test_delete_cluster.py
@@ -26,6 +26,7 @@ from cue.tests.functional.fixtures import neutron
 from cue.tests.functional.fixtures import nova
 from cue.tests.functional.fixtures import urllib2_fixture
 
+import novaclient.exceptions as nova_exc
 from taskflow import engines
 import taskflow.exceptions as taskflow_exc
 
@@ -114,10 +115,9 @@ class DeleteClusterTests(base.FunctionalTestCase):
             node_ids.append(str(node.id))
 
         flow_create = create_cluster(new_cluster.id,
-                                      node_ids,
-                                      self.valid_network['id'],
-                                      self.management_network['id'])
-        flow_delete = delete_cluster(str(new_cluster.id), node_ids)
+                                     node_ids,
+                                     self.valid_network['id'],
+                                     self.management_network['id'])
 
         result = engines.run(flow_create, store=flow_store_create)
 
@@ -148,6 +148,8 @@
             self.assertEqual(uri, endpoint.uri, "invalid endpoint uri")
             self.assertEqual('AMQP', endpoint.type, "invalid endpoint type")
 
+        flow_delete = delete_cluster(str(new_cluster.id), node_ids,
+                                     cluster_after.group_id)
         result = engines.run(flow_delete, store=flow_store_delete)
 
         nodes_after = objects.Node.get_nodes_by_cluster_id(self.context,
@@ -166,6 +168,67 @@
                                                                   node.id)
             self.assertEqual(0, len(endpoints), "endpoints were not deleted")
 
+    def test_delete_cluster_anti_affinity(self):
+        self.flags(cluster_node_anti_affinity=True, group="taskflow")
+
+        flow_store_create = {
+            "image": self.valid_image.id,
+            "flavor": self.valid_flavor.id,
+            "port": self.port,
+            "context": self.context.to_dict(),
+            "erlang_cookie": str(uuid.uuid4()),
+            "default_rabbit_user": 'rabbit',
+            "default_rabbit_pass": str(uuid.uuid4()),
+        }
+        flow_store_delete = {
+            "context": self.context.to_dict(),
+        }
+
+        cluster_values = {
+            "project_id": self.context.tenant_id,
+            "name": "RabbitCluster",
+            "network_id": str(uuid.uuid4()),
+            "flavor": "1",
+            "size": 3,
+        }
+
+        new_cluster = objects.Cluster(**cluster_values)
+        new_cluster.create(self.context)
+
+        nodes = objects.Node.get_nodes_by_cluster_id(self.context,
+                                                     new_cluster.id)
+
+        node_ids = []
+        for node in nodes:
+            node_ids.append(str(node.id))
+
+        flow_create = create_cluster(new_cluster.id,
+                                     node_ids,
+                                     self.valid_network['id'],
+                                     self.management_network['id'])
+
+        engines.run(flow_create, store=flow_store_create)
+
+        cluster_after = objects.Cluster.get_cluster_by_id(self.context,
+                                                          new_cluster.id)
+
+        self.assertEqual(models.Status.ACTIVE, cluster_after.status,
+                         "Invalid status for cluster")
+
+        flow_delete = delete_cluster(str(new_cluster.id), node_ids,
+                                     cluster_after.group_id)
+        engines.run(flow_delete, store=flow_store_delete)
+
+        self.assertRaises(exception.NotFound,
+                          objects.Cluster.get_cluster_by_id,
+                          self.context,
+                          new_cluster.id)
+
+        # verify server group is not found
+        self.assertRaises(nova_exc.NotFound,
+                          self.nova_client.server_groups.get,
+                          cluster_after.group_id)
+
     def test_delete_invalid_cluster(self):
         vm_list = self.nova_client.servers.list()
         port_list = self.neutron_client.list_ports()
@@ -176,11 +239,12 @@
 
         cluster_size = 3
         cluster_id = str(uuid.uuid4())
+        server_group_id = str(uuid.uuid4())
         node_ids = []
         for i in range(0, cluster_size):
             node_ids.append(str(uuid.uuid4()))
 
-        flow_delete = delete_cluster(cluster_id, node_ids)
+        flow_delete = delete_cluster(cluster_id, node_ids, server_group_id)
 
         self.assertRaises(taskflow_exc.WrappedFailure, engines.run,
                           flow_delete, store=flow_store_delete)
diff --git a/cue/tests/functional/taskflow/task/test_create_vm_group.py b/cue/tests/functional/taskflow/task/test_create_vm_group.py
new file mode 100644
index 00000000..45906e77
--- /dev/null
+++ b/cue/tests/functional/taskflow/task/test_create_vm_group.py
@@ -0,0 +1,61 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 Hewlett-Packard Development Company, L.P.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import uuid
+
+from cue import client
+from cue.tests.functional import base
+from cue.tests.functional.fixtures import nova
+import os_tasklib.nova.create_vm_group as create_vm_group
+
+from taskflow import engines
+from taskflow.patterns import linear_flow
+
+
+class CreateVmGroupTests(base.FunctionalTestCase):
+    additional_fixtures = [
+        nova.NovaClient
+    ]
+
+    def setUp(self):
+        super(CreateVmGroupTests, self).setUp()
+
+        self.nova_client = client.nova_client()
+
+        self.new_vm_group_name = str(uuid.uuid4())
+        self.new_vm_group_id = None
+
+        self.flow = linear_flow.Flow("create vm group flow")
+        self.flow.add(
+            create_vm_group.CreateVmGroup(
+                os_client=self.nova_client,
+                requires=('name', 'policies'),
+                provides='new_vm_group',
+                rebind={'name': 'vm_group_name'}
+            )
+        )
+
+    def test_create_vm_group(self):
+        """Verifies CreateVmGroup task directly."""
+
+        flow_store = {
+            'vm_group_name': self.new_vm_group_name,
+            'policies': ['anti-affinity']
+        }
+
+        result = engines.run(self.flow, store=flow_store)
+        self.new_vm_group_id = result['new_vm_group']['id']
+        vm_group = self.nova_client.server_groups.get(self.new_vm_group_id)
+        self.assertEqual(self.new_vm_group_name, vm_group.name)
diff --git a/cue/tests/functional/taskflow/task/test_delete_vm_group.py b/cue/tests/functional/taskflow/task/test_delete_vm_group.py
new file mode 100644
index 00000000..76222c4a
--- /dev/null
+++ b/cue/tests/functional/taskflow/task/test_delete_vm_group.py
@@ -0,0 +1,96 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 Hewlett-Packard Development Company, L.P.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import uuid
+
+from cue import client
+from cue.tests.functional import base
+from cue.tests.functional.fixtures import nova
+import os_tasklib.nova.delete_vm_group as delete_vm_group
+
+import novaclient.exceptions as nova_exc
+from taskflow import engines
+from taskflow.patterns import linear_flow
+
+
+class DeleteVmGroupTests(base.FunctionalTestCase):
+    additional_fixtures = [
+        nova.NovaClient
+    ]
+
+    task_store = {
+        'group': "0",
+    }
+
+    def setUp(self):
+        super(DeleteVmGroupTests, self).setUp()
+
+        self.nova_client = client.nova_client()
+
+        self.new_vm_group_name = str(uuid.uuid4())
+        self.new_vm_group_id = None
+
+        self.flow = linear_flow.Flow("delete vm group flow")
+        self.flow.add(
+            delete_vm_group.DeleteVmGroup(os_client=self.nova_client)
+        )
+
+    def test_delete_server_group_invalid_id(self):
+        """Verifies deleting a non-existent server group."""
+        # create a few server groups
+        server_groups = [
+            self.nova_client.server_groups.create(name="server_group_1",
+                                                  policies=['anti-affinity']),
+            self.nova_client.server_groups.create(name="server_group_2",
+                                                  policies=['affinity']),
+            self.nova_client.server_groups.create(name="server_group_3",
+                                                  policies=['anti-affinity'])]
+
+        self.task_store['group'] = str(uuid.uuid4())
+
+        # start engine to run delete task
+        engines.run(self.flow, store=DeleteVmGroupTests.task_store)
+
+        # retrieve all server groups
+        server_groups_found = []
+        for server_group in server_groups:
+            server_groups_found.append(
+                self.nova_client.server_groups.get(server_group.id))
+
+        # verify the number of server groups did not change
+        self.assertEqual(len(server_groups), len(server_groups_found),
+                         "Not all server groups were found")
+
+    def test_delete_server_group(self):
+        """Verifies deleting an existing server group."""
+        # create a few server groups
+        server_groups = [
+            self.nova_client.server_groups.create(name="server_group_1",
+                                                  policies=['anti-affinity']),
+            self.nova_client.server_groups.create(name="server_group_2",
+                                                  policies=['anti-affinity']),
+            self.nova_client.server_groups.create(name="server_group_3",
+                                                  policies=['affinity'])]
+
+        # select a server group to delete
+        self.task_store['group'] = server_groups[0].id
+
+        # start engine to run delete server group task
+        engines.run(self.flow, store=DeleteVmGroupTests.task_store)
+
+        # verify deleted server group is not found
+        self.assertRaises(nova_exc.NotFound,
+                          self.nova_client.server_groups.get,
+                          server_groups[0].id)
diff --git a/doc/source/configuration.rst b/doc/source/configuration.rst
index f8593a0c..082ca546 100644
--- a/doc/source/configuration.rst
+++ b/doc/source/configuration.rst
@@ -76,6 +76,7 @@ jobboard_name                 'cue'                                Board name
 engine_type                   'serial'                             Engine type
 cluster_node_check_timeout    10                                   Number of seconds between node status checks
 cluster_node_check_max_count  30                                   Number of times to check a node for status
+cluster_node_anti_affinity    False                                Anti-affinity policy for cue cluster nodes
 ============================= ==================================== ==============================================================
 
 [openstack]
diff --git a/os_tasklib/nova/__init__.py b/os_tasklib/nova/__init__.py
index d30914ce..90980d00 100644
--- a/os_tasklib/nova/__init__.py
+++ b/os_tasklib/nova/__init__.py
@@ -14,7 +14,9 @@
 # under the License.
 
 from os_tasklib.nova.create_vm import CreateVm  # noqa
+from os_tasklib.nova.create_vm_group import CreateVmGroup  # noqa
 from os_tasklib.nova.delete_vm import DeleteVm  # noqa
+from os_tasklib.nova.delete_vm_group import DeleteVmGroup  # noqa
 from os_tasklib.nova.get_vm import GetVm  # noqa
 from os_tasklib.nova.get_vm_status import GetVmStatus  # noqa
 from os_tasklib.nova.list_vm_interfaces import ListVmInterfaces  # noqa
diff --git a/os_tasklib/nova/create_vm_group.py b/os_tasklib/nova/create_vm_group.py
new file mode 100644
index 00000000..db6c9eba
--- /dev/null
+++ b/os_tasklib/nova/create_vm_group.py
@@ -0,0 +1,80 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 Hewlett-Packard Development Company, L.P.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+
+import os_tasklib
+
+from cue.common.i18n import _LW  # noqa
+
+from oslo_log import log as logging
+
+
+LOG = logging.getLogger(__name__)
+
+
+class CreateVmGroup(os_tasklib.BaseTask):
+    """CreateVmGroup Task
+
+    This task interfaces with Nova API and creates a VM group based on
+    parameters provided to the Task.
+
+    """
+
+    def execute(self, name, policies):
+        """Main execute method
+
+        :param name: Name of the server group
+        :type name: string
+        :param policies: policy is either affinity or anti-affinity
+        :type policies: list
+        :rtype: dict
+        """
+        new_vm_group = self.os_client.server_groups.create(
+            name=name,
+            policies=policies
+        )
+
+        return new_vm_group.to_dict()
+
+    def revert(self, *args, **kwargs):
+        """Revert CreateVmGroup Task
+
+        This method is executed upon failure of the CreateVmGroup Task or the
+        Flow that the Task is part of.
+
+        :param args: positional arguments that the task required to execute.
+        :type args: list
+        :param kwargs: keyword arguments that the task required to execute; the
+                       special key `result` will contain the :meth:`execute`
+                       results (if any) and the special key `flow_failures`
+                       will contain any failure information.
+        """
+
+        if kwargs.get('tx_id'):
+            LOG.warning(_LW("%(tx_id)s Create VM Group failed %(result)s") %
+                        {'tx_id': kwargs['tx_id'],
+                         'result': kwargs['flow_failures']})
+        else:
+            LOG.warning(_LW("Create VM Group failed %s") %
+                        kwargs['flow_failures'])
+
+        vm_group_info = kwargs.get('result')
+        if vm_group_info and isinstance(vm_group_info, dict):
+            try:
+                vm_group_id = vm_group_info['id']
+                if vm_group_id:
+                    self.os_client.server_groups.delete(vm_group_id)
+            except KeyError:
+                pass
diff --git a/os_tasklib/nova/delete_vm_group.py b/os_tasklib/nova/delete_vm_group.py
new file mode 100644
index 00000000..20a1e4a9
--- /dev/null
+++ b/os_tasklib/nova/delete_vm_group.py
@@ -0,0 +1,45 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 Hewlett-Packard Development Company, L.P.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+
+import os_tasklib
+
+from cue.common.i18n import _LW  # noqa
+
+import novaclient.exceptions as nova_exc
+from oslo_log import log as logging
+
+
+LOG = logging.getLogger(__name__)
+
+
+class DeleteVmGroup(os_tasklib.BaseTask):
+    """DeleteVmGroup Task
+
+    This task interfaces with Nova API and deletes a VM server group.
+
+    """
+
+    def execute(self, group):
+        """Main execute method
+
+        :param group: ID of the server group to delete
+        :type group: string
+        :return: n/a
+        """
+        try:
+            self.os_client.server_groups.delete(id=group)
+        except nova_exc.NotFound:
+            LOG.warning(_LW("Server group was not found %s") % group)
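
Usage sketch (illustrative, not part of the patch): the plain novaclient calls that the new CreateVmGroup/DeleteVmGroup tasks and the scheduler-hint wiring above boil down to. The variable nova_client is an assumption, standing in for an authenticated novaclient v2 client such as the one returned by cue.client.nova_client().

    # Assumption: nova_client is an authenticated novaclient.v2.client.Client.
    group = nova_client.server_groups.create(name='cue_group_example',
                                             policies=['anti-affinity'])
    # The create flow passes the group to each CreateVm task as a scheduler hint:
    scheduler_hints = {'group': str(group.id)}
    # ...and the delete flow removes the group again via DeleteVmGroup,
    # using the group_id persisted on the cluster record:
    nova_client.server_groups.delete(group.id)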