Refactoring Redis ClientBase out of security groups
To make room for other uses of the Redis cache, it is necessary to refactor the core components of a ClientBase out of the existing Security Groups Redis client. A child SecurityGroupsClient was then created to implement the Security Groups-specific features.
This commit is contained in:
188
quark/cache/redis_base.py
vendored
Normal file
188
quark/cache/redis_base.py
vendored
Normal file
@@ -0,0 +1,188 @@
|
||||
# Copyright 2014 Openstack Foundation
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
#
|
||||
|
||||
import functools
|
||||
import json
|
||||
import string
|
||||
|
||||
import netaddr
|
||||
from neutron.openstack.common import log as logging
|
||||
from oslo.config import cfg
|
||||
import redis
|
||||
import redis.sentinel
|
||||
|
||||
from quark import exceptions as q_exc
|
||||
|
||||
|
||||
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
# One-pass upper->lower translation table for normalizing MAC addresses
# (used with str.translate in ClientBase.vif_key).
MAC_TRANS_TABLE = string.maketrans(string.ascii_uppercase,
                                   string.ascii_lowercase)

# Redis connection options for the QUARK config group. Help strings are
# wrapped in _() for i18n, consistently across all options (redis_db and
# redis_socket_timeout previously omitted the wrapper).
quark_opts = [
    cfg.StrOpt('redis_host',
               default='127.0.0.1',
               help=_("The server to write redis data to or"
                      " retrieve sentinel information from, as appropriate")),
    cfg.IntOpt('redis_port',
               default=6379,
               help=_("The port for the redis server to write redis data to or"
                      " retrieve sentinel information from, as appropriate")),
    cfg.BoolOpt("redis_use_sentinels",
                default=False,
                help=_("Tell the redis client to use sentinels rather than a "
                       "direct connection")),
    cfg.ListOpt("redis_sentinel_hosts",
                default=["localhost:26397"],
                help=_("Comma-separated list of host:port pairs for Redis "
                       "sentinel hosts")),
    cfg.StrOpt("redis_sentinel_master",
               default='',
               help=_("The name label of the master redis sentinel")),
    cfg.StrOpt("redis_password",
               default='',
               help=_("The password for authenticating with redis.")),
    cfg.StrOpt("redis_db",
               default="0",
               help=_("The database number to use")),
    cfg.FloatOpt("redis_socket_timeout",
                 default=0.1,
                 help=_("Timeout for Redis socket operations"))]

CONF.register_opts(quark_opts, "QUARK")

# TODO(mdietz): Rewrite this to use a module level connection
#               pool, and then incorporate that into creating
#               connections. When connecting to a master we
#               connect by creating a redis client, and when
#               we connect to a slave, we connect by telling it
#               we want a slave and ending up with a connection,
#               with no control over SSL or anything else. :-|
def handle_connection_error(fn):
    """Decorator translating redis connection errors into quark exceptions.

    Any redis.ConnectionError raised by the wrapped callable is logged and
    re-raised as q_exc.RedisConnectionFailure; all other exceptions pass
    through untouched.
    """
    @functools.wraps(fn)
    def _wrapper(*args, **kwargs):
        try:
            result = fn(*args, **kwargs)
        except redis.ConnectionError as err:
            LOG.exception(err)
            raise q_exc.RedisConnectionFailure()
        return result
    return _wrapper
|
||||
|
||||
|
||||
class ClientBase(object):
    """Base Redis client sharing one process-wide connection pool.

    The pool is stored on the class (``ClientBase.connection_pool``) so every
    instance, including subclass instances, reuses the same connections.
    Direct-connection vs sentinel mode is selected from CONF.QUARK options.
    """

    # Shared, lazily-created pool; see _ensure_connection_pool_exists().
    connection_pool = None

    def __init__(self, use_master=False):
        """Builds the underlying StrictRedis client.

        :param use_master: when True, sentinel connections target the master
            (required for writes); kept on the instance so subclasses can
            refuse writes on slave connections.
        :raises q_exc.RedisConnectionFailure: if the connection cannot be
            established.
        """
        self._use_master = use_master

        try:
            if CONF.QUARK.redis_use_sentinels:
                self._compile_sentinel_list()
            self._ensure_connection_pool_exists(use_master)
            # NOTE: the attribute deliberately holds the StrictRedis
            # instance; the builder method has a distinct name so the
            # attribute does not shadow it.
            self._client = self._build_client()
        except redis.ConnectionError as e:
            LOG.exception(e)
            raise q_exc.RedisConnectionFailure()

    def _ensure_connection_pool_exists(self, use_master):
        """Creates the class-level connection pool on first use only."""
        if not ClientBase.connection_pool:
            LOG.info("Creating redis connection pool for the first time...")
            host = CONF.QUARK.redis_host
            port = CONF.QUARK.redis_port

            connect_kw = {}
            if CONF.QUARK.redis_password:
                connect_kw["password"] = CONF.QUARK.redis_password

            connect_args = []

            klass = redis.ConnectionPool
            if CONF.QUARK.redis_use_sentinels:
                # SentinelConnectionPool(service_name, sentinel_manager, ...)
                connect_args.append(CONF.QUARK.redis_sentinel_master)
                klass = redis.sentinel.SentinelConnectionPool
                connect_args.append(
                    redis.sentinel.Sentinel(self._sentinel_list))
                connect_kw["check_connection"] = True
                connect_kw["is_master"] = use_master
                LOG.info("Using redis sentinel connections %s" %
                         self._sentinel_list)
            else:
                connect_kw["host"] = host
                connect_kw["port"] = port
                LOG.info("Using redis host %s:%s" % (host, port))

            ClientBase.connection_pool = klass(*connect_args,
                                               **connect_kw)

    def _compile_sentinel_list(self):
        """Parses CONF.QUARK.redis_sentinel_hosts into (host, port) tuples.

        :raises TypeError: when the option is empty or an entry does not
            split into exactly a host and a port.
        """
        sentinels = []
        for host_port in CONF.QUARK.redis_sentinel_hosts:
            pair = tuple(host_port.split(':'))
            # Previously malformed entries (no colon, extra colons) slipped
            # through silently and failed later inside redis.sentinel.
            if len(pair) != 2:
                raise TypeError("sentinel_list is not a properly formatted "
                                "list of 'host:port' pairs")
            sentinels.append(pair)
        if not sentinels:
            raise TypeError("sentinel_list is not a properly formatted "
                            "list of 'host:port' pairs")
        self._sentinel_list = sentinels

    def _build_client(self):
        """Returns a StrictRedis client bound to the shared pool."""
        kwargs = {"connection_pool": ClientBase.connection_pool,
                  "db": CONF.QUARK.redis_db,
                  "socket_timeout": CONF.QUARK.redis_socket_timeout}
        return redis.StrictRedis(**kwargs)

    def vif_key(self, device_id, mac_address):
        """Returns the redis key for a VIF: "<device_id>.<normalized mac>".

        The MAC is lower-cased and stripped of ':' and '-' separators so the
        key suffix is exactly twelve hex digits.
        """
        mac = str(netaddr.EUI(mac_address))

        # Lower cases and strips hyphens from the mac
        mac = mac.translate(MAC_TRANS_TABLE, ":-")
        return "{0}.{1}".format(device_id, mac)

    @handle_connection_error
    def echo(self, echo_str):
        """Round-trips echo_str through the server (connectivity check)."""
        return self._client.echo(echo_str)

    @handle_connection_error
    def vif_keys(self, field=None):
        """Returns VIF keys that carry data.

        :param field: when given, a key is kept if that hash field is
            non-empty; otherwise the whole hash must be non-empty.
        """
        # vif_key() suffixes are always 12 characters, hence the pattern.
        keys = self._client.keys("*.????????????")
        filtered = []
        if isinstance(keys, str):
            keys = [keys]
        for key in keys:
            value = None
            if field:
                value = self._client.hget(key, field)
            else:
                value = self._client.hgetall(key)
            if value:
                filtered.append(key)
        return filtered

    @handle_connection_error
    def set_field(self, key, field, data):
        """JSON-serializes data and stores it in the hash field."""
        return self._client.hset(key, field, json.dumps(data))

    @handle_connection_error
    def get_field(self, key, field):
        """Returns the raw (still JSON-encoded) hash field value."""
        return self._client.hget(key, field)

    @handle_connection_error
    def delete_field(self, key, field):
        """Removes a single field from the hash; no-op if absent."""
        return self._client.hdel(key, field)

    @handle_connection_error
    def get_fields(self, keys, field):
        """Pipelines HGET of the same field across many keys.

        Returns the values in the same order as keys (None for misses).
        """
        p = self._client.pipeline()
        for key in keys:
            p.hget(key, field)
        return p.execute()
|
||||
170
quark/cache/security_groups_client.py
vendored
Normal file
170
quark/cache/security_groups_client.py
vendored
Normal file
@@ -0,0 +1,170 @@
|
||||
# Copyright 2014 Openstack Foundation
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
#
|
||||
|
||||
import json
|
||||
import uuid
|
||||
|
||||
import netaddr
|
||||
from neutron.openstack.common import log as logging
|
||||
|
||||
from quark.cache import redis_base
|
||||
from quark import exceptions as q_exc
|
||||
from quark import protocols
|
||||
from quark import utils
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
# Keys used inside the JSON payload stored per VIF: the ruleset version
# uuid and the list of serialized rules.
SECURITY_GROUP_VERSION_UUID_KEY = "id"
SECURITY_GROUP_RULE_KEY = "rules"
# Name of the redis hash field holding the JSON payload.
SECURITY_GROUP_HASH_ATTR = "security group rules"
||||
|
||||
|
||||
class SecurityGroupsClient(redis_base.ClientBase):
    """Security Groups specialization of the base Redis cache client.

    Stores one JSON ruleset per VIF under the SECURITY_GROUP_HASH_ATTR
    field of the hash at vif_key(device_id, mac_address).
    """

    def serialize_rules(self, rules):
        """Creates a payload for the redis server."""
        # TODO(mdietz): If/when we support other rule types, this comment
        # will have to be revised.
        # Action and direction are static, for now. The implementation may
        # support 'deny' and 'egress' respectively in the future. We allow
        # the direction to be set to something else, technically, but current
        # plugin level call actually raises. It's supported here for unit
        # test purposes at this time
        serialized = []
        for rule in rules:
            direction = rule["direction"]
            source = ''
            destination = ''
            if rule["remote_ip_prefix"]:
                # NOTE(review): remote prefixes are normalized to their IPv6
                # representation before storage — confirm downstream
                # consumers expect v6-mapped addresses for v4 prefixes.
                if direction == "ingress":
                    source = netaddr.IPNetwork(rule["remote_ip_prefix"])
                    source = str(source.ipv6())
                else:
                    destination = netaddr.IPNetwork(
                        rule["remote_ip_prefix"])
                    destination = str(destination.ipv6())

            optional_fields = {}

            # NOTE(mdietz): this will expand as we add more protocols
            if rule["protocol"] == protocols.PROTOCOLS["icmp"]:
                optional_fields["icmp type"] = rule["port_range_min"]
                optional_fields["icmp code"] = rule["port_range_max"]
            else:
                optional_fields["port start"] = rule["port_range_min"]
                optional_fields["port end"] = rule["port_range_max"]

            # "direction" above only selects source vs destination; the
            # stored payload is always allow/ingress (see comment at top).
            payload = {"ethertype": rule["ethertype"],
                       "protocol": rule["protocol"],
                       "source network": source,
                       "destination network": destination,
                       "action": "allow",
                       "direction": "ingress"}
            payload.update(optional_fields)
            serialized.append(payload)
        return serialized

    def serialize_groups(self, groups):
        """Creates a payload for the redis server

        The rule schema is the following:

        REDIS KEY - port_device_id.port_mac_address/sg
        REDIS VALUE - A JSON dump of the following:

        port_mac_address must be lower-cased and stripped of non-alphanumeric
        characters

        {"id": "<arbitrary uuid>",
         "rules": [
           {"ethertype": <hexademical integer>,
            "protocol": <integer>,
            "port start": <integer>,       # optional
            "port end": <integer>,         # optional
            "icmp type": <integer>,        # optional
            "icmp code": <integer>,        # optional
            "source network": <string>,
            "destination network": <string>,
            "action": <string>,
            "direction": <string>},
          ]
        }

        Example:
        {"id": "004c6369-9f3d-4d33-b8f5-9416bf3567dd",
         "rules": [
           {"ethertype": 0x800,
            "protocol": "tcp",
            "port start": 1000,
            "port end": 1999,
            "source network": "10.10.10.0/24",
            "destination network": "",
            "action": "allow",
            "direction": "ingress"},
          ]
        }

        port start/end and icmp type/code are mutually exclusive pairs.
        """
        rules = []
        for group in groups:
            rules.extend(self.serialize_rules(group.rules))
        return rules

    def get_rules_for_port(self, device_id, mac_address):
        """Returns the deserialized ruleset for a VIF, or None if unset."""
        rules = self.get_field(
            self.vif_key(device_id, mac_address), SECURITY_GROUP_HASH_ATTR)
        if rules:
            return json.loads(rules)

    def apply_rules(self, device_id, mac_address, rules):
        """Writes a series of security group rules to a redis server."""
        LOG.info("Applying security group rules for device %s with MAC %s" %
                 (device_id, mac_address))
        # Writes must go to the master; a client built with use_master=False
        # may be bound to a slave connection.
        if not self._use_master:
            raise q_exc.RedisSlaveWritesForbidden()

        # Each write gets a fresh version uuid so agents can detect changes.
        ruleset_uuid = str(uuid.uuid4())
        rule_dict = {SECURITY_GROUP_VERSION_UUID_KEY: ruleset_uuid,
                     SECURITY_GROUP_RULE_KEY: rules}
        redis_key = self.vif_key(device_id, mac_address)
        self.set_field(redis_key, SECURITY_GROUP_HASH_ATTR, rule_dict)

    def delete_vif_rules(self, device_id, mac_address):
        """Removes the stored ruleset for a VIF."""
        # Redis DEL command will ignore key safely if it doesn't exist
        self.delete_field(self.vif_key(device_id, mac_address),
                          SECURITY_GROUP_HASH_ATTR)

    @utils.retry_loop(3)
    def get_security_groups(self, new_interfaces):
        """Gets security groups for interfaces from Redis

        Returns a dictionary of xapi.VIFs mapped to security group version
        UUIDs from a set of xapi.VIF.
        """
        LOG.debug("Getting security groups from Redis for {0}".format(
            new_interfaces))
        # Materialize once: the iterable is consumed twice (keys + zip).
        new_interfaces = tuple(new_interfaces)
        vif_keys = [self.vif_key(vif.device_id, vif.mac_address)
                    for vif in new_interfaces]
        security_groups = self.get_fields(vif_keys, SECURITY_GROUP_HASH_ATTR)

        # get_fields preserves order, so zip pairs each VIF with its payload;
        # VIFs with no stored ruleset map to None.
        ret = {}
        for vif, security_group in zip(new_interfaces, security_groups):
            security_group_uuid = None
            if security_group:
                security_group_uuid = json.loads(security_group).get(
                    SECURITY_GROUP_VERSION_UUID_KEY)
            ret[vif] = security_group_uuid
        return ret
|
||||
@@ -15,8 +15,8 @@
|
||||
|
||||
from neutron.openstack.common import log as logging
|
||||
|
||||
from quark.cache import security_groups_client as sg_client
|
||||
from quark import network_strategy
|
||||
from quark.security_groups import redis_client
|
||||
|
||||
|
||||
STRATEGY = network_strategy.STRATEGY
|
||||
@@ -63,7 +63,7 @@ class UnmanagedDriver(object):
|
||||
LOG.info("update_port %s %s" % (context.tenant_id, port_id))
|
||||
|
||||
if "security_groups" in kwargs:
|
||||
client = redis_client.Client(use_master=True)
|
||||
client = sg_client.SecurityGroupsClient(use_master=True)
|
||||
if kwargs["security_groups"]:
|
||||
payload = client.serialize_groups(kwargs["security_groups"])
|
||||
client.apply_rules(kwargs["device_id"], kwargs["mac_address"],
|
||||
|
||||
@@ -1,301 +0,0 @@
|
||||
# Copyright 2014 Openstack Foundation
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
#
|
||||
|
||||
import json
|
||||
import string
|
||||
import uuid
|
||||
|
||||
import netaddr
|
||||
from neutron.openstack.common import log as logging
|
||||
from oslo.config import cfg
|
||||
import redis
|
||||
import redis.sentinel
|
||||
|
||||
from quark import exceptions as q_exc
|
||||
from quark import protocols
|
||||
from quark import utils
|
||||
|
||||
|
||||
CONF = cfg.CONF
|
||||
LOG = logging.getLogger(__name__)
|
||||
SECURITY_GROUP_VERSION_UUID_KEY = "id"
|
||||
SECURITY_GROUP_RULE_KEY = "rules"
|
||||
SECURITY_GROUP_HASH_ATTR = "security group rules"
|
||||
MAC_TRANS_TABLE = string.maketrans(string.ascii_uppercase,
|
||||
string.ascii_lowercase)
|
||||
|
||||
quark_opts = [
|
||||
cfg.StrOpt('redis_security_groups_host',
|
||||
default='127.0.0.1',
|
||||
help=_("The server to write security group rules to or "
|
||||
"retrieve sentinel information from, as appropriate")),
|
||||
cfg.IntOpt('redis_security_groups_port',
|
||||
default=6379,
|
||||
help=_("The port for the redis server to write rules to or "
|
||||
"retrieve sentinel information from, as appropriate")),
|
||||
cfg.BoolOpt("redis_use_sentinels",
|
||||
default=False,
|
||||
help=_("Tell the redis client to use sentinels rather than a "
|
||||
"direct connection")),
|
||||
cfg.ListOpt("redis_sentinel_hosts",
|
||||
default=["localhost:26397"],
|
||||
help=_("Comma-separated list of host:port pairs for Redis "
|
||||
"sentinel hosts")),
|
||||
cfg.StrOpt("redis_sentinel_master",
|
||||
default='',
|
||||
help=_("The name label of the master redis sentinel")),
|
||||
cfg.StrOpt("redis_password",
|
||||
default='',
|
||||
help=_("The password for authenticating with redis.")),
|
||||
cfg.StrOpt("redis_db",
|
||||
default="0",
|
||||
help=("The database number to use")),
|
||||
cfg.FloatOpt("redis_socket_timeout",
|
||||
default=0.1,
|
||||
help=("Timeout for Redis socket operations"))]
|
||||
|
||||
CONF.register_opts(quark_opts, "QUARK")
|
||||
|
||||
# TODO(mdietz): Rewrite this to use a module level connection
|
||||
# pool, and then incorporate that into creating
|
||||
# connections. When connecting to a master we
|
||||
# connect by creating a redis client, and when
|
||||
# we connect to a slave, we connect by telling it
|
||||
# we want a slave and ending up with a connection,
|
||||
# with no control over SSL or anything else. :-|
|
||||
|
||||
|
||||
class Client(object):
|
||||
connection_pool = None
|
||||
|
||||
def __init__(self, use_master=False):
|
||||
self._use_master = use_master
|
||||
|
||||
try:
|
||||
if CONF.QUARK.redis_use_sentinels:
|
||||
self._compile_sentinel_list()
|
||||
self._ensure_connection_pool_exists(use_master)
|
||||
self._client = self._client()
|
||||
except redis.ConnectionError as e:
|
||||
LOG.exception(e)
|
||||
raise q_exc.RedisConnectionFailure()
|
||||
|
||||
def _ensure_connection_pool_exists(self, use_master):
|
||||
if not Client.connection_pool:
|
||||
LOG.info("Creating redis connection pool for the first time...")
|
||||
host = CONF.QUARK.redis_security_groups_host
|
||||
port = CONF.QUARK.redis_security_groups_port
|
||||
|
||||
connect_kw = {}
|
||||
if CONF.QUARK.redis_password:
|
||||
connect_kw["password"] = CONF.QUARK.redis_password
|
||||
|
||||
connect_args = []
|
||||
|
||||
klass = redis.ConnectionPool
|
||||
if CONF.QUARK.redis_use_sentinels:
|
||||
connect_args.append(CONF.QUARK.redis_sentinel_master)
|
||||
klass = redis.sentinel.SentinelConnectionPool
|
||||
connect_args.append(
|
||||
redis.sentinel.Sentinel(self._sentinel_list))
|
||||
connect_kw["check_connection"] = True
|
||||
connect_kw["is_master"] = use_master
|
||||
LOG.info("Using redis sentinel connections %s" %
|
||||
self._sentinel_list)
|
||||
else:
|
||||
connect_kw["host"] = host
|
||||
connect_kw["port"] = port
|
||||
LOG.info("Using redis host %s:%s" % (host, port))
|
||||
|
||||
Client.connection_pool = klass(*connect_args,
|
||||
**connect_kw)
|
||||
|
||||
def _compile_sentinel_list(self):
|
||||
self._sentinel_list = [tuple(host.split(':'))
|
||||
for host in CONF.QUARK.redis_sentinel_hosts]
|
||||
if not self._sentinel_list:
|
||||
raise TypeError("sentinel_list is not a properly formatted"
|
||||
"list of 'host:port' pairs")
|
||||
|
||||
def _client(self):
|
||||
kwargs = {"connection_pool": Client.connection_pool,
|
||||
"db": CONF.QUARK.redis_db,
|
||||
"socket_timeout": CONF.QUARK.redis_socket_timeout}
|
||||
return redis.StrictRedis(**kwargs)
|
||||
|
||||
def serialize_rules(self, rules):
|
||||
"""Creates a payload for the redis server."""
|
||||
# TODO(mdietz): If/when we support other rule types, this comment
|
||||
# will have to be revised.
|
||||
# Action and direction are static, for now. The implementation may
|
||||
# support 'deny' and 'egress' respectively in the future. We allow
|
||||
# the direction to be set to something else, technically, but current
|
||||
# plugin level call actually raises. It's supported here for unit
|
||||
# test purposes at this time
|
||||
serialized = []
|
||||
for rule in rules:
|
||||
direction = rule["direction"]
|
||||
source = ''
|
||||
destination = ''
|
||||
if rule["remote_ip_prefix"]:
|
||||
if direction == "ingress":
|
||||
source = netaddr.IPNetwork(rule["remote_ip_prefix"])
|
||||
source = str(source.ipv6())
|
||||
else:
|
||||
destination = netaddr.IPNetwork(
|
||||
rule["remote_ip_prefix"])
|
||||
destination = str(destination.ipv6())
|
||||
|
||||
optional_fields = {}
|
||||
|
||||
# NOTE(mdietz): this will expand as we add more protocols
|
||||
if rule["protocol"] == protocols.PROTOCOLS["icmp"]:
|
||||
optional_fields["icmp type"] = rule["port_range_min"]
|
||||
optional_fields["icmp code"] = rule["port_range_max"]
|
||||
else:
|
||||
optional_fields["port start"] = rule["port_range_min"]
|
||||
optional_fields["port end"] = rule["port_range_max"]
|
||||
|
||||
payload = {"ethertype": rule["ethertype"],
|
||||
"protocol": rule["protocol"],
|
||||
"source network": source,
|
||||
"destination network": destination,
|
||||
"action": "allow",
|
||||
"direction": "ingress"}
|
||||
payload.update(optional_fields)
|
||||
serialized.append(payload)
|
||||
return serialized
|
||||
|
||||
def serialize_groups(self, groups):
|
||||
"""Creates a payload for the redis server
|
||||
|
||||
The rule schema is the following:
|
||||
|
||||
REDIS KEY - port_device_id.port_mac_address/sg
|
||||
REDIS VALUE - A JSON dump of the following:
|
||||
|
||||
port_mac_address must be lower-cased and stripped of non-alphanumeric
|
||||
characters
|
||||
|
||||
{"id": "<arbitrary uuid>",
|
||||
"rules": [
|
||||
{"ethertype": <hexademical integer>,
|
||||
"protocol": <integer>,
|
||||
"port start": <integer>, # optional
|
||||
"port end": <integer>, # optional
|
||||
"icmp type": <integer>, # optional
|
||||
"icmp code": <integer>, # optional
|
||||
"source network": <string>,
|
||||
"destination network": <string>,
|
||||
"action": <string>,
|
||||
"direction": <string>},
|
||||
]
|
||||
}
|
||||
|
||||
Example:
|
||||
{"id": "004c6369-9f3d-4d33-b8f5-9416bf3567dd",
|
||||
"rules": [
|
||||
{"ethertype": 0x800,
|
||||
"protocol": "tcp",
|
||||
"port start": 1000,
|
||||
"port end": 1999,
|
||||
"source network": "10.10.10.0/24",
|
||||
"destination network": "",
|
||||
"action": "allow",
|
||||
"direction": "ingress"},
|
||||
]
|
||||
}
|
||||
|
||||
port start/end and icmp type/code are mutually exclusive pairs.
|
||||
"""
|
||||
rules = []
|
||||
for group in groups:
|
||||
rules.extend(self.serialize_rules(group.rules))
|
||||
return rules
|
||||
|
||||
def rule_key(self, device_id, mac_address):
|
||||
mac = str(netaddr.EUI(mac_address))
|
||||
|
||||
# Lower cases and strips hyphens from the mac
|
||||
mac = mac.translate(MAC_TRANS_TABLE, ":-")
|
||||
return "{0}.{1}".format(device_id, mac)
|
||||
|
||||
def get_rules_for_port(self, device_id, mac_address):
|
||||
rules = self._client.hget(
|
||||
self.rule_key(device_id, mac_address), SECURITY_GROUP_HASH_ATTR)
|
||||
if rules:
|
||||
return json.loads(rules)
|
||||
|
||||
def apply_rules(self, device_id, mac_address, rules):
|
||||
"""Writes a series of security group rules to a redis server."""
|
||||
LOG.info("Applying security group rules for device %s with MAC %s" %
|
||||
(device_id, mac_address))
|
||||
if not self._use_master:
|
||||
raise q_exc.RedisSlaveWritesForbidden()
|
||||
|
||||
ruleset_uuid = str(uuid.uuid4())
|
||||
rule_dict = {SECURITY_GROUP_VERSION_UUID_KEY: ruleset_uuid,
|
||||
SECURITY_GROUP_RULE_KEY: rules}
|
||||
redis_key = self.rule_key(device_id, mac_address)
|
||||
try:
|
||||
self._client.hset(redis_key, SECURITY_GROUP_HASH_ATTR,
|
||||
json.dumps(rule_dict))
|
||||
except redis.ConnectionError as e:
|
||||
LOG.exception(e)
|
||||
raise q_exc.RedisConnectionFailure()
|
||||
|
||||
def echo(self, echo_str):
|
||||
return self._client.echo(echo_str)
|
||||
|
||||
def vif_keys(self):
|
||||
keys = self._client.keys("*.????????????")
|
||||
if isinstance(keys, str):
|
||||
keys = [keys]
|
||||
return [k for k in keys if k and
|
||||
self._client.hget(k, SECURITY_GROUP_HASH_ATTR)]
|
||||
|
||||
def delete_vif_rules(self, key):
|
||||
# Redis DEL command will ignore key safely if it doesn't exist
|
||||
try:
|
||||
self._client.hdel(key, SECURITY_GROUP_HASH_ATTR)
|
||||
except redis.ConnectionError as e:
|
||||
LOG.exception(e)
|
||||
raise q_exc.RedisConnectionFailure()
|
||||
|
||||
@utils.retry_loop(3)
|
||||
def get_security_groups(self, new_interfaces):
|
||||
"""Gets security groups for interfaces from Redis
|
||||
|
||||
Returns a dictionary of xapi.VIFs mapped to security group version
|
||||
UUIDs from a set of xapi.VIF.
|
||||
"""
|
||||
LOG.debug("Getting security groups from Redis for {0}".format(
|
||||
new_interfaces))
|
||||
new_interfaces = tuple(new_interfaces)
|
||||
|
||||
p = self._client.pipeline()
|
||||
for vif in new_interfaces:
|
||||
key = self.rule_key(vif.device_id, vif.mac_address)
|
||||
p.hget(key, SECURITY_GROUP_HASH_ATTR)
|
||||
security_groups = p.execute()
|
||||
|
||||
ret = {}
|
||||
for vif, security_group in zip(new_interfaces, security_groups):
|
||||
security_group_uuid = None
|
||||
if security_group:
|
||||
security_group_uuid = json.loads(security_group).get(
|
||||
SECURITY_GROUP_VERSION_UUID_KEY)
|
||||
ret[vif] = security_group_uuid
|
||||
return ret
|
||||
261
quark/tests/cache/test_redis_base.py
vendored
Normal file
261
quark/tests/cache/test_redis_base.py
vendored
Normal file
@@ -0,0 +1,261 @@
|
||||
# Copyright 2014 Openstack Foundation
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
#
|
||||
|
||||
import contextlib
|
||||
import json
|
||||
import uuid
|
||||
|
||||
import mock
|
||||
import netaddr
|
||||
from oslo.config import cfg
|
||||
import redis
|
||||
|
||||
from quark.cache import redis_base
|
||||
from quark import exceptions as q_exc
|
||||
from quark.tests import test_base
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
|
||||
class TestClientBase(test_base.TestBase):
    """Unit tests for redis_base.ClientBase with the redis library mocked."""

    def setUp(self):
        super(TestClientBase, self).setUp()
        # Forces the connection pool to be recreated on every test
        redis_base.ClientBase.connection_pool = None

    @mock.patch("quark.cache.redis_base.redis")
    def test_vif_key(self, *args, **kwargs):
        # The key must be "<device_id>." plus the lower-cased, separator-free
        # MAC.
        client = redis_base.ClientBase()
        device_id = str(uuid.uuid4())
        mac_address = netaddr.EUI("AA:BB:CC:DD:EE:FF")

        redis_key = client.vif_key(device_id, mac_address.value)
        expected = "%s.%s" % (device_id, "aabbccddeeff")
        self.assertEqual(expected, redis_key)

    @mock.patch("redis.ConnectionPool")
    @mock.patch("quark.cache.redis_base.redis.StrictRedis")
    def test_init(self, strict_redis, conn_pool):
        # Default (non-sentinel) mode builds a direct pool from the
        # configured host/port.
        host = "127.0.0.1"
        port = 6379
        redis_base.ClientBase()
        conn_pool.assert_called_with(host=host, port=port)

    @mock.patch("redis.ConnectionPool")
    def test_client_connection_fails_gracefully(self, conn_pool):
        # redis.ConnectionError during construction must surface as the
        # quark-level RedisConnectionFailure.
        conn_err = redis.ConnectionError
        with mock.patch("redis.StrictRedis") as redis_mock:
            redis_mock.side_effect = conn_err
            with self.assertRaises(q_exc.RedisConnectionFailure):
                redis_base.ClientBase(use_master=True)

    @mock.patch(
        "quark.cache.redis_base.redis.StrictRedis")
    def test_get_field(self, strict_redis):
        rc = redis_base.ClientBase()
        mock_client = rc._client = mock.MagicMock()
        mock_client.hget.return_value = "returned hash field"

        r = rc.get_field("1.000000000002", "test_field_name")

        mock_client.hget.assert_called_once_with("1.000000000002",
                                                 "test_field_name")

        self.assertEqual(r, "returned hash field")

    @mock.patch(
        "quark.cache.redis_base.redis.StrictRedis")
    def test_vif_keys_hget(self, strict_redis):
        # With a field given, vif_keys() filters via hget, never hgetall.
        rc = redis_base.ClientBase()
        keys = ['1.000000000002', '2.000000000003']
        mock_client = rc._client = mock.MagicMock()
        mock_client.hget.return_value = "returned hash field"
        mock_client.keys.return_value = keys

        r = rc.vif_keys(field="test_field_name")

        mock_client.hget.assert_has_calls(
            [mock.call("1.000000000002", "test_field_name"),
             mock.call("2.000000000003", "test_field_name")])
        self.assertFalse(mock_client.hgetall.called)
        self.assertEqual(r, keys)

    @mock.patch(
        "quark.cache.redis_base.redis.StrictRedis")
    def test_vif_keys_hget_string_key_returned(self, strict_redis):
        # A bare string from KEYS is wrapped into a single-element list.
        rc = redis_base.ClientBase()
        keys = '1.000000000002'
        mock_client = rc._client = mock.MagicMock()
        mock_client.hget.return_value = "returned hash field"
        mock_client.keys.return_value = keys

        r = rc.vif_keys(field="test_field_name")

        mock_client.hget.assert_called_once_with("1.000000000002",
                                                 "test_field_name")
        self.assertEqual(r, [keys])

    @mock.patch(
        "quark.cache.redis_base.redis.StrictRedis")
    def test_vif_keys_hget_nil_returned(self, strict_redis):
        # Keys whose hash field is empty/nil are filtered out.
        rc = redis_base.ClientBase()
        keys = ['1.000000000002', '2.000000000003']
        mock_client = rc._client = mock.MagicMock()
        mock_client.hget.side_effect = ["returned hash field", None]
        mock_client.keys.return_value = keys

        r = rc.vif_keys(field="test_field_name")

        mock_client.hget.assert_has_calls(
            [mock.call("1.000000000002", "test_field_name"),
             mock.call("2.000000000003", "test_field_name")])
        self.assertFalse(mock_client.hgetall.called)
        self.assertEqual(r, keys[:1])

    @mock.patch(
        "quark.cache.redis_base.redis.StrictRedis")
    def test_vif_keys_hgetall(self, strict_redis):
        # Without a field, vif_keys() filters on the whole hash via hgetall.
        rc = redis_base.ClientBase()
        keys = ['1.000000000002', '2.000000000003']
        mock_client = rc._client = mock.MagicMock()
        mock_client.hgetall.return_value = {
            "returned hash field1": "returned hash value1",
            "returned hash field2": "returned hash value2"
        }
        mock_client.keys.return_value = keys

        r = rc.vif_keys()

        mock_client.hgetall.assert_has_calls([mock.call("1.000000000002"),
                                              mock.call("2.000000000003")])
        self.assertFalse(mock_client.hget.called)
        self.assertEqual(r, keys)

    @mock.patch(
        "quark.cache.redis_base.redis.StrictRedis")
    def test_vif_keys_hgetall_nil_returned(self, strict_redis):
        rc = redis_base.ClientBase()
        keys = ['1.000000000002', '2.000000000003']
        mock_client = rc._client = mock.MagicMock()
        mock_client.hgetall.side_effect = [
            {
                "returned hash field1": "returned hash value1",
                "returned hash field2": "returned hash value2"
            },
            None
        ]
        mock_client.keys.return_value = keys

        r = rc.vif_keys()

        mock_client.hgetall.assert_has_calls([mock.call("1.000000000002"),
                                              mock.call("2.000000000003")])
        self.assertFalse(mock_client.hget.called)
        self.assertEqual(r, keys[:1])

    @mock.patch(
        "quark.cache.redis_base.redis.StrictRedis")
    def test_set_field(self, strict_redis):
        # set_field must JSON-encode the payload before HSET.
        rc = redis_base.ClientBase()
        mock_client = rc._client = mock.MagicMock()
        dummy_data = {"dummy_data": "foo"}

        rc.set_field("1.000000000002", "test_field_name", dummy_data)

        mock_client.hset.assert_called_once_with("1.000000000002",
                                                 "test_field_name",
                                                 json.dumps(dummy_data))

    @mock.patch(
        "quark.cache.redis_base.redis.StrictRedis")
    def test_delete_field(self, strict_redis):
        rc = redis_base.ClientBase()
        mock_client = rc._client = mock.MagicMock()
        rc.delete_field("1.000000000002", "test_field_name")

        mock_client.hdel.assert_called_once_with("1.000000000002",
                                                 "test_field_name")

    @mock.patch(
        "quark.cache.redis_base.redis.StrictRedis")
    def test_get_fields(self, strict_redis):
        # get_fields must issue all HGETs through one pipeline and return
        # its execute() result.
        mock_redis = mock.MagicMock()
        mock_pipeline = mock.MagicMock()
        strict_redis.return_value = mock_redis
        mock_redis.pipeline.return_value = mock_pipeline
        mock_pipeline.execute.return_value = "returned executed"
        rc = redis_base.ClientBase()

        r = rc.get_fields(["1.000000000002", "1.000000000002",
                           "5.000000000006", "7.000000000008"],
                          "test_field_name")

        mock_pipeline.hget.assert_has_calls(
            [mock.call("1.000000000002", "test_field_name"),
             mock.call("1.000000000002", "test_field_name"),
             mock.call("5.000000000006", "test_field_name"),
             mock.call("7.000000000008", "test_field_name")],
            any_order=True)

        mock_pipeline.execute.assert_called_once_with()
        self.assertEqual(r, "returned executed")
|
||||
class TestRedisSentinelConnection(test_base.TestBase):
|
||||
def setUp(self):
|
||||
super(TestRedisSentinelConnection, self).setUp()
|
||||
# Forces the connection pool to be recreated on every test
|
||||
redis_base.ClientBase.connection_pool = None
|
||||
|
||||
@contextlib.contextmanager
|
||||
def _stubs(self, use_sentinels, sentinels, master_label):
|
||||
CONF.set_override("redis_use_sentinels", True, "QUARK")
|
||||
CONF.set_override("redis_sentinel_hosts", sentinels, "QUARK")
|
||||
CONF.set_override("redis_sentinel_master", master_label, "QUARK")
|
||||
yield
|
||||
CONF.set_override("redis_use_sentinels", False, "QUARK")
|
||||
CONF.set_override("redis_sentinel_hosts", '', "QUARK")
|
||||
CONF.set_override("redis_sentinel_master", '', "QUARK")
|
||||
|
||||
@mock.patch("redis.sentinel.Sentinel")
|
||||
@mock.patch("redis.sentinel.SentinelConnectionPool")
|
||||
@mock.patch("redis.sentinel.Sentinel.master_for")
|
||||
@mock.patch("quark.cache.redis_base.redis.StrictRedis")
|
||||
def test_sentinel_connection(self, strict_redis, master_for,
|
||||
sentinel_pool, sentinel_mock):
|
||||
host = "127.0.0.1"
|
||||
port = 6379
|
||||
sentinels = ["%s:%s" % (host, port)]
|
||||
master_label = "master"
|
||||
sentinel_mock.return_value = sentinels
|
||||
|
||||
with self._stubs(True, sentinels, master_label):
|
||||
redis_base.ClientBase(use_master=True)
|
||||
sentinel_pool.assert_called_with(master_label, sentinels,
|
||||
check_connection=True,
|
||||
is_master=True)
|
||||
|
||||
@mock.patch("redis.sentinel.SentinelConnectionPool")
|
||||
@mock.patch("redis.sentinel.Sentinel.master_for")
|
||||
@mock.patch("quark.cache.redis_base.redis.StrictRedis")
|
||||
def test_sentinel_connection_bad_format_raises(self, strict_redis,
|
||||
master_for, sentinel_pool):
|
||||
sentinels = ""
|
||||
master_label = "master"
|
||||
|
||||
with self._stubs(True, sentinels, master_label):
|
||||
with self.assertRaises(TypeError):
|
||||
redis_base.ClientBase(is_master=True)
|
||||
@@ -13,9 +13,7 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
#
|
||||
|
||||
import contextlib
|
||||
import json
|
||||
import uuid
|
||||
|
||||
import mock
|
||||
import netaddr
|
||||
@@ -23,39 +21,26 @@ from oslo.config import cfg
|
||||
import redis
|
||||
|
||||
from quark.agent.xapi import VIF
|
||||
from quark.cache import security_groups_client as sg_client
|
||||
from quark.db import models
|
||||
from quark import exceptions as q_exc
|
||||
from quark.security_groups import redis_client
|
||||
from quark.tests import test_base
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
|
||||
class TestRedisSerialization(test_base.TestBase):
|
||||
class TestRedisSecurityGroupsClient(test_base.TestBase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestRedisSerialization, self).setUp()
|
||||
super(TestRedisSecurityGroupsClient, self).setUp()
|
||||
# Forces the connection pool to be recreated on every test
|
||||
redis_client.Client.connection_pool = None
|
||||
|
||||
@mock.patch("redis.ConnectionPool")
|
||||
@mock.patch("quark.security_groups.redis_client.redis.StrictRedis")
|
||||
def test_redis_key(self, strict_redis, conn_pool):
|
||||
host = "127.0.0.1"
|
||||
port = 6379
|
||||
client = redis_client.Client()
|
||||
device_id = str(uuid.uuid4())
|
||||
mac_address = netaddr.EUI("AA:BB:CC:DD:EE:FF")
|
||||
|
||||
redis_key = client.rule_key(device_id, mac_address.value)
|
||||
expected = "%s.%s" % (device_id, "aabbccddeeff")
|
||||
self.assertEqual(expected, redis_key)
|
||||
conn_pool.assert_called_with(host=host, port=port)
|
||||
sg_client.SecurityGroupsClient.connection_pool = None
|
||||
|
||||
@mock.patch("uuid.uuid4")
|
||||
@mock.patch("redis.ConnectionPool")
|
||||
@mock.patch("quark.security_groups.redis_client.redis.StrictRedis")
|
||||
@mock.patch("quark.cache.redis_base.redis.StrictRedis")
|
||||
def test_apply_rules(self, strict_redis, conn_pool, uuid4):
|
||||
client = redis_client.Client(use_master=True)
|
||||
client = sg_client.SecurityGroupsClient(use_master=True)
|
||||
device_id = "device"
|
||||
uuid4.return_value = "uuid"
|
||||
|
||||
@@ -63,32 +48,26 @@ class TestRedisSerialization(test_base.TestBase):
|
||||
client.apply_rules(device_id, mac_address.value, [])
|
||||
self.assertTrue(client._client.hset.called)
|
||||
|
||||
redis_key = client.rule_key(device_id, mac_address.value)
|
||||
redis_key = client.vif_key(device_id, mac_address.value)
|
||||
|
||||
rule_dict = {"rules": [], "id": "uuid"}
|
||||
client._client.hset.assert_called_with(
|
||||
redis_key, redis_client.SECURITY_GROUP_HASH_ATTR,
|
||||
redis_key, sg_client.SECURITY_GROUP_HASH_ATTR,
|
||||
json.dumps(rule_dict))
|
||||
|
||||
@mock.patch("redis.ConnectionPool")
|
||||
@mock.patch("quark.security_groups.redis_client.Client.rule_key")
|
||||
@mock.patch("quark.security_groups.redis_client.redis.StrictRedis")
|
||||
def test_apply_rules_with_slave_fails(self, strict_redis, rule_key,
|
||||
@mock.patch(
|
||||
"quark.cache.security_groups_client.SecurityGroupsClient.vif_key")
|
||||
@mock.patch(
|
||||
"quark.cache.security_groups_client.redis_base.redis.StrictRedis")
|
||||
def test_apply_rules_with_slave_fails(self, strict_redis, vif_key,
|
||||
conn_pool):
|
||||
client = redis_client.Client()
|
||||
client = sg_client.SecurityGroupsClient()
|
||||
port_id = 1
|
||||
mac_address = netaddr.EUI("AA:BB:CC:DD:EE:FF")
|
||||
with self.assertRaises(q_exc.RedisSlaveWritesForbidden):
|
||||
client.apply_rules(port_id, mac_address.value, [])
|
||||
|
||||
@mock.patch("redis.ConnectionPool")
|
||||
def test_client_connection_fails_gracefully(self, conn_pool):
|
||||
conn_err = redis.ConnectionError
|
||||
with mock.patch("redis.StrictRedis") as redis_mock:
|
||||
redis_mock.side_effect = conn_err
|
||||
with self.assertRaises(q_exc.RedisConnectionFailure):
|
||||
redis_client.Client(use_master=True)
|
||||
|
||||
@mock.patch("redis.ConnectionPool")
|
||||
def test_apply_rules_set_fails_gracefully(self, conn_pool):
|
||||
port_id = 1
|
||||
@@ -98,25 +77,27 @@ class TestRedisSerialization(test_base.TestBase):
|
||||
mocked_redis_cli = mock.MagicMock()
|
||||
redis_mock.return_value = mocked_redis_cli
|
||||
|
||||
client = redis_client.Client(use_master=True)
|
||||
client = sg_client.SecurityGroupsClient(use_master=True)
|
||||
mocked_redis_cli.hset.side_effect = conn_err
|
||||
with self.assertRaises(q_exc.RedisConnectionFailure):
|
||||
client.apply_rules(port_id, mac_address.value, [])
|
||||
|
||||
@mock.patch("redis.ConnectionPool")
|
||||
@mock.patch("quark.security_groups.redis_client.redis.StrictRedis")
|
||||
@mock.patch(
|
||||
"quark.cache.security_groups_client.redis_base.redis.StrictRedis")
|
||||
def test_serialize_group_no_rules(self, strict_redis, conn_pool):
|
||||
client = redis_client.Client()
|
||||
client = sg_client.SecurityGroupsClient()
|
||||
group = models.SecurityGroup()
|
||||
payload = client.serialize_groups([group])
|
||||
self.assertEqual([], payload)
|
||||
|
||||
@mock.patch("redis.ConnectionPool")
|
||||
@mock.patch("quark.security_groups.redis_client.redis.StrictRedis")
|
||||
@mock.patch(
|
||||
"quark.cache.security_groups_client.redis_base.redis.StrictRedis")
|
||||
def test_serialize_group_with_rules(self, strict_redis, conn_pool):
|
||||
rule_dict = {"ethertype": 0x800, "protocol": 6, "port_range_min": 80,
|
||||
"port_range_max": 443, "direction": "ingress"}
|
||||
client = redis_client.Client()
|
||||
client = sg_client.SecurityGroupsClient()
|
||||
group = models.SecurityGroup()
|
||||
rule = models.SecurityGroupRule()
|
||||
rule.update(rule_dict)
|
||||
@@ -134,12 +115,13 @@ class TestRedisSerialization(test_base.TestBase):
|
||||
self.assertEqual("", rule["destination network"])
|
||||
|
||||
@mock.patch("redis.ConnectionPool")
|
||||
@mock.patch("quark.security_groups.redis_client.redis.StrictRedis")
|
||||
@mock.patch(
|
||||
"quark.cache.security_groups_client.redis_base.redis.StrictRedis")
|
||||
def test_serialize_group_with_rules_and_remote_network(self, strict_redis,
|
||||
conn_pool):
|
||||
rule_dict = {"ethertype": 0x800, "protocol": 1, "direction": "ingress",
|
||||
"remote_ip_prefix": "192.168.0.0/24"}
|
||||
client = redis_client.Client()
|
||||
client = sg_client.SecurityGroupsClient()
|
||||
group = models.SecurityGroup()
|
||||
rule = models.SecurityGroupRule()
|
||||
rule.update(rule_dict)
|
||||
@@ -157,12 +139,13 @@ class TestRedisSerialization(test_base.TestBase):
|
||||
self.assertEqual("", rule["destination network"])
|
||||
|
||||
@mock.patch("redis.ConnectionPool")
|
||||
@mock.patch("quark.security_groups.redis_client.redis.StrictRedis")
|
||||
@mock.patch(
|
||||
"quark.cache.security_groups_client.redis_base.redis.StrictRedis")
|
||||
def test_serialize_group_egress_rules(self, strict_redis, conn_pool):
|
||||
rule_dict = {"ethertype": 0x800, "protocol": 1,
|
||||
"direction": "egress",
|
||||
"remote_ip_prefix": "192.168.0.0/24"}
|
||||
client = redis_client.Client()
|
||||
client = sg_client.SecurityGroupsClient()
|
||||
group = models.SecurityGroup()
|
||||
rule = models.SecurityGroupRule()
|
||||
rule.update(rule_dict)
|
||||
@@ -180,105 +163,50 @@ class TestRedisSerialization(test_base.TestBase):
|
||||
self.assertEqual("", rule["source network"])
|
||||
|
||||
|
||||
class TestRedisSentinelConnection(test_base.TestBase):
|
||||
def setUp(self):
|
||||
super(TestRedisSentinelConnection, self).setUp()
|
||||
# Forces the connection pool to be recreated on every test
|
||||
redis_client.Client.connection_pool = None
|
||||
|
||||
@contextlib.contextmanager
|
||||
def _stubs(self, use_sentinels, sentinels, master_label):
|
||||
CONF.set_override("redis_use_sentinels", True, "QUARK")
|
||||
CONF.set_override("redis_sentinel_hosts", sentinels, "QUARK")
|
||||
CONF.set_override("redis_sentinel_master", master_label, "QUARK")
|
||||
yield
|
||||
CONF.set_override("redis_use_sentinels", False, "QUARK")
|
||||
CONF.set_override("redis_sentinel_hosts", '', "QUARK")
|
||||
CONF.set_override("redis_sentinel_master", '', "QUARK")
|
||||
|
||||
@mock.patch("redis.sentinel.Sentinel")
|
||||
@mock.patch("redis.sentinel.SentinelConnectionPool")
|
||||
@mock.patch("redis.sentinel.Sentinel.master_for")
|
||||
@mock.patch("quark.security_groups.redis_client.redis.StrictRedis")
|
||||
def test_sentinel_connection(self, strict_redis, master_for,
|
||||
sentinel_pool, sentinel_mock):
|
||||
host = "127.0.0.1"
|
||||
port = 6379
|
||||
sentinels = ["%s:%s" % (host, port)]
|
||||
master_label = "master"
|
||||
sentinel_mock.return_value = sentinels
|
||||
|
||||
with self._stubs(True, sentinels, master_label):
|
||||
redis_client.Client(use_master=True)
|
||||
sentinel_pool.assert_called_with(master_label, sentinels,
|
||||
check_connection=True,
|
||||
is_master=True)
|
||||
|
||||
@mock.patch("redis.sentinel.SentinelConnectionPool")
|
||||
@mock.patch("redis.sentinel.Sentinel.master_for")
|
||||
@mock.patch("quark.security_groups.redis_client.redis.StrictRedis")
|
||||
def test_sentinel_connection_bad_format_raises(self, strict_redis,
|
||||
master_for, sentinel_pool):
|
||||
sentinels = ""
|
||||
master_label = "master"
|
||||
|
||||
with self._stubs(True, sentinels, master_label):
|
||||
with self.assertRaises(TypeError):
|
||||
redis_client.Client(is_master=True)
|
||||
|
||||
|
||||
class TestRedisForAgent(test_base.TestBase):
|
||||
def setUp(self):
|
||||
super(TestRedisForAgent, self).setUp()
|
||||
|
||||
patch = mock.patch("quark.security_groups.redis_client."
|
||||
patch = mock.patch("quark.cache.security_groups_client.redis_base."
|
||||
"redis.StrictRedis")
|
||||
self.MockSentinel = patch.start()
|
||||
self.addCleanup(patch.stop)
|
||||
|
||||
@mock.patch("quark.security_groups.redis_client.redis.StrictRedis")
|
||||
@mock.patch(
|
||||
"quark.cache.security_groups_client.redis_base.redis.StrictRedis")
|
||||
def test_get_security_groups_empty(self, strict_redis):
|
||||
mock_redis = mock.MagicMock()
|
||||
mock_pipeline = mock.MagicMock()
|
||||
strict_redis.return_value = mock_redis
|
||||
mock_redis.pipeline.return_value = mock_pipeline
|
||||
|
||||
rc = redis_client.Client()
|
||||
rc = sg_client.SecurityGroupsClient()
|
||||
group_uuids = rc.get_security_groups(set())
|
||||
mock_redis.pipeline.assert_called_once_with()
|
||||
self.assertEqual(mock_pipeline.get.call_count, 0)
|
||||
mock_pipeline.execute.assert_called_once_with()
|
||||
self.assertEqual(group_uuids, {})
|
||||
|
||||
@mock.patch("quark.security_groups.redis_client.redis.StrictRedis")
|
||||
def test_get_security_groups_nonempty(self, strict_redis):
|
||||
mock_redis = mock.MagicMock()
|
||||
mock_pipeline = mock.MagicMock()
|
||||
strict_redis.return_value = mock_redis
|
||||
mock_redis.pipeline.return_value = mock_pipeline
|
||||
@mock.patch(
|
||||
"quark.cache.security_groups_client.SecurityGroupsClient.get_fields")
|
||||
def test_get_security_groups_nonempty(self, mock_get_fields):
|
||||
rc = sg_client.SecurityGroupsClient()
|
||||
|
||||
rc = redis_client.Client()
|
||||
|
||||
mock_pipeline.execute.return_value = [
|
||||
mock_get_fields.return_value = [
|
||||
None,
|
||||
'{}',
|
||||
'{"%s": null}' % redis_client.SECURITY_GROUP_VERSION_UUID_KEY,
|
||||
'{"%s": "1-2-3"}' % redis_client.SECURITY_GROUP_VERSION_UUID_KEY]
|
||||
'{"%s": null}' % sg_client.SECURITY_GROUP_VERSION_UUID_KEY,
|
||||
'{"%s": "1-2-3"}' % sg_client.SECURITY_GROUP_VERSION_UUID_KEY]
|
||||
|
||||
new_interfaces = ([VIF(1, 2, 9), VIF(3, 4, 0), VIF(5, 6, 1),
|
||||
VIF(7, 8, 2)])
|
||||
|
||||
group_uuids = rc.get_security_groups(new_interfaces)
|
||||
mock_pipeline.hget.assert_has_calls(
|
||||
[mock.call("1.000000000002",
|
||||
redis_client.SECURITY_GROUP_HASH_ATTR),
|
||||
mock.call("3.000000000004",
|
||||
redis_client.SECURITY_GROUP_HASH_ATTR),
|
||||
mock.call("5.000000000006",
|
||||
redis_client.SECURITY_GROUP_HASH_ATTR),
|
||||
mock.call("7.000000000008",
|
||||
redis_client.SECURITY_GROUP_HASH_ATTR)],
|
||||
any_order=True)
|
||||
mock_pipeline.execute.assert_called_once_with()
|
||||
|
||||
mock_get_fields.assert_called_once_with(
|
||||
["1.000000000002", "3.000000000004", "5.000000000006",
|
||||
"7.000000000008"], sg_client.SECURITY_GROUP_HASH_ATTR)
|
||||
|
||||
self.assertEqual(group_uuids,
|
||||
{VIF(1, 2, 9): None,
|
||||
VIF(3, 4, 0): None,
|
||||
@@ -64,7 +64,7 @@ class TestUnmanagedDriver(test_base.TestBase):
|
||||
self.driver.update_port(context=self.context,
|
||||
network_id="public_network", port_id=2)
|
||||
|
||||
@mock.patch("quark.security_groups.redis_client.Client")
|
||||
@mock.patch("quark.cache.security_groups_client.SecurityGroupsClient")
|
||||
def test_update_port_with_security_groups_removal(self, redis_cli):
|
||||
mock_client = mock.MagicMock()
|
||||
redis_cli.return_value = mock_client
|
||||
@@ -81,7 +81,7 @@ class TestUnmanagedDriver(test_base.TestBase):
|
||||
mock_client.delete_vif_rules.assert_called_once_with(
|
||||
device_id, mac_address)
|
||||
|
||||
@mock.patch("quark.security_groups.redis_client.Client")
|
||||
@mock.patch("quark.cache.security_groups_client.SecurityGroupsClient")
|
||||
def test_update_port_with_security_groups(self, redis_cli):
|
||||
mock_client = mock.MagicMock()
|
||||
redis_cli.return_value = mock_client
|
||||
|
||||
@@ -51,9 +51,9 @@ from neutron.common import config
|
||||
import neutron.context
|
||||
from oslo.config import cfg
|
||||
|
||||
from quark.cache import security_groups_client as sg_client
|
||||
from quark.db import api as db_api
|
||||
from quark import exceptions as q_exc
|
||||
from quark.security_groups import redis_client
|
||||
|
||||
|
||||
class QuarkRedisTool(object):
|
||||
@@ -101,7 +101,7 @@ class QuarkRedisTool(object):
|
||||
"options")
|
||||
|
||||
def _get_connection(self, use_master=False, giveup=True):
|
||||
client = redis_client.Client(use_master=use_master)
|
||||
client = sg_client.SecurityGroupsClient(use_master=use_master)
|
||||
try:
|
||||
# You have to use the connection determine it's functional
|
||||
result = client.echo("connected")
|
||||
@@ -122,7 +122,7 @@ class QuarkRedisTool(object):
|
||||
|
||||
def vif_count(self):
|
||||
client = self._get_connection()
|
||||
print(len(client.vif_keys()))
|
||||
print(len(client.vif_keys(field=sg_client.SECURITY_GROUP_HASH_ATTR)))
|
||||
|
||||
def num_groups(self):
|
||||
ctx = neutron.context.get_admin_context()
|
||||
@@ -157,7 +157,7 @@ class QuarkRedisTool(object):
|
||||
|
||||
# Pop off the ones we find in the database
|
||||
for port in ports_with_groups:
|
||||
vif_key = client.rule_key(port["device_id"], port["mac_address"])
|
||||
vif_key = client.vif_key(port["device_id"], port["mac_address"])
|
||||
vifs.pop(vif_key, None)
|
||||
|
||||
if dryrun:
|
||||
Reference in New Issue
Block a user