From 2f9d2c847c3d146b3d4156599617d07a232d475d Mon Sep 17 00:00:00 2001 From: David Shrewsbury Date: Mon, 8 Oct 2012 16:38:29 -0400 Subject: [PATCH] Refactor worker logic. Worker logic is now moved to a new controller class. Add initial support for new 'hpcs_action' and 'hpcs_response' JSON elements. Drivers now have the option of supporting SUSPEND, ENABLE, DELETE and UPDATE actions. --- libra/worker/controller.py | 156 +++++++++++++++++++++++++++++++++++ libra/worker/drivers/base.py | 12 +++ libra/worker/worker.py | 57 ++----------- tests/test_lbaas_worker.py | 25 ++++++ 4 files changed, 199 insertions(+), 51 deletions(-) create mode 100644 libra/worker/controller.py diff --git a/libra/worker/controller.py b/libra/worker/controller.py new file mode 100644 index 00000000..be5977e1 --- /dev/null +++ b/libra/worker/controller.py @@ -0,0 +1,156 @@ +# Copyright 2012 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from libra.common.faults import BadRequest + + +class LBaaSController(object): + + NODE_OK = "ENABLED" + NODE_ERR = "DISABLED" + RESPONSE_FAILURE = "FAIL" + RESPONSE_SUCCESS = "PASS" + + def __init__(self, logger, driver, json_msg): + self.logger = logger + self.driver = driver + self.logger.debug("Entered LBaaSController") + # Standardize case on JSON elements + self.msg = dict((k.lower(), v) for k, v in json_msg.iteritems()) + + def run(self): + """ + Process the JSON message and return a JSON response. + """ + + if 'hpcs_action' not in self.msg: + self.logger.error("Missing HPCS_ACTION value") + self.msg['hpcs_response'] = self.RESPONSE_FAILURE + return self.msg + + action = self.msg['hpcs_action'].upper() + if action == 'CREATE': + return self._action_create() + elif action == 'UPDATE': + return self._action_update() + elif action == 'SUSPEND': + return self._action_suspend() + elif action == 'ENABLE': + return self._action_enable() + elif action == 'DELETE': + return self._action_delete() + else: + self.logger.error("Invalid HPCS_ACTION value: %s" % action) + self.msg['hpcs_response'] = self.RESPONSE_FAILURE + return self.msg + + def _action_create(self): + """ Create a Load Balancer. """ + if 'nodes' not in self.msg: + return BadRequest("Missing 'nodes' element").to_json() + + for lb_node in self.msg['nodes']: + port, address = None, None + + if 'port' in lb_node: + port = lb_node['port'] + else: + return BadRequest("Missing 'port' element.").to_json() + + if 'address' in lb_node: + address = lb_node['address'] + else: + return BadRequest("Missing 'address' element.").to_json() + + try: + self.driver.add_server(address, port) + except NotImplementedError: + self.logger.error( + "Selected driver does not support adding a server." + ) + lb_node['condition'] = self.NODE_ERR + except Exception as e: + self.logger.error("Failure trying adding server: %s, %s" % + (e.__class__, e)) + lb_node['condition'] = self.NODE_ERR + else: + self.logger.debug("Added server: %s:%s" % (address, port)) + lb_node['condition'] = self.NODE_OK + + try: + self.driver.activate() + except NotImplementedError: + self.logger.error( + "Selected driver does not support activating changes." + ) + for lb_node in self.msg['nodes']: + lb_node['condition'] = self.NODE_ERR + self.msg['hpcs_response'] = self.RESPONSE_FAILURE + except Exception as e: + self.logger.error("Failure activating changes: %s, %s" % + (e.__class__, e)) + for lb_node in self.msg['nodes']: + lb_node['condition'] = self.NODE_ERR + self.msg['hpcs_response'] = self.RESPONSE_FAILURE + else: + self.logger.info("Activated load balancer changes") + self.msg['hpcs_response'] = self.RESPONSE_SUCCESS + + return self.msg + + def _action_update(self): + """ Update a Load Balancer. """ + # NOTE(shrews): We would need to know the current configuration of + # the load balancer to do an update. Note sure this is really feasible, + # so for now we just do the same as CREATE. + return self.action_create() + + def _action_suspend(self): + """ Suspend a Load Balancer. """ + try: + self.driver.suspend() + except NotImplementedError: + self.logger.error( + "Selected driver does not support SUSPEND action." + ) + self.msg['hpcs_response'] = self.RESPONSE_FAILURE + else: + self.msg['hpcs_response'] = self.RESPONSE_SUCCESS + return self.msg + + def _action_enable(self): + """ Enable a suspended Load Balancer. """ + try: + self.driver.enable() + except NotImplementedError: + self.logger.error( + "Selected driver does not support ENABLE action." + ) + self.msg['hpcs_response'] = self.RESPONSE_FAILURE + else: + self.msg['hpcs_response'] = self.RESPONSE_SUCCESS + return self.msg + + def _action_delete(self): + """ Delete a Load Balancer. """ + try: + self.driver.delete() + except NotImplementedError: + self.logger.error( + "Selected driver does not support DELETE action." + ) + self.msg['hpcs_response'] = self.RESPONSE_FAILURE + else: + self.msg['hpcs_response'] = self.RESPONSE_SUCCESS + return self.msg diff --git a/libra/worker/drivers/base.py b/libra/worker/drivers/base.py index d1a75570..8520db88 100644 --- a/libra/worker/drivers/base.py +++ b/libra/worker/drivers/base.py @@ -41,3 +41,15 @@ class LoadBalancerDriver(object): def activate(self): """ Activate any changes made. """ raise NotImplementedError() + + def suspend(self): + """ Suspend the load balancer. """ + raise NotImplementedError() + + def enable(self): + """ Enable a suspended load balancer. """ + raise NotImplementedError() + + def delete(self): + """ Delete a load balancer. """ + raise NotImplementedError() diff --git a/libra/worker/worker.py b/libra/worker/worker.py index dd721c95..662fa1d1 100644 --- a/libra/worker/worker.py +++ b/libra/worker/worker.py @@ -22,8 +22,8 @@ import socket from time import sleep from libra.common.json_gearman import JSONGearmanWorker -from libra.common.faults import BadRequest from libra.common.options import Options, setup_logging +from libra.worker.controller import LBaaSController from libra.worker.drivers.base import known_drivers from libra.worker.utils import import_class @@ -36,62 +36,17 @@ def lbaas_task(worker, job): from the Gearman job server. It will be executed once per request. Data comes in as a JSON object, and a JSON object is returned in response. """ - - NODE_OK = "ENABLED" - NODE_ERR = "DISABLED" - logger = worker.logger driver = worker.driver - data = job.data logger.debug("Entered worker task") - logger.debug("Received JSON message: %s" % json.dumps(data, indent=4)) + logger.debug("Received JSON message: %s" % json.dumps(job.data, indent=4)) - if 'nodes' not in data: - return BadRequest("Missing 'nodes' element").to_json() + controller = LBaaSController(logger, driver, job.data) + response = controller.run() - for lb_node in data['nodes']: - port, address = None, None - - if 'port' in lb_node: - port = lb_node['port'] - else: - return BadRequest("Missing 'port' element.").to_json() - - if 'address' in lb_node: - address = lb_node['address'] - else: - return BadRequest("Missing 'address' element.").to_json() - - try: - driver.add_server(address, port) - except NotImplementedError: - logger.error("Selected driver could not add server.") - lb_node['condition'] = NODE_ERR - except Exception as e: - logger.error("Failure trying adding server: %s, %s" % - (e.__class__, e)) - lb_node['condition'] = NODE_ERR - else: - logger.debug("Added server: %s:%s" % (address, port)) - lb_node['condition'] = NODE_OK - - try: - driver.activate() - except NotImplementedError: - logger.error("Selected driver could not activate changes.") - for lb_node in data['nodes']: - lb_node['condition'] = NODE_ERR - except Exception as e: - logger.error("Failure activating changes: %s, %s" % - (e.__class__, e)) - for lb_node in data['nodes']: - lb_node['condition'] = NODE_ERR - else: - logger.info("Activated load balancer changes") - - # Return the same JSON object, but with condition fields set. - return data + logger.debug("Return JSON message: %s" % json.dumps(response, indent=4)) + return response class CustomJSONGearmanWorker(JSONGearmanWorker): diff --git a/tests/test_lbaas_worker.py b/tests/test_lbaas_worker.py index dee3c2d8..eb86e6eb 100644 --- a/tests/test_lbaas_worker.py +++ b/tests/test_lbaas_worker.py @@ -36,6 +36,7 @@ class TestLBaaSTask(unittest.TestCase): worker = FakeWorker() data = { + "hpcs_action": "create", "name": "a-new-loadbalancer", "nodes": [ { @@ -61,11 +62,33 @@ class TestLBaaSTask(unittest.TestCase): self.assertEqual(r["nodes"][1]["port"], data["nodes"][1]["port"]) self.assertIn("condition", r["nodes"][1]) + def testMissingAction(self): + """ Test invalid messages: missing hpcs_action """ + worker = FakeWorker() + data = { + "name": "a-new-loadbalancer", + "nodes": [ + { + "address": "10.1.1.1", + "port": "80" + }, + { + "address": "10.1.1.2", + "port": "81" + } + ] + } + job = FakeJob(data) + r = lbaas_task(worker, job) + self.assertIn("hpcs_response", r) + self.assertEqual("FAIL", r["hpcs_response"]) + def testMissingNodes(self): """ Test invalid messages: missing nodes """ worker = FakeWorker() data = { + "hpcs_action": "create", "name": "a-new-loadbalancer" } job = FakeJob(data) @@ -78,6 +101,7 @@ class TestLBaaSTask(unittest.TestCase): worker = FakeWorker() data = { + "hpcs_action": "create", "name": "a-new-loadbalancer", "nodes": [ { @@ -95,6 +119,7 @@ class TestLBaaSTask(unittest.TestCase): worker = FakeWorker() data = { + "hpcs_action": "create", "name": "a-new-loadbalancer", "nodes": [ {