Merge "Radware LBaaS driver is able to flip to a secondary backend node"

This commit is contained in:
Jenkins 2014-06-09 02:53:44 +00:00 committed by Gerrit Code Review
commit c4a8534d42
3 changed files with 103 additions and 9 deletions

View File

@ -1,5 +1,6 @@
[radware] [radware]
#vdirect_address = 0.0.0.0 #vdirect_address = 0.0.0.0
#ha_secondary_address=
#vdirect_user = vDirect #vdirect_user = vDirect
#vdirect_password = radware #vdirect_password = radware
#service_ha_pair = False #service_ha_pair = False

View File

@ -20,10 +20,10 @@ import base64
import copy import copy
import httplib import httplib
import Queue import Queue
import socket
import threading import threading
import time import time
import eventlet import eventlet
from oslo.config import cfg from oslo.config import cfg
@ -61,6 +61,8 @@ CREATE_SERVICE_HEADER = {'Content-Type':
driver_opts = [ driver_opts = [
cfg.StrOpt('vdirect_address', cfg.StrOpt('vdirect_address',
help=_('IP address of vDirect server.')), help=_('IP address of vDirect server.')),
cfg.StrOpt('ha_secondary_address',
help=_('IP address of secondary vDirect server.')),
cfg.StrOpt('vdirect_user', cfg.StrOpt('vdirect_user',
default='vDirect', default='vDirect',
help=_('vDirect user name.')), help=_('vDirect user name.')),
@ -173,8 +175,10 @@ class LoadBalancerDriver(abstract_driver.LoadBalancerAbstractDriver):
self.l2_l3_setup_params = rad.l2_l3_setup_params self.l2_l3_setup_params = rad.l2_l3_setup_params
self.l4_action_name = rad.l4_action_name self.l4_action_name = rad.l4_action_name
self.actions_to_skip = rad.actions_to_skip self.actions_to_skip = rad.actions_to_skip
vdirect_address = cfg.CONF.radware.vdirect_address vdirect_address = rad.vdirect_address
sec_server = rad.ha_secondary_address
self.rest_client = vDirectRESTClient(server=vdirect_address, self.rest_client = vDirectRESTClient(server=vdirect_address,
secondary_server=sec_server,
user=rad.vdirect_user, user=rad.vdirect_user,
password=rad.vdirect_password) password=rad.vdirect_password)
self.queue = Queue.Queue() self.queue = Queue.Queue()
@ -633,6 +637,7 @@ class vDirectRESTClient:
def __init__(self, def __init__(self,
server='localhost', server='localhost',
secondary_server=None,
user=None, user=None,
password=None, password=None,
port=2189, port=2189,
@ -640,6 +645,7 @@ class vDirectRESTClient:
timeout=5000, timeout=5000,
base_uri=''): base_uri=''):
self.server = server self.server = server
self.secondary_server = secondary_server
self.port = port self.port = port
self.ssl = ssl self.ssl = ssl
self.base_uri = base_uri self.base_uri = base_uri
@ -651,14 +657,48 @@ class vDirectRESTClient:
raise r_exc.AuthenticationMissing() raise r_exc.AuthenticationMissing()
debug_params = {'server': self.server, debug_params = {'server': self.server,
'sec_server': self.secondary_server,
'port': self.port, 'port': self.port,
'ssl': self.ssl} 'ssl': self.ssl}
LOG.debug(_('vDirectRESTClient:init server=%(server)s, ' LOG.debug(_('vDirectRESTClient:init server=%(server)s, '
'port=%(port)d, ' 'secondary server=%(sec_server)s, '
'ssl=%(ssl)r'), debug_params) 'port=%(port)d, '
'ssl=%(ssl)r'), debug_params)
def _flip_servers(self):
LOG.warning(_('Fliping servers. Current is: %(server)s, '
'switching to %(secondary)s'),
{'server': self.server,
'secondary': self.secondary_server})
self.server, self.secondary_server = self.secondary_server, self.server
def _recover(self, action, resource, data, headers, binary=False):
if self.server and self.secondary_server:
self._flip_servers()
resp = self._call(action, resource, data,
headers, binary)
return resp
else:
LOG.exception(_('REST client is not able to recover '
'since only one vDirect server is '
'configured.'))
return -1, None, None, None
def call(self, action, resource, data, headers, binary=False):
resp = self._call(action, resource, data, headers, binary)
if resp[RESP_STATUS] == -1:
LOG.warning(_('vDirect server is not responding (%s).'),
self.server)
return self._recover(action, resource, data, headers, binary)
elif resp[RESP_STATUS] in (301, 307):
LOG.warning(_('vDirect server is not active (%s).'),
self.server)
return self._recover(action, resource, data, headers, binary)
else:
return resp
@call_log.log @call_log.log
def call(self, action, resource, data, headers, binary=False): def _call(self, action, resource, data, headers, binary=False):
if resource.startswith('http'): if resource.startswith('http'):
uri = resource uri = resource
else: else:
@ -701,11 +741,11 @@ class vDirectRESTClient:
# response was not JSON, ignore the exception # response was not JSON, ignore the exception
pass pass
ret = (response.status, response.reason, respstr, respdata) ret = (response.status, response.reason, respstr, respdata)
except (socket.timeout, socket.error) as e: except Exception as e:
log_dict = {'action': action, 'e': e} log_dict = {'action': action, 'e': e}
LOG.error(_('vdirectRESTClient: %(action)s failure, %(e)r'), LOG.error(_('vdirectRESTClient: %(action)s failure, %(e)r'),
log_dict) log_dict)
ret = 0, None, None, None ret = -1, None, None, None
conn.close() conn.close()
return ret return ret
@ -853,7 +893,14 @@ class OperationCompletionHandler(threading.Thread):
def _rest_wrapper(response, success_codes=[202]): def _rest_wrapper(response, success_codes=[202]):
"""Wrap a REST call and make sure a valid status is returned.""" """Wrap a REST call and make sure a valid status is returned."""
if response[RESP_STATUS] not in success_codes: if not response:
raise r_exc.RESTRequestFailure(
status=-1,
reason="Unknown",
description="Unknown",
success_codes=success_codes
)
elif response[RESP_STATUS] not in success_codes:
raise r_exc.RESTRequestFailure( raise r_exc.RESTRequestFailure(
status=response[RESP_STATUS], status=response[RESP_STATUS],
reason=response[RESP_REASON], reason=response[RESP_REASON],

View File

@ -32,6 +32,7 @@ from neutron.services.loadbalancer.drivers.radware import exceptions as r_exc
from neutron.tests.unit.db.loadbalancer import test_db_loadbalancer from neutron.tests.unit.db.loadbalancer import test_db_loadbalancer
GET_200 = ('/api/workflow/', '/api/service/', '/api/workflowTemplate') GET_200 = ('/api/workflow/', '/api/service/', '/api/workflowTemplate')
SERVER_DOWN_CODES = (-1, 301, 307)
class QueueMock(Queue.Queue): class QueueMock(Queue.Queue):
@ -43,10 +44,16 @@ class QueueMock(Queue.Queue):
self.completion_handler(oper) self.completion_handler(oper)
def _recover_function_mock(action, resource, data, headers, binary=False):
pass
def rest_call_function_mock(action, resource, data, headers, binary=False): def rest_call_function_mock(action, resource, data, headers, binary=False):
if rest_call_function_mock.RESPOND_WITH_ERROR: if rest_call_function_mock.RESPOND_WITH_ERROR:
return 400, 'error_status', 'error_description', None return 400, 'error_status', 'error_description', None
if rest_call_function_mock.RESPOND_WITH_SERVER_DOWN in SERVER_DOWN_CODES:
val = rest_call_function_mock.RESPOND_WITH_SERVER_DOWN
return val, 'error_status', 'error_description', None
if action == 'GET': if action == 'GET':
return _get_handler(resource) return _get_handler(resource)
elif action == 'DELETE': elif action == 'DELETE':
@ -114,6 +121,8 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
{'RESPOND_WITH_ERROR': False}) {'RESPOND_WITH_ERROR': False})
rest_call_function_mock.__dict__.update( rest_call_function_mock.__dict__.update(
{'TEMPLATES_MISSING': False}) {'TEMPLATES_MISSING': False})
rest_call_function_mock.__dict__.update(
{'RESPOND_WITH_SERVER_DOWN': 200})
self.operation_completer_start_mock = mock.Mock( self.operation_completer_start_mock = mock.Mock(
return_value=None) return_value=None)
@ -121,13 +130,22 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
return_value=None) return_value=None)
self.driver_rest_call_mock = mock.Mock( self.driver_rest_call_mock = mock.Mock(
side_effect=rest_call_function_mock) side_effect=rest_call_function_mock)
self.flip_servers_mock = mock.Mock(
return_value=None)
self.recover_mock = mock.Mock(
side_effect=_recover_function_mock)
radware_driver = self.plugin_instance.drivers['radware'] radware_driver = self.plugin_instance.drivers['radware']
radware_driver.completion_handler.start = ( radware_driver.completion_handler.start = (
self.operation_completer_start_mock) self.operation_completer_start_mock)
radware_driver.completion_handler.join = ( radware_driver.completion_handler.join = (
self.operation_completer_join_mock) self.operation_completer_join_mock)
self.orig_call = radware_driver.rest_client.call
self.orig__call = radware_driver.rest_client._call
radware_driver.rest_client.call = self.driver_rest_call_mock radware_driver.rest_client.call = self.driver_rest_call_mock
radware_driver.rest_client._call = self.driver_rest_call_mock
radware_driver.rest_client._flip_servers = self.flip_servers_mock
radware_driver.rest_client._recover = self.recover_mock
radware_driver.completion_handler.rest_client.call = ( radware_driver.completion_handler.rest_client.call = (
self.driver_rest_call_mock) self.driver_rest_call_mock)
@ -136,6 +154,34 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
self.addCleanup(radware_driver.completion_handler.join) self.addCleanup(radware_driver.completion_handler.join)
def test_rest_client_recover_was_called(self):
"""Call the real REST client and verify _recover is called."""
radware_driver = self.plugin_instance.drivers['radware']
radware_driver.rest_client.call = self.orig_call
radware_driver.rest_client._call = self.orig__call
self.assertRaises(r_exc.RESTRequestFailure,
radware_driver._verify_workflow_templates)
self.recover_mock.assert_called_once()
def test_rest_client_flip_servers(self):
radware_driver = self.plugin_instance.drivers['radware']
server = radware_driver.rest_client.server
sec_server = radware_driver.rest_client.secondary_server
radware_driver.rest_client._flip_servers()
self.assertEqual(server,
radware_driver.rest_client.secondary_server)
self.assertEqual(sec_server,
radware_driver.rest_client.server)
def test_verify_workflow_templates_server_down(self):
"""Test the rest call failure when backend is down."""
for value in SERVER_DOWN_CODES:
rest_call_function_mock.__dict__.update(
{'RESPOND_WITH_SERVER_DOWN': value})
self.assertRaises(r_exc.RESTRequestFailure,
self.plugin_instance.drivers['radware'].
_verify_workflow_templates)
def test_verify_workflow_templates(self): def test_verify_workflow_templates(self):
"""Test the rest call failure handling by Exception raising.""" """Test the rest call failure handling by Exception raising."""
rest_call_function_mock.__dict__.update( rest_call_function_mock.__dict__.update(