From 5d7f10f6b895779866feb183289e356b76d16d67 Mon Sep 17 00:00:00 2001 From: Michael Johnson <johnsomor@gmail.com> Date: Tue, 22 Jan 2019 18:07:58 -0800 Subject: [PATCH] Fix flavors support when using spares pool This patch validates that a flavor is compatible with using spares pool amphora. It will also update the amphora-agent config after a spares pool amphora has been allocated. This patch enables the ability to update a running amphora's agent configuration and have the mutatable options be adopted. The following amphora agent configuration options can be updated: heartbeat_key controller_ip_port_list heartbeat_interval loadbalancer_topology This patch adds the support to the amphora-agent and the amphora driver. A follow on patch will expose this capabililty via the amphora admin API. Change-Id: I97bdf5188808193516509f20767e82c0f8d2f5a5 --- .../contributor/api/haproxy-amphora-api.rst | 114 +++++++++++++----- .../backends/agent/api_server/server.py | 34 ++++++ .../backends/health_daemon/health_sender.py | 33 +++-- .../amphorae/driver_exceptions/exceptions.py | 5 + octavia/amphorae/drivers/driver_base.py | 10 ++ .../drivers/haproxy/rest_api_driver.py | 29 +++++ .../amphorae/drivers/noop_driver/driver.py | 12 +- octavia/common/config.py | 4 + octavia/common/constants.py | 1 + octavia/common/validate.py | 8 ++ .../controller/worker/controller_worker.py | 24 ++-- .../controller/worker/flows/amphora_flows.py | 6 +- .../worker/tasks/amphora_driver_tasks.py | 28 ++++- .../controller/worker/tasks/database_tasks.py | 10 +- .../backend/agent/api_server/test_server.py | 42 +++++++ .../health_daemon/test_health_sender.py | 34 ++++++ .../drivers/haproxy/test_rest_api_driver.py | 18 +++ .../test_noop_amphoraloadbalancer_driver.py | 8 ++ octavia/tests/unit/common/test_validations.py | 10 ++ .../worker/flows/test_amphora_flows.py | 12 +- .../worker/tasks/test_amphora_driver_tasks.py | 50 ++++++++ .../worker/test_controller_worker.py | 3 +- 22 files changed, 435 insertions(+), 60 deletions(-) diff --git a/doc/source/contributor/api/haproxy-amphora-api.rst b/doc/source/contributor/api/haproxy-amphora-api.rst index 553832bb8b..77d630c7b7 100644 --- a/doc/source/contributor/api/haproxy-amphora-api.rst +++ b/doc/source/contributor/api/haproxy-amphora-api.rst @@ -29,9 +29,9 @@ communication is limited to fail-over protocols.) Versioning ---------- All Octavia APIs (including internal APIs like this one) are versioned. For the -purposes of this document, the initial version of this API shall be v0.1. (So, +purposes of this document, the initial version of this API shall be v0.5. (So, any reference to a *:version* variable should be replaced with the literal -string 'v0.1'.) +string 'v0.5'.) Response codes -------------- @@ -408,7 +408,7 @@ Get interface :: GET URL: - https://octavia-haproxy-img-00328.local/v0.1/interface/10.0.0.1 + https://octavia-haproxy-img-00328.local/v0.5/interface/10.0.0.1 JSON Response: { @@ -422,7 +422,7 @@ Get interface :: GET URL: - https://octavia-haproxy-img-00328.local/v0.1/interface/10.5.0.1 + https://octavia-haproxy-img-00328.local/v0.5/interface/10.5.0.1 JSON Response: { @@ -435,7 +435,7 @@ Get interface :: GET URL: - https://octavia-haproxy-img-00328.local/v0.1/interface/10.6.0.1.1 + https://octavia-haproxy-img-00328.local/v0.5/interface/10.6.0.1.1 JSON Response: { @@ -621,7 +621,7 @@ Start or Stop a listener :: PUT URL: - https://octavia-haproxy-img-00328.local/v0.1/listeners/04bff5c3-5862-4a13-b9e3-9b440d0ed50a/start + https://octavia-haproxy-img-00328.local/v0.5/listeners/04bff5c3-5862-4a13-b9e3-9b440d0ed50a/start JSON Response: { @@ -634,7 +634,7 @@ Start or Stop a listener :: PUT URL: - https://octavia-haproxy-img-00328.local/v0.1/listeners/04bff5c3-5862-4a13-b9e3-9b440d0ed50a/BAD_TEST_DATA + https://octavia-haproxy-img-00328.local/v0.5/listeners/04bff5c3-5862-4a13-b9e3-9b440d0ed50a/BAD_TEST_DATA JSON Response: { @@ -647,7 +647,7 @@ Start or Stop a listener :: PUT URL: - https://octavia-haproxy-img-00328.local/v0.1/listeners/04bff5c3-5862-4a13-b9e3-9b440d0ed50a/stop + https://octavia-haproxy-img-00328.local/v0.5/listeners/04bff5c3-5862-4a13-b9e3-9b440d0ed50a/stop JSON Response: { @@ -660,7 +660,7 @@ Start or Stop a listener :: PUT URL: - https://octavia-haproxy-img-00328.local/v0.1/listeners/04bff5c3-5862-4a13-b9e3-9b440d0ed50a/stop + https://octavia-haproxy-img-00328.local/v0.5/listeners/04bff5c3-5862-4a13-b9e3-9b440d0ed50a/stop Response: { @@ -724,7 +724,7 @@ Delete a listener :: DELETE URL: - https://octavia-haproxy-img-00328.local/v0.1/listeners/04bff5c3-5862-4a13-b9e3-9b440d0ed50a + https://octavia-haproxy-img-00328.local/v0.5/listeners/04bff5c3-5862-4a13-b9e3-9b440d0ed50a JSON Response: { @@ -736,7 +736,7 @@ Delete a listener :: DELETE URL: - https://octavia-haproxy-img-00328.local/v0.1/listeners/04bff5c3-5862-4a13-b9e3-9b440d0ed50a + https://octavia-haproxy-img-00328.local/v0.5/listeners/04bff5c3-5862-4a13-b9e3-9b440d0ed50a JSON Response: { @@ -812,7 +812,7 @@ explicitly restarted :: PUT URI: - https://octavia-haproxy-img-00328.local/v0.1/listeners/04bff5c3-5862-4a13-b9e3-9b440d0ed50a/certificates/www.example.com.pem + https://octavia-haproxy-img-00328.local/v0.5/listeners/04bff5c3-5862-4a13-b9e3-9b440d0ed50a/certificates/www.example.com.pem (Put data should contain the certificate information, concatenated as described above) @@ -826,7 +826,7 @@ explicitly restarted :: PUT URI: - https://octavia-haproxy-img-00328.local/v0.1/listeners/04bff5c3-5862-4a13-b9e3-9b440d0ed50a/certificates/www.example.com.pem + https://octavia-haproxy-img-00328.local/v0.5/listeners/04bff5c3-5862-4a13-b9e3-9b440d0ed50a/certificates/www.example.com.pem (If PUT data does not contain a certificate) JSON Response: @@ -839,7 +839,7 @@ explicitly restarted :: PUT URI: - https://octavia-haproxy-img-00328.local/v0.1/listeners/04bff5c3-5862-4a13-b9e3-9b440d0ed50a/certificates/www.example.com.pem + https://octavia-haproxy-img-00328.local/v0.5/listeners/04bff5c3-5862-4a13-b9e3-9b440d0ed50a/certificates/www.example.com.pem (If PUT data does not contain an RSA key) JSON Response: @@ -852,7 +852,7 @@ explicitly restarted :: PUT URI: - https://octavia-haproxy-img-00328.local/v0.1/listeners/04bff5c3-5862-4a13-b9e3-9b440d0ed50a/certificates/www.example.com.pem + https://octavia-haproxy-img-00328.local/v0.5/listeners/04bff5c3-5862-4a13-b9e3-9b440d0ed50a/certificates/www.example.com.pem (If the first certificate and the RSA key do not have the same modulus.) JSON Response: @@ -865,7 +865,7 @@ explicitly restarted :: PUT URI: - https://octavia-haproxy-img-00328.local/v0.1/listeners/04bff5c3-5862-4a13-b9e3-9b440d0ed50a/certificates/www.example.com.pem + https://octavia-haproxy-img-00328.local/v0.5/listeners/04bff5c3-5862-4a13-b9e3-9b440d0ed50a/certificates/www.example.com.pem JSON Response: { @@ -988,7 +988,7 @@ Delete SSL certificate PEM file :: DELETE URL: - https://octavia-haproxy-img-00328.local/v0.1/listeners/04bff5c3-5862-4a13-b9e3-9b440d0ed50a/certificates/www.example.com.pem + https://octavia-haproxy-img-00328.local/v0.5/listeners/04bff5c3-5862-4a13-b9e3-9b440d0ed50a/certificates/www.example.com.pem JSON Response: { @@ -1000,7 +1000,7 @@ Delete SSL certificate PEM file :: DELETE URL: - https://octavia-haproxy-img-00328.local/v0.1/listeners/04bff5c3-5862-4a13-b9e3-9b440d0ed50a/certificates/www.example.com.pem + https://octavia-haproxy-img-00328.local/v0.5/listeners/04bff5c3-5862-4a13-b9e3-9b440d0ed50a/certificates/www.example.com.pem JSON Response: { @@ -1071,7 +1071,7 @@ out of the haproxy daemon status interface for tracking health and stats). :: PUT URL: - https://octavia-haproxy-img-00328.local/v0.1/listeners/d459b1c8-54b0-4030-9bec-4f449e73b1ef/04bff5c3-5862-4a13-b9e3-9b440d0ed50a/haproxy + https://octavia-haproxy-img-00328.local/v0.5/listeners/d459b1c8-54b0-4030-9bec-4f449e73b1ef/04bff5c3-5862-4a13-b9e3-9b440d0ed50a/haproxy (Upload PUT data should be a raw haproxy.conf file.) JSON Response: @@ -1134,7 +1134,7 @@ Get listener haproxy configuration :: GET URL: - https://octavia-haproxy-img-00328.local/v0.1/listeners/7e9f91eb-b3e6-4e3b-a1a7-d6f7fdc1de7c/haproxy + https://octavia-haproxy-img-00328.local/v0.5/listeners/7e9f91eb-b3e6-4e3b-a1a7-d6f7fdc1de7c/haproxy Response is the raw haproxy.cfg: @@ -1210,7 +1210,7 @@ Plug VIP :: POST URL: - https://octavia-haproxy-img-00328.local/v0.1/plug/vip/203.0.113.2 + https://octavia-haproxy-img-00328.local/v0.5/plug/vip/203.0.113.2 JSON POST parameters: { @@ -1292,7 +1292,7 @@ Plug Network :: POST URL: - https://octavia-haproxy-img-00328.local/v0.1/plug/network/ + https://octavia-haproxy-img-00328.local/v0.5/plug/network/ JSON POST parameters: { @@ -1362,7 +1362,7 @@ not be available for some time. :: PUT URI: - https://octavia-haproxy-img-00328.local/v0.1/certificate + https://octavia-haproxy-img-00328.local/v0.5/certificate (Put data should contain the certificate information, concatenated as described above) @@ -1376,7 +1376,7 @@ not be available for some time. :: PUT URI: - https://octavia-haproxy-img-00328.local/v0.1/certificates + https://octavia-haproxy-img-00328.local/v0.5/certificates (If PUT data does not contain a certificate) JSON Response: @@ -1389,7 +1389,7 @@ not be available for some time. :: PUT URI: - https://octavia-haproxy-img-00328.local/v0.1/certificate + https://octavia-haproxy-img-00328.local/v0.5/certificate (If PUT data does not contain an RSA key) JSON Response: @@ -1402,7 +1402,7 @@ not be available for some time. :: PUT URI: - https://octavia-haproxy-img-00328.local/v0.1/certificate + https://octavia-haproxy-img-00328.local/v0.5/certificate (If the first certificate and the RSA key do not have the same modulus.) JSON Response: @@ -1441,7 +1441,7 @@ OK :: PUT URI: - https://octavia-haproxy-img-00328.local/v0.1/vrrp/upload + https://octavia-haproxy-img-00328.local/v0.5/vrrp/upload JSON Response: { @@ -1489,7 +1489,7 @@ Start, Stop, or Reload keepalived :: PUT URL: - https://octavia-haproxy-img-00328.local/v0.1/vrrp/start + https://octavia-haproxy-img-00328.local/v0.5/vrrp/start JSON Response: { @@ -1502,7 +1502,7 @@ Start, Stop, or Reload keepalived :: PUT URL: - https://octavia-haproxy-img-00328.local/v0.1/vrrp/BAD_TEST_DATA + https://octavia-haproxy-img-00328.local/v0.5/vrrp/BAD_TEST_DATA JSON Response: { @@ -1515,7 +1515,7 @@ Start, Stop, or Reload keepalived :: PUT URL: - https://octavia-haproxy-img-00328.local/v0.1/vrrp/stop + https://octavia-haproxy-img-00328.local/v0.5/vrrp/stop JSON Response: { @@ -1523,4 +1523,58 @@ Start, Stop, or Reload keepalived 'details': 'keeepalived process with PID 3352 not found', } +Update the amphora agent configuration +-------------------------------------- +* **URL:** /*:version*/config +* **Method:** PUT + +* **Data params:** A amphora-agent configuration file +* **Success Response:** + + * Code: 202 + + * Content: OK + +* **Error Response:** + + * Code: 500 + + * message: Unable to update amphora-agent configuration. + * details: *(The exception details)* + +* **Response:** + +| OK + +* **Implied actions:** + + * The running amphora-agent configuration file is mutated. + +**Notes:** Only options that are marked mutable in the oslo configuration +will be updated. + +**Examples:** + +* Success code 202: + +:: + + PUT URL: + https://octavia-haproxy-img-00328.local/v0.5/config + (Upload PUT data should be a raw amphora-agent.conf file.) + + JSON Response: + { + 'message': 'OK' + } + +* Error code 500: + +:: + + JSON Response: + { + 'message': 'Unable to update amphora-agent configuration.', + 'details': *(The exception output)*, + } diff --git a/octavia/amphorae/backends/agent/api_server/server.py b/octavia/amphorae/backends/agent/api_server/server.py index b360ef565a..de5b77a105 100644 --- a/octavia/amphorae/backends/agent/api_server/server.py +++ b/octavia/amphorae/backends/agent/api_server/server.py @@ -12,8 +12,12 @@ # License for the specific language governing permissions and limitations # under the License. +import os +import stat import flask +from oslo_config import cfg +from oslo_log import log as logging import six import webob from werkzeug import exceptions @@ -28,7 +32,10 @@ from octavia.amphorae.backends.agent.api_server import plug from octavia.amphorae.backends.agent.api_server import udp_listener_base from octavia.amphorae.backends.agent.api_server import util +BUFFER = 1024 +CONF = cfg.CONF PATH_PREFIX = '/' + api_server.VERSION +LOG = logging.getLogger(__name__) # make the error pages all json @@ -81,6 +88,9 @@ class Server(object): self.app.add_url_rule(rule=PATH_PREFIX + '/listeners/<listener_id>', view_func=self.delete_listener, methods=['DELETE']) + self.app.add_url_rule(rule=PATH_PREFIX + '/config', + view_func=self.upload_config, + methods=['PUT']) self.app.add_url_rule(rule=PATH_PREFIX + '/details', view_func=self.get_details, methods=['GET']) @@ -217,3 +227,27 @@ class Server(object): def get_interface(self, ip_addr): return self._amphora_info.get_interface(ip_addr) + + def upload_config(self): + try: + stream = flask.request.stream + file_path = cfg.find_config_files(project=CONF.project, + prog=CONF.prog)[0] + flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC + # mode 00600 + mode = stat.S_IRUSR | stat.S_IWUSR + with os.fdopen(os.open(file_path, flags, mode), 'wb') as cfg_file: + b = stream.read(BUFFER) + while b: + cfg_file.write(b) + b = stream.read(BUFFER) + + CONF.mutate_config_files() + except Exception as e: + LOG.error("Unable to update amphora-agent configuration: " + "{}".format(str(e))) + return webob.Response(json=dict( + message="Unable to update amphora-agent configuration.", + details=str(e)), status=500) + + return webob.Response(json={'message': 'OK'}, status=202) diff --git a/octavia/amphorae/backends/health_daemon/health_sender.py b/octavia/amphorae/backends/health_daemon/health_sender.py index e43597a3e5..bb4128cdba 100644 --- a/octavia/amphorae/backends/health_daemon/health_sender.py +++ b/octavia/amphorae/backends/health_daemon/health_sender.py @@ -33,18 +33,9 @@ def round_robin_addr(addrinfo_list): class UDPStatusSender(object): def __init__(self): - self.dests = [] - for ipport in CONF.health_manager.controller_ip_port_list: - try: - ip, port = ipport.rsplit(':', 1) - except ValueError: - LOG.error("Invalid ip and port '%s' in health_manager " - "controller_ip_port_list", ipport) - break - self.update(ip, port) + self._update_dests() self.v4sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.v6sock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) - self.key = str(CONF.health_manager.heartbeat_key) def update(self, dest, port): addrlist = socket.getaddrinfo(dest, port, 0, socket.SOCK_DGRAM) @@ -55,7 +46,9 @@ class UDPStatusSender(object): break def _send_msg(self, dest, msg): - envelope_str = status_message.wrap_envelope(msg, self.key) + # Note: heartbeat_key is mutable and must be looked up for each call + envelope_str = status_message.wrap_envelope( + msg, str(CONF.health_manager.heartbeat_key)) # dest = (family, socktype, proto, canonname, sockaddr) # e.g. 0 = sock family, 4 = sockaddr - what we actually need try: @@ -71,7 +64,25 @@ class UDPStatusSender(object): # if the message isn't received pass + # The controller_ip_port_list configuration has mutated, reload it. + def _update_dests(self): + self.dests = [] + for ipport in CONF.health_manager.controller_ip_port_list: + try: + ip, port = ipport.rsplit(':', 1) + except ValueError: + LOG.error("Invalid ip and port '%s' in health_manager " + "controller_ip_port_list", ipport) + break + self.update(ip, port) + self.current_controller_ip_port_list = ( + CONF.health_manager.controller_ip_port_list) + def dosend(self, obj): + # Check for controller_ip_port_list mutation + if not (self.current_controller_ip_port_list == + CONF.health_manager.controller_ip_port_list): + self._update_dests() dest = round_robin_addr(self.dests) if dest is None: LOG.error('No controller address found. Unable to send heartbeat.') diff --git a/octavia/amphorae/driver_exceptions/exceptions.py b/octavia/amphorae/driver_exceptions/exceptions.py index 76008446bf..b93b434017 100644 --- a/octavia/amphorae/driver_exceptions/exceptions.py +++ b/octavia/amphorae/driver_exceptions/exceptions.py @@ -116,3 +116,8 @@ class HealthMonitorProvisioningError(ProvisioningErrors): class NodeProvisioningError(ProvisioningErrors): message = _('couldn\'t provision Node') + + +class AmpDriverNotImplementedError(AmphoraDriverError): + + message = _('Amphora does not implement this feature.') diff --git a/octavia/amphorae/drivers/driver_base.py b/octavia/amphorae/drivers/driver_base.py index c3135f7f1c..04d889d483 100644 --- a/octavia/amphorae/drivers/driver_base.py +++ b/octavia/amphorae/drivers/driver_base.py @@ -219,6 +219,16 @@ class AmphoraLoadBalancerDriver(object): """ pass + def update_agent_config(self, amphora, agent_config): + """Upload and update the amphora agent configuration. + + :param amphora: amphora object, needs id and network ip(s) + :type amphora: object + :param agent_config: The new amphora agent configuration file. + :type agent_config: string + """ + pass + @six.add_metaclass(abc.ABCMeta) class HealthMixin(object): diff --git a/octavia/amphorae/drivers/haproxy/rest_api_driver.py b/octavia/amphorae/drivers/haproxy/rest_api_driver.py index d903ea0e33..503e924e96 100644 --- a/octavia/amphorae/drivers/haproxy/rest_api_driver.py +++ b/octavia/amphorae/drivers/haproxy/rest_api_driver.py @@ -289,6 +289,31 @@ class HaproxyAmphoraLoadBalancerDriver( self.client.upload_cert_pem( amp, listener_id, name, pem) + def update_amphora_agent_config(self, amphora, agent_config, + timeout_dict=None): + """Update the amphora agent configuration file. + + :param amphora: The amphora to update. + :type amphora: object + :param agent_config: The new amphora agent configuration. + :type agent_config: string + :param timeout_dict: Dictionary of timeout values for calls to the + amphora. May contain: req_conn_timeout, + req_read_timeout, conn_max_retries, + conn_retry_interval + :returns: None + + Note: This will mutate the amphora agent config and adopt the + new values. + """ + try: + self.client.update_agent_config(amphora, agent_config, + timeout_dict=timeout_dict) + except exc.NotFound: + LOG.debug('Amphora {} does not support the update_agent_config ' + 'API.'.format(amphora.id)) + raise driver_except.AmpDriverNotImplementedError() + # Check a custom hostname class CustomHostNameCheckingAdapter(requests.adapters.HTTPAdapter): @@ -515,3 +540,7 @@ class AmphoraAPIClient(object): amphora_id=amp.id, listener_id=listener_id), timeout_dict, data=config) return exc.check_exception(r) + + def update_agent_config(self, amp, agent_config, timeout_dict=None): + r = self.put(amp, 'config', timeout_dict, data=agent_config) + return exc.check_exception(r) diff --git a/octavia/amphorae/drivers/noop_driver/driver.py b/octavia/amphorae/drivers/noop_driver/driver.py index a9a612dfc0..5138d5d84c 100644 --- a/octavia/amphorae/drivers/noop_driver/driver.py +++ b/octavia/amphorae/drivers/noop_driver/driver.py @@ -104,11 +104,18 @@ class NoopManager(object): load_balancer.id, amphorae_network_config, 'post_vip_plug') def upload_cert_amp(self, amphora, pem_file): - LOG.debug("Amphora %s no-op, upload cert amphora %s,with pem fle %s", + LOG.debug("Amphora %s no-op, upload cert amphora %s,with pem file %s", self.__class__.__name__, amphora.id, pem_file) self.amphoraconfig[amphora.id, pem_file] = (amphora.id, pem_file, 'update_amp_cert_file') + def update_agent_config(self, amphora, agent_config): + LOG.debug("Amphora %s no-op, update agent config amphora " + "%s, with agent config %s", + self.__class__.__name__, amphora.id, agent_config) + self.amphoraconfig[amphora.id, agent_config] = ( + amphora.id, agent_config, 'update_agent_config') + class NoopAmphoraLoadBalancerDriver( driver_base.AmphoraLoadBalancerDriver, @@ -164,6 +171,9 @@ class NoopAmphoraLoadBalancerDriver( self.driver.upload_cert_amp(amphora, pem_file) + def update_agent_config(self, amphora, agent_config): + self.driver.update_agent_config(amphora, agent_config) + def update_vrrp_conf(self, loadbalancer): pass diff --git a/octavia/common/config.py b/octavia/common/config.py index 6a44f0c1a4..6cb0aa78eb 100644 --- a/octavia/common/config.py +++ b/octavia/common/config.py @@ -174,6 +174,7 @@ healthmanager_opts = [ default=None, help=_('Number of processes for amphora stats update.')), cfg.StrOpt('heartbeat_key', + mutable=True, help=_('key used to validate amphora sending ' 'the message'), secret=True), cfg.IntOpt('heartbeat_timeout', @@ -191,9 +192,11 @@ healthmanager_opts = [ help=_('List of controller ip and port pairs for the ' 'heartbeat receivers. Example 127.0.0.1:5555, ' '192.168.0.1:5555'), + mutable=True, default=[]), cfg.IntOpt('heartbeat_interval', default=10, + mutable=True, help=_('Sleep time between sending heartbeats.')), # Used for updating health and stats @@ -377,6 +380,7 @@ controller_worker_opts = [ cfg.StrOpt('loadbalancer_topology', default=constants.TOPOLOGY_SINGLE, choices=constants.SUPPORTED_LB_TOPOLOGIES, + mutable=True, help=_('Load balancer topology configuration. ' 'SINGLE - One amphora per load balancer. ' 'ACTIVE_STANDBY - Two amphora per load balancer.')), diff --git a/octavia/common/constants.py b/octavia/common/constants.py index 5c53c21410..05e5393cf1 100644 --- a/octavia/common/constants.py +++ b/octavia/common/constants.py @@ -334,6 +334,7 @@ AMP_COMPUTE_CONNECTIVITY_WAIT = 'octavia-amp-compute-connectivity-wait' AMP_LISTENER_UPDATE = 'octavia-amp-listeners-update' GENERATE_SERVER_PEM_TASK = 'GenerateServerPEMTask' +AMPHORA_CONFIG_UPDATE_TASK = 'AmphoraConfigUpdateTask' # Batch Member Update constants UNORDERED_MEMBER_UPDATES_FLOW = 'octavia-unordered-member-updates-flow' diff --git a/octavia/common/validate.py b/octavia/common/validate.py index f6d245cc07..5f64ad9871 100644 --- a/octavia/common/validate.py +++ b/octavia/common/validate.py @@ -346,3 +346,11 @@ def ip_not_reserved(ip_address): if ip_address in CONF.networking.reserved_ips: raise exceptions.InvalidOption(value=ip_address, option='member address') + + +def is_flavor_spares_compatible(flavor): + if flavor: + # If a compute flavor is specified, the flavor is not spares compatible + if flavor.get(constants.COMPUTE_FLAVOR, None): + return False + return True diff --git a/octavia/controller/worker/controller_worker.py b/octavia/controller/worker/controller_worker.py index 5a306acb17..c9cda0a1dc 100644 --- a/octavia/controller/worker/controller_worker.py +++ b/octavia/controller/worker/controller_worker.py @@ -108,18 +108,22 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine): :returns: amphora_id """ - create_amp_tf = self._taskflow_load( - self._amphora_flows.get_create_amphora_flow(), - store={constants.BUILD_TYPE_PRIORITY: - constants.LB_CREATE_SPARES_POOL_PRIORITY} - ) - with tf_logging.DynamicLoggingListener( - create_amp_tf, log=LOG, - hide_inputs_outputs_of=self._exclude_result_logging_tasks): + try: + create_amp_tf = self._taskflow_load( + self._amphora_flows.get_create_amphora_flow(), + store={constants.BUILD_TYPE_PRIORITY: + constants.LB_CREATE_SPARES_POOL_PRIORITY, + constants.FLAVOR: None} + ) + with tf_logging.DynamicLoggingListener( + create_amp_tf, log=LOG, + hide_inputs_outputs_of=self._exclude_result_logging_tasks): - create_amp_tf.run() + create_amp_tf.run() - return create_amp_tf.storage.fetch('amphora') + return create_amp_tf.storage.fetch('amphora') + except Exception as e: + LOG.error('Failed to create an amphora due to: {}'.format(str(e))) def delete_amphora(self, amphora_id): """Deletes an existing Amphora. diff --git a/octavia/controller/worker/flows/amphora_flows.py b/octavia/controller/worker/flows/amphora_flows.py index bfe529f47c..a5f45b1909 100644 --- a/octavia/controller/worker/flows/amphora_flows.py +++ b/octavia/controller/worker/flows/amphora_flows.py @@ -95,6 +95,10 @@ class AmphoraFlows(object): requires=constants.AMPHORA_ID, provides=constants.AMPHORA)) + post_map_amp_to_lb.add(amphora_driver_tasks.AmphoraConfigUpdate( + name=sf_name + '-' + constants.AMPHORA_CONFIG_UPDATE_TASK, + requires=(constants.AMPHORA, constants.FLAVOR))) + if role == constants.ROLE_MASTER: post_map_amp_to_lb.add(database_tasks.MarkAmphoraMasterInDB( name=sf_name + '-' + constants.MARK_AMP_MASTER_INDB, @@ -252,7 +256,7 @@ class AmphoraFlows(object): # Setup the task that maps an amphora to a load balancer allocate_and_associate_amp = database_tasks.MapLoadbalancerToAmphora( name=sf_name + '-' + constants.MAP_LOADBALANCER_TO_AMPHORA, - requires=constants.LOADBALANCER_ID, + requires=(constants.LOADBALANCER_ID, constants.FLAVOR), provides=constants.AMPHORA_ID) # Define a subflow for if we successfully map an amphora diff --git a/octavia/controller/worker/tasks/amphora_driver_tasks.py b/octavia/controller/worker/tasks/amphora_driver_tasks.py index 7de9436c3b..95a0672d3d 100644 --- a/octavia/controller/worker/tasks/amphora_driver_tasks.py +++ b/octavia/controller/worker/tasks/amphora_driver_tasks.py @@ -20,6 +20,7 @@ from stevedore import driver as stevedore_driver from taskflow import task from taskflow.types import failure +from octavia.amphorae.backends.agent import agent_jinja_cfg from octavia.amphorae.driver_exceptions import exceptions as driver_except from octavia.common import constants from octavia.controller.worker import task_utils as task_utilities @@ -357,7 +358,7 @@ class AmphoraVRRPStart(BaseAmphoraTask): class AmphoraComputeConnectivityWait(BaseAmphoraTask): - """"Task to wait for the compute instance to be up.""" + """Task to wait for the compute instance to be up.""" def execute(self, amphora): """Execute get_info routine for an amphora until it responds.""" @@ -373,3 +374,28 @@ class AmphoraComputeConnectivityWait(BaseAmphoraTask): self.amphora_repo.update(db_apis.get_session(), amphora.id, status=constants.ERROR) raise + + +class AmphoraConfigUpdate(BaseAmphoraTask): + """Task to push a new amphora agent configuration to the amphroa.""" + + def execute(self, amphora, flavor): + # Extract any flavor based settings + if flavor: + topology = flavor.get(constants.LOADBALANCER_TOPOLOGY, + CONF.controller_worker.loadbalancer_topology) + else: + topology = CONF.controller_worker.loadbalancer_topology + + # Build the amphora agent config + agent_cfg_tmpl = agent_jinja_cfg.AgentJinjaTemplater() + agent_config = agent_cfg_tmpl.build_agent_config(amphora.id, topology) + + # Push the new configuration to the amphroa + try: + self.amphora_driver.update_amphora_agent_config(amphora, + agent_config) + except driver_except.AmpDriverNotImplementedError: + LOG.error('Amphora {} does not support agent configuration ' + 'update. Please update the amphora image for this ' + 'amphora. Skipping.'.format(amphora.id)) diff --git a/octavia/controller/worker/tasks/database_tasks.py b/octavia/controller/worker/tasks/database_tasks.py index cecaa0eccd..0535f6d93d 100644 --- a/octavia/controller/worker/tasks/database_tasks.py +++ b/octavia/controller/worker/tasks/database_tasks.py @@ -27,6 +27,7 @@ from taskflow.types import failure from octavia.common import constants from octavia.common import data_models import octavia.common.tls_utils.cert_parser as cert_parser +from octavia.common import validate from octavia.controller.worker import task_utils as task_utilities from octavia.db import api as db_apis from octavia.db import repositories as repo @@ -479,7 +480,7 @@ class AssociateFailoverAmphoraWithLBID(BaseDatabaseTask): class MapLoadbalancerToAmphora(BaseDatabaseTask): """Maps and assigns a load balancer to an amphora in the database.""" - def execute(self, loadbalancer_id, server_group_id=None): + def execute(self, loadbalancer_id, server_group_id=None, flavor=None): """Allocates an Amphora for the load balancer in the database. :param loadbalancer_id: The load balancer id to map to an amphora @@ -495,6 +496,13 @@ class MapLoadbalancerToAmphora(BaseDatabaseTask): "pool allocation.") return None + # Validate the flavor is spares compatible + if not validate.is_flavor_spares_compatible(flavor): + LOG.debug("Load balancer has a flavor that is not compatible with " + "using spares pool amphora. Skipping spares pool " + "allocation.") + return None + amp = self.amphora_repo.allocate_and_associate( db_apis.get_session(), loadbalancer_id) diff --git a/octavia/tests/functional/amphorae/backend/agent/api_server/test_server.py b/octavia/tests/functional/amphorae/backend/agent/api_server/test_server.py index 0dba2405ef..63002ed63e 100644 --- a/octavia/tests/functional/amphorae/backend/agent/api_server/test_server.py +++ b/octavia/tests/functional/amphorae/backend/agent/api_server/test_server.py @@ -19,6 +19,7 @@ import socket import stat import subprocess +import fixtures import mock import netifaces from oslo_config import fixture as oslo_fixture @@ -36,6 +37,7 @@ from octavia.tests.common import utils as test_utils import octavia.tests.unit.base as base +AMP_AGENT_CONF_PATH = '/etc/octavia/amphora-agent.conf' RANDOM_ERROR = 'random error' OK = dict(message='OK') @@ -49,6 +51,11 @@ class TestServerTestCase(base.TestCase): self.conf.config(group="haproxy_amphora", base_path='/var/lib/octavia') self.conf.config(group="controller_worker", loadbalancer_topology=consts.TOPOLOGY_SINGLE) + self.conf.load_raw_values(project='fake_project') + self.conf.load_raw_values(prog='fake_prog') + self.useFixture(fixtures.MockPatch( + 'oslo_config.cfg.find_config_files', + return_value=[AMP_AGENT_CONF_PATH])) with mock.patch('distro.id', return_value='ubuntu'): self.ubuntu_test_server = server.Server() @@ -2650,3 +2657,38 @@ class TestServerTestCase(base.TestCase): self.assertEqual(200, rv.status_code) self.assertEqual(expected_dict, json.loads(rv.data.decode('utf-8'))) + + def test_ubuntu_upload_config(self): + self._test_upload_config(consts.UBUNTU) + + def test_centos_upload_config(self): + self._test_upload_config(consts.CENTOS) + + @mock.patch('oslo_config.cfg.CONF.mutate_config_files') + def _test_upload_config(self, distro, mock_mutate): + server.BUFFER = 5 # test the while loop + m = self.useFixture( + test_utils.OpenFixture(AMP_AGENT_CONF_PATH)).mock_open + with mock.patch('os.open'), mock.patch.object(os, 'fdopen', m): + if distro == consts.UBUNTU: + rv = self.ubuntu_app.put('/' + api_server.VERSION + + '/config', data='TestTest') + elif distro == consts.CENTOS: + rv = self.centos_app.put('/' + api_server.VERSION + + '/config', data='TestTest') + self.assertEqual(202, rv.status_code) + self.assertEqual(OK, json.loads(rv.data.decode('utf-8'))) + handle = m() + handle.write.assert_any_call(six.b('TestT')) + handle.write.assert_any_call(six.b('est')) + mock_mutate.assert_called_once_with() + + # Test the exception handling + mock_mutate.side_effect = Exception('boom') + if distro == consts.UBUNTU: + rv = self.ubuntu_app.put('/' + api_server.VERSION + + '/config', data='TestTest') + elif distro == consts.CENTOS: + rv = self.centos_app.put('/' + api_server.VERSION + + '/config', data='TestTest') + self.assertEqual(500, rv.status_code) diff --git a/octavia/tests/unit/amphorae/backends/health_daemon/test_health_sender.py b/octavia/tests/unit/amphorae/backends/health_daemon/test_health_sender.py index afc1353c71..0994ee7335 100644 --- a/octavia/tests/unit/amphorae/backends/health_daemon/test_health_sender.py +++ b/octavia/tests/unit/amphorae/backends/health_daemon/test_health_sender.py @@ -127,3 +127,37 @@ class TestHealthSender(base.TestCase): # Should not raise an exception sender.dosend(SAMPLE_MSG) + + # Test an controller_ip_port_list update + sendto_mock.reset_mock() + mock_getaddrinfo.reset_mock() + self.conf.config(group="health_manager", + controller_ip_port_list=['192.0.2.20:80']) + mock_getaddrinfo.return_value = [(socket.AF_INET, + socket.SOCK_DGRAM, + socket.IPPROTO_UDP, + '', + ('192.0.2.20', 80))] + sender = health_sender.UDPStatusSender() + sender.dosend(SAMPLE_MSG) + sendto_mock.assert_called_once_with(SAMPLE_MSG_BIN, + ('192.0.2.20', 80)) + mock_getaddrinfo.assert_called_once_with('192.0.2.20', '80', + 0, socket.SOCK_DGRAM) + sendto_mock.reset_mock() + mock_getaddrinfo.reset_mock() + + self.conf.config(group="health_manager", + controller_ip_port_list=['192.0.2.21:81']) + mock_getaddrinfo.return_value = [(socket.AF_INET, + socket.SOCK_DGRAM, + socket.IPPROTO_UDP, + '', + ('192.0.2.21', 81))] + sender.dosend(SAMPLE_MSG) + mock_getaddrinfo.assert_called_once_with('192.0.2.21', '81', + 0, socket.SOCK_DGRAM) + sendto_mock.assert_called_once_with(SAMPLE_MSG_BIN, + ('192.0.2.21', 81)) + sendto_mock.reset_mock() + mock_getaddrinfo.reset_mock() diff --git a/octavia/tests/unit/amphorae/drivers/haproxy/test_rest_api_driver.py b/octavia/tests/unit/amphorae/drivers/haproxy/test_rest_api_driver.py index 300e88766f..a070142b61 100644 --- a/octavia/tests/unit/amphorae/drivers/haproxy/test_rest_api_driver.py +++ b/octavia/tests/unit/amphorae/drivers/haproxy/test_rest_api_driver.py @@ -375,6 +375,11 @@ class TestHaproxyAmphoraLoadBalancerDriverTest(base.TestCase): self.driver.client.get_info.assert_called_once_with(self.amp) self.assertEqual(ref_versions, result) + def test_update_amphora_agent_config(self): + self.driver.update_amphora_agent_config(self.amp, six.b('test')) + self.driver.client.update_agent_config.assert_called_once_with( + self.amp, six.b('test'), timeout_dict=None) + class TestAmphoraAPIClientTest(base.TestCase): @@ -1067,3 +1072,16 @@ class TestAmphoraAPIClientTest(base.TestCase): self.assertRaises(exc.InternalServerError, self.driver.get_interface, self.amp, ip_addr) + + @requests_mock.mock() + def test_update_agent_config(self, m): + m.put("{base}/config".format(base=self.base_url)) + resp_body = self.driver.update_agent_config(self.amp, "some_file") + self.assertEqual(200, resp_body.status_code) + + @requests_mock.mock() + def test_update_agent_config_error(self, m): + m.put("{base}/config".format(base=self.base_url), status_code=500) + self.assertRaises(exc.InternalServerError, + self.driver.update_agent_config, self.amp, + "some_file") diff --git a/octavia/tests/unit/amphorae/drivers/test_noop_amphoraloadbalancer_driver.py b/octavia/tests/unit/amphorae/drivers/test_noop_amphoraloadbalancer_driver.py index dd760e6b11..2ce085679b 100644 --- a/octavia/tests/unit/amphorae/drivers/test_noop_amphoraloadbalancer_driver.py +++ b/octavia/tests/unit/amphorae/drivers/test_noop_amphoraloadbalancer_driver.py @@ -63,6 +63,7 @@ class TestNoopAmphoraLoadBalancerDriver(base.TestCase): vip_subnet=network_models.Subnet(id=self.FAKE_UUID_1)) } self.pem_file = 'test_pem_file' + self.agent_config = 'test agent config' self.timeout_dict = {constants.REQ_CONN_TIMEOUT: 1, constants.REQ_READ_TIMEOUT: 2, constants.CONN_MAX_RETRIES: 3, @@ -147,3 +148,10 @@ class TestNoopAmphoraLoadBalancerDriver(base.TestCase): (self.amphora.id, self.pem_file, 'update_amp_cert_file'), self.driver.driver.amphoraconfig[( self.amphora.id, self.pem_file)]) + + def test_update_agent_config(self): + self.driver.update_agent_config(self.amphora, self.agent_config) + self.assertEqual( + (self.amphora.id, self.agent_config, 'update_agent_config'), + self.driver.driver.amphoraconfig[( + self.amphora.id, self.agent_config)]) diff --git a/octavia/tests/unit/common/test_validations.py b/octavia/tests/unit/common/test_validations.py index c1a8204ca8..c05828673f 100644 --- a/octavia/tests/unit/common/test_validations.py +++ b/octavia/tests/unit/common/test_validations.py @@ -442,3 +442,13 @@ class TestValidations(base.TestCase): self.assertRaises(exceptions.InvalidOption, validate.ip_not_reserved, '2001:0DB8::5') + + def test_is_flavor_spares_compatible(self): + not_compat_flavor = {constants.COMPUTE_FLAVOR: 'chocolate'} + compat_flavor = {constants.LOADBALANCER_TOPOLOGY: + constants.TOPOLOGY_SINGLE} + + self.assertTrue(validate.is_flavor_spares_compatible(None)) + self.assertTrue(validate.is_flavor_spares_compatible(compat_flavor)) + self.assertFalse( + validate.is_flavor_spares_compatible(not_compat_flavor)) diff --git a/octavia/tests/unit/controller/worker/flows/test_amphora_flows.py b/octavia/tests/unit/controller/worker/flows/test_amphora_flows.py index 418598ebec..943b629a54 100644 --- a/octavia/tests/unit/controller/worker/flows/test_amphora_flows.py +++ b/octavia/tests/unit/controller/worker/flows/test_amphora_flows.py @@ -366,41 +366,45 @@ class TestAmphoraFlows(base.TestCase): self.assertIsInstance(amp_flow, flow.Flow) + self.assertIn(constants.FLAVOR, amp_flow.requires) self.assertIn(constants.AMPHORA_ID, amp_flow.requires) self.assertIn(constants.AMPHORA, amp_flow.provides) self.assertEqual(1, len(amp_flow.provides)) - self.assertEqual(1, len(amp_flow.requires)) + self.assertEqual(2, len(amp_flow.requires)) amp_flow = self.AmpFlow._get_post_map_lb_subflow( 'SOMEPREFIX', constants.ROLE_BACKUP) self.assertIsInstance(amp_flow, flow.Flow) + self.assertIn(constants.FLAVOR, amp_flow.requires) self.assertIn(constants.AMPHORA_ID, amp_flow.requires) self.assertIn(constants.AMPHORA, amp_flow.provides) self.assertEqual(1, len(amp_flow.provides)) - self.assertEqual(1, len(amp_flow.requires)) + self.assertEqual(2, len(amp_flow.requires)) amp_flow = self.AmpFlow._get_post_map_lb_subflow( 'SOMEPREFIX', constants.ROLE_STANDALONE) self.assertIsInstance(amp_flow, flow.Flow) + self.assertIn(constants.FLAVOR, amp_flow.requires) self.assertIn(constants.AMPHORA_ID, amp_flow.requires) self.assertIn(constants.AMPHORA, amp_flow.provides) self.assertEqual(1, len(amp_flow.provides)) - self.assertEqual(1, len(amp_flow.requires)) + self.assertEqual(2, len(amp_flow.requires)) amp_flow = self.AmpFlow._get_post_map_lb_subflow( 'SOMEPREFIX', 'BOGUS_ROLE') self.assertIsInstance(amp_flow, flow.Flow) + self.assertIn(constants.FLAVOR, amp_flow.requires) self.assertIn(constants.AMPHORA_ID, amp_flow.requires) self.assertIn(constants.AMPHORA, amp_flow.provides) self.assertEqual(1, len(amp_flow.provides)) - self.assertEqual(1, len(amp_flow.requires)) + self.assertEqual(2, len(amp_flow.requires)) diff --git a/octavia/tests/unit/controller/worker/tasks/test_amphora_driver_tasks.py b/octavia/tests/unit/controller/worker/tasks/test_amphora_driver_tasks.py index 015a8781f0..58432b5cea 100644 --- a/octavia/tests/unit/controller/worker/tasks/test_amphora_driver_tasks.py +++ b/octavia/tests/unit/controller/worker/tasks/test_amphora_driver_tasks.py @@ -33,6 +33,7 @@ LISTENER_ID = uuidutils.generate_uuid() LB_ID = uuidutils.generate_uuid() CONN_MAX_RETRIES = 10 CONN_RETRY_INTERVAL = 6 +FAKE_CONFIG_FILE = 'fake config file' _amphora_mock = mock.MagicMock() _amphora_mock.id = AMP_ID @@ -71,6 +72,8 @@ class TestAmphoraDriverTasks(base.TestCase): active_connection_max_retries=CONN_MAX_RETRIES) conf.config(group="haproxy_amphora", active_connection_rety_interval=CONN_RETRY_INTERVAL) + conf.config(group="controller_worker", + loadbalancer_topology=constants.TOPOLOGY_SINGLE) super(TestAmphoraDriverTasks, self).setUp() def test_amp_listener_update(self, @@ -615,3 +618,50 @@ class TestAmphoraDriverTasks(base.TestCase): amp_compute_conn_wait_obj.execute, _amphora_mock) mock_amphora_repo_update.assert_called_once_with( _session_mock, AMP_ID, status=constants.ERROR) + + @mock.patch('octavia.amphorae.backends.agent.agent_jinja_cfg.' + 'AgentJinjaTemplater.build_agent_config') + def test_amphora_config_update(self, + mock_build_config, + mock_driver, + mock_generate_uuid, + mock_log, + mock_get_session, + mock_listener_repo_get, + mock_listener_repo_update, + mock_amphora_repo_update): + mock_build_config.return_value = FAKE_CONFIG_FILE + amp_config_update_obj = amphora_driver_tasks.AmphoraConfigUpdate() + mock_driver.update_amphora_agent_config.side_effect = [ + None, None, driver_except.AmpDriverNotImplementedError, + driver_except.TimeOutException] + # With Flavor + flavor = {constants.LOADBALANCER_TOPOLOGY: + constants.TOPOLOGY_ACTIVE_STANDBY} + amp_config_update_obj.execute(_amphora_mock, flavor) + mock_build_config.assert_called_once_with( + _amphora_mock.id, constants.TOPOLOGY_ACTIVE_STANDBY) + mock_driver.update_amphora_agent_config.assert_called_once_with( + _amphora_mock, FAKE_CONFIG_FILE) + # With no Flavor + mock_driver.reset_mock() + mock_build_config.reset_mock() + amp_config_update_obj.execute(_amphora_mock, None) + mock_build_config.assert_called_once_with( + _amphora_mock.id, constants.TOPOLOGY_SINGLE) + mock_driver.update_amphora_agent_config.assert_called_once_with( + _amphora_mock, FAKE_CONFIG_FILE) + # With amphora that does not support config update + mock_driver.reset_mock() + mock_build_config.reset_mock() + amp_config_update_obj.execute(_amphora_mock, flavor) + mock_build_config.assert_called_once_with( + _amphora_mock.id, constants.TOPOLOGY_ACTIVE_STANDBY) + mock_driver.update_amphora_agent_config.assert_called_once_with( + _amphora_mock, FAKE_CONFIG_FILE) + # With an unknown exception + mock_driver.reset_mock() + mock_build_config.reset_mock() + self.assertRaises(driver_except.TimeOutException, + amp_config_update_obj.execute, + _amphora_mock, flavor) diff --git a/octavia/tests/unit/controller/worker/test_controller_worker.py b/octavia/tests/unit/controller/worker/test_controller_worker.py index f474a10c09..d6d7854736 100644 --- a/octavia/tests/unit/controller/worker/test_controller_worker.py +++ b/octavia/tests/unit/controller/worker/test_controller_worker.py @@ -141,7 +141,8 @@ class TestControllerWorker(base.TestCase): assert_called_once_with( 'TEST', store={constants.BUILD_TYPE_PRIORITY: - constants.LB_CREATE_SPARES_POOL_PRIORITY})) + constants.LB_CREATE_SPARES_POOL_PRIORITY, + constants.FLAVOR: None})) _flow_mock.run.assert_called_once_with()