From 8a7e13b95bf8e3f9c878c23e42eeef4edff9ac6f Mon Sep 17 00:00:00 2001 From: Adam Harwell Date: Mon, 4 Dec 2017 12:31:53 -0800 Subject: [PATCH] Producer/endpoint code to allow for amphora failovers Change-Id: I06b0ec785610c2113384e1da638a56e93b1ab38d --- octavia/api/handlers/queue/producer.py | 23 +++++++++++++++++++ octavia/controller/queue/endpoint.py | 5 ++++ .../unit/api/handlers/queue/test_producer.py | 14 +++++++++++ .../unit/controller/queue/test_endpoint.py | 5 ++++ 4 files changed, 47 insertions(+) diff --git a/octavia/api/handlers/queue/producer.py b/octavia/api/handlers/queue/producer.py index a1c9cb6746..7652df5187 100644 --- a/octavia/api/handlers/queue/producer.py +++ b/octavia/api/handlers/queue/producer.py @@ -120,6 +120,28 @@ class LoadBalancerProducer(BaseProducer): self.client.cast({}, method_name, **kw) +class AmphoraProducer(BaseProducer): + """Sends failover messages to the RPC end of the queue consumer + + """ + PAYLOAD_CLASS = "amphora" + + @property + def payload_class(self): + return self.PAYLOAD_CLASS + + def failover(self, data_model): + """sends a failover message to the controller via oslo.messaging + + :param data_model: + """ + model_id = getattr(data_model, 'id', None) + p_class = self.payload_class + kw = {"{0}_id".format(p_class): model_id} + method_name = "failover_{0}".format(self.payload_class) + self.client.cast({}, method_name, **kw) + + class ListenerProducer(BaseProducer): """Sends updates,deletes and creates to the RPC end of the queue consumer @@ -215,3 +237,4 @@ class ProducerHandler(abstract_handler.BaseHandler): member = MemberProducer() l7policy = L7PolicyProducer() l7rule = L7RuleProducer() + amphora = AmphoraProducer() diff --git a/octavia/controller/queue/endpoint.py b/octavia/controller/queue/endpoint.py index 51375ca87b..219725d87d 100644 --- a/octavia/controller/queue/endpoint.py +++ b/octavia/controller/queue/endpoint.py @@ -58,6 +58,11 @@ class Endpoint(object): load_balancer_id) self.worker.failover_loadbalancer(load_balancer_id) + def failover_amphora(self, context, amphora_id): + LOG.info('Failing over amphora \'%s\'...', + amphora_id) + self.worker.failover_amphora(amphora_id) + def create_listener(self, context, listener_id): LOG.info('Creating listener \'%s\'...', listener_id) self.worker.create_listener(listener_id) diff --git a/octavia/tests/unit/api/handlers/queue/test_producer.py b/octavia/tests/unit/api/handlers/queue/test_producer.py index 16481fd2b8..51ce90a226 100644 --- a/octavia/tests/unit/api/handlers/queue/test_producer.py +++ b/octavia/tests/unit/api/handlers/queue/test_producer.py @@ -78,6 +78,20 @@ class TestProducer(base.TestCase): self.mck_client.cast.assert_called_once_with( {}, 'delete_load_balancer', **kw) + def test_failover_loadbalancer(self): + p = producer.LoadBalancerProducer() + p.failover(self.mck_model) + kw = {'load_balancer_id': self.mck_model.id} + self.mck_client.cast.assert_called_once_with( + {}, 'failover_load_balancer', **kw) + + def test_failover_amphora(self): + p = producer.AmphoraProducer() + p.failover(self.mck_model) + kw = {'amphora_id': self.mck_model.id} + self.mck_client.cast.assert_called_once_with( + {}, 'failover_amphora', **kw) + def test_update_loadbalancer(self): p = producer.LoadBalancerProducer() lb = data_models.LoadBalancer(id=10) diff --git a/octavia/tests/unit/controller/queue/test_endpoint.py b/octavia/tests/unit/controller/queue/test_endpoint.py index 1ef9efbb32..9be458ea83 100644 --- a/octavia/tests/unit/controller/queue/test_endpoint.py +++ b/octavia/tests/unit/controller/queue/test_endpoint.py @@ -61,6 +61,11 @@ class TestEndpoint(base.TestCase): self.ep.worker.failover_loadbalancer.assert_called_once_with( self.resource_id) + def test_failover_amphora(self): + self.ep.failover_amphora(self.context, self.resource_id) + self.ep.worker.failover_amphora.assert_called_once_with( + self.resource_id) + def test_create_listener(self): self.ep.create_listener(self.context, self.resource_id) self.ep.worker.create_listener.assert_called_once_with(