[WORKER][ADMIN_API]: Add device diagnostics check

After a device is built run a diagnostics check on the device.
1. The pool manager will send a DIAGNOSTICS message to the device after build
2. The worker will:
  a. Connect test google
  b. Connect test all gearman servers
3. If google connect test fails device is failed
4. If > 1/3rd gearman servers connect fails the device is failed
5. Failed devices are deleted and a FAIL response for that build is returned

Change-Id: Iaac8fabeabb4136451407164e396f784eabaf887
This commit is contained in:
Andrew Hutchings
2013-10-20 11:04:39 +01:00
parent 7dcfd773e1
commit 70dc7943a3
5 changed files with 191 additions and 29 deletions

View File

@@ -175,6 +175,43 @@ Example Response
}
DIAGNOSTICS Message
-------------------
The DIAGNOSTICS message runs some basic network connection tests to check
whether the device the worker lives on is healthy. At the moment it runs a TCP
connect test to google.com (port 80) and a TCP connect test to each gearman server.
Example Request
^^^^^^^^^^^^^^^
.. code-block:: json
{
"hpcs_action": "DIAGNOSTICS"
}
Example Response
^^^^^^^^^^^^^^^^
.. code-block:: json
{
"hpcs_action": "DIAGNOSTICS",
"network": "PASS",
"gearman": [
{
"host": "15.185.1.2",
"status": "PASS"
},
{
"host": "15.185.1.3",
"status": "FAIL"
}
],
"release": "1.0.alpha.3.gca84083",
"hpcs_response": "PASS"
}
DISCOVER Message
----------------

View File

@@ -14,6 +14,9 @@
from time import sleep
from novaclient import exceptions
from gearman.constants import JOB_UNKNOWN
from libra.common.json_gearman import JSONGearmanClient
from libra.mgm.nova import Node, BuildError, NotFound
@@ -62,7 +65,12 @@ class BuildController(object):
self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
return self.msg
if node_id > 0:
return self._wait_until_node_ready(nova, node_id)
self._wait_until_node_ready(nova, node_id)
if self.msg[self.RESPONSE_FIELD] == self.RESPONSE_SUCCESS:
status = self._test_node(self.msg['name'])
if not status:
self.msg[self.RESPONSE_FIELD] == self.RESPONSE_FAILURE
return self.msg
else:
self.logger.error(
'Node build did not return an ID, cannot find it'
@@ -116,3 +124,57 @@ class BuildController(object):
)
self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
return self.msg
def _test_node(self, name):
    """
    Run the worker's DIAGNOSTICS job against a freshly built node.

    A blocking DIAGNOSTICS job is submitted to the gearman task ``name``
    and the reply is used to decide whether the node is usable.

    :param name: gearman task name the job is submitted to
    :returns: ``True`` when the node passes, or when the worker answers
        with an overall FAIL (taken to mean the worker does not support
        DIAGNOSTICS). ``False`` when the gearman server cannot be
        reached, the job times out, the outgoing network test failed, or
        more than a third of the gearman connect tests failed.
    """
    # Run diags on node, blow it away if bad
    if all([self.args.gearman_ssl_ca, self.args.gearman_ssl_cert,
            self.args.gearman_ssl_key]):
        # Use SSL connections to each Gearman job server.
        ssl_server_list = []
        for server in self.args.gearman:
            host, port = server.split(':')
            ssl_server_list.append({'host': host,
                                    'port': int(port),
                                    'keyfile': self.args.gearman_ssl_key,
                                    'certfile': self.args.gearman_ssl_cert,
                                    'ca_certs': self.args.gearman_ssl_ca})
        gm_client = JSONGearmanClient(ssl_server_list)
    else:
        gm_client = JSONGearmanClient(self.args.gearman)

    job_data = {'hpcs_action': 'DIAGNOSTICS'}
    job_status = gm_client.submit_job(
        name, job_data, background=False, wait_until_complete=True,
        max_retries=10, poll_timeout=10
    )
    if job_status.state == JOB_UNKNOWN:
        # Gearman server connect fail, count as bad node because we can't
        # tell if it really is working
        self.logger.error('Could not talk to gearman server')
        return False
    if job_status.timed_out:
        self.logger.warning('Timeout getting diags from {0}'.format(name))
        return False

    result = job_status.result
    self.logger.debug(result)
    # The worker reports its overall status under 'hpcs_response'; the
    # previous 'hpcs_result' key is never sent and raised KeyError.
    # An overall FAIL would only happen if DIAGNOSTICS is not supported,
    # so the node is given the benefit of the doubt.
    if result.get('hpcs_response') == 'FAIL':
        return True
    if result['network'] == 'FAIL':
        return False

    gearman_tests = result['gearman']
    gearman_fail = 0
    for gearman_test in gearman_tests:
        if gearman_test['status'] == 'FAIL':
            self.logger.info(
                'Device {0} cannot talk to gearman {1}'
                .format(name, gearman_test['host'])
            )
            gearman_fail += 1
    # Need 2/3rds gearman up. Written as a multiplication so the result
    # does not depend on integer vs. true division.
    if gearman_fail * 3 > len(gearman_tests):
        return False
    return True

View File

@@ -63,7 +63,19 @@ class PoolMgmController(object):
)
self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
return self.msg
return controller.run()
self.msg = controller.run()
# Delete a built device if it has failed
if (
action == 'BUILD_DEVICE' and self.msg == self.RESPONSE_FAILURE
and 'name' in self.msg
):
delete_msg = {'name': self.msg['name']}
controller = DeleteController(
self.logger, self.args, delete_msg
)
controller.run()
return self.msg
except Exception:
self.logger.exception("Controller exception")
self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE

View File

@@ -22,7 +22,7 @@ class TestWorkerController(testtools.TestCase):
msg = {
c.ACTION_FIELD: 'BOGUS'
}
controller = c(self.logger, self.driver, msg)
controller = c(self.logger, self.driver, msg, [])
response = controller.run()
self.assertIn(c.RESPONSE_FIELD, response)
self.assertEquals(response[c.RESPONSE_FIELD], c.RESPONSE_FAILURE)
@@ -32,7 +32,7 @@ class TestWorkerController(testtools.TestCase):
c.ACTION_FIELD: 'UPDATE',
'LoAdBaLaNcErS': [{'protocol': 'http'}]
}
controller = c(self.logger, self.driver, msg)
controller = c(self.logger, self.driver, msg, [])
response = controller.run()
self.assertIn('badRequest', response)
@@ -52,7 +52,7 @@ class TestWorkerController(testtools.TestCase):
}
]
}
controller = c(self.logger, self.driver, msg)
controller = c(self.logger, self.driver, msg, [])
response = controller.run()
self.assertIn(c.RESPONSE_FIELD, response)
self.assertEquals(response[c.RESPONSE_FIELD], c.RESPONSE_SUCCESS)
@@ -61,7 +61,7 @@ class TestWorkerController(testtools.TestCase):
msg = {
c.ACTION_FIELD: 'SUSPEND'
}
controller = c(self.logger, self.driver, msg)
controller = c(self.logger, self.driver, msg, [])
response = controller.run()
self.assertIn(c.RESPONSE_FIELD, response)
self.assertEquals(response[c.RESPONSE_FIELD], c.RESPONSE_SUCCESS)
@@ -70,7 +70,7 @@ class TestWorkerController(testtools.TestCase):
msg = {
c.ACTION_FIELD: 'ENABLE'
}
controller = c(self.logger, self.driver, msg)
controller = c(self.logger, self.driver, msg, [])
response = controller.run()
self.assertIn(c.RESPONSE_FIELD, response)
self.assertEquals(response[c.RESPONSE_FIELD], c.RESPONSE_SUCCESS)
@@ -79,7 +79,7 @@ class TestWorkerController(testtools.TestCase):
msg = {
c.ACTION_FIELD: 'DELETE'
}
controller = c(self.logger, self.driver, msg)
controller = c(self.logger, self.driver, msg, [])
response = controller.run()
self.assertIn(c.RESPONSE_FIELD, response)
self.assertEquals(response[c.RESPONSE_FIELD], c.RESPONSE_SUCCESS)
@@ -99,7 +99,7 @@ class TestWorkerController(testtools.TestCase):
}
]
}
controller = c(self.logger, self.driver, msg)
controller = c(self.logger, self.driver, msg, [])
response = controller.run()
self.assertIn('badRequest', response)
msg = response['badRequest']['validationErrors']['message']
@@ -121,7 +121,7 @@ class TestWorkerController(testtools.TestCase):
}
]
}
controller = c(self.logger, self.driver, msg)
controller = c(self.logger, self.driver, msg, [])
response = controller.run()
self.assertIn('badRequest', response)
msg = response['badRequest']['validationErrors']['message']
@@ -131,7 +131,7 @@ class TestWorkerController(testtools.TestCase):
msg = {
c.ACTION_FIELD: 'UPDATE'
}
controller = c(self.logger, self.driver, msg)
controller = c(self.logger, self.driver, msg, [])
response = controller.run()
self.assertIn('badRequest', response)
msg = response['badRequest']['validationErrors']['message']
@@ -142,7 +142,7 @@ class TestWorkerController(testtools.TestCase):
c.ACTION_FIELD: 'UPDATE',
c.LBLIST_FIELD: [{'protocol': 'http'}]
}
controller = c(self.logger, self.driver, msg)
controller = c(self.logger, self.driver, msg, [])
response = controller.run()
self.assertIn('badRequest', response)
msg = response['badRequest']['validationErrors']['message']
@@ -163,7 +163,7 @@ class TestWorkerController(testtools.TestCase):
}
]
}
controller = c(self.logger, self.driver, msg)
controller = c(self.logger, self.driver, msg, [])
response = controller.run()
self.assertIn('badRequest', response)
msg = response['badRequest']['validationErrors']['message']
@@ -193,7 +193,7 @@ class TestWorkerController(testtools.TestCase):
}
]
}
controller = c(self.logger, self.driver, msg)
controller = c(self.logger, self.driver, msg, [])
response = controller.run()
self.assertNotIn('badRequest', response)
self.assertEquals(response[c.RESPONSE_FIELD], c.RESPONSE_SUCCESS)
@@ -221,7 +221,7 @@ class TestWorkerController(testtools.TestCase):
}
]
}
controller = c(self.logger, self.driver, msg)
controller = c(self.logger, self.driver, msg, [])
response = controller.run()
self.assertIn('badRequest', response)
msg = response['badRequest']['validationErrors']['message']
@@ -250,7 +250,7 @@ class TestWorkerController(testtools.TestCase):
}
]
}
controller = c(self.logger, self.driver, msg)
controller = c(self.logger, self.driver, msg, [])
response = controller.run()
self.assertIn('badRequest', response)
msg = response['badRequest']['validationErrors']['message']
@@ -279,7 +279,7 @@ class TestWorkerController(testtools.TestCase):
}
]
}
controller = c(self.logger, self.driver, msg)
controller = c(self.logger, self.driver, msg, [])
response = controller.run()
self.assertIn('badRequest', response)
msg = response['badRequest']['validationErrors']['message']
@@ -308,7 +308,7 @@ class TestWorkerController(testtools.TestCase):
}
]
}
controller = c(self.logger, self.driver, msg)
controller = c(self.logger, self.driver, msg, [])
response = controller.run()
self.assertIn('badRequest', response)
msg = response['badRequest']['validationErrors']['message']
@@ -337,7 +337,7 @@ class TestWorkerController(testtools.TestCase):
}
]
}
controller = c(self.logger, self.driver, msg)
controller = c(self.logger, self.driver, msg, [])
response = controller.run()
self.assertIn(c.RESPONSE_FIELD, response)
self.assertEquals(response[c.RESPONSE_FIELD], c.RESPONSE_SUCCESS)
@@ -359,7 +359,7 @@ class TestWorkerController(testtools.TestCase):
}
]
}
controller = c(self.logger, self.driver, msg)
controller = c(self.logger, self.driver, msg, [])
response = controller.run()
self.assertIn(c.RESPONSE_FIELD, response)
self.assertEquals(response[c.RESPONSE_FIELD], c.RESPONSE_FAILURE)
@@ -368,7 +368,7 @@ class TestWorkerController(testtools.TestCase):
msg = {
c.ACTION_FIELD: 'DISCOVER'
}
controller = c(self.logger, self.driver, msg)
controller = c(self.logger, self.driver, msg, [])
response = controller.run()
self.assertIn('version', response)
self.assertIn('release', response)
@@ -381,7 +381,7 @@ class TestWorkerController(testtools.TestCase):
c.ACTION_FIELD: 'ARCHIVE'
}
null_driver = LoadBalancerDriver()
controller = c(self.logger, null_driver, msg)
controller = c(self.logger, null_driver, msg, [])
response = controller.run()
self.assertIn('badRequest', response)
msg = response['badRequest']['validationErrors']['message']
@@ -393,7 +393,7 @@ class TestWorkerController(testtools.TestCase):
c.OBJ_STORE_TYPE_FIELD: 'bad'
}
null_driver = LoadBalancerDriver()
controller = c(self.logger, null_driver, msg)
controller = c(self.logger, null_driver, msg, [])
response = controller.run()
self.assertIn('badRequest', response)
@@ -408,7 +408,7 @@ class TestWorkerController(testtools.TestCase):
c.OBJ_STORE_TOKEN_FIELD: "XXXX",
c.LBLIST_FIELD: [{'protocol': 'http', 'id': '123'}]
}
controller = c(self.logger, null_driver, msg)
controller = c(self.logger, null_driver, msg, [])
response = controller.run()
self.assertIn('badRequest', response)
msg = response['badRequest']['validationErrors']['message']
@@ -423,7 +423,7 @@ class TestWorkerController(testtools.TestCase):
c.OBJ_STORE_TOKEN_FIELD: "XXXX",
c.LBLIST_FIELD: [{'protocol': 'http', 'id': '123'}]
}
controller = c(self.logger, null_driver, msg)
controller = c(self.logger, null_driver, msg, [])
response = controller.run()
self.assertIn('badRequest', response)
msg = response['badRequest']['validationErrors']['message']
@@ -438,7 +438,7 @@ class TestWorkerController(testtools.TestCase):
c.OBJ_STORE_ENDPOINT_FIELD: "https://example.com",
c.LBLIST_FIELD: [{'protocol': 'http', 'id': '123'}]
}
controller = c(self.logger, null_driver, msg)
controller = c(self.logger, null_driver, msg, [])
response = controller.run()
self.assertIn('badRequest', response)
msg = response['badRequest']['validationErrors']['message']
@@ -453,7 +453,7 @@ class TestWorkerController(testtools.TestCase):
c.OBJ_STORE_ENDPOINT_FIELD: "https://example.com",
c.OBJ_STORE_TOKEN_FIELD: "XXXX"
}
controller = c(self.logger, null_driver, msg)
controller = c(self.logger, null_driver, msg, [])
response = controller.run()
self.assertIn('badRequest', response)
msg = response['badRequest']['validationErrors']['message']
@@ -469,7 +469,7 @@ class TestWorkerController(testtools.TestCase):
c.LBLIST_FIELD: [{'protocol': 'http', 'id': '123'}]
}
null_driver = LoadBalancerDriver()
controller = c(self.logger, null_driver, msg)
controller = c(self.logger, null_driver, msg, [])
response = controller.run()
self.assertEquals(response[c.RESPONSE_FIELD], c.RESPONSE_FAILURE)
self.assertIn(c.ERROR_FIELD, response)

View File

@@ -12,6 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import socket
from libra import __version__ as libra_version
from libra import __release__ as libra_release
from libra.common.exc import DeletedStateError
@@ -34,10 +35,11 @@ class LBaaSController(object):
OBJ_STORE_ENDPOINT_FIELD = 'hpcs_object_store_endpoint'
OBJ_STORE_TOKEN_FIELD = 'hpcs_object_store_token'
def __init__(self, logger, driver, json_msg):
def __init__(self, logger, driver, json_msg, gearman):
self.logger = logger
self.driver = driver
self.msg = json_msg
self.gearman = gearman
def run(self):
"""
@@ -66,6 +68,8 @@ class LBaaSController(object):
return self._action_archive()
elif action == 'STATS':
return self._action_stats()
elif action == 'DIAGNOSTIC':
return self._action_diagnostic()
else:
self.logger.error("Invalid `%s` value: %s" %
(self.ACTION_FIELD, action))
@@ -77,6 +81,53 @@ class LBaaSController(object):
self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
return self.msg
def _action_diagnostic(self):
    """
    Returns the results of a diagnostic run

    This message is used to see if the worker that was built will actually
    function as a load balancer.

    The reply is the incoming message with these keys added:

    * ``gearman``: one ``{'host': ..., 'status': ...}`` entry per server
      in ``self.gearman``
    * ``network``: result of the outgoing connect test
    * the response field, which is always set to success once the tests
      have run; individual test results are left for the caller to judge
    """
    # Gearman test
    self.msg['gearman'] = []
    for host_port in self.gearman:
        host, port = host_port.split(':')
        try:
            # split() yields the port as a string, but the socket connect
            # needs an integer; without the conversion every server was
            # reported as failed.
            self._check_host(host, int(port))
        except Exception:
            # Best effort: any failure is recorded as a failed test so a
            # diagnostics reply is still produced.
            status = self.RESPONSE_FAILURE
        else:
            status = self.RESPONSE_SUCCESS
        self.msg['gearman'].append({'host': host, 'status': status})

    # Outgoing network test
    try:
        # TODO: make this configurable
        self._check_host('google.com', 80)
    except Exception:
        self.msg['network'] = self.RESPONSE_FAILURE
    else:
        self.msg['network'] = self.RESPONSE_SUCCESS

    self.msg[self.RESPONSE_FIELD] = self.RESPONSE_SUCCESS
    return self.msg
def _check_host(self, ip, port):
    """
    Attempt a TCP connection to ``ip``:``port`` with a 5 second timeout.

    :param ip: host name or address to connect to
    :param port: TCP port; an integer or a numeric string
    :returns: ``True`` if the connection was established
    :raises socket.error: if the connection fails or times out (the
        failure is logged before being re-raised)
    :raises ValueError: if ``port`` is not numeric
    """
    sock = socket.socket()
    try:
        sock.settimeout(5)
        sock.connect((ip, int(port)))
        return True
    except socket.error:
        # This helper probes both gearman servers and the outgoing
        # network target, so the message names the endpoint only.
        self.logger.error(
            "TCP connect error to {0}:{1}"
            .format(ip, port)
        )
        raise
    finally:
        # Always release the descriptor; it was previously leaked on
        # both the success and the failure path.
        sock.close()
def _action_discover(self):
"""
Return service discovery information.