Refactor dhcp checker to use pcap

Scapy packet filtering is slow and blocking.
Pcap filtering can be used in same thread,
this allows us to use higher timeout for dhcp offer waiting

Setup proper logger for dhcp_checker app.
ERROR level logs will be redirected to stderr
DEBUG level to /var/log/dhcp_checker.log

Change-Id: I7d9a0f1b8e3082637bc2474302fc0f7c17e0adb8
Closes-Bug: #1247284
This commit is contained in:
Dima 2013-11-06 11:46:12 +02:00 committed by Gerrit Code Review
parent a3fedc45a4
commit 9bd1db420e
6 changed files with 119 additions and 67 deletions

View File

@ -15,10 +15,8 @@ Vagrant.configure(VAGRANTFILE_API_VERSION) do |config|
end
config.vm.define :develop do |config|
config.vm.provision :shell, :path => "test_env/epel_scapy.sh"
config.vm.network :private_network, ip: "192.168.0.2"
config.vm.network :private_network, ip: "10.10.0.5"
config.vm.provision :shell, :inline => "cd /vagrant && python setup.py develop"
end
config.vm.define :dhcp2 do |config|

View File

@ -14,29 +14,23 @@
import itertools
import logging
import multiprocessing
import time
logging.getLogger('scapy.runtime').setLevel(logging.ERROR)
from scapy import config as scapy_config
scapy_config.use_pcap = True
logging.getLogger('scapy.runtime').setLevel(logging.CRITICAL)
from dhcp_checker import utils
import pcap
from scapy import all as scapy
CONCURRENCY_LIMIT = 10
LOG = logging.getLogger(__name__)
@utils.multiproc_map
@utils.single_format
def check_dhcp_on_eth(iface, timeout):
"""Check if there is roque dhcp server in network on given iface
@iface - name of the ethernet interface
@timeout - scapy timeout for waiting on response
>>> check_dhcp_on_eth('eth1')
"""
def _get_dhcp_discover_message(iface):
scapy.conf.iface = iface
scapy.conf.checkIPaddr = False
dhcp_options = [("message-type", "discover"),
("param_req_list", utils.format_options(
[1, 2, 3, 4, 5, 6,
@ -45,19 +39,33 @@ def check_dhcp_on_eth(iface, timeout):
"end"]
fam, hw = scapy.get_if_raw_hwaddr(iface)
dhcp_discover = (
scapy.Ether(src=hw, dst="ff:ff:ff:ff:ff:ff") /
scapy.IP(src="0.0.0.0", dst="255.255.255.255") /
scapy.UDP(sport=68, dport=67) /
scapy.BOOTP(chaddr=hw) /
scapy.DHCP(options=dhcp_options))
return dhcp_discover
@utils.single_format
def check_dhcp_on_eth(iface, timeout):
"""Check if there is roque dhcp server in network on given iface
@iface - name of the ethernet interface
@timeout - scapy timeout for waiting on response
>>> check_dhcp_on_eth('eth1')
"""
scapy.conf.iface = iface
scapy.conf.checkIPaddr = False
dhcp_discover = _get_dhcp_discover_message(iface)
ans, unans = scapy.srp(dhcp_discover, multi=True,
nofilter=1, timeout=timeout, verbose=0)
return ans
@utils.filter_duplicated_results
def check_dhcp(ifaces, timeout=5, repeat=2):
"""Given list of ifaces. Process them in separate processes
@ifaces - lsit of ifaces
@ -65,25 +73,48 @@ def check_dhcp(ifaces, timeout=5, repeat=2):
@repeat - number of packets sended
>>> check_dhcp(['eth1', 'eth2'])
"""
ifaces_filtered = list(utils.filtered_ifaces(ifaces))
if not ifaces_filtered:
raise EnvironmentError("No valid interfaces provided.")
concurrency_limit = (CONCURRENCY_LIMIT
if len(ifaces_filtered) > CONCURRENCY_LIMIT
else len(ifaces_filtered))
pool = multiprocessing.Pool(concurrency_limit)
return itertools.chain(*pool.map(check_dhcp_on_eth, (
(iface, timeout) for iface in ifaces_filtered * repeat)))
config = {}
for iface in ifaces:
config[iface] = ()
return check_dhcp_with_vlans(config, timeout=timeout, repeat=repeat)
def send_dhcp_discover(iface):
dhcp_discover = _get_dhcp_discover_message(iface)
scapy.sendp(dhcp_discover, iface=iface, verbose=0)
def make_listeners(ifaces):
listeners = []
for iface in ifaces:
try:
listener = pcap.pcap(iface)
listener.setfilter('dst port 68')
listeners.append(listener)
except Exception:
LOG.warning(
'Spawning listener for {iface} failed.'.format(iface=iface))
return listeners
@utils.filter_duplicated_results
def check_dhcp_with_vlans(config, timeout=5, repeat=2):
"""Provide config of {iface: [vlans..]} pairs
@config - {'eth0': (100, 101), 'eth1': (100, 102)}
@ifaces - string : eth0, eth1
@vlans - iterable (100, 101, 102)
"""
# vifaces - list of pairs ('eth0', ['eth0.100', 'eth0.101'])
with utils.VlansContext(config) as vifaces:
return check_dhcp(list(vifaces), timeout=timeout, repeat=repeat)
ifaces, vlans = zip(*vifaces)
listeners = make_listeners(ifaces)
for i in utils.filtered_ifaces(itertools.chain(ifaces, *vlans)):
send_dhcp_discover(i)
time.sleep(timeout)
for l in listeners:
for pkt in l.readpkts():
yield utils.format_answer(scapy.Ether(pkt[1]), l.name)
@utils.single_format

View File

@ -11,12 +11,13 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
#explicitly set editor for cmd2, will throw a lot of which /usr/bin/.. thrash
#this is known bug fixed in cmd2==0.6.6
os.environ['EDITOR'] = '/usr/bin/nano'
import logging
from logging import handlers
import os
import sys
# fixed in cmd2 >=0.6.6
os.environ['EDITOR'] = '/usr/bin/nano'
from cliff.app import App
from cliff.commandmanager import CommandManager
@ -34,8 +35,26 @@ class DhcpApp(App):
def configure_logging(self):
super(DhcpApp, self).configure_logging()
logger = logging.getLogger(None)
logger.propagate = False
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
formatter = logging.Formatter(
'%(asctime)s %(levelname)s (%(module)s) %(message)s',
"%Y-%m-%d %H:%M:%S")
stream_handler = logging.StreamHandler()
stream_handler.setLevel(logging.ERROR)
stream_handler.setFormatter(formatter)
file_handler = handlers.TimedRotatingFileHandler(
'/var/log/dhcp_checker.log')
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(formatter)
logger.addHandler(stream_handler)
logger.addHandler(file_handler)
# set scapy logger level only to ERROR
# due to a lot of spam
runtime_logger = logging.getLogger('scapy.runtime')
runtime_logger.setLevel(logging.ERROR)

View File

@ -11,13 +11,18 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import logging
from cliff import command
from cliff import lister
from dhcp_checker import api
LOG = logging.getLogger(__name__)
class BaseCommand(command.Command):
"""Base command for all app
"""
@ -42,6 +47,7 @@ class ListDhcpServers(lister.Lister, BaseCommand):
return parser
def take_action(self, parsed_args):
LOG.info('Starting dhcp discover for {0}'.format(parsed_args.ifaces))
res = api.check_dhcp(parsed_args.ifaces,
timeout=parsed_args.timeout,
repeat=parsed_args.repeat)

View File

@ -15,7 +15,6 @@
import os
import unittest
import mock
from mock import patch
from scapy import all as scapy
@ -68,24 +67,21 @@ class TestDhcpApi(unittest.TestCase):
response = api.check_dhcp_on_eth('eth1', timeout=5)
self.assertEqual([], response)
@patch('dhcp_checker.api.utils')
@patch('dhcp_checker.api.check_dhcp_on_eth')
@patch('dhcp_checker.api.send_dhcp_discover')
@patch('dhcp_checker.api.make_listeners')
def test_check_dhcp_with_multiple_ifaces(
self, dhcp_on_eth_mock, utils_mock):
dhcp_on_eth_mock.return_value = [expected_response]
# issue with pickling mock
dhcp_on_eth_mock.__class__ = mock.MagicMock
utils_mock.filtered_ifaces.side_effect = lambda ifaces: ifaces
response = api.check_dhcp(['eth1', 'eth2'])
self.assertEqual(list(response), [expected_response])
self, make_listeners, send_discover):
api.check_dhcp(['eth1', 'eth2'])
make_listeners.assert_called_once_with(('eth2', 'eth1'))
self.assertEqual(send_discover.call_count, 2)
@patch('dhcp_checker.api.check_dhcp')
def test_check_dhcp_with_vlans(self, check_dhcp):
@patch('dhcp_checker.api.send_dhcp_discover')
@patch('dhcp_checker.api.make_listeners')
def test_check_dhcp_with_vlans(self, make_listeners, send_discover):
config_sample = {
'eth0': (100, 101),
'eth1': (100, 102)
}
api.check_dhcp_with_vlans(config_sample)
check_dhcp.assert_called_once_with(
['eth1', 'eth1.100', 'eth1.102', 'eth0', 'eth0.100', 'eth0.101'],
repeat=2, timeout=5)
api.check_dhcp_with_vlans(config_sample, timeout=1)
make_listeners.assert_called_once_with(('eth1', 'eth0'))
self.assertEqual(send_discover.call_count, 2)

View File

@ -112,6 +112,19 @@ def _dhcp_options(dhcp_options):
yield (header, option[1])
def format_answer(ans, iface):
columns = ('iface', 'mac', 'server_ip', 'server_id', 'gateway',
'dport', 'message', 'yiaddr')
dhcp_options = dict(_dhcp_options(ans[scapy.DHCP].options))
results = (
iface, ans[scapy.Ether].src, ans[scapy.IP].src,
dhcp_options['server_id'], ans[scapy.BOOTP].giaddr,
ans[scapy.UDP].sport,
scapy.DHCPTypes[dhcp_options['message-type']],
ans[scapy.BOOTP].yiaddr)
return dict(zip(columns, results))
def single_format(func):
"""Manage format of dhcp response
"""
@ -119,21 +132,9 @@ def single_format(func):
def formatter(*args, **kwargs):
iface = args[0]
ans = func(*args, **kwargs)
columns = ('iface', 'mac', 'server_ip', 'server_id', 'gateway',
'dport', 'message', 'yiaddr')
data = []
#scapy stores all sequence of requests
#so ans[0][1] would be response to first request
for response in ans:
dhcp_options = dict(_dhcp_options(response[1][scapy.DHCP].options))
results = (
iface, response[1][scapy.Ether].src, response[1][scapy.IP].src,
dhcp_options['server_id'], response[1][scapy.BOOTP].giaddr,
response[1][scapy.UDP].sport,
scapy.DHCPTypes[dhcp_options['message-type']],
response[1][scapy.BOOTP].yiaddr)
data.append(dict(zip(columns, results)))
return data
return [format_answer(response[1], iface) for response in ans]
return formatter
@ -168,10 +169,11 @@ class VlansContext(object):
def __enter__(self):
for iface, vlans in self.config.iteritems():
yield str(iface)
vifaces = []
for vlan in vlans:
if vlan > 0:
yield '{0}.{1}'.format(iface, vlan)
vifaces.append('{0}.{1}'.format(iface, vlan))
yield str(iface), vifaces
def __exit__(self, type, value, trace):
pass