Implements a redis client for publishing security group rules for ports
connected to briged networks.
This commit is contained in:
Matt Dietz
2014-09-21 01:37:36 +00:00
parent 61150032af
commit e5ef78fa40
13 changed files with 424 additions and 71 deletions

View File

@@ -16,6 +16,8 @@
from neutron.openstack.common import log as logging
from quark import network_strategy
from quark.security_groups import redis_client
STRATEGY = network_strategy.STRATEGY
LOG = logging.getLogger(__name__)
@@ -59,6 +61,13 @@ class UnmanagedDriver(object):
def update_port(self, context, port_id, **kwargs):
LOG.info("update_port %s %s" % (context.tenant_id, port_id))
if "security_groups" in kwargs:
client = redis_client.Client()
payload = client.serialize(kwargs["security_groups"])
client.apply_rules(kwargs["device_id"], kwargs["mac_address"],
payload)
return {"uuid": port_id}
def delete_port(self, context, port_id, **kwargs):

View File

@@ -105,3 +105,17 @@ class IPPolicyInUse(exceptions.InUse):
class DriverLimitReached(exceptions.InvalidInput):
message = _("Driver has reached limit on resource '%(limit)s'")
class SecurityGroupsNotImplemented(exceptions.InvalidInput):
message = _("Security Groups are not currently implemented on port "
"create")
class TenantNetworkSecurityGroupsNotImplemented(exceptions.InvalidInput):
message = _("Security Groups are not currently implemented for "
"tenant networks")
class SecurityGroupsCouldNotBeApplied(exceptions.NeutronException):
message = _("There was an error applying security groups to the port.")

View File

@@ -62,7 +62,6 @@ def create_port(context, port):
port_id = uuidutils.generate_uuid()
net = db_api.network_find(context, id=net_id, scope=db_api.ONE)
if not net:
raise exceptions.NetworkNotFound(net_id=net_id)
@@ -83,7 +82,6 @@ def create_port(context, port):
segment_id = None
port_count = db_api.port_count_all(context, network_id=[net_id],
tenant_id=[context.tenant_id])
quota.QUOTAS.limit_check(
context, context.tenant_id,
ports_per_network=port_count + 1)
@@ -92,10 +90,17 @@ def create_port(context, port):
raise q_exc.AmbiguousNetworkId(net_id=net_id)
ipam_driver = ipam.IPAM_REGISTRY.get_strategy(net["ipam_strategy"])
net_driver = registry.DRIVER_REGISTRY.get_driver(net["network_plugin"])
group_ids, security_groups = _make_security_group_list(
context, port["port"].pop("security_groups", None))
net_driver = registry.DRIVER_REGISTRY.get_driver(net["network_plugin"])
# TODO(anyone): security groups are not currently supported on port create,
# nor on isolated networks today. Please see RM8615
security_groups = utils.pop_param(port_attrs, "security_groups")
if security_groups:
raise q_exc.SecurityGroupsNotImplemented()
group_ids, security_groups = _make_security_group_list(context,
security_groups)
quota.QUOTAS.limit_check(context, context.tenant_id,
security_groups_per_port=len(group_ids))
addresses = []
@@ -230,8 +235,15 @@ def update_port(context, id, port):
utils.filter_body(context, port_dict, admin_only=admin_only,
always_filter=always_filter)
group_ids, security_groups = _make_security_group_list(
context, port_dict.pop("security_groups", None))
# TODO(anyone): security groups are not currently supported on port create,
# nor on isolated networks today. Please see RM8615
security_groups = utils.pop_param(port_dict, "security_groups")
if security_groups:
if not STRATEGY.is_parent_network(port_db["network_id"]):
raise q_exc.TenantNetworkSecurityGroupsNotImplemented()
group_ids, security_groups = _make_security_group_list(context,
security_groups)
quota.QUOTAS.limit_check(context, context.tenant_id,
security_groups_per_port=len(group_ids))
@@ -295,8 +307,15 @@ def update_port(context, id, port):
net_driver = registry.DRIVER_REGISTRY.get_driver(
port_db.network["network_plugin"])
net_driver.update_port(context, port_id=port_db.backend_key,
security_groups=group_ids)
# TODO(anyone): What do we want to have happen here if this fails? Is it
# ok to continue to keep the IPs but fail to apply security
# groups? Is there a clean way to have a multi-status? Since
# we're in a beta-y status, I'm going to let this sit for
# a future patch where we have time to solve it well.
net_driver.update_port(context, port_id=port_db["backend_key"],
mac_address=port_db["mac_address"],
security_groups=security_groups)
port_dict["security_groups"] = security_groups

View File

@@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from neutron.common import exceptions
from neutron.extensions import securitygroup as sg_ext
from neutron.openstack.common import log as logging
from neutron.openstack.common import uuidutils
@@ -29,9 +30,14 @@ DEFAULT_SG_UUID = "00000000-0000-0000-0000-000000000000"
def _validate_security_group_rule(context, rule):
# TODO(mdietz): As per RM8615, Remote groups are not currently supported
if rule.get("remote_group_id"):
raise exceptions.InvalidInput(
error_message="Remote groups are not currently supported")
if rule.get("remote_ip_prefix") and rule.get("remote_group_id"):
raise sg_ext.SecurityGroupRemoteGroupAndRemoteIpPrefix()
if "direction" in rule and rule["direction"] != "ingress":
raise exceptions.InvalidInput(
error_message="Non-ingress rules are not currently supported")
protocol = rule.pop('protocol')
port_range_min = rule['port_range_min']

View File

@@ -52,6 +52,8 @@ MIN_PROTOCOL = 0
MAX_PROTOCOL = 255
REVERSE_PROTOCOLS = {}
REVERSE_ETHERTYPES = {}
MIN_PORT = 0
MAX_PORT = 65535
def _is_allowed(protocol, ethertype):
@@ -100,17 +102,23 @@ def human_readable_protocol(protocol, ethertype):
def validate_protocol_with_port_ranges(protocol, port_range_min,
port_range_max):
if protocol in ALLOWED_WITH_RANGE:
# TODO(mdietz) Allowed with range makes little sense. TCP without
# a port range means what, exactly?
# TODO(anyone): what exactly is a TCP or UDP rule without ports?
if (port_range_min is None) != (port_range_max is None):
raise exceptions.InvalidInput(
error_message="For TCP/UDP rules, port_range_min and"
"port_range_max must either both be supplied, "
"or neither of them")
if port_range_min is not None and port_range_max is not None:
if port_range_min > port_range_max:
raise sg_ext.SecurityGroupInvalidPortRange()
if port_range_min < MIN_PORT or port_range_max > MAX_PORT:
raise exceptions.InvalidInput(
error_message="port_range_min and port_range_max must be "
">= %s and <= %s" % (MIN_PORT, MAX_PORT))
else:
if port_range_min or port_range_max:
raise exceptions.InvalidInput(
error_message=("You may not supply ports for the requested "
"protocol"))

View File

View File

@@ -0,0 +1,128 @@
# Copyright 2014 Openstack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
#
import json
import uuid
import netaddr
from neutron.openstack.common import log as logging
from oslo.config import cfg
import redis
from quark import exceptions as q_exc
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
quark_opts = [
cfg.StrOpt('redis_security_groups_host',
default='127.0.0.1',
help=_("The server to write security group rules to")),
cfg.IntOpt('redis_security_groups_port',
default=6379,
help=_("The port for the redis server"))]
CONF.register_opts(quark_opts, "QUARK")
class Client(object):
def __init__(self):
host = CONF.QUARK.redis_security_groups_host
port = CONF.QUARK.redis_security_groups_port
# NOTE: this is a naive implementation. The redis module
# also supports connection pooling, which may be necessary
# going forward, but we'll roll with this for now.
try:
self._client = redis.Redis(host=host, port=port)
except redis.ConnectionError as e:
LOG.exception(e)
raise q_exc.SecurityGroupsCouldNotBeApplied()
def serialize(self, groups):
"""Creates a payload for the redis server
The rule schema is the following:
REDIS KEY - port_device_id.port_mac_address
REDIS VALUE - A JSON dump of the following:
{"id": "<arbitrary uuid>",
"rules": [
{"ethertype": <hexademical integer>,
"protocol": <integer>,
"port start": <integer>,
"port end": <integer>,
"source network": <string>,
"destination network": <string>,
"action": <string>,
"direction": <string>},
]
}
Example:
{"id": "004c6369-9f3d-4d33-b8f5-9416bf3567dd",
"rules": [
{"ethertype": 0x800,
"protocol": "tcp",
"port start": 1000,
"port end": 1999,
"source network": "10.10.10.0/24",
"destination network": "",
"action": "allow",
"direction": "ingress"},
]
}
"""
rule_uuid = str(uuid.uuid4())
rule_dict = {"id": rule_uuid, "rules": []}
# Action and direction are static, for now. The implementation may
# support 'deny' and 'egress' respectively in the future
for group in groups:
for rule in group.rules:
direction = "ingress"
source = ''
destination = ''
if rule["remote_ip_prefix"]:
if direction == "ingress":
source = rule["remote_ip_prefix"]
else:
destination = rule["remote_ip_prefix"]
rule_dict["rules"].append(
{"ethertype": rule["ethertype"],
"protocol": rule["protocol"],
"port start": rule["port_range_min"],
"port end": rule["port_range_max"],
"source network": source,
"destination network": destination,
"action": "allow",
"direction": "ingress"})
return rule_dict
def rule_key(self, device_id, mac_address):
return "{0}.{1}".format(device_id, str(netaddr.EUI(mac_address)))
def apply_rules(self, device_id, mac_address, rules):
"""Writes a series of security group rules to a redis server."""
redis_key = self.rule_key(device_id, mac_address)
try:
self._client.set(redis_key, json.dumps(rules))
except redis.ConnectionError as e:
LOG.exception(e)
raise q_exc.SecurityGroupsCouldNotBeApplied()

View File

@@ -19,7 +19,6 @@ import json
import mock
from neutron.api.v2 import attributes as neutron_attrs
from neutron.common import exceptions
from neutron.extensions import securitygroup as sg_ext
from oslo.config import cfg
from quark.db import models
@@ -244,7 +243,7 @@ class TestQuarkCreatePortFailure(test_quark_plugin.TestQuarkPlugin):
class TestQuarkCreatePort(test_quark_plugin.TestQuarkPlugin):
@contextlib.contextmanager
def _stubs(self, port=None, network=None, addr=None, mac=None,
limit_raise=False):
limit_checks=None):
if network:
network["network_plugin"] = "BASE"
network["ipam_strategy"] = "ANY"
@@ -268,8 +267,8 @@ class TestQuarkCreatePort(test_quark_plugin.TestQuarkPlugin):
alloc_ip.return_value = addr
alloc_mac.return_value = mac
port_count.return_value = 0
if limit_raise:
limit_check.side_effect = exceptions.OverQuota
if limit_checks:
limit_check.side_effect = limit_checks
yield port_create
def test_create_port(self):
@@ -389,7 +388,7 @@ class TestQuarkCreatePort(test_quark_plugin.TestQuarkPlugin):
with self.assertRaises(exceptions.NetworkNotFound):
self.plugin.create_port(self.context, port)
def test_create_port_security_groups(self, groups=[1]):
def test_create_port_security_groups_raises(self, groups=[1]):
network = dict(id=1)
mac = dict(address="AA:BB:CC:DD:EE:FF")
port_name = "foobar"
@@ -400,54 +399,10 @@ class TestQuarkCreatePort(test_quark_plugin.TestQuarkPlugin):
port = dict(port=dict(mac_address=mac["address"], network_id=1,
tenant_id=self.context.tenant_id, device_id=2,
name=port_name, security_groups=[group]))
expected = {'status': "ACTIVE",
'name': port_name,
'device_owner': None,
'mac_address': mac["address"],
'network_id': network["id"],
'tenant_id': self.context.tenant_id,
'admin_state_up': None,
'fixed_ips': [],
'security_groups': groups,
'device_id': 2}
with self._stubs(port=port["port"], network=network, addr=ip,
mac=mac) as port_create:
with mock.patch("quark.db.api.security_group_find") as group_find:
group_find.return_value = (groups and group)
port["port"]["security_groups"] = groups or [1]
result = self.plugin.create_port(self.context, port)
self.assertTrue(port_create.called)
for key in expected.keys():
self.assertEqual(result[key], expected[key])
def test_create_port_security_groups_not_found(self):
with self.assertRaises(sg_ext.SecurityGroupNotFound):
self.test_create_port_security_groups([])
def test_create_port_security_groups_over_quota(self):
network = dict(id=1)
mac = dict(address="AA:BB:CC:DD:EE:FF")
port_name = "foobar"
ip = dict()
groups = []
group_ids = range(6)
for gid in group_ids:
group = models.SecurityGroup()
group.update({'id': gid, 'tenant_id': self.context.tenant_id,
'name': 'foo', 'description': 'bar'})
groups.append(group)
port = dict(port=dict(mac_address=mac["address"], network_id=1,
tenant_id=self.context.tenant_id, device_id=2,
name=port_name, security_groups=groups))
with self._stubs(port=port["port"], network=network, addr=ip,
mac=mac, limit_raise=True):
with mock.patch("quark.db.api.security_group_find") as group_find:
group_find.return_value = groups
port["port"]["security_groups"] = groups
with self.assertRaises(exceptions.OverQuota):
mac=mac):
with mock.patch("quark.db.api.security_group_find"):
with self.assertRaises(q_exc.SecurityGroupsNotImplemented):
self.plugin.create_port(self.context, port)
@@ -495,7 +450,7 @@ class TestQuarkPortCreateQuota(test_quark_plugin.TestQuarkPlugin):
class TestQuarkUpdatePort(test_quark_plugin.TestQuarkPlugin):
@contextlib.contextmanager
def _stubs(self, port, new_ips=None):
def _stubs(self, port, new_ips=None, parent_net=False):
port_model = None
if port:
net_model = models.Network()
@@ -503,12 +458,13 @@ class TestQuarkUpdatePort(test_quark_plugin.TestQuarkPlugin):
port_model = models.Port()
port_model.network = net_model
port_model.update(port)
with contextlib.nested(
mock.patch("quark.db.api.port_find"),
mock.patch("quark.db.api.port_update"),
mock.patch("quark.ipam.QuarkIpam.allocate_ip_address"),
mock.patch("quark.ipam.QuarkIpam.deallocate_ips_by_port"),
mock.patch("neutron.quota.QuotaEngine.limit_check")
mock.patch("neutron.quota.QuotaEngine.limit_check"),
) as (port_find, port_update, alloc_ip, dealloc_ip, limit_check):
port_find.return_value = port_model
port_update.return_value = port_model
@@ -592,6 +548,63 @@ class TestQuarkUpdatePort(test_quark_plugin.TestQuarkPlugin):
self.assertEqual(alloc_ip.call_count, 1)
class TestQuarkUpdatePortSecurityGroups(test_quark_plugin.TestQuarkPlugin):
@contextlib.contextmanager
def _stubs(self, port, new_ips=None, parent_net=False):
port_model = None
sg_mod = models.SecurityGroup()
if port:
net_model = models.Network()
net_model["network_plugin"] = "BASE"
port_model = models.Port()
port_model.network = net_model
port_model.update(port)
port_model["security_groups"].append(sg_mod)
with contextlib.nested(
mock.patch("quark.db.api.port_find"),
mock.patch("quark.db.api.port_update"),
mock.patch("quark.ipam.QuarkIpam.allocate_ip_address"),
mock.patch("quark.ipam.QuarkIpam.deallocate_ips_by_port"),
mock.patch("neutron.quota.QuotaEngine.limit_check"),
mock.patch("quark.plugin_modules.ports.STRATEGY"
".is_parent_network"),
mock.patch("quark.db.api.security_group_find")
) as (port_find, port_update, alloc_ip, dealloc_ip, limit_check,
net_strat, sg_find):
port_find.return_value = port_model
port_update.return_value = port_model
if new_ips:
alloc_ip.return_value = new_ips
net_strat.return_value = parent_net
sg_find.return_value = sg_mod
yield port_find, port_update, alloc_ip, dealloc_ip, sg_find
def test_update_port_security_groups_on_tenant_net_raises(self):
with self._stubs(
port=dict(id=1)
) as (port_find, port_update, alloc_ip, dealloc_ip, sg_find):
new_port = dict(port=dict(name="ourport",
security_groups=[1]))
with self.assertRaises(
q_exc.TenantNetworkSecurityGroupsNotImplemented):
self.plugin.update_port(self.context, 1, new_port)
def test_update_port_security_groups(self):
with self._stubs(
port=dict(id=1), parent_net=True
) as (port_find, port_update, alloc_ip, dealloc_ip, sg_find):
new_port = dict(port=dict(name="ourport",
security_groups=[1]))
port = self.plugin.update_port(self.context, 1, new_port)
port_update.assert_called_once_with(
self.context,
port_find(),
name="ourport",
security_groups=[sg_find()])
self.assertEqual(sg_find()["id"], port["security_groups"][0])
class TestQuarkUpdatePortSetsIps(test_quark_plugin.TestQuarkPlugin):
@contextlib.contextmanager
def _stubs(self, port, new_ips=None):

View File

@@ -315,12 +315,27 @@ class TestQuarkCreateSecurityGroupRule(test_quark_plugin.TestQuarkPlugin):
self._test_create_security_rule(remote_ip_prefix='192.168.0.1')
def test_create_security_rule_remote_group(self):
with self.assertRaises(exceptions.InvalidInput):
self._test_create_security_rule(remote_group_id=2)
def test_create_security_rule_port_range_invalid_ranges_fails(self):
with self.assertRaises(exceptions.InvalidInput):
self._test_create_security_rule(protocol=6, port_range_min=0)
def test_create_security_rule_min_under_port_min(self):
with self.assertRaises(exceptions.InvalidInput):
self._test_create_security_rule(protocol=6, port_range_min=-1,
port_range_max=10)
def test_create_security_rule_egress_raises(self):
with self.assertRaises(exceptions.InvalidInput):
self._test_create_security_rule(protocol=6, direction="egress")
def test_create_security_rule_max_over_port_max(self):
with self.assertRaises(exceptions.InvalidInput):
self._test_create_security_rule(protocol=6, port_range_min=0,
port_range_max=65537)
def test_create_security_rule_remote_conflicts(self):
with self.assertRaises(Exception): # noqa
self._test_create_security_rule(remote_ip_prefix='192.168.0.1',

View File

View File

@@ -0,0 +1,117 @@
# Copyright 2014 Openstack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
#
import uuid
import mock
import netaddr
import redis
from quark.db import models
from quark import exceptions as q_exc
from quark.security_groups import redis_client
from quark.tests import test_base
class TestRedisSerialization(test_base.TestBase):
def setUp(self):
super(TestRedisSerialization, self).setUp()
@mock.patch("quark.security_groups.redis_client.redis.Redis")
def test_redis_key(self, redis):
client = redis_client.Client()
device_id = str(uuid.uuid4())
mac_address = netaddr.EUI("AA:BB:CC:DD:EE:FF")
redis_key = client.rule_key(device_id, mac_address.value)
expected = "%s.%s" % (device_id, str(mac_address))
self.assertEqual(expected, redis_key)
@mock.patch("quark.security_groups.redis_client.Client.rule_key")
@mock.patch("quark.security_groups.redis_client.redis.Redis")
def test_apply_rules(self, rule_key, redis):
client = redis_client.Client()
port_id = 1
mac_address = netaddr.EUI("AA:BB:CC:DD:EE:FF")
client.apply_rules(port_id, mac_address.value, [])
self.assertTrue(client._client.set.called)
def test_client_connection_fails_gracefully(self):
conn_err = redis.ConnectionError
with mock.patch("redis.Redis") as redis_mock:
redis_mock.side_effect = conn_err
with self.assertRaises(q_exc.SecurityGroupsCouldNotBeApplied):
redis_client.Client()
def test_apply_rules_set_fails_gracefully(self):
port_id = 1
mac_address = netaddr.EUI("AA:BB:CC:DD:EE:FF")
conn_err = redis.ConnectionError
with mock.patch("redis.Redis") as redis_mock:
client = redis_client.Client()
redis_mock.set.side_effect = conn_err
client.apply_rules(port_id, mac_address.value, [])
@mock.patch("quark.security_groups.redis_client.redis.Redis")
def test_serialize_group_no_rules(self, redis):
client = redis_client.Client()
group = models.SecurityGroup()
payload = client.serialize([group])
self.assertTrue(payload.get("id") is not None)
self.assertEqual([], payload.get("rules"))
@mock.patch("quark.security_groups.redis_client.redis.Redis")
def test_serialize_group_with_rules(self, redis):
rule_dict = {"ethertype": 0x800, "protocol": 6, "port_range_min": 80,
"port_range_max": 443}
client = redis_client.Client()
group = models.SecurityGroup()
rule = models.SecurityGroupRule()
rule.update(rule_dict)
group.rules.append(rule)
payload = client.serialize([group])
self.assertTrue(payload.get("id") is not None)
rule = payload["rules"][0]
self.assertEqual(0x800, rule["ethertype"])
self.assertEqual(6, rule["protocol"])
self.assertEqual(80, rule["port start"])
self.assertEqual(443, rule["port end"])
self.assertEqual("allow", rule["action"])
self.assertEqual("ingress", rule["direction"])
self.assertEqual("", rule["source network"])
self.assertEqual("", rule["destination network"])
@mock.patch("quark.security_groups.redis_client.redis.Redis")
def test_serialize_group_with_rules_and_remote_network(self, redis):
rule_dict = {"ethertype": 0x800, "protocol": 1,
"remote_ip_prefix": "192.168.0.0/24"}
client = redis_client.Client()
group = models.SecurityGroup()
rule = models.SecurityGroupRule()
rule.update(rule_dict)
group.rules.append(rule)
payload = client.serialize([group])
self.assertTrue(payload.get("id") is not None)
rule = payload["rules"][0]
self.assertEqual(0x800, rule["ethertype"])
self.assertEqual(1, rule["protocol"])
self.assertEqual(None, rule["port start"])
self.assertEqual(None, rule["port end"])
self.assertEqual("allow", rule["action"])
self.assertEqual("ingress", rule["direction"])
self.assertEqual("192.168.0.0/24", rule["source network"])
self.assertEqual("", rule["destination network"])

View File

@@ -14,6 +14,10 @@
# under the License.
import json
import uuid
import mock
import netaddr
from quark.drivers import unmanaged
from quark import network_strategy
@@ -60,6 +64,25 @@ class TestUnmanagedDriver(test_base.TestBase):
self.driver.update_port(context=self.context,
network_id="public_network", port_id=2)
@mock.patch("quark.security_groups.redis_client.Client")
def test_update_port_with_security_groups(self, redis_cli):
mock_client = mock.MagicMock()
redis_cli.return_value = mock_client
port_id = str(uuid.uuid4())
device_id = str(uuid.uuid4())
mac_address = netaddr.EUI("AA:BB:CC:DD:EE:FF").value
security_groups = [str(uuid.uuid4())]
payload = {}
mock_client.serialize.return_value = payload
self.driver.update_port(
context=self.context, network_id="public_network", port_id=port_id,
device_id=device_id, mac_address=mac_address,
security_groups=security_groups)
mock_client.serialize.assert_called_once_with(security_groups)
mock_client.apply_rules.assert_called_once_with(
device_id, mac_address, payload)
def test_delete_port(self):
self.driver.delete_port(context=self.context, port_id=2)

View File

@@ -8,6 +8,7 @@ http://tarballs.openstack.org/neutron/neutron-master.tar.gz#egg=neutron
aiclib
gunicorn
pymysql>=0.6.2
redis==2.10.3
# NOTE(jkoelker) not technically required, but something has to commit
# the transactions. in the future this should be the