From 9b6ee5df9af1c9c68732e4df1ed74b05d51e66b6 Mon Sep 17 00:00:00 2001 From: vikas Date: Mon, 3 Aug 2015 07:54:06 -0700 Subject: [PATCH] ovsdb server connection initiation implementation ovsdb server initiates the connection to l2gw agent with the entries in manager table in ovsdb hardware vtep schema. A configurable option 'enable_manager' in .ini file is provided so that the connection can be initiated by the ovsdb server or not based on boolean value. By default 'enable_manager' value is False, turn on the variable to True to initiate the connection from ovsdb server to l2gw agent. Closes-Bug:1466302 Change-Id: Ib668ecda381367ea9f2f3cfa45e63d0e91e48f33 --- etc/l2gateway_agent.ini | 5 + .../l2gateway/agent/ovsdb/base_connection.py | 116 ++++++--- .../services/l2gateway/agent/ovsdb/manager.py | 141 ++++++++--- .../agent/ovsdb/ovsdb_common_class.py | 21 ++ .../l2gateway/agent/ovsdb/ovsdb_monitor.py | 52 +++- .../l2gateway/agent/ovsdb/ovsdb_writer.py | 42 ++-- .../services/l2gateway/common/config.py | 3 + .../agent/ovsdb/test_base_connection.py | 55 +++++ .../l2gateway/agent/ovsdb/test_manager.py | 225 ++++++++++++++++++ .../agent/ovsdb/test_ovsdb_monitor.py | 87 +++++++ .../agent/ovsdb/test_ovsdb_writer.py | 24 +- 11 files changed, 685 insertions(+), 86 deletions(-) create mode 100644 networking_l2gw/services/l2gateway/agent/ovsdb/ovsdb_common_class.py diff --git a/etc/l2gateway_agent.ini b/etc/l2gateway_agent.ini index 7839eb5..5e0890f 100644 --- a/etc/l2gateway_agent.ini +++ b/etc/l2gateway_agent.ini @@ -11,6 +11,11 @@ # ovsdb_hosts = # Example: ovsdb_hosts = 'ovsdb1:16.95.16.1:6632,ovsdb2:16.95.16.2:6632' +#enable_manager = False +#connection can be initiated by the ovsdb server. +#By default 'enable_manager' value is False, turn on the variable to True +#to initiate the connection from ovsdb server to l2gw agent. + # (StrOpt) Base path to private key file(s). # Agent will find key file named # $l2_gw_agent_priv_key_base_path/$ovsdb_name.key diff --git a/networking_l2gw/services/l2gateway/agent/ovsdb/base_connection.py b/networking_l2gw/services/l2gateway/agent/ovsdb/base_connection.py index fde60b9..0211e44 100644 --- a/networking_l2gw/services/l2gateway/agent/ovsdb/base_connection.py +++ b/networking_l2gw/services/l2gateway/agent/ovsdb/base_connection.py @@ -18,6 +18,8 @@ import socket import ssl import time +import eventlet +from oslo_config import cfg from oslo_log import log as logging from oslo_serialization import jsonutils from oslo_utils import excutils @@ -41,41 +43,81 @@ class BaseConnection(object): def __init__(self, conf, gw_config): self.responses = [] self.connected = False - self.gw_config = gw_config - self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - if gw_config.use_ssl: - ssl_sock = ssl.wrap_socket( - self.socket, - server_side=False, - keyfile=gw_config.private_key, - certfile=gw_config.certificate, - cert_reqs=ssl.CERT_REQUIRED, - ssl_version=ssl.PROTOCOL_TLSv1, - ca_certs=gw_config.ca_cert) - self.socket = ssl_sock + self.enable_manager = cfg.CONF.ovsdb.enable_manager + if self.enable_manager: + self.s = None + self.c_sock = None + self.addr = None + self.check_c_sock = None + eventlet.greenthread.spawn(self._rcv_socket) + else: + self.gw_config = gw_config + self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + if gw_config.use_ssl: + ssl_sock = ssl.wrap_socket( + self.socket, + server_side=False, + keyfile=gw_config.private_key, + certfile=gw_config.certificate, + cert_reqs=ssl.CERT_REQUIRED, + ssl_version=ssl.PROTOCOL_TLSv1, + ca_certs=gw_config.ca_cert) + self.socket = ssl_sock + retryCount = 0 + while True: + try: + self.socket.connect((str(gw_config.ovsdb_ip), + int(gw_config.ovsdb_port))) + break + except (socket.error, socket.timeout): + LOG.warning(OVSDB_UNREACHABLE_MSG, gw_config.ovsdb_ip) + if retryCount == conf.max_connection_retries: + # Retried for max_connection_retries times. + # Give up and return so that it can be tried in + # the next periodic interval. + with excutils.save_and_reraise_exception(reraise=True): + LOG.exception(_LE("Socket error in connecting to " + "the OVSDB server")) + else: + time.sleep(1) + retryCount += 1 - retryCount = 0 + # Successfully connected to the socket + LOG.debug(OVSDB_CONNECTED_MSG, gw_config.ovsdb_ip) + self.connected = True + + def _rcv_socket(self): + # Create a socket object. + self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + host = '' # Get local machine name + port = 6632 # Reserve a port for your service. + self.s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.s.bind((host, port)) # Bind to the port + self.s.listen(5) # Now wait for client connection. while True: - try: - self.socket.connect((str(gw_config.ovsdb_ip), - int(gw_config.ovsdb_port))) - break - except (socket.error, socket.timeout): - LOG.warning(OVSDB_UNREACHABLE_MSG, gw_config.ovsdb_ip) - if retryCount == conf.max_connection_retries: - # Retried for max_connection_retries times. - # Give up and return so that it can be tried in - # the next periodic interval. - with excutils.save_and_reraise_exception(reraise=True): - LOG.exception(_LE("Socket error in connecting to " - "the OVSDB server")) - else: - time.sleep(1) - retryCount += 1 + # Establish connection with client. + self.c_sock, ip_addr = self.s.accept() + self.addr = ip_addr[0] + self.c_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + LOG.debug("Got connection from %s ", self.addr) + self.connected = True - # Successfully connected to the socket - LOG.debug(OVSDB_CONNECTED_MSG, gw_config.ovsdb_ip) - self.connected = True + def _echo_response(self): + while True: + try: + if self.enable_manager: + eventlet.greenthread.sleep(0) + response = self.c_sock.recv(n_const.BUFFER_SIZE) + sock_json_m = jsonutils.loads(response) + sock_handler_method = sock_json_m.get('method', None) + if sock_handler_method == 'echo': + self.check_c_sock = True + self.c_sock.send(jsonutils.dumps( + {"result": sock_json_m.get("params", None), + "error": None, "id": sock_json_m['id']})) + break + except Exception: + continue def send(self, message, callback=None): """Sends a message to the OVSDB server.""" @@ -85,7 +127,10 @@ class BaseConnection(object): bytes_sent = 0 while retry_count <= n_const.MAX_RETRIES: try: - bytes_sent = self.socket.send(jsonutils.dumps(message)) + if self.enable_manager: + bytes_sent = self.c_sock.send(jsonutils.dumps(message)) + else: + bytes_sent = self.socket.send(jsonutils.dumps(message)) if bytes_sent: return True except Exception as ex: @@ -100,7 +145,10 @@ class BaseConnection(object): def disconnect(self): """disconnects the connection from the OVSDB server.""" - self.socket.close() + if self.enable_manager: + self.c_sock.close() + else: + self.socket.close() self.connected = False def _response(self, operation_id): diff --git a/networking_l2gw/services/l2gateway/agent/ovsdb/manager.py b/networking_l2gw/services/l2gateway/agent/ovsdb/manager.py index 31e479b..e8d70f7 100644 --- a/networking_l2gw/services/l2gateway/agent/ovsdb/manager.py +++ b/networking_l2gw/services/l2gateway/agent/ovsdb/manager.py @@ -25,6 +25,7 @@ from oslo_service import loopingcall from networking_l2gw.services.l2gateway.agent import base_agent_manager from networking_l2gw.services.l2gateway.agent import l2gateway_config +from networking_l2gw.services.l2gateway.agent.ovsdb import ovsdb_common_class from networking_l2gw.services.l2gateway.agent.ovsdb import ovsdb_monitor from networking_l2gw.services.l2gateway.agent.ovsdb import ovsdb_writer from networking_l2gw.services.l2gateway.common import constants as n_const @@ -41,8 +42,12 @@ class OVSDBManager(base_agent_manager.BaseAgentManager): def __init__(self, conf=None): super(OVSDBManager, self).__init__(conf) self._extract_ovsdb_config(conf) - self.looping_task = loopingcall.FixedIntervalLoopingCall( - self._connect_to_ovsdb_server) + self.enable_manager = cfg.CONF.ovsdb.enable_manager + if self.enable_manager: + self.ovsdb_fd = None + else: + self.looping_task = loopingcall.FixedIntervalLoopingCall( + self._connect_to_ovsdb_server) def _extract_ovsdb_config(self, conf): self.conf = conf or cfg.CONF @@ -110,8 +115,8 @@ class OVSDBManager(base_agent_manager.BaseAgentManager): self.agent_to_plugin_rpc) except Exception: ovsdb_states[key] = 'disconnected' - # Log a warning and continue so that it can be retried - # in the next iteration. + # Log a warning and continue so that it can be + # retried in the next iteration. LOG.error(_LE("OVSDB server %s is not " "reachable"), gateway.ovsdb_ip) # Continue processing the next element in the list. @@ -139,8 +144,9 @@ class OVSDBManager(base_agent_manager.BaseAgentManager): self.l2gw_agent_type = '' self.agent_state.get('configurations')[n_const.L2GW_AGENT_TYPE ] = self.l2gw_agent_type - self._stop_looping_task() - self._disconnect_all_ovsdb_servers() + if not self.enable_manager: + self._stop_looping_task() + self._disconnect_all_ovsdb_servers() def _disconnect_all_ovsdb_servers(self): if self.gateways: @@ -159,9 +165,15 @@ class OVSDBManager(base_agent_manager.BaseAgentManager): # If set to Monitor, then let us start monitoring the OVSDB # servers without any further delay. - if self.l2gw_agent_type == n_const.MONITOR: + if ((self.l2gw_agent_type == n_const.MONITOR) and not ( + self.enable_manager)): self._start_looping_task() - else: + elif self.enable_manager and self.l2gw_agent_type == n_const.MONITOR: + if self.ovsdb_fd is None: + self._sock_open_connection() + elif self.ovsdb_fd and not self.ovsdb_fd.check_monitor_thread: + self.ovsdb_fd._spawn_monitor_thread() + elif not self.enable_manager: # Otherwise, stop monitoring the OVSDB servers # and close the open connections if any. self._stop_looping_task() @@ -176,6 +188,14 @@ class OVSDBManager(base_agent_manager.BaseAgentManager): self.looping_task.start(interval=self.conf.ovsdb. periodic_interval) + def _sock_open_connection(self): + gateway = '' + if self.ovsdb_fd is None: + self.ovsdb_fd = ovsdb_common_class.OVSDB_commom_class( + self.conf.ovsdb, + gateway, + self.agent_to_plugin_rpc) + @contextmanager def _open_connection(self, ovsdb_identifier): ovsdb_fd = None @@ -197,26 +217,62 @@ class OVSDBManager(base_agent_manager.BaseAgentManager): def delete_network(self, context, ovsdb_identifier, logical_switch_uuid): """Handle RPC cast from plugin to delete a network.""" - if self._is_valid_request(ovsdb_identifier): - with self._open_connection(ovsdb_identifier) as ovsdb_fd: - ovsdb_fd.delete_logical_switch(logical_switch_uuid) + if self.enable_manager and self.l2gw_agent_type == n_const.MONITOR: + self.ovsdb_fd.delete_logical_switch(logical_switch_uuid, False) + elif ((self.enable_manager) and ( + not self.l2gw_agent_type == n_const.MONITOR)): + self._sock_open_connection() + self.ovsdb_fd._echo_response() + if self.ovsdb_fd.check_c_sock: + self.ovsdb_fd.delete_logical_switch(logical_switch_uuid, + False) + elif not self.enable_manager: + if self._is_valid_request(ovsdb_identifier): + with self._open_connection(ovsdb_identifier) as ovsdb_fd: + ovsdb_fd.delete_logical_switch(logical_switch_uuid) def add_vif_to_gateway(self, context, ovsdb_identifier, logical_switch_dict, locator_dict, mac_dict): """Handle RPC cast from plugin to insert neutron port MACs.""" - if self._is_valid_request(ovsdb_identifier): - with self._open_connection(ovsdb_identifier) as ovsdb_fd: - ovsdb_fd.insert_ucast_macs_remote(logical_switch_dict, - locator_dict, - mac_dict) + if self.enable_manager and self.l2gw_agent_type == n_const.MONITOR: + self.ovsdb_fd.insert_ucast_macs_remote(logical_switch_dict, + locator_dict, + mac_dict, False) + elif ((self.enable_manager) and ( + not self.l2gw_agent_type == n_const.MONITOR)): + self._sock_open_connection() + self.ovsdb_fd._echo_response() + if self.ovsdb_fd.check_c_sock: + self.ovsdb_fd.insert_ucast_macs_remote( + logical_switch_dict, + locator_dict, + mac_dict) + elif not self.enable_manager: + if self._is_valid_request(ovsdb_identifier): + with self._open_connection(ovsdb_identifier) as ovsdb_fd: + ovsdb_fd.insert_ucast_macs_remote(logical_switch_dict, + locator_dict, + mac_dict) def delete_vif_from_gateway(self, context, ovsdb_identifier, logical_switch_uuid, mac): """Handle RPC cast from plugin to delete neutron port MACs.""" - if self._is_valid_request(ovsdb_identifier): - with self._open_connection(ovsdb_identifier) as ovsdb_fd: - ovsdb_fd.delete_ucast_macs_remote(logical_switch_uuid, mac) + if self.enable_manager and self.l2gw_agent_type == n_const.MONITOR: + self.ovsdb_fd.delete_ucast_macs_remote(logical_switch_uuid, mac, + False) + elif ((self.enable_manager) and ( + not self.l2gw_agent_type == n_const.MONITOR)): + self._sock_open_connection() + self.ovsdb_fd._echo_response() + if self.ovsdb_fd.check_c_sock: + self.ovsdb_fd.delete_ucast_macs_remote( + logical_switch_uuid, + mac) + elif not self.enable_manager: + if self._is_valid_request(ovsdb_identifier): + with self._open_connection(ovsdb_identifier) as ovsdb_fd: + ovsdb_fd.delete_ucast_macs_remote(logical_switch_uuid, mac) def update_vif_to_gateway(self, context, ovsdb_identifier, locator_dict, mac_dict): @@ -224,10 +280,21 @@ class OVSDBManager(base_agent_manager.BaseAgentManager): for VM migration. """ - if self._is_valid_request(ovsdb_identifier): - with self._open_connection(ovsdb_identifier) as ovsdb_fd: - ovsdb_fd.update_ucast_macs_remote(locator_dict, - mac_dict) + if self.enable_manager and self.l2gw_agent_type == n_const.MONITOR: + self.ovsdb_fd.update_ucast_macs_remote(locator_dict, + mac_dict, False) + elif ((self.enable_manager) and ( + not self.l2gw_agent_type == n_const.MONITOR)): + self._sock_open_connection() + self.ovsdb_fd._echo_response() + if self.ovsdb_fd.check_c_sock: + self.ovsdb_fd.update_ucast_macs_remote(locator_dict, + mac_dict) + elif not self.enable_manager: + if self._is_valid_request(ovsdb_identifier): + with self._open_connection(ovsdb_identifier) as ovsdb_fd: + ovsdb_fd.update_ucast_macs_remote(locator_dict, + mac_dict) def update_connection_to_gateway(self, context, ovsdb_identifier, logical_switch_dict, locator_dicts, @@ -237,12 +304,28 @@ class OVSDBManager(base_agent_manager.BaseAgentManager): Handle RPC cast from plugin to connect/disconnect a network to/from an L2 gateway. """ - if self._is_valid_request(ovsdb_identifier): - with self._open_connection(ovsdb_identifier) as ovsdb_fd: - ovsdb_fd.update_connection_to_gateway(logical_switch_dict, - locator_dicts, - mac_dicts, - port_dicts) + if self.enable_manager and self.l2gw_agent_type == n_const.MONITOR: + self.ovsdb_fd.update_connection_to_gateway(logical_switch_dict, + locator_dicts, + mac_dicts, + port_dicts, False) + elif ((self.enable_manager) and ( + not self.l2gw_agent_type == n_const.MONITOR)): + self._sock_open_connection() + self.ovsdb_fd._echo_response() + if self.ovsdb_fd.check_c_sock: + self.ovsdb_fd.update_connection_to_gateway( + logical_switch_dict, + locator_dicts, + mac_dicts, + port_dicts) + elif not self.enable_manager: + if self._is_valid_request(ovsdb_identifier): + with self._open_connection(ovsdb_identifier) as ovsdb_fd: + ovsdb_fd.update_connection_to_gateway(logical_switch_dict, + locator_dicts, + mac_dicts, + port_dicts) def agent_to_plugin_rpc(self, ovsdb_data): self.plugin_rpc.update_ovsdb_changes(ctx.get_admin_context(), diff --git a/networking_l2gw/services/l2gateway/agent/ovsdb/ovsdb_common_class.py b/networking_l2gw/services/l2gateway/agent/ovsdb/ovsdb_common_class.py new file mode 100644 index 0000000..d3728f9 --- /dev/null +++ b/networking_l2gw/services/l2gateway/agent/ovsdb/ovsdb_common_class.py @@ -0,0 +1,21 @@ +# Copyright (c) 2015 OpenStack Foundation. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from networking_l2gw.services.l2gateway.agent.ovsdb import ovsdb_monitor +from networking_l2gw.services.l2gateway.agent.ovsdb import ovsdb_writer + + +class OVSDB_commom_class(ovsdb_monitor.OVSDBMonitor, ovsdb_writer.OVSDBWriter): + pass diff --git a/networking_l2gw/services/l2gateway/agent/ovsdb/ovsdb_monitor.py b/networking_l2gw/services/l2gateway/agent/ovsdb/ovsdb_monitor.py index f647a7b..4a52d8f 100644 --- a/networking_l2gw/services/l2gateway/agent/ovsdb/ovsdb_monitor.py +++ b/networking_l2gw/services/l2gateway/agent/ovsdb/ovsdb_monitor.py @@ -40,7 +40,15 @@ class OVSDBMonitor(base_connection.BaseConnection): self._setup_dispatch_table() self.read_on = True self.handlers = {"echo": self._default_echo_handler} - eventlet.greenthread.spawn(self._rcv_thread) + if self.enable_manager: + self.check_c_sock = self.c_sock + self.check_monitor_thread = False + if not self.enable_manager: + eventlet.greenthread.spawn(self._rcv_thread) + + def _spawn_monitor_thread(self): + eventlet.greenthread.spawn(self._sock_rcv_thread) + self.check_monitor_thread = True def _initialize_data_dict(self): data_dict = {'new_local_macs': [], @@ -112,6 +120,8 @@ class OVSDBMonitor(base_connection.BaseConnection): response_result = self._process_response(op_id) except exceptions.OVSDBError: with excutils.save_and_reraise_exception(): + if self.enable_manager: + self.check_monitor_thread = False LOG.exception(_LE("Exception while receiving the " "response for the monitor message")) self._process_monitor_msg(response_result) @@ -188,6 +198,43 @@ class OVSDBMonitor(base_connection.BaseConnection): LOG.exception(_LE("Exception [%s] while handling " "message"), e) + def _sock_rcv_thread(self): + chunks = [] + lc = rc = 0 + prev_char = None + self._echo_response() + if self.enable_manager and self.check_c_sock: + eventlet.greenthread.spawn_n(self.set_monitor_response_handler) + while self.read_on: + response = self.c_sock.recv(n_const.BUFFER_SIZE) + eventlet.greenthread.sleep(0) + if response: + response = response.decode('utf8') + message_mark = 0 + for i, c in enumerate(response): + if c == '{' and not (prev_char and + prev_char == '\\'): + lc += 1 + elif c == '}' and not (prev_char and + prev_char == '\\'): + rc += 1 + if rc > lc: + raise Exception(_LE("json string not valid")) + elif lc == rc and lc is not 0: + chunks.append(response[message_mark:i + 1]) + message = "".join(chunks) + eventlet.greenthread.spawn_n( + self._on_remote_message, message) + eventlet.greenthread.sleep(0) + lc = rc = 0 + message_mark = i + 1 + chunks = [] + prev_char = c + chunks.append(response[message_mark:]) + else: + self.read_on = False + self.disconnect() + def _rcv_thread(self): chunks = [] lc = rc = 0 @@ -246,7 +293,8 @@ class OVSDBMonitor(base_connection.BaseConnection): return [element.__dict__ for element in resource_list] def _form_ovsdb_data(self, data_dict): - return {n_const.OVSDB_IDENTIFIER: self.gw_config.ovsdb_identifier, + return {n_const.OVSDB_IDENTIFIER: str(self.addr) if ( + self.enable_manager) else (self.gw_config.ovsdb_identifier), 'new_logical_switches': self._get_list( data_dict.get('new_logical_switches')), 'new_physical_switches': self._get_list( diff --git a/networking_l2gw/services/l2gateway/agent/ovsdb/ovsdb_writer.py b/networking_l2gw/services/l2gateway/agent/ovsdb/ovsdb_writer.py index df2310c..53ca26d 100644 --- a/networking_l2gw/services/l2gateway/agent/ovsdb/ovsdb_writer.py +++ b/networking_l2gw/services/l2gateway/agent/ovsdb/ovsdb_writer.py @@ -62,8 +62,14 @@ class OVSDBWriter(base_connection.BaseConnection): try: json_m = jsonutils.loads(response) self.responses.append(json_m) - if self._process_response(operation_id): - return True + method_type = json_m.get('method', None) + if method_type == "echo" and self.enable_manager: + self.c_sock.send(jsonutils.dumps( + {"result": json_m.get("params", None), + "error": None, "id": json_m['id']})) + else: + if self._process_response(operation_id): + return True except Exception as ex: with excutils.save_and_reraise_exception(): LOG.exception(_LE("Exception while receiving the " @@ -74,12 +80,13 @@ class OVSDBWriter(base_connection.BaseConnection): LOG.error(_LE("Could not obtain response from the OVSDB server " "for the request")) - def _send_and_receive(self, query, operation_id): + def _send_and_receive(self, query, operation_id, rcv_required): if not self.send(query): return - self._get_reply(operation_id) + if rcv_required: + self._get_reply(operation_id) - def delete_logical_switch(self, logical_switch_uuid): + def delete_logical_switch(self, logical_switch_uuid, rcv_required=True): """Delete an entry from Logical_Switch OVSDB table.""" commit_dict = {"op": "commit", "durable": True} op_id = str(random.getrandbits(128)) @@ -92,10 +99,10 @@ class OVSDBWriter(base_connection.BaseConnection): commit_dict], "id": op_id} LOG.debug("delete_logical_switch: query: %s", query) - self._send_and_receive(query, op_id) + self._send_and_receive(query, op_id, rcv_required) def insert_ucast_macs_remote(self, l_switch_dict, locator_dict, - mac_dict): + mac_dict, rcv_required=True): """Insert an entry in Ucast_Macs_Remote OVSDB table.""" # To insert an entry in Ucast_Macs_Remote table, it requires # corresponding entry in Physical_Locator (Compute node VTEP IP) @@ -139,9 +146,10 @@ class OVSDBWriter(base_connection.BaseConnection): "params": params, "id": op_id} LOG.debug("insert_ucast_macs_remote: query: %s", query) - self._send_and_receive(query, op_id) + self._send_and_receive(query, op_id, rcv_required) - def update_ucast_macs_remote(self, locator_dict, mac_dict): + def update_ucast_macs_remote(self, locator_dict, mac_dict, + rcv_required=True): """Update an entry in Ucast_Macs_Remote OVSDB table.""" # It is possible that the locator may not exist already. locator = ovsdb_schema.PhysicalLocator(locator_dict['uuid'], @@ -173,9 +181,10 @@ class OVSDBWriter(base_connection.BaseConnection): "params": params, "id": op_id} LOG.debug("update_ucast_macs_remote: query: %s", query) - self._send_and_receive(query, op_id) + self._send_and_receive(query, op_id, rcv_required) - def delete_ucast_macs_remote(self, logical_switch_uuid, macs): + def delete_ucast_macs_remote(self, logical_switch_uuid, macs, + rcv_required=True): """Delete entries from Ucast_Macs_Remote OVSDB table.""" commit_dict = {"op": "commit", "durable": True} op_id = str(random.getrandbits(128)) @@ -196,11 +205,11 @@ class OVSDBWriter(base_connection.BaseConnection): "params": params, "id": op_id} LOG.debug("delete_ucast_macs_remote: query: %s", query) - self._send_and_receive(query, op_id) + self._send_and_receive(query, op_id, rcv_required) def update_connection_to_gateway(self, logical_switch_dict, locator_dicts, mac_dicts, - port_dicts): + port_dicts, rcv_required=True): """Updates Physical Port's VNI to VLAN binding.""" # Form the JSON Query so as to update the physical port with the # vni-vlan (logical switch uuid to vlan) binding @@ -213,7 +222,7 @@ class OVSDBWriter(base_connection.BaseConnection): "params": update_dicts, "id": op_id} LOG.debug("update_connection_to_gateway: query = %s", query) - self._send_and_receive(query, op_id) + self._send_and_receive(query, op_id, rcv_required) def _recv_data(self): chunks = [] @@ -221,7 +230,10 @@ class OVSDBWriter(base_connection.BaseConnection): prev_char = None while True: try: - response = self.socket.recv(n_const.BUFFER_SIZE) + if self.enable_manager: + response = self.c_sock.recv(n_const.BUFFER_SIZE) + else: + response = self.socket.recv(n_const.BUFFER_SIZE) if response: response = response.decode('utf8') for i, c in enumerate(response): diff --git a/networking_l2gw/services/l2gateway/common/config.py b/networking_l2gw/services/l2gateway/common/config.py index 6019cc9..e0f8471 100644 --- a/networking_l2gw/services/l2gateway/common/config.py +++ b/networking_l2gw/services/l2gateway/common/config.py @@ -36,6 +36,9 @@ OVSDB_OPTS = [ cfg.IntOpt('periodic_interval', default=20, help=_('Seconds between periodic task runs')), + cfg.BoolOpt('enable_manager', + default=False, + help=_('Set to True if ovsdb Manager manages the client')), cfg.IntOpt('max_connection_retries', default=10, help=_('Maximum number of retries to open a socket ' diff --git a/networking_l2gw/tests/unit/services/l2gateway/agent/ovsdb/test_base_connection.py b/networking_l2gw/tests/unit/services/l2gateway/agent/ovsdb/test_base_connection.py index 032bb34..29793ed 100644 --- a/networking_l2gw/tests/unit/services/l2gateway/agent/ovsdb/test_base_connection.py +++ b/networking_l2gw/tests/unit/services/l2gateway/agent/ovsdb/test_base_connection.py @@ -13,12 +13,15 @@ # License for the specific language governing permissions and limitations # under the License. +import eventlet + import contextlib import socket import ssl import time import mock +from oslo_serialization import jsonutils from neutron.tests import base @@ -167,3 +170,55 @@ class TestBaseConnection(base.BaseTestCase): self.l2gw_ovsdb.disconnect() self.assertTrue(sock_close.called) self.assertFalse(self.l2gw_ovsdb.connected) + + +class TestBaseConnection_with_enable_manager(base.BaseTestCase): + def setUp(self): + super(TestBaseConnection_with_enable_manager, self).setUp() + + self.conf = mock.patch.object(conf, 'L2GatewayConfig').start() + config.register_ovsdb_opts_helper(cfg.CONF) + cfg.CONF.set_override('enable_manager', True, 'ovsdb') + self.l2gw_ovsdb_conn = base_connection.BaseConnection(mock.Mock(), + self.conf) + self.l2gw_ovsdb_conn.c_sock = mock.patch('socket.socket').start() + self.l2gw_ovsdb_conn.s = mock.patch('socket.socket').start() + + def test_init_with_enable_manager(self): + with mock.patch.object(eventlet.greenthread, + 'spawn') as (mock_thread): + self.l2gw_ovsdb_conn.__init__(mock.Mock(), self.conf) + self.assertEqual(self.l2gw_ovsdb_conn.s, mock.ANY) + self.assertIsNone(self.l2gw_ovsdb_conn.c_sock) + self.assertIsNone(self.l2gw_ovsdb_conn.addr) + self.assertIsNone(self.l2gw_ovsdb_conn.check_c_sock) + self.assertTrue(mock_thread.called) + + def test_echo_response(self): + fake_resp = {"method": "echo", + "params": "fake_params", + "id": "fake_id", + } + with contextlib.nested( + mock.patch.object(eventlet.greenthread, 'sleep'), + mock.patch.object(jsonutils, 'loads', return_value=fake_resp), + mock.patch.object(self.l2gw_ovsdb_conn.c_sock, + 'recv', + return_value=fake_resp), + mock.patch.object(self.l2gw_ovsdb_conn.c_sock, + 'send')) as ( + fake_thread, mock_loads, + mock_sock_rcv, + mock_sock_send): + self.l2gw_ovsdb_conn._echo_response() + self.assertTrue(fake_thread.called) + self.assertTrue(mock_sock_rcv.called) + mock_loads.assert_called_with(fake_resp) + self.assertTrue(self.l2gw_ovsdb_conn.check_c_sock) + self.assertTrue(mock_sock_send.called) + + def test_disconnect_with_enable_manager(self): + with mock.patch.object(self.l2gw_ovsdb_conn.c_sock, + 'close') as mock_close: + self.l2gw_ovsdb_conn.disconnect() + self.assertTrue(mock_close.called) diff --git a/networking_l2gw/tests/unit/services/l2gateway/agent/ovsdb/test_manager.py b/networking_l2gw/tests/unit/services/l2gateway/agent/ovsdb/test_manager.py index 9b92967..05f04e8 100644 --- a/networking_l2gw/tests/unit/services/l2gateway/agent/ovsdb/test_manager.py +++ b/networking_l2gw/tests/unit/services/l2gateway/agent/ovsdb/test_manager.py @@ -32,6 +32,7 @@ from networking_l2gw.services.l2gateway.agent import agent_api from networking_l2gw.services.l2gateway.agent import base_agent_manager from networking_l2gw.services.l2gateway.agent import l2gateway_config from networking_l2gw.services.l2gateway.agent.ovsdb import manager +from networking_l2gw.services.l2gateway.agent.ovsdb import ovsdb_common_class from networking_l2gw.services.l2gateway.agent.ovsdb import ovsdb_monitor from networking_l2gw.services.l2gateway.agent.ovsdb import ovsdb_writer from networking_l2gw.services.l2gateway.common import config @@ -221,3 +222,227 @@ class TestManager(base.BaseTestCase): mock.Mock()) self.assertEqual(1, logger_call.call_count) self.assertFalse(mock_del_ls.called) + + def test_init_with_enable_manager(self): + cfg.CONF.set_override('enable_manager', True, 'ovsdb') + self.l2gw_agent_manager.__init__() + self.assertIsNone(self.l2gw_agent_manager.ovsdb_fd) + + def test_sock_open_connection(self): + cfg.CONF.set_override('enable_manager', True, 'ovsdb') + with mock.patch.object(ovsdb_common_class, + 'OVSDB_commom_class') as mock_ovsdb_common: + self.l2gw_agent_manager.__init__() + self.l2gw_agent_manager._sock_open_connection() + self.assertTrue(mock_ovsdb_common.called) + + def test_set_monitor_agent_with_ovsdb_fd_None(self): + cfg.CONF.set_override('enable_manager', True, 'ovsdb') + self.l2gw_agent_manager.conf.host = 'fake_host' + self.l2gw_agent_manager.__init__() + self.l2gw_agent_manager.l2gw_agent_type = n_const.MONITOR + self.l2gw_agent_manager.ovsdb_fd = None + with mock.patch.object(manager.OVSDBManager, + '_sock_open_connection') as mock_open_conn: + self.l2gw_agent_manager.set_monitor_agent(self.context, + 'fake_host') + self.assertTrue(mock_open_conn.called) + + def test_set_monitor_agent_with_ovsdb_fd_not_None(self): + cfg.CONF.set_override('enable_manager', True, 'ovsdb') + self.l2gw_agent_manager.conf.host = 'fake_host' + self.l2gw_agent_manager.__init__() + self.l2gw_agent_manager.l2gw_agent_type = n_const.MONITOR + with mock.patch.object(ovsdb_common_class, + 'OVSDB_commom_class') as mock_ovsdb_common: + self.l2gw_agent_manager.ovsdb_fd = mock_ovsdb_common.return_value + self.l2gw_agent_manager.ovsdb_fd.check_monitor_thread = False + self.l2gw_agent_manager.set_monitor_agent(self.context, + 'fake_host') + (self.l2gw_agent_manager.ovsdb_fd._spawn_monitor_thread. + assert_called_with()) + + def test_update_connection_to_gateway_for_monitor_agent(self): + """Test case to test update_connection_to_gateway for + + monitor agent with enable_manager. + """ + cfg.CONF.set_override('enable_manager', True, 'ovsdb') + self.l2gw_agent_manager.__init__() + self.l2gw_agent_manager.l2gw_agent_type = n_const.MONITOR + with mock.patch.object(ovsdb_common_class, + 'OVSDB_commom_class') as mock_ovsdb_common: + self.l2gw_agent_manager.ovsdb_fd = mock_ovsdb_common.return_value + self.l2gw_agent_manager.update_connection_to_gateway( + self.context, mock.Mock(), mock.Mock(), mock.Mock(), + mock.Mock(), mock.Mock()) + (self.l2gw_agent_manager.ovsdb_fd.update_connection_to_gateway. + assert_called_with(mock.ANY, mock.ANY, mock.ANY, mock.ANY, False)) + + def test_update_connection_to_gateway_for_transact_agent(self): + """Test case to test update_connection_to_gateway + + with enable_manager. + """ + cfg.CONF.set_override('enable_manager', True, 'ovsdb') + self.l2gw_agent_manager.__init__() + self.l2gw_agent_manager.l2gw_agent_type = '' + with contextlib.nested( + mock.patch.object(ovsdb_common_class, 'OVSDB_commom_class'), + mock.patch.object(manager.OVSDBManager, + '_sock_open_connection')) as ( + mock_ovsdb_common, mock_open_conn): + self.l2gw_agent_manager.ovsdb_fd = mock_ovsdb_common.return_value + self.l2gw_agent_manager.ovsdb_fd.check_c_sock = True + self.l2gw_agent_manager.update_connection_to_gateway( + self.context, mock.Mock(), mock.Mock(), mock.Mock(), + mock.Mock(), mock.Mock()) + (self.l2gw_agent_manager.ovsdb_fd._echo_response. + assert_called_with()) + self.assertTrue(mock_open_conn.called) + (self.l2gw_agent_manager.ovsdb_fd.update_connection_to_gateway. + assert_called_with(mock.ANY, mock.ANY, mock.ANY, mock.ANY)) + + def test_delete_network_for_monitor_agent(self): + """Test case to test delete_network with enable_manager.""" + cfg.CONF.set_override('enable_manager', True, 'ovsdb') + self.l2gw_agent_manager.__init__() + self.l2gw_agent_manager.l2gw_agent_type = n_const.MONITOR + with mock.patch.object(ovsdb_common_class, + 'OVSDB_commom_class') as mock_ovsdb_common: + self.l2gw_agent_manager.ovsdb_fd = mock_ovsdb_common.return_value + self.l2gw_agent_manager.delete_network( + self.context, mock.Mock(), "fake_logical_switch_uuid") + (self.l2gw_agent_manager.ovsdb_fd.delete_logical_switch. + assert_called_with("fake_logical_switch_uuid", False)) + + def test_delete_network_for_transact_agent(self): + """Test case to test delete_network with enable_manager.""" + cfg.CONF.set_override('enable_manager', True, 'ovsdb') + self.l2gw_agent_manager.__init__() + self.l2gw_agent_manager.l2gw_agent_type = '' + with contextlib.nested( + mock.patch.object(ovsdb_common_class, 'OVSDB_commom_class'), + mock.patch.object(manager.OVSDBManager, + '_sock_open_connection')) as ( + mock_ovsdb_common, mock_open_conn): + self.l2gw_agent_manager.ovsdb_fd = mock_ovsdb_common.return_value + self.l2gw_agent_manager.ovsdb_fd.check_c_sock = True + self.l2gw_agent_manager.delete_network( + self.context, mock.Mock(), "fake_logical_switch_uuid") + (self.l2gw_agent_manager.ovsdb_fd._echo_response. + assert_called_with()) + self.assertTrue(mock_open_conn.called) + (self.l2gw_agent_manager.ovsdb_fd.delete_logical_switch. + assert_called_with("fake_logical_switch_uuid", False)) + + def test_add_vif_to_gateway_for_monitor_agent(self): + """Test case to test add_vif_to_gateway with enable_manager.""" + cfg.CONF.set_override('enable_manager', True, 'ovsdb') + self.l2gw_agent_manager.__init__() + self.l2gw_agent_manager.l2gw_agent_type = n_const.MONITOR + with mock.patch.object(ovsdb_common_class, + 'OVSDB_commom_class') as mock_ovsdb_common: + self.l2gw_agent_manager.ovsdb_fd = mock_ovsdb_common.return_value + self.l2gw_agent_manager.add_vif_to_gateway( + self.context, mock.Mock(), "fake_logical_switch_dict", + "fake_locator_dict", "fake_mac_dict") + (self.l2gw_agent_manager.ovsdb_fd.insert_ucast_macs_remote. + assert_called_with("fake_logical_switch_dict", + "fake_locator_dict", "fake_mac_dict", False)) + + def test_add_vif_to_gateway_for_transact_agent(self): + """Test case to test add_vif_to_gateway with enable_manager.""" + cfg.CONF.set_override('enable_manager', True, 'ovsdb') + self.l2gw_agent_manager.__init__() + self.l2gw_agent_manager.l2gw_agent_type = '' + with contextlib.nested( + mock.patch.object(ovsdb_common_class, 'OVSDB_commom_class'), + mock.patch.object(manager.OVSDBManager, + '_sock_open_connection')) as ( + mock_ovsdb_common, mock_open_conn): + self.l2gw_agent_manager.ovsdb_fd = mock_ovsdb_common.return_value + self.l2gw_agent_manager.ovsdb_fd.check_c_sock = True + self.l2gw_agent_manager.add_vif_to_gateway( + self.context, mock.Mock(), "fake_logical_switch_dict", + "fake_locator_dict", "fake_mac_dict") + (self.l2gw_agent_manager.ovsdb_fd._echo_response. + assert_called_with()) + self.assertTrue(mock_open_conn.called) + (self.l2gw_agent_manager.ovsdb_fd.insert_ucast_macs_remote. + assert_called_with("fake_logical_switch_dict", + "fake_locator_dict", "fake_mac_dict")) + + def test_delete_vif_from_gateway_for_monitor_agent(self): + """Test case to test delete_vif_to_gateway with enable_manager.""" + cfg.CONF.set_override('enable_manager', True, 'ovsdb') + self.l2gw_agent_manager.__init__() + self.l2gw_agent_manager.l2gw_agent_type = n_const.MONITOR + with mock.patch.object(ovsdb_common_class, + 'OVSDB_commom_class') as mock_ovsdb_common: + self.l2gw_agent_manager.ovsdb_fd = mock_ovsdb_common.return_value + self.l2gw_agent_manager.delete_vif_from_gateway( + self.context, mock.Mock(), "fake_logical_switch_uuid", + "fake_mac") + (self.l2gw_agent_manager.ovsdb_fd.delete_ucast_macs_remote. + assert_called_with("fake_logical_switch_uuid", "fake_mac", False)) + + def test_delete_vif_to_gateway_for_transact_agent(self): + """Test case to test delete_vif_to_gateway with enable_manager.""" + cfg.CONF.set_override('enable_manager', True, 'ovsdb') + self.l2gw_agent_manager.__init__() + self.l2gw_agent_manager.l2gw_agent_type = '' + with contextlib.nested( + mock.patch.object(ovsdb_common_class, 'OVSDB_commom_class'), + mock.patch.object(manager.OVSDBManager, + '_sock_open_connection')) as ( + mock_ovsdb_common, mock_open_conn): + self.l2gw_agent_manager.ovsdb_fd = mock_ovsdb_common.return_value + self.l2gw_agent_manager.ovsdb_fd.check_c_sock = True + self.l2gw_agent_manager.delete_vif_from_gateway( + self.context, mock.Mock(), "fake_logical_switch_uuid", + "fake_mac") + (self.l2gw_agent_manager.ovsdb_fd._echo_response. + assert_called_with()) + self.assertTrue(mock_open_conn.called) + (self.l2gw_agent_manager.ovsdb_fd.delete_ucast_macs_remote. + assert_called_with("fake_logical_switch_uuid", "fake_mac")) + + def test_update_vif_from_gateway_for_monitor_agent(self): + """Test case to test update_vif_to_gateway with enable_manager.""" + cfg.CONF.set_override('enable_manager', True, 'ovsdb') + self.l2gw_agent_manager.__init__() + self.l2gw_agent_manager.l2gw_agent_type = n_const.MONITOR + with mock.patch.object(ovsdb_common_class, + 'OVSDB_commom_class') as mock_ovsdb_common: + self.l2gw_agent_manager.ovsdb_fd = mock_ovsdb_common.return_value + self.l2gw_agent_manager.update_vif_to_gateway( + self.context, mock.Mock(), + "fake_logical_switch_uuid", "fake_mac") + (self.l2gw_agent_manager.ovsdb_fd.update_ucast_macs_remote. + assert_called_with( + "fake_logical_switch_uuid", "fake_mac", False)) + + def test_update_vif_to_gateway_for_transact_agent(self): + """Test case to test update_vif_to_gateway + + with enable_manager. + """ + cfg.CONF.set_override('enable_manager', True, 'ovsdb') + self.l2gw_agent_manager.__init__() + self.l2gw_agent_manager.l2gw_agent_type = '' + with contextlib.nested( + mock.patch.object(ovsdb_common_class, 'OVSDB_commom_class'), + mock.patch.object(manager.OVSDBManager, + '_sock_open_connection')) as ( + mock_ovsdb_common, mock_open_conn): + self.l2gw_agent_manager.ovsdb_fd = mock_ovsdb_common.return_value + self.l2gw_agent_manager.ovsdb_fd.check_c_sock = True + self.l2gw_agent_manager.update_vif_to_gateway( + self.context, mock.Mock(), "fake_logical_switch_uuid", + "fake_mac") + (self.l2gw_agent_manager.ovsdb_fd._echo_response. + assert_called_with()) + self.assertTrue(mock_open_conn.called) + (self.l2gw_agent_manager.ovsdb_fd.update_ucast_macs_remote. + assert_called_with("fake_logical_switch_uuid", "fake_mac")) diff --git a/networking_l2gw/tests/unit/services/l2gateway/agent/ovsdb/test_ovsdb_monitor.py b/networking_l2gw/tests/unit/services/l2gateway/agent/ovsdb/test_ovsdb_monitor.py index b6d7ed1..f4afe2c 100644 --- a/networking_l2gw/tests/unit/services/l2gateway/agent/ovsdb/test_ovsdb_monitor.py +++ b/networking_l2gw/tests/unit/services/l2gateway/agent/ovsdb/test_ovsdb_monitor.py @@ -23,6 +23,7 @@ import mock from neutron.tests import base from networking_l2gw.services.l2gateway.agent import l2gateway_config as conf +from networking_l2gw.services.l2gateway.agent.ovsdb import base_connection from networking_l2gw.services.l2gateway.agent.ovsdb import ovsdb_monitor from networking_l2gw.services.l2gateway.common import config from networking_l2gw.services.l2gateway.common import constants as n_const @@ -707,3 +708,89 @@ class TestOVSDBMonitor(base.BaseTestCase): PhysLocatorSet = phys_loc_set.return_value self.assertIn(PhysLocatorSet, data_dict['deleted_locator_sets']) + + +class SocketClass(object): + def __init__(self, + connect_error=None, + send_error=None, + recv_error=None, + rcv_data=None, + sock=None, + ip_addr=None): + self.connect_error = connect_error + self.rcv_data = rcv_data + self.send_error = send_error + self.recv_error = recv_error + self.sock = sock + self.ip_addr = ip_addr + + def connect(self, ip_port): + if self.connect_error: + raise self.connect_error + + def send(self, data): + if self.send_error: + raise self.send_error + return len(data) + + def recv(self, length): + if self.recv_error: + raise self.recv_error + return self.rcv_data + + def bind(self, host_port): + pass + + def listen(self, conn): + pass + + def accept(self): + return self.sock, self.ip_addr + + def setsockopt(self, sock_opt, sock_reuse, mode): + pass + + +class TestOVSDBMonitor_with_enable_manager(base.BaseTestCase): + def setUp(self): + super(TestOVSDBMonitor_with_enable_manager, self).setUp() + config.register_ovsdb_opts_helper(cfg.CONF) + cfg.CONF.set_override('enable_manager', True, 'ovsdb') + self.conf = mock.Mock() + self.callback = mock.Mock() + self.l2gw_ovsdb = ovsdb_monitor.OVSDBMonitor(mock.Mock(), + self.conf, + self.callback) + fakesocket = SocketClass() + self.l2gw_ovsdb.c_sock = fakesocket + + def test_init_with_enable_manager(self): + self.l2gw_ovsdb.__init__(mock.Mock(), self.conf, + self.callback) + self.assertTrue(self.l2gw_ovsdb.enable_manager) + self.assertFalse(self.l2gw_ovsdb.check_monitor_thread) + self.assertIsNone(self.l2gw_ovsdb.check_c_sock) + + def test_sock_rcv_thread_none(self): + with contextlib.nested( + mock.patch.object(base_connection.BaseConnection, + '_echo_response'), + mock.patch.object(eventlet.greenthread, 'spawn_n'), + mock.patch.object(eventlet.greenthread, 'sleep'), + mock.patch.object(self.l2gw_ovsdb.c_sock, + 'recv', return_value=None), + mock.patch.object(base_connection.BaseConnection, + 'disconnect')) as ( + mock_resp, green_thrd_spawn, green_thrd_sleep, + mock_rcv, mock_disconnect): + self.l2gw_ovsdb.check_c_sock = True + self.l2gw_ovsdb.read_on = True + self.l2gw_ovsdb._sock_rcv_thread() + self.assertTrue(mock_resp.called) + self.assertTrue(green_thrd_spawn.called) + self.assertTrue(green_thrd_sleep.called) + self.assertTrue(mock_rcv.called) + self.assertTrue(mock_disconnect.called) + self.assertFalse(self.l2gw_ovsdb.connected) + self.assertFalse(self.l2gw_ovsdb.read_on) diff --git a/networking_l2gw/tests/unit/services/l2gateway/agent/ovsdb/test_ovsdb_writer.py b/networking_l2gw/tests/unit/services/l2gateway/agent/ovsdb/test_ovsdb_writer.py index 287f342..f633d00 100644 --- a/networking_l2gw/tests/unit/services/l2gateway/agent/ovsdb/test_ovsdb_writer.py +++ b/networking_l2gw/tests/unit/services/l2gateway/agent/ovsdb/test_ovsdb_writer.py @@ -135,10 +135,22 @@ class TestOVSDBWriter(base.BaseTestCase): with mock.patch.object(ovsdb_writer.OVSDBWriter, '_get_reply') as mock_reply: self.l2gw_ovsdb._send_and_receive('some_query', - self.op_id) + self.op_id, True) mock_send.assert_called_with('some_query') mock_reply.assert_called_with(self.op_id) + def test_send_and_receive_with_rcv_required_false(self): + """Test case to test _send_and_receive.""" + with mock.patch.object(base_connection.BaseConnection, + 'send', return_value=True + ) as mock_send: + with mock.patch.object(ovsdb_writer.OVSDBWriter, + '_get_reply') as mock_reply: + self.l2gw_ovsdb._send_and_receive('some_query', + self.op_id, False) + mock_send.assert_called_with('some_query') + mock_reply.assert_not_called() + def test_delete_logical_switch(self): """Test case to test delete_logical_switch.""" commit_dict = {"op": "commit", "durable": True} @@ -163,7 +175,7 @@ class TestOVSDBWriter(base.BaseTestCase): self.l2gw_ovsdb.delete_logical_switch( 'fake_logical_switch_uuid') get_rand.assert_called_with(128) - send_n_receive.assert_called_with(query, self.op_id) + send_n_receive.assert_called_with(query, self.op_id, True) def test_insert_ucast_macs_remote(self): """Test case to test insert ucast_macs_remote.""" @@ -194,7 +206,7 @@ class TestOVSDBWriter(base.BaseTestCase): mock.MagicMock()) get_rand.assert_called_with(128) send_n_receive.assert_called_with(mock.ANY, - self.op_id) + self.op_id, True) self.assertTrue(get_ucast_mac_remote.called) @@ -267,7 +279,7 @@ class TestOVSDBWriter(base.BaseTestCase): mock.MagicMock()) get_rand.assert_called_with(128) send_n_receive.assert_called_with(mock.ANY, - self.op_id) + self.op_id, True) self.assertTrue(get_update_ucast_mac_remote.called) @@ -324,7 +336,7 @@ class TestOVSDBWriter(base.BaseTestCase): mock.MagicMock()) get_rand.assert_called_with(128) send_n_receive.assert_called_with(mock.ANY, - self.op_id) + self.op_id, True) def test_update_connection_to_gateway(self): """Test case to test update_connection_to_gateway.""" @@ -348,7 +360,7 @@ class TestOVSDBWriter(base.BaseTestCase): mock.Mock(), mock.Mock(), mock.Mock(), mock.Mock()) get_rand.assert_called_with(128) send_n_receive.assert_called_with(mock.ANY, - self.op_id) + self.op_id, True) self.assertTrue(get_bindings.called) def test_get_bindings_to_update1(self):