Merge "Monitor new ovsdb server without L2gw agent restart"

changes/16/292816/8
Jenkins 7 years ago committed by Gerrit Code Review
commit 0bfb973d37

@ -38,17 +38,19 @@ class BaseConnection(object):
Connects to an ovsdb server with/without SSL
on a given host and TCP port.
"""
def __init__(self, conf, gw_config):
def __init__(self, conf, gw_config, mgr=None):
self.responses = []
self.connected = False
self.mgr = mgr
self.enable_manager = cfg.CONF.ovsdb.enable_manager
if self.enable_manager:
self.s = None
self.check_c_sock = None
self.check_sock_rcv = False
eventlet.greenthread.spawn(self._rcv_socket)
self.ovsdb_dicts = {}
self.ovsdb_fd_states = {}
self.ovsdb_conn_list = []
eventlet.greenthread.spawn(self._rcv_socket)
else:
self.gw_config = gw_config
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
@ -100,20 +102,47 @@ class BaseConnection(object):
c_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
LOG.debug("Got connection from %s ", addr)
self.connected = True
if addr in self.ovsdb_fd_states.keys():
del self.ovsdb_fd_states[addr]
if addr in self.ovsdb_conn_list:
self.ovsdb_conn_list.remove(addr)
if addr in self.ovsdb_dicts.keys():
self.ovsdb_dicts.get(addr).close()
del self.ovsdb_dicts[addr]
self.ovsdb_dicts[addr] = c_sock
eventlet.greenthread.spawn(self._common_sock_rcv_thread, addr)
# Now that OVSDB server has sent a socket open request, let us wait
# for echo request. After the first echo request, we will send the
# "monitor" request to the OVSDB server.
def _send_monitor_msg_to_ovsdb_connection(self, addr):
if self.mgr.l2gw_agent_type == n_const.MONITOR:
try:
if (self.mgr.ovsdb_fd) and (addr in self.ovsdb_conn_list):
eventlet.greenthread.spawn_n(
self.mgr.ovsdb_fd._spawn_monitor_table_thread,
addr)
except Exception:
LOG.warning(_LW("Could not send monitor message to the "
"OVSDB server."))
self.disconnect(addr)
def _common_sock_rcv_thread(self, addr):
chunks = []
lc = rc = 0
prev_char = None
self.read_on = True
check_monitor_msg = True
self._echo_response(addr)
if self.enable_manager and self.check_c_sock:
if self.enable_manager and (addr in self.ovsdb_conn_list):
while self.read_on:
response = self.ovsdb_dicts.get(addr).recv(n_const.BUFFER_SIZE)
self.ovsdb_fd_states[addr] = 'connected'
eventlet.greenthread.sleep(0)
self.check_sock_rcv = True
eventlet.greenthread.sleep(0)
if check_monitor_msg:
self._send_monitor_msg_to_ovsdb_connection(addr)
check_monitor_msg = False
if response:
response = response.decode('utf8')
message_mark = 0
@ -140,7 +169,6 @@ class BaseConnection(object):
else:
self.read_on = False
self.disconnect(addr)
self.ovsdb_fd_states[addr] = 'disconnected'
def _echo_response(self, addr):
while True:
@ -156,6 +184,8 @@ class BaseConnection(object):
self.ovsdb_dicts.get(addr).send(jsonutils.dumps(
{"result": sock_json_m.get("params", None),
"error": None, "id": sock_json_m['id']}))
if (addr not in self.ovsdb_conn_list):
self.ovsdb_conn_list.append(addr)
break
except Exception:
continue
@ -190,6 +220,9 @@ class BaseConnection(object):
if self.enable_manager:
self.ovsdb_dicts.get(addr).close()
del self.ovsdb_dicts[addr]
if addr in self.ovsdb_fd_states.keys():
del self.ovsdb_fd_states[addr]
self.ovsdb_conn_list.remove(addr)
else:
self.socket.close()
self.connected = False

@ -182,8 +182,9 @@ class OVSDBManager(base_agent_manager.BaseAgentManager):
self.ovsdb_fd.check_monitor_table_thread) and (
self.ovsdb_fd.check_sock_rcv)):
for key in self.ovsdb_fd.ovsdb_dicts.keys():
eventlet.greenthread.spawn_n(
self.ovsdb_fd._spawn_monitor_table_thread, key)
if key in self.ovsdb_fd.ovsdb_conn_list:
eventlet.greenthread.spawn_n(
self.ovsdb_fd._spawn_monitor_table_thread, key)
self._start_looping_task_ovsdb_states()
elif ((self.enable_manager) and not (
self.l2gw_agent_type == n_const.MONITOR)):
@ -219,7 +220,7 @@ class OVSDBManager(base_agent_manager.BaseAgentManager):
self.ovsdb_fd = ovsdb_common_class.OVSDB_commom_class(
self.conf.ovsdb,
gateway,
self.agent_to_plugin_rpc)
self.agent_to_plugin_rpc, self)
@contextmanager
def _open_connection(self, ovsdb_identifier):
@ -248,8 +249,7 @@ class OVSDBManager(base_agent_manager.BaseAgentManager):
elif ((self.enable_manager) and (
not self.l2gw_agent_type == n_const.MONITOR)):
self._sock_open_connection()
self.ovsdb_fd._echo_response(ovsdb_identifier)
if self.ovsdb_fd.check_c_sock:
if ovsdb_identifier in self.ovsdb_fd.ovsdb_conn_list:
self.ovsdb_fd.delete_logical_switch(logical_switch_uuid,
ovsdb_identifier,
False)
@ -271,12 +271,11 @@ class OVSDBManager(base_agent_manager.BaseAgentManager):
elif ((self.enable_manager) and (
not self.l2gw_agent_type == n_const.MONITOR)):
self._sock_open_connection()
self.ovsdb_fd._echo_response(ovsdb_identifier)
if self.ovsdb_fd.check_c_sock:
if ovsdb_identifier in self.ovsdb_fd.ovsdb_conn_list:
self.ovsdb_fd.insert_ucast_macs_remote(
logical_switch_dict,
locator_dict,
mac_dict, ovsdb_identifier)
mac_dict, ovsdb_identifier, False)
elif not self.enable_manager:
if self._is_valid_request(ovsdb_identifier):
with self._open_connection(ovsdb_identifier) as ovsdb_fd:
@ -295,11 +294,10 @@ class OVSDBManager(base_agent_manager.BaseAgentManager):
elif ((self.enable_manager) and (
not self.l2gw_agent_type == n_const.MONITOR)):
self._sock_open_connection()
self.ovsdb_fd._echo_response(ovsdb_identifier)
if self.ovsdb_fd.check_c_sock:
if ovsdb_identifier in self.ovsdb_fd.ovsdb_conn_list:
self.ovsdb_fd.delete_ucast_macs_remote(
logical_switch_uuid,
mac, ovsdb_identifier)
mac, ovsdb_identifier, False)
elif not self.enable_manager:
if self._is_valid_request(ovsdb_identifier):
with self._open_connection(ovsdb_identifier) as ovsdb_fd:
@ -319,11 +317,10 @@ class OVSDBManager(base_agent_manager.BaseAgentManager):
elif ((self.enable_manager) and (
not self.l2gw_agent_type == n_const.MONITOR)):
self._sock_open_connection()
self.ovsdb_fd._echo_response(ovsdb_identifier)
if self.ovsdb_fd.check_c_sock:
if ovsdb_identifier in self.ovsdb_fd.ovsdb_conn_list:
self.ovsdb_fd.update_ucast_macs_remote(locator_dict,
mac_dict,
ovsdb_identifier)
ovsdb_identifier, False)
elif not self.enable_manager:
if self._is_valid_request(ovsdb_identifier):
with self._open_connection(ovsdb_identifier) as ovsdb_fd:
@ -348,13 +345,12 @@ class OVSDBManager(base_agent_manager.BaseAgentManager):
elif ((self.enable_manager) and (
not self.l2gw_agent_type == n_const.MONITOR)):
self._sock_open_connection()
self.ovsdb_fd._echo_response(ovsdb_identifier)
if self.ovsdb_fd.check_c_sock:
if ovsdb_identifier in self.ovsdb_fd.ovsdb_conn_list:
self.ovsdb_fd.update_connection_to_gateway(
logical_switch_dict,
locator_dicts,
mac_dicts,
port_dicts, ovsdb_identifier)
port_dicts, ovsdb_identifier, False)
elif not self.enable_manager:
if self._is_valid_request(ovsdb_identifier):
with self._open_connection(ovsdb_identifier) as ovsdb_fd:

@ -31,8 +31,9 @@ LOG = logging.getLogger(__name__)
class OVSDBMonitor(base_connection.BaseConnection):
"""Monitors OVSDB servers."""
def __init__(self, conf, gw_config, callback):
super(OVSDBMonitor, self).__init__(conf, gw_config)
def __init__(self, conf, gw_config, callback, mgr=None):
super(OVSDBMonitor, self).__init__(conf, gw_config, mgr=None)
self.mgr = mgr
self.rpc_callback = callback
self.callbacks = {}
self._setup_dispatch_table()

@ -31,8 +31,9 @@ LOG = logging.getLogger(__name__)
class OVSDBWriter(base_connection.BaseConnection):
"""Performs transactions to OVSDB server tables."""
def __init__(self, conf, gw_config):
super(OVSDBWriter, self).__init__(conf, gw_config)
def __init__(self, conf, gw_config, mgr=None):
super(OVSDBWriter, self).__init__(conf, gw_config, mgr=None)
self.mgr = mgr
def _process_response(self, op_id):
result = self._response(op_id)

@ -27,7 +27,9 @@ from neutron.tests import base
from networking_l2gw.services.l2gateway.agent import l2gateway_config as conf
from networking_l2gw.services.l2gateway.agent.ovsdb import base_connection
from networking_l2gw.services.l2gateway.agent.ovsdb import manager
from networking_l2gw.services.l2gateway.common import config
from networking_l2gw.services.l2gateway.common import constants as n_const
from oslo_config import cfg
from oslo_log import log as logging
@ -182,12 +184,14 @@ class TestBaseConnection_with_enable_manager(base.BaseTestCase):
self.conf = mock.patch.object(conf, 'L2GatewayConfig').start()
config.register_ovsdb_opts_helper(cfg.CONF)
cfg.CONF.set_override('enable_manager', True, 'ovsdb')
self.l2gw_ovsdb_conn = base_connection.BaseConnection(mock.Mock(),
self.conf)
self.mgr = mock.patch.object(manager, 'OVSDBManager').start()
self.l2gw_ovsdb_conn = base_connection.BaseConnection(
mock.Mock(), self.conf, self.mgr)
self.mock_sock = mock.patch('socket.socket').start()
self.l2gw_ovsdb_conn.s = mock.patch('socket.socket').start()
self.fakesocket = SocketClass()
self.fake_ip = 'fake_ip'
self.l2gw_ovsdb_conn.ovsdb_conn_list = ['fake_ip']
self.l2gw_ovsdb_conn.ovsdb_dicts = {'fake_ip': self.fakesocket}
def test_init_with_enable_manager(self):
@ -202,6 +206,34 @@ class TestBaseConnection_with_enable_manager(base.BaseTestCase):
self.assertEqual(self.l2gw_ovsdb_conn.ovsdb_fd_states, fake_dict)
self.assertTrue(mock_thread.called)
def test_send_monitor_msg_to_ovsdb_connection(self):
fake_ip = 'fake_ip'
self.l2gw_ovsdb_conn.ovsdb_fd_states = {fake_ip: 'fake_status'}
self.l2gw_ovsdb_conn.mgr.l2gw_agent_type = n_const.MONITOR
self.l2gw_ovsdb_conn.ovsdb_conn_list = [fake_ip]
with mock.patch.object(eventlet.greenthread, 'spawn_n') as (
mock_thread):
self.l2gw_ovsdb_conn._send_monitor_msg_to_ovsdb_connection(
fake_ip)
self.assertTrue(mock_thread.called)
def test_exception_for_send_monitor_msg_to_ovsdb_connection(self):
fake_ip = 'fake_ip'
self.l2gw_ovsdb_conn.ovsdb_fd_states = {fake_ip: 'fake_status'}
self.l2gw_ovsdb_conn.mgr.l2gw_agent_type = n_const.MONITOR
self.l2gw_ovsdb_conn.ovsdb_conn_list = [fake_ip]
with contextlib.nested(
mock.patch.object(eventlet.greenthread, 'spawn_n',
side_effect=Exception),
mock.patch.object(base_connection.LOG, 'warning'),
mock.patch.object(self.l2gw_ovsdb_conn, 'disconnect')) as (
mock_thread, mock_warning, mock_disconnect):
self.l2gw_ovsdb_conn._send_monitor_msg_to_ovsdb_connection(
fake_ip)
self.assertTrue(mock_thread.called)
self.assertTrue(mock_warning.called)
mock_disconnect.assert_called_with(fake_ip)
def test_echo_response(self):
fake_resp = {"method": "echo",
"params": "fake_params",
@ -248,7 +280,12 @@ class TestBaseConnection_with_enable_manager(base.BaseTestCase):
self.assertFalse(self.l2gw_ovsdb_conn.read_on)
def test_disconnect_with_enable_manager(self):
fake_ip = 'fake_ip'
self.l2gw_ovsdb_conn.ovsdb_fd_states = {fake_ip: 'fake_status'}
self.l2gw_ovsdb_conn.ovsdb_conn_list = [fake_ip]
with mock.patch.object(self.fakesocket,
'close') as (mock_close):
self.l2gw_ovsdb_conn.disconnect(self.fake_ip)
self.l2gw_ovsdb_conn.disconnect(fake_ip)
self.assertTrue(mock_close.called)
self.assertNotIn(fake_ip, self.l2gw_ovsdb_conn.ovsdb_fd_states)
self.assertNotIn(fake_ip, self.l2gw_ovsdb_conn.ovsdb_conn_list)

@ -270,6 +270,7 @@ class TestManager(base.BaseTestCase):
self.l2gw_agent_manager.ovsdb_fd = mock_ovsdb_common.return_value
self.l2gw_agent_manager.ovsdb_fd.check_monitor_table_thread = False
self.l2gw_agent_manager.ovsdb_fd.check_sock_rcv = True
self.l2gw_agent_manager.ovsdb_fd.ovsdb_conn_list = ["fake_ip"]
self.l2gw_agent_manager.ovsdb_fd.ovsdb_dicts = {
"fake_ip": "fake_sock"}
self.l2gw_agent_manager.set_monitor_agent(self.context,
@ -309,16 +310,14 @@ class TestManager(base.BaseTestCase):
'_sock_open_connection')) as (
mock_ovsdb_common, mock_open_conn):
self.l2gw_agent_manager.ovsdb_fd = mock_ovsdb_common.return_value
self.l2gw_agent_manager.ovsdb_fd.check_c_sock = True
self.l2gw_agent_manager.ovsdb_fd.ovsdb_conn_list = ['fake_ip']
self.l2gw_agent_manager.update_connection_to_gateway(
self.context, mock.Mock(), mock.Mock(), mock.Mock(),
self.context, 'fake_ip', mock.Mock(), mock.Mock(),
mock.Mock(), mock.Mock())
(self.l2gw_agent_manager.ovsdb_fd._echo_response.
assert_called_with(mock.ANY))
self.assertTrue(mock_open_conn.called)
(self.l2gw_agent_manager.ovsdb_fd.update_connection_to_gateway.
assert_called_with(mock.ANY, mock.ANY, mock.ANY, mock.ANY,
mock.ANY))
'fake_ip', False))
def test_delete_network_for_monitor_agent(self):
"""Test case to test delete_network with enable_manager."""
@ -344,14 +343,12 @@ class TestManager(base.BaseTestCase):
'_sock_open_connection')) as (
mock_ovsdb_common, mock_open_conn):
self.l2gw_agent_manager.ovsdb_fd = mock_ovsdb_common.return_value
self.l2gw_agent_manager.ovsdb_fd.check_c_sock = True
self.l2gw_agent_manager.ovsdb_fd.ovsdb_conn_list = ['fake_ip']
self.l2gw_agent_manager.delete_network(
self.context, mock.Mock(), "fake_logical_switch_uuid")
(self.l2gw_agent_manager.ovsdb_fd._echo_response.
assert_called_with(mock.ANY))
self.context, 'fake_ip', "fake_logical_switch_uuid")
self.assertTrue(mock_open_conn.called)
(self.l2gw_agent_manager.ovsdb_fd.delete_logical_switch.
assert_called_with("fake_logical_switch_uuid", mock.ANY, False))
assert_called_with("fake_logical_switch_uuid", 'fake_ip', False))
def test_add_vif_to_gateway_for_monitor_agent(self):
"""Test case to test add_vif_to_gateway with enable_manager."""
@ -381,16 +378,15 @@ class TestManager(base.BaseTestCase):
mock_ovsdb_common, mock_open_conn):
self.l2gw_agent_manager.ovsdb_fd = mock_ovsdb_common.return_value
self.l2gw_agent_manager.ovsdb_fd.check_c_sock = True
self.l2gw_agent_manager.ovsdb_fd.ovsdb_conn_list = ['fake_ip']
self.l2gw_agent_manager.add_vif_to_gateway(
self.context, mock.Mock(), "fake_logical_switch_dict",
self.context, 'fake_ip', "fake_logical_switch_dict",
"fake_locator_dict", "fake_mac_dict")
(self.l2gw_agent_manager.ovsdb_fd._echo_response.
assert_called_with(mock.ANY))
self.assertTrue(mock_open_conn.called)
(self.l2gw_agent_manager.ovsdb_fd.insert_ucast_macs_remote.
assert_called_with("fake_logical_switch_dict",
"fake_locator_dict", "fake_mac_dict",
mock.ANY))
'fake_ip', False))
def test_delete_vif_from_gateway_for_monitor_agent(self):
"""Test case to test delete_vif_to_gateway with enable_manager."""
@ -418,16 +414,14 @@ class TestManager(base.BaseTestCase):
'_sock_open_connection')) as (
mock_ovsdb_common, mock_open_conn):
self.l2gw_agent_manager.ovsdb_fd = mock_ovsdb_common.return_value
self.l2gw_agent_manager.ovsdb_fd.check_c_sock = True
self.l2gw_agent_manager.ovsdb_fd.ovsdb_conn_list = ['fake_ip']
self.l2gw_agent_manager.delete_vif_from_gateway(
self.context, mock.Mock(), "fake_logical_switch_uuid",
self.context, 'fake_ip', "fake_logical_switch_uuid",
"fake_mac")
(self.l2gw_agent_manager.ovsdb_fd._echo_response.
assert_called_with(mock.ANY))
self.assertTrue(mock_open_conn.called)
(self.l2gw_agent_manager.ovsdb_fd.delete_ucast_macs_remote.
assert_called_with("fake_logical_switch_uuid", "fake_mac",
mock.ANY))
'fake_ip', False))
def test_update_vif_from_gateway_for_monitor_agent(self):
"""Test case to test update_vif_to_gateway with enable_manager."""
@ -458,13 +452,11 @@ class TestManager(base.BaseTestCase):
'_sock_open_connection')) as (
mock_ovsdb_common, mock_open_conn):
self.l2gw_agent_manager.ovsdb_fd = mock_ovsdb_common.return_value
self.l2gw_agent_manager.ovsdb_fd.check_c_sock = True
self.l2gw_agent_manager.ovsdb_fd.ovsdb_conn_list = ['fake_ip']
self.l2gw_agent_manager.update_vif_to_gateway(
self.context, mock.Mock(), "fake_logical_switch_uuid",
self.context, 'fake_ip', "fake_logical_switch_uuid",
"fake_mac")
(self.l2gw_agent_manager.ovsdb_fd._echo_response.
assert_called_with(mock.ANY))
self.assertTrue(mock_open_conn.called)
(self.l2gw_agent_manager.ovsdb_fd.update_ucast_macs_remote.
assert_called_with("fake_logical_switch_uuid", "fake_mac",
mock.ANY))
'fake_ip', False))

Loading…
Cancel
Save