Add route table, routes, ip rules support for nmstate provider

Adding route table, routes, route rules for nmstate provider.
The supported ip rules options are from, to, iif, fwmark/mask
priority.
Supported RPDB rules type: blackhole, unreachable, prohibit

Change-Id: I12a705b132e54a15d0184cbe683d10419dbac8f6
This commit is contained in:
Karthik S
2023-05-03 11:17:32 +05:30
parent 29028c14d7
commit 0350a82f19
3 changed files with 562 additions and 13 deletions

View File

@@ -14,6 +14,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import copy
from libnmstate import netapplier
from libnmstate import netinfo
from libnmstate.schema import DNS
@@ -24,12 +25,15 @@ from libnmstate.schema import InterfaceIPv4
from libnmstate.schema import InterfaceIPv6
from libnmstate.schema import InterfaceState
from libnmstate.schema import InterfaceType
from libnmstate.schema import Route as NMRoute
from libnmstate.schema import RouteRule as NMRouteRule
import logging
import netaddr
import re
import yaml
import os_net_config
from os_net_config import common
from os_net_config import objects
logger = logging.getLogger(__name__)
@@ -37,11 +41,28 @@ logger = logging.getLogger(__name__)
# Import the raw NetConfig object so we can call its methods
netconfig = os_net_config.NetConfig()
_OS_NET_CONFIG_MANAGED = "# os-net-config managed table"
_ROUTE_TABLE_DEFAULT = """# reserved values
#
255\tlocal
254\tmain
253\tdefault
0\tunspec
#
# local
#
#1\tinr.ruhep\n"""
IPV4_DEFAULT_GATEWAY_DESTINATION = "0.0.0.0/0"
IPV6_DEFAULT_GATEWAY_DESTINATION = "::/0"
def route_table_config_path():
return "/etc/iproute2/rt_tables"
def _get_type_value(str_val):
if isinstance(str_val, str):
if str_val.isdigit():
@@ -53,6 +74,16 @@ def _get_type_value(str_val):
return str_val
def get_route_options(route_options, key):
items = route_options.split(' ')
iter_list = iter(items)
for item in iter_list:
if key in item:
return _get_type_value(next(iter_list))
return
def is_dict_subset(superset, subset):
"""Check to see if one dict is a subset of another dict."""
@@ -99,13 +130,22 @@ def _add_sub_tree(data, subtree):
return config
def _is_any_ip_addr(address):
if address.lower() == 'any' or address.lower() == 'all':
return True
return False
class NmstateNetConfig(os_net_config.NetConfig):
"""Configure network interfaces using NetworkManager via nmstate API."""
def __init__(self, noop=False, root_dir=''):
super(NmstateNetConfig, self).__init__(noop, root_dir)
self.interface_data = {}
self.route_data = {}
self.rules_data = []
self.dns_data = {'server': [], 'domain': []}
self.route_table_data = {}
logger.info('nmstate net config provider created.')
def __dump_config(self, config, msg="Applying config"):
@@ -114,6 +154,84 @@ class NmstateNetConfig(os_net_config.NetConfig):
logger.debug("----------------------------")
logger.debug(f"{msg}\n{cfg_dump}")
def get_route_tables(self):
"""Generate configuration content for routing tables.
This method first extracts the existing route table definitions. If
any non-default tables exist, they will be kept unless they conflict
with new tables defined in the route_tables dict.
:param route_tables: A dict of RouteTable objects
"""
rt_tables = {}
rt_config = common.get_file_data(route_table_config_path()).split('\n')
for line in rt_config:
# ignore comments and black lines
line = line.strip()
if not line or line.startswith('#'):
pass
else:
id_name = line.split()
if len(id_name) > 1 and id_name[0].isdigit():
rt_tables[id_name[1]] = int(id_name[0])
self.__dump_config(rt_tables,
msg='Contents of /etc/iproute2/rt_tables')
return rt_tables
def generate_route_table_config(self, route_tables):
"""Generate configuration content for routing tables.
This method first extracts the existing route table definitions. If
any non-default tables exist, they will be kept unless they conflict
with new tables defined in the route_tables dict.
:param route_tables: A dict of RouteTable objects
"""
custom_tables = {}
res_ids = ['0', '253', '254', '255']
res_names = ['unspec', 'default', 'main', 'local']
rt_config = common.get_file_data(route_table_config_path()).split('\n')
rt_defaults = _ROUTE_TABLE_DEFAULT.split("\n")
data = _ROUTE_TABLE_DEFAULT
for line in rt_config:
line = line.strip()
if line in rt_defaults:
continue
# Leave non-standard comments intact in file
if line.startswith('#'):
data += f"{line}\n"
# Ignore old managed entries, will be added back if in new config.
elif line.find(_OS_NET_CONFIG_MANAGED) == -1:
id_name = line.split()
# Keep custom tables if there is no conflict with new tables.
if len(id_name) > 1 and id_name[0].isdigit():
if not id_name[0] in res_ids:
if not id_name[1] in res_names:
if not int(id_name[0]) in route_tables:
if not id_name[1] in route_tables.values():
# Replicate line with any comments appended
custom_tables[id_name[0]] = id_name[1]
data += f"{line}\n"
if custom_tables:
logger.debug(f"Existing route tables: {custom_tables}")
for id in sorted(route_tables):
if str(id) in res_ids:
message = f"Table {route_tables[id]}({id}) conflicts with " \
f"reserved table "\
f"{res_names[res_ids.index(str(id))]}({id})"
raise os_net_config.ConfigurationError(message)
elif route_tables[id] in res_names:
message = f"Table {route_tables[id]}({id}) conflicts with "\
f"reserved table {route_tables[id]}" \
f"({res_ids[res_names.index(route_tables[id])]})"
raise os_net_config.ConfigurationError(message)
else:
data += f"{id}\t{route_tables[id]} "\
f"{_OS_NET_CONFIG_MANAGED}\n"
return data
def iface_state(self, name=''):
"""Return the current interface state according to nmstate.
@@ -149,6 +267,39 @@ class NmstateNetConfig(os_net_config.NetConfig):
if not self.noop:
netapplier.apply(state, verify_change=True)
def route_state(self, name=''):
"""Return the current routes set according to nmstate.
Return the current routes for all interfaces, or the named interface.
:param name: name of the interface to return state, otherwise all.
:returns: list of all interfaces, or those matching name if specified
"""
routes = netinfo.show_running_config()[
NMRoute.KEY][NMRoute.CONFIG]
if name != "":
route = list(x for x in routes if x[
NMRoute.NEXT_HOP_INTERFACE] == name)
self.__dump_config(route,
msg=f'Running route config for {name}')
return route
else:
self.__dump_config(routes, msg=f'Running routes config')
return routes
def rule_state(self):
"""Return the current rules set according to nmstate.
Return the current ip rules for all interfaces, or the named interface.
:param name: name of the interface to return state, otherwise all.
:returns: list of all interfaces, or those matching name if specified
"""
rules = netinfo.show_running_config()[
NMRouteRule.KEY][NMRouteRule.CONFIG]
self.__dump_config(rules, msg=f'List of IP rules running config')
return rules
def set_ifaces(self, iface_data, verify=True):
"""Apply the desired state using nmstate.
@@ -173,6 +324,78 @@ class NmstateNetConfig(os_net_config.NetConfig):
if not self.noop:
netapplier.apply(state, verify_change=verify)
def set_routes(self, route_data, verify=True):
"""Apply the desired routes using nmstate.
:param route_data: list of routes
:param verify: boolean that determines if config will be verified
"""
state = {NMRoute.KEY: {NMRoute.CONFIG: route_data}}
self.__dump_config(state, msg=f'Applying routes')
if not self.noop:
netapplier.apply(state, verify_change=verify)
def set_rules(self, rule_data, verify=True):
"""Apply the desired rules using nmstate.
:param rule_data: list of rules
:param verify: boolean that determines if config will be verified
"""
state = {NMRouteRule.KEY: {NMRouteRule.CONFIG: rule_data}}
self.__dump_config(state, msg=f'Applying rules')
if not self.noop:
netapplier.apply(state, verify_change=verify)
def generate_routes(self, interface_name):
"""Generate the route configurations required. Add/Remove routes
: param interface_name: interface name for which routes are required
"""
reqd_route = self.route_data.get(interface_name, [])
curr_routes = self.route_state(interface_name)
routes = []
self.__dump_config(curr_routes,
msg=f'Running route config for {interface_name}')
self.__dump_config(reqd_route,
msg=f'Required route changes for {interface_name}')
for c_route in curr_routes:
no_metric = copy.deepcopy(c_route)
if NMRoute.METRIC in no_metric:
del no_metric[NMRoute.METRIC]
if c_route not in reqd_route and no_metric not in reqd_route:
c_route[NMRoute.STATE] = NMRoute.STATE_ABSENT
routes.append(c_route)
logger.info(f'Removing route {c_route}')
routes.extend(reqd_route)
return routes
def generate_rules(self):
"""Generate the rule configurations required. Add/Remove rules
"""
reqd_rule = self.rules_data
curr_rules = self.rule_state()
rules = []
self.__dump_config(curr_rules,
msg=f'Running set of ip rules')
self.__dump_config(reqd_rule,
msg=f'Required ip rules')
for c_rule in curr_rules:
if c_rule not in reqd_rule:
c_rule[NMRouteRule.STATE] = NMRouteRule.STATE_ABSENT
rules.append(c_rule)
logger.info(f'Removing rule {c_rule}')
rules.extend(reqd_rule)
return rules
def add_ethtool_subtree(self, data, sub_config, command):
config = _add_sub_tree(data, sub_config['sub-tree'])
ethtool_map = sub_config['map']
@@ -412,13 +635,148 @@ class NmstateNetConfig(os_net_config.NetConfig):
if base_opt.domain:
self._add_dns_domain(base_opt.domain)
if base_opt.routes:
msg = "Error: Routes not yet supported by impl_nmstate"
raise os_net_config.NotImplemented(msg)
self._add_routes(base_opt.name, base_opt.routes)
if base_opt.rules:
msg = "Error: IP Rules are not yet supported by impl_nmstate"
raise os_net_config.NotImplemented(msg)
self._add_rules(base_opt.name, base_opt.rules)
return data
def _add_routes(self, interface_name, routes=[]):
routes_data = []
logger.info(f'adding custom route for interface: {interface_name}')
for route in routes:
route_data = {}
if route.route_options:
value = get_route_options(route.route_options, 'metric')
if value:
route.metric = value
value = get_route_options(route.route_options, 'table')
if value:
route.route_table = value
if route.metric:
route_data[NMRoute.METRIC] = route.metric
if route.ip_netmask:
route_data[NMRoute.DESTINATION] = route.ip_netmask
if route.next_hop:
route_data[NMRoute.NEXT_HOP_ADDRESS] = route.next_hop
route_data[NMRoute.NEXT_HOP_INTERFACE] = interface_name
if route.default:
if ":" in route.next_hop:
route_data[NMRoute.DESTINATION] = \
IPV6_DEFAULT_GATEWAY_DESTINATION
else:
route_data[NMRoute.DESTINATION] = \
IPV4_DEFAULT_GATEWAY_DESTINATION
rt_tables = self.get_route_tables()
if route.route_table:
if str(route.route_table).isdigit():
route_data[NMRoute.TABLE_ID] = route.route_table
elif route.route_table in rt_tables:
route_data[NMRoute.TABLE_ID] = \
rt_tables[route.route_table]
else:
logger.error(f'Unidentified mapping for route_table '
'{route.route_table}')
routes_data.append(route_data)
self.route_data[interface_name] = routes_data
logger.debug(f'route data: {self.route_data[interface_name]}')
def add_route_table(self, route_table):
"""Add a RouteTable object to the net config object.
:param route_table: the RouteTable object to add.
"""
logger.info(f'adding route table: {route_table.table_id} '
f'{route_table.name}')
self.route_table_data[int(route_table.table_id)] = route_table.name
location = route_table_config_path()
data = self.generate_route_table_config(self.route_table_data)
self.write_config(location, data)
def _parse_ip_rules(self, rule):
nm_rule_map = {
'blackhole': {'nm_key': NMRouteRule.ACTION,
'nm_value': NMRouteRule.ACTION_BLACKHOLE},
'unreachable': {'nm_key': NMRouteRule.ACTION,
'nm_value': NMRouteRule.ACTION_UNREACHABLE},
'prohibit': {'nm_key': NMRouteRule.ACTION,
'nm_value': NMRouteRule.ACTION_PROHIBIT},
'fwmark': {'nm_key': NMRouteRule.FWMARK, 'nm_value': None},
'fwmask': {'nm_key': NMRouteRule.FWMASK, 'nm_value': None},
'iif': {'nm_key': NMRouteRule.IIF, 'nm_value': None},
'from': {'nm_key': NMRouteRule.IP_FROM, 'nm_value': None},
'to': {'nm_key': NMRouteRule.IP_TO, 'nm_value': None},
'priority': {'nm_key': NMRouteRule.PRIORITY, 'nm_value': None},
'table': {'nm_key': NMRouteRule.ROUTE_TABLE, 'nm_value': None}}
logger.debug(f"Parse Rule {rule}")
items = rule.split()
keyword = items[0]
parse_start_index = 1
rule_config = {}
if keyword == 'del':
rule_config[NMRouteRule.STATE] = NMRouteRule.STATE_ABSENT
elif keyword in nm_rule_map.keys():
parse_start_index = 0
elif keyword != 'add':
msg = f"unhandled ip rule command {rule}"
raise os_net_config.ConfigurationError(msg)
items_iter = iter(items[parse_start_index:])
parse_complete = True
while True:
try:
parse_complete = True
item = next(items_iter)
logger.debug(f"parse item {item}")
if item in nm_rule_map.keys():
value = _get_type_value(nm_rule_map[item]['nm_value'])
if not value:
parse_complete = False
value = _get_type_value(next(items_iter))
rule_config[nm_rule_map[item]['nm_key']] = value
else:
msg = f"unhandled ip rule command {rule}"
raise os_net_config.ConfigurationError(msg)
except StopIteration:
if not parse_complete:
msg = f"incomplete ip rule command {rule}"
raise os_net_config.ConfigurationError(msg)
break
# Just remove the from/to address when its all/any
# the address defaults to all/any.
if NMRouteRule.IP_FROM in rule_config:
if _is_any_ip_addr(rule_config[NMRouteRule.IP_FROM]):
del rule_config[NMRouteRule.IP_FROM]
if NMRouteRule.IP_TO in rule_config:
if _is_any_ip_addr(rule_config[NMRouteRule.IP_TO]):
del rule_config[NMRouteRule.IP_TO]
# TODO(Karthik) Add support for ipv6 rules as well
# When neither IP_FROM nor IP_TO is set, specify the IP family
if (NMRouteRule.IP_FROM not in rule_config.keys() and
NMRouteRule.IP_TO not in rule_config.keys()):
rule_config[NMRouteRule.FAMILY] = NMRouteRule.FAMILY_IPV4
if NMRouteRule.PRIORITY not in rule_config.keys():
logger.warning(f"The ip rule {rule} doesn't have the priority set."
"Its advisable to configure the priorities in "
"order to have a deterministic behaviour")
return rule_config
def _add_rules(self, interface_name, rules=[]):
for rule in rules:
rule_nm = self._parse_ip_rules(rule.rule)
self.rules_data.append(rule_nm)
logger.debug(f'rule data: {self.rules_data}')
def _add_dns_servers(self, dns_servers):
for dns_server in dns_servers:
if dns_server not in self.dns_data['server']:
@@ -441,7 +799,7 @@ class NmstateNetConfig(os_net_config.NetConfig):
:param interface: The Interface object to add.
"""
logger.info('adding interface: %s' % interface.name)
logger.info(f'adding interface: {interface.name}')
data = self._add_common(interface)
if isinstance(interface, objects.Interface):
data[Interface.TYPE] = InterfaceType.ETHERNET
@@ -457,7 +815,7 @@ class NmstateNetConfig(os_net_config.NetConfig):
if interface.hwaddr:
data[Interface.MAC] = interface.hwaddr
logger.debug('interface data: %s' % data)
logger.debug(f'interface data: {data}')
self.interface_data[interface.name] = data
def apply(self, cleanup=False, activate=True):
@@ -480,6 +838,7 @@ class NmstateNetConfig(os_net_config.NetConfig):
logger.info('Cleaning up all network configs...')
self.cleanup_all_ifaces()
apply_routes = []
updated_interfaces = {}
logger.debug("----------------------------")
for interface_name, iface_data in self.interface_data.items():
@@ -487,21 +846,39 @@ class NmstateNetConfig(os_net_config.NetConfig):
if not is_dict_subset(iface_state, iface_data):
updated_interfaces[interface_name] = iface_data
else:
logger.info('No changes required for interface: %s' %
interface_name)
logger.info('No changes required for interface: '
f'{interface_name}')
routes_data = self.generate_routes(interface_name)
logger.info(f'Routes_data {routes_data}')
apply_routes.extend(routes_data)
if activate:
if not self.noop:
try:
self.set_ifaces(list(updated_interfaces.values()))
except Exception as e:
msg = 'Error setting interfaces state: %s' % str(e)
msg = f'Error setting interfaces state: {str(e)}'
raise os_net_config.ConfigurationError(msg)
try:
self.set_routes(apply_routes)
except Exception as e:
msg = f'Error setting routes: {str(e)}'
raise os_net_config.ConfigurationError(msg)
rules_data = self.generate_rules()
logger.info(f'Rules_data {rules_data}')
try:
self.set_rules(rules_data)
except Exception as e:
msg = f'Error setting rules: {str(e)}'
raise os_net_config.ConfigurationError(msg)
try:
self.set_dns()
except Exception as e:
msg = 'Error setting dns servers: %s' % str(e)
msg = f'Error setting dns servers: {str(e)}'
raise os_net_config.ConfigurationError(msg)
if self.errors:

View File

@@ -236,12 +236,13 @@ class Route(object):
"""Base class for network routes."""
def __init__(self, next_hop, ip_netmask="", default=False,
route_options="", route_table=None):
route_options="", route_table=None, metric=None):
self.next_hop = next_hop
self.ip_netmask = ip_netmask
self.default = default
self.route_options = route_options
self.route_table = route_table
self.metric = metric
@staticmethod
def from_json(json):
@@ -266,7 +267,9 @@ class Route(object):
default = strutils.bool_from_string(str(json.get('default', False)))
route_options = json.get('route_options', "")
route_table = json.get('table', "")
return Route(next_hop, ip_netmask, default, route_options, route_table)
metric = json.get('metric', "")
return Route(next_hop, ip_netmask, default,
route_options, route_table, metric)
class Address(object):

View File

@@ -17,16 +17,32 @@
from libnmstate.schema import Ethernet
from libnmstate.schema import Ethtool
import os.path
import tempfile
import yaml
import os_net_config
from os_net_config import impl_nmstate
from os_net_config import objects
from os_net_config.tests import base
from os_net_config import utils
TEST_ENV_DIR = os.path.realpath(os.path.join(os.path.dirname(__file__),
'environment'))
_RT_DEFAULT = """# reserved values
#
255\tlocal
254\tmain
253\tdefault
0\tunspec
#
# local
#
#1\tinr.ruhep\n"""
_RT_CUSTOM = _RT_DEFAULT + "# Custom\n10\tcustom # Custom table\n20\ttable1\n"
_BASE_IFACE_CFG = """
-
type: interface
@@ -150,7 +166,7 @@ _V6_NMCFG_MULTIPLE = _V6_NMCFG + """ - ip: 2001:abc:b::1
class TestNmstateNetConfig(base.TestCase):
def setUp(self):
super(TestNmstateNetConfig, self).setUp()
self.temp_route_table_file = tempfile.NamedTemporaryFile()
self.provider = impl_nmstate.NmstateNetConfig()
def stub_is_ovs_installed():
@@ -158,6 +174,13 @@ class TestNmstateNetConfig(base.TestCase):
self.stub_out('os_net_config.utils.is_ovs_installed',
stub_is_ovs_installed)
def test_route_table_path():
return self.temp_route_table_file.name
self.stub_out(
'os_net_config.impl_nmstate.route_table_config_path',
test_route_table_path)
utils.write_config(self.temp_route_table_file.name, _RT_CUSTOM)
def get_interface_config(self, name='em1'):
return self.provider.interface_data[name]
@@ -172,11 +195,21 @@ class TestNmstateNetConfig(base.TestCase):
def get_dns_data(self):
return self.provider.dns_data
def get_route_table_config(self, name='custom', table_id=200):
return self.provider.route_table_data.get(name, table_id)
def get_rule_config(self):
return self.provider.rules_data
def get_route_config(self, name):
return self.provider.route_data.get(name, '')
def test_add_base_interface(self):
interface = objects.Interface('em1')
self.provider.add_interface(interface)
self.assertEqual(yaml.safe_load(_NO_IP)[0],
self.get_interface_config())
self.assertEqual('', self.get_route_config('em1'))
def test_add_interface_with_v6(self):
v6_addr = objects.Address('2001:abc:a::/64')
@@ -184,6 +217,7 @@ class TestNmstateNetConfig(base.TestCase):
self.provider.add_interface(interface)
self.assertEqual(yaml.safe_load(_V6_NMCFG)[0],
self.get_interface_config())
self.assertEqual('', self.get_route_config('em1'))
def test_add_interface_with_v4_v6(self):
addresses = [objects.Address('2001:abc:a::2/64'),
@@ -192,6 +226,7 @@ class TestNmstateNetConfig(base.TestCase):
self.provider.add_interface(interface)
self.assertEqual(yaml.safe_load(_V4_V6_NMCFG)[0],
self.get_interface_config())
self.assertEqual('', self.get_route_config('em1'))
def test_add_interface_with_v6_multiple(self):
addresses = [objects.Address('2001:abc:a::/64'),
@@ -201,6 +236,7 @@ class TestNmstateNetConfig(base.TestCase):
self.provider.add_interface(interface)
self.assertEqual(yaml.safe_load(_V6_NMCFG_MULTIPLE)[0],
self.get_interface_config())
self.assertEqual('', self.get_route_config('em1'))
def test_interface_defroute(self):
interface1 = objects.Interface('em1')
@@ -235,8 +271,10 @@ class TestNmstateNetConfig(base.TestCase):
"""
self.assertEqual(yaml.safe_load(em1_config)[0],
self.get_interface_config('em1'))
self.assertEqual('', self.get_route_config('em1'))
self.assertEqual(yaml.safe_load(em2_config)[0],
self.get_interface_config('em2'))
self.assertEqual('', self.get_route_config('em2'))
def test_interface_dns_server(self):
interface1 = objects.Interface('em1', dns_servers=['1.2.3.4'])
@@ -332,6 +370,7 @@ class TestNmstateNetConfig(base.TestCase):
# Unsupported format
interface9 = objects.Interface('em9',
ethtool_opts='s $DEVICE rx 78')
self.provider.add_interface(interface1)
self.provider.add_interface(interface2)
self.provider.add_interface(interface3)
@@ -402,6 +441,136 @@ class TestNmstateNetConfig(base.TestCase):
self.provider.add_interface,
interface9)
def test_add_route_table(self):
route_table1 = objects.RouteTable('table1', 200)
route_table2 = objects.RouteTable('table2', '201')
self.provider.add_route_table(route_table1)
self.provider.add_route_table(route_table2)
self.assertEqual("table1", self.get_route_table_config(200))
self.assertEqual("table2", self.get_route_table_config(201))
def test_add_route_with_table(self):
expected_route_table = """
- destination: 172.19.0.0/24
next-hop-address: 192.168.1.1
next-hop-interface: em1
table-id: 200
- destination: 172.20.0.0/24
next-hop-address: 192.168.1.1
next-hop-interface: em1
table-id: 201
- destination: 172.21.0.0/24
next-hop-address: 192.168.1.1
next-hop-interface: em1
table-id: 200
"""
expected_rule = """
- ip-from: 192.0.2.0/24
route-table: 200
"""
route_table1 = objects.RouteTable('table1', 200)
self.provider.add_route_table(route_table1)
route_rule1 = objects.RouteRule('from 192.0.2.0/24 table 200',
'test comment')
# Test route table by name
route1 = objects.Route('192.168.1.1', '172.19.0.0/24', False,
route_table="table1")
# Test that table specified in route_options takes precedence
route2 = objects.Route('192.168.1.1', '172.20.0.0/24', False,
'table 201', route_table=200)
# Test route table specified by integer ID
route3 = objects.Route('192.168.1.1', '172.21.0.0/24', False,
route_table=200)
v4_addr = objects.Address('192.168.1.2/24')
interface = objects.Interface('em1', addresses=[v4_addr],
routes=[route1, route2, route3],
rules=[route_rule1])
self.provider.add_interface(interface)
self.assertEqual(yaml.safe_load(expected_route_table),
self.get_route_config('em1'))
self.assertEqual(yaml.safe_load(expected_rule),
self.get_rule_config())
def test_ip_rules(self):
expected_rule = """
- action: blackhole
ip-from: 172.19.40.0/24
route-table: 200
- action: unreachable
iif: em1
ip-from: 192.168.1.0/24
- family: ipv4
iif: em1
route-table: 200
"""
rule1 = objects.RouteRule(
'add blackhole from 172.19.40.0/24 table 200', 'rule1')
rule2 = objects.RouteRule(
'add unreachable iif em1 from 192.168.1.0/24', 'rule2')
rule3 = objects.RouteRule('iif em1 table 200', 'rule3')
v4_addr = objects.Address('192.168.1.2/24')
interface = objects.Interface('em1', addresses=[v4_addr],
rules=[rule1, rule2, rule3])
self.provider.add_interface(interface)
self.assertEqual(yaml.safe_load(expected_rule),
self.get_rule_config())
def test_network_with_routes(self):
expected_route_table = """
- destination: 0.0.0.0/0
metric: 10
next-hop-address: 192.168.1.1
next-hop-interface: em1
- destination: 172.19.0.0/24
next-hop-address: 192.168.1.1
next-hop-interface: em1
- destination: 172.20.0.0/24
metric: 100
next-hop-address: 192.168.1.5
next-hop-interface: em1
"""
route1 = objects.Route('192.168.1.1', default=True,
route_options="metric 10")
route2 = objects.Route('192.168.1.1', '172.19.0.0/24')
route3 = objects.Route('192.168.1.5', '172.20.0.0/24',
route_options="metric 100")
v4_addr = objects.Address('192.168.1.2/24')
interface = objects.Interface('em1', addresses=[v4_addr],
routes=[route1, route2, route3])
self.provider.add_interface(interface)
self.assertEqual(yaml.safe_load(expected_route_table),
self.get_route_config('em1'))
def test_network_with_ipv6_routes(self):
expected_route_table = """
- destination: ::/0
next-hop-address: 2001:db8::1
next-hop-interface: em1
- destination: 2001:db8:dead:beef:cafe::/56
next-hop-address: fd00:fd00:2000::1
next-hop-interface: em1
- destination: 2001:db8:dead:beff::/64
metric: 100
next-hop-address: fd00:fd00:2000::1
next-hop-interface: em1
"""
route4 = objects.Route('2001:db8::1', default=True)
route5 = objects.Route('fd00:fd00:2000::1',
'2001:db8:dead:beef:cafe::/56')
route6 = objects.Route('fd00:fd00:2000::1',
'2001:db8:dead:beff::/64',
route_options="metric 100")
v4_addr = objects.Address('192.168.1.2/24')
v6_addr = objects.Address('2001:abc:a::/64')
interface = objects.Interface('em1', addresses=[v4_addr, v6_addr],
routes=[route4, route5, route6])
self.provider.add_interface(interface)
self.assertEqual(yaml.safe_load(expected_route_table),
self.get_route_config('em1'))
class TestNmstateNetConfigApply(base.TestCase):