provisioning rabbitmq via userdata/cloud-init

- changed create cluster flows from a combination of unordered and
  linear flows to a graph flow
- added userdata generation step to create userdata to be used by
  each VM
- added check_or_restart_rabbitmq, to check rabbitmq status a
  number of times then restart the VM, before checking again

Change-Id: I98f2aa031ccd7e8bb068fa4cece3dd8af263e33e
This commit is contained in:
Min Pae
2015-02-27 13:56:57 -08:00
parent 85c84945c8
commit 21ef0c6677
23 changed files with 541 additions and 124 deletions

View File

@@ -58,6 +58,7 @@ UBUNTU_IMAGE_MINDISK=4
CUE_FLAVOR=cue.small
CUE_FLAVOR_PARAMS="--id 8795 --ram 512 --disk $UBUNTU_IMAGE_MINDISK --vcpus 1"
CUE_RABBIT_SECURITY_GROUP='cue-rabbitmq'
# cleanup_cue - Remove residual data files, anything left over from previous
# runs that a clean run would need to clean up
@@ -87,6 +88,10 @@ function configure_cue {
iniset $CUE_CONF taskflow persistence_connection `database_connection_url $CUE_TF_DB`
fi
# Set cluster node check timeouts
iniset $CUE_CONF taskflow cluster_node_check_timeout 10
iniset $CUE_CONF taskflow cluster_node_check_max_count 30
iniset $CUE_CONF openstack os_auth_url $KEYSTONE_AUTH_PROTOCOL://$KEYSTONE_AUTH_HOST:$KEYSTONE_AUTH_PORT/v2.0
iniset $CUE_CONF openstack os_tenant_name admin
iniset $CUE_CONF openstack os_username admin

View File

@@ -11,8 +11,6 @@ RABBIT_PASSWORD=password
SERVICE_PASSWORD=password
SERVICE_TOKEN=password
CUE_MANAGEMENT_KEY_NAME=cue-mgmt-key
# Enable Logging
LOGFILE=/opt/stack/logs/stack.sh.log
VERBOSE=True
@@ -62,4 +60,4 @@ enable_service zookeeper
enable_service cue
enable_service cue-api
enable_service cue-worker
CUE_MANAGEMENT_KEY=cue-mgmt-key

View File

@@ -35,8 +35,8 @@ if [[ -z $CUE_MANAGEMENT_KEY ]]; then
fi
# Add ssh keypair to admin account
if [[ -z $(nova keypair-list | grep vagrant) ]]; then
nova keypair-add --pub-key ~/.ssh/id_rsa.pub vagrant
if [[ -z $(nova keypair-list | grep $CUE_MANAGEMENT_KEY) ]]; then
nova keypair-add --pub-key ~/.ssh/id_rsa.pub $CUE_MANAGEMENT_KEY
fi
# Add ping and ssh rules to rabbitmq security group

View File

@@ -28,7 +28,7 @@ cfg.CONF.register_opts([
cfg.StrOpt('state-path', default='/var/lib/cue',
help='Top-level directory for maintaining cue\'s state'),
cfg.StrOpt('os_security_group',
default='default',
default=None,
help='The default Security Group to use for VMs created as '
'part of a cluster')
])

View File

@@ -237,6 +237,10 @@ class ClusterController(rest.RestController):
'node_ids': node_ids,
}
# generate unique erlang cookie to be used by all nodes in the new
# cluster, erlang cookies are strings of up to 255 characters
erlang_cookie = uuidutils.generate_uuid()
job_args = {
'flavor': cluster.cluster.flavor,
# TODO(sputnik13): need to remove this when image selector is done
@@ -245,11 +249,12 @@ class ClusterController(rest.RestController):
'network_id': cluster.cluster.network_id,
'port': '5672',
'context': context.to_dict(),
'cluster_status': 'BUILDING',
# TODO(sputnik13): this needs to come from the create request and
# default to a configuration value rather than always using config
# value
'security_groups': [CONF.os_security_group],
'key_name': CONF.openstack.os_key_name,
'erlang_cookie': erlang_cookie,
}
job_client = task_flow_client.get_client_instance()
#TODO(dagnello): might be better to use request_id for job_uuid

View File

@@ -41,6 +41,12 @@ OS_OPTS = [
cfg.StrOpt('os_auth_url',
help='Openstack Authentication (Identity) URL',
default=None),
cfg.StrOpt('os_key_name',
help='SSH key to be provisioned to cue VMs',
default=None),
cfg.StrOpt('os_availability_zone',
help='Default availability zone to provision cue VMs',
default=None),
]
opt_group = cfg.OptGroup(
@@ -54,11 +60,12 @@ CONF.register_opts(OS_OPTS, group=opt_group)
def nova_client():
return NovaClient.Client(2,
CONF.openstack.os_username,
CONF.openstack.os_password,
CONF.openstack.os_tenant_name,
CONF.openstack.os_auth_url,
CONF.openstack.os_region_name,
username=CONF.openstack.os_username,
api_key=CONF.openstack.os_password,
project_id=CONF.openstack.os_tenant_name,
tenant_id=CONF.openstack.os_tenant_id,
auth_url=CONF.openstack.os_auth_url,
region_name=CONF.openstack.os_region_name,
)

View File

@@ -42,6 +42,15 @@ TF_OPTS = [
cfg.StrOpt('engine_type',
help="Engine type.",
default='serial'),
cfg.IntOpt('cluster_node_check_timeout',
help="Number of seconds to wait between checks for node status",
default=10),
cfg.IntOpt('cluster_node_check_max_count',
help="Number of times to check a node for status before "
"declaring it FAULTED",
default=30),
]
opt_group = cfg.OptGroup(

View File

@@ -237,7 +237,7 @@ class Client(object):
tx_uuid = uuidutils.generate_uuid()
job_name = "%s[%s]" % (flow_factory.__name__, tx_uuid)
book = logbook.LogBook(job_name)
book = logbook.LogBook(job_name, uuid=tx_uuid)
if flow_factory is not None:
flow_detail = logbook.FlowDetail(job_name, str(uuid.uuid4()))

View File

@@ -13,8 +13,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import taskflow.patterns.linear_flow as linear_flow
import taskflow.patterns.unordered_flow as unordered_flow
import oslo_config.cfg as cfg
import taskflow.patterns.graph_flow as graph_flow
from cue.db.sqlalchemy import models
from cue.taskflow.flow import create_cluster_node
@@ -34,24 +34,39 @@ def create_cluster(cluster_id, node_ids):
:return: A flow instance that represents the workflow for creating a
cluster
"""
flow = linear_flow.Flow("creating cluster %s" % cluster_id)
sub_flow = unordered_flow.Flow("create VMs")
flow = graph_flow.Flow("creating cluster %s" % cluster_id)
start_flow_status = {'cluster_id': cluster_id,
'cluster_values': {'status': models.Status.BUILDING}}
end_flow_status = {'cluster_id': cluster_id,
'cluster_values': {'status': models.Status.ACTIVE}}
start_task = cue_tasks.UpdateClusterStatus(
name="update cluster status start "
"%s" % cluster_id,
inject=start_flow_status)
flow.add(start_task)
end_task = cue_tasks.UpdateClusterStatus(
name="update cluster status end "
"%s" % cluster_id,
inject=end_flow_status)
flow.add(end_task)
node_check_timeout = cfg.CONF.taskflow.cluster_node_check_timeout
node_check_max_count = cfg.CONF.taskflow.cluster_node_check_max_count
#todo(dagnello): verify node_ids is a list and not a string
for i, node_id in enumerate(node_ids):
sub_flow.add(create_cluster_node.create_cluster_node(cluster_id, i,
node_id))
generate_userdata = cue_tasks.ClusterNodeUserData(
"userdata_%d" % i,
len(node_ids),
"vm_ip_",
inject={'node_name': "rabbit-node-%d" % i})
flow.add(generate_userdata)
flow.add(cue_tasks.UpdateClusterStatus(name="update cluster status start "
"%s" % cluster_id,
inject=start_flow_status))
flow.add(sub_flow)
flow.add(cue_tasks.UpdateClusterStatus(name="update cluster status end "
"%s" % cluster_id,
inject=end_flow_status))
create_cluster_node.create_cluster_node(cluster_id, i, node_id, flow,
generate_userdata, start_task,
end_task, node_check_timeout,
node_check_max_count)
return flow

View File

@@ -24,7 +24,9 @@ import os_tasklib.neutron as neutron
import os_tasklib.nova as nova
def create_cluster_node(cluster_id, node_number, node_id):
def create_cluster_node(cluster_id, node_number, node_id, graph_flow,
generate_userdata, start_task, end_task,
node_check_timeout, node_check_max_count):
"""Create Cluster Node factory function
This factory function creates a flow for creating a node of a cluster.
@@ -38,14 +40,11 @@ def create_cluster_node(cluster_id, node_number, node_id):
:return: A flow instance that represents the workflow for creating a
cluster node.
"""
flow_name = "create cluster %s node %d" % (cluster_id, node_number)
node_name = "cluster[%s].node[%d]" % (cluster_id, node_number)
node_name = "cue[%s].node[%d]" % (cluster_id, node_number)
extract_port_id = (lambda port_info:
[{'port-id': port_info['port']['id']}])
extract_port_ip = (lambda port_info:
port_info['port']['fixed_ips'][0]['ip_address'])
extract_port_info = (lambda port_info:
([{'port-id': port_info['port']['id']}],
port_info['port']['fixed_ips'][0]['ip_address']))
extract_vm_id = lambda vm_info: str(vm_info['id'])
@@ -56,77 +55,102 @@ def create_cluster_node(cluster_id, node_number, node_id):
'uri': vm_ip + ':',
'type': 'AMQP'}
flow = linear_flow.Flow(flow_name)
flow.add(
neutron.CreatePort(
name="create port %s" % node_name,
os_client=client.neutron_client(),
inject={'port_name': node_name},
provides="port_info_%d" % node_number),
os_common.Lambda(
extract_port_id,
name="extract port id %s" % node_name,
rebind={'port_info': "port_info_%d" % node_number},
provides="port_id_%d" % node_number),
os_common.Lambda(
extract_port_ip,
name="extract port ip %s" % node_name,
rebind={'port_info': "port_info_%d" % node_number},
provides="vm_ip_%d" % node_number),
nova.CreateVm(
name="create vm %s" % node_name,
create_port = neutron.CreatePort(
name="create port %s" % node_name,
os_client=client.neutron_client(),
inject={'port_name': node_name},
provides="port_info_%d" % node_number)
graph_flow.add(create_port)
graph_flow.link(start_task, create_port)
extract_port_data = os_common.Lambda(
extract_port_info,
name="extract port id %s" % node_name,
rebind={'port_info': "port_info_%d" % node_number},
provides=("port_id_%d" % node_number, "vm_ip_%d" % node_number))
graph_flow.add(extract_port_data)
graph_flow.link(create_port, extract_port_data)
graph_flow.link(extract_port_data, generate_userdata)
create_vm = nova.CreateVm(name="create vm %s" % node_name,
os_client=client.nova_client(),
requires=('name', 'image', 'flavor', 'nics'),
inject={'name': node_name},
rebind={'nics': "port_id_%d" % node_number},
provides="vm_info_%d" % node_number)
graph_flow.add(create_vm)
graph_flow.link(generate_userdata, create_vm)
get_vm_id = os_common.Lambda(extract_vm_id,
name="extract vm id %s" % node_name,
rebind={'vm_info': "vm_info_%d" % node_number},
provides="vm_id_%d" % node_number)
graph_flow.add(get_vm_id)
graph_flow.link(create_vm, get_vm_id)
#todo(dagnello): make retry times configurable
check_vm_active = linear_flow.Flow(
name="wait for VM active state %s" % node_name,
retry=retry.Times(12))
check_vm_active.add(
nova.GetVmStatus(
os_client=client.nova_client(),
requires=('name', 'image', 'flavor', 'nics'),
inject={'name': node_name},
rebind={'nics': "port_id_%d" % node_number},
provides="vm_info_%d" % node_number),
os_common.Lambda(
extract_vm_id,
name="extract vm id %s" % node_name,
rebind={'vm_info': "vm_info_%d" % node_number},
provides="vm_id_%d" % node_number),
#todo(dagnello): make retry times configurable
linear_flow.Flow(name="wait for VM active state %s" % node_name,
retry=retry.Times(12)).add(
nova.GetVmStatus(
os_client=client.nova_client(),
name="get vm %s" % node_name,
rebind={'nova_vm_id': "vm_id_%d" % node_number},
provides="vm_status_%d" % node_number),
os_common.CheckFor(
name="check vm status %s" % node_name,
rebind={'check_var': "vm_status_%d" % node_number},
check_value='ACTIVE',
retry_delay_seconds=10),
),
#todo(dagnello): make retry times configurable
linear_flow.Flow(name="wait for RabbitMQ ready state %s" % node_name,
retry=retry.Times(30)).add(
os_common.VerifyNetwork(
name="get RabbitMQ status %s" % node_name,
rebind={'vm_ip': "vm_ip_%d" % node_number},
retry_delay_seconds=10
)),
os_common.Lambda(
new_node_values,
name="build new node values %s" % node_name,
name="get vm %s" % node_name,
rebind={'nova_vm_id': "vm_id_%d" % node_number},
provides="node_values_%d" % node_number
),
cue_tasks.UpdateNode(
name="update node %s" % node_name,
rebind={'node_values': "node_values_%d" % node_number},
inject={'node_id': node_id},
),
os_common.Lambda(
new_endpoint_values,
name="build new endpoint values %s" % node_name,
provides="vm_status_%d" % node_number),
os_common.CheckFor(
name="check vm status %s" % node_name,
rebind={'check_var': "vm_status_%d" % node_number},
check_value='ACTIVE',
retry_delay_seconds=10),
)
graph_flow.add(check_vm_active)
graph_flow.link(get_vm_id, check_vm_active)
#todo(dagnello): make retry times configurable
check_rabbit_online = linear_flow.Flow(
name="wait for RabbitMQ ready state %s" % node_name,
retry=retry.Times(node_check_max_count))
check_rabbit_online.add(
os_common.VerifyNetwork(
name="get RabbitMQ status %s" % node_name,
rebind={'vm_ip': "vm_ip_%d" % node_number},
inject={'node_id': node_id},
provides="endpoint_values_%d" % node_number
),
cue_tasks.CreateEndpoint(
name="update endpoint for node %s" % node_name,
rebind={'endpoint_values': "endpoint_values_%d" % node_number}
))
return flow
retry_delay_seconds=node_check_timeout))
graph_flow.add(check_rabbit_online)
graph_flow.link(check_vm_active, check_rabbit_online)
build_node_info = os_common.Lambda(
new_node_values,
name="build new node values %s" % node_name,
rebind={'nova_vm_id': "vm_id_%d" % node_number},
provides="node_values_%d" % node_number
)
graph_flow.add(build_node_info)
graph_flow.link(check_rabbit_online, build_node_info)
update_node = cue_tasks.UpdateNode(
name="update node %s" % node_name,
rebind={'node_values': "node_values_%d" % node_number},
inject={'node_id': node_id})
graph_flow.add(update_node)
graph_flow.link(build_node_info, update_node)
build_endpoint_info = os_common.Lambda(
new_endpoint_values,
name="build new endpoint values %s" % node_name,
rebind={'vm_ip': "vm_ip_%d" % node_number},
inject={'node_id': node_id},
provides="endpoint_values_%d" % node_number
)
graph_flow.add(build_endpoint_info)
graph_flow.link(check_rabbit_online, build_endpoint_info)
create_endpoint = cue_tasks.CreateEndpoint(
name="update endpoint for node %s" % node_name,
rebind={'endpoint_values': "endpoint_values_%d" % node_number})
graph_flow.add(create_endpoint)
graph_flow.link(check_rabbit_online, create_endpoint)
graph_flow.link(update_node, end_task)
graph_flow.link(create_endpoint, end_task)

View File

@@ -13,8 +13,10 @@
# License for the specific language governing permissions and limitations
# under the License.
from create_endpoint_task import CreateEndpoint # noqa
from get_node import GetNode # noqa
from update_cluster_task import UpdateClusterStatus # noqa
from update_endpoints_task import UpdateEndpoints # noqa
from update_node_task import UpdateNode # noqa
from check_or_restart_rabbitmq import CheckOrRestartRabbitMq # noqa
from cluster_node_userdata import ClusterNodeUserData # noqa
from create_endpoint_task import CreateEndpoint # noqa
from get_node import GetNode # noqa
from update_cluster_task import UpdateClusterStatus # noqa
from update_endpoints_task import UpdateEndpoints # noqa
from update_node_task import UpdateNode # noqa

View File

@@ -0,0 +1,97 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import telnetlib as telnet
import time
import six
import taskflow.task
class CheckOrRestartRabbitMq(taskflow.task.Task):
    """Check or Restart RabbitMQ VM

    This task verifies that a connection can be opened to the RabbitMQ port
    of a VM.  When the supplied action is 'restart' the VM is rebooted
    through the compute client before the connection is attempted; for any
    other action only the connection check is performed.
    """

    def __init__(self,
                 os_client,
                 retry_delay_seconds=None,
                 retry_delay_ms=None,
                 name=None,
                 **kwargs):
        """Constructor

        This constructor sets the retry delay for this task's revert method.

        :param os_client: client whose ``servers.reboot`` is called to
                          restart the VM
        :param retry_delay_seconds: retry delay in seconds
        :param retry_delay_ms: retry delay in milliseconds
        :param name: unique name for atom
        """
        super(CheckOrRestartRabbitMq, self).__init__(name=name, **kwargs)

        self.os_client = os_client

        # both delays are optional and additive; the total is kept in seconds
        self.sleep_time = 0
        if retry_delay_seconds:
            self.sleep_time = retry_delay_seconds

        if retry_delay_ms:
            self.sleep_time += retry_delay_ms / 1000.0

    def execute(self, action, vm_info, vm_ip, port, **kwargs):
        """main execute method

        Any error raised while rebooting the VM or while opening the
        connection propagates to the caller, which fails the task.

        :param action: 'restart' reboots the VM before the check; any other
                       value performs the check only
        :type action: string
        :param vm_info: either a dict holding the VM id under the 'id' key,
                        or the VM id itself
        :type vm_info: dict or string
        :param vm_ip: host the connection is opened to
        :type vm_ip: string
        :param port: port the connection is opened to
        :type port: string or int
        """
        # on Python 2 a unicode port is converted to a byte string before it
        # is handed to telnetlib
        if six.PY2 and isinstance(port, unicode):
            check_port = port.encode()
        else:
            check_port = port

        if isinstance(vm_info, dict):
            vm_id = vm_info['id']
        else:
            vm_id = vm_info

        if action == 'restart':
            self.os_client.servers.reboot(vm_id)

        tn = telnet.Telnet()
        try:
            tn.open(vm_ip, check_port, timeout=10)
        finally:
            # the connection is only a reachability probe; always release
            # the socket instead of leaving it open until garbage collection
            tn.close()

    def revert(self, *args, **kwargs):
        """Revert CheckOrRestartRabbitMq

        This method is executed upon failure of the CheckOrRestartRabbitMq
        task or the Flow that the Task is part of.  It only sleeps for the
        configured retry delay, if one was set.

        :param args: positional arguments that the task required to execute.
        :type args: list
        :param kwargs: keyword arguments that the task required to execute; the
                       special key `result` will contain the :meth:`execute`
                       results (if any) and the special key `flow_failures`
                       will contain any failure information.
        """
        if self.sleep_time != 0:
            time.sleep(self.sleep_time)

View File

@@ -0,0 +1,71 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import email.mime.multipart as mime_multipart
import email.mime.text as mime_text
import sys
import jinja2
import taskflow.task as task
class ClusterNodeUserData(task.Task):
    """Render the multipart userdata handed to one cluster node.

    The result combines a cloud-config part and a shell-script part, both
    rendered from templates loaded from the ``cue`` package's ``templates``
    directory.
    """
    default_provides = 'userdata'

    def __init__(self, name, node_count, node_ip_prefix, inject=None):
        """Constructor

        :param name: unique name for atom
        :param node_count: number of nodes in the cluster
        :param node_ip_prefix: prefix of the per-node IP inputs; the input
                               for node ``i`` is named ``<prefix><i>``
        :param inject: values injected into this task's inputs
        """
        ip_inputs = ["%s%d" % (node_ip_prefix, index)
                     for index in range(node_count)]
        required = ip_inputs + ['erlang_cookie', 'node_name']

        super(ClusterNodeUserData, self).__init__(name=name,
                                                  requires=required,
                                                  inject=inject)

        self.node_count = node_count
        self.node_ip_prefix = node_ip_prefix

        template_env = jinja2.Environment(
            loader=jinja2.PackageLoader('cue', 'templates'))
        self.cloud_config_template = template_env.get_template(
            'cloud_config.tmpl')
        self.userdata_template = template_env.get_template(
            'install_rabbit.sh.tmpl')

    @staticmethod
    def _build_attachment(content, subtype, filename):
        """Wrap rendered text in a MIME text part sent as an attachment."""
        part = mime_text.MIMEText(content,
                                  subtype,
                                  sys.getdefaultencoding())
        part.add_header('Content-Disposition',
                        'attachment; filename="%s"' % filename)
        return part

    def execute(self, *args, **kwargs):
        """Build the userdata document.

        :param kwargs: the required inputs: one IP per node plus
                       'erlang_cookie' and 'node_name'
        :return: the multipart message serialized as a string
        """
        message = mime_multipart.MIMEMultipart()

        # map every node's host name to the IP supplied for it
        rabbit_nodes = {}
        for index in range(self.node_count):
            ip_key = "%s%d" % (self.node_ip_prefix, index)
            rabbit_nodes["rabbit-node-%d" % index] = kwargs[ip_key]

        rendered_config = self.cloud_config_template.render(
            node_name=kwargs['node_name'])
        message.attach(self._build_attachment(rendered_config,
                                              'cloud-config',
                                              'cloud_config'))

        rendered_script = self.userdata_template.render(
            rabbit_nodes=rabbit_nodes,
            erlang_cookie=kwargs['erlang_cookie'])
        message.attach(self._build_attachment(rendered_script,
                                              'x-shellscript',
                                              'setup_rabbitmq.sh'))

        return message.as_string()

View File

@@ -0,0 +1,2 @@
hostname: {{node_name}}
manage_etc_hosts: false

View File

@@ -0,0 +1,38 @@
#!/bin/bash

# Rewrite the hosts file with one entry per cluster node.
cat > /etc/hosts << EOF
127.0.0.1 localhost
{%- for node_name, node_ip in rabbit_nodes.items() %}
{{node_ip}} {{node_name}}
{%- endfor %}
EOF

# Create the rabbitmq user only when it does not exist yet.
if [[ ! "`grep rabbitmq /etc/passwd`" ]]; then
    useradd -d /var/lib/rabbitmq -U -m rabbitmq
fi

mkdir -p /etc/rabbitmq /var/lib/rabbitmq

# Every node of the cluster is given the same erlang cookie.
echo {{erlang_cookie}} > /var/lib/rabbitmq/.erlang.cookie
chmod 0400 /var/lib/rabbitmq/.erlang.cookie

# List every node of the cluster in the rabbitmq configuration.
cat > /etc/rabbitmq/rabbitmq.config << EOF
[
{rabbit, [
{cluster_nodes, {[
{%- for node_name in rabbit_nodes -%}
'rabbit@{{node_name}}' {%- if not loop.last -%},{%- endif -%}
{%- endfor -%}
], disc}}
]}
].
EOF

chown -R rabbitmq:rabbitmq /var/lib/rabbitmq

apt-get update
apt-get install -y rabbitmq-server

# Make up to five attempts to start the server while its status shows no pid.
for i in `seq 5`; do
    if [[ ! "`service rabbitmq-server status | grep pid`" ]]; then
        sleep 1
        service rabbitmq-server start
    fi
done

View File

@@ -44,7 +44,7 @@ class CreateClusterTests(base.TestCase):
self.nova_client = client.nova_client()
self.neutron_client = client.neutron_client()
self.port = '5672'
self.port = u'5672'
self.new_vm_name = str(uuid.uuid4())
self.new_vm_list = []
@@ -68,6 +68,7 @@ class CreateClusterTests(base.TestCase):
"network_id": self.valid_network['id'],
"port": self.port,
"context": self.context.to_dict(),
"erlang_cookie": str(uuid.uuid4()),
}
cluster_values = {
@@ -129,6 +130,7 @@ class CreateClusterTests(base.TestCase):
'network_id': self.valid_network['id'],
"port": self.port,
"context": self.context.to_dict(),
"erlang_cookie": str(uuid.uuid4()),
}
cluster_values = {

View File

@@ -0,0 +1,110 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import cue.client as client
import cue.taskflow.task as cue_task
from cue.tests import base
from cue.tests.test_fixtures import nova
from cue.tests.test_fixtures import telnet
from taskflow import engines
from taskflow.patterns import linear_flow
import taskflow.retry as retry
import uuid
class CheckOrRestartRabbitTest(base.TestCase):
    """Runs CheckOrRestartRabbitMq inside a retrying linear flow."""

    additional_fixtures = [
        nova.NovaClient,
        telnet.TelnetClient
    ]

    # inputs handed to the engine for every run of the flow
    task_store = {
        'vm_ip': "0.0.0.0",
        'vm_info': str(uuid.uuid4()),
        'port': '5672'
    }

    def setUp(self):
        super(CheckOrRestartRabbitTest, self).setUp()

        self.nova_client = client.nova_client()

        # four plain checks followed by one attempt that restarts the VM
        retry_actions = ['check'] * 4 + ['restart']
        retry_controller = retry.ForEach(retry_actions,
                                         "retry check RabbitMQ",
                                         provides="retry_strategy")

        check_task = cue_task.CheckOrRestartRabbitMq(
            os_client=self.nova_client,
            name="get RabbitMQ status %s",
            rebind={'action': "retry_strategy"},
            retry_delay_seconds=1)

        self.flow = linear_flow.Flow(name="wait for RabbitMQ ready state",
                                     retry=retry_controller)
        self.flow.add(check_task)

    def test_get_rabbit_status(self):
        """Verifies the flow runs with the telnet fixture left untouched.

        No telnet status list is configured here, so the fixture's own
        default applies; the run is expected to finish without raising.
        """
        engines.run(self.flow, store=self.task_store)

    def test_get_vm_status_flow(self):
        """Verifies CheckOrRestartRabbitMq in a successful retry flow.

        The telnet fixture is loaded with three 'wait' statuses and one
        'connected' status (presumably consumed from the end of the list;
        verify against the fixture).  The flow is expected to complete
        without the VM ever being rebooted.
        """
        telnet_statuses = ['connected', 'wait', 'wait', 'wait']
        telnet.TelnetStatusDetails.set_telnet_status(telnet_statuses)

        engines.run(self.flow, store=self.task_store)

        self.assertFalse(self.nova_client.servers.reboot.called)

    def test_get_vm_status_flow_fail(self):
        """Verifies CheckOrRestartRabbitMq in an unsuccessful retry flow.

        The telnet fixture is loaded with six 'wait' statuses, more than
        the five actions of the retry strategy.  The run is expected to
        raise IOError, and the VM is expected to have been rebooted by the
        final 'restart' action.
        """
        telnet_statuses = ['wait'] * 6
        telnet.TelnetStatusDetails.set_telnet_status(telnet_statuses)

        self.assertRaises(IOError, engines.run, self.flow,
                          store=self.task_store)

        self.assertTrue(self.nova_client.servers.reboot.called)

View File

@@ -733,3 +733,13 @@ os_image_id=
# Openstack Authentication (Identity) URL
#
#os_auth_url=
#
# Openstack SSH keypair name
#
#os_key_name=
#
# Openstack Availability zone
#
#os_availability_zone=

View File

@@ -46,7 +46,19 @@
#
#engine_type=serial
#
# Cluster node check timeout. The number of seconds to wait between status
# checks on a cluster node.
# Default: 10
#
#cluster_node_check_timeout=10
#
# Cluster node check max count. The maximum number of times to check a cluster
# node for status before declaring the node FAULTED.
# Default: 30
#
#cluster_node_check_max_count=30
[worker]

View File

@@ -13,9 +13,9 @@
# License for the specific language governing permissions and limitations
# under the License.
from assert_task import Assert # noqa
from check_for import CheckFor # noqa
from lambda_task import Lambda # noqa
from map_task import Map # noqa
from reduce_task import Reduce # noqa
from verify_network_task import VerifyNetwork # noqa
from assert_task import Assert # noqa
from check_for import CheckFor # noqa
from lambda_task import Lambda # noqa
from map_task import Map # noqa
from reduce_task import Reduce # noqa
from verify_network_task import VerifyNetwork # noqa

View File

@@ -16,6 +16,7 @@
import telnetlib as telnet
import time
import six
import taskflow.task
@@ -50,8 +51,12 @@ class VerifyNetwork(taskflow.task.Task):
:param port: host service port
:type port: int
"""
if six.PY2 and isinstance(port, unicode):
check_port = port.encode()
else:
check_port = port
tn = telnet.Telnet()
tn.open(vm_ip, port, timeout=10)
tn.open(vm_ip, check_port, timeout=10)
def revert(self, *args, **kwargs):
"""Revert CreateVmTask

View File

@@ -15,6 +15,7 @@
import os_tasklib
import neutronclient.common.exceptions as neutron_exc
from oslo_log import log as logging
from cue.common.i18n import _LW # noqa
@@ -80,10 +81,13 @@ class CreatePort(os_tasklib.BaseTask):
LOG.warning(_LW("Create Port failed %s") % kwargs['flow_failures'])
port_info = kwargs.get('result')
if port_info and isinstance(port_info, dict):
if (port_info
and isinstance(port_info, dict)
and 'port' in port_info
and 'id' in port_info['port']):
try:
port_id = port_info['port']['id']
if port_id:
self.os_client.delete_port(port=port_id)
except KeyError:
self.os_client.delete_port(port=port_info['port']['id'])
except neutron_exc.PortNotFoundClient:
# if port is not found, it was likely attached to a VM and
# already deleted with the VM, so there's nothing to do
pass

View File

@@ -26,6 +26,7 @@ WSME>=0.6
keystonemiddleware>=1.0.0
paramiko>=1.13.0
posix_ipc
pystache
taskflow>=0.6.1
-e git+https://github.com/python-zk/kazoo.git#egg=kazoo