Refactor dhcp code, flake8 and pep8 fixes

This commit is contained in:
Dmitry Shulyak 2013-09-30 13:50:15 +03:00
parent 146d8412e2
commit c8de4de836
7 changed files with 89 additions and 140 deletions

3
.gitignore vendored
View File

@ -23,5 +23,4 @@ lock
.idea
.DS_Store
Nailgun.egg-info
Shotgun.egg-info
*.egg-info

View File

@ -1 +1,13 @@
__author__ = 'dshulyak'
# Copyright 2013 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@ -1,4 +1,3 @@
#!/usr/bin/python
# Copyright 2013 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -13,13 +12,11 @@
# License for the specific language governing permissions and limitations
# under the License.
from scapy.all import *
import itertools
import multiprocessing
import functools
import subprocess
from scapy import all as scapy
from dhcp_checker import utils
from dhcp_checker import vlans_utils
@utils.multiproc_map
@ -31,24 +28,25 @@ def check_dhcp_on_eth(iface, timeout):
>>> check_dhcp_on_eth('eth1')
"""
conf.iface = iface
scapy.conf.iface = iface
conf.checkIPaddr = False
scapy.conf.checkIPaddr = False
dhcp_options = [("message-type", "discover"),
("param_req_list", utils.format_options([1, 2, 3, 4, 5, 6,
11, 12, 13, 15, 16, 17, 18, 22, 23,
28, 40, 41, 42, 43, 50, 51, 54, 58, 59, 60, 66, 67])),
("param_req_list", utils.format_options(
[1, 2, 3, 4, 5, 6,
11, 12, 13, 15, 16, 17, 18, 22, 23,
28, 40, 41, 42, 43, 50, 51, 54, 58, 59, 60, 66, 67])),
"end"]
fam, hw = get_if_raw_hwaddr(iface)
fam, hw = scapy.get_if_raw_hwaddr(iface)
dhcp_discover = (
Ether(src=hw, dst="ff:ff:ff:ff:ff:ff") /
IP(src="0.0.0.0", dst="255.255.255.255") /
UDP(sport=68, dport=67) /
BOOTP(chaddr=hw) /
DHCP(options=dhcp_options))
ans, unans = srp(dhcp_discover, multi=True,
nofilter=1, timeout=timeout, verbose=0)
scapy.Ether(src=hw, dst="ff:ff:ff:ff:ff:ff") /
scapy.IP(src="0.0.0.0", dst="255.255.255.255") /
scapy.UDP(sport=68, dport=67) /
scapy.BOOTP(chaddr=hw) /
scapy.DHCP(options=dhcp_options))
ans, unans = scapy.srp(dhcp_discover, multi=True,
nofilter=1, timeout=timeout, verbose=0)
return ans
@ -64,8 +62,8 @@ def check_dhcp(ifaces, timeout=5, repeat=2):
if not ifaces_filtered:
raise EnvironmentError("No valid interfaces provided.")
pool = multiprocessing.Pool(len(ifaces_filtered)*repeat)
return itertools.chain(*pool.map(check_dhcp_on_eth,
((iface, timeout) for iface in ifaces_filtered*repeat)))
return itertools.chain(*pool.map(check_dhcp_on_eth, (
(iface, timeout) for iface in ifaces_filtered*repeat)))
def check_dhcp_with_vlans(config, timeout=5, repeat=2):
@ -74,7 +72,7 @@ def check_dhcp_with_vlans(config, timeout=5, repeat=2):
@ifaces - string : eth0, eth1
@vlans - iterable (100, 101, 102)
"""
with vlans_utils.VlansActor(config) as vifaces:
with utils.VlansContext(config) as vifaces:
return check_dhcp(list(vifaces), timeout=timeout, repeat=repeat)
@ -85,22 +83,23 @@ def check_dhcp_request(iface, server, range_start, range_end, timeout=5):
>>> check_dhcp_request('eth1','10.10.0.5','10.10.0.10','10.10.0.15')
"""
conf.iface = iface
scapy.conf.iface = iface
conf.checkIPaddr = False
scapy.conf.checkIPaddr = False
fam, hw = get_if_raw_hwaddr(iface)
fam, hw = scapy.get_if_raw_hwaddr(iface)
ip_address = next(utils.pick_ip(range_start, range_end))
# note lxc dhcp server does not respond to unicast
dhcp_request = (Ether(src=hw, dst="ff:ff:ff:ff:ff:ff") /
IP(src="0.0.0.0", dst="255.255.255.255") /
UDP(sport=68, dport=67) /
BOOTP(chaddr=hw) /
DHCP(options=[("message-type", "request"),
("server_id", server),
("requested_addr", ip_address), "end"]))
ans, unans = srp(dhcp_request, nofilter=1, multi=True,
timeout=timeout, verbose=0)
dhcp_request = (scapy.Ether(src=hw, dst="ff:ff:ff:ff:ff:ff") /
scapy.IP(src="0.0.0.0", dst="255.255.255.255") /
scapy.UDP(sport=68, dport=67) /
scapy.BOOTP(chaddr=hw) /
scapy.DHCP(options=[("message-type", "request"),
("server_id", server),
("requested_addr", ip_address),
"end"]))
ans, unans = scapy.srp(dhcp_request, nofilter=1, multi=True,
timeout=timeout, verbose=0)
return ans

View File

@ -12,11 +12,11 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
from cliff import lister
from cliff import command
from dhcp_checker import api
from itertools import chain
import json
class BaseCommand(command.Command):
@ -25,9 +25,9 @@ class BaseCommand(command.Command):
def get_parser(self, prog_name):
parser = super(BaseCommand, self).get_parser(prog_name)
parser.add_argument('--timeout', default=5, type=int,
help="Provide timeout for each network request")
help="Provide timeout for each network request")
parser.add_argument('--repeat', default=2, type=int,
help="Provide number of repeats for request")
help="Provide number of repeats for request")
return parser
@ -44,8 +44,8 @@ class ListDhcpServers(lister.Lister, BaseCommand):
def take_action(self, parsed_args):
res = api.check_dhcp(parsed_args.ifaces,
timeout=parsed_args.timeout,
repeat=parsed_args.repeat)
timeout=parsed_args.timeout,
repeat=parsed_args.repeat)
first = res.next()
columns = first.keys()
return columns, [first.values()] + [item.values() for item in res]
@ -86,13 +86,13 @@ class DhcpWithVlansCheck(lister.Lister, BaseCommand):
def get_parser(self, prog_name):
parser = super(DhcpWithVlansCheck, self).get_parser(prog_name)
parser.add_argument('config',
help='Ethernet interface name')
help='Ethernet interface name')
return parser
def take_action(self, parsed_args):
res = api.check_dhcp_with_vlans(json.loads(parsed_args.config),
timeout=parsed_args.timeout,
repeat=parsed_args.repeat)
timeout=parsed_args.timeout,
repeat=parsed_args.repeat)
first = res.next()
columns = first.keys()
return columns, [first.values()] + [item.values() for item in res]

View File

@ -11,10 +11,12 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from scapy.all import *
import subprocess
import functools
import re
import sys
from scapy import all as scapy
def command_util(*command):
@ -39,7 +41,7 @@ def check_network_up(iface):
def check_iface_exist(iface):
"""Check provided interface exists
"""
return not command_util("ip","link", "show", iface).stderr.read()
return not command_util("ip", "link", "show", iface).stderr.read()
def filtered_ifaces(ifaces):
@ -48,7 +50,8 @@ def filtered_ifaces(ifaces):
sys.stderr.write('Iface {0} does not exist.'.format(iface))
else:
if not check_network_up(iface):
sys.stderr.write('Network for iface {0} is down.'.format(iface))
sys.stderr.write('Network for iface {0} is down.'.format(
iface))
else:
yield iface
@ -104,18 +107,18 @@ def single_format(func):
iface = args[0]
ans = func(*args, **kwargs)
columns = ('iface', 'mac', 'server_ip', 'server_id', 'gateway',
'dport', 'message', 'yiaddr')
'dport', 'message', 'yiaddr')
data = []
#scapy stores all sequence of requests
#so ans[0][1] would be response to first request
for response in ans:
dhcp_options = dict(_dhcp_options(response[1][DHCP].options))
dhcp_options = dict(_dhcp_options(response[1][scapy.DHCP].options))
results = (
iface, response[1][Ether].src, response[1][IP].src,
dhcp_options['server_id'], response[1][BOOTP].giaddr,
response[1][UDP].sport,
DHCPTypes[dhcp_options['message-type']],
response[1][BOOTP].yiaddr)
iface, response[1][scapy.Ether].src, response[1][scapy.IP].src,
dhcp_options['server_id'], response[1][scapy.BOOTP].giaddr,
response[1][scapy.UDP].sport,
scapy.DHCPTypes[dhcp_options['message-type']],
response[1][scapy.BOOTP].yiaddr)
data.append(dict(zip(columns, results)))
return data
return formatter
@ -138,3 +141,24 @@ def filter_duplicated_results(func):
resp = func(*args, **kwargs)
return (dict(t) for t in set([tuple(d.items()) for d in resp]))
return wrapper
class VlansContext(object):
"""Contains all logic to manage vlans
"""
def __init__(self, config):
"""
@config - list or tuple of (iface, vlan) pairs
"""
self.config = config
def __enter__(self):
for iface, vlans in self.config.iteritems():
yield str(iface)
for vlan in vlans:
if vlan > 0:
yield '{0}.{1}'.format(iface, vlan)
def __exit__(self, type, value, trace):
pass

View File

@ -1,34 +0,0 @@
# Copyright 2013 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
class VlansActor(object):
"""Contains all logic to manage vlans
"""
def __init__(self, config):
"""
@config - list or tuple of (iface, vlan) pairs
"""
self.config = config
def __enter__(self):
for iface, vlans in self.config.iteritems():
yield str(iface)
for vlan in vlans:
if vlan > 0:
yield '{0}.{1}'.format(iface, vlan)
def __exit__(self, type, value, trace):
pass

View File

@ -13,13 +13,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import subprocess
import json
import unittest
import time
from dhcp_checker import utils
from dhcp_checker import vlans_utils
from dhcp_checker import api
@ -39,49 +34,3 @@ class TestDhcpServers(unittest.TestCase):
response = api.check_dhcp_on_eth('eth2', 5)
self.assertEqual(len(response), 1)
self.assertEqual(response[0]['server_ip'], '10.10.0.10')
class VlanCreationTestCase(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.vlan = vlans_utils.Vlan('eth0', '100')
def test_vlan_creation(self):
self.vlan.up()
time.sleep(5)
self.assertTrue(self.vlan.state)
def test_vlan_deletion(self):
self.assertTrue(self.vlan.state)
self.vlan.down()
time.sleep(5)
class VlanCreationWithExistingTestCase(unittest.TestCase):
def test_check_vlan_down_status(self):
self.vlan_down = vlans_utils.Vlan('eth0', '110')
self.vlan_down.create()
time.sleep(5)
self.assertEqual(self.vlan_down.state, 'DOWN')
self.vlan_down.down()
time.sleep(5)
def test_repeat_created_vlan(self):
self.vlan_up = vlans_utils.Vlan('eth0', '112')
self.vlan_up.up()
time.sleep(5)
self.assertEqual(self.vlan_up.state, 'UP')
self.vlan_up.down()
time.sleep(5)
class WithVlanDecoratorTestCase(unittest.TestCase):
def test_with_vlan_enter(self):
with vlans_utils.VlansContext('eth0', ('101','102','103'), delete=True) as vlan_list:
time.sleep(5)
for v in vlan_list:
self.assertEqual('UP', v.state)