Merge "Support Router Advertisement Daemon (radvd) for IPv6"

This commit is contained in:
Jenkins 2014-07-21 17:52:41 +00:00 committed by Gerrit Code Review
commit 8abd632e05
8 changed files with 344 additions and 88 deletions

View File

@ -14,6 +14,7 @@ arping: CommandFilter, arping, root
# l3_agent
sysctl: CommandFilter, sysctl, root
route: CommandFilter, route, root
radvd: CommandFilter, radvd, root
# metadata proxy
metadata_proxy: CommandFilter, neutron-ns-metadata-proxy, root
@ -26,6 +27,8 @@ metadata_proxy_local_quantum: CommandFilter, /usr/local/bin/quantum-ns-metadata-
kill_metadata: KillFilter, root, /usr/bin/python, -9
kill_metadata7: KillFilter, root, /usr/bin/python2.7, -9
kill_metadata6: KillFilter, root, /usr/bin/python2.6, -9
kill_radvd_usr: KillFilter, root, /usr/sbin/radvd, -9, -HUP
kill_radvd: KillFilter, root, /sbin/radvd, -9, -HUP
# ip_lib
ip: IpFilter, ip, root

View File

@ -29,6 +29,7 @@ from neutron.agent.linux import interface
from neutron.agent.linux import ip_lib
from neutron.agent.linux import iptables_manager
from neutron.agent.linux import ovs_lib # noqa
from neutron.agent.linux import ra
from neutron.agent import rpc as agent_rpc
from neutron.common import config as common_config
from neutron.common import constants as l3_constants
@ -427,6 +428,7 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback, manager.Manager):
if self.conf.enable_metadata_proxy:
self._destroy_metadata_proxy(ns[len(NS_PREFIX):], ns)
ra.disable_ipv6_ra(ns[len(NS_PREFIX):], ns, self.root_helper)
try:
self._destroy_router_namespace(ns)
except RuntimeError:
@ -579,15 +581,31 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback, manager.Manager):
p['id'] not in existing_port_ids]
old_ports = [p for p in ri.internal_ports if
p['id'] not in current_port_ids]
new_ipv6_port = False
old_ipv6_port = False
for p in new_ports:
self._set_subnet_info(p)
self.internal_network_added(ri, p['network_id'], p['id'],
p['ip_cidr'], p['mac_address'])
ri.internal_ports.append(p)
if (not new_ipv6_port and
netaddr.IPNetwork(p['subnet']['cidr']).version == 6):
new_ipv6_port = True
for p in old_ports:
self.internal_network_removed(ri, p['id'], p['ip_cidr'])
ri.internal_ports.remove(p)
if (not old_ipv6_port and
netaddr.IPNetwork(p['subnet']['cidr']).version == 6):
old_ipv6_port = True
if new_ipv6_port or old_ipv6_port:
ra.enable_ipv6_ra(ri.router_id,
ri.ns_name,
internal_ports,
self.get_internal_device_name,
self.root_helper)
existing_devices = self._get_existing_devices(ri)
current_internal_devs = set([n for n in existing_devices

View File

@ -14,8 +14,6 @@
#
# @author: Mark McClain, DreamHost
import os
from oslo.config import cfg
from neutron.agent.linux import ip_lib
@ -38,25 +36,40 @@ class ProcessManager(object):
Note: The manager expects uuid to be in cmdline.
"""
def __init__(self, conf, uuid, root_helper='sudo', namespace=None):
def __init__(self, conf, uuid, root_helper='sudo',
namespace=None, service=None):
self.conf = conf
self.uuid = uuid
self.root_helper = root_helper
self.namespace = namespace
if service:
self.service_pid_fname = 'pid.' + service
else:
self.service_pid_fname = 'pid'
def enable(self, cmd_callback):
def enable(self, cmd_callback, reload_cfg=False):
if not self.active:
cmd = cmd_callback(self.get_pid_file_name(ensure_pids_dir=True))
ip_wrapper = ip_lib.IPWrapper(self.root_helper, self.namespace)
ip_wrapper.netns.execute(cmd)
elif reload_cfg:
self.reload_cfg()
def disable(self):
def reload_cfg(self):
self.disable('HUP')
def disable(self, sig='9'):
pid = self.pid
if self.active:
cmd = ['kill', '-9', pid]
cmd = ['kill', '-%s' % (sig), pid]
utils.execute(cmd, self.root_helper)
# In the case of shutting down, remove the pid file
if sig == '9':
utils.remove_conf_file(self.conf.external_pids,
self.uuid,
self.service_pid_fname)
elif pid:
LOG.debug(_('Process for %(uuid)s pid %(pid)d is stale, ignoring '
'command'), {'uuid': self.uuid, 'pid': pid})
@ -65,28 +78,18 @@ class ProcessManager(object):
def get_pid_file_name(self, ensure_pids_dir=False):
"""Returns the file name for a given kind of config file."""
pids_dir = os.path.abspath(os.path.normpath(self.conf.external_pids))
if ensure_pids_dir and not os.path.isdir(pids_dir):
os.makedirs(pids_dir, 0o755)
return os.path.join(pids_dir, self.uuid + '.pid')
return utils.get_conf_file_name(self.conf.external_pids,
self.uuid,
self.service_pid_fname,
ensure_pids_dir)
@property
def pid(self):
"""Last known pid for this external process spawned for this uuid."""
file_name = self.get_pid_file_name()
msg = _('Error while reading %s')
try:
with open(file_name, 'r') as f:
return int(f.read())
except IOError:
msg = _('Unable to access %s')
except ValueError:
msg = _('Unable to convert value in %s')
LOG.debug(msg, file_name)
return None
return utils.get_value_from_conf_file(self.conf.external_pids,
self.uuid,
self.service_pid_fname,
int)
@property
def active(self):

122
neutron/agent/linux/ra.py Normal file
View File

@ -0,0 +1,122 @@
# Copyright 2014 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import netaddr
from oslo.config import cfg
import six
from neutron.agent.linux import external_process
from neutron.agent.linux import utils
from neutron.common import constants
from neutron.openstack.common import log as logging
LOG = logging.getLogger(__name__)
OPTS = [
cfg.StrOpt('ra_confs',
default='$state_path/ra',
help=_('Location to store IPv6 RA config files')),
]
cfg.CONF.register_opts(OPTS)
prefix_fmt = """interface %s
{
AdvSendAdvert on;
MinRtrAdvInterval 3;
MaxRtrAdvInterval 10;
prefix %s
{
AdvOnLink on;
AdvAutonomous on;
};
};
"""
default_fmt = """interface %s
{
AdvSendAdvert on;
MinRtrAdvInterval 3;
MaxRtrAdvInterval 10;
};
"""
def _is_slaac(ra_mode):
return (ra_mode == constants.IPV6_SLAAC or
ra_mode == constants.DHCPV6_STATELESS)
def _generate_radvd_conf(router_id, router_ports, dev_name_helper):
radvd_conf = utils.get_conf_file_name(cfg.CONF.ra_confs,
router_id,
'radvd.conf',
True)
buf = six.StringIO()
for p in router_ports:
if netaddr.IPNetwork(p['subnet']['cidr']).version == 6:
interface_name = dev_name_helper(p['id'])
if _is_slaac(p['subnet']['ipv6_ra_mode']):
conf_str = prefix_fmt % (interface_name,
p['subnet']['cidr'])
else:
conf_str = default_fmt % interface_name
buf.write('%s' % conf_str)
utils.replace_file(radvd_conf, buf.getvalue())
return radvd_conf
def _spawn_radvd(router_id, radvd_conf, router_ns, root_helper):
def callback(pid_file):
radvd_cmd = ['radvd',
'-C', '%s' % radvd_conf,
'-p', '%s' % pid_file]
return radvd_cmd
radvd = external_process.ProcessManager(cfg.CONF,
router_id,
root_helper,
router_ns,
'radvd')
radvd.enable(callback, True)
LOG.debug("radvd enabled for router %s", router_id)
def enable_ipv6_ra(router_id, router_ns, router_ports,
dev_name_helper, root_helper):
for p in router_ports:
if netaddr.IPNetwork(p['subnet']['cidr']).version == 6:
break
else:
# Kill the daemon if it's running
disable_ipv6_ra(router_id, router_ns, root_helper)
return
LOG.debug("Enable IPv6 RA for router %s", router_id)
radvd_conf = _generate_radvd_conf(router_id, router_ports, dev_name_helper)
_spawn_radvd(router_id, radvd_conf, router_ns, root_helper)
def disable_ipv6_ra(router_id, router_ns, root_helper):
radvd = external_process.ProcessManager(cfg.CONF,
router_id,
root_helper,
router_ns,
'radvd')
radvd.disable()
utils.remove_conf_files(cfg.CONF.ra_confs, router_id)
LOG.debug("radvd disabled for router %s", router_id)

View File

@ -18,6 +18,7 @@
import fcntl
import os
import shlex
import shutil
import socket
import struct
import tempfile
@ -126,3 +127,51 @@ def find_child_pids(pid):
ctxt.reraise = False
return []
return [x.strip() for x in raw_pids.split('\n') if x.strip()]
def _get_conf_dir(cfg_root, uuid, ensure_conf_dir):
confs_dir = os.path.abspath(os.path.normpath(cfg_root))
conf_dir = os.path.join(confs_dir, uuid)
if ensure_conf_dir:
if not os.path.isdir(conf_dir):
os.makedirs(conf_dir, 0o755)
return conf_dir
def get_conf_file_name(cfg_root, uuid, cfg_file, ensure_conf_dir=False):
"""Returns the file name for a given kind of config file."""
conf_dir = _get_conf_dir(cfg_root, uuid, ensure_conf_dir)
return os.path.join(conf_dir, cfg_file)
def get_value_from_conf_file(cfg_root, uuid, cfg_file, converter=None):
"""A helper function to read a value from one of a config file."""
file_name = get_conf_file_name(cfg_root, uuid, cfg_file)
msg = _('Error while reading %s')
try:
with open(file_name, 'r') as f:
try:
return converter and converter(f.read()) or f.read()
except ValueError:
msg = _('Unable to convert value in %s')
except IOError:
msg = _('Unable to access %s')
LOG.debug(msg % file_name)
return None
def remove_conf_files(cfg_root, uuid):
conf_dir = _get_conf_dir(cfg_root, uuid, False)
shutil.rmtree(conf_dir, ignore_errors=True)
def remove_conf_file(cfg_root, uuid, cfg_file):
"""Remove a config file. Remove the directory if this is the last file."""
conf_file = get_conf_file_name(cfg_root, uuid, cfg_file)
if os.path.exists(conf_file):
os.unlink(conf_file)
conf_dir = _get_conf_dir(cfg_root, uuid, False)
if not os.listdir(conf_dir):
shutil.rmtree(conf_dir, ignore_errors=True)

View File

@ -1007,7 +1007,8 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
network_ids = set(p['network_id'] for p, _ in each_port_with_ip())
filters = {'network_id': [id for id in network_ids]}
fields = ['id', 'cidr', 'gateway_ip', 'network_id']
fields = ['id', 'cidr', 'gateway_ip',
'network_id', 'ipv6_ra_mode']
subnets_by_network = dict((id, []) for id in network_ids)
for subnet in self._core_plugin.get_subnets(context, filters, fields):
@ -1018,7 +1019,8 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
for subnet in subnets_by_network[port['network_id']]:
subnet_info = {'id': subnet['id'],
'cidr': subnet['cidr'],
'gateway_ip': subnet['gateway_ip']}
'gateway_ip': subnet['gateway_ip'],
'ipv6_ra_mode': subnet['ipv6_ra_mode']}
if subnet['id'] == fixed_ip['subnet_id']:
port['subnet'] = subnet_info

View File

@ -121,7 +121,7 @@ class TestBasicRouterOperations(base.BaseTestCase):
def setUp(self):
super(TestBasicRouterOperations, self).setUp()
self.conf = cfg.ConfigOpts()
self.conf = agent_config.setup_conf()
self.conf.register_opts(base_config.core_opts)
self.conf.register_opts(l3_agent.L3NATAgent.OPTS)
agent_config.register_interface_driver_opts_helper(self.conf)
@ -141,6 +141,10 @@ class TestBasicRouterOperations(base.BaseTestCase):
'neutron.agent.linux.utils.execute')
self.utils_exec = self.utils_exec_p.start()
self.utils_replace_file_p = mock.patch(
'neutron.agent.linux.utils.replace_file')
self.utils_replace_file = self.utils_replace_file_p.start()
self.external_process_p = mock.patch(
'neutron.agent.linux.external_process.ProcessManager')
self.external_process = self.external_process_p.start()
@ -441,6 +445,38 @@ class TestBasicRouterOperations(base.BaseTestCase):
else:
self.assertIn(r.rule, expected_rules)
@staticmethod
def _router_append_interface(router, count=1, ip_version=4,
ra_mode=None, addr_mode=None):
if ip_version == 4:
ip_pool = '35.4.%i.4'
cidr_pool = '35.4.%i.0/24'
gw_pool = '35.4.%i.1'
elif ip_version == 6:
ip_pool = 'fd01:%x::6'
cidr_pool = 'fd01:%x::/64'
gw_pool = 'fd01:%x::1'
else:
raise ValueError("Invalid ip_version: %s" % ip_version)
interfaces = router[l3_constants.INTERFACE_KEY]
current = sum(
[netaddr.IPNetwork(p['subnet']['cidr']).version == ip_version
for p in interfaces])
for i in range(current, current + count):
interfaces.append(
{'id': _uuid(),
'network_id': _uuid(),
'admin_state_up': True,
'fixed_ips': [{'ip_address': ip_pool % i,
'subnet_id': _uuid()}],
'mac_address': 'ca:fe:de:ad:be:ef',
'subnet': {'cidr': cidr_pool % i,
'gateway_ip': gw_pool % i,
'ipv6_ra_mode': ra_mode,
'ipv6_address_mode': addr_mode}})
def _prepare_router_data(self, ip_version=4,
enable_snat=None, num_internal_ports=1):
if ip_version == 4:
@ -451,6 +487,8 @@ class TestBasicRouterOperations(base.BaseTestCase):
ip_addr = 'fd00::4'
cidr = 'fd00::/64'
gateway_ip = 'fd00::1'
else:
raise ValueError("Invalid ip_version: %s" % ip_version)
router_id = _uuid()
ex_gw_port = {'id': _uuid(),
@ -459,22 +497,15 @@ class TestBasicRouterOperations(base.BaseTestCase):
'subnet_id': _uuid()}],
'subnet': {'cidr': cidr,
'gateway_ip': gateway_ip}}
int_ports = []
for i in range(num_internal_ports):
int_ports.append({'id': _uuid(),
'network_id': _uuid(),
'admin_state_up': True,
'fixed_ips': [{'ip_address': '35.4.%s.4' % i,
'subnet_id': _uuid()}],
'mac_address': 'ca:fe:de:ad:be:ef',
'subnet': {'cidr': '35.4.%s.0/24' % i,
'gateway_ip': '35.4.%s.1' % i}})
router = {
'id': router_id,
l3_constants.INTERFACE_KEY: int_ports,
l3_constants.INTERFACE_KEY: [],
'routes': [],
'gw_port': ex_gw_port}
self._router_append_interface(router, count=num_internal_ports,
ip_version=ip_version)
if enable_snat is not None:
router['enable_snat'] = enable_snat
return router
@ -725,15 +756,7 @@ class TestBasicRouterOperations(base.BaseTestCase):
agent.process_router(ri)
orig_nat_rules = ri.iptables_manager.ipv4['nat'].rules[:]
# Add an interface and reprocess
router[l3_constants.INTERFACE_KEY].append(
{'id': _uuid(),
'network_id': _uuid(),
'admin_state_up': True,
'fixed_ips': [{'ip_address': '35.4.1.4',
'subnet_id': _uuid()}],
'mac_address': 'ca:fe:de:ad:be:ef',
'subnet': {'cidr': '35.4.1.0/24',
'gateway_ip': '35.4.1.1'}})
self._router_append_interface(router)
# Reassign the router object to RouterInfo
ri.router = router
agent.process_router(ri)
@ -772,9 +795,9 @@ class TestBasicRouterOperations(base.BaseTestCase):
self.assertFalse(external_gateway_nat_rules.called)
self.assertEqual(orig_nat_rules, new_nat_rules)
def test_process_router_ipv6_interface_added(self):
def _process_router_ipv6_interface_added(
self, router, ra_mode=None, addr_mode=None):
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
router = self._prepare_router_data()
ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper,
self.conf.use_namespaces, router=router)
agent.external_gateway_added = mock.Mock()
@ -782,23 +805,53 @@ class TestBasicRouterOperations(base.BaseTestCase):
agent.process_router(ri)
orig_nat_rules = ri.iptables_manager.ipv4['nat'].rules[:]
# Add an IPv6 interface and reprocess
router[l3_constants.INTERFACE_KEY].append(
{'id': _uuid(),
'network_id': _uuid(),
'admin_state_up': True,
'fixed_ips': [{'ip_address': 'fd00::2',
'subnet_id': _uuid()}],
'mac_address': 'ca:fe:de:ad:be:ef',
'subnet': {'cidr': 'fd00::/64',
'gateway_ip': 'fd00::1'}})
self._router_append_interface(router, count=1, ip_version=6,
ra_mode=ra_mode, addr_mode=addr_mode)
# Reassign the router object to RouterInfo
ri.router = router
agent.process_router(ri)
# For some reason set logic does not work well with
# IpTablesRule instances
# IPv4 NAT rules should not be changed by adding an IPv6 interface
nat_rules_delta = [r for r in ri.iptables_manager.ipv4['nat'].rules
if r not in orig_nat_rules]
self.assertFalse(nat_rules_delta)
return ri
def _expected_call_lookup_ri_process(self, ri, process):
"""Expected call if a process is looked up in a router instance."""
return [mock.call(cfg.CONF,
ri.router['id'],
self.conf.root_helper,
ri.ns_name,
process)]
def _assert_ri_process_enabled(self, ri, process):
"""Verify that process was enabled for a router instance."""
expected_calls = self._expected_call_lookup_ri_process(ri, process)
expected_calls.append(mock.call().enable(mock.ANY, True))
self.assertEqual(expected_calls, self.external_process.mock_calls)
def _assert_ri_process_disabled(self, ri, process):
"""Verify that process was disabled for a router instance."""
expected_calls = self._expected_call_lookup_ri_process(ri, process)
expected_calls.append(mock.call().disable())
self.assertEqual(expected_calls, self.external_process.mock_calls)
def test_process_router_ipv6_interface_added(self):
router = self._prepare_router_data()
ri = self._process_router_ipv6_interface_added(router)
self._assert_ri_process_enabled(ri, 'radvd')
# Expect radvd configured without prefix
self.assertNotIn('prefix',
self.utils_replace_file.call_args[0][1].split())
def test_process_router_ipv6_slaac_interface_added(self):
router = self._prepare_router_data()
ri = self._process_router_ipv6_interface_added(
router, ra_mode=l3_constants.IPV6_SLAAC)
self._assert_ri_process_enabled(ri, 'radvd')
# Expect radvd configured with prefix
self.assertIn('prefix',
self.utils_replace_file.call_args[0][1].split())
def test_process_router_ipv6v4_interface_added(self):
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
@ -810,28 +863,12 @@ class TestBasicRouterOperations(base.BaseTestCase):
agent.process_router(ri)
orig_nat_rules = ri.iptables_manager.ipv4['nat'].rules[:]
# Add an IPv4 and IPv6 interface and reprocess
router[l3_constants.INTERFACE_KEY].append(
{'id': _uuid(),
'network_id': _uuid(),
'admin_state_up': True,
'fixed_ips': [{'ip_address': '35.4.1.4',
'subnet_id': _uuid()}],
'mac_address': 'ca:fe:de:ad:be:ef',
'subnet': {'cidr': '35.4.1.0/24',
'gateway_ip': '35.4.1.1'}})
router[l3_constants.INTERFACE_KEY].append(
{'id': _uuid(),
'network_id': _uuid(),
'admin_state_up': True,
'fixed_ips': [{'ip_address': 'fd00::2',
'subnet_id': _uuid()}],
'mac_address': 'ca:fe:de:ad:be:ef',
'subnet': {'cidr': 'fd00::/64',
'gateway_ip': 'fd00::1'}})
self._router_append_interface(router, count=1, ip_version=4)
self._router_append_interface(router, count=1, ip_version=6)
# Reassign the router object to RouterInfo
ri.router = router
agent.process_router(ri)
self._assert_ri_process_enabled(ri, 'radvd')
# For some reason set logic does not work well with
# IpTablesRule instances
nat_rules_delta = [r for r in ri.iptables_manager.ipv4['nat'].rules
@ -862,6 +899,25 @@ class TestBasicRouterOperations(base.BaseTestCase):
# send_arp is called both times process_router is called
self.assertEqual(self.send_arp.call_count, 2)
def test_process_router_ipv6_interface_removed(self):
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
router = self._prepare_router_data()
ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper,
self.conf.use_namespaces, router=router)
agent.external_gateway_added = mock.Mock()
ri.router = router
agent.process_router(ri)
# Add an IPv6 interface and reprocess
self._router_append_interface(router, count=1, ip_version=6)
agent.process_router(ri)
self._assert_ri_process_enabled(ri, 'radvd')
# Reset the calls so we can check for disable radvd
self.external_process.reset_mock()
# Remove the IPv6 interface and reprocess
del router[l3_constants.INTERFACE_KEY][1]
agent.process_router(ri)
self._assert_ri_process_disabled(ri, 'radvd')
def test_process_router_internal_network_added_unexpected_error(self):
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
router = self._prepare_router_data()
@ -1358,7 +1414,10 @@ class TestBasicRouterOperations(base.BaseTestCase):
ns_list = agent._list_namespaces()
agent._cleanup_namespaces(ns_list, [r['id'] for r in router_list])
self.assertEqual(pm.disable.call_count, len(stale_namespace_list))
# Expect process manager to disable two processes (metadata_proxy
# and radvd) per stale namespace.
expected_pm_disables = 2 * len(stale_namespace_list)
self.assertEqual(expected_pm_disables, pm.disable.call_count)
self.assertEqual(agent._destroy_router_namespace.call_count,
len(stale_namespace_list))
expected_args = [mock.call(ns) for ns in stale_namespace_list]

View File

@ -120,27 +120,27 @@ class TestProcessManager(base.BaseTestCase):
debug.assert_called_once_with(mock.ANY, mock.ANY)
def test_get_pid_file_name_existing(self):
with mock.patch.object(ep.os.path, 'isdir') as isdir:
with mock.patch.object(ep.utils.os.path, 'isdir') as isdir:
isdir.return_value = True
manager = ep.ProcessManager(self.conf, 'uuid')
retval = manager.get_pid_file_name(ensure_pids_dir=True)
self.assertEqual(retval, '/var/path/uuid.pid')
self.assertEqual(retval, '/var/path/uuid/pid')
def test_get_pid_file_name_not_existing(self):
with mock.patch.object(ep.os.path, 'isdir') as isdir:
with mock.patch.object(ep.os, 'makedirs') as makedirs:
with mock.patch.object(ep.utils.os.path, 'isdir') as isdir:
with mock.patch.object(ep.utils.os, 'makedirs') as makedirs:
isdir.return_value = False
manager = ep.ProcessManager(self.conf, 'uuid')
retval = manager.get_pid_file_name(ensure_pids_dir=True)
self.assertEqual(retval, '/var/path/uuid.pid')
makedirs.assert_called_once_with('/var/path', 0o755)
self.assertEqual(retval, '/var/path/uuid/pid')
makedirs.assert_called_once_with('/var/path/uuid', 0o755)
def test_get_pid_file_name_default(self):
with mock.patch.object(ep.os.path, 'isdir') as isdir:
with mock.patch.object(ep.utils.os.path, 'isdir') as isdir:
isdir.return_value = True
manager = ep.ProcessManager(self.conf, 'uuid')
retval = manager.get_pid_file_name(ensure_pids_dir=False)
self.assertEqual(retval, '/var/path/uuid.pid')
self.assertEqual(retval, '/var/path/uuid/pid')
self.assertFalse(isdir.called)
def test_pid(self):