Merge "handle connection from multiple ovsdb servers"

stable/ocata
Jenkins 7 years ago committed by Gerrit Code Review
commit 89427e3a6f

@ -46,10 +46,11 @@ class BaseConnection(object):
self.enable_manager = cfg.CONF.ovsdb.enable_manager
if self.enable_manager:
self.s = None
self.c_sock = None
self.addr = None
self.check_c_sock = None
self.check_sock_rcv = False
eventlet.greenthread.spawn(self._rcv_socket)
self.ovsdb_dicts = {}
self.ovsdb_fd_states = {}
else:
self.gw_config = gw_config
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
@ -96,30 +97,72 @@ class BaseConnection(object):
self.s.listen(5) # Now wait for client connection.
while True:
# Establish connection with client.
self.c_sock, ip_addr = self.s.accept()
self.addr = ip_addr[0]
self.c_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
LOG.debug("Got connection from %s ", self.addr)
c_sock, ip_addr = self.s.accept()
addr = ip_addr[0]
c_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
LOG.debug("Got connection from %s ", addr)
self.connected = True
self.ovsdb_dicts[addr] = c_sock
eventlet.greenthread.spawn(self._common_sock_rcv_thread, addr)
def _echo_response(self):
def _common_sock_rcv_thread(self, addr):
chunks = []
lc = rc = 0
prev_char = None
self._echo_response(addr)
if self.enable_manager and self.check_c_sock:
while self.read_on:
response = self.ovsdb_dicts.get(addr).recv(n_const.BUFFER_SIZE)
self.ovsdb_fd_states[addr] = 'connected'
eventlet.greenthread.sleep(0)
self.check_sock_rcv = True
if response:
response = response.decode('utf8')
message_mark = 0
for i, c in enumerate(response):
if c == '{' and not (prev_char and
prev_char == '\\'):
lc += 1
elif c == '}' and not (prev_char and
prev_char == '\\'):
rc += 1
if rc > lc:
raise Exception(_LE("json string not valid"))
elif lc == rc and lc is not 0:
chunks.append(response[message_mark:i + 1])
message = "".join(chunks)
eventlet.greenthread.spawn_n(
self._on_remote_message, message, addr)
eventlet.greenthread.sleep(0)
lc = rc = 0
message_mark = i + 1
chunks = []
prev_char = c
chunks.append(response[message_mark:])
else:
self.read_on = False
self.disconnect(addr)
self.ovsdb_fd_states[addr] = 'disconnected'
def _echo_response(self, addr):
while True:
try:
if self.enable_manager:
eventlet.greenthread.sleep(0)
response = self.c_sock.recv(n_const.BUFFER_SIZE)
response = self.ovsdb_dicts.get(addr).recv(
n_const.BUFFER_SIZE)
sock_json_m = jsonutils.loads(response)
sock_handler_method = sock_json_m.get('method', None)
if sock_handler_method == 'echo':
self.check_c_sock = True
self.c_sock.send(jsonutils.dumps(
self.ovsdb_dicts.get(addr).send(jsonutils.dumps(
{"result": sock_json_m.get("params", None),
"error": None, "id": sock_json_m['id']}))
break
except Exception:
continue
def send(self, message, callback=None):
def send(self, message, callback=None, addr=None):
"""Sends a message to the OVSDB server."""
if callback:
self.callbacks[message['id']] = callback
@ -128,7 +171,8 @@ class BaseConnection(object):
while retry_count <= n_const.MAX_RETRIES:
try:
if self.enable_manager:
bytes_sent = self.c_sock.send(jsonutils.dumps(message))
bytes_sent = self.ovsdb_dicts.get(addr).send(
jsonutils.dumps(message))
else:
bytes_sent = self.socket.send(jsonutils.dumps(message))
if bytes_sent:
@ -140,13 +184,14 @@ class BaseConnection(object):
LOG.warning(_LW("Could not send message to the "
"OVSDB server."))
self.disconnect()
self.disconnect(addr)
return False
def disconnect(self):
def disconnect(self, addr=None):
"""disconnects the connection from the OVSDB server."""
if self.enable_manager:
self.c_sock.close()
self.ovsdb_dicts.get(addr).close()
del self.ovsdb_dicts[addr]
else:
self.socket.close()
self.connected = False

@ -45,6 +45,9 @@ class OVSDBManager(base_agent_manager.BaseAgentManager):
self.enable_manager = cfg.CONF.ovsdb.enable_manager
if self.enable_manager:
self.ovsdb_fd = None
self._sock_open_connection()
self.looping_task_ovsdb_states = (
loopingcall.FixedIntervalLoopingCall(self._send_ovsdb_states))
else:
self.looping_task = loopingcall.FixedIntervalLoopingCall(
self._connect_to_ovsdb_server)
@ -148,6 +151,10 @@ class OVSDBManager(base_agent_manager.BaseAgentManager):
self._stop_looping_task()
self._disconnect_all_ovsdb_servers()
def _send_ovsdb_states(self):
self.plugin_rpc.notify_ovsdb_states(ctx.get_admin_context(),
self.ovsdb_fd.ovsdb_fd_states)
def _disconnect_all_ovsdb_servers(self):
if self.gateways:
for key, gateway in self.gateways.items():
@ -171,14 +178,32 @@ class OVSDBManager(base_agent_manager.BaseAgentManager):
elif self.enable_manager and self.l2gw_agent_type == n_const.MONITOR:
if self.ovsdb_fd is None:
self._sock_open_connection()
elif self.ovsdb_fd and not self.ovsdb_fd.check_monitor_thread:
self.ovsdb_fd._spawn_monitor_thread()
elif not self.enable_manager:
elif ((self.ovsdb_fd) and not (
self.ovsdb_fd.check_monitor_table_thread) and (
self.ovsdb_fd.check_sock_rcv)):
for key in self.ovsdb_fd.ovsdb_dicts.keys():
eventlet.greenthread.spawn_n(
self.ovsdb_fd._spawn_monitor_table_thread, key)
self._start_looping_task_ovsdb_states()
elif ((self.enable_manager) and not (
self.l2gw_agent_type == n_const.MONITOR)):
self._stop_looping_task_ovsdb_states()
elif (not (self.l2gw_agent_type == n_const.MONITOR) and not (
self.enable_manager)):
# Otherwise, stop monitoring the OVSDB servers
# and close the open connections if any.
self._stop_looping_task()
self._disconnect_all_ovsdb_servers()
def _stop_looping_task_ovsdb_states(self):
if self.looping_task_ovsdb_states._running:
self.looping_task_ovsdb_states.stop()
def _start_looping_task_ovsdb_states(self):
if not self.looping_task_ovsdb_states._running:
self.looping_task_ovsdb_states.start(
interval=self.conf.ovsdb.periodic_interval)
def _stop_looping_task(self):
if self.looping_task._running:
self.looping_task.stop()
@ -218,13 +243,15 @@ class OVSDBManager(base_agent_manager.BaseAgentManager):
def delete_network(self, context, ovsdb_identifier, logical_switch_uuid):
"""Handle RPC cast from plugin to delete a network."""
if self.enable_manager and self.l2gw_agent_type == n_const.MONITOR:
self.ovsdb_fd.delete_logical_switch(logical_switch_uuid, False)
self.ovsdb_fd.delete_logical_switch(logical_switch_uuid,
ovsdb_identifier, False)
elif ((self.enable_manager) and (
not self.l2gw_agent_type == n_const.MONITOR)):
self._sock_open_connection()
self.ovsdb_fd._echo_response()
self.ovsdb_fd._echo_response(ovsdb_identifier)
if self.ovsdb_fd.check_c_sock:
self.ovsdb_fd.delete_logical_switch(logical_switch_uuid,
ovsdb_identifier,
False)
elif not self.enable_manager:
if self._is_valid_request(ovsdb_identifier):
@ -238,16 +265,17 @@ class OVSDBManager(base_agent_manager.BaseAgentManager):
if self.enable_manager and self.l2gw_agent_type == n_const.MONITOR:
self.ovsdb_fd.insert_ucast_macs_remote(logical_switch_dict,
locator_dict,
mac_dict, False)
mac_dict, ovsdb_identifier,
False)
elif ((self.enable_manager) and (
not self.l2gw_agent_type == n_const.MONITOR)):
self._sock_open_connection()
self.ovsdb_fd._echo_response()
self.ovsdb_fd._echo_response(ovsdb_identifier)
if self.ovsdb_fd.check_c_sock:
self.ovsdb_fd.insert_ucast_macs_remote(
logical_switch_dict,
locator_dict,
mac_dict)
mac_dict, ovsdb_identifier)
elif not self.enable_manager:
if self._is_valid_request(ovsdb_identifier):
with self._open_connection(ovsdb_identifier) as ovsdb_fd:
@ -260,15 +288,16 @@ class OVSDBManager(base_agent_manager.BaseAgentManager):
"""Handle RPC cast from plugin to delete neutron port MACs."""
if self.enable_manager and self.l2gw_agent_type == n_const.MONITOR:
self.ovsdb_fd.delete_ucast_macs_remote(logical_switch_uuid, mac,
ovsdb_identifier,
False)
elif ((self.enable_manager) and (
not self.l2gw_agent_type == n_const.MONITOR)):
self._sock_open_connection()
self.ovsdb_fd._echo_response()
self.ovsdb_fd._echo_response(ovsdb_identifier)
if self.ovsdb_fd.check_c_sock:
self.ovsdb_fd.delete_ucast_macs_remote(
logical_switch_uuid,
mac)
mac, ovsdb_identifier)
elif not self.enable_manager:
if self._is_valid_request(ovsdb_identifier):
with self._open_connection(ovsdb_identifier) as ovsdb_fd:
@ -282,14 +311,16 @@ class OVSDBManager(base_agent_manager.BaseAgentManager):
"""
if self.enable_manager and self.l2gw_agent_type == n_const.MONITOR:
self.ovsdb_fd.update_ucast_macs_remote(locator_dict,
mac_dict, False)
mac_dict, ovsdb_identifier,
False)
elif ((self.enable_manager) and (
not self.l2gw_agent_type == n_const.MONITOR)):
self._sock_open_connection()
self.ovsdb_fd._echo_response()
self.ovsdb_fd._echo_response(ovsdb_identifier)
if self.ovsdb_fd.check_c_sock:
self.ovsdb_fd.update_ucast_macs_remote(locator_dict,
mac_dict)
mac_dict,
ovsdb_identifier)
elif not self.enable_manager:
if self._is_valid_request(ovsdb_identifier):
with self._open_connection(ovsdb_identifier) as ovsdb_fd:
@ -308,17 +339,19 @@ class OVSDBManager(base_agent_manager.BaseAgentManager):
self.ovsdb_fd.update_connection_to_gateway(logical_switch_dict,
locator_dicts,
mac_dicts,
port_dicts, False)
port_dicts,
ovsdb_identifier,
False)
elif ((self.enable_manager) and (
not self.l2gw_agent_type == n_const.MONITOR)):
self._sock_open_connection()
self.ovsdb_fd._echo_response()
self.ovsdb_fd._echo_response(ovsdb_identifier)
if self.ovsdb_fd.check_c_sock:
self.ovsdb_fd.update_connection_to_gateway(
logical_switch_dict,
locator_dicts,
mac_dicts,
port_dicts)
port_dicts, ovsdb_identifier)
elif not self.enable_manager:
if self._is_valid_request(ovsdb_identifier):
with self._open_connection(ovsdb_identifier) as ovsdb_fd:

@ -41,14 +41,13 @@ class OVSDBMonitor(base_connection.BaseConnection):
self.read_on = True
self.handlers = {"echo": self._default_echo_handler}
if self.enable_manager:
self.check_c_sock = self.c_sock
self.check_monitor_thread = False
self.check_monitor_table_thread = False
if not self.enable_manager:
eventlet.greenthread.spawn(self._rcv_thread)
def _spawn_monitor_thread(self):
eventlet.greenthread.spawn(self._sock_rcv_thread)
self.check_monitor_thread = True
def _spawn_monitor_table_thread(self, addr):
self.set_monitor_response_handler(addr)
self.check_monitor_table_thread = True
def _initialize_data_dict(self):
data_dict = {'new_local_macs': [],
@ -91,7 +90,7 @@ class OVSDBMonitor(base_connection.BaseConnection):
self._process_physical_locator_set
}
def set_monitor_response_handler(self):
def set_monitor_response_handler(self, addr=None):
"""Monitor OVSDB tables to receive events for any changes in OVSDB."""
if self.connected:
op_id = str(random.getrandbits(128))
@ -113,7 +112,7 @@ class OVSDBMonitor(base_connection.BaseConnection):
'Physical_Locator_Set': [props]}
]}
self._set_handler("update", self._update_event_handler)
if not self.send(monitor_message):
if not self.send(monitor_message, addr=addr):
# Return so that this will retried in the next iteration
return
try:
@ -121,15 +120,15 @@ class OVSDBMonitor(base_connection.BaseConnection):
except exceptions.OVSDBError:
with excutils.save_and_reraise_exception():
if self.enable_manager:
self.check_monitor_thread = False
self.check_monitor_table_thread = False
LOG.exception(_LE("Exception while receiving the "
"response for the monitor message"))
self._process_monitor_msg(response_result)
self._process_monitor_msg(response_result, addr)
def _update_event_handler(self, message):
self._process_update_event(message)
def _update_event_handler(self, message, addr):
self._process_update_event(message, addr)
def _process_update_event(self, message):
def _process_update_event(self, message, addr):
"""Process update event that is triggered by the OVSDB server."""
LOG.debug("_process_update_event: message = %s ", str(message))
data_dict = self._initialize_data_dict()
@ -137,7 +136,7 @@ class OVSDBMonitor(base_connection.BaseConnection):
params_list = message.get('params')
param_dict = params_list[1]
self._process_tables(param_dict, data_dict)
self.rpc_callback(self._form_ovsdb_data(data_dict))
self.rpc_callback(self._form_ovsdb_data(data_dict, addr))
def _process_tables(self, param_dict, data_dict):
# Process all the tables one by one.
@ -177,64 +176,27 @@ class OVSDBMonitor(base_connection.BaseConnection):
)
return result
def _default_echo_handler(self, message):
def _default_echo_handler(self, message, addr):
"""Message handler for the OVSDB server's echo request."""
self.send({"result": message.get("params", None),
"error": None, "id": message['id']})
"error": None, "id": message['id']}, addr=addr)
def _set_handler(self, method_name, handler):
self.handlers[method_name] = handler
def _on_remote_message(self, message):
def _on_remote_message(self, message, addr=None):
"""Processes the message received on the socket."""
try:
json_m = jsonutils.loads(message)
handler_method = json_m.get('method', None)
if handler_method:
self.handlers.get(handler_method)(json_m)
self.handlers.get(handler_method)(json_m, addr)
else:
self.responses.append(json_m)
except Exception as e:
LOG.exception(_LE("Exception [%s] while handling "
"message"), e)
def _sock_rcv_thread(self):
chunks = []
lc = rc = 0
prev_char = None
self._echo_response()
if self.enable_manager and self.check_c_sock:
eventlet.greenthread.spawn_n(self.set_monitor_response_handler)
while self.read_on:
response = self.c_sock.recv(n_const.BUFFER_SIZE)
eventlet.greenthread.sleep(0)
if response:
response = response.decode('utf8')
message_mark = 0
for i, c in enumerate(response):
if c == '{' and not (prev_char and
prev_char == '\\'):
lc += 1
elif c == '}' and not (prev_char and
prev_char == '\\'):
rc += 1
if rc > lc:
raise Exception(_LE("json string not valid"))
elif lc == rc and lc is not 0:
chunks.append(response[message_mark:i + 1])
message = "".join(chunks)
eventlet.greenthread.spawn_n(
self._on_remote_message, message)
eventlet.greenthread.sleep(0)
lc = rc = 0
message_mark = i + 1
chunks = []
prev_char = c
chunks.append(response[message_mark:])
else:
self.read_on = False
self.disconnect()
def _rcv_thread(self):
chunks = []
lc = rc = 0
@ -274,26 +236,26 @@ class OVSDBMonitor(base_connection.BaseConnection):
LOG.exception(_LE("Exception [%s] occurred while receiving"
"message from the OVSDB server"), ex)
def disconnect(self):
def disconnect(self, addr=None):
"""disconnects the connection from the OVSDB server."""
self.read_on = False
super(OVSDBMonitor, self).disconnect()
super(OVSDBMonitor, self).disconnect(addr)
def _process_monitor_msg(self, message):
def _process_monitor_msg(self, message, addr=None):
"""Process initial set of records in the OVSDB at startup."""
result_dict = message.get('result')
data_dict = self._initialize_data_dict()
try:
self._process_tables(result_dict, data_dict)
self.rpc_callback(self._form_ovsdb_data(data_dict))
self.rpc_callback(self._form_ovsdb_data(data_dict, addr))
except Exception as e:
LOG.exception(_LE("_process_monitor_msg:ERROR %s "), e)
def _get_list(self, resource_list):
return [element.__dict__ for element in resource_list]
def _form_ovsdb_data(self, data_dict):
return {n_const.OVSDB_IDENTIFIER: str(self.addr) if (
def _form_ovsdb_data(self, data_dict, addr):
return {n_const.OVSDB_IDENTIFIER: str(addr) if (
self.enable_manager) else (self.gw_config.ovsdb_identifier),
'new_logical_switches': self._get_list(
data_dict.get('new_logical_switches')),

@ -53,10 +53,10 @@ class OVSDBWriter(base_connection.BaseConnection):
message="Error from the OVSDB server: %s" % error)
return result
def _get_reply(self, operation_id):
def _get_reply(self, operation_id, ovsdb_identifier):
count = 0
while count <= n_const.MAX_RETRIES:
response = self._recv_data()
response = self._recv_data(ovsdb_identifier)
LOG.debug("Response from OVSDB server = %s", str(response))
if response:
try:
@ -64,9 +64,10 @@ class OVSDBWriter(base_connection.BaseConnection):
self.responses.append(json_m)
method_type = json_m.get('method', None)
if method_type == "echo" and self.enable_manager:
self.c_sock.send(jsonutils.dumps(
{"result": json_m.get("params", None),
"error": None, "id": json_m['id']}))
self.ovsdb_dicts.get(ovsdb_identifier).send(
jsonutils.dumps(
{"result": json_m.get("params", None),
"error": None, "id": json_m['id']}))
else:
if self._process_response(operation_id):
return True
@ -80,13 +81,15 @@ class OVSDBWriter(base_connection.BaseConnection):
LOG.error(_LE("Could not obtain response from the OVSDB server "
"for the request"))
def _send_and_receive(self, query, operation_id, rcv_required):
if not self.send(query):
def _send_and_receive(self, query, operation_id, ovsdb_identifier,
rcv_required):
if not self.send(query, addr=ovsdb_identifier):
return
if rcv_required:
self._get_reply(operation_id)
self._get_reply(operation_id, ovsdb_identifier)
def delete_logical_switch(self, logical_switch_uuid, rcv_required=True):
def delete_logical_switch(self, logical_switch_uuid, ovsdb_identifier,
rcv_required=True):
"""Delete an entry from Logical_Switch OVSDB table."""
commit_dict = {"op": "commit", "durable": True}
op_id = str(random.getrandbits(128))
@ -99,10 +102,11 @@ class OVSDBWriter(base_connection.BaseConnection):
commit_dict],
"id": op_id}
LOG.debug("delete_logical_switch: query: %s", query)
self._send_and_receive(query, op_id, rcv_required)
self._send_and_receive(query, op_id, ovsdb_identifier, rcv_required)
def insert_ucast_macs_remote(self, l_switch_dict, locator_dict,
mac_dict, rcv_required=True):
mac_dict, ovsdb_identifier,
rcv_required=True):
"""Insert an entry in Ucast_Macs_Remote OVSDB table."""
# To insert an entry in Ucast_Macs_Remote table, it requires
# corresponding entry in Physical_Locator (Compute node VTEP IP)
@ -146,9 +150,10 @@ class OVSDBWriter(base_connection.BaseConnection):
"params": params,
"id": op_id}
LOG.debug("insert_ucast_macs_remote: query: %s", query)
self._send_and_receive(query, op_id, rcv_required)
self._send_and_receive(query, op_id, ovsdb_identifier, rcv_required)
def update_ucast_macs_remote(self, locator_dict, mac_dict,
ovsdb_identifier,
rcv_required=True):
"""Update an entry in Ucast_Macs_Remote OVSDB table."""
# It is possible that the locator may not exist already.
@ -181,9 +186,10 @@ class OVSDBWriter(base_connection.BaseConnection):
"params": params,
"id": op_id}
LOG.debug("update_ucast_macs_remote: query: %s", query)
self._send_and_receive(query, op_id, rcv_required)
self._send_and_receive(query, op_id, ovsdb_identifier, rcv_required)
def delete_ucast_macs_remote(self, logical_switch_uuid, macs,
ovsdb_identifier,
rcv_required=True):
"""Delete entries from Ucast_Macs_Remote OVSDB table."""
commit_dict = {"op": "commit", "durable": True}
@ -205,11 +211,12 @@ class OVSDBWriter(base_connection.BaseConnection):
"params": params,
"id": op_id}
LOG.debug("delete_ucast_macs_remote: query: %s", query)
self._send_and_receive(query, op_id, rcv_required)
self._send_and_receive(query, op_id, ovsdb_identifier, rcv_required)
def update_connection_to_gateway(self, logical_switch_dict,
locator_dicts, mac_dicts,
port_dicts, rcv_required=True):
port_dicts, ovsdb_identifier,
rcv_required=True):
"""Updates Physical Port's VNI to VLAN binding."""
# Form the JSON Query so as to update the physical port with the
# vni-vlan (logical switch uuid to vlan) binding
@ -222,16 +229,17 @@ class OVSDBWriter(base_connection.BaseConnection):
"params": update_dicts,
"id": op_id}
LOG.debug("update_connection_to_gateway: query = %s", query)
self._send_and_receive(query, op_id, rcv_required)
self._send_and_receive(query, op_id, ovsdb_identifier, rcv_required)
def _recv_data(self):
def _recv_data(self, ovsdb_identifier):
chunks = []
lc = rc = 0
prev_char = None
while True:
try:
if self.enable_manager:
response = self.c_sock.recv(n_const.BUFFER_SIZE)
response = self.ovsdb_dicts.get(ovsdb_identifier).recv(
n_const.BUFFER_SIZE)
else:
response = self.socket.recv(n_const.BUFFER_SIZE)
if response:

@ -75,6 +75,9 @@ class SocketClass(object):
raise self.recv_error
return self.rcv_data
def close(self):
pass
class TestBaseConnection(base.BaseTestCase):
def setUp(self):
@ -181,17 +184,22 @@ class TestBaseConnection_with_enable_manager(base.BaseTestCase):
cfg.CONF.set_override('enable_manager', True, 'ovsdb')
self.l2gw_ovsdb_conn = base_connection.BaseConnection(mock.Mock(),
self.conf)
self.l2gw_ovsdb_conn.c_sock = mock.patch('socket.socket').start()
self.mock_sock = mock.patch('socket.socket').start()
self.l2gw_ovsdb_conn.s = mock.patch('socket.socket').start()
self.fakesocket = SocketClass()
self.fake_ip = 'fake_ip'
self.l2gw_ovsdb_conn.ovsdb_dicts = {'fake_ip': self.fakesocket}
def test_init_with_enable_manager(self):
fake_dict = {}
with mock.patch.object(eventlet.greenthread,
'spawn') as (mock_thread):
self.l2gw_ovsdb_conn.__init__(mock.Mock(), self.conf)
self.assertEqual(self.l2gw_ovsdb_conn.s, mock.ANY)
self.assertIsNone(self.l2gw_ovsdb_conn.c_sock)
self.assertIsNone(self.l2gw_ovsdb_conn.addr)
self.assertFalse(self.l2gw_ovsdb_conn.check_sock_rcv)
self.assertIsNone(self.l2gw_ovsdb_conn.check_c_sock)
self.assertEqual(self.l2gw_ovsdb_conn.ovsdb_dicts, fake_dict)
self.assertEqual(self.l2gw_ovsdb_conn.ovsdb_fd_states, fake_dict)
self.assertTrue(mock_thread.called)
def test_echo_response(self):
@ -202,23 +210,45 @@ class TestBaseConnection_with_enable_manager(base.BaseTestCase):
with contextlib.nested(
mock.patch.object(eventlet.greenthread, 'sleep'),
mock.patch.object(jsonutils, 'loads', return_value=fake_resp),
mock.patch.object(self.l2gw_ovsdb_conn.c_sock,
mock.patch.object(self.fakesocket,
'recv',
return_value=fake_resp),
mock.patch.object(self.l2gw_ovsdb_conn.c_sock,
'send')) as (
mock.patch.object(self.fakesocket,
'send')
) as (
fake_thread, mock_loads,
mock_sock_rcv,
mock_sock_send):
self.l2gw_ovsdb_conn._echo_response()
self.l2gw_ovsdb_conn._echo_response(self.fake_ip)
self.assertTrue(fake_thread.called)
self.assertTrue(mock_sock_rcv.called)
mock_loads.assert_called_with(fake_resp)
self.assertTrue(self.l2gw_ovsdb_conn.check_c_sock)
self.assertTrue(mock_sock_send.called)
def test_common_sock_rcv_thread_none(self):
with contextlib.nested(
mock.patch.object(base_connection.BaseConnection,
'_echo_response'),
mock.patch.object(eventlet.greenthread, 'sleep'),
mock.patch.object(self.fakesocket,
'recv', return_value=None),
mock.patch.object(base_connection.BaseConnection,
'disconnect')) as (
mock_resp, green_thrd_sleep,
mock_rcv, mock_disconnect):
self.l2gw_ovsdb_conn.check_c_sock = True
self.l2gw_ovsdb_conn.read_on = True
self.l2gw_ovsdb_conn._common_sock_rcv_thread(self.fake_ip)
self.assertTrue(mock_resp.called)
self.assertTrue(green_thrd_sleep.called)
self.assertTrue(mock_rcv.called)
self.assertTrue(mock_disconnect.called)
self.assertFalse(self.l2gw_ovsdb_conn.connected)
self.assertFalse(self.l2gw_ovsdb_conn.read_on)
def test_disconnect_with_enable_manager(self):
with mock.patch.object(self.l2gw_ovsdb_conn.c_sock,
'close') as mock_close:
self.l2gw_ovsdb_conn.disconnect()
with mock.patch.object(self.fakesocket,
'close') as (mock_close):
self.l2gw_ovsdb_conn.disconnect(self.fake_ip)
self.assertTrue(mock_close.called)

@ -225,8 +225,14 @@ class TestManager(base.BaseTestCase):
def test_init_with_enable_manager(self):
cfg.CONF.set_override('enable_manager', True, 'ovsdb')
self.l2gw_agent_manager.__init__()
self.assertIsNone(self.l2gw_agent_manager.ovsdb_fd)
with contextlib.nested(
mock.patch.object(manager.OVSDBManager,
'_sock_open_connection'),
mock.patch.object(loopingcall, 'FixedIntervalLoopingCall')) as (
mock_sock_open_conn, mock_loop):
self.l2gw_agent_manager.__init__()
self.assertTrue(mock_sock_open_conn.called)
self.assertTrue(mock_loop.called)
def test_sock_open_connection(self):
cfg.CONF.set_override('enable_manager', True, 'ovsdb')
@ -253,14 +259,23 @@ class TestManager(base.BaseTestCase):
self.l2gw_agent_manager.conf.host = 'fake_host'
self.l2gw_agent_manager.__init__()
self.l2gw_agent_manager.l2gw_agent_type = n_const.MONITOR
with mock.patch.object(ovsdb_common_class,
'OVSDB_commom_class') as mock_ovsdb_common:
with contextlib.nested(
mock.patch.object(ovsdb_common_class,
'OVSDB_commom_class'),
mock.patch.object(eventlet.greenthread,
'spawn_n'),
mock.patch.object(self.l2gw_agent_manager,
'_start_looping_task_ovsdb_states')) as (
mock_ovsdb_common, mock_thread, mock_looping):
self.l2gw_agent_manager.ovsdb_fd = mock_ovsdb_common.return_value
self.l2gw_agent_manager.ovsdb_fd.check_monitor_thread = False
self.l2gw_agent_manager.ovsdb_fd.check_monitor_table_thread = False
self.l2gw_agent_manager.ovsdb_fd.check_sock_rcv = True
self.l2gw_agent_manager.ovsdb_fd.ovsdb_dicts = {
"fake_ip": "fake_sock"}
self.l2gw_agent_manager.set_monitor_agent(self.context,
'fake_host')
(self.l2gw_agent_manager.ovsdb_fd._spawn_monitor_thread.
assert_called_with())
self.assertTrue(mock_thread.called)
self.assertTrue(mock_looping.called)
def test_update_connection_to_gateway_for_monitor_agent(self):
"""Test case to test update_connection_to_gateway for
@ -277,7 +292,8 @@ class TestManager(base.BaseTestCase):
self.context, mock.Mock(), mock.Mock(), mock.Mock(),
mock.Mock(), mock.Mock())
(self.l2gw_agent_manager.ovsdb_fd.update_connection_to_gateway.
assert_called_with(mock.ANY, mock.ANY, mock.ANY, mock.ANY, False))
assert_called_with(mock.ANY, mock.ANY, mock.ANY, mock.ANY,
mock.ANY, False))
def test_update_connection_to_gateway_for_transact_agent(self):
"""Test case to test update_connection_to_gateway
@ -298,10 +314,11 @@ class TestManager(base.BaseTestCase):
self.context, mock.Mock(), mock.Mock(), mock.Mock(),
mock.Mock(), mock.Mock())
(self.l2gw_agent_manager.ovsdb_fd._echo_response.
assert_called_with())
assert_called_with(mock.ANY))
self.assertTrue(mock_open_conn.called)
(self.l2gw_agent_manager.ovsdb_fd.update_connection_to_gateway.
assert_called_with(mock.ANY, mock.ANY, mock.ANY, mock.ANY))
assert_called_with(mock.ANY, mock.ANY, mock.ANY, mock.ANY,
mock.ANY))
def test_delete_network_for_monitor_agent(self):
"""Test case to test delete_network with enable_manager."""
@ -314,7 +331,7 @@ class TestManager(base.BaseTestCase):
self.l2gw_agent_manager.delete_network(
self.context, mock.Mock(), "fake_logical_switch_uuid")
(self.l2gw_agent_manager.ovsdb_fd.delete_logical_switch.
assert_called_with("fake_logical_switch_uuid", False))
assert_called_with("fake_logical_switch_uuid", mock.ANY, False))
def test_delete_network_for_transact_agent(self):
"""Test case to test delete_network with enable_manager."""
@ -331,10 +348,10 @@ class TestManager(base.BaseTestCase):
self.l2gw_agent_manager.delete_network(
self.context, mock.Mock(), "fake_logical_switch_uuid")
(self.l2gw_agent_manager.ovsdb_fd._echo_response.
assert_called_with())
assert_called_with(mock.ANY))
self.assertTrue(mock_open_conn.called)
(self.l2gw_agent_manager.ovsdb_fd.delete_logical_switch.
assert_called_with("fake_logical_switch_uuid", False))
assert_called_with("fake_logical_switch_uuid", mock.ANY, False))
def test_add_vif_to_gateway_for_monitor_agent(self):
"""Test case to test add_vif_to_gateway with enable_manager."""
@ -349,7 +366,8 @@ class TestManager(base.BaseTestCase):
"fake_locator_dict", "fake_mac_dict")
(self.l2gw_agent_manager.ovsdb_fd.insert_ucast_macs_remote.
assert_called_with("fake_logical_switch_dict",
"fake_locator_dict", "fake_mac_dict", False))
"fake_locator_dict", "fake_mac_dict",
mock.ANY, False))
def test_add_vif_to_gateway_for_transact_agent(self):
"""Test case to test add_vif_to_gateway with enable_manager."""
@ -367,11 +385,12 @@ class TestManager(base.BaseTestCase):
self.context, mock.Mock(), "fake_logical_switch_dict",
"fake_locator_dict", "fake_mac_dict")
(self.l2gw_agent_manager.ovsdb_fd._echo_response.
assert_called_with())
assert_called_with(mock.ANY))
self.assertTrue(mock_open_conn.called)
(self.l2gw_agent_manager.ovsdb_fd.insert_ucast_macs_remote.
assert_called_with("fake_logical_switch_dict",
"fake_locator_dict", "fake_mac_dict"))
"fake_locator_dict", "fake_mac_dict",
mock.ANY))
def test_delete_vif_from_gateway_for_monitor_agent(self):
"""Test case to test delete_vif_to_gateway with enable_manager."""
@ -385,7 +404,8 @@ class TestManager(base.BaseTestCase):
self.context, mock.Mock(), "fake_logical_switch_uuid",
"fake_mac")
(self.l2gw_agent_manager.ovsdb_fd.delete_ucast_macs_remote.
assert_called_with("fake_logical_switch_uuid", "fake_mac", False))
assert_called_with("fake_logical_switch_uuid", "fake_mac",
mock.ANY, False))
def test_delete_vif_to_gateway_for_transact_agent(self):
"""Test case to test delete_vif_to_gateway with enable_manager."""
@ -403,10 +423,11 @@ class TestManager(base.BaseTestCase):
self.context, mock.Mock(), "fake_logical_switch_uuid",
"fake_mac")
(self.l2gw_agent_manager.ovsdb_fd._echo_response.
assert_called_with())
assert_called_with(mock.ANY))
self.assertTrue(mock_open_conn.called)
(self.l2gw_agent_manager.ovsdb_fd.delete_ucast_macs_remote.
assert_called_with("fake_logical_switch_uuid", "fake_mac"))
assert_called_with("fake_logical_switch_uuid", "fake_mac",
mock.ANY))
def test_update_vif_from_gateway_for_monitor_agent(self):
"""Test case to test update_vif_to_gateway with enable_manager."""
@ -421,7 +442,7 @@ class TestManager(base.BaseTestCase):
"fake_logical_switch_uuid", "fake_mac")
(self.l2gw_agent_manager.ovsdb_fd.update_ucast_macs_remote.
assert_called_with(
"fake_logical_switch_uuid", "fake_mac", False))
"fake_logical_switch_uuid", "fake_mac", mock.ANY, False))
def test_update_vif_to_gateway_for_transact_agent(self):
"""Test case to test update_vif_to_gateway
@ -442,7 +463,8 @@ class TestManager(base.BaseTestCase):
self.context, mock.Mock(), "fake_logical_switch_uuid",
"fake_mac")
(self.l2gw_agent_manager.ovsdb_fd._echo_response.
assert_called_with())
assert_called_with(mock.ANY))
self.assertTrue(mock_open_conn.called)
(self.l2gw_agent_manager.ovsdb_fd.update_ucast_macs_remote.
assert_called_with("fake_logical_switch_uuid", "fake_mac"))
assert_called_with("fake_logical_switch_uuid", "fake_mac",
mock.ANY))

@ -23,7 +23,6 @@ import mock
from neutron.tests import base
from networking_l2gw.services.l2gateway.agent import l2gateway_config as conf
from networking_l2gw.services.l2gateway.agent.ovsdb import base_connection
from networking_l2gw.services.l2gateway.agent.ovsdb import ovsdb_monitor
from networking_l2gw.services.l2gateway.common import config
from networking_l2gw.services.l2gateway.common import constants as n_const
@ -169,8 +168,8 @@ class TestOVSDBMonitor(base.BaseTestCase):
with mock.patch.object(ovsdb_monitor.OVSDBMonitor,
'_process_update_event'
) as process_update_event:
self.l2gw_ovsdb._update_event_handler(self.msg)
process_update_event.assert_called_once_with(self.msg)
self.l2gw_ovsdb._update_event_handler(self.msg, mock.ANY)
process_update_event.assert_called_once_with(self.msg, mock.ANY)
def test_process_update_event(self):
"""Test case to test _process_update_event."""
@ -200,7 +199,7 @@ class TestOVSDBMonitor(base.BaseTestCase):
proc_mcast_mac_local,
proc_phys_loc_set):
self.l2gw_ovsdb._setup_dispatch_table()
self.l2gw_ovsdb._process_update_event(self.msg2)
self.l2gw_ovsdb._process_update_event(self.msg2, mock.ANY)
self.assertTrue(proc_phy_port.called)
self.assertTrue(proc_phy_switch.called)
self.assertTrue(proc_logic_switch.called)
@ -238,7 +237,7 @@ class TestOVSDBMonitor(base.BaseTestCase):
'id': 'fake_id'}
with mock.patch.object(ovsdb_monitor.OVSDBMonitor,
'send') as send:
self.l2gw_ovsdb._default_echo_handler(dummy_msg)
self.l2gw_ovsdb._default_echo_handler(dummy_msg, mock.ANY)
self.assertTrue(send.called)
def test_set_handler(self):
@ -321,7 +320,7 @@ class TestOVSDBMonitor(base.BaseTestCase):
'_get_list',
return_value=some_value
):
result = self.l2gw_ovsdb._form_ovsdb_data(mock.Mock())
result = self.l2gw_ovsdb._form_ovsdb_data(mock.Mock(), mock.ANY)
self.assertEqual(expect, result)
def test_process_physical_port(self):
@ -762,35 +761,9 @@ class TestOVSDBMonitor_with_enable_manager(base.BaseTestCase):
self.l2gw_ovsdb = ovsdb_monitor.OVSDBMonitor(mock.Mock(),
self.conf,
self.callback)
fakesocket = SocketClass()
self.l2gw_ovsdb.c_sock = fakesocket
def test_init_with_enable_manager(self):
self.l2gw_ovsdb.__init__(mock.Mock(), self.conf,
self.callback)
self.assertTrue(self.l2gw_ovsdb.enable_manager)
self.assertFalse(self.l2gw_ovsdb.check_monitor_thread)
self.assertIsNone(self.l2gw_ovsdb.check_c_sock)
def test_sock_rcv_thread_none(self):
with contextlib.nested(
mock.patch.object(base_connection.BaseConnection,
'_echo_response'),
mock.patch.object(eventlet.greenthread, 'spawn_n'),
mock.patch.object(eventlet.greenthread, 'sleep'),
mock.patch.object(self.l2gw_ovsdb.c_sock,
'recv', return_value=None),
mock.patch.object(base_connection.BaseConnection,
'disconnect')) as (
mock_resp, green_thrd_spawn, green_thrd_sleep,
mock_rcv, mock_disconnect):
self.l2gw_ovsdb.check_c_sock = True
self.l2gw_ovsdb.read_on = True
self.l2gw_ovsdb._sock_rcv_thread()
self.assertTrue(mock_resp.called)
self.assertTrue(green_thrd_spawn.called)
self.assertTrue(green_thrd_sleep.called)
self.assertTrue(mock_rcv.called)
self.assertTrue(mock_disconnect.called)
self.assertFalse(self.l2gw_ovsdb.connected)
self.assertFalse(self.l2gw_ovsdb.read_on)
self.assertFalse(self.l2gw_ovsdb.check_monitor_table_thread)

@ -123,7 +123,7 @@ class TestOVSDBWriter(base.BaseTestCase):
mock.patch.object(ovsdb_writer.LOG,
'debug')
) as (recv_data, proc_response, debug):
self.l2gw_ovsdb._get_reply(self.op_id)
self.l2gw_ovsdb._get_reply(self.op_id, mock.ANY)
self.assertTrue(recv_data.called)
self.assertTrue(proc_response.called)
@ -135,9 +135,9 @@ class TestOVSDBWriter(base.BaseTestCase):
with mock.patch.object(ovsdb_writer.OVSDBWriter,
'_get_reply') as mock_reply:
self.l2gw_ovsdb._send_and_receive('some_query',
self.op_id, True)
mock_send.assert_called_with('some_query')
mock_reply.assert_called_with(self.op_id)
self.op_id, mock.ANY, True)
mock_send.assert_called_with('some_query', addr=mock.ANY)
mock_reply.assert_called_with(self.op_id, mock.ANY)
def test_send_and_receive_with_rcv_required_false(self):
"""Test case to test _send_and_receive."""
@ -147,8 +147,8 @@ class TestOVSDBWriter(base.BaseTestCase):
with mock.patch.object(ovsdb_writer.OVSDBWriter,
'_get_reply') as mock_reply:
self.l2gw_ovsdb._send_and_receive('some_query',
self.op_id, False)
mock_send.assert_called_with('some_query')
self.op_id, mock.ANY, False)
mock_send.assert_called_with('some_query', addr=mock.ANY)
mock_reply.assert_not_called()
def test_delete_logical_switch(self):
@ -173,9 +173,10 @@ class TestOVSDBWriter(base.BaseTestCase):
with mock.patch.object(ovsdb_writer.LOG,
'debug'):
self.l2gw_ovsdb.delete_logical_switch(
'fake_logical_switch_uuid')
'fake_logical_switch_uuid', mock.ANY)
get_rand.assert_called_with(128)
send_n_receive.assert_called_with(query, self.op_id, True)
send_n_receive.assert_called_with(query, self.op_id,
mock.ANY, True)
def test_insert_ucast_macs_remote(self):
"""Test case to test insert ucast_macs_remote."""
@ -203,10 +204,11 @@ class TestOVSDBWriter(base.BaseTestCase):
mock_ucmr):
self.l2gw_ovsdb.insert_ucast_macs_remote(mock.MagicMock(),
mock.MagicMock(),
mock.MagicMock())
mock.MagicMock(),
mock.ANY)
get_rand.assert_called_with(128)
send_n_receive.assert_called_with(mock.ANY,
self.op_id, True)
self.op_id, mock.ANY, True)
self.assertTrue(get_ucast_mac_remote.called)
@ -248,7 +250,8 @@ class TestOVSDBWriter(base.BaseTestCase):
ls.name = 'ab-cd'
self.l2gw_ovsdb.insert_ucast_macs_remote(mock.MagicMock(),
mock.MagicMock(),
mock.MagicMock())
mock.MagicMock(),
mock.ANY)
self.assertTrue(get_ucast_mac_remote.called)
self.assertTrue(get_physical_locator_dict.called)
self.assertTrue(get_logical_switch_dict.called)
@ -276,10 +279,11 @@ class TestOVSDBWriter(base.BaseTestCase):
mock_pl,
mock_ucmr):
self.l2gw_ovsdb.update_ucast_macs_remote(mock.MagicMock(),
mock.MagicMock())
mock.MagicMock(),
mock.ANY)
get_rand.assert_called_with(128)
send_n_receive.assert_called_with(mock.ANY,
self.op_id, True)
self.op_id, mock.ANY, True)
self.assertTrue(get_update_ucast_mac_remote.called)
@ -313,7 +317,8 @@ class TestOVSDBWriter(base.BaseTestCase):
locator = mock_pl.return_value
locator.uuid = None
self.l2gw_ovsdb.update_ucast_macs_remote(mock.MagicMock(),
mock.MagicMock())
mock.MagicMock(),
mock.ANY)
self.assertTrue(get_update_ucast_mac_remote.called)
self.assertTrue(get_physical_locator_dict.called)
@ -333,10 +338,11 @@ class TestOVSDBWriter(base.BaseTestCase):
send_n_receive,
mock_log):
self.l2gw_ovsdb.delete_ucast_macs_remote(mock.Mock(),
mock.MagicMock())
mock.MagicMock(),
mock.ANY)
get_rand.assert_called_with(128)
send_n_receive.assert_called_with(mock.ANY,
self.op_id, True)
self.op_id, mock.ANY, True)
def test_update_connection_to_gateway(self):
"""Test case to test update_connection_to_gateway."""
@ -357,10 +363,11 @@ class TestOVSDBWriter(base.BaseTestCase):
send_n_receive,
mock_log):
self.l2gw_ovsdb.update_connection_to_gateway(
mock.Mock(), mock.Mock(), mock.Mock(), mock.Mock())
mock.Mock(), mock.Mock(), mock.Mock(), mock.Mock(),
mock.ANY, mock.ANY)
get_rand.assert_called_with(128)
send_n_receive.assert_called_with(mock.ANY,
self.op_id, True)
self.op_id, mock.ANY, True)
self.assertTrue(get_bindings.called)
def test_get_bindings_to_update1(self):
@ -473,7 +480,7 @@ class TestOVSDBWriter(base.BaseTestCase):
ovsdb_conf = base_test.FakeConf()
l2gw_obj = ovsdb_writer.OVSDBWriter(
cfg.CONF.ovsdb, ovsdb_conf)
result = l2gw_obj._recv_data()
result = l2gw_obj._recv_data(mock.ANY)
self.assertEqual(jsonutils.dumps(fake_data), result)
def test_recv_data_with_empty_data(self):
@ -490,7 +497,7 @@ class TestOVSDBWriter(base.BaseTestCase):
ovsdb_conf = base_test.FakeConf()
l2gw_obj = ovsdb_writer.OVSDBWriter(
cfg.CONF.ovsdb, ovsdb_conf)
result = l2gw_obj._recv_data()
result = l2gw_obj._recv_data(mock.ANY)
self.assertEqual(None, result)
def test_recv_data_with_socket_error(self):
@ -507,7 +514,7 @@ class TestOVSDBWriter(base.BaseTestCase):
ovsdb_conf = base_test.FakeConf()
l2gw_obj = ovsdb_writer.OVSDBWriter(
cfg.CONF.ovsdb, ovsdb_conf)
result = l2gw_obj._recv_data()
result = l2gw_obj._recv_data(mock.ANY)
self.assertEqual(None, result)
fake_warn.assert_called_with(
_LW("Did not receive any reply from the OVSDB "

Loading…
Cancel
Save