ovsdb server connection initiation implementation

ovsdb server initiates the connection to l2gw agent with the entries in
manager table in ovsdb hardware vtep schema.
A configurable option 'enable_manager' in .ini file is provided so that
the connection can be initiated by the ovsdb server or not based on boolean
value.
By default 'enable_manager' value is False, turn on the variable to True
to initiate the connection from ovsdb server to l2gw agent.

Closes-Bug:1466302

Change-Id: Ib668ecda381367ea9f2f3cfa45e63d0e91e48f33
stable/ocata
vikas 7 years ago
parent b730370310
commit 9b6ee5df9a
  1. 5
      etc/l2gateway_agent.ini
  2. 116
      networking_l2gw/services/l2gateway/agent/ovsdb/base_connection.py
  3. 141
      networking_l2gw/services/l2gateway/agent/ovsdb/manager.py
  4. 21
      networking_l2gw/services/l2gateway/agent/ovsdb/ovsdb_common_class.py
  5. 52
      networking_l2gw/services/l2gateway/agent/ovsdb/ovsdb_monitor.py
  6. 42
      networking_l2gw/services/l2gateway/agent/ovsdb/ovsdb_writer.py
  7. 3
      networking_l2gw/services/l2gateway/common/config.py
  8. 55
      networking_l2gw/tests/unit/services/l2gateway/agent/ovsdb/test_base_connection.py
  9. 225
      networking_l2gw/tests/unit/services/l2gateway/agent/ovsdb/test_manager.py
  10. 87
      networking_l2gw/tests/unit/services/l2gateway/agent/ovsdb/test_ovsdb_monitor.py
  11. 24
      networking_l2gw/tests/unit/services/l2gateway/agent/ovsdb/test_ovsdb_writer.py

@ -11,6 +11,11 @@
# ovsdb_hosts =
# Example: ovsdb_hosts = 'ovsdb1:16.95.16.1:6632,ovsdb2:16.95.16.2:6632'
#enable_manager = False
#connection can be initiated by the ovsdb server.
#By default 'enable_manager' value is False, turn on the variable to True
#to initiate the connection from ovsdb server to l2gw agent.
# (StrOpt) Base path to private key file(s).
# Agent will find key file named
# $l2_gw_agent_priv_key_base_path/$ovsdb_name.key

@ -18,6 +18,8 @@ import socket
import ssl
import time
import eventlet
from oslo_config import cfg
from oslo_log import log as logging
from oslo_serialization import jsonutils
from oslo_utils import excutils
@ -41,41 +43,81 @@ class BaseConnection(object):
def __init__(self, conf, gw_config):
self.responses = []
self.connected = False
self.gw_config = gw_config
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
if gw_config.use_ssl:
ssl_sock = ssl.wrap_socket(
self.socket,
server_side=False,
keyfile=gw_config.private_key,
certfile=gw_config.certificate,
cert_reqs=ssl.CERT_REQUIRED,
ssl_version=ssl.PROTOCOL_TLSv1,
ca_certs=gw_config.ca_cert)
self.socket = ssl_sock
self.enable_manager = cfg.CONF.ovsdb.enable_manager
if self.enable_manager:
self.s = None
self.c_sock = None
self.addr = None
self.check_c_sock = None
eventlet.greenthread.spawn(self._rcv_socket)
else:
self.gw_config = gw_config
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
if gw_config.use_ssl:
ssl_sock = ssl.wrap_socket(
self.socket,
server_side=False,
keyfile=gw_config.private_key,
certfile=gw_config.certificate,
cert_reqs=ssl.CERT_REQUIRED,
ssl_version=ssl.PROTOCOL_TLSv1,
ca_certs=gw_config.ca_cert)
self.socket = ssl_sock
retryCount = 0
while True:
try:
self.socket.connect((str(gw_config.ovsdb_ip),
int(gw_config.ovsdb_port)))
break
except (socket.error, socket.timeout):
LOG.warning(OVSDB_UNREACHABLE_MSG, gw_config.ovsdb_ip)
if retryCount == conf.max_connection_retries:
# Retried for max_connection_retries times.
# Give up and return so that it can be tried in
# the next periodic interval.
with excutils.save_and_reraise_exception(reraise=True):
LOG.exception(_LE("Socket error in connecting to "
"the OVSDB server"))
else:
time.sleep(1)
retryCount += 1
retryCount = 0
# Successfully connected to the socket
LOG.debug(OVSDB_CONNECTED_MSG, gw_config.ovsdb_ip)
self.connected = True
def _rcv_socket(self):
# Create a socket object.
self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
host = '' # Get local machine name
port = 6632 # Reserve a port for your service.
self.s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.s.bind((host, port)) # Bind to the port
self.s.listen(5) # Now wait for client connection.
while True:
try:
self.socket.connect((str(gw_config.ovsdb_ip),
int(gw_config.ovsdb_port)))
break
except (socket.error, socket.timeout):
LOG.warning(OVSDB_UNREACHABLE_MSG, gw_config.ovsdb_ip)
if retryCount == conf.max_connection_retries:
# Retried for max_connection_retries times.
# Give up and return so that it can be tried in
# the next periodic interval.
with excutils.save_and_reraise_exception(reraise=True):
LOG.exception(_LE("Socket error in connecting to "
"the OVSDB server"))
else:
time.sleep(1)
retryCount += 1
# Establish connection with client.
self.c_sock, ip_addr = self.s.accept()
self.addr = ip_addr[0]
self.c_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
LOG.debug("Got connection from %s ", self.addr)
self.connected = True
# Successfully connected to the socket
LOG.debug(OVSDB_CONNECTED_MSG, gw_config.ovsdb_ip)
self.connected = True
def _echo_response(self):
while True:
try:
if self.enable_manager:
eventlet.greenthread.sleep(0)
response = self.c_sock.recv(n_const.BUFFER_SIZE)
sock_json_m = jsonutils.loads(response)
sock_handler_method = sock_json_m.get('method', None)
if sock_handler_method == 'echo':
self.check_c_sock = True
self.c_sock.send(jsonutils.dumps(
{"result": sock_json_m.get("params", None),
"error": None, "id": sock_json_m['id']}))
break
except Exception:
continue
def send(self, message, callback=None):
"""Sends a message to the OVSDB server."""
@ -85,7 +127,10 @@ class BaseConnection(object):
bytes_sent = 0
while retry_count <= n_const.MAX_RETRIES:
try:
bytes_sent = self.socket.send(jsonutils.dumps(message))
if self.enable_manager:
bytes_sent = self.c_sock.send(jsonutils.dumps(message))
else:
bytes_sent = self.socket.send(jsonutils.dumps(message))
if bytes_sent:
return True
except Exception as ex:
@ -100,7 +145,10 @@ class BaseConnection(object):
def disconnect(self):
"""disconnects the connection from the OVSDB server."""
self.socket.close()
if self.enable_manager:
self.c_sock.close()
else:
self.socket.close()
self.connected = False
def _response(self, operation_id):

@ -25,6 +25,7 @@ from oslo_service import loopingcall
from networking_l2gw.services.l2gateway.agent import base_agent_manager
from networking_l2gw.services.l2gateway.agent import l2gateway_config
from networking_l2gw.services.l2gateway.agent.ovsdb import ovsdb_common_class
from networking_l2gw.services.l2gateway.agent.ovsdb import ovsdb_monitor
from networking_l2gw.services.l2gateway.agent.ovsdb import ovsdb_writer
from networking_l2gw.services.l2gateway.common import constants as n_const
@ -41,8 +42,12 @@ class OVSDBManager(base_agent_manager.BaseAgentManager):
def __init__(self, conf=None):
super(OVSDBManager, self).__init__(conf)
self._extract_ovsdb_config(conf)
self.looping_task = loopingcall.FixedIntervalLoopingCall(
self._connect_to_ovsdb_server)
self.enable_manager = cfg.CONF.ovsdb.enable_manager
if self.enable_manager:
self.ovsdb_fd = None
else:
self.looping_task = loopingcall.FixedIntervalLoopingCall(
self._connect_to_ovsdb_server)
def _extract_ovsdb_config(self, conf):
self.conf = conf or cfg.CONF
@ -110,8 +115,8 @@ class OVSDBManager(base_agent_manager.BaseAgentManager):
self.agent_to_plugin_rpc)
except Exception:
ovsdb_states[key] = 'disconnected'
# Log a warning and continue so that it can be retried
# in the next iteration.
# Log a warning and continue so that it can be
# retried in the next iteration.
LOG.error(_LE("OVSDB server %s is not "
"reachable"), gateway.ovsdb_ip)
# Continue processing the next element in the list.
@ -139,8 +144,9 @@ class OVSDBManager(base_agent_manager.BaseAgentManager):
self.l2gw_agent_type = ''
self.agent_state.get('configurations')[n_const.L2GW_AGENT_TYPE
] = self.l2gw_agent_type
self._stop_looping_task()
self._disconnect_all_ovsdb_servers()
if not self.enable_manager:
self._stop_looping_task()
self._disconnect_all_ovsdb_servers()
def _disconnect_all_ovsdb_servers(self):
if self.gateways:
@ -159,9 +165,15 @@ class OVSDBManager(base_agent_manager.BaseAgentManager):
# If set to Monitor, then let us start monitoring the OVSDB
# servers without any further delay.
if self.l2gw_agent_type == n_const.MONITOR:
if ((self.l2gw_agent_type == n_const.MONITOR) and not (
self.enable_manager)):
self._start_looping_task()
else:
elif self.enable_manager and self.l2gw_agent_type == n_const.MONITOR:
if self.ovsdb_fd is None:
self._sock_open_connection()
elif self.ovsdb_fd and not self.ovsdb_fd.check_monitor_thread:
self.ovsdb_fd._spawn_monitor_thread()
elif not self.enable_manager:
# Otherwise, stop monitoring the OVSDB servers
# and close the open connections if any.
self._stop_looping_task()
@ -176,6 +188,14 @@ class OVSDBManager(base_agent_manager.BaseAgentManager):
self.looping_task.start(interval=self.conf.ovsdb.
periodic_interval)
def _sock_open_connection(self):
gateway = ''
if self.ovsdb_fd is None:
self.ovsdb_fd = ovsdb_common_class.OVSDB_commom_class(
self.conf.ovsdb,
gateway,
self.agent_to_plugin_rpc)
@contextmanager
def _open_connection(self, ovsdb_identifier):
ovsdb_fd = None
@ -197,26 +217,62 @@ class OVSDBManager(base_agent_manager.BaseAgentManager):
def delete_network(self, context, ovsdb_identifier, logical_switch_uuid):
"""Handle RPC cast from plugin to delete a network."""
if self._is_valid_request(ovsdb_identifier):
with self._open_connection(ovsdb_identifier) as ovsdb_fd:
ovsdb_fd.delete_logical_switch(logical_switch_uuid)
if self.enable_manager and self.l2gw_agent_type == n_const.MONITOR:
self.ovsdb_fd.delete_logical_switch(logical_switch_uuid, False)
elif ((self.enable_manager) and (
not self.l2gw_agent_type == n_const.MONITOR)):
self._sock_open_connection()
self.ovsdb_fd._echo_response()
if self.ovsdb_fd.check_c_sock:
self.ovsdb_fd.delete_logical_switch(logical_switch_uuid,
False)
elif not self.enable_manager:
if self._is_valid_request(ovsdb_identifier):
with self._open_connection(ovsdb_identifier) as ovsdb_fd:
ovsdb_fd.delete_logical_switch(logical_switch_uuid)
def add_vif_to_gateway(self, context, ovsdb_identifier,
logical_switch_dict, locator_dict,
mac_dict):
"""Handle RPC cast from plugin to insert neutron port MACs."""
if self._is_valid_request(ovsdb_identifier):
with self._open_connection(ovsdb_identifier) as ovsdb_fd:
ovsdb_fd.insert_ucast_macs_remote(logical_switch_dict,
locator_dict,
mac_dict)
if self.enable_manager and self.l2gw_agent_type == n_const.MONITOR:
self.ovsdb_fd.insert_ucast_macs_remote(logical_switch_dict,
locator_dict,
mac_dict, False)
elif ((self.enable_manager) and (
not self.l2gw_agent_type == n_const.MONITOR)):
self._sock_open_connection()
self.ovsdb_fd._echo_response()
if self.ovsdb_fd.check_c_sock:
self.ovsdb_fd.insert_ucast_macs_remote(
logical_switch_dict,
locator_dict,
mac_dict)
elif not self.enable_manager:
if self._is_valid_request(ovsdb_identifier):
with self._open_connection(ovsdb_identifier) as ovsdb_fd:
ovsdb_fd.insert_ucast_macs_remote(logical_switch_dict,
locator_dict,
mac_dict)
def delete_vif_from_gateway(self, context, ovsdb_identifier,
logical_switch_uuid, mac):
"""Handle RPC cast from plugin to delete neutron port MACs."""
if self._is_valid_request(ovsdb_identifier):
with self._open_connection(ovsdb_identifier) as ovsdb_fd:
ovsdb_fd.delete_ucast_macs_remote(logical_switch_uuid, mac)
if self.enable_manager and self.l2gw_agent_type == n_const.MONITOR:
self.ovsdb_fd.delete_ucast_macs_remote(logical_switch_uuid, mac,
False)
elif ((self.enable_manager) and (
not self.l2gw_agent_type == n_const.MONITOR)):
self._sock_open_connection()
self.ovsdb_fd._echo_response()
if self.ovsdb_fd.check_c_sock:
self.ovsdb_fd.delete_ucast_macs_remote(
logical_switch_uuid,
mac)
elif not self.enable_manager:
if self._is_valid_request(ovsdb_identifier):
with self._open_connection(ovsdb_identifier) as ovsdb_fd:
ovsdb_fd.delete_ucast_macs_remote(logical_switch_uuid, mac)
def update_vif_to_gateway(self, context, ovsdb_identifier,
locator_dict, mac_dict):
@ -224,10 +280,21 @@ class OVSDBManager(base_agent_manager.BaseAgentManager):
for VM migration.
"""
if self._is_valid_request(ovsdb_identifier):
with self._open_connection(ovsdb_identifier) as ovsdb_fd:
ovsdb_fd.update_ucast_macs_remote(locator_dict,
mac_dict)
if self.enable_manager and self.l2gw_agent_type == n_const.MONITOR:
self.ovsdb_fd.update_ucast_macs_remote(locator_dict,
mac_dict, False)
elif ((self.enable_manager) and (
not self.l2gw_agent_type == n_const.MONITOR)):
self._sock_open_connection()
self.ovsdb_fd._echo_response()
if self.ovsdb_fd.check_c_sock:
self.ovsdb_fd.update_ucast_macs_remote(locator_dict,
mac_dict)
elif not self.enable_manager:
if self._is_valid_request(ovsdb_identifier):
with self._open_connection(ovsdb_identifier) as ovsdb_fd:
ovsdb_fd.update_ucast_macs_remote(locator_dict,
mac_dict)
def update_connection_to_gateway(self, context, ovsdb_identifier,
logical_switch_dict, locator_dicts,
@ -237,12 +304,28 @@ class OVSDBManager(base_agent_manager.BaseAgentManager):
Handle RPC cast from plugin to connect/disconnect a network
to/from an L2 gateway.
"""
if self._is_valid_request(ovsdb_identifier):
with self._open_connection(ovsdb_identifier) as ovsdb_fd:
ovsdb_fd.update_connection_to_gateway(logical_switch_dict,
locator_dicts,
mac_dicts,
port_dicts)
if self.enable_manager and self.l2gw_agent_type == n_const.MONITOR:
self.ovsdb_fd.update_connection_to_gateway(logical_switch_dict,
locator_dicts,
mac_dicts,
port_dicts, False)
elif ((self.enable_manager) and (
not self.l2gw_agent_type == n_const.MONITOR)):
self._sock_open_connection()
self.ovsdb_fd._echo_response()
if self.ovsdb_fd.check_c_sock:
self.ovsdb_fd.update_connection_to_gateway(
logical_switch_dict,
locator_dicts,
mac_dicts,
port_dicts)
elif not self.enable_manager:
if self._is_valid_request(ovsdb_identifier):
with self._open_connection(ovsdb_identifier) as ovsdb_fd:
ovsdb_fd.update_connection_to_gateway(logical_switch_dict,
locator_dicts,
mac_dicts,
port_dicts)
def agent_to_plugin_rpc(self, ovsdb_data):
self.plugin_rpc.update_ovsdb_changes(ctx.get_admin_context(),

@ -0,0 +1,21 @@
# Copyright (c) 2015 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from networking_l2gw.services.l2gateway.agent.ovsdb import ovsdb_monitor
from networking_l2gw.services.l2gateway.agent.ovsdb import ovsdb_writer
class OVSDB_commom_class(ovsdb_monitor.OVSDBMonitor, ovsdb_writer.OVSDBWriter):
pass

@ -40,7 +40,15 @@ class OVSDBMonitor(base_connection.BaseConnection):
self._setup_dispatch_table()
self.read_on = True
self.handlers = {"echo": self._default_echo_handler}
eventlet.greenthread.spawn(self._rcv_thread)
if self.enable_manager:
self.check_c_sock = self.c_sock
self.check_monitor_thread = False
if not self.enable_manager:
eventlet.greenthread.spawn(self._rcv_thread)
def _spawn_monitor_thread(self):
eventlet.greenthread.spawn(self._sock_rcv_thread)
self.check_monitor_thread = True
def _initialize_data_dict(self):
data_dict = {'new_local_macs': [],
@ -112,6 +120,8 @@ class OVSDBMonitor(base_connection.BaseConnection):
response_result = self._process_response(op_id)
except exceptions.OVSDBError:
with excutils.save_and_reraise_exception():
if self.enable_manager:
self.check_monitor_thread = False
LOG.exception(_LE("Exception while receiving the "
"response for the monitor message"))
self._process_monitor_msg(response_result)
@ -188,6 +198,43 @@ class OVSDBMonitor(base_connection.BaseConnection):
LOG.exception(_LE("Exception [%s] while handling "
"message"), e)
def _sock_rcv_thread(self):
chunks = []
lc = rc = 0
prev_char = None
self._echo_response()
if self.enable_manager and self.check_c_sock:
eventlet.greenthread.spawn_n(self.set_monitor_response_handler)
while self.read_on:
response = self.c_sock.recv(n_const.BUFFER_SIZE)
eventlet.greenthread.sleep(0)
if response:
response = response.decode('utf8')
message_mark = 0
for i, c in enumerate(response):
if c == '{' and not (prev_char and
prev_char == '\\'):
lc += 1
elif c == '}' and not (prev_char and
prev_char == '\\'):
rc += 1
if rc > lc:
raise Exception(_LE("json string not valid"))
elif lc == rc and lc is not 0:
chunks.append(response[message_mark:i + 1])
message = "".join(chunks)
eventlet.greenthread.spawn_n(
self._on_remote_message, message)
eventlet.greenthread.sleep(0)
lc = rc = 0
message_mark = i + 1
chunks = []
prev_char = c
chunks.append(response[message_mark:])
else:
self.read_on = False
self.disconnect()
def _rcv_thread(self):
chunks = []
lc = rc = 0
@ -246,7 +293,8 @@ class OVSDBMonitor(base_connection.BaseConnection):
return [element.__dict__ for element in resource_list]
def _form_ovsdb_data(self, data_dict):
return {n_const.OVSDB_IDENTIFIER: self.gw_config.ovsdb_identifier,
return {n_const.OVSDB_IDENTIFIER: str(self.addr) if (
self.enable_manager) else (self.gw_config.ovsdb_identifier),
'new_logical_switches': self._get_list(
data_dict.get('new_logical_switches')),
'new_physical_switches': self._get_list(

@ -62,8 +62,14 @@ class OVSDBWriter(base_connection.BaseConnection):
try:
json_m = jsonutils.loads(response)
self.responses.append(json_m)
if self._process_response(operation_id):
return True
method_type = json_m.get('method', None)
if method_type == "echo" and self.enable_manager:
self.c_sock.send(jsonutils.dumps(
{"result": json_m.get("params", None),
"error": None, "id": json_m['id']}))
else:
if self._process_response(operation_id):
return True
except Exception as ex:
with excutils.save_and_reraise_exception():
LOG.exception(_LE("Exception while receiving the "
@ -74,12 +80,13 @@ class OVSDBWriter(base_connection.BaseConnection):
LOG.error(_LE("Could not obtain response from the OVSDB server "
"for the request"))
def _send_and_receive(self, query, operation_id):
def _send_and_receive(self, query, operation_id, rcv_required):
if not self.send(query):
return
self._get_reply(operation_id)
if rcv_required:
self._get_reply(operation_id)
def delete_logical_switch(self, logical_switch_uuid):
def delete_logical_switch(self, logical_switch_uuid, rcv_required=True):
"""Delete an entry from Logical_Switch OVSDB table."""
commit_dict = {"op": "commit", "durable": True}
op_id = str(random.getrandbits(128))
@ -92,10 +99,10 @@ class OVSDBWriter(base_connection.BaseConnection):
commit_dict],
"id": op_id}
LOG.debug("delete_logical_switch: query: %s", query)
self._send_and_receive(query, op_id)
self._send_and_receive(query, op_id, rcv_required)
def insert_ucast_macs_remote(self, l_switch_dict, locator_dict,
mac_dict):
mac_dict, rcv_required=True):
"""Insert an entry in Ucast_Macs_Remote OVSDB table."""
# To insert an entry in Ucast_Macs_Remote table, it requires
# corresponding entry in Physical_Locator (Compute node VTEP IP)
@ -139,9 +146,10 @@ class OVSDBWriter(base_connection.BaseConnection):
"params": params,
"id": op_id}
LOG.debug("insert_ucast_macs_remote: query: %s", query)
self._send_and_receive(query, op_id)
self._send_and_receive(query, op_id, rcv_required)
def update_ucast_macs_remote(self, locator_dict, mac_dict):
def update_ucast_macs_remote(self, locator_dict, mac_dict,
rcv_required=True):
"""Update an entry in Ucast_Macs_Remote OVSDB table."""
# It is possible that the locator may not exist already.
locator = ovsdb_schema.PhysicalLocator(locator_dict['uuid'],
@ -173,9 +181,10 @@ class OVSDBWriter(base_connection.BaseConnection):
"params": params,
"id": op_id}
LOG.debug("update_ucast_macs_remote: query: %s", query)
self._send_and_receive(query, op_id)
self._send_and_receive(query, op_id, rcv_required)
def delete_ucast_macs_remote(self, logical_switch_uuid, macs):
def delete_ucast_macs_remote(self, logical_switch_uuid, macs,
rcv_required=True):
"""Delete entries from Ucast_Macs_Remote OVSDB table."""
commit_dict = {"op": "commit", "durable": True}
op_id = str(random.getrandbits(128))
@ -196,11 +205,11 @@ class OVSDBWriter(base_connection.BaseConnection):
"params": params,
"id": op_id}
LOG.debug("delete_ucast_macs_remote: query: %s", query)
self._send_and_receive(query, op_id)
self._send_and_receive(query, op_id, rcv_required)
def update_connection_to_gateway(self, logical_switch_dict,
locator_dicts, mac_dicts,
port_dicts):
port_dicts, rcv_required=True):
"""Updates Physical Port's VNI to VLAN binding."""
# Form the JSON Query so as to update the physical port with the
# vni-vlan (logical switch uuid to vlan) binding
@ -213,7 +222,7 @@ class OVSDBWriter(base_connection.BaseConnection):
"params": update_dicts,
"id": op_id}
LOG.debug("update_connection_to_gateway: query = %s", query)
self._send_and_receive(query, op_id)
self._send_and_receive(query, op_id, rcv_required)
def _recv_data(self):
chunks = []
@ -221,7 +230,10 @@ class OVSDBWriter(base_connection.BaseConnection):
prev_char = None
while True:
try:
response = self.socket.recv(n_const.BUFFER_SIZE)
if self.enable_manager:
response = self.c_sock.recv(n_const.BUFFER_SIZE)
else:
response = self.socket.recv(n_const.BUFFER_SIZE)
if response:
response = response.decode('utf8')
for i, c in enumerate(response):

@ -36,6 +36,9 @@ OVSDB_OPTS = [
cfg.IntOpt('periodic_interval',
default=20,
help=_('Seconds between periodic task runs')),
cfg.BoolOpt('enable_manager',
default=False,
help=_('Set to True if ovsdb Manager manages the client')),
cfg.IntOpt('max_connection_retries',
default=10,
help=_('Maximum number of retries to open a socket '

@ -13,12 +13,15 @@
# License for the specific language governing permissions and limitations
# under the License.
import eventlet
import contextlib
import socket
import ssl
import time
import mock
from oslo_serialization import jsonutils
from neutron.tests import base
@ -167,3 +170,55 @@ class TestBaseConnection(base.BaseTestCase):
self.l2gw_ovsdb.disconnect()
self.assertTrue(sock_close.called)
self.assertFalse(self.l2gw_ovsdb.connected)
class TestBaseConnection_with_enable_manager(base.BaseTestCase):
def setUp(self):
super(TestBaseConnection_with_enable_manager, self).setUp()
self.conf = mock.patch.object(conf, 'L2GatewayConfig').start()
config.register_ovsdb_opts_helper(cfg.CONF)
cfg.CONF.set_override('enable_manager', True, 'ovsdb')
self.l2gw_ovsdb_conn = base_connection.BaseConnection(mock.Mock(),
self.conf)
self.l2gw_ovsdb_conn.c_sock = mock.patch('socket.socket').start()
self.l2gw_ovsdb_conn.s = mock.patch('socket.socket').start()
def test_init_with_enable_manager(self):
with mock.patch.object(eventlet.greenthread,
'spawn') as (mock_thread):
self.l2gw_ovsdb_conn.__init__(mock.Mock(), self.conf)
self.assertEqual(self.l2gw_ovsdb_conn.s, mock.ANY)
self.assertIsNone(self.l2gw_ovsdb_conn.c_sock)
self.assertIsNone(self.l2gw_ovsdb_conn.addr)
self.assertIsNone(self.l2gw_ovsdb_conn.check_c_sock)
self.assertTrue(mock_thread.called)
def test_echo_response(self):
fake_resp = {"method": "echo",
"params": "fake_params",
"id": "fake_id",
}
with contextlib.nested(
mock.patch.object(eventlet.greenthread, 'sleep'),
mock.patch.object(jsonutils, 'loads', return_value=fake_resp),
mock.patch.object(self.l2gw_ovsdb_conn.c_sock,
'recv',
return_value=fake_resp),
mock.patch.object(self.l2gw_ovsdb_conn.c_sock,
'send')) as (
fake_thread, mock_loads,
mock_sock_rcv,
mock_sock_send):
self.l2gw_ovsdb_conn._echo_response()
self.assertTrue(fake_thread.called)
self.assertTrue(mock_sock_rcv.called)
mock_loads.assert_called_with(fake_resp)
self.assertTrue(self.l2gw_ovsdb_conn.check_c_sock)
self.assertTrue(mock_sock_send.called)
def test_disconnect_with_enable_manager(self):
with mock.patch.object(self.l2gw_ovsdb_conn.c_sock,
'close') as mock_close:
self.l2gw_ovsdb_conn.disconnect()
self.assertTrue(mock_close.called)

@ -32,6 +32,7 @@ from networking_l2gw.services.l2gateway.agent import agent_api
from networking_l2gw.services.l2gateway.agent import base_agent_manager
from networking_l2gw.services.l2gateway.agent import l2gateway_config
from networking_l2gw.services.l2gateway.agent.ovsdb import manager
from networking_l2gw.services.l2gateway.agent.ovsdb import ovsdb_common_class
from networking_l2gw.services.l2gateway.agent.ovsdb import ovsdb_monitor
from networking_l2gw.services.l2gateway.agent.ovsdb import ovsdb_writer
from networking_l2gw.services.l2gateway.common import config
@ -221,3 +222,227 @@ class TestManager(base.BaseTestCase):
mock.Mock())
self.assertEqual(1, logger_call.call_count)
self.assertFalse(mock_del_ls.called)
def test_init_with_enable_manager(self):
cfg.CONF.set_override('enable_manager', True, 'ovsdb')
self.l2gw_agent_manager.__init__()
self.assertIsNone(self.l2gw_agent_manager.ovsdb_fd)
def test_sock_open_connection(self):
cfg.CONF.set_override('enable_manager', True, 'ovsdb')
with mock.patch.object(ovsdb_common_class,
'OVSDB_commom_class') as mock_ovsdb_common:
self.l2gw_agent_manager.__init__()
self.l2gw_agent_manager._sock_open_connection()
self.assertTrue(mock_ovsdb_common.called)
def test_set_monitor_agent_with_ovsdb_fd_None(self):
cfg.CONF.set_override('enable_manager', True, 'ovsdb')
self.l2gw_agent_manager.conf.host = 'fake_host'
self.l2gw_agent_manager.__init__()
self.l2gw_agent_manager.l2gw_agent_type = n_const.MONITOR
self.l2gw_agent_manager.ovsdb_fd = None
with mock.patch.object(manager.OVSDBManager,
'_sock_open_connection') as mock_open_conn:
self.l2gw_agent_manager.set_monitor_agent(self.context,
'fake_host')
self.assertTrue(mock_open_conn.called)
def test_set_monitor_agent_with_ovsdb_fd_not_None(self):
cfg.CONF.set_override('enable_manager', True, 'ovsdb')
self.l2gw_agent_manager.conf.host = 'fake_host'
self.l2gw_agent_manager.__init__()
self.l2gw_agent_manager.l2gw_agent_type = n_const.MONITOR
with mock.patch.object(ovsdb_common_class,
'OVSDB_commom_class') as mock_ovsdb_common:
self.l2gw_agent_manager.ovsdb_fd = mock_ovsdb_common.return_value
self.l2gw_agent_manager.ovsdb_fd.check_monitor_thread = False
self.l2gw_agent_manager.set_monitor_agent(self.context,
'fake_host')
(self.l2gw_agent_manager.ovsdb_fd._spawn_monitor_thread.
assert_called_with())
def test_update_connection_to_gateway_for_monitor_agent(self):
"""Test case to test update_connection_to_gateway for
monitor agent with enable_manager.
"""
cfg.CONF.set_override('enable_manager', True, 'ovsdb')
self.l2gw_agent_manager.__init__()
self.l2gw_agent_manager.l2gw_agent_type = n_const.MONITOR
with mock.patch.object(ovsdb_common_class,
'OVSDB_commom_class') as mock_ovsdb_common:
self.l2gw_agent_manager.ovsdb_fd = mock_ovsdb_common.return_value
self.l2gw_agent_manager.update_connection_to_gateway(
self.context, mock.Mock(), mock.Mock(), mock.Mock(),
mock.Mock(), mock.Mock())
(self.l2gw_agent_manager.ovsdb_fd.update_connection_to_gateway.
assert_called_with(mock.ANY, mock.ANY, mock.ANY, mock.ANY, False))
def test_update_connection_to_gateway_for_transact_agent(self):
"""Test case to test update_connection_to_gateway
with enable_manager.
"""
cfg.CONF.set_override('enable_manager', True, 'ovsdb')
self.l2gw_agent_manager.__init__()
self.l2gw_agent_manager.l2gw_agent_type = ''
with contextlib.nested(
mock.patch.object(ovsdb_common_class, 'OVSDB_commom_class'),
mock.patch.object(manager.OVSDBManager,
'_sock_open_connection')) as (
mock_ovsdb_common, mock_open_conn):
self.l2gw_agent_manager.ovsdb_fd = mock_ovsdb_common.return_value
self.l2gw_agent_manager.ovsdb_fd.check_c_sock = True
self.l2gw_agent_manager.update_connection_to_gateway(
self.context, mock.Mock(), mock.Mock(), mock.Mock(),
mock.Mock(), mock.Mock())
(self.l2gw_agent_manager.ovsdb_fd._echo_response.
assert_called_with())
self.assertTrue(mock_open_conn.called)
(self.l2gw_agent_manager.ovsdb_fd.update_connection_to_gateway.
assert_called_with(mock.ANY, mock.ANY, mock.ANY, mock.ANY))
def test_delete_network_for_monitor_agent(self):
"""Test case to test delete_network with enable_manager."""
cfg.CONF.set_override('enable_manager', True, 'ovsdb')
self.l2gw_agent_manager.__init__()
self.l2gw_agent_manager.l2gw_agent_type = n_const.MONITOR
with mock.patch.object(ovsdb_common_class,
'OVSDB_commom_class') as mock_ovsdb_common:
self.l2gw_agent_manager.ovsdb_fd = mock_ovsdb_common.return_value
self.l2gw_agent_manager.delete_network(
self.context, mock.Mock(), "fake_logical_switch_uuid")
(self.l2gw_agent_manager.ovsdb_fd.delete_logical_switch.
assert_called_with("fake_logical_switch_uuid", False))
def test_delete_network_for_transact_agent(self):
"""Test case to test delete_network with enable_manager."""
cfg.CONF.set_override('enable_manager', True, 'ovsdb')
self.l2gw_agent_manager.__init__()
self.l2gw_agent_manager.l2gw_agent_type = ''
with contextlib.nested(
mock.patch.object(ovsdb_common_class, 'OVSDB_commom_class'),
mock.patch.object(manager.OVSDBManager,
'_sock_open_connection')) as (
mock_ovsdb_common, mock_open_conn):
self.l2gw_agent_manager.ovsdb_fd = mock_ovsdb_common.return_value
self.l2gw_agent_manager.ovsdb_fd.check_c_sock = True
self.l2gw_agent_manager.delete_network(
self.context, mock.Mock(), "fake_logical_switch_uuid")
(self.l2gw_agent_manager.ovsdb_fd._echo_response.
assert_called_with())
self.assertTrue(mock_open_conn.called)
(self.l2gw_agent_manager.ovsdb_fd.delete_logical_switch.
assert_called_with("fake_logical_switch_uuid", False))
def test_add_vif_to_gateway_for_monitor_agent(self):
"""Test case to test add_vif_to_gateway with enable_manager."""
cfg.CONF.set_override('enable_manager', True, 'ovsdb')
self.l2gw_agent_manager.__init__()
self.l2gw_agent_manager.l2gw_agent_type = n_const.MONITOR
with mock.patch.object(ovsdb_common_class,
'OVSDB_commom_class') as mock_ovsdb_common:
self.l2gw_agent_manager.ovsdb_fd = mock_ovsdb_common.return_value
self.l2gw_agent_manager.add_vif_to_gateway(
self.context, mock.Mock(), "fake_logical_switch_dict",
"fake_locator_dict", "fake_mac_dict")
(self.l2gw_agent_manager.ovsdb_fd.insert_ucast_macs_remote.
assert_called_with("fake_logical_switch_dict",
"fake_locator_dict", "fake_mac_dict", False))
def test_add_vif_to_gateway_for_transact_agent(self):
"""Test case to test add_vif_to_gateway with enable_manager."""
cfg.CONF.set_override('enable_manager', True, 'ovsdb')
self.l2gw_agent_manager.__init__()
self.l2gw_agent_manager.l2gw_agent_type = ''
with contextlib.nested(
mock.patch.object(ovsdb_common_class, 'OVSDB_commom_class'),
mock.patch.object(manager.OVSDBManager,
'_sock_open_connection')) as (
mock_ovsdb_common, mock_open_conn):
self.l2gw_agent_manager.ovsdb_fd = mock_ovsdb_common.return_value
self.l2gw_agent_manager.ovsdb_fd.check_c_sock = True
self.l2gw_agent_manager.add_vif_to_gateway(
self.context, mock.Mock(), "fake_logical_switch_dict",
"fake_locator_dict", "fake_mac_dict")
(self.l2gw_agent_manager.ovsdb_fd._echo_response.
assert_called_with())
self.assertTrue(mock_open_conn.called)
(self.l2gw_agent_manager.ovsdb_fd.insert_ucast_macs_remote.
assert_called_with("fake_logical_switch_dict",
"fake_locator_dict", "fake_mac_dict"))
def test_delete_vif_from_gateway_for_monitor_agent(self):
"""Test case to test delete_vif_to_gateway with enable_manager."""
cfg.CONF.set_override('enable_manager', True, 'ovsdb')
self.l2gw_agent_manager.__init__()
self.l2gw_agent_manager.l2gw_agent_type = n_const.MONITOR
with mock.patch.object(ovsdb_common_class,
'OVSDB_commom_class') as mock_ovsdb_common:
self.l2gw_agent_manager.ovsdb_fd = mock_ovsdb_common.return_value
self.l2gw_agent_manager.delete_vif_from_gateway(
self.context, mock.Mock(), "fake_logical_switch_uuid",
"fake_mac")
(self.l2gw_agent_manager.ovsdb_fd.delete_ucast_macs_remote.
assert_called_with("fake_logical_switch_uuid", "fake_mac", False))
def test_delete_vif_to_gateway_for_transact_agent(self):
"""Test case to test delete_vif_to_gateway with enable_manager."""
cfg.CONF.set_override('enable_manager', True, 'ovsdb')
self.l2gw_agent_manager.__init__()
self.l2gw_agent_manager.l2gw_agent_type = ''
with contextlib.nested(
mock.patch.object(ovsdb_common_class, 'OVSDB_commom_class'),
mock.patch.object(manager.OVSDBManager,
'_sock_open_connection')) as (
mock_ovsdb_common, mock_open_conn):
self.l2gw_agent_manager.ovsdb_fd = mock_ovsdb_common.return_value
self.l2gw_agent_manager.ovsdb_fd.check_c_sock = True
self.l2gw_agent_manager.delete_vif_from_gateway(
self.context, mock.Mock(), "fake_logical_switch_uuid",
"fake_mac")
(self.l2gw_agent_manager.ovsdb_fd._echo_response.
assert_called_with())
self.assertTrue(mock_open_conn.called)
(self.l2gw_agent_manager.ovsdb_fd.delete_ucast_macs_remote.
assert_called_with("fake_logical_switch_uuid", "fake_mac"))
def test_update_vif_from_gateway_for_monitor_agent(self):
"""Test case to test update_vif_to_gateway with enable_manager."""
cfg.CONF.set_override('enable_manager', True, 'ovsdb')
self.l2gw_agent_manager.__init__()
self.l2gw_agent_manager.l2gw_agent_type = n_const.MONITOR
with mock.patch.object(ovsdb_common_class,
'OVSDB_commom_class') as mock_ovsdb_common:
self.l2gw_agent_manager.ovsdb_fd = mock_ovsdb_common.return_value
self.l2gw_agent_manager.update_vif_to_gateway(
self.context, mock.Mock(),
"fake_logical_switch_uuid", "fake_mac")
(self.l2gw_agent_manager.ovsdb_fd.update_ucast_macs_remote.
assert_called_with(
"fake_logical_switch_uuid", "fake_mac", False))
def test_update_vif_to_gateway_for_transact_agent(self):
"""Test case to test update_vif_to_gateway
with enable_manager.
"""
cfg.CONF.set_override('enable_manager', True, 'ovsdb')
self.l2gw_agent_manager.__init__()
self.l2gw_agent_manager.l2gw_agent_type = ''
with contextlib.nested(
mock.patch.object(ovsdb_common_class, 'OVSDB_commom_class'),
mock.patch.object(manager.OVSDBManager,
'_sock_open_connection')) as (
mock_ovsdb_common, mock_open_conn):
self.l2gw_agent_manager.ovsdb_fd = mock_ovsdb_common.return_value
self.l2gw_agent_manager.ovsdb_fd.check_c_sock = True
self.l2gw_agent_manager.update_vif_to_gateway(
self.context, mock.Mock(), "fake_logical_switch_uuid",
"fake_mac")
(self.l2gw_agent_manager.ovsdb_fd._echo_response.
assert_called_with())
self.assertTrue(mock_open_conn.called)
(self.l2gw_agent_manager.ovsdb_fd.update_ucast_macs_remote.
assert_called_with("fake_logical_switch_uuid", "fake_mac"))

@ -23,6 +23,7 @@ import mock
from neutron.tests import base
from networking_l2gw.services.l2gateway.agent import l2gateway_config as conf
from networking_l2gw.services.l2gateway.agent.ovsdb import base_connection
from networking_l2gw.services.l2gateway.agent.ovsdb import ovsdb_monitor
from networking_l2gw.services.l2gateway.common import config
from networking_l2gw.services.l2gateway.common import constants as n_const
@ -707,3 +708,89 @@ class TestOVSDBMonitor(base.BaseTestCase):
PhysLocatorSet = phys_loc_set.return_value
self.assertIn(PhysLocatorSet,
data_dict['deleted_locator_sets'])
class SocketClass(object):
def __init__(self,
connect_error=None,
send_error=None,
recv_error=None,
rcv_data=None,
sock=None,
ip_addr=None):
self.connect_error = connect_error
self.rcv_data = rcv_data
self.send_error = send_error
self.recv_error = recv_error
self.sock = sock
self.ip_addr = ip_addr
def connect(self, ip_port):
if self.connect_error:
raise self.connect_error
def send(self, data):
if self.send_error:
raise self.send_error
return len(data)
def recv(self, length):
if self.recv_error:
raise self.recv_error
return self.rcv_data
def bind(self, host_port):
pass
def listen(self, conn):
pass
def accept(self):
return self.sock, self.ip_addr
def setsockopt(self, sock_opt, sock_reuse, mode):
pass
class TestOVSDBMonitor_with_enable_manager(base.BaseTestCase):
def setUp(self):
super(TestOVSDBMonitor_with_enable_manager, self).setUp()
config.register_ovsdb_opts_helper(cfg.CONF)
cfg.CONF.set_override('enable_manager', True, 'ovsdb')
self.conf = mock.Mock()
self.callback = mock.Mock()
self.l2gw_ovsdb = ovsdb_monitor.OVSDBMonitor(mock.Mock(),
self.conf,
self.callback)
fakesocket = SocketClass()
self.l2gw_ovsdb.c_sock = fakesocket
def test_init_with_enable_manager(self):
self.l2gw_ovsdb.__init__(mock.Mock(), self.conf,
self.callback)
self.assertTrue(self.l2gw_ovsdb.enable_manager)
self.assertFalse(self.l2gw_ovsdb.check_monitor_thread)
self.assertIsNone(self.l2gw_ovsdb.check_c_sock)
def test_sock_rcv_thread_none(self):
with contextlib.nested(
mock.patch.object(base_connection.BaseConnection,
'_echo_response'),
mock.patch.object(eventlet.greenthread, 'spawn_n'),
mock.patch.object(eventlet.greenthread, 'sleep'),
mock.patch.object(self.l2gw_ovsdb.c_sock,
'recv', return_value=None),
mock.patch.object(base_connection.BaseConnection,
'disconnect')) as (
mock_resp, green_thrd_spawn, green_thrd_sleep,
mock_rcv, mock_disconnect):
self.l2gw_ovsdb.check_c_sock = True
self.l2gw_ovsdb.read_on = True
self.l2gw_ovsdb._sock_rcv_thread()
self.assertTrue(mock_resp.called)
self.assertTrue(green_thrd_spawn.called)
self.assertTrue(green_thrd_sleep.called)
self.assertTrue(mock_rcv.called)
self.assertTrue(mock_disconnect.called)
self.assertFalse(self.l2gw_ovsdb.connected)
self.assertFalse(self.l2gw_ovsdb.read_on)

@ -135,10 +135,22 @@ class TestOVSDBWriter(base.BaseTestCase):
with mock.patch.object(ovsdb_writer.OVSDBWriter,
'_get_reply') as mock_reply:
self.l2gw_ovsdb._send_and_receive('some_query',
self.op_id)
self.op_id, True)
mock_send.assert_called_with('some_query')
mock_reply.assert_called_with(self.op_id)
def test_send_and_receive_with_rcv_required_false(self):
"""Test case to test _send_and_receive."""
with mock.patch.object(base_connection.BaseConnection,
'send', return_value=True
) as mock_send:
with mock.patch.object(ovsdb_writer.OVSDBWriter,
'_get_reply') as mock_reply:
self.l2gw_ovsdb._send_and_receive('some_query',
self.op_id, False)
mock_send.assert_called_with('some_query')
mock_reply.assert_not_called()
def test_delete_logical_switch(self):
"""Test case to test delete_logical_switch."""
commit_dict = {"op": "commit", "durable": True}
@ -163,7 +175,7 @@ class TestOVSDBWriter(base.BaseTestCase):
self.l2gw_ovsdb.delete_logical_switch(
'fake_logical_switch_uuid')
get_rand.assert_called_with(128)
send_n_receive.assert_called_with(query, self.op_id)
send_n_receive.assert_called_with(query, self.op_id, True)
def test_insert_ucast_macs_remote(self):
"""Test case to test insert ucast_macs_remote."""
@ -194,7 +206,7 @@ class TestOVSDBWriter(base.BaseTestCase):
mock.MagicMock())
get_rand.assert_called_with(128)
send_n_receive.assert_called_with(mock.ANY,
self.op_id)
self.op_id, True)
self.assertTrue(get_ucast_mac_remote.called)
@ -267,7 +279,7 @@ class TestOVSDBWriter(base.BaseTestCase):
mock.MagicMock())
get_rand.assert_called_with(128)
send_n_receive.assert_called_with(mock.ANY,
self.op_id)
self.op_id, True)
self.assertTrue(get_update_ucast_mac_remote.called)
@ -324,7 +336,7 @@ class TestOVSDBWriter(base.BaseTestCase):
mock.MagicMock())
get_rand.assert_called_with(128)
send_n_receive.assert_called_with(mock.ANY,
self.op_id)
self.op_id, True)
def test_update_connection_to_gateway(self):
"""Test case to test update_connection_to_gateway."""
@ -348,7 +360,7 @@ class TestOVSDBWriter(base.BaseTestCase):
mock.Mock(), mock.Mock(), mock.Mock(), mock.Mock())
get_rand.assert_called_with(128)
send_n_receive.assert_called_with(mock.ANY,
self.op_id)
self.op_id, True)
self.assertTrue(get_bindings.called)
def test_get_bindings_to_update1(self):

Loading…
Cancel
Save