L2 gateway agent implementation

The patch provides the following:
1. A base L2 gateway agent and manager that:
   a. periodically sends heart beats to the Neutron server
   b. processes basic RPC (setting the agent type) received from the plugin

2. A variant of L2 gateway agent manager that:
   a. manages connections to the configured OVSDB servers
   b. processes RPCs originated from the plugin destined to the OVSDB servers
   c. notifies to the plugin of any changes in the configured OVSDB servers'
      tables

Partially implements: blueprint l2-gateway-api

Co-Authored-By: vikas<vikas.d-m@hp.com>

Change-Id: I19798e2283a6814dcc9e0f72afe8e1327c78de6b
stable/ocata
Maruti 8 years ago
parent a627728b08
commit 614a5d50ee
  1. 7
      etc/l2gateway_agent.ini
  2. 97
      networking_l2gw/services/l2gateway/agent/base_agent_manager.py
  3. 4
      networking_l2gw/services/l2gateway/agent/l2gateway_config.py
  4. 0
      networking_l2gw/services/l2gateway/agent/ovsdb/__init__.py
  5. 340
      networking_l2gw/services/l2gateway/agent/ovsdb/connection.py
  6. 162
      networking_l2gw/services/l2gateway/agent/ovsdb/manager.py
  7. 6
      networking_l2gw/services/l2gateway/common/config.py
  8. 14
      networking_l2gw/services/l2gateway/common/constants.py
  9. 53
      networking_l2gw/services/l2gateway/l2gw_agent.py
  10. 0
      networking_l2gw/tests/unit/services/l2gateway/agent/ovsdb/__init__.py
  11. 188
      networking_l2gw/tests/unit/services/l2gateway/agent/ovsdb/test_connection.py
  12. 250
      networking_l2gw/tests/unit/services/l2gateway/agent/ovsdb/test_manager.py
  13. 129
      networking_l2gw/tests/unit/services/l2gateway/agent/test_base_agent_manager.py
  14. 58
      networking_l2gw/tests/unit/services/l2gateway/agent/test_l2gw_agent.py
  15. 7
      setup.cfg

@ -34,3 +34,10 @@
# The interval is number of seconds between attempts.
# periodic_interval =
# Example: periodic_interval = 20
# (IntOpt) The L2 gateway agent retries to connect to the OVSDB server
# if a socket does not get opened in the first attempt.
# the max_connection_retries is the maximum number of such attempts
# before giving up.
# max_connection_retries =
# Example: max_connection_retries = 10

@ -0,0 +1,97 @@
# Copyright (c) 2015 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.agent import rpc as agent_rpc
from neutron import context
from neutron.i18n import _LE
from neutron.i18n import _LI
from neutron.openstack.common import log as logging
from neutron.openstack.common import loopingcall
from neutron.openstack.common import periodic_task
from oslo.config import cfg
from networking_l2gw.services.l2gateway.agent import agent_api
from networking_l2gw.services.l2gateway.common import constants as n_const
from networking_l2gw.services.l2gateway.common import topics
LOG = logging.getLogger(__name__)
VALID_L2GW_AGENT_TYPES = [n_const.MONITOR, n_const.TRANSACT,
'+'.join([n_const.MONITOR, n_const.TRANSACT])]
class BaseAgentManager(periodic_task.PeriodicTasks):
"""Basic agent manager that handles basic RPCs and report states."""
def __init__(self, conf=None):
super(BaseAgentManager, self).__init__()
self.conf = conf or cfg.CONF
self.l2gw_agent_type = n_const.TRANSACT
self.use_call = True
self.gateways = {}
self.context = context.get_admin_context_without_session()
self.plugin_rpc = agent_api.L2GatewayAgentApi(
topics.L2GATEWAY_PLUGIN,
self.context,
self.conf.host
)
self._get_agent_state()
self.admin_state_up = True
self._setup_state_rpc()
def _get_agent_state(self):
self.agent_state = {
'binary': 'neutron-l2gateway-agent',
'host': self.conf.host,
'topic': topics.L2GATEWAY_AGENT,
'configurations': {
'report_interval': self.conf.AGENT.report_interval,
n_const.L2GW_AGENT_TYPE: self.l2gw_agent_type,
},
'start_flag': True,
'agent_type': n_const.AGENT_TYPE_L2GATEWAY}
def _setup_state_rpc(self):
self.state_rpc = agent_rpc.PluginReportStateAPI(
topics.L2GATEWAY_PLUGIN)
report_interval = self.conf.AGENT.report_interval
if report_interval:
heartbeat = loopingcall.FixedIntervalLoopingCall(
self._report_state)
heartbeat.start(interval=report_interval)
def _report_state(self):
try:
self.state_rpc.report_state(self.context, self.agent_state,
self.use_call)
self.use_call = False
self.agent_state['start_flag'] = False
except Exception:
LOG.exception(_LE("Failed reporting state!"))
def agent_updated(self, context, payload):
LOG.info(_LI("agent_updated by server side %s!"), payload)
def set_l2gateway_agent_type(self, context, l2gw_agent_type):
"""Handle RPC call from plugin to update agent type.
RPC call from the plugin to accept that I am a monitoring
or a transact agent.
"""
if l2gw_agent_type not in VALID_L2GW_AGENT_TYPES:
return n_const.L2GW_INVALID_RPC_MSG_FORMAT
self.l2gw_agent_type = l2gw_agent_type
self.agent_state.get('configurations')[n_const.L2GW_AGENT_TYPE
] = self.l2gw_agent_type

@ -12,8 +12,8 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from networking_l2gw.services.l2gateway.common import constants as n_const
OVSDB_IDENTIFIER = 'ovsdb_identifier'
OVSDB_IP = 'ovsdb_ip'
OVSDB_PORT = 'ovsdb_port'
PRIVATE_KEY = 'private_key'
@ -31,7 +31,7 @@ class L2GatewayConfig(object):
self.certificate = ovsdb_config[CERTIFICATE]
self.ca_cert = ovsdb_config[CA_CERT]
self.ovsdb_identifier = ovsdb_config[OVSDB_IDENTIFIER]
self.ovsdb_identifier = ovsdb_config[n_const.OVSDB_IDENTIFIER]
self.ovsdb_ip = ovsdb_config[OVSDB_IP]
self.ovsdb_port = ovsdb_config[OVSDB_PORT]
self.ovsdb_fd = None

@ -0,0 +1,340 @@
# Copyright (c) 2015 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import eventlet
from neutron.i18n import _LE
from neutron.i18n import _LW
from neutron.openstack.common import log as logging
import random
import socket
import ssl
import time
from oslo.serialization import jsonutils
from networking_l2gw.services.l2gateway.common import constants as n_const
LOG = logging.getLogger(__name__)
OVSDB_UNREACHABLE_MSG = _LW('Unable to reach OVSDB server %s')
OVSDB_CONNECTED_MSG = 'Connected to OVSDB server %s'
BUFFER_SIZE = 4096
class OVSDBConnection(object):
"""Connects to OVSDB server.
Connects to an ovsdb server with/without SSL
on a given host and TCP port.
"""
def __init__(self, conf, gw_config, is_monitor, plugin_rpc=None):
self.responses = []
self.connected = False
self._reset_variables()
self.gw_config = gw_config
self.plugin_rpc = plugin_rpc
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
if gw_config.use_ssl:
ssl_sock = ssl.wrap_socket(
self.socket,
server_side=False,
keyfile=gw_config.private_key,
certfile=gw_config.certificate,
cert_reqs=ssl.CERT_REQUIRED,
ssl_version=ssl.PROTOCOL_TLSv1,
ca_certs=gw_config.ca_cert)
self.socket = ssl_sock
retryCount = 0
while True:
try:
self.socket.connect((str(gw_config.ovsdb_ip),
int(gw_config.ovsdb_port)))
break
except Exception as ex:
LOG.warning(OVSDB_UNREACHABLE_MSG, gw_config.ovsdb_ip)
if retryCount == conf.max_connection_retries:
# Retried for max_connection_retries times.
# Give up and return so that it can be tried in
# the next periodic interval.
raise ex
else:
time.sleep(1)
retryCount += 1
# Successfully connected to the socket
LOG.debug(OVSDB_CONNECTED_MSG, gw_config.ovsdb_ip)
self.connected = True
self.callbacks = {}
self._setup_dispatch_table()
if is_monitor:
self.read_on = True
eventlet.greenthread.spawn_n(self._rcv_thread)
def _reset_variables(self):
self.new_local_macs = []
self.deleted_local_macs = []
self.modified_local_macs = []
self.new_remote_macs = []
self.deleted_remote_macs = []
self.modified_remote_macs = []
self.new_physical_ports = []
self.deleted_physical_ports = []
self.modified_physical_ports = []
self.new_physical_switches = []
self.deleted_physical_switches = []
self.modified_physical_switches = []
self.new_physical_locators = []
self.deleted_physical_locators = []
self.modified_physical_locators = []
self.new_logical_switches = []
self.deleted_logical_switches = []
self.modified_logical_switches = []
self.new_mlocal_macs = []
self.modified_mlocal_macs = []
self.deleted_mlocal_macs = []
self.new_locator_sets = []
self.modified_locator_sets = []
self.deleted_locator_sets = []
def _setup_dispatch_table(self):
self.dispatch_table = {'Physical_Port': self._process_physical_port,
'Physical_Switch':
self._process_physical_switch,
'Logical_Switch': self._process_logical_switch,
'Ucast_Macs_Local':
self._process_ucast_macs_local,
'Physical_Locator':
self._process_physical_locator,
'Ucast_Macs_Remote':
self._process_ucast_macs_remote,
'Mcast_Macs_Local':
self._process_mcast_macs_local,
'Physical_Locator_Set':
self._process_physical_locator_set
}
def set_monitor_response_handler(self):
"""Monitor OVSDB tables to receive events for any changes in OVSDB."""
if self.connected:
op_id = str(random.getrandbits(128))
props = {'select': {'initial': True,
'insert': True,
'delete': True,
'modify': True}}
monitor_message = {'id': op_id,
'method': 'monitor',
'params': [n_const.OVSDB_SCHEMA_NAME,
None,
{'Logical_Switch': [props],
'Physical_Switch': [props],
'Physical_Port': [props],
'Ucast_Macs_Local': [props],
'Ucast_Macs_Remote': [props],
'Physical_Locator': [props],
'Mcast_Macs_Local': [props],
'Physical_Locator_Set': [props]}
]}
if not self.send(monitor_message):
# Return so that this will retried in the next iteration
return
def _update_event_handler(self, message):
pass
def _default_echo_handler(self, message):
"""Message handler for the OVSDB server's echo request."""
pass
def send(self, message, callback=None):
"""Sends a message to the OVSDB server."""
if callback:
self.callbacks[message['id']] = callback
try:
self.socket.send(jsonutils.dumps(message))
return True
except Exception as ex:
self.connected = False
LOG.exception(_LE("Exception [%s] occurred while sending message "
"to the OVSDB server"), ex)
return False
def _on_remote_message(self, message):
"""Processes the message received on the socket."""
pass
def _rcv_thread(self):
chunks = []
lc = rc = 0
prev_char = None
while self.read_on:
try:
response = self.socket.recv(BUFFER_SIZE)
if response:
response = response.decode('utf8')
message_mark = 0
for i, c in enumerate(response):
if c == '{' and not (prev_char and
prev_char == '\\'):
lc += 1
elif c == '}' and not (prev_char and
prev_char == '\\'):
rc += 1
if rc > lc:
raise Exception(_("json string not valid"))
elif lc == rc and lc is not 0:
chunks.append(response[message_mark:i + 1])
message = "".join(chunks)
self._on_remote_message(message)
lc = rc = 0
message_mark = i + 1
chunks = []
prev_char = c
chunks.append(response[message_mark:])
else:
self.connected = False
self.read_on = False
self.socket.close()
except Exception as ex:
self.connected = False
self.read_on = False
self.socket.close()
LOG.exception(_LE("Exception [%s] occurred while receiving"
"message from the OVSDB server"), ex)
def disconnect(self):
self.socket.close()
def _process_monitor_msg(self, message):
"""Process initial set of records in the OVSDB at startup."""
result_dict = message.get('result')
self._reset_variables()
port_map = {}
try:
for table_name in result_dict.keys():
table_dict = result_dict.get(table_name)
for uuid in table_dict.keys():
uuid_dict = table_dict.get(uuid)
self.dispatch_table.get(table_name)(uuid,
uuid_dict, port_map)
except Exception as e:
LOG.exception(_LE("_process_monitor_msg:ERROR %s "), e)
self.plugin_rpc.update_ovsdb_changes(self._form_ovsdb_data())
def _get_list(self, resource_list):
return [element.__dict__ for element in resource_list]
def _form_ovsdb_data(self):
return {n_const.OVSDB_IDENTIFIER: self.gw_config.ovsdb_identifier,
'new_logical_switches': self._get_list(
self.new_logical_switches),
'new_physical_switches': self._get_list(
self.new_physical_switches),
'new_physical_ports': self._get_list(
self.new_physical_ports),
'new_physical_locators': self._get_list(
self.new_physical_locators),
'new_local_macs': self._get_list(
self.new_local_macs),
'new_remote_macs': self._get_list(
self.new_remote_macs),
'new_mlocal_macs': self._get_list(
self.new_mlocal_macs),
'new_locator_sets': self._get_list(
self.new_locator_sets),
'deleted_logical_switches': self._get_list(
self.deleted_logical_switches),
'deleted_physical_switches': self._get_list(
self.deleted_physical_switches),
'deleted_physical_ports': self._get_list(
self.deleted_physical_ports),
'deleted_physical_locators': self._get_list(
self.deleted_physical_locators),
'deleted_local_macs': self._get_list(
self.deleted_local_macs),
'deleted_remote_macs': self._get_list(
self.deleted_remote_macs),
'deleted_mlocal_macs': self._get_list(
self.deleted_mlocal_macs),
'deleted_locator_sets': self._get_list(
self.deleted_locator_sets),
'modified_logical_switches': self._get_list(
self.modified_logical_switches),
'modified_physical_switches': self._get_list(
self.modified_physical_switches),
'modified_physical_ports': self._get_list(
self.modified_physical_ports),
'modified_physical_locators': self._get_list(
self.modified_physical_locators),
'modified_local_macs': self._get_list(
self.modified_local_macs),
'modified_remote_macs': self._get_list(
self.modified_remote_macs),
'modified_mlocal_macs': self._get_list(
self.modified_mlocal_macs),
'modified_locator_sets': self._get_list(
self.modified_locator_sets)}
def _process_physical_port(self, uuid, uuid_dict, port_map=None):
"""Processes Physical_Port record from the OVSDB event."""
pass
def _process_physical_switch(self, uuid, uuid_dict, port_map=None):
"""Processes Physical_Switch record from the OVSDB event."""
pass
def _process_logical_switch(self, uuid, uuid_dict, port_map=None):
"""Processes Logical_Switch record from the OVSDB event."""
pass
def _process_ucast_macs_local(self, uuid, uuid_dict, port_map=None):
"""Processes Ucast_Macs_Local record from the OVSDB event."""
pass
def _process_ucast_macs_remote(self, uuid, uuid_dict, port_map=None):
"""Processes Ucast_Macs_Remote record from the OVSDB event."""
pass
def _process_physical_locator(self, uuid, uuid_dict, port_map=None):
"""Processes Physical_Locator record from the OVSDB event."""
pass
def _process_mcast_macs_local(self, uuid, uuid_dict, port_map=None):
"""Processes Mcast_Macs_Local record from the OVSDB event."""
pass
def _process_physical_locator_set(self, uuid, uuid_dict, port_map=None):
"""Processes Physical_Locator_Set record from the OVSDB event."""
pass
def insert_logical_switch(self, record_dict):
"""Insert an entry in Logical_Switch OVSDB table."""
pass
def delete_logical_switch(self, record_dict):
"""Delete an entry from Logical_Switch OVSDB table."""
pass
def insert_ucast_macs_remote(self, record_dict):
"""Insert an entry in Ucast_Macs_Remote OVSDB table."""
pass
def delete_ucast_macs_remote(self, record_dict):
"""Delete an entry from Ucast_Macs_Remote OVSDB table."""
pass
def update_connection_to_gateway(self, request_dict):
"""Updates Physical Port's VNI to VLAN binding."""
pass

@ -0,0 +1,162 @@
# Copyright (c) 2015 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from contextlib import contextmanager
import eventlet
from neutron.i18n import _LE
from neutron.i18n import _LW
from neutron.openstack.common import log as logging
from neutron.openstack.common import periodic_task
from networking_l2gw.services.l2gateway.agent import base_agent_manager
from networking_l2gw.services.l2gateway.agent import l2gateway_config
from networking_l2gw.services.l2gateway.agent.ovsdb import connection
from networking_l2gw.services.l2gateway.common import constants as n_const
from oslo.config import cfg
LOG = logging.getLogger(__name__)
class OVSDBManager(base_agent_manager.BaseAgentManager):
"""OVSDB variant of agent manager.
Listens to state change notifications from OVSDB servers and
handles transactions (RPCs) destined to OVSDB servers.
"""
def __init__(self, conf=None):
super(OVSDBManager, self).__init__(conf)
self._extract_ovsdb_config(conf)
def _extract_ovsdb_config(self, conf):
self.conf = conf.ovsdb or cfg.CONF.ovsdb
ovsdb_hosts = self.conf.ovsdb_hosts
if ovsdb_hosts != '':
ovsdb_hosts = ovsdb_hosts.split(',')
for host in ovsdb_hosts:
self._process_ovsdb_host(host)
def _process_ovsdb_host(self, host):
try:
host_splits = str(host).split(':')
ovsdb_identifier = str(host_splits[0]).strip()
ovsdb_conf = {n_const.OVSDB_IDENTIFIER: ovsdb_identifier,
'ovsdb_ip': str(host_splits[1]).strip(),
'ovsdb_port': str(host_splits[2]).strip()}
priv_key_path = self.conf.l2_gw_agent_priv_key_base_path
cert_path = self.conf.l2_gw_agent_cert_base_path
ca_cert_path = self.conf.l2_gw_agent_ca_cert_base_path
use_ssl = priv_key_path and cert_path and ca_cert_path
if use_ssl:
ssl_ovsdb = {'use_ssl': True,
'private_key':
"/".join([str(priv_key_path),
'.'.join([str(host_splits[0]).
strip(),
'key'])]),
'certificate':
"/".join([str(cert_path),
'.'.join([str(host_splits[0]).
strip(), 'cert'])]),
'ca_cert':
"/".join([str(ca_cert_path),
'.'.join([str(host_splits[0]).
strip(), 'ca_cert'])])
}
ovsdb_conf.update(ssl_ovsdb)
LOG.debug("ovsdb_conf = %s", str(ovsdb_conf))
gateway = l2gateway_config.L2GatewayConfig(ovsdb_conf)
self.gateways[ovsdb_identifier] = gateway
except Exception as ex:
LOG.exception(_LE("Exception %(ex)s occurred while processing "
"host %(host)s"), {'ex': ex, 'host': host})
@periodic_task.periodic_task(run_immediately=True)
def _connect_to_ovsdb_server(self, context):
"""Initializes the connection to the OVSDB servers."""
if self.gateways and n_const.MONITOR in self.l2gw_agent_type:
for key in self.gateways.keys():
gateway = self.gateways.get(key)
ovsdb_fd = gateway.ovsdb_fd
if not (ovsdb_fd and ovsdb_fd.connected):
LOG.debug("OVSDB server %s is disconnected",
str(gateway.ovsdb_ip))
try:
ovsdb_fd = connection.OVSDBConnection(self.conf,
gateway,
True,
self.plugin_rpc)
except Exception:
# Log a warning and continue so that it can retried
# in the next iteration
LOG.warning(_LW("OVSDB server %s is not reachable"),
gateway.ovsdb_ip)
gateway.ovsdb_fd = ovsdb_fd
eventlet.greenthread.spawn_n(ovsdb_fd.
set_monitor_response_handler)
@contextmanager
def _open_connection(self, ovsdb_identifier):
ovsdb_fd = None
gateway = self.gateways.get(ovsdb_identifier)
try:
ovsdb_fd = connection.OVSDBConnection(self.conf,
gateway,
False,
self.plugin_rpc)
yield ovsdb_fd
finally:
if ovsdb_fd:
ovsdb_fd.disconnect()
def _is_valid_request(self, ovsdb_identifier):
val_req = ovsdb_identifier and ovsdb_identifier in self.gateways.keys()
if not val_req:
LOG.warning(n_const.ERROR_DICT
[n_const.L2GW_INVALID_OVSDB_IDENTIFIER])
return val_req
def delete_network(self, context, record_dict):
"""Handle RPC cast from plugin to delete a network."""
ovsdb_identifier = record_dict.get(n_const.OVSDB_IDENTIFIER, None)
if self._is_valid_request(ovsdb_identifier):
with self._open_connection(ovsdb_identifier) as ovsdb_fd:
ovsdb_fd.delete_logical_switch(record_dict)
def add_vif_to_gateway(self, context, record_dict):
"""Handle RPC cast from plugin to insert neutron port MACs."""
ovsdb_identifier = record_dict.get(n_const.OVSDB_IDENTIFIER, None)
if self._is_valid_request(ovsdb_identifier):
with self._open_connection(ovsdb_identifier) as ovsdb_fd:
ovsdb_fd.insert_ucast_macs_remote(record_dict)
def delete_vif_from_gateway(self, context, record_dict):
"""Handle RPC cast from plugin to delete neutron port MACs."""
ovsdb_identifier = record_dict.get(n_const.OVSDB_IDENTIFIER, None)
if self._is_valid_request(ovsdb_identifier):
with self._open_connection(ovsdb_identifier) as ovsdb_fd:
ovsdb_fd.delete_ucast_macs_remote(record_dict)
def update_connection_to_gateway(self, context, record_dict):
"""Handle RPC cast from plugin.
Handle RPC cast from plugin to connect/disconnect a network
to/from an L2 gateway.
"""
ovsdb_identifier = record_dict.get(n_const.OVSDB_IDENTIFIER, None)
if self._is_valid_request(ovsdb_identifier):
with self._open_connection(ovsdb_identifier) as ovsdb_fd:
ovsdb_fd.update_connection_to_gateway(record_dict)

@ -42,7 +42,11 @@ OVSDB_OPTS = [
help=_('Trusted issuer CA cert')),
cfg.IntOpt('periodic_interval',
default=20,
help=_('Seconds between periodic task runs'))
help=_('Seconds between periodic task runs')),
cfg.IntOpt('max_connection_retries',
default=10,
help=_('Maximum number of retries to open a socket '
'with the OVSDB server'))
]
L2GW_OPTS = [

@ -12,7 +12,6 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# service type constants:
L2GW = "L2GW"
@ -27,3 +26,16 @@ ALLOWED_SERVICES = [L2GW]
COMMON_PREFIXES = {
L2GW: "",
}
L2GW_INVALID_RPC_MSG_FORMAT = 100
L2GW_INVALID_OVSDB_IDENTIFIER = 101
ERROR_DICT = {L2GW_INVALID_RPC_MSG_FORMAT: "Invalid RPC message format",
L2GW_INVALID_OVSDB_IDENTIFIER: "Invalid ovsdb_identifier in the "
"request"}
MONITOR = 'monitor'
TRANSACT = 'transact'
OVSDB_SCHEMA_NAME = 'hardware_vtep'
OVSDB_IDENTIFIER = 'ovsdb_identifier'
L2GW_AGENT_TYPE = 'l2gw_agent_type'

@ -0,0 +1,53 @@
# Copyright (c) 2015 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.common import config as common_config
from neutron.common import rpc as n_rpc
from neutron.openstack.common import log as logging
from neutron.openstack.common import service
import sys
from oslo.config import cfg
from networking_l2gw.services.l2gateway.agent.ovsdb import manager
from networking_l2gw.services.l2gateway.common import config
from networking_l2gw.services.l2gateway.common import topics
LOG = logging.getLogger(__name__)
class L2gatewayAgentService(n_rpc.Service):
def start(self):
super(L2gatewayAgentService, self).start()
self.tg.add_timer(
cfg.CONF.ovsdb.periodic_interval,
self.manager.run_periodic_tasks,
None,
None
)
def main():
config.register_ovsdb_opts_helper(cfg.CONF)
config.register_agent_state_opts_helper(cfg.CONF)
common_config.init(sys.argv[1:])
config.setup_logging()
mgr = manager.OVSDBManager(cfg.CONF)
svc = L2gatewayAgentService(
host=cfg.CONF.host,
topic=topics.L2GATEWAY_AGENT,
manager=mgr
)
service.launch(svc).wait()

@ -0,0 +1,188 @@
# Copyright (c) 2015 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import eventlet
import mock
import ssl
from networking_l2gw.services.l2gateway.agent import l2gateway_config as conf
from networking_l2gw.services.l2gateway.agent.ovsdb import connection
from networking_l2gw.services.l2gateway.common import constants as n_const
from neutron.openstack.common import log as logging
from neutron.tests import base
LOG = logging.getLogger(__name__)
class TestOVSDBConnection(base.BaseTestCase):
def setUp(self):
super(TestOVSDBConnection, self).setUp()
self.conf = mock.patch.object(conf, 'L2GatewayConfig').start()
self.sock = mock.patch('socket.socket').start()
self.ssl_sock = mock.patch.object(ssl, 'wrap_socket').start()
self.plugin_rpc = mock.Mock()
self.greenthread = mock.patch.object(eventlet.greenthread,
'spawn_n').start()
self.l2gw_ovsdb = connection.OVSDBConnection(mock.Mock(),
self.conf, True,
self.plugin_rpc)
self.l2gw_ovsdb.socket = self.sock
self.op_id = 'abcd'
props = {'select': {'initial': True,
'insert': True,
'delete': True,
'modify': True}}
self.monitor_message = {'id': self.op_id,
'method': 'monitor',
'params': [n_const.OVSDB_SCHEMA_NAME,
None,
{'Logical_Switch': [props],
'Physical_Switch': [props],
'Physical_Port': [props],
'Ucast_Macs_Local': [props],
'Ucast_Macs_Remote': [props],
'Physical_Locator': [props],
'Mcast_Macs_Local': [props],
'Physical_Locator_Set': [props]}]}
fake_message = {'Logical_Switch': props,
'Physical_Switch': props,
'Physical_Port': props,
'Ucast_Macs_Local': props,
'Ucast_Macs_Remote': props,
'Physical_Locator': props,
'Mcast_Macs_Local': props,
'Physical_Locator_Set': props}
self.msg = self.monitor_message
self.msg1 = {'result': fake_message}
self.msg2 = {'method': 'update',
'params': ['', fake_message]}
self.l2gw_ovsdb.responses = [self.monitor_message]
def test_init(self):
"""Test case to test __init__."""
with contextlib.nested(
mock.patch.object(connection.LOG, 'debug'),
mock.patch.object(eventlet.greenthread, 'spawn_n')
) as(logger_call, gt):
self.l2gw_ovsdb.__init__(mock.Mock(), self.conf, self.plugin_rpc)
self.assertTrue(self.l2gw_ovsdb.connected)
self.assertTrue(logger_call.called)
self.assertTrue(gt.called)
self.assertTrue(self.sock.called)
def test_set_monitor_response_handler(self):
"""Test case to test _set_monitor_response_handler with error_msg."""
self.l2gw_ovsdb.connected = True
with mock.patch.object(connection.OVSDBConnection,
'send', return_value=True) as send:
self.l2gw_ovsdb.set_monitor_response_handler()
self.assertTrue(send.called)
def test_set_monitor_response_handler_with_error_in_send(self):
"""Test case to test _set_monitor_response_handler."""
self.l2gw_ovsdb.connected = True
with mock.patch.object(connection.OVSDBConnection,
'send', return_value=False) as send:
self.l2gw_ovsdb.set_monitor_response_handler()
self.assertTrue(send.called)
def test_send(self):
"""Test case to test send."""
with mock.patch.object(self.l2gw_ovsdb.socket, 'send',
side_effect=Exception) as send:
with mock.patch.object(connection.LOG,
'exception') as logger_call:
self.l2gw_ovsdb.send(mock.Mock())
self.assertTrue(send.called)
self.assertTrue(logger_call.called)
def test_disconnect(self):
"""Test case to test disconnect socket."""
with mock.patch.object(self.l2gw_ovsdb.socket, 'close') as sock_close:
self.l2gw_ovsdb.disconnect()
self.assertTrue(sock_close.called)
def test_rcv_thread_none(self):
"""Test case to test _rcv_thread receives None from socket."""
self.assertTrue(self.l2gw_ovsdb.read_on)
with mock.patch.object(self.l2gw_ovsdb.socket,
'recv', return_value=None) as sock_recv:
with mock.patch.object(self.l2gw_ovsdb.socket,
'close') as sock_close:
self.l2gw_ovsdb._rcv_thread()
self.assertTrue(sock_recv.called)
self.assertFalse(self.l2gw_ovsdb.connected)
self.assertFalse(self.l2gw_ovsdb.read_on)
self.assertTrue(sock_close.called)
def test_rcv_thread_exception(self):
"""Test case to test _rcv_thread with exception."""
with contextlib.nested(
mock.patch.object(self.l2gw_ovsdb.socket, 'recv',
side_effect=Exception,
return_value=None),
mock.patch.object(self.l2gw_ovsdb.socket,
'close'),
mock.patch.object(connection.LOG,
'exception')
) as (sock_recv, sock_close, logger_call):
self.l2gw_ovsdb._rcv_thread()
logger_call.assertCalled()
self.assertTrue(sock_recv.called)
self.assertFalse(self.l2gw_ovsdb.connected)
self.assertFalse(self.l2gw_ovsdb.read_on)
self.assertTrue(sock_close.called)
def test_process_monitor_msg(self):
"""Test case to test _process_monitor_msg."""
with contextlib.nested(
mock.patch.object(connection.OVSDBConnection,
'_process_physical_port'),
mock.patch.object(connection.OVSDBConnection,
'_process_physical_switch'),
mock.patch.object(connection.OVSDBConnection,
'_process_logical_switch'),
mock.patch.object(connection.OVSDBConnection,
'_process_ucast_macs_local'),
mock.patch.object(connection.OVSDBConnection,
'_process_physical_locator'),
mock.patch.object(connection.OVSDBConnection,
'_process_ucast_macs_remote'),
mock.patch.object(connection.OVSDBConnection,
'_process_mcast_macs_local'),
mock.patch.object(connection.OVSDBConnection,
'_process_physical_locator_set')
) as(proc_phy_port,
proc_phy_switch, proc_logic_switch,
proc_ucast_mac, proc_phy_loc, proc_ucast_mac_remote,
proc_mcast_mac_local, proc_physical_locator_set):
self.l2gw_ovsdb._process_monitor_msg(self.msg1)
proc_phy_port.assert_called()
proc_phy_switch.assert_called()
proc_logic_switch.assert_called()
proc_ucast_mac.assert_called()
proc_phy_loc.assert_called()
proc_ucast_mac_remote.assert_called()
proc_mcast_mac_local.assert_called()
proc_physical_locator_set.assert_called()
self.plugin_rpc.update_ovsdb_changes.assert_called_with(
mock.ANY)

@ -0,0 +1,250 @@
# Copyright (c) 2015 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import eventlet
import mock
from neutron.common import rpc
from neutron import context
from neutron.tests import base
import socket
from oslo.config import cfg
from networking_l2gw.services.l2gateway.agent import agent_api
from networking_l2gw.services.l2gateway.agent import l2gateway_config
from networking_l2gw.services.l2gateway.agent.ovsdb import connection
from networking_l2gw.services.l2gateway.agent.ovsdb import manager
from networking_l2gw.services.l2gateway.common import config
from networking_l2gw.services.l2gateway.common import constants as n_const
class TestManager(base.BaseTestCase):
def setUp(self):
super(TestManager, self).setUp()
self.conf = cfg.CONF
config.register_ovsdb_opts_helper(self.conf)
config.register_agent_state_opts_helper(self.conf)
self.driver_mock = mock.Mock()
self.fake_record_dict = {n_const.OVSDB_IDENTIFIER: 'fake_ovsdb_id'}
cfg.CONF.set_override('report_interval', 1, 'AGENT')
self.plugin_rpc = mock.patch.object(agent_api,
'L2GatewayAgentApi').start()
self.context = mock.Mock
self.cntxt = mock.patch.object(context,
'get_admin_context_without_session'
).start()
self.test_rpc = mock.patch.object(rpc, 'get_client').start()
self.l2gw_agent_manager = manager.OVSDBManager(
self.conf)
self.l2gw_agent_manager.plugin_rpc = self.plugin_rpc
self.fake_config_json = {n_const.OVSDB_IDENTIFIER:
'fake_ovsdb_identifier',
'ovsdb_ip': '5.5.5.5',
'ovsdb_port': '6672',
'private_key': 'dummy_key',
'enable_ssl': False,
'certificate': 'dummy_cert',
'ca_cert': 'dummy_ca'}
def test_extract_ovsdb_config(self):
fake_ovsdb_config = {n_const.OVSDB_IDENTIFIER: 'host2',
'ovsdb_ip': '10.10.10.10',
'ovsdb_port': '6632',
'private_key': '/home/someuser/fakedir/host2.key',
'use_ssl': True,
'certificate':
'/home/someuser/fakedir/host2.cert',
'ca_cert': '/home/someuser/fakedir/host2.ca_cert'}
cfg.CONF.set_override('ovsdb_hosts',
'host2:10.10.10.10:6632',
'ovsdb')
cfg.CONF.set_override('l2_gw_agent_priv_key_base_path',
'/home/someuser/fakedir',
'ovsdb')
cfg.CONF.set_override('l2_gw_agent_cert_base_path',
'/home/someuser/fakedir',
'ovsdb')
cfg.CONF.set_override('l2_gw_agent_ca_cert_base_path',
'/home/someuser/fakedir',
'ovsdb')
self.l2gw_agent_manager._extract_ovsdb_config(cfg.CONF)
l2gwconfig = l2gateway_config.L2GatewayConfig(fake_ovsdb_config)
gw = self.l2gw_agent_manager.gateways.get(
fake_ovsdb_config[n_const.OVSDB_IDENTIFIER])
self.assertEqual(l2gwconfig.ovsdb_identifier, gw.ovsdb_identifier)
self.assertEqual(l2gwconfig.use_ssl, gw.use_ssl)
self.assertEqual(l2gwconfig.ovsdb_ip, gw.ovsdb_ip)
self.assertEqual(l2gwconfig.ovsdb_port, gw.ovsdb_port)
self.assertEqual(l2gwconfig.private_key, gw.private_key)
self.assertEqual(l2gwconfig.certificate, gw.certificate)
self.assertEqual(l2gwconfig.ca_cert, gw.ca_cert)
def test_connect_to_ovsdb_server(self):
self.l2gw_agent_manager.gateways = {}
self.l2gw_agent_manager.l2gw_agent_type = n_const.MONITOR
gateway = l2gateway_config.L2GatewayConfig(self.fake_config_json)
ovsdb_ident = self.fake_config_json.get(n_const.OVSDB_IDENTIFIER)
self.l2gw_agent_manager.gateways[ovsdb_ident] = gateway
with mock.patch.object(connection,
'OVSDBConnection') as ovsdb_connection:
with mock.patch.object(eventlet.greenthread,
'spawn_n') as event_spawn:
self.l2gw_agent_manager._connect_to_ovsdb_server(self.context)
self.assertTrue(event_spawn.called)
self.assertTrue(ovsdb_connection.called)
ovsdb_connection.assert_called_with(
self.conf.ovsdb, gateway, True, self.plugin_rpc)
def test_is_valid_request_fails(self):
self.l2gw_agent_manager.gateways = {}
fake_ovsdb_identifier = 'fake_ovsdb_identifier_2'
gateway = l2gateway_config.L2GatewayConfig(self.fake_config_json)
self.l2gw_agent_manager.gateways['fake_ovsdb_identifier'] = gateway
with mock.patch.object(manager.LOG,
'warning') as logger_call:
self.l2gw_agent_manager._is_valid_request(
fake_ovsdb_identifier)
self.assertEqual(1, logger_call.call_count)
def test_open_connection(self):
self.l2gw_agent_manager.gateways = {}
fake_ovsdb_identifier = 'fake_ovsdb_identifier'
gateway = l2gateway_config.L2GatewayConfig(self.fake_config_json)
self.l2gw_agent_manager.gateways['fake_ovsdb_identifier'] = gateway
with mock.patch.object(manager.LOG,
'warning') as logger_call:
with mock.patch.object(connection,
'OVSDBConnection') as ovsdb_connection:
is_valid_request = self.l2gw_agent_manager._is_valid_request(
fake_ovsdb_identifier)
with self.l2gw_agent_manager._open_connection(
fake_ovsdb_identifier):
self.assertTrue(is_valid_request)
self.assertEqual(0, logger_call.call_count)
self.assertTrue(ovsdb_connection.called)
def test_open_connection_with_socket_error(self):
self.l2gw_agent_manager.gateways = {}
gateway = l2gateway_config.L2GatewayConfig(self.fake_config_json)
self.l2gw_agent_manager.gateways['fake_ovsdb_identifier'] = gateway
with contextlib.nested(
mock.patch.object(manager.LOG, 'warning'),
mock.patch.object(socket.socket, 'connect'),
mock.patch.object(connection.OVSDBConnection,
'delete_logical_switch')
) as (logger_call, mock_connect, mock_del_ls):
mock_connect.side_effect = socket.error
self.l2gw_agent_manager.delete_network(self.context,
self.fake_record_dict)
logger_call.assert_called_once()
mock_del_ls.assert_not_called()
def test_disconnection(self):
self.l2gw_agent_manager.gateways = {}
fake_ovsdb_identifier = 'fake_ovsdb_identifier'
gateway = l2gateway_config.L2GatewayConfig(self.fake_config_json)
self.l2gw_agent_manager.gateways[fake_ovsdb_identifier] = gateway
with mock.patch.object(connection,
'OVSDBConnection') as ovsdb_connection:
ovsdb_connection.return_value.connected = True
with mock.patch.object(connection.OVSDBConnection,
'disconnect') as mock_dis:
with self.l2gw_agent_manager._open_connection(
fake_ovsdb_identifier):
self.assertTrue(ovsdb_connection.called)
mock_dis.assert_called_once()
def test_add_vif_to_gateway(self):
with contextlib.nested(
mock.patch.object(manager.OVSDBManager,
'_open_connection'),
mock.patch.object(manager.OVSDBManager,
'_is_valid_request',
return_value=True),
mock.patch.object(connection.OVSDBConnection,
'insert_ucast_macs_remote'),
mock.patch.object(connection.OVSDBConnection,
'disconnect')
) as (gw_open_conn, valid_req, ins_ucast, discon):
self.l2gw_agent_manager.add_vif_to_gateway(
self.context, self.fake_record_dict)
self.assertTrue(gw_open_conn.called)
mocked_conn = gw_open_conn.return_value
mocked_conn.insert_ucast_macs_remote.assert_called_once(
self.fake_record_dict)
discon.assert_called_once()
def test_delete_vif_from_gateway(self):
with contextlib.nested(
mock.patch.object(manager.OVSDBManager,
'_open_connection'),
mock.patch.object(manager.OVSDBManager,
'_is_valid_request',
return_value=True),
mock.patch.object(connection.OVSDBConnection,
'delete_ucast_macs_remote'),
mock.patch.object(connection.OVSDBConnection,
'disconnect')
) as (gw_open_conn, valid_req, del_ucast, discon):
self.l2gw_agent_manager.delete_vif_from_gateway(
self.context, self.fake_record_dict)
self.assertTrue(gw_open_conn.called)
mocked_conn = gw_open_conn.return_value
mocked_conn.delete_ucast_macs_remote.assert_called_once(
self.fake_record_dict)
discon.assert_called_once()
def test_update_connection_to_gateway(self):
with contextlib.nested(
mock.patch.object(manager.OVSDBManager,
'_open_connection'),
mock.patch.object(manager.OVSDBManager,
'_is_valid_request',
return_value=True),
mock.patch.object(connection.OVSDBConnection,
'update_connection_to_gateway'),
mock.patch.object(connection.OVSDBConnection,
'disconnect')
) as (gw_open_conn, valid_req, upd_con, discon):
self.l2gw_agent_manager.update_connection_to_gateway(
self.context, self.fake_record_dict)
self.assertTrue(gw_open_conn.called)
mocked_conn = gw_open_conn.return_value
mocked_conn.update_connection_to_gateway.assert_called_once(
self.fake_record_dict)
discon.assert_called_once()
def test_delete_network(self):
with contextlib.nested(
mock.patch.object(manager.OVSDBManager,
'_open_connection'),
mock.patch.object(manager.OVSDBManager,
'_is_valid_request',
return_value=True),
mock.patch.object(connection.OVSDBConnection,
'delete_logical_switch'),
mock.patch.object(connection.OVSDBConnection,
'disconnect')
) as (gw_open_conn, valid_req, del_ls, discon):
self.l2gw_agent_manager.delete_network(
self.context, self.fake_record_dict)
self.assertTrue(gw_open_conn.called)
mocked_conn = gw_open_conn.return_value
mocked_conn.delete_logical_switch.assert_called_once(
self.fake_record_dict)
discon.assert_called_once()

@ -0,0 +1,129 @@
# Copyright (c) 2015 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from neutron.agent import rpc as agent_rpc
from neutron.openstack.common import loopingcall
from neutron.tests import base
from oslo.config import cfg
from networking_l2gw.services.l2gateway.agent import (base_agent_manager
as l2gw_manager)
from networking_l2gw.services.l2gateway.agent import agent_api
from networking_l2gw.services.l2gateway.common import config
from networking_l2gw.services.l2gateway.common import constants as n_const
class TestBaseAgentManager(base.BaseTestCase):
def setUp(self):
super(TestBaseAgentManager, self).setUp()
self.conf = cfg.CONF
config.register_agent_state_opts_helper(self.conf)
cfg.CONF.set_override('report_interval', 1, 'AGENT')
self.context = mock.Mock
mock_conf = mock.Mock()
self.l2gw_agent_manager = l2gw_manager.BaseAgentManager(
mock_conf)
def test_init(self):
with mock.patch.object(agent_api,
'L2GatewayAgentApi') as l2_gw_agent_api:
with mock.patch.object(l2gw_manager.BaseAgentManager,
'_setup_state_rpc') as setup_state_rpc:
self.l2gw_agent_manager.__init__(mock.Mock())
self.assertEqual(self.l2gw_agent_manager.l2gw_agent_type,
n_const.TRANSACT)
self.assertTrue(self.l2gw_agent_manager.admin_state_up)
self.assertTrue(setup_state_rpc.called)
self.assertTrue(l2_gw_agent_api.called)
def test_setup_state_rpc(self):
cfg.CONF.set_override('report_interval', 1, 'AGENT')
with mock.patch.object(agent_rpc,
'PluginReportStateAPI'
) as agent_report_state_rpc:
with mock.patch.object(loopingcall,
'FixedIntervalLoopingCall'
) as looping_call:
self.l2gw_agent_manager._setup_state_rpc()
self.assertTrue(agent_report_state_rpc.called)
self.assertTrue(looping_call.called)
def test_report_state(self):
with mock.patch('neutron.agent.rpc.PluginReportStateAPI') as state_api:
l2_gw = l2gw_manager.BaseAgentManager(mock.Mock())
self.assertTrue(l2_gw.agent_state['start_flag'])
original_state = l2_gw.agent_state
original_use_call = l2_gw.use_call
self.assertTrue(l2_gw.use_call)
l2_gw._report_state()
self.assertFalse(l2_gw.agent_state['start_flag'])
self.assertFalse(l2_gw.use_call)
state_api_inst = state_api.return_value
state_api_inst.report_state.assert_called_once_with(
l2_gw.context, original_state, original_use_call)
def test_report_state_Exception(self):
with mock.patch('neutron.agent.rpc.PluginReportStateAPI') as state_api:
with mock.patch.object(l2gw_manager.LOG, 'exception') as exc:
state_api_inst = state_api.return_value
self.l2gw_agent_manager._report_state()
state_api_inst.report_state.side_effect = Exception
exc.assertCalled()
def test_agent_updated(self):
fake_payload = {'fake_key': 'fake_value'}
with mock.patch.object(l2gw_manager.LOG, 'info') as logger_call:
self.l2gw_agent_manager.agent_updated(mock.Mock(), fake_payload)
self.assertEqual(1, logger_call.call_count)
def test_set_l2gateway_agent_type_monitor(self):
l2_gw_agent_type = n_const.MONITOR
self.l2gw_agent_manager.set_l2gateway_agent_type(
self.context, l2_gw_agent_type)
self.assertEqual(self.l2gw_agent_manager.l2gw_agent_type,
l2_gw_agent_type)
self.assertEqual(
self.l2gw_agent_manager.agent_state.get(
'configurations')[n_const.L2GW_AGENT_TYPE], l2_gw_agent_type)
def test_set_l2gateway_agent_type_transact(self):
l2_gw_agent_type = n_const.TRANSACT
self.l2gw_agent_manager.set_l2gateway_agent_type(
self.context, l2_gw_agent_type)
self.assertEqual(self.l2gw_agent_manager.l2gw_agent_type,
l2_gw_agent_type)
self.assertEqual(
self.l2gw_agent_manager.agent_state.get(
'configurations')[n_const.L2GW_AGENT_TYPE], l2_gw_agent_type)
def test_set_l2gateway_agent_type_transactmonitor(self):
l2_gw_agent_type = '+'.join([n_const.MONITOR, n_const.TRANSACT])
self.l2gw_agent_manager.set_l2gateway_agent_type(
self.context, l2_gw_agent_type)
self.assertEqual(self.l2gw_agent_manager.l2gw_agent_type,
l2_gw_agent_type)
self.assertEqual(
self.l2gw_agent_manager.agent_state.get(
'configurations')[n_const.L2GW_AGENT_TYPE], l2_gw_agent_type)
def test_set_l2gateway_agent_type_invalid(self):
l2_gw_agent_type = 'fake_type'
result = self.l2gw_agent_manager.set_l2gateway_agent_type(
self.context, l2_gw_agent_type)
self.assertTrue(result, n_const.L2GW_INVALID_RPC_MSG_FORMAT)

@ -0,0 +1,58 @@
# Copyright (c) 2015 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import mock
from neutron.common import config as common_config
from neutron.tests import base
from oslo.config import cfg
from networking_l2gw.services.l2gateway.common import config
from networking_l2gw.services.l2gateway import l2gw_agent as agent
class TestL2gwAgent(base.BaseTestCase):