Add support Quantum Security Groups for Ryu plugin

fix bug #1124965

This patch add support security-groups extension to Ryu plugin.

Change-Id: I569ab1e48517f28a5103175fd04e848f82eb2a3c
This commit is contained in:
Yoshihiro Kaneko 2013-02-14 21:04:12 +09:00 committed by Nachi Ueno
parent f64c40b47d
commit e55b10ab39
9 changed files with 455 additions and 29 deletions

View File

@ -45,3 +45,11 @@ tunnel_interface = eth0
# ovsdb_ip = # ovsdb_ip =
# ovsdb_interface = # ovsdb_interface =
ovsdb_interface = eth0 ovsdb_interface = eth0
[SECURITYGROUP]
# Firewall driver for realizing quantum security group function
# firewall_driver = quantum.agent.linux.iptables_firewall.OVSHybridIptablesFirewallDriver
[AGENT]
# Agent's polling interval in seconds
polling_interval = 2

View File

@ -34,6 +34,7 @@ migration_for_plugins = [
'quantum.plugins.nicira.nicira_nvp_plugin.QuantumPlugin.NvpPluginV2', 'quantum.plugins.nicira.nicira_nvp_plugin.QuantumPlugin.NvpPluginV2',
'quantum.plugins.openvswitch.ovs_quantum_plugin.OVSQuantumPluginV2', 'quantum.plugins.openvswitch.ovs_quantum_plugin.OVSQuantumPluginV2',
'quantum.plugins.nec.nec_plugin.NECPluginV2', 'quantum.plugins.nec.nec_plugin.NECPluginV2',
'quantum.plugins.ryu.ryu_quantum_plugin.RyuQuantumPluginV2',
] ]
from alembic import op from alembic import op

View File

@ -23,7 +23,9 @@
import httplib import httplib
import socket import socket
import sys import sys
import time
import eventlet
import netifaces import netifaces
from oslo.config import cfg from oslo.config import cfg
from ryu.app import client from ryu.app import client
@ -33,11 +35,14 @@ from ryu.app import rest_nw_id
from quantum.agent.linux import ovs_lib from quantum.agent.linux import ovs_lib
from quantum.agent.linux.ovs_lib import VifPort from quantum.agent.linux.ovs_lib import VifPort
from quantum.agent import rpc as agent_rpc from quantum.agent import rpc as agent_rpc
from quantum.agent import securitygroups_rpc as sg_rpc
from quantum.common import config as logging_config from quantum.common import config as logging_config
from quantum.common import exceptions as q_exc from quantum.common import exceptions as q_exc
from quantum.common import topics from quantum.common import topics
from quantum import context as q_context from quantum import context as q_context
from quantum.openstack.common import log from quantum.openstack.common import log
from quantum.openstack.common.rpc import dispatcher
from quantum.extensions import securitygroup as ext_sg
from quantum.plugins.ryu.common import config from quantum.plugins.ryu.common import config
@ -148,7 +153,8 @@ class VifPortSet(object):
port.switch.datapath_id, port.ofport) port.switch.datapath_id, port.ofport)
class RyuPluginApi(agent_rpc.PluginApi): class RyuPluginApi(agent_rpc.PluginApi,
sg_rpc.SecurityGroupServerRpcApiMixin):
def get_ofp_rest_api_addr(self, context): def get_ofp_rest_api_addr(self, context):
LOG.debug(_("Get Ryu rest API address")) LOG.debug(_("Get Ryu rest API address"))
return self.call(context, return self.call(context,
@ -156,17 +162,42 @@ class RyuPluginApi(agent_rpc.PluginApi):
topic=self.topic) topic=self.topic)
class OVSQuantumOFPRyuAgent(object): class RyuSecurityGroupAgent(sg_rpc.SecurityGroupAgentRpcMixin):
def __init__(self, context, plugin_rpc, root_helper):
self.context = context
self.plugin_rpc = plugin_rpc
self.root_helper = root_helper
self.init_firewall()
class OVSQuantumOFPRyuAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin):
RPC_API_VERSION = '1.1'
def __init__(self, integ_br, tunnel_ip, ovsdb_ip, ovsdb_port, def __init__(self, integ_br, tunnel_ip, ovsdb_ip, ovsdb_port,
root_helper): polling_interval, root_helper):
super(OVSQuantumOFPRyuAgent, self).__init__() super(OVSQuantumOFPRyuAgent, self).__init__()
self.polling_interval = polling_interval
self._setup_rpc() self._setup_rpc()
self.sg_agent = RyuSecurityGroupAgent(self.context,
self.plugin_rpc,
root_helper)
self._setup_integration_br(root_helper, integ_br, tunnel_ip, self._setup_integration_br(root_helper, integ_br, tunnel_ip,
ovsdb_port, ovsdb_ip) ovsdb_port, ovsdb_ip)
def _setup_rpc(self): def _setup_rpc(self):
self.topic = topics.AGENT
self.plugin_rpc = RyuPluginApi(topics.PLUGIN) self.plugin_rpc = RyuPluginApi(topics.PLUGIN)
self.context = q_context.get_admin_context_without_session() self.context = q_context.get_admin_context_without_session()
self.dispatcher = self._create_rpc_dispatcher()
consumers = [[topics.PORT, topics.UPDATE],
[topics.SECURITY_GROUP, topics.UPDATE]]
self.connection = agent_rpc.create_consumers(self.dispatcher,
self.topic,
consumers)
def _create_rpc_dispatcher(self):
return dispatcher.RpcDispatcher([self])
def _setup_integration_br(self, root_helper, integ_br, def _setup_integration_br(self, root_helper, integ_br,
tunnel_ip, ovsdb_port, ovsdb_ip): tunnel_ip, ovsdb_port, ovsdb_ip):
@ -192,13 +223,64 @@ class OVSQuantumOFPRyuAgent(object):
sc_client.set_key(self.int_br.datapath_id, conf_switch_key.OVSDB_ADDR, sc_client.set_key(self.int_br.datapath_id, conf_switch_key.OVSDB_ADDR,
'tcp:%s:%d' % (ovsdb_ip, ovsdb_port)) 'tcp:%s:%d' % (ovsdb_ip, ovsdb_port))
def port_update(self, context, **kwargs):
LOG.debug(_("port update received"))
port = kwargs.get('port')
vif_port = self.int_br.get_vif_port_by_id(port['id'])
if not vif_port:
return
if ext_sg.SECURITYGROUPS in port:
self.sg_agent.refresh_firewall()
def _update_ports(self, registered_ports):
ports = self.int_br.get_vif_port_set()
if ports == registered_ports:
return
added = ports - registered_ports
removed = registered_ports - ports
return {'current': ports,
'added': added,
'removed': removed}
def _process_devices_filter(self, port_info):
if 'added' in port_info:
self.sg_agent.prepare_devices_filter(port_info['added'])
if 'removed' in port_info:
self.sg_agent.remove_devices_filter(port_info['removed'])
def daemon_loop(self):
ports = set()
while True:
start = time.time()
try:
port_info = self._update_ports(ports)
if port_info:
LOG.debug(_("Agent loop has new device"))
self._process_devices_filter(port_info)
ports = port_info['current']
except:
LOG.exception(_("Error in agent event loop"))
elapsed = max(time.time() - start, 0)
if (elapsed < self.polling_interval):
time.sleep(self.polling_interval - elapsed)
else:
LOG.debug(_("Loop iteration exceeded interval "
"(%(polling_interval)s vs. %(elapsed)s)!"),
{'polling_interval': self.polling_interval,
'elapsed': elapsed})
def main(): def main():
eventlet.monkey_patch()
cfg.CONF(project='quantum') cfg.CONF(project='quantum')
logging_config.setup_logging(cfg.CONF) logging_config.setup_logging(cfg.CONF)
integ_br = cfg.CONF.OVS.integration_bridge integ_br = cfg.CONF.OVS.integration_bridge
polling_interval = cfg.CONF.AGENT.polling_interval
root_helper = cfg.CONF.AGENT.root_helper root_helper = cfg.CONF.AGENT.root_helper
tunnel_ip = _get_tunnel_ip() tunnel_ip = _get_tunnel_ip()
@ -208,14 +290,16 @@ def main():
ovsdb_ip = _get_ovsdb_ip() ovsdb_ip = _get_ovsdb_ip()
LOG.debug(_('ovsdb_ip %s'), ovsdb_ip) LOG.debug(_('ovsdb_ip %s'), ovsdb_ip)
try: try:
OVSQuantumOFPRyuAgent(integ_br, tunnel_ip, ovsdb_ip, ovsdb_port, agent = OVSQuantumOFPRyuAgent(integ_br, tunnel_ip, ovsdb_ip,
root_helper) ovsdb_port, polling_interval,
root_helper)
except httplib.HTTPException, e: except httplib.HTTPException, e:
LOG.error(_("Initialization failed: %s"), e) LOG.error(_("Initialization failed: %s"), e)
sys.exit(1) sys.exit(1)
LOG.info(_("Ryu initialization on the node is done." LOG.info(_("Ryu initialization on the node is done. "
" Now Ryu agent exits successfully.")) "Agent initialized successfully, now running..."))
agent.daemon_loop()
sys.exit(0) sys.exit(0)

View File

@ -40,6 +40,13 @@ ovs_opts = [
help=_("OVSDB interface to connect to")), help=_("OVSDB interface to connect to")),
] ]
agent_opts = [
cfg.IntOpt('polling_interval', default=2,
help=_("The number of seconds the agent will wait between "
"polling for local device changes.")),
]
cfg.CONF.register_opts(ovs_opts, "OVS") cfg.CONF.register_opts(ovs_opts, "OVS")
cfg.CONF.register_opts(agent_opts, "AGENT")
config.register_root_helper(cfg.CONF) config.register_root_helper(cfg.CONF)

View File

@ -21,6 +21,9 @@ from sqlalchemy.orm import exc as orm_exc
from quantum.common import exceptions as q_exc from quantum.common import exceptions as q_exc
import quantum.db.api as db import quantum.db.api as db
from quantum.db import models_v2 from quantum.db import models_v2
from quantum.db import securitygroups_db as sg_db
from quantum.extensions import securitygroup as ext_sg
from quantum import manager
from quantum.openstack.common import log as logging from quantum.openstack.common import log as logging
from quantum.plugins.ryu.db import models_v2 as ryu_models_v2 from quantum.plugins.ryu.db import models_v2 as ryu_models_v2
@ -33,6 +36,30 @@ def network_all_tenant_list():
return session.query(models_v2.Network).all() return session.query(models_v2.Network).all()
def get_port_from_device(port_id):
LOG.debug(_("get_port_from_device() called:port_id=%s"), port_id)
session = db.get_session()
sg_binding_port = sg_db.SecurityGroupPortBinding.port_id
query = session.query(models_v2.Port,
sg_db.SecurityGroupPortBinding.security_group_id)
query = query.outerjoin(sg_db.SecurityGroupPortBinding,
models_v2.Port.id == sg_binding_port)
query = query.filter(models_v2.Port.id == port_id)
port_and_sgs = query.all()
if not port_and_sgs:
return None
port = port_and_sgs[0][0]
plugin = manager.QuantumManager.get_plugin()
port_dict = plugin._make_port_dict(port)
port_dict[ext_sg.SECURITYGROUPS] = [
sg_id for port, sg_id in port_and_sgs if sg_id]
port_dict['security_group_rules'] = []
port_dict['security_group_source_groups'] = []
port_dict['fixed_ips'] = [ip['ip_address'] for ip in port['fixed_ips']]
return port_dict
class TunnelKey(object): class TunnelKey(object):
# VLAN: 12 bits # VLAN: 12 bits
# GRE, VXLAN: 24bits # GRE, VXLAN: 24bits

View File

@ -20,6 +20,7 @@ from oslo.config import cfg
from ryu.app import client from ryu.app import client
from ryu.app import rest_nw_id from ryu.app import rest_nw_id
from quantum.agent import securitygroups_rpc as sg_rpc
from quantum.common import constants as q_const from quantum.common import constants as q_const
from quantum.common import exceptions as q_exc from quantum.common import exceptions as q_exc
from quantum.common import rpc as q_rpc from quantum.common import rpc as q_rpc
@ -30,8 +31,12 @@ from quantum.db import dhcp_rpc_base
from quantum.db import extraroute_db from quantum.db import extraroute_db
from quantum.db import l3_rpc_base from quantum.db import l3_rpc_base
from quantum.db import models_v2 from quantum.db import models_v2
from quantum.db import securitygroups_rpc_base as sg_db_rpc
from quantum.extensions import securitygroup as ext_sg
from quantum.openstack.common import cfg
from quantum.openstack.common import log as logging from quantum.openstack.common import log as logging
from quantum.openstack.common import rpc from quantum.openstack.common import rpc
from quantum.openstack.common.rpc import proxy
from quantum.plugins.ryu.common import config from quantum.plugins.ryu.common import config
from quantum.plugins.ryu.db import api_v2 as db_api_v2 from quantum.plugins.ryu.db import api_v2 as db_api_v2
@ -40,9 +45,10 @@ LOG = logging.getLogger(__name__)
class RyuRpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin, class RyuRpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin,
l3_rpc_base.L3RpcCallbackMixin): l3_rpc_base.L3RpcCallbackMixin,
sg_db_rpc.SecurityGroupServerRpcCallbackMixin):
RPC_API_VERSION = '1.0' RPC_API_VERSION = '1.1'
def __init__(self, ofp_rest_api_addr): def __init__(self, ofp_rest_api_addr):
self.ofp_rest_api_addr = ofp_rest_api_addr self.ofp_rest_api_addr = ofp_rest_api_addr
@ -54,11 +60,37 @@ class RyuRpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin,
LOG.debug(_("get_ofp_rest_api: %s"), self.ofp_rest_api_addr) LOG.debug(_("get_ofp_rest_api: %s"), self.ofp_rest_api_addr)
return self.ofp_rest_api_addr return self.ofp_rest_api_addr
@classmethod
def get_port_from_device(cls, device):
port = db_api_v2.get_port_from_device(device)
if port:
port['device'] = device
return port
class AgentNotifierApi(proxy.RpcProxy,
sg_rpc.SecurityGroupAgentRpcApiMixin):
BASE_RPC_API_VERSION = '1.0'
def __init__(self, topic):
super(AgentNotifierApi, self).__init__(
topic=topic, default_version=self.BASE_RPC_API_VERSION)
self.topic_port_update = topics.get_topic_name(topic,
topics.PORT,
topics.UPDATE)
def port_update(self, context, port):
self.fanout_cast(context,
self.make_msg('port_update', port=port),
topic=self.topic_port_update)
class RyuQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2, class RyuQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
extraroute_db.ExtraRoute_db_mixin): extraroute_db.ExtraRoute_db_mixin,
sg_db_rpc.SecurityGroupServerRpcMixin):
supported_extension_aliases = ["router", "extraroute"] supported_extension_aliases = ["router", "extraroute", "security-group"]
def __init__(self, configfile=None): def __init__(self, configfile=None):
db.configure_db() db.configure_db()
@ -82,6 +114,7 @@ class RyuQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
def _setup_rpc(self): def _setup_rpc(self):
self.conn = rpc.create_connection(new=True) self.conn = rpc.create_connection(new=True)
self.notifier = AgentNotifierApi(topics.AGENT)
self.callbacks = RyuRpcCallbacks(self.ofp_api_host) self.callbacks = RyuRpcCallbacks(self.ofp_api_host)
self.dispatcher = self.callbacks.create_rpc_dispatcher() self.dispatcher = self.callbacks.create_rpc_dispatcher()
self.conn.create_consumer(topics.PLUGIN, self.dispatcher, fanout=False) self.conn.create_consumer(topics.PLUGIN, self.dispatcher, fanout=False)
@ -109,6 +142,11 @@ class RyuQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
def create_network(self, context, network): def create_network(self, context, network):
session = context.session session = context.session
with session.begin(subtransactions=True): with session.begin(subtransactions=True):
#set up default security groups
tenant_id = self._get_tenant_id_for_create(
context, network['network'])
self._ensure_default_security_group(context, tenant_id)
net = super(RyuQuantumPluginV2, self).create_network(context, net = super(RyuQuantumPluginV2, self).create_network(context,
network) network)
self._process_l3_create(context, network['network'], net['id']) self._process_l3_create(context, network['network'], net['id'])
@ -154,7 +192,19 @@ class RyuQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
return [self._fields(net, fields) for net in nets] return [self._fields(net, fields) for net in nets]
def create_port(self, context, port): def create_port(self, context, port):
port = super(RyuQuantumPluginV2, self).create_port(context, port) session = context.session
with session.begin(subtransactions=True):
self._ensure_default_security_group_on_port(context, port)
sgids = self._get_security_groups_on_port(context, port)
port = super(RyuQuantumPluginV2, self).create_port(context, port)
self._process_port_create_security_group(
context, port['id'], sgids)
self._extend_port_dict_security_group(context, port)
if port['device_owner'] == q_const.DEVICE_OWNER_DHCP:
self.notifier.security_groups_provider_updated(context)
else:
self.notifier.security_groups_member_updated(
context, port.get(ext_sg.SECURITYGROUPS))
self.iface_client.create_network_id(port['id'], port['network_id']) self.iface_client.create_network_id(port['id'], port['network_id'])
return port return port
@ -163,13 +213,53 @@ class RyuQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
# and l3-router. If so, we should prevent deletion. # and l3-router. If so, we should prevent deletion.
if l3_port_check: if l3_port_check:
self.prevent_l3_port_deletion(context, id) self.prevent_l3_port_deletion(context, id)
self.disassociate_floatingips(context, id)
return super(RyuQuantumPluginV2, self).delete_port(context, id) with context.session.begin(subtransactions=True):
self.disassociate_floatingips(context, id)
port = self.get_port(context, id)
self._delete_port_security_group_bindings(context, id)
super(RyuQuantumPluginV2, self).delete_port(context, id)
self.notifier.security_groups_member_updated(
context, port.get(ext_sg.SECURITYGROUPS))
def update_port(self, context, id, port): def update_port(self, context, id, port):
deleted = port['port'].get('deleted', False) deleted = port['port'].get('deleted', False)
port = super(RyuQuantumPluginV2, self).update_port(context, id, port) session = context.session
need_port_update_notify = False
with session.begin(subtransactions=True):
original_port = super(RyuQuantumPluginV2, self).get_port(
context, id)
updated_port = super(RyuQuantumPluginV2, self).update_port(
context, id, port)
need_port_update_notify = self.update_security_group_on_port(
context, id, port, original_port, updated_port)
need_port_update_notify |= self.is_security_group_member_updated(
context, original_port, updated_port)
need_port_update_notify |= (original_port['admin_state_up'] !=
updated_port['admin_state_up'])
if need_port_update_notify:
self.notifier.port_update(context, updated_port)
if deleted: if deleted:
session = context.session
db_api_v2.set_port_status(session, id, q_const.PORT_STATUS_DOWN) db_api_v2.set_port_status(session, id, q_const.PORT_STATUS_DOWN)
return port return updated_port
def get_port(self, context, id, fields=None):
with context.session.begin(subtransactions=True):
port = super(RyuQuantumPluginV2, self).get_port(context, id,
fields)
self._extend_port_dict_security_group(context, port)
return self._fields(port, fields)
def get_ports(self, context, filters=None, fields=None):
with context.session.begin(subtransactions=True):
ports = super(RyuQuantumPluginV2, self).get_ports(
context, filters, fields)
for port in ports:
self._extend_port_dict_security_group(context, port)
return [self._fields(port, fields) for port in ports]

View File

@ -28,6 +28,7 @@ class ConfigurationTest(unittest2.TestCase):
self.assertEqual('br-int', cfg.CONF.OVS.integration_bridge) self.assertEqual('br-int', cfg.CONF.OVS.integration_bridge)
self.assertEqual(-1, cfg.CONF.DATABASE.sql_max_retries) self.assertEqual(-1, cfg.CONF.DATABASE.sql_max_retries)
self.assertEqual(2, cfg.CONF.DATABASE.reconnect_interval) self.assertEqual(2, cfg.CONF.DATABASE.reconnect_interval)
self.assertEqual(2, cfg.CONF.AGENT.polling_interval)
self.assertEqual('sudo', cfg.CONF.AGENT.root_helper) self.assertEqual('sudo', cfg.CONF.AGENT.root_helper)
self.assertEqual('127.0.0.1:8080', cfg.CONF.OVS.openflow_rest_api) self.assertEqual('127.0.0.1:8080', cfg.CONF.OVS.openflow_rest_api)
self.assertEqual(1, cfg.CONF.OVS.tunnel_key_min) self.assertEqual(1, cfg.CONF.OVS.tunnel_key_min)

View File

@ -44,12 +44,19 @@ class TestOVSQuantumOFPRyuAgent(RyuAgentTestCase):
self._AGENT_NAME + '.VifPortSet').start() self._AGENT_NAME + '.VifPortSet').start()
self.q_ctx = mock.patch( self.q_ctx = mock.patch(
self._AGENT_NAME + '.q_context').start() self._AGENT_NAME + '.q_context').start()
self.agent_rpc = mock.patch(
self._AGENT_NAME + '.agent_rpc.create_consumers').start()
self.sg_rpc = mock.patch(
self._AGENT_NAME + '.sg_rpc').start()
self.sg_agent = mock.patch(
self._AGENT_NAME + '.RyuSecurityGroupAgent').start()
def mock_rest_addr(self, rest_addr): def mock_rest_addr(self, rest_addr):
integ_br = 'integ_br' integ_br = 'integ_br'
tunnel_ip = '192.168.0.1' tunnel_ip = '192.168.0.1'
ovsdb_ip = '172.16.0.1' ovsdb_ip = '172.16.0.1'
ovsdb_port = 16634 ovsdb_port = 16634
interval = 2
root_helper = 'helper' root_helper = 'helper'
self.mod_agent.OVSBridge.return_value.datapath_id = '1234' self.mod_agent.OVSBridge.return_value.datapath_id = '1234'
@ -61,8 +68,8 @@ class TestOVSQuantumOFPRyuAgent(RyuAgentTestCase):
self.plugin_api.return_value.get_ofp_rest_api_addr = mock_rest_addr self.plugin_api.return_value.get_ofp_rest_api_addr = mock_rest_addr
# Instantiate OVSQuantumOFPRyuAgent # Instantiate OVSQuantumOFPRyuAgent
self.agent = self.mod_agent.OVSQuantumOFPRyuAgent( return self.mod_agent.OVSQuantumOFPRyuAgent(
integ_br, tunnel_ip, ovsdb_ip, ovsdb_port, root_helper) integ_br, tunnel_ip, ovsdb_ip, ovsdb_port, interval, root_helper)
def test_valid_rest_addr(self): def test_valid_rest_addr(self):
self.mock_rest_addr('192.168.0.1:8080') self.mock_rest_addr('192.168.0.1:8080')
@ -79,6 +86,11 @@ class TestOVSQuantumOFPRyuAgent(RyuAgentTestCase):
mock.call().get_ofp_rest_api_addr('abc') mock.call().get_ofp_rest_api_addr('abc')
]) ])
# Agent RPC
self.agent_rpc.assert_has_calls([
mock.call(mock.ANY, 'q-agent-notifier', mock.ANY)
])
# OFPClient # OFPClient
self.mod_agent.client.OFPClient.assert_calls([ self.mod_agent.client.OFPClient.assert_calls([
mock.call('192.168.0.1:8080') mock.call('192.168.0.1:8080')
@ -93,7 +105,6 @@ class TestOVSQuantumOFPRyuAgent(RyuAgentTestCase):
]) ])
# SwitchConfClient # SwitchConfClient
self.mod_agent.client.SwitchConfClient.assert_has_calls([ self.mod_agent.client.SwitchConfClient.assert_has_calls([
mock.call('192.168.0.1:8080'), mock.call('192.168.0.1:8080'),
mock.call().set_key('1234', 'ovs_tunnel_addr', '192.168.0.1'), mock.call().set_key('1234', 'ovs_tunnel_addr', '192.168.0.1'),
@ -110,6 +121,108 @@ class TestOVSQuantumOFPRyuAgent(RyuAgentTestCase):
self.assertRaises(self.mod_agent.q_exc.Invalid, self.assertRaises(self.mod_agent.q_exc.Invalid,
self.mock_rest_addr, ('')) self.mock_rest_addr, (''))
def mock_port_update(self, **kwargs):
agent = self.mock_rest_addr('192.168.0.1:8080')
agent.port_update(mock.Mock(), **kwargs)
def test_port_update(self, **kwargs):
port = {'id': 1, 'security_groups': 'default'}
with mock.patch.object(self.ovsbridge.return_value,
'get_vif_port_by_id',
return_value=1) as get_vif:
self.mock_port_update(port=port)
get_vif.assert_called_once_with(1)
self.sg_agent.assert_calls([
mock.call().refresh_firewall()
])
def test_port_update_not_vifport(self, **kwargs):
port = {'id': 1, 'security_groups': 'default'}
with mock.patch.object(self.ovsbridge.return_value,
'get_vif_port_by_id',
return_value=0) as get_vif:
self.mock_port_update(port=port)
get_vif.assert_called_once_with(1)
self.assertFalse(self.sg_agent.return_value.refresh_firewall.called)
def test_port_update_without_secgroup(self, **kwargs):
port = {'id': 1}
with mock.patch.object(self.ovsbridge.return_value,
'get_vif_port_by_id',
return_value=1) as get_vif:
self.mock_port_update(port=port)
get_vif.assert_called_once_with(1)
self.assertFalse(self.sg_agent.return_value.refresh_firewall.called)
def mock_update_ports(self, vif_port_set=None, registered_ports=None):
with mock.patch.object(self.ovsbridge.return_value,
'get_vif_port_set',
return_value=vif_port_set):
agent = self.mock_rest_addr('192.168.0.1:8080')
return agent._update_ports(registered_ports)
def test_update_ports_unchanged(self):
self.assertIsNone(self.mock_update_ports())
def test_update_ports_changed(self):
vif_port_set = set([1, 3])
registered_ports = set([1, 2])
expected = dict(current=vif_port_set,
added=set([3]),
removed=set([2]))
actual = self.mock_update_ports(vif_port_set, registered_ports)
self.assertEqual(expected, actual)
def mock_process_devices_filter(self, port_info):
agent = self.mock_rest_addr('192.168.0.1:8080')
agent._process_devices_filter(port_info)
def test_process_devices_filter_add(self):
port_info = {'added': 1}
self.mock_process_devices_filter(port_info)
self.sg_agent.assert_calls([
mock.call().prepare_devices_filter(1)
])
def test_process_devices_filter_remove(self):
port_info = {'removed': 2}
self.mock_process_devices_filter(port_info)
self.sg_agent.assert_calls([
mock.call().remove_devices_filter(2)
])
def test_process_devices_filter_both(self):
port_info = {'added': 1, 'removed': 2}
self.mock_process_devices_filter(port_info)
self.sg_agent.assert_calls([
mock.call().prepare_devices_filter(1),
mock.call().remove_devices_filter(2)
])
def test_process_devices_filter_none(self):
port_info = {}
self.mock_process_devices_filter(port_info)
self.assertFalse(
self.sg_agent.return_value.prepare_devices_filter.called)
self.assertFalse(
self.sg_agent.return_value.remove_devices_filter.called)
class TestRyuPluginApi(RyuAgentTestCase): class TestRyuPluginApi(RyuAgentTestCase):
def test_get_ofp_rest_api_addr(self): def test_get_ofp_rest_api_addr(self):
@ -468,17 +581,15 @@ class TestRyuQuantumAgent(RyuAgentTestCase):
]) ])
def test_main(self): def test_main(self):
with nested( agent_attrs = {'daemon_loop.side_effect': SystemExit(0)}
mock.patch(self._AGENT_NAME + '.OVSQuantumOFPRyuAgent'), with mock.patch(self._AGENT_NAME + '.OVSQuantumOFPRyuAgent',
mock.patch('sys.exit', side_effect=SystemExit(0)) **agent_attrs) as mock_agent:
) as (mock_agent, mock_exit):
self.assertRaises(SystemExit, self.mock_main) self.assertRaises(SystemExit, self.mock_main)
mock_agent.assert_calls([ mock_agent.assert_calls([
mock.call('integ_br', '10.0.0.1', '172.16.0.1', 16634, 'helper') mock.call('integ_br', '10.0.0.1', '172.16.0.1', 16634, 2,
]) 'helper'),
mock_exit.assert_calls([ mock.call().daemon_loop()
mock.call(0)
]) ])
def test_main_raise(self): def test_main_raise(self):
@ -490,7 +601,8 @@ class TestRyuQuantumAgent(RyuAgentTestCase):
self.assertRaises(SystemExit, self.mock_main) self.assertRaises(SystemExit, self.mock_main)
mock_agent.assert_calls([ mock_agent.assert_calls([
mock.call('integ_br', '10.0.0.1', '172.16.0.1', 16634, 'helper') mock.call('integ_br', '10.0.0.1', '172.16.0.1', 16634, 2,
'helper')
]) ])
mock_exit.assert_calls([ mock_exit.assert_calls([
mock.call(1) mock.call(1)

View File

@ -0,0 +1,96 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2012, Nachi Ueno, NTT MCL, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import mock
from quantum.api.v2 import attributes
from quantum.extensions import securitygroup as ext_sg
from quantum import manager
from quantum.plugins.ryu.db import api_v2 as api_db_v2
from quantum.tests.unit import test_extension_security_group as test_sg
from quantum.tests.unit import test_security_groups_rpc as test_sg_rpc
PLUGIN_NAME = ('quantum.plugins.ryu.'
'ryu_quantum_plugin.RyuQuantumPluginV2')
AGENT_NAME = ('quantum.plugins.ryu.'
'agent.ryu_quantum_agent.OVSQuantumOFPRyuAgent')
NOTIFIER = ('quantum.plugins.ryu.'
'ryu_quantum_plugin.AgentNotifierApi')
class RyuSecurityGroupsTestCase(test_sg.SecurityGroupDBTestCase):
_plugin_name = PLUGIN_NAME
def setUp(self, plugin=None):
self.addCleanup(mock.patch.stopall)
notifier_p = mock.patch(NOTIFIER)
notifier_cls = notifier_p.start()
self.notifier = mock.Mock()
notifier_cls.return_value = self.notifier
self._attribute_map_bk_ = {}
for item in attributes.RESOURCE_ATTRIBUTE_MAP:
self._attribute_map_bk_[item] = (attributes.
RESOURCE_ATTRIBUTE_MAP[item].
copy())
super(RyuSecurityGroupsTestCase, self).setUp(PLUGIN_NAME)
def tearDown(self):
super(RyuSecurityGroupsTestCase, self).tearDown()
attributes.RESOURCE_ATTRIBUTE_MAP = self._attribute_map_bk_
class TestRyuSecurityGroups(RyuSecurityGroupsTestCase,
test_sg.TestSecurityGroups,
test_sg_rpc.SGNotificationTestMixin):
def test_security_group_get_port_from_device(self):
with contextlib.nested(self.network(),
self.security_group()) as (n, sg):
with self.subnet(n):
security_group_id = sg['security_group']['id']
res = self._create_port(self.fmt, n['network']['id'])
port = self.deserialize(self.fmt, res)
fixed_ips = port['port']['fixed_ips']
data = {'port': {'fixed_ips': fixed_ips,
'name': port['port']['name'],
ext_sg.SECURITYGROUPS:
[security_group_id]}}
req = self.new_update_request('ports', data,
port['port']['id'])
res = self.deserialize(self.fmt,
req.get_response(self.api))
port_id = res['port']['id']
plugin = manager.QuantumManager.get_plugin()
port_dict = plugin.callbacks.get_port_from_device(port_id)
self.assertEqual(port_id, port_dict['id'])
self.assertEqual([security_group_id],
port_dict[ext_sg.SECURITYGROUPS])
self.assertEqual([], port_dict['security_group_rules'])
self.assertEqual([fixed_ips[0]['ip_address']],
port_dict['fixed_ips'])
self._delete('ports', port_id)
def test_security_group_get_port_from_device_with_no_port(self):
plugin = manager.QuantumManager.get_plugin()
port_dict = plugin.callbacks.get_port_from_device('bad_device_id')
self.assertEqual(None, port_dict)
class TestRyuSecurityGroupsXML(TestRyuSecurityGroups):
fmt = 'xml'