Adding Websocket client for ODL
ODL provides websocket based notifications. https://wiki.opendaylight.org/view/OpenDaylight_Controller:MD-SAL:Restconf:Change_event_notification_subscription. Currently Pseudo Agent port binding uses polling to read host config from ODL. Websocket can be used as alternative to receive notification for host config changes. This patch adds the basic framework for websocket client. The same framework can be used to receive notifications other than host config. Also changes to add websocket to pseudo agent port binding driver is included. Change-Id: Ia73d1f9d4def2497df987126d13422595cf4d7a0
This commit is contained in:
@@ -57,6 +57,8 @@ odl_opts = [
|
||||
default="/restconf/operational/neutron:neutron/hostconfigs"),
|
||||
cfg.IntOpt('restconf_poll_interval', default=30,
|
||||
help=_("Poll interval in seconds for getting ODL hostconfig")),
|
||||
cfg.BoolOpt('enable_websocket_pseudo_agentdb', default=False,
|
||||
help=_('Enable websocket for pseudo-agent-port-binding.')),
|
||||
|
||||
]
|
||||
|
||||
|
||||
331
networking_odl/common/websocket_client.py
Normal file
331
networking_odl/common/websocket_client.py
Normal file
@@ -0,0 +1,331 @@
|
||||
# Copyright (c) 2017 OpenStack Foundation
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log
|
||||
from oslo_serialization import jsonutils
|
||||
from oslo_utils import excutils
|
||||
import re
|
||||
from requests import codes
|
||||
from requests import exceptions
|
||||
import threading
|
||||
import time
|
||||
import websocket
|
||||
|
||||
from networking_odl._i18n import _
|
||||
from networking_odl.common import client as odl_client
|
||||
|
||||
|
||||
cfg.CONF.import_group('ml2_odl', 'networking_odl.common.config')
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
ODL_OPERATIONAL_DATASTORE = "OPERATIONAL"
|
||||
ODL_CONFIGURATION_DATASTORE = "CONFIGURATION"
|
||||
ODL_NOTIFICATION_SCOPE_BASE = "BASE"
|
||||
ODL_NOTIFICATION_SCOPE_ONE = "ONE"
|
||||
ODL_NOTIFICATION_SCOPE_SUBTREE = "SUBTREE"
|
||||
|
||||
ODL_WEBSOCKET_DISCONNECTED = "ODL_WEBSOCKET_DISCONNECTED"
|
||||
ODL_WEBSOCKET_CONNECTING = "ODL_WEBSOCKET_CONNECTING"
|
||||
ODL_WEBSOCKET_CONNECTED = "ODL_WEBSOCKET_CONNECTED"
|
||||
|
||||
|
||||
class OpendaylightWebsocketClient(object):
|
||||
"""Thread for the Opendaylight Websocket """
|
||||
|
||||
def __init__(self, odl_rest_client, path, datastore, scope, leaf_node_only,
|
||||
packet_handler, timeout, status_cb=None):
|
||||
self.odl_rest_client = odl_rest_client
|
||||
self.path = path
|
||||
self.datastore = datastore
|
||||
self.scope = scope
|
||||
self.leaf_node_only = leaf_node_only
|
||||
self.packet_handler = packet_handler
|
||||
self.timeout = timeout
|
||||
self.exit_websocket_thread = False
|
||||
self.status_cb = status_cb
|
||||
self.current_status = ODL_WEBSOCKET_DISCONNECTED
|
||||
self._odl_sync_thread = self.start_odl_websocket_thread()
|
||||
|
||||
@classmethod
|
||||
def odl_create_websocket(cls, odl_url, path, datastore, scope,
|
||||
packet_handler, status_cb=None,
|
||||
leaf_node_only=False):
|
||||
"""Create a websocket connection with ODL.
|
||||
|
||||
This method will create a websocket client based on path,
|
||||
datastore and scope params. On data recv from websocket
|
||||
packet_handler callback is called. status_cb callback can be
|
||||
provided if notifications are requried for socket status
|
||||
changes
|
||||
"""
|
||||
|
||||
if odl_url is None:
|
||||
LOG.error("invalid odl url", exc_info=True)
|
||||
raise ValueError(_("Invalid ODL URL"))
|
||||
|
||||
odl_rest_client = odl_client.OpenDaylightRestClient.create_client(
|
||||
odl_url)
|
||||
return cls(
|
||||
odl_rest_client, path, datastore, scope, leaf_node_only,
|
||||
packet_handler, cfg.CONF.ml2_odl.timeout, status_cb
|
||||
)
|
||||
|
||||
def start_odl_websocket_thread(self):
|
||||
# Start the websocket thread
|
||||
LOG.debug("starting a new websocket thread")
|
||||
odl_websocket_thread = threading.Thread(
|
||||
name='websocket',
|
||||
target=self.run_websocket_thread)
|
||||
odl_websocket_thread.start()
|
||||
return odl_websocket_thread
|
||||
|
||||
def set_exit_flag(self, value=True):
|
||||
# set flag to exit
|
||||
self.exit_websocket_thread = value
|
||||
|
||||
def run_websocket_thread(self, exit_after_run=False):
|
||||
# TBD connections are persistent so there is really no way to know
|
||||
# when it is a "first connection". We need to wait for the
|
||||
# dis/reconnect logic to be able to know this
|
||||
first_connection = True
|
||||
ws = None
|
||||
while not self.exit_websocket_thread:
|
||||
if exit_after_run:
|
||||
# Permanently waiting thread model breaks unit tests
|
||||
# Adding this arg to exit after one run for unit tests
|
||||
self.set_exit_flag()
|
||||
# connect if necessary
|
||||
if ws is None:
|
||||
try:
|
||||
ws = self._connect_ws()
|
||||
except ValueError:
|
||||
LOG.error("websocket irrecoverable error ")
|
||||
return
|
||||
if ws is None:
|
||||
time.sleep(cfg.CONF.ml2_odl.restconf_poll_interval)
|
||||
continue
|
||||
# read off the websocket
|
||||
try:
|
||||
data = ws.recv()
|
||||
if len(data) == 0:
|
||||
LOG.warning("websocket received 0 bytes")
|
||||
continue
|
||||
except websocket.WebSocketTimeoutException:
|
||||
continue
|
||||
except websocket.WebSocketConnectionClosedException:
|
||||
# per websocket-client, "If remote host closed the connection
|
||||
# or some network error happened"
|
||||
LOG.warning("websocket connection closed or IO error",
|
||||
exc_info=True)
|
||||
self._close_ws(ws)
|
||||
ws = None
|
||||
continue
|
||||
except Exception:
|
||||
# Connection closed trigger reconnection
|
||||
LOG.error("websocket unexpected exception, "
|
||||
"closing and restarting...", exc_info=True)
|
||||
# TODO(rsood): Websocket reconnect can cause race conditions
|
||||
self._close_ws(ws)
|
||||
ws = None
|
||||
continue
|
||||
|
||||
# Call handler for data received
|
||||
try:
|
||||
self.packet_handler(data, first_connection)
|
||||
first_connection = False
|
||||
except Exception:
|
||||
LOG.error("Error in packet_handler callback",
|
||||
exc_info=True)
|
||||
|
||||
self._close_ws(ws)
|
||||
|
||||
def _set_websocket_status(self, status):
|
||||
try:
|
||||
if self.status_cb:
|
||||
self.status_cb(status)
|
||||
except Exception:
|
||||
LOG.error("Error in status_cb", exc_info=True)
|
||||
|
||||
def _subscribe_websocket(self):
|
||||
"""ODL Websocket change notification subscription"""
|
||||
# Check ODL URL for details on this process
|
||||
# https://wiki.opendaylight.org/view/OpenDaylight_Controller:MD-SAL:Restconf:
|
||||
# Change_event_notification_subscription#rpc_create-data-change-event-subscription
|
||||
|
||||
# Invoke rpc create-data-change-event-subscription
|
||||
ws_create_dce_subs_url = ("restconf/operations/sal-remote:"
|
||||
"create-data-change-event-subscription")
|
||||
odl_subscription_data = {'input': {
|
||||
'path': self.path,
|
||||
'sal-remote-augment:datastore': self.datastore,
|
||||
'sal-remote-augment:scope': self.scope,
|
||||
'sal-remote-augment:notification-output-type': 'JSON'
|
||||
}}
|
||||
try:
|
||||
response = self.odl_rest_client.sendjson('post',
|
||||
ws_create_dce_subs_url,
|
||||
odl_subscription_data)
|
||||
response.raise_for_status()
|
||||
except exceptions.ConnectionError:
|
||||
LOG.error("cannot connect to the opendaylight controller")
|
||||
return None
|
||||
except exceptions.HTTPError as e:
|
||||
# restconf returns 400 on operation when path is not available
|
||||
if e.response.status_code == codes.bad_request:
|
||||
LOG.debug("response code bad_request (400)"
|
||||
"check path for websocket connection")
|
||||
raise ValueError(_("bad_request (http400),check path."))
|
||||
else:
|
||||
LOG.warning("websocket connection failed",
|
||||
exc_info=True)
|
||||
return None
|
||||
except Exception:
|
||||
LOG.error("websocket subscription failed", exc_info=True)
|
||||
return None
|
||||
|
||||
# Subscribing to stream. Returns websocket URL to listen to
|
||||
ws_dce_subs_url = """restconf/streams/stream/"""
|
||||
try:
|
||||
stream_name = response.json()
|
||||
stream_name = stream_name['output']['stream-name']
|
||||
url = ws_dce_subs_url + stream_name
|
||||
if self.leaf_node_only:
|
||||
url += "?odl-leaf-nodes-only=true"
|
||||
response = self.odl_rest_client.get(url)
|
||||
response.raise_for_status()
|
||||
stream_url = response.headers['location']
|
||||
LOG.debug("websocket stream URL: %s", stream_url)
|
||||
return stream_url
|
||||
except exceptions.ConnectionError:
|
||||
LOG.error("cannot connect to the opendaylight controller")
|
||||
return None
|
||||
except exceptions.HTTPError as e:
|
||||
# restconf returns 404 on operation when there is no entry
|
||||
if e.response.status_code == codes.not_found:
|
||||
LOG.debug("response code not_found (404)"
|
||||
"unable to websocket connection url")
|
||||
raise ValueError(_("bad_request (http400),check path"))
|
||||
else:
|
||||
LOG.warning("websocket connection failed")
|
||||
return None
|
||||
except ValueError:
|
||||
with excutils.save_and_reraise_exception():
|
||||
LOG.error("websocket subscribe got invalid stream name")
|
||||
except KeyError:
|
||||
LOG.error("websocket subscribe got bad stream data")
|
||||
raise ValueError(_("websocket subscribe bad stream data"))
|
||||
except Exception:
|
||||
LOG.error("websocket subscription failed", exc_info=True)
|
||||
return None
|
||||
|
||||
def _socket_create_connection(self, stream_url):
|
||||
ws = None
|
||||
try:
|
||||
ws = websocket.create_connection(stream_url,
|
||||
timeout=self.timeout)
|
||||
except ValueError:
|
||||
with excutils.save_and_reraise_exception():
|
||||
LOG.error("websocket create connection invalid URL")
|
||||
except websocket.WebSocketBadStatusException:
|
||||
LOG.error("webSocket bad status exception", exc_info=True)
|
||||
return None
|
||||
except Exception:
|
||||
LOG.exception("websocket create connection failed",
|
||||
exc_info=True)
|
||||
return None
|
||||
if ws is None or not ws.connected:
|
||||
LOG.error("websocket create connection unsuccessful")
|
||||
return None
|
||||
|
||||
LOG.debug("websocket connection established")
|
||||
return ws
|
||||
|
||||
def _connect_ws(self):
|
||||
self._set_websocket_status(ODL_WEBSOCKET_CONNECTING)
|
||||
stream_url = self._subscribe_websocket()
|
||||
if stream_url is None:
|
||||
return None
|
||||
# Delay here causes websocket notification lose (ODL Bug 8299)
|
||||
ws = self._socket_create_connection(stream_url)
|
||||
if ws is not None:
|
||||
self._set_websocket_status(ODL_WEBSOCKET_CONNECTED)
|
||||
return ws
|
||||
|
||||
def _close_ws(self, ws):
|
||||
LOG.debug("closing websocket")
|
||||
try:
|
||||
if ws is not None:
|
||||
ws.close()
|
||||
except Exception:
|
||||
LOG.error("Error while closing websocket", exc_info=True)
|
||||
self._set_websocket_status(ODL_WEBSOCKET_DISCONNECTED)
|
||||
|
||||
|
||||
class EventDataParser(object):
|
||||
"""Helper class to parse websocket notification data"""
|
||||
|
||||
NOTIFICATION_TAG = 'notification'
|
||||
DC_NOTIFICATION_TAG = 'data-changed-notification'
|
||||
DC_EVENT_TAG = 'data-change-event'
|
||||
OPERATION_DELETE = 'deleted'
|
||||
OPERATION_CREATE = 'created'
|
||||
OPERATION_UPDATE = 'updated'
|
||||
|
||||
def __init__(self, item):
|
||||
self.item = item
|
||||
|
||||
@classmethod
|
||||
def get_item(cls, payload):
|
||||
try:
|
||||
data = jsonutils.loads(payload)
|
||||
except ValueError:
|
||||
LOG.warning("invalid websocket notification")
|
||||
return
|
||||
try:
|
||||
dn_events = (data[cls.NOTIFICATION_TAG]
|
||||
[cls.DC_NOTIFICATION_TAG]
|
||||
[cls.DC_EVENT_TAG])
|
||||
|
||||
if not isinstance(dn_events, list):
|
||||
dn_events = [dn_events]
|
||||
|
||||
for e in dn_events:
|
||||
yield cls(e)
|
||||
except KeyError:
|
||||
LOG.warning("invalid JSON for websocket notification")
|
||||
|
||||
def get_fields(self):
|
||||
return (self.get_operation(),
|
||||
self.get_path(),
|
||||
self.get_data())
|
||||
|
||||
def get_path(self):
|
||||
return self.item.get('path')
|
||||
|
||||
def get_data(self):
|
||||
return self.item.get('data')
|
||||
|
||||
def get_operation(self):
|
||||
return self.item.get('operation')
|
||||
|
||||
@staticmethod
|
||||
def extract_field(text, key):
|
||||
pattern = '\[' + key + '=(.*?)\]'
|
||||
match = re.search(pattern, text)
|
||||
if match:
|
||||
return match.group(1)
|
||||
else:
|
||||
return None
|
||||
@@ -29,6 +29,7 @@ import six.moves.urllib.parse as urlparse
|
||||
from string import Template
|
||||
|
||||
from networking_odl.common import client as odl_client
|
||||
from networking_odl.common import websocket_client as odl_ws_client
|
||||
from networking_odl.journal import maintenance as mt
|
||||
from networking_odl.ml2 import port_binding
|
||||
|
||||
@@ -72,9 +73,16 @@ class PseudoAgentDBBindingController(port_binding.PortBindingController):
|
||||
self.agents_db = db_plugin
|
||||
self._known_agents = set()
|
||||
|
||||
# Start polling ODL restconf using maintenance thread.
|
||||
# default: 30s (should be <= agent keep-alive poll interval)
|
||||
self._start_maintenance_thread(cfg.CONF.ml2_odl.restconf_poll_interval)
|
||||
if cfg.CONF.ml2_odl.enable_websocket_pseudo_agentdb:
|
||||
# Update hostconfig once for the configurations already present
|
||||
self._get_and_update_hostconfigs()
|
||||
odl_url = self._make_odl_url(cfg.CONF.ml2_odl.url)
|
||||
self._start_websocket(odl_url)
|
||||
else:
|
||||
# Start polling ODL restconf using maintenance thread.
|
||||
# default: 30s (should be <= agent keep-alive poll interval)
|
||||
self._start_maintenance_thread(
|
||||
cfg.CONF.ml2_odl.restconf_poll_interval)
|
||||
|
||||
def _make_hostconf_uri(self, odl_url=None, path=''):
|
||||
"""Make ODL hostconfigs URI with host/port extraced from ODL_URL."""
|
||||
@@ -86,10 +94,15 @@ class PseudoAgentDBBindingController(port_binding.PortBindingController):
|
||||
purl = urlparse.urlsplit(odl_url)
|
||||
return urlparse.urlunparse((purl.scheme, purl.netloc,
|
||||
path, '', '', ''))
|
||||
#
|
||||
# TODO(mzmalick):
|
||||
# 1. implement websockets for ODL hostconfig events
|
||||
#
|
||||
|
||||
def _make_odl_url(self, odl_url):
|
||||
"""Extract host/port from ODL_URL to use for websocket."""
|
||||
|
||||
# extract ODL_IP and ODL_PORT from ODL_ENDPOINT
|
||||
# urlsplit and urlunparse don't throw exceptions
|
||||
purl = urlparse.urlsplit(odl_url)
|
||||
return urlparse.urlunparse((purl.scheme, purl.netloc,
|
||||
'', '', '', ''))
|
||||
|
||||
def _start_maintenance_thread(self, poll_interval):
|
||||
self._mainth = mt.MaintenanceThread()
|
||||
@@ -151,32 +164,54 @@ class PseudoAgentDBBindingController(port_binding.PortBindingController):
|
||||
def _update_agents_db(self, hostconfigs):
|
||||
LOG.debug("ODLPORTBINDING Updating agents DB with ODL hostconfigs")
|
||||
|
||||
agents_db = self._get_neutron_db_plugin()
|
||||
|
||||
if not agents_db: # if ML2 is still initializing
|
||||
LOG.warning("ML2 still initializing, Will retry agentdb"
|
||||
" update on next poll")
|
||||
return # Retry on next poll
|
||||
|
||||
old_agents = self._known_agents
|
||||
self._old_agents = self._known_agents
|
||||
self._known_agents = set()
|
||||
for host_config in hostconfigs:
|
||||
try:
|
||||
agentdb_row = self._AGENTDB_ROW.copy()
|
||||
host_id = host_config['host-id']
|
||||
agent_type = host_config['host-type']
|
||||
agentdb_row['host'] = host_id
|
||||
agentdb_row['agent_type'] = agent_type
|
||||
agentdb_row['configurations'] = jsonutils.loads(
|
||||
host_config['config'])
|
||||
if (host_id, agent_type) in old_agents:
|
||||
agentdb_row.pop('start_flag', None)
|
||||
agents_db.create_or_update_agent(
|
||||
context.get_admin_context(), agentdb_row)
|
||||
self._known_agents.add((host_id, agent_type))
|
||||
except Exception:
|
||||
LOG.exception("Unable to update agentdb.")
|
||||
continue # try next hostcofig
|
||||
self._update_agents_db_row(host_config)
|
||||
|
||||
def _update_agents_db_row(self, host_config):
|
||||
# Update one row in agent db
|
||||
agents_db = self._get_neutron_db_plugin()
|
||||
if not agents_db: # if ML2 is still initializing
|
||||
LOG.error("ML2 still initializing, Missed an update")
|
||||
# TODO(rsood): Neutron worker can be used
|
||||
return
|
||||
host_id = host_config['host-id']
|
||||
host_type = host_config['host-type']
|
||||
config = host_config['config']
|
||||
try:
|
||||
agentdb_row = self._AGENTDB_ROW.copy()
|
||||
agentdb_row['host'] = host_id
|
||||
agentdb_row['agent_type'] = host_type
|
||||
agentdb_row['configurations'] = jsonutils.loads(config)
|
||||
if (host_id, host_type) in self._old_agents:
|
||||
agentdb_row.pop('start_flag', None)
|
||||
agents_db.create_or_update_agent(
|
||||
context.get_admin_context(), agentdb_row)
|
||||
self._known_agents.add((host_id, host_type))
|
||||
except Exception:
|
||||
LOG.exception("Unable to update agentdb.")
|
||||
|
||||
def _delete_agents_db_row(self, host_id, host_type):
|
||||
"""Delete agent row."""
|
||||
agents_db = self._get_neutron_db_plugin()
|
||||
if not agents_db: # if ML2 is still initializing
|
||||
LOG.error("ML2 still initializing, Missed an update")
|
||||
return None
|
||||
try:
|
||||
filters = {'agent_type': [host_type],
|
||||
'host': [host_id]}
|
||||
# TODO(rsood): get_agent can be used here
|
||||
agent = agents_db.get_agents_db(
|
||||
context.get_admin_context(), filters=filters)
|
||||
if not agent:
|
||||
return
|
||||
|
||||
LOG.debug("Deleting Agent with Agent id: %s", agent[0]['id'])
|
||||
agents_db.delete_agent(context.get_admin_context(), agent[0]['id'])
|
||||
self._known_agents.remove((host_id, host_type))
|
||||
except Exception:
|
||||
LOG.exception("Unable to delete from agentdb.")
|
||||
|
||||
def _substitute_hconfig_tmpl(self, port_context, hconfig):
|
||||
# TODO(mzmalick): Explore options for inlines string splicing of
|
||||
@@ -288,3 +323,49 @@ class PseudoAgentDBBindingController(port_binding.PortBindingController):
|
||||
"""Verify a segment is supported by ODL."""
|
||||
network_type = segment[api.NETWORK_TYPE]
|
||||
return network_type in conf['allowed_network_types']
|
||||
|
||||
def _start_websocket(self, odl_url):
|
||||
# Opendaylight path to recieve websocket notifications on
|
||||
neutron_hostconfigs_path = """/neutron:neutron/neutron:hostconfigs"""
|
||||
|
||||
self.odl_websocket_client = (
|
||||
odl_ws_client.OpendaylightWebsocketClient.odl_create_websocket(
|
||||
odl_url, neutron_hostconfigs_path,
|
||||
odl_ws_client.ODL_OPERATIONAL_DATASTORE,
|
||||
odl_ws_client.ODL_NOTIFICATION_SCOPE_SUBTREE,
|
||||
self._process_websocket_recv,
|
||||
self._process_websocket_reconnect
|
||||
))
|
||||
if self.odl_websocket_client is None:
|
||||
LOG.error("Error starting websocket thread")
|
||||
|
||||
def _process_websocket_recv(self, payload, reconnect):
|
||||
# Callback for websocket notification
|
||||
LOG.debug("Websocket notification for hostconfig update")
|
||||
for event in odl_ws_client.EventDataParser.get_item(payload):
|
||||
try:
|
||||
operation, path, data = event.get_fields()
|
||||
if operation == event.OPERATION_DELETE:
|
||||
host_id = event.extract_field(path, "neutron:host-id")
|
||||
host_type = event.extract_field(path, "neutron:host-type")
|
||||
if not host_id or not host_type:
|
||||
LOG.warning("Invalid delete notification")
|
||||
continue
|
||||
self._delete_agents_db_row(host_id.strip("'"),
|
||||
host_type.strip("'"))
|
||||
elif operation == event.OPERATION_CREATE:
|
||||
if 'hostconfig' in data:
|
||||
hostconfig = data['hostconfig']
|
||||
self._old_agents = self._known_agents
|
||||
self._update_agents_db_row(hostconfig)
|
||||
except KeyError:
|
||||
LOG.warning("Invalid JSON for websocket notification",
|
||||
exc_info=True)
|
||||
continue
|
||||
|
||||
# TODO(rsood): Mixing restconf and websocket can cause race conditions
|
||||
def _process_websocket_reconnect(self, status):
|
||||
if status == odl_ws_client.ODL_WEBSOCKET_CONNECTED:
|
||||
# Get hostconfig data using restconf
|
||||
LOG.debug("Websocket notification on reconnection")
|
||||
self._get_and_update_hostconfigs()
|
||||
|
||||
250
networking_odl/tests/unit/common/test_websocket_client.py
Normal file
250
networking_odl/tests/unit/common/test_websocket_client.py
Normal file
@@ -0,0 +1,250 @@
|
||||
# Copyright (c) 2017 OpenStack Foundation
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import mock
|
||||
from oslo_config import cfg
|
||||
from oslo_serialization import jsonutils
|
||||
import requests
|
||||
import websocket
|
||||
|
||||
from networking_odl.common.client import OpenDaylightRestClient as odl_client
|
||||
from networking_odl.common import websocket_client as wsc
|
||||
from networking_odl.tests import base
|
||||
|
||||
|
||||
class TestWebsocketClient(base.DietTestCase):
|
||||
"""Test class for Websocket Client."""
|
||||
|
||||
FAKE_WEBSOCKET_STREAM_NAME_DATA = {
|
||||
'output': {
|
||||
'stream-name': 'data-change-event-subscription/neutron:neutron/'
|
||||
'neutron:hostconfigs/datastore=OPERATIONAL/scope=SUBTREE'
|
||||
}}
|
||||
|
||||
INVALID_WEBSOCKET_STREAM_NAME_DATA = {
|
||||
'outputs': {
|
||||
'stream-name': 'data-change-event-subscription/neutron:neutron/'
|
||||
'neutron:hostconfigs/datastore=OPERATIONAL/scope=SUBTREE'
|
||||
}}
|
||||
|
||||
FAKE_WEBSOCKET_SUBS_DATA = {
|
||||
'location': 'ws://localhost:8185/data-change-event-subscription/'
|
||||
'neutron:neutron/neutron:hostconfigs/datastore=OPERATIONAL'
|
||||
'/scope=SUBTREE'}
|
||||
|
||||
mock_callback_handler = mock.MagicMock()
|
||||
|
||||
def setUp(self):
|
||||
"""Setup test."""
|
||||
super(TestWebsocketClient, self).setUp()
|
||||
self.useFixture(base.OpenDaylightRestClientFixture())
|
||||
mock.patch.object(wsc.OpendaylightWebsocketClient,
|
||||
'start_odl_websocket_thread').start()
|
||||
|
||||
self.mgr = wsc.OpendaylightWebsocketClient.odl_create_websocket(
|
||||
"http://localhost:8080/",
|
||||
"restconf/operational/neutron:neutron/hostconfigs",
|
||||
wsc.ODL_OPERATIONAL_DATASTORE, wsc.ODL_NOTIFICATION_SCOPE_SUBTREE,
|
||||
TestWebsocketClient.mock_callback_handler
|
||||
)
|
||||
|
||||
def _get_raised_response(self, status_code):
|
||||
response = requests.Response()
|
||||
response.status_code = status_code
|
||||
return response
|
||||
|
||||
@classmethod
|
||||
def _get_mock_request_response(cls, status_code):
|
||||
response = mock.Mock(status_code=status_code)
|
||||
response.raise_for_status = mock.Mock() if status_code < 400 else (
|
||||
mock.Mock(side_effect=requests.exceptions.HTTPError()))
|
||||
return response
|
||||
|
||||
@mock.patch.object(odl_client, 'sendjson')
|
||||
def test_subscribe_websocket_sendjson(self, mocked_sendjson):
|
||||
request_response = self._get_raised_response(401)
|
||||
mocked_sendjson.return_value = request_response
|
||||
stream_url = self.mgr._subscribe_websocket()
|
||||
self.assertIsNone(stream_url)
|
||||
|
||||
request_response = self._get_raised_response(400)
|
||||
mocked_sendjson.return_value = request_response
|
||||
self.assertRaises(ValueError, self.mgr._subscribe_websocket)
|
||||
|
||||
request_response = self._get_mock_request_response(200)
|
||||
request_response.json = mock.Mock(
|
||||
return_value=(TestWebsocketClient.
|
||||
INVALID_WEBSOCKET_STREAM_NAME_DATA))
|
||||
mocked_sendjson.return_value = request_response
|
||||
self.assertRaises(ValueError, self.mgr._subscribe_websocket)
|
||||
|
||||
request_response = self._get_mock_request_response(200)
|
||||
request_response.json = mock.Mock(return_value={""})
|
||||
mocked_sendjson.return_value = request_response
|
||||
self.assertIsNone(self.mgr._subscribe_websocket())
|
||||
|
||||
@mock.patch.object(odl_client, 'get')
|
||||
def test_subscribe_websocket_get(self, mocked_get):
|
||||
request_response = self._get_raised_response(404)
|
||||
mocked_get.return_value = request_response
|
||||
self.assertRaises(ValueError, self.mgr._subscribe_websocket)
|
||||
|
||||
request_response = self._get_raised_response(400)
|
||||
mocked_get.return_value = request_response
|
||||
stream_url = self.mgr._subscribe_websocket()
|
||||
self.assertIsNone(stream_url)
|
||||
|
||||
request_response = self._get_raised_response(401)
|
||||
mocked_get.return_value = request_response
|
||||
stream_url = self.mgr._subscribe_websocket()
|
||||
self.assertIsNone(stream_url)
|
||||
|
||||
@mock.patch.object(odl_client, 'sendjson')
|
||||
@mock.patch.object(odl_client, 'get')
|
||||
def test_subscribe_websocket(self, mocked_get, mocked_sendjson):
|
||||
request_response = self._get_mock_request_response(200)
|
||||
request_response.json = mock.Mock(
|
||||
return_value=TestWebsocketClient.FAKE_WEBSOCKET_STREAM_NAME_DATA)
|
||||
mocked_sendjson.return_value = request_response
|
||||
|
||||
request_response = self._get_mock_request_response(200)
|
||||
request_response.headers = TestWebsocketClient.FAKE_WEBSOCKET_SUBS_DATA
|
||||
mocked_get.return_value = request_response
|
||||
stream_url = self.mgr._subscribe_websocket()
|
||||
|
||||
EXPECTED_OUTPUT = (
|
||||
"ws://localhost:8185/" +
|
||||
"data-change-event-subscription/neutron:neutron/" +
|
||||
"neutron:hostconfigs/datastore=OPERATIONAL/scope=SUBTREE")
|
||||
self.assertEqual(EXPECTED_OUTPUT, stream_url)
|
||||
|
||||
@mock.patch.object(websocket, 'create_connection')
|
||||
def test_create_connection(self, mock_create_connection):
|
||||
mock_create_connection.return_value = None
|
||||
return_value = self.mgr._socket_create_connection("localhost")
|
||||
self.assertIsNone(return_value)
|
||||
|
||||
def test_run_websocket_thread(self):
|
||||
self.mgr._connect_ws = mock.MagicMock(return_value=None)
|
||||
cfg.CONF.ml2_odl.restconf_poll_interval = 0
|
||||
self.mgr.run_websocket_thread(True)
|
||||
assert self.mgr._connect_ws.call_count == 1
|
||||
|
||||
self.mgr.set_exit_flag(False)
|
||||
self.mgr._connect_ws = mock.MagicMock(return_value=1)
|
||||
with mock.patch.object(wsc, 'LOG') as mock_log:
|
||||
self.mgr.run_websocket_thread(True)
|
||||
self.assertTrue(mock_log.error.called)
|
||||
|
||||
self.mgr.set_exit_flag(False)
|
||||
self.mgr._connect_ws = mock.MagicMock(return_value=mock.MagicMock())
|
||||
self.mgr._close_ws = mock.MagicMock(return_value=None)
|
||||
with mock.patch.object(wsc, 'LOG') as mock_log:
|
||||
self.mgr.run_websocket_thread(True)
|
||||
self.assertTrue(mock_log.warning.called)
|
||||
|
||||
self.mgr.set_exit_flag(False)
|
||||
ws = mock.MagicMock()
|
||||
ws.recv.return_value = "Test Data"
|
||||
self.mgr._connect_ws = mock.MagicMock(return_value=ws)
|
||||
self.mgr._close_ws = mock.MagicMock(return_value=None)
|
||||
self.mgr.run_websocket_thread(True)
|
||||
TestWebsocketClient.mock_callback_handler.assert_called_once()
|
||||
|
||||
|
||||
class TestEventDataParser(base.DietTestCase):
|
||||
"""Test class for Websocket Client."""
|
||||
|
||||
# test data port status payload
|
||||
sample_port_status_payload = """{"notification":
|
||||
{"xmlns":"urn:ietf:params:xml:ns:netconf:notification:1.0",
|
||||
"data-changed-notification": { "xmlns":
|
||||
"urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote",
|
||||
"data-change-event":
|
||||
[{"path":
|
||||
"/neutron:neutron/neutron:ports/neutron:port\
|
||||
[neutron:uuid='a51e439f-4d02-4e76-9b0d-08f6c08855dd']\
|
||||
/neutron:uuid",
|
||||
"data":{"uuid":{"xmlns":"urn:opendaylight:neutron",
|
||||
"content":"a51e439f-4d02-4e76-9b0d-08f6c08855dd"}},
|
||||
"operation":"created"},
|
||||
{"path":
|
||||
"/neutron:neutron/neutron:ports/neutron:port\
|
||||
[neutron:uuid='a51e439f-4d02-4e76-9b0d-08f6c08855dd']\
|
||||
/neutron:status",
|
||||
"data":{"status":{"xmlns":"urn:opendaylight:neutron",
|
||||
"content":"ACTIVE"}},
|
||||
"operation":"created"}
|
||||
]},
|
||||
"eventTime":"2017-03-23T09:28:55.379-07:00"}}"""
|
||||
|
||||
sample_port_status_payload_one_item = """{"notification":
|
||||
{"xmlns": "urn:ietf:params:xml:ns:netconf:notification:1.0",
|
||||
"data-changed-notification": {
|
||||
"data-change-event": {
|
||||
"data": { "status": {
|
||||
"content": "ACTIVE",
|
||||
"xmlns": "urn:opendaylight:neutron"
|
||||
}},
|
||||
"operation": "updated",
|
||||
"path": "/neutron:neutron/neutron:ports/neutron:port\
|
||||
[neutron:uuid='d6e6335d-9568-4949-aef1-4107e34c5f28']\
|
||||
/neutron:status"
|
||||
},
|
||||
"xmlns": "urn:opendaylight:params:xml:ns:yang:controller:md:\
|
||||
sal:remote"
|
||||
},
|
||||
"eventTime": "2017-02-22T02:27:32+02:00" }}"""
|
||||
|
||||
def setUp(self):
|
||||
"""Setup test."""
|
||||
super(TestEventDataParser, self).setUp()
|
||||
|
||||
def test_get_item_port_status_payload(self):
|
||||
sample = jsonutils.loads(self.sample_port_status_payload)
|
||||
expected_events = (sample
|
||||
[wsc.EventDataParser.NOTIFICATION_TAG]
|
||||
[wsc.EventDataParser.DC_NOTIFICATION_TAG]
|
||||
[wsc.EventDataParser.DC_EVENT_TAG])
|
||||
event_0 = expected_events[0]
|
||||
event = wsc.EventDataParser.get_item(self.sample_port_status_payload)
|
||||
operation, path, data = next(event).get_fields()
|
||||
|
||||
self.assertEqual(event_0.get('operation'), operation)
|
||||
self.assertEqual(event_0.get('path'), path)
|
||||
self.assertEqual(event_0.get('data'), data)
|
||||
|
||||
uuid = wsc.EventDataParser.extract_field(path, "neutron:uuid")
|
||||
self.assertEqual("'a51e439f-4d02-4e76-9b0d-08f6c08855dd'", uuid)
|
||||
|
||||
uuid = wsc.EventDataParser.extract_field(path, "invalidkey")
|
||||
self.assertIsNone(uuid)
|
||||
|
||||
def test_get_item_port_status_payload_one_item(self):
|
||||
sample = jsonutils.loads(self.sample_port_status_payload_one_item)
|
||||
expected_events = (sample
|
||||
[wsc.EventDataParser.NOTIFICATION_TAG]
|
||||
[wsc.EventDataParser.DC_NOTIFICATION_TAG]
|
||||
[wsc.EventDataParser.DC_EVENT_TAG])
|
||||
event = (wsc.EventDataParser.
|
||||
get_item(self.sample_port_status_payload_one_item))
|
||||
operation, path, data = next(event).get_fields()
|
||||
|
||||
self.assertEqual(expected_events.get('operation'), operation)
|
||||
self.assertEqual(expected_events.get('path'), path)
|
||||
self.assertEqual(expected_events.get('data'), data)
|
||||
|
||||
uuid = wsc.EventDataParser.extract_field(path, "neutron:uuid")
|
||||
self.assertEqual("'d6e6335d-9568-4949-aef1-4107e34c5f28'", uuid)
|
||||
@@ -0,0 +1,8 @@
|
||||
---
|
||||
prelude: >
|
||||
Websocket-client provides framework to create
|
||||
webscket clients for ODL.
|
||||
features:
|
||||
- Features include callback on new notifications
|
||||
and callback on reconnection which includes
|
||||
status information.
|
||||
@@ -7,3 +7,4 @@ Babel!=2.4.0,>=2.3.4 # BSD
|
||||
stevedore>=1.20.0 # Apache-2.0
|
||||
debtcollector>=1.2.0 # Apache-2.0
|
||||
neutron-lib>=1.6.0 # Apache-2.0
|
||||
websocket-client>=0.32.0 # LGPLv2+
|
||||
|
||||
Reference in New Issue
Block a user