Add cue-monitor service

Change-Id: I24aff0c016a06b26a8252e2b31ab7c7cfdde08db
Daniel Allegood 2015-10-02 15:09:22 -07:00
parent 0597d3a8b6
commit 74124f363a
21 changed files with 543 additions and 13 deletions

View File

@@ -300,6 +300,7 @@ function configure_scenario_rally_tests {
 function start_cue {
     run_process cue-api "$CUE_BIN_DIR/cue-api --config-file $CUE_CONF"
     run_process cue-worker "$CUE_BIN_DIR/cue-worker --config-file $CUE_CONF"
+    run_process cue-monitor "$CUE_BIN_DIR/cue-monitor --config-file $CUE_CONF"

     # Start proxies if enabled
     if is_service_enabled cue-api && is_service_enabled tls-proxy; then

View File

@@ -64,6 +64,7 @@ enable_service zookeeper
 enable_service cue
 enable_service cue-api
 enable_service cue-worker
+enable_service cue-monitor

 CUE_MANAGEMENT_KEY=cue-mgmt-key

 # Rally auth version

cue/cmd/monitor.py Normal file
View File

@@ -0,0 +1,50 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""cue-monitor
cue-monitor is responsible for actively monitoring cluster statuses
"""
import logging
from oslo_config import cfg
import oslo_log.log as log
from oslo_service import service as openstack_service
from cue.common.i18n import _LI # noqa
import cue.common.service as cue_service
import cue.monitor.monitor_service as cue_monitor_service
import sys
def main():
CONF = cfg.CONF
cue_service.prepare_service(sys.argv)
# Log configuration and other startup information
LOG = log.getLogger(__name__)
LOG.info(_LI("Starting cue-monitor"))
LOG.info(_LI("Configuration:"))
CONF.log_opt_values(LOG, logging.INFO)
monitor = cue_monitor_service.MonitorService()
launcher = openstack_service.launch(CONF, monitor)
launcher.wait()
if __name__ == "__main__": # pragma: no cover
sys.exit(main())
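For context, this entry point is what the cue-monitor console script (wired up in setup.cfg at the end of this change) invokes. A minimal sketch of calling it by hand, assuming the cue package and its configuration are available:

# Sketch only: equivalent to running the cue-monitor console script.
import sys

from cue.cmd import monitor

sys.exit(monitor.main())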

View File

@@ -44,7 +44,7 @@ class Connection(object):
         """Constructor."""

     @abc.abstractmethod
-    def get_clusters(self, context):
+    def get_clusters(self, context, *args, **kwargs):
         """Returns a list of Cluster objects for specified project_id.

         :param context: request context object

View File

@@ -99,8 +99,8 @@ class Connection(api.Connection):
     def __init__(self):
         pass

-    def get_clusters(self, context):
-        query = model_query(context, models.Cluster)
+    def get_clusters(self, context, *args, **kwargs):
+        query = model_query(context, models.Cluster, *args, **kwargs)
         return query.all()

     def create_cluster(self, context, cluster_values):
@@ -131,8 +131,9 @@ class Connection(api.Connection):
         return cluster

-    def update_cluster(self, context, cluster_values, cluster_id):
-        cluster_query = (model_query(context, models.Cluster)
+    def update_cluster(self, context, cluster_values,
+                       cluster_id, *args, **kwargs):
+        cluster_query = (model_query(context, models.Cluster, *args, **kwargs)
                          .filter_by(id=cluster_id))

         # if status is set to deleted, soft delete this cluster record
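The point of threading *args/**kwargs through here is to let callers opt out of oslo's project scoping. A sketch of the call the monitor makes (see cue/monitor/monitor_service.py below), assuming the cue package is importable:

# Sketch: context=None and project_only=False fetch clusters across all
# projects, which a system-level monitor needs.
from cue.db import api as db_api

dbapi = db_api.get_instance()
clusters = dbapi.get_clusters(None, project_only=False)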

cue/monitor/__init__.py Normal file
View File

@@ -0,0 +1,33 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg

CONF = cfg.CONF

MONITOR_OPTS = [
    cfg.IntOpt('loop_interval_seconds',
               help='How often Cluster Status is checked.',
               default=60)
]

opt_group = cfg.OptGroup(
    name='cue_monitor',
    title='Options for cue-monitor.'
)

CONF.register_group(opt_group)
CONF.register_opts(MONITOR_OPTS, group='cue_monitor')
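Once this module is imported, the option is available under the [cue_monitor] group. A hedged sketch of reading it, assuming the cue package is installed:

from oslo_config import cfg
import cue.monitor  # noqa -- importing registers the cue_monitor group

# Defaults to 60 unless overridden in cue.conf under [cue_monitor].
interval = cfg.CONF.cue_monitor.loop_interval_seconds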

View File

@@ -0,0 +1,118 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
from oslo_service import loopingcall
from oslo_service import service
from tooz import coordination

from cue.db import api as db_api
import cue.taskflow.client as taskflow_client
from cue.taskflow.flow import check_cluster_status


class MonitorService(service.Service):

    def __init__(self):
        super(MonitorService, self).__init__()

        coord_url = ("%s://%s:%s"
                     % (
                         cfg.CONF.taskflow.coord_url,
                         cfg.CONF.taskflow.zk_hosts,
                         cfg.CONF.taskflow.zk_port
                     ))

        self.coordinator = coordination.get_coordinator(
            coord_url, 'cue-monitor')
        self.coordinator.start()

        # Create a lock
        self.lock = self.coordinator.get_lock("status_check")

    def start(self):
        loop_interval_seconds = int(cfg.CONF.cue_monitor.loop_interval_seconds)

        pulse = loopingcall.FixedIntervalLoopingCall(
            self.check
        )
        pulse.start(interval=loop_interval_seconds)
        pulse.wait()

    # On stop, try to release the znode
    def stop(self):
        self.lock.release()
        self.coordinator.stop()

    def wait(self):
        pass

    def reset(self):
        self.lock.release()
        self.coordinator.stop()

    def check(self):
        if not self.lock.acquired:
            self.lock.acquire(blocking=False)
        if self.lock.acquired:
            clusters = get_cluster_id_node_ids()
            taskflow_client_instance = taskflow_client.get_client_instance()

            # Materialize as a list so the membership test below works on
            # Python 3, where filter/map return one-shot iterators.
            job_list = [job.details['store']['cluster_id']
                        for job in taskflow_client_instance.joblist()
                        if 'cluster_status_check' in job.details['store']]

            clusters = [cluster for cluster in clusters
                        if cluster[0] not in job_list]

            for cluster in clusters:
                job_args = {
                    'cluster_status_check': '',
                    'cluster_id': cluster[0],
                    'context': {},
                    'default_rabbit_user': 'cue_monitor',
                    'default_rabbit_pass': cluster[0],
                }
                flow_kwargs = {
                    'cluster_id': cluster[0],
                    'node_ids': cluster[1]
                }
                taskflow_client_instance.post(check_cluster_status, job_args,
                                              flow_kwargs=flow_kwargs)


# Returns a list of tuples where [0] is the cluster_id
# and [1] is a list of that cluster's node ids
def get_cluster_id_node_ids():
    dbapi = db_api.get_instance()
    clusters = dbapi.get_clusters(None, project_only=False)

    cluster_ids = []
    for cluster in clusters:
        if cluster.status not in ['ACTIVE', 'DOWN']:
            continue
        node_ids = []
        for node in dbapi.get_nodes_in_cluster(None, cluster.id):
            node_ids.append(node.id)
        cluster_ids.append((cluster.id, node_ids))

    return cluster_ids
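The dedup step in check() is the subtle part: a cluster only gets a new status-check job if no job on the board already carries its id in a 'cluster_status_check' store entry. A self-contained sketch of that filtering, with invented sample data:

# Invented sample data to illustrate the filtering in MonitorService.check().
pending_ids = ['cluster-a']  # ids extracted from jobs already on the board
clusters = [('cluster-a', ['node-1']), ('cluster-b', ['node-2', 'node-3'])]

# Only cluster-b still needs a status-check job posted.
to_post = [c for c in clusters if c[0] not in pending_ids]
assert to_post == [('cluster-b', ['node-2', 'node-3'])]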

View File

@@ -63,7 +63,7 @@ class Cluster(base.CueObject):
         self._from_db_object(self, db_cluster)

-    def update(self, context, cluster_id):
+    def update(self, context, cluster_id, *args, **kwargs):
         """Updates a database cluster object.

         :param context: The request context
@@ -71,7 +71,8 @@ class Cluster(base.CueObject):
         """
         cluster_changes = self.obj_get_changes()

-        self.dbapi.update_cluster(context, cluster_changes, cluster_id)
+        self.dbapi.update_cluster(context, cluster_changes,
+                                  cluster_id, *args, **kwargs)

     @classmethod
     def get_clusters(cls, context):

View File

@@ -23,10 +23,18 @@ TF_OPTS = [
                help="Persistence connection.",
                default=None),
+    cfg.StrOpt('coord_url',
+               help="Coordinator connection string prefix.",
+               default='zookeeper'),
+    cfg.StrOpt('zk_hosts',
+               help="Zookeeper jobboard hosts.",
+               default="localhost"),
+    cfg.StrOpt('zk_port',
+               help="Zookeeper jobboard port.",
+               default="2181"),
     cfg.StrOpt('zk_path',
                help="Zookeeper path for jobs.",
                default='/cue/taskflow'),
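These three options exist so MonitorService can build its tooz coordination URL; with the defaults above, the composed string matches what the unit test later asserts:

# How MonitorService.__init__ composes the coordination URL (defaults shown).
coord_url = "%s://%s:%s" % ('zookeeper', 'localhost', '2181')
assert coord_url == 'zookeeper://localhost:2181'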

View File

@@ -28,9 +28,9 @@ def check_cluster_status(cluster_id, node_ids):
     This factory function uses :func:`cue.taskflow.flow.check_node_status` to
     check cluster status on each node.

-    :param cluster_id: A unique ID assigned to the cluster
+    :param cluster_id: A unique ID assigned to the cluster being created
     :type cluster_id: string
-    :param node_ids: node id's associated with the cluster
+    :param node_ids: The Cue Node id's associated with each node in the cluster
     :type node_ids: list of uuid strings
     :return: A flow instance that represents the workflow for checking cluster
              status
@@ -66,7 +66,8 @@ def check_cluster_status(cluster_id, node_ids):
     update_cluster_status = cue_tasks.UpdateClusterStatus(
         name="update cluster status %s" % cluster_id,
-        inject={'cluster_id': cluster_id},
+        inject={'cluster_id': cluster_id,
+                'project_only': False},
         rebind={'cluster_values': "final_cluster_status"}
     )
     flow.add(update_cluster_status)

View File

@@ -88,7 +88,8 @@ def create_cluster(cluster_id, node_ids, user_network_id,
             "userdata_%d" % i,
             len(node_ids),
             "vm_management_ip_",
-            inject={'node_name': "rabbit-node-%d" % i})
+            inject={'node_name': "rabbit-node-%d" % i,
+                    'cluster_id': cluster_id})
         flow.add(generate_userdata)
         create_cluster_node.create_cluster_node(cluster_id, i, node_id, flow,

View File

@@ -30,6 +30,7 @@ class ClusterNodeUserData(task.Task):
         requires.append('node_name')
         requires.append('default_rabbit_user')
         requires.append('default_rabbit_pass')
+        requires.append('cluster_id')

         super(ClusterNodeUserData, self).__init__(name=name,
                                                   requires=requires,
@@ -64,6 +65,7 @@ class ClusterNodeUserData(task.Task):
             'erlang_cookie': kwargs['erlang_cookie'],
             'default_rabbit_user': kwargs['default_rabbit_user'],
             'default_rabbit_pass': kwargs['default_rabbit_pass'],
+            'cluster_id': kwargs['cluster_id'],
         }
         script = self.userdata_template.render(userdata_inputs)
         sub_message = mime_text.MIMEText(script,

View File

@@ -27,7 +27,8 @@ class UpdateClusterStatus(task.Task):
         models.Status.BUILDING: models.Status.ERROR
     }

-    def execute(self, context, cluster_id, cluster_values, **kwargs):
+    def execute(self, context, cluster_id, cluster_values,
+                project_only=True, **kwargs):
         """Main execute method which will update the cluster status in the DB

         :param context: The request context in dict format
@@ -39,7 +40,8 @@ class UpdateClusterStatus(task.Task):
         """
         request_context = context_module.RequestContext.from_dict(context)
         cluster = objects.Cluster(**cluster_values)
-        cluster.update(request_context, cluster_id)
+        cluster.update(request_context, cluster_id,
+                       project_only=project_only, **kwargs)

     def revert(self, context, cluster_id, cluster_values, **kwargs):
         """Revert UpdateClusterStatus

View File

@@ -60,4 +60,9 @@ while [ ! -z "$(service rabbitmq-server status | grep 'nodedown')" ]; do
     sleep 5
 done

+# Create monitoring user
+rabbitmqctl add_user cue_monitor "{{cluster_id}}"
+rabbitmqctl set_user_tags cue_monitor monitoring
+rabbitmqctl set_permissions cue_monitor ".*" ".*" ".*"
+rabbitmqctl set_policy HA '^(?!amq\.).*' '{"ha-mode": "all"}'
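The credentials provisioned here line up with what the monitor sends: the cue_monitor user's password on each node is the cluster id, the same value the monitor injects as default_rabbit_pass. An illustrative sketch with a hypothetical id:

# Hypothetical cluster id; in practice this is the cluster's UUID, which the
# userdata template above renders as the cue_monitor user's password.
cluster_id = 'c0ffee00-0000-4000-8000-000000000000'
job_args = {
    'cluster_status_check': '',
    'cluster_id': cluster_id,
    'context': {},
    'default_rabbit_user': 'cue_monitor',
    'default_rabbit_pass': cluster_id,  # matches the RabbitMQ password above
}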

View File

@@ -0,0 +1,136 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import uuid

import mock
from oslo_config import cfg

from cue.db.sqlalchemy import models
from cue.monitor import monitor_service as cue_monitor_service
from cue import objects
import cue.taskflow.client as tf_client
from cue.tests.functional import base

CONF = cfg.CONF


class MonitorFunctionalTests(base.FunctionalTestCase):
    cue_monitor_service = None
    test_uuid_1 = None
    test_uuid_2 = None
    test_uuid_3 = None

    def setUp(self):
        super(MonitorFunctionalTests, self).setUp()
        CONF.set_override("coord_url", "zake", group="taskflow")
        CONF.set_override("zk_hosts", "", group="taskflow")
        CONF.set_override("zk_port", "", group="taskflow")

        self.test_uuid_1 = uuid.uuid4()
        self.test_uuid_2 = uuid.uuid4()
        self.test_uuid_3 = uuid.uuid4()

        # Add some test clusters
        set_up_test_clusters(
            self.context, models.Status.ACTIVE, self.test_uuid_1, 3
        )
        set_up_test_clusters(
            self.context, models.Status.ERROR, self.test_uuid_2, 3
        )
        set_up_test_clusters(
            self.context, models.Status.DOWN, self.test_uuid_3, 1
        )

        self.cue_monitor_service = cue_monitor_service.MonitorService()

    def tearDown(self):
        self.cue_monitor_service.stop()
        super(MonitorFunctionalTests, self).tearDown()

    def test_check_lock(self):
        # Lock should not be acquired yet
        self.assertEqual(False, self.cue_monitor_service.lock.acquired)
        self.cue_monitor_service.check()
        # Lock should have been reacquired
        self.assertEqual(True, self.cue_monitor_service.lock.acquired)

    @mock.patch('tooz.drivers.zookeeper.ZooKeeperLock.acquire')
    def test_lock_unacquirable(self, mock_acquire_lock):
        self.cue_monitor_service.check()
        # Lock should not have been acquired
        self.assertEqual(False, self.cue_monitor_service.lock.acquired)

    def test_check(self):
        tf_instance = tf_client.get_client_instance()
        start_job_list_length = len(tf_instance.joblist())

        # Test while job board is empty
        self.cue_monitor_service.check()
        end_job_list_length = len(tf_instance.joblist())
        self.assertEqual(2, end_job_list_length - start_job_list_length,
                         "Job list should only have two "
                         "clusters: " + str(tf_instance.joblist()))

        # Test while job board has 2 entries
        self.cue_monitor_service.check()

        # No new jobs should have been added.
        self.assertEqual(0, len(tf_instance.joblist()) - end_job_list_length)

    def test_get_cluster_id_node_ids(self):
        clusters = cue_monitor_service.get_cluster_id_node_ids()
        self.assertEqual(2, len(clusters),
                         "Expected to find two clusters, only the ACTIVE"
                         " and DOWN clusters. Found: " + str(len(clusters)))
        for cluster in clusters:
            if cluster[0] == str(self.test_uuid_1):
                self.assertEqual(3, len(cluster[1]),
                                 "Expected to find three nodes in this "
                                 "cluster. Found: " + str(cluster[1]))
            elif cluster[0] == str(self.test_uuid_3):
                self.assertEqual(1, len(cluster[1]),
                                 "Expected to find one node in this "
                                 "cluster. Found: " + str(cluster[1]))
            else:
                self.fail("The only clusters returned should have been"
                          " those created by this test. Found a "
                          "cluster id that does not match: " + str(cluster[0]))


def set_up_test_clusters(context, status, cluster_id, size):
    cluster_values = {
        "id": cluster_id,
        "project_id": "test_project_id",
        "name": "test_cluster",
        "network_id": "test_uuid",
        "flavor": "1",
        "size": size,
    }

    new_cluster = objects.Cluster(**cluster_values)
    new_cluster.create(context)

    new_cluster.status = status
    new_cluster.update(context, cluster_id)

View File

@@ -0,0 +1,40 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock

from cue.cmd import monitor
from cue.tests.unit import base


class TestMonitor(base.UnitTestCase):

    @mock.patch('tooz.coordination.get_coordinator')
    @mock.patch('oslo_service.service.ServiceLauncher.wait')
    @mock.patch('oslo_config.cfg.CONF.log_opt_values')
    @mock.patch('cue.common.service.prepare_service')
    def test_main(self,
                  mock_prepare,
                  mock_conf,
                  mock_oslo_service_wait,
                  mock_tooz_get_coordinator):
        monitor.main()

        mock_tooz_get_coordinator.assert_called_once_with(
            'zookeeper://localhost:2181', 'cue-monitor')
        mock_oslo_service_wait.assert_called_once_with()
        self.assertEqual(mock_conf.call_count, 1)
        self.assertEqual(mock_prepare.call_count, 1)

View File

@@ -0,0 +1,127 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Authors: Daniel Allegood <daniel.allegood@hpe.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from oslo_config import cfg
from oslotest import base as oslo_base

from cue.monitor import monitor_service as cue_monitor_service

CONF = cfg.CONF


class TestMonitorService(oslo_base.BaseTestCase):
    cue_monitor_service = None

    def setUp(self):
        super(TestMonitorService, self).setUp()
        CONF.set_override("coord_url", "zake", group="taskflow")
        CONF.set_override("zk_hosts", "", group="taskflow")
        CONF.set_override("zk_port", "", group="taskflow")

        self.cue_monitor_service = cue_monitor_service.MonitorService()

    @mock.patch('oslo_service.loopingcall.LoopingCallBase.wait')
    @mock.patch('oslo_service.loopingcall.FixedIntervalLoopingCall.start')
    def test_start(self,
                   mock_fixed_interval_loop_call_start,
                   mock_loop_call_wait):
        CONF.set_override("loop_interval_seconds", "9001",
                          group="cue_monitor")
        self.cue_monitor_service.start()

        mock_fixed_interval_loop_call_start.assert_called_once_with(
            interval=9001)
        mock_loop_call_wait.assert_called_once_with()

    @mock.patch('tooz.coordination.CoordinationDriver.stop')
    @mock.patch('tooz.drivers.zookeeper.ZooKeeperLock.release')
    def test_stop(self, mock_lock_release, mock_coordinator_stop):
        self.cue_monitor_service.stop()

        mock_lock_release.assert_called_once_with()
        mock_coordinator_stop.assert_called_once_with()

    def test_wait(self):
        self.cue_monitor_service.wait()

    @mock.patch('tooz.coordination.CoordinationDriver.stop')
    @mock.patch('tooz.drivers.zookeeper.ZooKeeperLock.release')
    def test_reset(self, mock_lock_release, mock_coordinator_stop):
        self.cue_monitor_service.reset()

        mock_lock_release.assert_called_once_with()
        mock_coordinator_stop.assert_called_once_with()
    # @mock.patch('cue.taskflow.client.get_client_instance')
    # @mock.patch('cue.monitor.monitor_service.get_cluster_id_node_ids')
    # def test_check(self,
    #                mock_get_cluster_id_node_ids,
    #                mock_get_tf_client):
    #     self.cue_monitor_service.check()
    #
    #     mock_get_cluster_id_node_ids.assert_called_once_with()
    #     mock_get_tf_client.assert_called_once_with()
    #
    #     # Get all mock calls to the taskflow_client magicMock object
    #     tf_calls = map(
    #         lambda call: call[0],
    #         mock_get_tf_client.mock_calls
    #     )
    #     # Filter for any call containing "joblist"
    #     tf_joblist_called = filter(lambda mock_call:
    #                                "joblist" in mock_call,
    #                                tf_calls)
    #     self.assertIsNot(0, len(tf_joblist_called))
    #
    # @mock.patch('cue.db.api.Connection.get_clusters')
    # @mock.patch('cue.db.api.get_instance')
    # def test_get_cluster_id_node_ids(self,
    #                                  mock_get_db_instance,
    #                                  mock_get_clusters):
    #     cue_monitor_service.get_cluster_id_node_ids()
    #
    #     mock_get_db_instance.assert_called_once_with()
    #
    #     # Get all mock calls to the db_api magicMock object
    #     db_api_calls = map(
    #         lambda call: {"name": call[0],
    #                       "positional_args": call[1],
    #                       "explicit_args": call[2]},
    #         mock_get_db_instance.mock_calls
    #     )
    #
    #     # Filter for any "get_clusters" call made with project_only=False
    #     db_api_get_clusters_called = filter(
    #         lambda mock_call:
    #         check_mock_calls_for_args(mock_call),
    #         db_api_calls)
    #
    #     self.assertIsNot(0, len(db_api_get_clusters_called))


# def check_mock_calls_for_args(mock_call):
#     return (
#         "get_clusters" in mock_call['name'] and
#         None in mock_call['positional_args'] and
#         'project_only' in mock_call['explicit_args'] and
#         mock_call['explicit_args']['project_only'] is False)

View File

@@ -17,6 +17,7 @@ oslo.policy>=0.5.0 # Apache-2.0
 oslo.rootwrap>=2.0.0 # Apache-2.0
 oslo.i18n>=1.5.0 # Apache-2.0
 oslo.serialization>=1.4.0 # Apache-2.0
+oslo.service>=0.9.0 # Apache-2.0
 oslo.utils>=2.0.0 # Apache-2.0
 pecan>=1.0.0
 six>=1.9.0
@@ -29,6 +30,7 @@ Jinja2>=2.6 # BSD License (3 clause)
 taskflow>=1.16.0
 kazoo>=2.2
 tooz>=1.19.0 # Apache-2.0
+#PyMySQL
 python-keystoneclient>=1.6.0
 python-novaclient>=2.28.1

View File

@@ -27,6 +27,7 @@ console_scripts =
     cue-api = cue.cmd.api:main
     cue-manage = cue.cmd.manage:main
     cue-worker = cue.cmd.worker:main
+    cue-monitor = cue.cmd.monitor:main

 cue.database.migration_backend =
     sqlalchemy = cue.db.sqlalchemy.migration