Refactor worker logic.
Worker logic is now moved to a new controller class. Add initial support for new 'hpcs_action' and 'hpcs_response' JSON elements. Drivers now have the option of supporting SUSPEND, ENABLE, DELETE and UPDATE actions.
This commit is contained in:
156
libra/worker/controller.py
Normal file
156
libra/worker/controller.py
Normal file
@@ -0,0 +1,156 @@
|
||||
# Copyright 2012 Hewlett-Packard Development Company, L.P.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from libra.common.faults import BadRequest
|
||||
|
||||
|
||||
class LBaaSController(object):
|
||||
|
||||
NODE_OK = "ENABLED"
|
||||
NODE_ERR = "DISABLED"
|
||||
RESPONSE_FAILURE = "FAIL"
|
||||
RESPONSE_SUCCESS = "PASS"
|
||||
|
||||
def __init__(self, logger, driver, json_msg):
|
||||
self.logger = logger
|
||||
self.driver = driver
|
||||
self.logger.debug("Entered LBaaSController")
|
||||
# Standardize case on JSON elements
|
||||
self.msg = dict((k.lower(), v) for k, v in json_msg.iteritems())
|
||||
|
||||
def run(self):
|
||||
"""
|
||||
Process the JSON message and return a JSON response.
|
||||
"""
|
||||
|
||||
if 'hpcs_action' not in self.msg:
|
||||
self.logger.error("Missing HPCS_ACTION value")
|
||||
self.msg['hpcs_response'] = self.RESPONSE_FAILURE
|
||||
return self.msg
|
||||
|
||||
action = self.msg['hpcs_action'].upper()
|
||||
if action == 'CREATE':
|
||||
return self._action_create()
|
||||
elif action == 'UPDATE':
|
||||
return self._action_update()
|
||||
elif action == 'SUSPEND':
|
||||
return self._action_suspend()
|
||||
elif action == 'ENABLE':
|
||||
return self._action_enable()
|
||||
elif action == 'DELETE':
|
||||
return self._action_delete()
|
||||
else:
|
||||
self.logger.error("Invalid HPCS_ACTION value: %s" % action)
|
||||
self.msg['hpcs_response'] = self.RESPONSE_FAILURE
|
||||
return self.msg
|
||||
|
||||
def _action_create(self):
|
||||
""" Create a Load Balancer. """
|
||||
if 'nodes' not in self.msg:
|
||||
return BadRequest("Missing 'nodes' element").to_json()
|
||||
|
||||
for lb_node in self.msg['nodes']:
|
||||
port, address = None, None
|
||||
|
||||
if 'port' in lb_node:
|
||||
port = lb_node['port']
|
||||
else:
|
||||
return BadRequest("Missing 'port' element.").to_json()
|
||||
|
||||
if 'address' in lb_node:
|
||||
address = lb_node['address']
|
||||
else:
|
||||
return BadRequest("Missing 'address' element.").to_json()
|
||||
|
||||
try:
|
||||
self.driver.add_server(address, port)
|
||||
except NotImplementedError:
|
||||
self.logger.error(
|
||||
"Selected driver does not support adding a server."
|
||||
)
|
||||
lb_node['condition'] = self.NODE_ERR
|
||||
except Exception as e:
|
||||
self.logger.error("Failure trying adding server: %s, %s" %
|
||||
(e.__class__, e))
|
||||
lb_node['condition'] = self.NODE_ERR
|
||||
else:
|
||||
self.logger.debug("Added server: %s:%s" % (address, port))
|
||||
lb_node['condition'] = self.NODE_OK
|
||||
|
||||
try:
|
||||
self.driver.activate()
|
||||
except NotImplementedError:
|
||||
self.logger.error(
|
||||
"Selected driver does not support activating changes."
|
||||
)
|
||||
for lb_node in self.msg['nodes']:
|
||||
lb_node['condition'] = self.NODE_ERR
|
||||
self.msg['hpcs_response'] = self.RESPONSE_FAILURE
|
||||
except Exception as e:
|
||||
self.logger.error("Failure activating changes: %s, %s" %
|
||||
(e.__class__, e))
|
||||
for lb_node in self.msg['nodes']:
|
||||
lb_node['condition'] = self.NODE_ERR
|
||||
self.msg['hpcs_response'] = self.RESPONSE_FAILURE
|
||||
else:
|
||||
self.logger.info("Activated load balancer changes")
|
||||
self.msg['hpcs_response'] = self.RESPONSE_SUCCESS
|
||||
|
||||
return self.msg
|
||||
|
||||
def _action_update(self):
|
||||
""" Update a Load Balancer. """
|
||||
# NOTE(shrews): We would need to know the current configuration of
|
||||
# the load balancer to do an update. Note sure this is really feasible,
|
||||
# so for now we just do the same as CREATE.
|
||||
return self.action_create()
|
||||
|
||||
def _action_suspend(self):
|
||||
""" Suspend a Load Balancer. """
|
||||
try:
|
||||
self.driver.suspend()
|
||||
except NotImplementedError:
|
||||
self.logger.error(
|
||||
"Selected driver does not support SUSPEND action."
|
||||
)
|
||||
self.msg['hpcs_response'] = self.RESPONSE_FAILURE
|
||||
else:
|
||||
self.msg['hpcs_response'] = self.RESPONSE_SUCCESS
|
||||
return self.msg
|
||||
|
||||
def _action_enable(self):
|
||||
""" Enable a suspended Load Balancer. """
|
||||
try:
|
||||
self.driver.enable()
|
||||
except NotImplementedError:
|
||||
self.logger.error(
|
||||
"Selected driver does not support ENABLE action."
|
||||
)
|
||||
self.msg['hpcs_response'] = self.RESPONSE_FAILURE
|
||||
else:
|
||||
self.msg['hpcs_response'] = self.RESPONSE_SUCCESS
|
||||
return self.msg
|
||||
|
||||
def _action_delete(self):
|
||||
""" Delete a Load Balancer. """
|
||||
try:
|
||||
self.driver.delete()
|
||||
except NotImplementedError:
|
||||
self.logger.error(
|
||||
"Selected driver does not support DELETE action."
|
||||
)
|
||||
self.msg['hpcs_response'] = self.RESPONSE_FAILURE
|
||||
else:
|
||||
self.msg['hpcs_response'] = self.RESPONSE_SUCCESS
|
||||
return self.msg
|
||||
@@ -41,3 +41,15 @@ class LoadBalancerDriver(object):
|
||||
def activate(self):
|
||||
""" Activate any changes made. """
|
||||
raise NotImplementedError()
|
||||
|
||||
def suspend(self):
|
||||
""" Suspend the load balancer. """
|
||||
raise NotImplementedError()
|
||||
|
||||
def enable(self):
|
||||
""" Enable a suspended load balancer. """
|
||||
raise NotImplementedError()
|
||||
|
||||
def delete(self):
|
||||
""" Delete a load balancer. """
|
||||
raise NotImplementedError()
|
||||
|
||||
@@ -22,8 +22,8 @@ import socket
|
||||
from time import sleep
|
||||
|
||||
from libra.common.json_gearman import JSONGearmanWorker
|
||||
from libra.common.faults import BadRequest
|
||||
from libra.common.options import Options, setup_logging
|
||||
from libra.worker.controller import LBaaSController
|
||||
from libra.worker.drivers.base import known_drivers
|
||||
from libra.worker.utils import import_class
|
||||
|
||||
@@ -36,62 +36,17 @@ def lbaas_task(worker, job):
|
||||
from the Gearman job server. It will be executed once per request. Data
|
||||
comes in as a JSON object, and a JSON object is returned in response.
|
||||
"""
|
||||
|
||||
NODE_OK = "ENABLED"
|
||||
NODE_ERR = "DISABLED"
|
||||
|
||||
logger = worker.logger
|
||||
driver = worker.driver
|
||||
data = job.data
|
||||
|
||||
logger.debug("Entered worker task")
|
||||
logger.debug("Received JSON message: %s" % json.dumps(data, indent=4))
|
||||
logger.debug("Received JSON message: %s" % json.dumps(job.data, indent=4))
|
||||
|
||||
if 'nodes' not in data:
|
||||
return BadRequest("Missing 'nodes' element").to_json()
|
||||
controller = LBaaSController(logger, driver, job.data)
|
||||
response = controller.run()
|
||||
|
||||
for lb_node in data['nodes']:
|
||||
port, address = None, None
|
||||
|
||||
if 'port' in lb_node:
|
||||
port = lb_node['port']
|
||||
else:
|
||||
return BadRequest("Missing 'port' element.").to_json()
|
||||
|
||||
if 'address' in lb_node:
|
||||
address = lb_node['address']
|
||||
else:
|
||||
return BadRequest("Missing 'address' element.").to_json()
|
||||
|
||||
try:
|
||||
driver.add_server(address, port)
|
||||
except NotImplementedError:
|
||||
logger.error("Selected driver could not add server.")
|
||||
lb_node['condition'] = NODE_ERR
|
||||
except Exception as e:
|
||||
logger.error("Failure trying adding server: %s, %s" %
|
||||
(e.__class__, e))
|
||||
lb_node['condition'] = NODE_ERR
|
||||
else:
|
||||
logger.debug("Added server: %s:%s" % (address, port))
|
||||
lb_node['condition'] = NODE_OK
|
||||
|
||||
try:
|
||||
driver.activate()
|
||||
except NotImplementedError:
|
||||
logger.error("Selected driver could not activate changes.")
|
||||
for lb_node in data['nodes']:
|
||||
lb_node['condition'] = NODE_ERR
|
||||
except Exception as e:
|
||||
logger.error("Failure activating changes: %s, %s" %
|
||||
(e.__class__, e))
|
||||
for lb_node in data['nodes']:
|
||||
lb_node['condition'] = NODE_ERR
|
||||
else:
|
||||
logger.info("Activated load balancer changes")
|
||||
|
||||
# Return the same JSON object, but with condition fields set.
|
||||
return data
|
||||
logger.debug("Return JSON message: %s" % json.dumps(response, indent=4))
|
||||
return response
|
||||
|
||||
|
||||
class CustomJSONGearmanWorker(JSONGearmanWorker):
|
||||
|
||||
@@ -36,6 +36,7 @@ class TestLBaaSTask(unittest.TestCase):
|
||||
|
||||
worker = FakeWorker()
|
||||
data = {
|
||||
"hpcs_action": "create",
|
||||
"name": "a-new-loadbalancer",
|
||||
"nodes": [
|
||||
{
|
||||
@@ -61,11 +62,33 @@ class TestLBaaSTask(unittest.TestCase):
|
||||
self.assertEqual(r["nodes"][1]["port"], data["nodes"][1]["port"])
|
||||
self.assertIn("condition", r["nodes"][1])
|
||||
|
||||
def testMissingAction(self):
|
||||
""" Test invalid messages: missing hpcs_action """
|
||||
worker = FakeWorker()
|
||||
data = {
|
||||
"name": "a-new-loadbalancer",
|
||||
"nodes": [
|
||||
{
|
||||
"address": "10.1.1.1",
|
||||
"port": "80"
|
||||
},
|
||||
{
|
||||
"address": "10.1.1.2",
|
||||
"port": "81"
|
||||
}
|
||||
]
|
||||
}
|
||||
job = FakeJob(data)
|
||||
r = lbaas_task(worker, job)
|
||||
self.assertIn("hpcs_response", r)
|
||||
self.assertEqual("FAIL", r["hpcs_response"])
|
||||
|
||||
def testMissingNodes(self):
|
||||
""" Test invalid messages: missing nodes """
|
||||
|
||||
worker = FakeWorker()
|
||||
data = {
|
||||
"hpcs_action": "create",
|
||||
"name": "a-new-loadbalancer"
|
||||
}
|
||||
job = FakeJob(data)
|
||||
@@ -78,6 +101,7 @@ class TestLBaaSTask(unittest.TestCase):
|
||||
|
||||
worker = FakeWorker()
|
||||
data = {
|
||||
"hpcs_action": "create",
|
||||
"name": "a-new-loadbalancer",
|
||||
"nodes": [
|
||||
{
|
||||
@@ -95,6 +119,7 @@ class TestLBaaSTask(unittest.TestCase):
|
||||
|
||||
worker = FakeWorker()
|
||||
data = {
|
||||
"hpcs_action": "create",
|
||||
"name": "a-new-loadbalancer",
|
||||
"nodes": [
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user