49f88f15c9
This commit re-orders the imports to allow enabling in flake8: H306: imports not in alphabetical order This change should have no functional impact on the code Scenario tests all passing. Story: 2004515 Task: 28684 Change-Id: I412474dfdd1ee9d7deade7f7d6aca32085c0f3f2 Signed-off-by: Al Bailey <Al.Bailey@windriver.com>
180 lines
6.0 KiB
Python
Executable File
180 lines
6.0 KiB
Python
Executable File
#
|
|
# Copyright (c) 2015-2016 Wind River Systems, Inc.
|
|
#
|
|
# SPDX-License-Identifier: Apache-2.0
|
|
#
|
|
import threading
|
|
import time
|
|
|
|
from kombu.connection import Connection
|
|
from kombu import Consumer
|
|
from kombu import exceptions
|
|
from kombu import Exchange
|
|
from kombu import Queue
|
|
|
|
from nfv_common import debug
|
|
from nfv_common import selectable
|
|
from nfv_common import selobj
|
|
|
|
from nfv_common.helpers import coroutine
|
|
|
|
DLOG = debug.debug_get_logger('nfv_plugins.nfvi_plugins.openstack.rpc')
|
|
|
|
|
|
class RPCListener(threading.Thread):
|
|
"""
|
|
RPC Listener
|
|
"""
|
|
def __init__(self, host, port, user_id, password, virt_host,
|
|
exchange_name, routing_key, consumer_queue_name):
|
|
super(RPCListener, self).__init__()
|
|
self._exit = threading.Event()
|
|
self._exchange_name = exchange_name
|
|
self._routing_key = routing_key
|
|
self._consumer_queue_name = consumer_queue_name
|
|
|
|
self._exchange = Exchange(self._exchange_name, type='topic', durable=False)
|
|
self._connection = Connection(host, user_id, password, virt_host, port)
|
|
self._rpc_receive_queue = Queue(self._consumer_queue_name, durable=True,
|
|
exchange=self._exchange,
|
|
routing_key=self._routing_key)
|
|
|
|
self._consumer = Consumer(self._connection, self._rpc_receive_queue)
|
|
self._consumer.register_callback(self._callback)
|
|
|
|
self._message_queue = selectable.ThreadQueue(consumer_queue_name)
|
|
self._message_filters_lock = threading.RLock()
|
|
self._message_filters = dict()
|
|
self._message_handlers = dict()
|
|
|
|
selobj.selobj_add_read_obj(self._message_queue.selobj,
|
|
self._dispatch_messages)
|
|
|
|
@coroutine
|
|
def _dispatch_messages(self):
|
|
"""
|
|
Dispatch messages from the message queue
|
|
"""
|
|
while True:
|
|
select_obj = (yield)
|
|
if select_obj == self._message_queue.selobj:
|
|
msg = self._message_queue.get_nowait()
|
|
if msg is not None:
|
|
msg_type = msg.get('type', None)
|
|
if msg_type is not None:
|
|
msg_handler = self._message_handlers.get(msg_type, None)
|
|
if msg_handler is not None:
|
|
msg_handler(msg['data'])
|
|
|
|
def add_message_handler(self, msg_type, msg_filter, msg_handler):
|
|
"""
|
|
Add message handler
|
|
"""
|
|
self._message_filters_lock.acquire()
|
|
try:
|
|
self._message_filters[msg_type] = msg_filter
|
|
finally:
|
|
self._message_filters_lock.release()
|
|
|
|
self._message_handlers[msg_type] = msg_handler
|
|
|
|
def del_message_handler(self, msg_type):
|
|
"""
|
|
Delete message handler
|
|
"""
|
|
self._message_filters_lock.acquire()
|
|
try:
|
|
if msg_type in self._message_filters:
|
|
del self._message_filters[msg_type]
|
|
finally:
|
|
self._message_filters_lock.release()
|
|
|
|
if msg_type in self._message_handlers:
|
|
del self._message_handlers[msg_type]
|
|
|
|
def _callback(self, body, message):
|
|
"""
|
|
RPC Listener callback
|
|
"""
|
|
self._message_filters_lock.acquire()
|
|
try:
|
|
for msg_type, msg_filter in self._message_filters.items():
|
|
msg_data = msg_filter(body)
|
|
if msg_data is not None:
|
|
msg = dict()
|
|
msg['type'] = msg_type
|
|
msg['data'] = msg_data
|
|
self._message_queue.put(msg)
|
|
else:
|
|
import pprint
|
|
pp_msg = pprint.pformat(body)
|
|
DLOG.verbose("Ignoring message: %s" % pp_msg)
|
|
finally:
|
|
self._message_filters_lock.release()
|
|
message.ack()
|
|
|
|
def run(self):
|
|
"""
|
|
RPC Listener main
|
|
"""
|
|
def _connection_error(exc, interval):
|
|
DLOG.info("Connection error, exception=%s" % exc)
|
|
|
|
while not self._exit.is_set():
|
|
try:
|
|
if not self._connection.connected:
|
|
DLOG.info("RPC-Listener not connected to exchange %s, queue=%s."
|
|
% (self._exchange_name, self._consumer_queue_name))
|
|
self._connection.ensure_connection(
|
|
errback=_connection_error, interval_start=2,
|
|
interval_step=0, interval_max=2)
|
|
self._consumer.revive(self._connection)
|
|
DLOG.info("RPC-Listener connected to exchange %s, queue=%s."
|
|
% (self._exchange_name, self._consumer_queue_name))
|
|
|
|
self._consumer.consume()
|
|
self._connection.drain_events(timeout=1)
|
|
|
|
except exceptions.TimeoutError as e:
|
|
DLOG.verbose("Timeout exception, exception=%s" % e)
|
|
|
|
except Exception as e:
|
|
DLOG.error("Exception received, exception=%s." % e)
|
|
time.sleep(2)
|
|
|
|
def stop(self):
|
|
"""
|
|
Stop RPC Listener
|
|
"""
|
|
self._exit.set()
|
|
|
|
|
|
def test_connection(host, port, user_id, password, virt_host, exchange_name):
|
|
"""
|
|
Test a connection to an exchange on a virtual host
|
|
"""
|
|
connection = None
|
|
connected = False
|
|
success = False
|
|
|
|
try:
|
|
# Connect to the virtual host - will raise exception if it fails.
|
|
connection = Connection(host, user_id, password, virt_host, port)
|
|
connection.connect()
|
|
connected = connection.connected
|
|
if connected:
|
|
# Check whether exchange exists - will raise exception if it fails.
|
|
exchange = Exchange(exchange_name, channel=connection,
|
|
type='topic', durable=False, passive=True)
|
|
exchange.declare()
|
|
success = True
|
|
except Exception as e:
|
|
DLOG.info("Unable to connect to virt_host %s, exchange %s, error: %s" %
|
|
(virt_host, exchange_name, e))
|
|
|
|
finally:
|
|
if connected:
|
|
connection.close()
|
|
|
|
return success
|