[MGM] Refactor pool manager ready for new functions such as delete

Change-Id: I2941b12de47b8b693e284c65f96cd671fd1d6290
This commit is contained in:
Andrew Hutchings
2013-07-08 14:37:41 +01:00
parent 625b5446cb
commit 25d6ad32b9
8 changed files with 336 additions and 271 deletions

View File

@@ -28,7 +28,7 @@ Configuration File
api_server = 10.0.0.1:8889 10.0.0.2:8889
nodes = 10
check_interval = 5
failed_interval = 15
submit_interval = 15
node_basename = 'libra'
Command Line Options
@@ -46,9 +46,9 @@ Command Line Options
How often to check the API server to see if new nodes are needed
(value is in minutes)
.. option:: --failed_interval <FAILED_INTERVAL>
.. option:: --submit_interval <SUBMIT_INTERVAL>
How often to check the list of failed node uploads to see if the nodes
How often to check the list of nodes to see if the nodes
are now in a good state (value is in minutes)
.. option:: --driver <DRIVER>

View File

@@ -53,7 +53,7 @@ nova_image_size = standard.medium
api_server = 10.0.0.1:8889 10.0.0.2:8889
nodes = 10
check_interval = 5
failed_interval = 15
submit_interval = 15
node_basename = 'libra'
az = 1

View File

@@ -22,10 +22,9 @@ import time
import sys
import os
import threading
from libra.mgm.schedulers import modules, known_modules
from novaclient import exceptions
from libra.openstack.common import importutils
from libra.mgm.nova import Node, BuildError, NotFound
from libra.common.options import Options, setup_logging
from libra.mgm.drivers.base import known_drivers
from libra.mgm.node_list import NodeList, AccessDenied
@@ -34,10 +33,10 @@ from libra.mgm.node_list import NodeList, AccessDenied
class Server(object):
def __init__(self, args):
self.args = args
self.ct = None
self.ft = None
self.api = None
self.driver_class = None
self.schedulers = []
try:
self.node_list = NodeList(self.args.datadir)
except AccessDenied as exc:
@@ -59,258 +58,32 @@ class Server(object):
known_drivers[self.args.driver]
)
# make initial check and then run scheduler
self.logger.info(
'Scheduling node check for {check} minutes'
.format(check=self.args.check_interval)
)
# NOTE(LinuxJedi): Threading lock is for the case in the future where
# we need two timers, we don't want them to both execute their trigger
# NOTE(LinuxJedi): Threading lock is due to needing more than one
# timer and we don't want them to execute their trigger
# at the same time.
self.rlock = threading.RLock()
self.check_nodes()
self.failed_nodes()
# Load all the schedulers
for module in modules:
mod = importutils.import_class(known_modules[module])
instance = mod(
self.driver_class, self.rlock, self.logger, self.node_list,
self.args
)
self.schedulers.append(instance)
instance.run()
while True:
time.sleep(1)
def failed_nodes(self):
""" check list of failures """
with self.rlock:
try:
self.logger.info('Checking log of failed node uploads')
nodes = self.node_list.get()
if len(nodes) == 0:
self.logger.info('Node log empty')
else:
api = self.driver_class(self.args.api_server, self.logger)
if api.is_online():
self.logger.info(
'Connected to {url}'.format(url=api.get_url())
)
for node in nodes:
self.retest_node(node, api)
else:
self.logger.error('No working API server found')
except Exception:
self.logger.exception(
'Uncaught exception during failed node check'
)
self.reset_failed_scheduler()
def retest_node(self, node_id, api):
try:
nova = Node(
self.args.nova_user,
self.args.nova_pass,
self.args.nova_tenant,
self.args.nova_auth_url,
self.args.nova_region,
self.args.nova_keyname,
self.args.nova_secgroup,
self.args.nova_image,
self.args.nova_image_size,
node_basename=self.args.node_basename
)
except Exception as exc:
self.logger.error(
'Error initialising Nova connection {exc}'
.format(exc=exc)
)
return
self.logger.info('Retrying node {0}'.format(node_id))
try:
resp, status = nova.status(node_id)
except NotFound:
self.logger.info(
'Node {0} no longer exists, removing from list'
.format(node_id)
)
self.node_list.delete(node_id)
return
except exceptions.ClientException as exc:
self.logger.error(
'Error getting status from Nova, exception {exc}'
.format(exc=sys.exc_info()[0])
)
return
if resp.status_code not in(200, 203):
self.logger.error(
'Error geting status from Nova, error {0}'
.format(resp.status_code)
)
return
status = status['server']
if status['status'] == 'ACTIVE':
name = status['name']
body = self.build_node_data(status)
status, response = api.add_node(body)
if not status:
self.logger.error(
'Could not upload node {name} to API server'
.format(name=name)
)
else:
self.node_list.delete(node_id)
self.logger.info('Node {0} added to API server'.format(name))
return
elif status['status'].startswith('BUILD'):
self.logger.info(
'Node {0} still building, ignoring'.format(node_id)
)
return
else:
self.logger.info(
'Node {0} is bad, deleting'.format(node_id)
)
status, msg = nova.delete(node_id)
if not status:
self.logger.error(msg)
else:
self.logger.info('Delete successful')
self.node_list.delete(node_id)
def check_nodes(self):
""" check if known nodes are used """
with self.rlock:
try:
self.logger.info('Checking if new nodes are needed')
api = self.driver_class(self.args.api_server, self.logger)
if api.is_online():
self.logger.info(
'Connected to {url}'.format(url=api.get_url())
)
free_count = api.get_free_count()
if free_count is None:
self.reset_scheduler()
return
if free_count < self.args.nodes:
# we need to build new nodes
nodes_required = self.args.nodes - free_count
self.logger.info(
'Building {nodes} nodes'
.format(nodes=nodes_required)
)
self.build_nodes(nodes_required, api)
else:
self.logger.info('No new nodes required')
else:
self.logger.error('No working API server found')
except Exception:
self.logger.exception('Uncaught exception during node check')
self.reset_scheduler()
def reset_scheduler(self):
self.logger.info('Node check timer sleeping for {mins} minutes'
.format(mins=self.args.check_interval))
self.ct = threading.Timer(60 * int(self.args.check_interval),
self.check_nodes, ())
self.ct.start()
def reset_failed_scheduler(self):
self.logger.info('Node failed timer sleeping for {mins} minutes'
.format(mins=self.args.failed_interval))
self.ft = threading.Timer(60 * int(self.args.failed_interval),
self.failed_nodes, ())
self.ft.start()
def build_node_data(self, data):
""" Build the API data from the node data """
body = {}
body['name'] = data['name']
addresses = data['addresses']['private']
for address in addresses:
if not address['addr'].startswith('10.'):
break
body['publicIpAddr'] = address['addr']
body['floatingIpAddr'] = address['addr']
body['az'] = self.args.az
body['type'] = "basename: {0}, image: {1}".format(
self.args.node_basename, self.args.nova_image
)
return body
def find_unknown(self, name, nova):
"""
Nova can tell us a node failed to build when it didn't
This does a check and if it did start to build adds it to the
failed node list.
"""
try:
node_id = nova.get_node(name)
self.logger.info('Storing node to try again later')
self.node_list.add(node_id)
except NotFound:
# Node really didn't build
return
except exceptions.ClientException as exc:
# TODO: edge case where if node reports failed, actually succeeds
# and this node check fails we will have a dangling node
self.logger.error(
'Error getting failed node info from Nova, exception {exc}'
.format(exc=exc)
)
def build_nodes(self, count, api):
try:
nova = Node(
self.args.nova_user,
self.args.nova_pass,
self.args.nova_tenant,
self.args.nova_auth_url,
self.args.nova_region,
self.args.nova_keyname,
self.args.nova_secgroup,
self.args.nova_image,
self.args.nova_image_size,
node_basename=self.args.node_basename
)
except Exception as exc:
self.logger.error('Error initialising Nova connection {exc}'
.format(exc=exc)
)
return
while count > 0:
try:
data = nova.build()
except BuildError as exc:
self.logger.error('{0}, node {1}'
.format(exc.msg, exc.node_name)
)
if exc.node_id > 0:
self.logger.info('Storing node to try again later')
self.node_list.add(exc.node_id)
else:
self.find_unknown(exc.node_name, nova)
self.logger.warning('Aborting node building')
return
body = self.build_node_data(data)
self.logger.info('Adding node {name} on {ip}'.format(
name=body['name'], ip=body['publicIpAddr'])
)
status, response = api.add_node(body)
if not status:
self.logger.error(
'Could not upload node {name} to API server'
.format(name=data['name'])
)
self.logger.info('Storing node to try again later')
self.node_list.add(data['id'])
self.logger.warning('Aborting node building')
return
count = count - 1
def exit_handler(self, signum, frame):
signal.signal(signal.SIGINT, signal.SIG_IGN)
signal.signal(signal.SIGTERM, signal.SIG_IGN)
self.shutdown(False)
def shutdown(self, error):
if self.ct:
self.ct.cancel()
if self.ft:
self.ft.cancel()
for sched in self.schedulers:
sched.timer.cancel()
if not error:
self.logger.info('Safely shutting down')
@@ -323,7 +96,7 @@ class Server(object):
def main():
options = Options('mgm', 'Node Management Daemon')
options.parser.add_argument(
'--api_server', action='append', metavar='HOST:PORT',
'--api_server', action='append', metavar='HOST:PORT', default=[],
help='a list of API servers to connect to (for HP REST API driver)'
)
options.parser.add_argument(
@@ -344,8 +117,8 @@ def main():
help='how often to check if new nodes are needed (in minutes)'
)
options.parser.add_argument(
'--failed_interval', type=int, default=15,
help='how often to retest nodes that failed to get added to the API'
'--submit_interval', type=int, default=15,
help='how often to test nodes for submission to the API'
' server (in minutes)'
)
options.parser.add_argument(

View File

@@ -13,7 +13,6 @@
# under the License.
import uuid
import time
import sys
import urllib
@@ -75,24 +74,7 @@ class Node(object):
.format(exc=sys.exc_info()[0]), node_id
)
server_id = body['server']['id']
# try for 40 * 3 seconds
waits = 40
while waits > 0:
time.sleep(3)
resp, status = self.status(server_id)
status = status['server']
if status['status'] == 'ACTIVE':
return status
elif not status['status'].startswith('BUILD'):
raise BuildError(
'Error spawning node, status {stat}'
.format(stat=status['status']),
node_id, server_id,
)
waits = waits - 1
raise BuildError('Timeout creating node', node_id, server_id)
return body['server']['id']
def delete(self, node_id):
""" delete a node """

View File

@@ -0,0 +1,19 @@
# Copyright 2012 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# Scheduler name -> dotted path of the class that implements it.
known_modules = dict(
    build='libra.mgm.schedulers.build.BuildNodes',
    submit='libra.mgm.schedulers.submit.SubmitNodes',
)

# Names of the schedulers to use; each must be a key of known_modules.
modules = ['build', 'submit']

View File

@@ -0,0 +1,143 @@
# Copyright 2012 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import threading
from novaclient import exceptions
from libra.mgm.nova import Node, BuildError, NotFound
class BuildNodes(object):
    """Periodic job that keeps the pool of free nodes topped up.

    Every pass asks the API server how many free nodes it has and, when
    that is below ``args.nodes``, requests new builds from Nova.  The IDs of
    requested builds are stored in ``node_list``.
    """

    def __init__(self, driver, lock, logger, node_list, args):
        self.args = args
        self.driver = driver
        self.lock = lock
        self.logger = logger
        self.node_list = node_list
        # threading.Timer for the next pass; stays None until scheduler()
        # has been called.
        self.timer = None

    def run(self):
        """Run one pool check under the shared lock, then re-arm the timer."""
        with self.lock:
            try:
                self.logger.info('Checking if new nodes are needed')
                api = self.driver(self.args.api_server, self.logger)
                if not api.is_online():
                    self.logger.error('No working API server found')
                else:
                    self.logger.info(
                        'Connected to {url}'.format(url=api.get_url())
                    )
                    free_count = api.get_free_count()
                    if free_count is None:
                        # No usable count from the API: just try again on
                        # the next tick.
                        self.scheduler()
                        return
                    if free_count < self.args.nodes:
                        shortfall = self.args.nodes - free_count
                        self.logger.info(
                            '{nodes} nodes required'.format(nodes=shortfall)
                        )
                        self.build_nodes(shortfall, api)
                    else:
                        self.logger.info('No new nodes required')
            except Exception:
                self.logger.exception('Uncaught exception during node check')

            self.scheduler()

    def build_nodes(self, count, api):
        """Ask Nova for up to ``count`` new nodes and store their IDs.

        Nodes already recorded in ``node_list`` are subtracted from ``count``
        first.  ``api`` is accepted but not used here.
        """
        try:
            nova = Node(
                self.args.nova_user,
                self.args.nova_pass,
                self.args.nova_tenant,
                self.args.nova_auth_url,
                self.args.nova_region,
                self.args.nova_keyname,
                self.args.nova_secgroup,
                self.args.nova_image,
                self.args.nova_image_size,
                node_basename=self.args.node_basename
            )
        except Exception as exc:
            self.logger.error(
                'Error initialising Nova connection {exc}'.format(exc=exc)
            )
            return

        # Remove the number of nodes we are still waiting on build status
        # for from the number we were asked to build.
        pending = len(self.node_list.get())
        remaining = count - pending
        if remaining > 0:
            self.logger.info(
                '{0} nodes already building, attempting to build {1} more'
                .format(pending, remaining)
            )
        else:
            self.logger.info(
                '{0} nodes already building, no more needed'.format(pending)
            )

        while remaining > 0:
            remaining -= 1
            try:
                node_id = nova.build()
            except BuildError as exc:
                self.logger.exception(
                    '{0}, node {1}'.format(exc.msg, exc.node_name)
                )
                self.logger.info(
                    'Node build did not return ID for {0}, trying to find'
                    .format(exc.node_name)
                )
                self.find_unknown(exc.node_name, nova)
            else:
                if node_id > 0:
                    self.logger.info(
                        'Storing node {0} to add later'.format(node_id)
                    )
                    self.node_list.add(node_id)
                else:
                    self.logger.error(
                        'Node build did not return ID, cannot find it'
                    )

    def find_unknown(self, name, nova):
        """Look a node up by name after Nova reported a build failure.

        Nova can report a failed build for a node that did in fact start
        building.  When the lookup finds the node, its ID is stored in
        ``node_list``; when it does not, the node is given up on.
        """
        try:
            found_id = nova.get_node(name)
            self.logger.info('Storing node {0} to add later'.format(found_id))
            self.node_list.add(found_id)
        except NotFound:
            # Node really didn't build
            self.logger.info(
                'No node found for {0}, giving up on it'.format(name)
            )
        except exceptions.ClientException:
            self.logger.exception(
                'Error getting failed node info from Nova for {0}'.format(name)
            )

    def scheduler(self):
        """Arm a timer that calls run() after ``args.check_interval`` minutes."""
        minutes = self.args.check_interval
        self.logger.info('Node check timer sleeping for {mins} minutes'
                         .format(mins=minutes))
        self.timer = threading.Timer(60 * int(minutes), self.run, ())
        self.timer.start()

View File

@@ -0,0 +1,148 @@
# Copyright 2012 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sys
import threading
from novaclient import exceptions
from libra.mgm.nova import Node, NotFound
class SubmitNodes(object):
    """Periodic job that submits built Nova nodes to the API server.

    Every pass walks the IDs stored in ``node_list`` and asks Nova for the
    state of each node: ACTIVE nodes are uploaded to the API server, nodes
    whose status starts with BUILD are left for a later pass, and anything
    else is deleted.
    """

    def __init__(self, driver, lock, logger, node_list, args):
        self.driver = driver
        self.lock = lock
        self.args = args
        self.logger = logger
        self.node_list = node_list
        # threading.Timer for the next pass; stays None until scheduler()
        # has been called.
        self.timer = None

    def run(self):
        """ check/submit list of nodes to be added """
        with self.lock:
            try:
                self.logger.info('Checking log of nova builds')
                nodes = self.node_list.get()
                if not nodes:
                    self.logger.info('Node log empty')
                else:
                    api = self.driver(self.args.api_server, self.logger)
                    if api.is_online():
                        self.logger.info(
                            'Connected to {url}'.format(url=api.get_url())
                        )
                        for node in nodes:
                            self.test_node(node, api)
                    else:
                        self.logger.error('No working API server found')
            except Exception:
                self.logger.exception(
                    'Uncaught exception during failed node check'
                )

            # Always re-arm the timer, even when the pass failed.
            self.scheduler()

    def test_node(self, node_id, api):
        """Check one stored node in Nova and act on its status.

        :param node_id: Nova ID of the node, as stored in ``node_list``.
        :param api: connected API driver used to upload ACTIVE nodes.
        """
        try:
            nova = Node(
                self.args.nova_user,
                self.args.nova_pass,
                self.args.nova_tenant,
                self.args.nova_auth_url,
                self.args.nova_region,
                self.args.nova_keyname,
                self.args.nova_secgroup,
                self.args.nova_image,
                self.args.nova_image_size,
                node_basename=self.args.node_basename
            )
        except Exception as exc:
            self.logger.error(
                'Error initialising Nova connection {exc}'
                .format(exc=exc)
            )
            return

        self.logger.info('Testing readiness node {0}'.format(node_id))
        try:
            resp, details = nova.status(node_id)
        except NotFound:
            self.logger.info(
                'Node {0} no longer exists, removing from list'
                .format(node_id)
            )
            self.node_list.delete(node_id)
            return
        except exceptions.ClientException:
            self.logger.error(
                'Error getting status from Nova, exception {exc}'
                .format(exc=sys.exc_info()[0])
            )
            return

        if resp.status_code not in (200, 203):
            self.logger.error(
                'Error geting status from Nova, error {0}'
                .format(resp.status_code)
            )
            return

        server = details['server']
        state = server['status']

        if state == 'ACTIVE':
            name = server['name']
            uploaded, _response = api.add_node(self.build_node_data(server))
            if not uploaded:
                # Node stays in the list so the upload is retried next pass.
                self.logger.error(
                    'Could not upload node {name} to API server'
                    .format(name=name)
                )
            else:
                self.node_list.delete(node_id)
                self.logger.info('Node {0} added to API server'.format(name))
            return

        if state.startswith('BUILD'):
            self.logger.info(
                'Node {0} still building, ignoring'.format(node_id)
            )
            return

        self.logger.info(
            'Node {0} is bad, deleting'.format(node_id)
        )
        deleted, msg = nova.delete(node_id)
        if not deleted:
            self.logger.error(msg)
        else:
            self.logger.info('Delete successful')
            # NOTE(review): the node is only forgotten once Nova confirmed
            # the delete, so a failed delete is retried on the next pass --
            # confirm this matches the intended behaviour.
            self.node_list.delete(node_id)

    def build_node_data(self, data):
        """ Build the API data from the node data

        The first private address that does not start with ``10.`` is used
        as the public/floating address; when every address starts with
        ``10.`` the last one listed is used.

        :raises ValueError: if the node has no private addresses at all.
        """
        addresses = data['addresses']['private']
        if not addresses:
            raise ValueError(
                'Node {0} has no private addresses'.format(data['name'])
            )
        address = next(
            (entry for entry in addresses
             if not entry['addr'].startswith('10.')),
            addresses[-1]
        )

        body = {}
        body['name'] = data['name']
        body['publicIpAddr'] = address['addr']
        body['floatingIpAddr'] = address['addr']
        body['az'] = self.args.az
        body['type'] = "basename: {0}, image: {1}".format(
            self.args.node_basename, self.args.nova_image
        )
        return body

    def scheduler(self):
        """Arm a timer that calls run() after ``args.submit_interval`` minutes."""
        # The command line option is --submit_interval; the former
        # failed_interval attribute no longer exists on args.
        minutes = self.args.submit_interval
        self.logger.info('Node submit timer sleeping for {mins} minutes'
                         .format(mins=minutes))
        self.timer = threading.Timer(60 * int(minutes), self.run, ())
        self.timer.start()

View File

@@ -67,7 +67,7 @@ class TestLBaaSMgmNova(testtools.TestCase):
with mock.patch.object(requests, "request", mock_request):
with mock.patch('time.time', mock.Mock(return_value=1234)):
data = self.api.build()
self.assertEqual(data['id'], 417773)
self.assertEqual(data, 417773)
def testCreateNodeFail(self):
with mock.patch.object(requests, "request", mock_bad_request):