neutron/neutron/services/loadbalancer/drivers/embrane/agent/dispatcher.py

109 lines
4.5 KiB
Python

# Copyright 2014 Embrane, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Ivar Lazzaro, Embrane, Inc. ivar@embrane.com
from eventlet import greenthread
from eventlet import queue
from heleosapi import exceptions as h_exc
from neutron.openstack.common import log as logging
from neutron.plugins.embrane.common import contexts as ctx
from neutron.services.loadbalancer.drivers.embrane.agent import lb_operations
from neutron.services.loadbalancer.drivers.embrane import constants as econ
LOG = logging.getLogger(__name__)
class Dispatcher(object):
def __init__(self, driver, async=True):
self._async = async
self._driver = driver
self.sync_items = dict()
self.handlers = lb_operations.handlers
def dispatch_lb(self, d_context, *args, **kwargs):
item = d_context.item
event = d_context.event
n_context = d_context.n_context
chain = d_context.chain
item_id = item["id"]
if event in self.handlers:
for f in self.handlers[event]:
first_run = False
if item_id not in self.sync_items:
self.sync_items[item_id] = [queue.Queue()]
first_run = True
self.sync_items[item_id][0].put(
ctx.OperationContext(event, n_context, item, chain, f,
args, kwargs))
if first_run:
t = greenthread.spawn(self._consume_lb,
item_id,
self.sync_items[item_id][0],
self._driver,
self._async)
self.sync_items[item_id].append(t)
if not self._async:
t = self.sync_items[item_id][1]
t.wait()
def _consume_lb(self, sync_item, sync_queue, driver, a_sync):
current_state = None
while True:
try:
if current_state == econ.DELETED:
del self.sync_items[sync_item]
return
try:
operation_context = sync_queue.get(
block=a_sync,
timeout=econ.QUEUE_TIMEOUT)
except queue.Empty:
del self.sync_items[sync_item]
return
(operation_context.chain and
operation_context.chain.execute_all())
transient_state = None
try:
transient_state = operation_context.function(
driver, operation_context.n_context,
operation_context.item, *operation_context.args,
**operation_context.kwargs)
except (h_exc.PendingDva, h_exc.DvaNotFound,
h_exc.BrokenInterface, h_exc.DvaCreationFailed,
h_exc.BrokenDva, h_exc.ConfigurationFailed) as ex:
LOG.warning(econ.error_map[type(ex)], ex.message)
except h_exc.DvaDeleteFailed as ex:
LOG.warning(econ.error_map[type(ex)], ex.message)
transient_state = econ.DELETED
finally:
# if the returned transient state is None, no operations
# are required on the DVA status
if transient_state == econ.DELETED:
current_state = driver._delete_vip(
operation_context.n_context,
operation_context.item)
# Error state cannot be reverted
else:
driver._update_vip_graph_state(
operation_context.n_context,
operation_context.item)
except Exception:
LOG.exception(_('Unhandled exception occurred'))