Fix hacking 0.10 in network-checker

Change-Id: I1ef6a2fb34092d498c4f6ea90fbe4ed27605b467
Partial-Bug: #1410810
This commit is contained in:
Maciej Kwiek 2015-10-08 17:30:31 +02:00
parent 571fa2a17c
commit 390cd8bd20
12 changed files with 54 additions and 53 deletions

View File

@ -53,6 +53,7 @@ def _get_dhcp_discover_message(iface):
@utils.single_format
def check_dhcp_on_eth(iface, timeout):
"""Check if there is roque dhcp server in network on given iface
@iface - name of the ethernet interface
@timeout - scapy timeout for waiting on response
>>> check_dhcp_on_eth('eth1')
@ -68,6 +69,7 @@ def check_dhcp_on_eth(iface, timeout):
def check_dhcp(ifaces, timeout=5, repeat=2):
"""Given list of ifaces. Process them in separate processes
@ifaces - lsit of ifaces
@timeout - timeout for scapy to wait for response
@repeat - number of packets sended
@ -102,6 +104,7 @@ def make_listeners(ifaces):
@utils.filter_duplicated_results
def check_dhcp_with_vlans(config, timeout=5, repeat=2):
"""Provide config of {iface: [vlans..]} pairs
@config - {'eth0': (100, 101), 'eth1': (100, 102)}
"""
# vifaces - list of pairs ('eth0', ['eth0.100', 'eth0.101'])
@ -122,6 +125,7 @@ def check_dhcp_with_vlans(config, timeout=5, repeat=2):
@utils.single_format
def check_dhcp_request(iface, server, range_start, range_end, timeout=5):
"""Provide interface, server endpoint and pool of ip adresses
Should be used after offer received
>>> check_dhcp_request('eth1','10.10.0.5','10.10.0.10','10.10.0.15')
"""

View File

@ -25,8 +25,7 @@ LOG = logging.getLogger(__name__)
class BaseCommand(command.Command):
"""Base command for all app
"""
"""Base command for all app"""
def get_parser(self, prog_name):
parser = super(BaseCommand, self).get_parser(prog_name)
parser.add_argument('--timeout', default=5, type=int,
@ -37,8 +36,7 @@ class BaseCommand(command.Command):
class ListDhcpServers(lister.Lister, BaseCommand):
"""Show list of dhcp servers on ethernet interfaces.
"""
"""Show list of dhcp servers on ethernet interfaces"""
def get_parser(self, prog_name):
parser = super(ListDhcpServers, self).get_parser(prog_name)
@ -53,7 +51,7 @@ class ListDhcpServers(lister.Lister, BaseCommand):
parsed_args.ifaces,
timeout=parsed_args.timeout,
repeat=parsed_args.repeat))
#NOTE(dshulyak) unfortunately cliff doesnt allow to configure
# NOTE(dshulyak) unfortunately cliff doesnt allow to configure
# PrettyTable output, see link:
# https://github.com/dhellmann/cliff/blob/master/
# cliff/formatters/table.py#L34
@ -67,8 +65,7 @@ class ListDhcpServers(lister.Lister, BaseCommand):
class ListDhcpAssignment(lister.Lister, BaseCommand):
"""Make dhcp request to servers and receive acknowledgement messages
"""
"""Make dhcp request to servers and receive acknowledgement messages"""
def get_parser(self, prog_name):
parser = super(ListDhcpAssignment, self).get_parser(prog_name)
@ -95,6 +92,7 @@ class ListDhcpAssignment(lister.Lister, BaseCommand):
class DhcpWithVlansCheck(lister.Lister, BaseCommand):
"""Provide iface with list of vlans to check
If no vlans created - they will be. After creation they won't be deleted.
"""

View File

@ -25,8 +25,7 @@ from dhcp_checker import utils
class TestDhcpServers(unittest.TestCase):
def test_dhcp_server_on_eth1(self):
"""Test verifies dhcp server on eth1 iface
"""
"""Test verifies dhcp server on eth1 iface"""
response = api.check_dhcp_on_eth('eth1', 2)
self.assertEqual(len(response), 1)
# we need to guarantee that received answer has server_ip
@ -34,8 +33,7 @@ class TestDhcpServers(unittest.TestCase):
self.assertTrue(response[0]['server_ip'])
def test_dhcp_server_on_eth2(self):
"""Test verifies dhcp server on eth2 iface
"""
"""Test verifies dhcp server on eth2 iface"""
response = api.check_dhcp_on_eth('eth2', 2)
self.assertEqual(len(response), 1)
self.assertTrue(response[0]['server_ip'])
@ -48,14 +46,12 @@ class TestDhcpUtils(unittest.TestCase):
utils.command_util('ifconfig', self.iface_down, 'down')
def test_check_network_up(self):
"""Verify that true would be returned on test network up
"""
"""Verify that true would be returned on test network up"""
result = utils.check_network_up('eth0')
self.assertTrue(result)
def test_check_network_down(self):
"""Verify that false would be returned on test network down
"""
"""Verify that false would be returned on test network down"""
self.assertFalse(utils.check_network_up(self.iface_down))
def tearDown(self):
@ -70,9 +66,7 @@ class TestDhcpWithNetworkDown(unittest.TestCase):
utils.command_util('ifconfig', self.iface_down, 'down')
def test_dhcp_server_on_eth2_down(self):
"""Test verifies that iface would be ifuped in case it's down
and rolledback after
"""
"""iface should be ifuped in case it's down and rolledback after"""
manager = utils.IfaceState(self.iface_down)
with manager as iface:
response = api.check_dhcp_on_eth(iface, 2)
@ -84,8 +78,7 @@ class TestDhcpWithNetworkDown(unittest.TestCase):
self.assertEqual(manager.post_iface_state, 'DOWN')
def test_dhcp_server_on_eth0_up(self):
"""Test verifies that if iface is up, it won't be touched
"""
"""Test verifies that if iface is up, it won't be touched"""
manager = utils.IfaceState(self.iface_up)
with manager as iface:
response = api.check_dhcp_on_eth(iface, 2)

View File

@ -95,7 +95,9 @@ class TestDhcpFormat(unittest.TestCase):
self.dhcp_response = self.scapy_data[1:]
def test_single_format_decorator(self):
"""Test verifies that single_format decorator contains logic to modify
"""Test modifying scapy response objects
Test verifies that single_format decorator contains logic to modify
scapy response object into dict with predefined fields
"""
@ -113,7 +115,9 @@ class TestDhcpFormat(unittest.TestCase):
class TestMultiprocMap(unittest.TestCase):
"""Test verifies that working with function decorated by multiproc_map
"""Check if multiproc_map decorator works the same for different arg types
Test verifies that working with function decorated by multiproc_map
will work indifirently either args passed as tuple, or *args
"""

View File

@ -24,20 +24,19 @@ DHCP_OFFER_COLUMNS = ('iface', 'mac', 'server_ip', 'server_id', 'gateway',
def command_util(*command):
"""object with stderr and stdout
"""
"""object with stderr and stdout"""
return subprocess.Popen(command, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
def _check_vconfig():
"""Check vconfig installed or not
"""
"""Check vconfig installed or not"""
return not command_util('which', 'vconfig').stderr.read()
def _iface_state(iface):
"""For a given iface return it's state
returns UP, DOWN, UNKNOWN
"""
state = command_util('ip', 'link', 'show', iface).stdout.read()
@ -56,8 +55,7 @@ def check_network_up(iface):
def check_iface_exist(iface):
"""Check provided interface exists
"""
"""Check provided interface exists"""
return not command_util("ip", "link", "show", iface).stderr.read()
@ -75,6 +73,7 @@ def filtered_ifaces(ifaces):
def pick_ip(range_start, range_end):
"""Given start_range, end_range generate list of ips
>>> next(pick_ip('192.168.1.10','192.168.1.13'))
'192.168.1.10'
"""
@ -95,6 +94,7 @@ def pick_ip(range_start, range_end):
def get_item_properties(item, columns):
"""Get specified in columns properties, with preserved order.
Required for correct cli table generation
:param item: dict
@ -108,6 +108,7 @@ def get_item_properties(item, columns):
def format_options(options):
"""Util for serializing dhcp options
@options = [1,2,3]
>>> format_options([1, 2, 3])
'\x01\x02\x03'
@ -117,6 +118,7 @@ def format_options(options):
def _dhcp_options(dhcp_options):
"""Dhcp options returned by scapy is not in usable format
[('message-type', 2), ('server_id', '192.168.0.5'),
('name_server', '192.168.0.1', '192.168.0.2'), 'end']
"""
@ -141,14 +143,13 @@ def format_answer(ans, iface):
def single_format(func):
"""Manage format of dhcp response
"""
"""Manage format of dhcp response"""
@functools.wraps(func)
def formatter(*args, **kwargs):
iface = args[0]
ans = func(*args, **kwargs)
#scapy stores all sequence of requests
#so ans[0][1] would be response to first request
# scapy stores all sequence of requests
# so ans[0][1] would be response to first request
return [format_answer(response[1], iface) for response in ans]
return formatter
@ -173,11 +174,11 @@ def filter_duplicated_results(func):
class VlansContext(object):
"""Contains all logic to manage vlans
"""
"""Contains all logic to manage vlans"""
def __init__(self, config):
"""Initialize VlansContext
@config - list or tuple of (iface, vlan) pairs
"""
self.config = config
@ -195,8 +196,7 @@ class VlansContext(object):
class IfaceState(object):
"""Context manager to control state of iface when dhcp checker is running
"""
"""Context manager to control state of iface while dhcp checker runs"""
def __init__(self, iface, rollback=True, retry=3):
self.rollback = rollback
@ -226,9 +226,10 @@ class IfaceState(object):
def create_mac_filter(iface):
'''tcpdump can not cactch all 6 octets so it is splitted.
"""tcpdump can not catch all 6 octets so it is splitted
See http://blog.jasonantman.com/2010/04/dhcp-debugging-and-handy-tcpdump-filters # noqa
'''
"""
mac = scapy.get_if_hwaddr(iface).split(':')
filter1 = '(udp[36:2] = 0x{0})'.format(''.join(mac[:2]))
filter2 = '(udp[38:4] = 0x{0})'.format(''.join(mac[2:]))

View File

@ -39,7 +39,7 @@ class Api(object):
invoke_kwds=self.verification_config)
self.driver = self.manager.driver
rpc_server = xmlrpc.get_server(self.server_config)
#TODO(dshulyak) verification api should know what methods to serve
# TODO(dshulyak) verification api should know what methods to serve
rpc_server.register_function(self.driver.listen, 'listen')
rpc_server.register_function(self.driver.send, 'send')
rpc_server.register_function(self.driver.get_info, 'get_info')

View File

@ -45,7 +45,7 @@ def parse_args():
parser.add_argument(
'actions', nargs='+',
help="List of actions to perform.")
#TODO(dshulyak) Add posibility to provide arguments like regular shell args
# TODO(dshulyak) Add posibility to provide args like regular shell args
parser.add_argument(
'-c', '--config', default='{}',
help="User defined configuration in json format.")

View File

@ -29,7 +29,7 @@ def run_server(server, config):
keep_fds=[0, 1, 2, server.fileno()])
try:
daemon.start()
#this is required to do some stuff after server is daemonized
# this is required to do some stuff after server is daemonized
except SystemExit as e:
if e.code is 0:
return True

View File

@ -171,8 +171,7 @@ class Actor(object):
return (state == 'UP')
def _iface_up(self, iface, vid=None):
"""Brings interface with vid up
"""
"""Brings interface with vid up"""
if vid and not self._try_viface_create(iface, vid):
# if viface does not exist we raise exception
raise ActorException(
@ -191,8 +190,7 @@ class Actor(object):
"up"])
def _ensure_iface_up(self, iface, vid=None):
"""Ensures interface is with vid up.
"""
"""Ensures interface is with vid up."""
if not self._try_iface_up(iface, vid):
# if iface is not up we try to bring it up
self._iface_up(iface, vid)
@ -224,6 +222,7 @@ class Actor(object):
def _try_viface_create(self, iface, vid):
"""Tries to find vlan interface on iface with VLAN_ID=vid and return it
:returns: name of vlan interface if it exists or None
"""
self.logger.debug("Checking if vlan %s on interface %s exists",
@ -236,6 +235,7 @@ class Actor(object):
def _viface_create(self, iface, vid):
"""Creates VLAN interface with VLAN_ID=vid on interface iface
:returns: None
"""
self.logger.debug("Creating vlan %s on interface %s", str(vid), iface)
@ -248,7 +248,9 @@ class Actor(object):
"id", str(vid)])
def _ensure_viface_create(self, iface, vid):
"""Ensures that vlan interface exists. If it does not already
"""Ensures that vlan interface exists.
If it does not already
exist, then we need it to be created. It also marks newly created
vlan interface to remove it after probing procedure.
"""

View File

@ -32,6 +32,7 @@ def timeout_handler(timeout, signum, frame):
@contextmanager
def signal_timeout(timeout, raise_exc=True):
"""Timeout handling using signals
:param timeout: timeout in seconds, integer
:param raise_exc: bool to control suppressing of exception
"""

View File

@ -14,7 +14,7 @@ commands = py.test {toxinidir}/url_access_checker/tests
downloadcache = ~/cache/pip
[testenv:pep8]
deps = hacking==0.7
deps = hacking==0.10
usedevelop = False
commands =
flake8 {posargs:.}

View File

@ -25,8 +25,7 @@ logger = getLogger(__name__)
def get_default_gateway():
"""Return ipaddress, interface pair for default gateway
"""
"""Return ipaddress, interface pair for default gateway"""
gws = netifaces.gateways()
if 'default' in gws:
return gws['default'][netifaces.AF_INET]
@ -34,8 +33,7 @@ def get_default_gateway():
def check_ifaddress_present(iface, addr):
"""Check if required ipaddress already assigned to the iface
"""
"""Check if required ipaddress already assigned to the iface"""
for ifaddress in netifaces.ifaddresses(iface).get(netifaces.AF_INET, []):
if ifaddress['addr'] in addr:
return True
@ -173,8 +171,8 @@ class Route(object):
elif ((self.default_gateway, self.df_iface)
!= (self.gateway, self.iface)):
execute(['ip', 'ro', 'change',
'default', 'via', self.default_gateway,
'dev', self.df_iface])
'default', 'via', self.default_gateway,
'dev', self.df_iface])
@contextmanager