Merge "Neutron Network Rebalancing on DHCP Agents"

This commit is contained in:
Zuul 2019-04-04 14:49:58 +00:00 committed by Gerrit Code Review
commit 968d45a537
15 changed files with 1808 additions and 131 deletions

View File

@ -1145,7 +1145,6 @@ class NFVINetworkAPI(nfvi.api.v1.NFVINetworkAPI):
if neutron.lookup_extension(neutron.EXTENSION_NAMES.AGENT,
self._neutron_extensions):
# Send Query request to Neutron
future.work(neutron.get_network_agents,
self._token)
@ -1217,7 +1216,6 @@ class NFVINetworkAPI(nfvi.api.v1.NFVINetworkAPI):
if neutron.lookup_extension(neutron.EXTENSION_NAMES.AGENT,
self._neutron_extensions):
# Send Query request to Neutron
future.work(neutron.get_agent_routers,
self._token, agent_id)
@ -1255,6 +1253,80 @@ class NFVINetworkAPI(nfvi.api.v1.NFVINetworkAPI):
callback.send(response)
callback.close()
def get_dhcp_agent_networks(self, future, agent_id, callback):
"""
Get Networks hosted by DHCP Network Agent.
"""
response = dict()
response['completed'] = False
response['result-data'] = ''
response['reason'] = ''
try:
future.set_timeouts(config.CONF.get('nfvi-timeouts', None))
if self._token is None or \
self._token.is_expired():
future.work(openstack.get_token, self._directory)
future.result = (yield)
if not future.result.is_complete() or \
future.result.data is None:
DLOG.error("OpenStack get-token did not complete, "
"agent_id=%s." % agent_id)
return
self._token = future.result.data
if self._neutron_extensions is None:
future.work(neutron.get_extensions, self._token)
future.result = (yield)
if not future.result.is_complete():
DLOG.error("Neutron get-extensions did not complete.")
return
self._neutron_extensions = future.result.data
if neutron.lookup_extension(neutron.EXTENSION_NAMES.AGENT,
self._neutron_extensions):
future.work(neutron.get_dhcp_agent_networks,
self._token, agent_id)
future.result = (yield)
if not future.result.is_complete():
DLOG.error("Neutron get-dhcp-agent-networks failed, "
"operation did not complete, agent_id=%s"
% agent_id)
return
else:
response['result-data'] = future.result.data
else:
DLOG.warn("Neutron Agent Extension not available")
return
response['completed'] = True
except exceptions.OpenStackRestAPIException as e:
if httplib.UNAUTHORIZED == e.http_status_code:
response['error-code'] = nfvi.NFVI_ERROR_CODE.TOKEN_EXPIRED
if self._token is not None:
self._token.set_expired()
else:
DLOG.exception("Caught exception while trying to get "
"dhcp networks, error=%s." % e)
except Exception as e:
DLOG.exception("Caught exception while trying to get %s "
"neutron dhcp networks, error=%s."
% (agent_id, e))
finally:
callback.send(response)
callback.close()
def get_router_ports(self, future, router_id, callback):
"""
Get Ports on a Router
@ -1292,7 +1364,6 @@ class NFVINetworkAPI(nfvi.api.v1.NFVINetworkAPI):
if neutron.lookup_extension(neutron.EXTENSION_NAMES.AGENT,
self._neutron_extensions):
# Send Query request to Neutron
future.work(neutron.get_router_ports,
self._token, router_id)
@ -1367,7 +1438,6 @@ class NFVINetworkAPI(nfvi.api.v1.NFVINetworkAPI):
if neutron.lookup_extension(neutron.EXTENSION_NAMES.AGENT,
self._neutron_extensions):
# Send Query request to Neutron
future.work(neutron.add_router_to_agent,
self._token, agent_id, router_id)
@ -1405,6 +1475,80 @@ class NFVINetworkAPI(nfvi.api.v1.NFVINetworkAPI):
callback.send(response)
callback.close()
def add_network_to_dhcp_agent(self, future, agent_id, network_id, callback):
"""
Add a network to an DHCP Agent.
"""
response = dict()
response['completed'] = False
response['result-data'] = ''
response['reason'] = ''
try:
future.set_timeouts(config.CONF.get('nfvi-timeouts', None))
if self._token is None or \
self._token.is_expired():
future.work(openstack.get_token, self._directory)
future.result = (yield)
if not future.result.is_complete() or \
future.result.data is None:
DLOG.error("OpenStack get-token did not complete, "
"network_id=%s." % network_id)
return
self._token = future.result.data
if self._neutron_extensions is None:
future.work(neutron.get_extensions, self._token)
future.result = (yield)
if not future.result.is_complete():
DLOG.error("Neutron get-extensions did not complete.")
return
self._neutron_extensions = future.result.data
if neutron.lookup_extension(neutron.EXTENSION_NAMES.AGENT,
self._neutron_extensions):
future.work(neutron.add_network_to_dhcp_agent,
self._token, agent_id, network_id)
future.result = (yield)
if not future.result.is_complete():
DLOG.error("Neutron add-network-to-dhcp-agent failed, "
"operation did not complete, agent_id=%s "
"network_id=%s" % (agent_id, network_id))
return
else:
response['result-data'] = future.result.data
else:
DLOG.warn("Neutron Agent Extension not available")
return
response['completed'] = True
except exceptions.OpenStackRestAPIException as e:
if httplib.UNAUTHORIZED == e.http_status_code:
response['error-code'] = nfvi.NFVI_ERROR_CODE.TOKEN_EXPIRED
if self._token is not None:
self._token.set_expired()
else:
DLOG.exception("Caught exception while trying to add "
"network to agent, error=%s." % e)
except Exception as e:
DLOG.exception("Caught exception while trying to add "
"network_id=%s to agent_id=%s, error=%s."
% (network_id, agent_id, e))
finally:
callback.send(response)
callback.close()
def remove_router_from_agent(self, future, agent_id, router_id, callback):
"""
Remove a router from an L3 Agent.
@ -1442,7 +1586,6 @@ class NFVINetworkAPI(nfvi.api.v1.NFVINetworkAPI):
if neutron.lookup_extension(neutron.EXTENSION_NAMES.AGENT,
self._neutron_extensions):
# Send Query request to Neutron
future.work(neutron.remove_router_from_agent,
self._token, agent_id, router_id)
@ -1480,6 +1623,80 @@ class NFVINetworkAPI(nfvi.api.v1.NFVINetworkAPI):
callback.send(response)
callback.close()
def remove_network_from_dhcp_agent(self, future, agent_id, network_id, callback):
"""
Remove a network from a DHCP Agent.
"""
response = dict()
response['completed'] = False
response['result-data'] = ''
response['reason'] = ''
try:
future.set_timeouts(config.CONF.get('nfvi-timeouts', None))
if self._token is None or \
self._token.is_expired():
future.work(openstack.get_token, self._directory)
future.result = (yield)
if not future.result.is_complete() or \
future.result.data is None:
DLOG.error("OpenStack get-token did not complete, "
"network_id=%s." % network_id)
return
self._token = future.result.data
if self._neutron_extensions is None:
future.work(neutron.get_extensions, self._token)
future.result = (yield)
if not future.result.is_complete():
DLOG.error("Neutron get-extensions did not complete.")
return
self._neutron_extensions = future.result.data
if neutron.lookup_extension(neutron.EXTENSION_NAMES.AGENT,
self._neutron_extensions):
future.work(neutron.remove_network_from_dhcp_agent,
self._token, agent_id, network_id)
future.result = (yield)
if not future.result.is_complete():
DLOG.error("Neutron remove-network-from-dhcp-agent failed, "
"operation did not complete, agent_id=%s "
"network_id=%s" % (agent_id, network_id))
return
else:
response['result-data'] = future.result.data
else:
DLOG.warn("Neutron Agent Extension not available")
return
response['completed'] = True
except exceptions.OpenStackRestAPIException as e:
if httplib.UNAUTHORIZED == e.http_status_code:
response['error-code'] = nfvi.NFVI_ERROR_CODE.TOKEN_EXPIRED
if self._token is not None:
self._token.set_expired()
else:
DLOG.exception("Caught exception while trying to remove "
"network from agent, error=%s." % e)
except Exception as e:
DLOG.exception("Caught exception while trying to remove "
"network_id=%s from agent_id=%s, error=%s."
% (network_id, agent_id, e))
finally:
callback.send(response)
callback.close()
def get_physical_network(self, future, network_id, callback):
"""
Get Physical Network of a network.
@ -1517,7 +1734,6 @@ class NFVINetworkAPI(nfvi.api.v1.NFVINetworkAPI):
if neutron.lookup_extension(neutron.EXTENSION_NAMES.AGENT,
self._neutron_extensions):
# Send Query request to Neutron
future.work(neutron.get_physical_network,
self._token, network_id)

View File

@ -87,6 +87,64 @@ def get_network_agents(token):
return result_data
def get_dhcp_agent_networks(token, agent_id):
"""
Get all networks hosted by a particular dhcp agent
Paging not supported by API.
"""
url = token.get_service_url(OPENSTACK_SERVICE.NEUTRON)
if url is None:
raise ValueError("OpenStack Neutron URL is invalid")
api_cmd = url + "/v2.0/agents/" + agent_id + "/dhcp-networks?fields=id&fields=provider%3Aphysical_network"
api_cmd_headers = dict()
api_cmd_headers['Content-Type'] = "application/json"
response = rest_api_request(token, "GET", api_cmd, api_cmd_headers)
result_data = response.result_data['networks']
return result_data
def add_network_to_dhcp_agent(token, agent_id, network_id):
"""
Schedule a network on a DHCP agent
"""
url = token.get_service_url(OPENSTACK_SERVICE.NEUTRON)
if url is None:
raise ValueError("OpenStack Neutron URL is invalid")
api_cmd = url + "/v2.0/agents/" + agent_id + "/dhcp-networks"
api_cmd_headers = dict()
api_cmd_headers['Content-Type'] = "application/json"
api_cmd_payload = dict()
api_cmd_payload['network_id'] = network_id
response = rest_api_request(token, "POST", api_cmd, api_cmd_headers,
json.dumps(api_cmd_payload))
return response
def remove_network_from_dhcp_agent(token, agent_id, network_id):
"""
Unschedule a network from a DHCP agent
"""
url = token.get_service_url(OPENSTACK_SERVICE.NEUTRON)
if url is None:
raise ValueError("OpenStack Neutron URL is invalid")
api_cmd = url + "/v2.0/agents/" + agent_id + "/dhcp-networks/" + network_id
api_cmd_headers = dict()
api_cmd_headers['Content-Type'] = "application/json"
response = rest_api_request(token, "DELETE", api_cmd, api_cmd_headers)
return response
def get_agent_routers(token, agent_id):
"""
Get all routers hosted by a particular agent

View File

@ -0,0 +1,356 @@
#
# Copyright (c) 2015-2016 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
import mock
import random
from nfv_vim.network_rebalance._dhcp_rebalance import _add_network_to_dhcp_agent_callback_body # noqa: H501
from nfv_vim.network_rebalance._dhcp_rebalance import _DHCPRebalance
from nfv_vim.network_rebalance._dhcp_rebalance import _get_datanetworks_callback_body # noqa: H501
from nfv_vim.network_rebalance._dhcp_rebalance import _get_dhcp_agent_networks_callback_body # noqa: H501
from nfv_vim.network_rebalance._dhcp_rebalance import _get_network_agents_callback_body # noqa: H501
from nfv_vim.network_rebalance._dhcp_rebalance import _remove_network_from_dhcp_agent_callback_body # noqa: H501
from nfv_vim.network_rebalance._dhcp_rebalance import _run_state_machine
from nfv_vim.network_rebalance._dhcp_rebalance import add_rebalance_work_dhcp
from nfv_vim.network_rebalance._dhcp_rebalance import DHCP_REBALANCE_STATE
from . import testcase # noqa: H304
from . import utils # noqa: H304
DEBUG_PRINTING = False
# DEBUG_PRINTING = True
_fake_host_table = dict()
class _fake_host(object):
def __init__(self, uuid):
self.uuid = uuid
MAX_AGENTS = 40
MAX_NETWORKS = 200
MAX_LOOPCOUNT = 2 * MAX_AGENTS * MAX_NETWORKS
def build_get_agents_response():
get_agents_response = dict()
get_agents_response['completed'] = True
get_agents_response['reason'] = ''
get_agents_response['result-data'] = list()
NUM_AGENTS = random.randint(2, MAX_AGENTS - 1)
for x in range(0, NUM_AGENTS):
host_name = "compute-" + str(x)
get_agents_response_entry = \
{"host": host_name, "agent_type": "DHCP agent",
"id": host_name + "_id", "alive": True,
"admin_state_up": True}
get_agents_response['result-data'].append(get_agents_response_entry)
add_to_fake_host_table(host_name)
return get_agents_response
def build_get_dhcp_agent_networks_response(agent_id, use_strange_networks=False):
get_dhcp_agent_networks_response = dict()
get_dhcp_agent_networks_response['completed'] = True
get_dhcp_agent_networks_response['reason'] = ''
get_dhcp_agent_networks_response['result-data'] = list()
for x in range(0, random.randint(0, MAX_NETWORKS - 1)):
host_name = "compute-" + str(x)
net_idx = 0
net = "physnet0"
if use_strange_networks:
net_idx = random.randint(0, 3)
if net_idx > 0:
net = "physnet3"
get_dhcp_agent_networks_response_entry = \
{"id": agent_id + "_network_" + str(x),
"provider:physical_network": net}
get_dhcp_agent_networks_response['result-data'].append(
get_dhcp_agent_networks_response_entry)
return get_dhcp_agent_networks_response
def build_get_datanetworks_response(host_id):
get_datanetworks_response = dict()
get_datanetworks_response['completed'] = True
get_datanetworks_response['reason'] = ''
get_datanetworks_response['result-data'] = list()
get_datanetworks_response['result-data'].append(
{u'datanetwork_name': u'physnet0'})
get_datanetworks_response['result-data'].append(
{u'datanetwork_name': u'physnet1'})
return get_datanetworks_response
dlog_local = utils.dlog(DEBUG_PRINTING)
def fake_nfvi_get_network_agents(a):
response = build_get_agents_response()
if DEBUG_PRINTING:
print("fake_nfvi_get_network_agents")
print("response = %s" % response)
_get_network_agents_callback_body(response)
def fake_nfvi_get_dhcp_agent_networks_strange_nets(agent_id, b):
response = build_get_dhcp_agent_networks_response(agent_id, True)
if DEBUG_PRINTING:
print("fake_nfvi_get_dhcp_agent_networks_strange_nets")
print("agent_id = %s" % agent_id)
print("response = %s" % response)
_get_dhcp_agent_networks_callback_body(agent_id, response)
def fake_nfvi_get_dhcp_agent_networks(agent_id, b):
response = build_get_dhcp_agent_networks_response(agent_id)
if DEBUG_PRINTING:
print("fake_nfvi_get_dhcp_agent_networks")
print("agent_id = %s" % agent_id)
print("response = %s" % response)
_get_dhcp_agent_networks_callback_body(agent_id, response)
def fake_nfvi_get_datanetworks(host_id, b):
response = build_get_datanetworks_response(host_id)
if DEBUG_PRINTING:
print("fake_nfvi_get_datanetworks")
print("response = %s" % response)
_get_datanetworks_callback_body(host_id, response)
def fake_nfvi_remove_network_from_dhcp_agent(a, b, c):
response = dict()
response['completed'] = True
response['reason'] = ''
if DEBUG_PRINTING:
print("fake_nfvi_remove_network_from_dhcp_agent")
print("response = %s" % response)
_remove_network_from_dhcp_agent_callback_body(a, b, response)
def fake_nfvi_add_network_to_dhcp_agent(a, b, c):
response = dict()
response['completed'] = True
response['reason'] = ''
if DEBUG_PRINTING:
print("fake_nfvi_add_network_to_dhcp_agent")
print("response = %s" % response)
_add_network_to_dhcp_agent_callback_body(response)
def fake_tables_get_host_table():
return _fake_host_table
def add_to_fake_host_table(host_name):
_fake_host_table[host_name] = _fake_host(host_name + "_uuid")
@mock.patch('nfv_vim.network_rebalance._dhcp_rebalance.DLOG',
dlog_local)
@mock.patch('nfv_vim.nfvi.nfvi_remove_network_from_dhcp_agent',
fake_nfvi_remove_network_from_dhcp_agent)
@mock.patch('nfv_vim.nfvi.nfvi_get_network_agents',
fake_nfvi_get_network_agents)
@mock.patch('nfv_vim.nfvi.nfvi_get_datanetworks',
fake_nfvi_get_datanetworks)
@mock.patch('nfv_vim.nfvi.nfvi_remove_network_from_dhcp_agent',
fake_nfvi_remove_network_from_dhcp_agent)
@mock.patch('nfv_vim.nfvi.nfvi_add_network_to_dhcp_agent',
fake_nfvi_add_network_to_dhcp_agent)
@mock.patch('nfv_vim.tables.tables_get_host_table',
fake_tables_get_host_table)
class TestNeutronDHCPRebalance(testcase.NFVTestCase):
def setUp(self):
super(TestNeutronDHCPRebalance, self).setUp()
def tearDown(self):
super(TestNeutronDHCPRebalance, self).tearDown()
@mock.patch('nfv_vim.nfvi.nfvi_get_dhcp_agent_networks',
fake_nfvi_get_dhcp_agent_networks)
def test_rebalance_down_host_randomized_w_api_calls(self):
initial_network_count = 0
initial_network_config = list()
for x in range(1, 200):
_DHCPRebalance.network_diff_threshold = random.randint(1, 4)
add_rebalance_work_dhcp('compute-0', True)
loopcount = 0
if DEBUG_PRINTING:
print("HOST DOWN TEST NUMBER %s" % str(x))
while True:
loopcount += 1
old_state = _DHCPRebalance.get_state()
_run_state_machine()
new_state = _DHCPRebalance.get_state()
if ((old_state ==
DHCP_REBALANCE_STATE.GET_NETWORKS_HOSTED_ON_AGENT) and
(new_state ==
DHCP_REBALANCE_STATE.GET_HOST_PHYSICAL_NETWORKS)):
for idx in range(len(_DHCPRebalance.num_networks_on_agents)):
initial_network_config.append(
_DHCPRebalance.num_networks_on_agents[idx])
initial_network_count = \
sum(_DHCPRebalance.num_networks_on_agents)
if (_DHCPRebalance.get_state() == DHCP_REBALANCE_STATE.DONE) and \
(len(_DHCPRebalance.host_down_queue) == 0):
final_network_count = \
sum(_DHCPRebalance.num_networks_on_agents)
if DEBUG_PRINTING:
print("network_diff_threshold: %s" %
_DHCPRebalance.network_diff_threshold)
print("initial_network_count: %s, "
"final_network_count: %s" %
(initial_network_count, final_network_count))
print("initial num_networks_on_agents: %s, "
"final num_networks_on_agents: %s" %
(initial_network_config,
_DHCPRebalance.num_networks_on_agents))
del initial_network_config[:]
if len(_DHCPRebalance.num_networks_on_agents) > 2:
num_networks_length = \
len(_DHCPRebalance.num_networks_on_agents)
assert ((num_networks_length == 0) or
_DHCPRebalance.num_networks_on_agents[0] == 0)
assert (initial_network_count == final_network_count)
else:
if DEBUG_PRINTING:
print("less than 2 agents, nothing to do")
break
if loopcount > MAX_LOOPCOUNT:
print("Loopcount exit!!! loopcount:%s" % loopcount)
assert loopcount < MAX_LOOPCOUNT
@mock.patch('nfv_vim.nfvi.nfvi_get_dhcp_agent_networks',
fake_nfvi_get_dhcp_agent_networks)
def test_rebalance_up_host_randomized_w_api_calls(self):
initial_network_count = 0
initial_network_config = list()
for x in range(1, 200):
_DHCPRebalance.network_diff_threshold = random.randint(1, 4)
add_rebalance_work_dhcp('compute-0', False)
loopcount = 0
if DEBUG_PRINTING:
print("HOST UP TEST NUMBER %s" % str(x))
while True:
loopcount += 1
old_state = _DHCPRebalance.get_state()
_run_state_machine()
new_state = _DHCPRebalance.get_state()
if ((old_state ==
DHCP_REBALANCE_STATE.GET_NETWORKS_HOSTED_ON_AGENT) and
((new_state ==
DHCP_REBALANCE_STATE.GET_HOST_PHYSICAL_NETWORKS) or
(new_state == DHCP_REBALANCE_STATE.DONE))):
# new_state DONE is for already balanced case
for idx in range(len(_DHCPRebalance.num_networks_on_agents)):
initial_network_config.append(
_DHCPRebalance.num_networks_on_agents[idx])
initial_network_count = sum(
_DHCPRebalance.num_networks_on_agents)
if ((_DHCPRebalance.get_state() == DHCP_REBALANCE_STATE.DONE) and
(len(_DHCPRebalance.host_up_queue) == 0)):
final_network_count = sum(
_DHCPRebalance.num_networks_on_agents)
if DEBUG_PRINTING:
print("network_diff_threshold: %s" %
_DHCPRebalance.network_diff_threshold)
print("initial_network_count: %s, "
"final_network_count: %s" %
(initial_network_count, final_network_count))
print("initial num_networks_on_agents: %s, "
"final num_networks_on_agents: %s" %
(initial_network_config,
_DHCPRebalance.num_networks_on_agents))
del initial_network_config[:]
if len(_DHCPRebalance.num_networks_on_agents) > 2:
assert (initial_network_count == final_network_count)
assert (max(_DHCPRebalance.num_networks_on_agents) -
min(_DHCPRebalance.num_networks_on_agents) <=
_DHCPRebalance.network_diff_threshold)
else:
if DEBUG_PRINTING:
print("less than 2 agents, nothing to do")
break
if loopcount > MAX_LOOPCOUNT:
print("Loopcount exit!!! loopcount:%s" % loopcount)
assert loopcount < MAX_LOOPCOUNT
@mock.patch('nfv_vim.nfvi.nfvi_get_dhcp_agent_networks',
fake_nfvi_get_dhcp_agent_networks_strange_nets)
def test_rebalance_up_strange_networks(self):
initial_network_count = 0
initial_network_config = list()
for x in range(1, 200):
_DHCPRebalance.network_diff_threshold = random.randint(1, 4)
add_rebalance_work_dhcp('compute-0', False)
loopcount = 0
if DEBUG_PRINTING:
print("HOST UP TEST NUMBER %s" % str(x))
while True:
loopcount += 1
old_state = _DHCPRebalance.get_state()
_run_state_machine()
new_state = _DHCPRebalance.get_state()
if ((old_state ==
DHCP_REBALANCE_STATE.GET_NETWORKS_HOSTED_ON_AGENT) and
((new_state ==
DHCP_REBALANCE_STATE.GET_HOST_PHYSICAL_NETWORKS) or
(new_state == DHCP_REBALANCE_STATE.DONE))):
# new_state DONE is for already balanced case
for idx in range(len(_DHCPRebalance.num_networks_on_agents)):
initial_network_config.append(
_DHCPRebalance.num_networks_on_agents[idx])
initial_network_count = sum(
_DHCPRebalance.num_networks_on_agents)
if ((_DHCPRebalance.get_state() == DHCP_REBALANCE_STATE.DONE) and
(len(_DHCPRebalance.host_up_queue) == 0)):
final_network_count = sum(
_DHCPRebalance.num_networks_on_agents)
if DEBUG_PRINTING:
print("network_diff_threshold: %s" %
_DHCPRebalance.network_diff_threshold)
print("initial_network_count: %s, "
"final_network_count: %s" %
(initial_network_count, final_network_count))
print("initial num_networks_on_agents: %s, "
"final num_networks_on_agents: %s" %
(initial_network_config,
_DHCPRebalance.num_networks_on_agents))
del initial_network_config[:]
if len(_DHCPRebalance.num_networks_on_agents) > 2:
assert (initial_network_count == final_network_count)
else:
if DEBUG_PRINTING:
print("less than 2 agents, nothing to do")
break
if loopcount > MAX_LOOPCOUNT:
print("Loopcount exit!!! loopcount:%s" % loopcount)
assert loopcount < MAX_LOOPCOUNT

View File

@ -15,10 +15,11 @@ from nfv_vim.network_rebalance._network_rebalance import _get_router_ports_callb
from nfv_vim.network_rebalance._network_rebalance import _L3Rebalance
from nfv_vim.network_rebalance._network_rebalance import _remove_router_from_agent_callback_body # noqa: H501
from nfv_vim.network_rebalance._network_rebalance import _run_state_machine
from nfv_vim.network_rebalance._network_rebalance import add_rebalance_work
from nfv_vim.network_rebalance._network_rebalance import add_rebalance_work_l3
from nfv_vim.network_rebalance._network_rebalance import L3_REBALANCE_STATE
from . import testcase # noqa: H304
from . import utils # noqa:H304
DEBUG_PRINTING = False
@ -118,35 +119,7 @@ def build_get_datanetworks_response(host_id):
return get_datanetworks_response
class dlog(object):
def __init__(self):
self.nothing = 0
def verbose(self, string):
if DEBUG_PRINTING:
print("Verbose: " + string)
else:
pass
def info(self, string):
if DEBUG_PRINTING:
print("Info: " + string)
else:
pass
def warn(self, string):
print("Warn: " + string)
def error(self, string):
print("Error: " + string)
def debug(self, string):
if DEBUG_PRINTING:
print("Debug: " + string)
else:
pass
dlog_local = dlog()
dlog_local = utils.dlog(DEBUG_PRINTING)
def fake_nfvi_get_network_agents(a):
@ -252,7 +225,7 @@ class TestNeutronRebalance2(testcase.NFVTestCase):
initial_router_config = list()
for x in range(1, 200):
_L3Rebalance.router_diff_threshold = random.randint(1, 4)
add_rebalance_work('compute-0', True)
add_rebalance_work_l3('compute-0', True)
loopcount = 0
if DEBUG_PRINTING:
print("HOST DOWN TEST NUMBER %s" % str(x))
@ -309,7 +282,7 @@ class TestNeutronRebalance2(testcase.NFVTestCase):
initial_router_config = list()
for x in range(1, 200):
_L3Rebalance.router_diff_threshold = random.randint(1, 4)
add_rebalance_work('compute-0', False)
add_rebalance_work_l3('compute-0', False)
loopcount = 0
if DEBUG_PRINTING:
print("HOST UP TEST NUMBER %s" % str(x))

View File

@ -25,3 +25,33 @@ def instance_type_to_flavor_dict(instance_type):
flavor['extra_specs'] = extra_specs
return flavor
class dlog(object):
def __init__(self, debug_printing=False):
self.nothing = 0
self.debug_printing = debug_printing
def verbose(self, string):
if self.debug_printing:
print("Verbose: " + string)
else:
pass
def info(self, string):
if self.debug_printing:
print("Info: " + string)
else:
pass
def warn(self, string):
print("Warn: " + string)
def error(self, string):
print("Error: " + string)
def debug(self, string):
if self.debug_printing:
print("Debug: " + string)
else:
pass

View File

@ -50,6 +50,11 @@ timer_interval=1
router_diff_threshold=3
hold_off=10
[dhcp-agent-rebalance]
timer_interval=1
network_diff_threshold=3
hold_off=10
[vim]
rpc_host=127.0.0.1
rpc_port=4343

View File

@ -151,6 +151,7 @@ nfv_vim.strategy.stage: debug.level.info
nfv_vim.strategy.step: debug.level.info
nfv_vim.dor: debug.level.verbose
nfv_vim.l3_rebalance: debug.level.info
nfv_vim.dhcp_rebalance: debug.level.info
nfv_vim: debug.level.verbose
# ----------------------------------------------------------------------------
nfv_vim.api.openstack: debug.level.verbose

View File

@ -192,7 +192,8 @@ class NotifyHostDisabledTaskWork(state_machine.StateTaskWork):
if (self._host.kubernetes_configured and
(self._service == objects.HOST_SERVICES.NETWORK)):
DLOG.info("Queueing rebalance for host %s disable" % self._host.name)
network_rebalance.add_rebalance_work(self._host.name, True)
network_rebalance.add_rebalance_work_l3(self._host.name, True)
network_rebalance.add_rebalance_work_dhcp(self._host.name, True)
else:
if self.force_pass:
@ -204,7 +205,8 @@ class NotifyHostDisabledTaskWork(state_machine.StateTaskWork):
if (self._host.kubernetes_configured and
(self._service == objects.HOST_SERVICES.NETWORK)):
DLOG.info("Queueing rebalance for host %s disable" % self._host.name)
network_rebalance.add_rebalance_work(self._host.name, True)
network_rebalance.add_rebalance_work_l3(self._host.name, True)
network_rebalance.add_rebalance_work_dhcp(self._host.name, True)
else:
self.task.task_work_complete(
@ -847,7 +849,8 @@ class EnableHostServicesTaskWork(state_machine.StateTaskWork):
if (self._host.kubernetes_configured and
(self._service == objects.HOST_SERVICES.NETWORK)):
DLOG.info("Queueing rebalance for host %s enable" % self._host.name)
network_rebalance.add_rebalance_work(self._host.name, False)
network_rebalance.add_rebalance_work_l3(self._host.name, False)
network_rebalance.add_rebalance_work_dhcp(self._host.name, False)
else:
if self.force_pass:
DLOG.info("Enable-Host-Services callback for %s, "
@ -858,7 +861,8 @@ class EnableHostServicesTaskWork(state_machine.StateTaskWork):
if (self._host.kubernetes_configured and
(self._service == objects.HOST_SERVICES.NETWORK)):
DLOG.info("Queueing rebalance for host %s enable" % self._host.name)
network_rebalance.add_rebalance_work(self._host.name, False)
network_rebalance.add_rebalance_work_l3(self._host.name, False)
network_rebalance.add_rebalance_work_dhcp(self._host.name, False)
else:
self._host.host_services_update(

View File

@ -3,6 +3,9 @@
#
# SPDX-License-Identifier: Apache-2.0
#
from nfv_vim.network_rebalance._network_rebalance import add_rebalance_work # noqa: F401
from nfv_vim.network_rebalance._dhcp_rebalance import add_rebalance_work_dhcp # noqa: F401
from nfv_vim.network_rebalance._dhcp_rebalance import dr_finalize # noqa: F401
from nfv_vim.network_rebalance._dhcp_rebalance import dr_initialize # noqa: F401
from nfv_vim.network_rebalance._network_rebalance import add_rebalance_work_l3 # noqa: F401
from nfv_vim.network_rebalance._network_rebalance import nr_finalize # noqa: F401
from nfv_vim.network_rebalance._network_rebalance import nr_initialize # noqa: F401

View File

@ -0,0 +1,909 @@
#
# Copyright (c) 2015-2019 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
import six
from nfv_common.helpers import Constant
from nfv_common.helpers import Constants
from nfv_common.helpers import coroutine
from nfv_common.helpers import Singleton
from nfv_common import config
from nfv_common import debug
from nfv_common import timers
from nfv_vim import nfvi
DLOG = debug.debug_get_logger('nfv_vim.dhcp_rebalance')
@six.add_metaclass(Singleton)
class AgentType(Constants):
"""
AGENT TYPE Constants
"""
L3 = Constant('L3 agent')
DHCP = Constant('DHCP agent')
AGENT_TYPE = AgentType()
@six.add_metaclass(Singleton)
class DHCPRebalanceState(Constants):
"""
DHCP REBALANCE STATE Constants
"""
GET_DHCP_AGENTS = Constant('GET_DHCP_AGENTS')
GET_NETWORKS_HOSTED_ON_AGENT = Constant('GET_NETWORKS_HOSTED_ON_AGENT')
GET_HOST_PHYSICAL_NETWORKS = Constant('GET_HOST_PHYSICAL_NETWORKS')
RESCHEDULE_DOWN_AGENT = Constant('RESCHEDULE_DOWN_AGENT')
RESCHEDULE_NEW_AGENT = Constant('RESCHEDULE_NEW_AGENT')
HOLD_OFF = Constant('HOLD_OFF')
DONE = Constant('DONE')
DHCP_REBALANCE_STATE = DHCPRebalanceState()
@six.add_metaclass(Singleton)
class DHCPAgentRebalance(object):
def __init__(self):
# Our state.
self.state = DHCP_REBALANCE_STATE.DONE
# If rebalance occurring due to down agent,
# entry zero in below list will be for the down agent
# list of dictionaries of agent information.
self.dhcp_agents = list()
# Dictionary based on agent_id of network ids hosted
# on an agent.
self.network_ids_per_agent = dict()
# For keeping track of networks that cant be schedule.
# Useful for logging and test.
self.network_ids_per_agent_cant_schedule = dict()
# Dictionary based on network_id of physical
# network of network.
self.physnet_per_network = dict()
# For determining whether state machine work is finished
# in a tick and the state machine can progress.
self.state_machine_in_progress = False
# Indexes into the various data structures.
self.dhcpagent_idx = 0
self.num_dhcp_agents = 0
# If rebalance occurring due to down agent,
# entry zero in below list will be for the down agent
self.num_networks_on_agents = list()
# The queue of work that is to be processed
self.work_queue = list()
# The difference between maximum networks on an agent
# and minimum networks on an agent we are trying to achieve.
self.network_diff_threshold = 1
# List of (index, number of networks on agents) tuples
# for all agents.
self.agent_list = list()
# For rebalance, below will be None, as we don't actually
# care about the name of a new host who's agent has just
# come up. For agent down, it will be the name of the host
# going down.
self.working_host = None
# Number of ticks to wait after seeing work to begin work.
self.hold_off = 3
# Current number of ticks waiting to begin work.
self.current_hold_off_count = 0
# queues that maintain host names of hosts coming up and going down.
self.host_up_queue = list()
self.host_down_queue = list()
def reinit(self):
self.num_dhcp_agents = 0
self.dhcpagent_idx = 0
del self.dhcp_agents[:]
self.network_ids_per_agent = {}
self.network_ids_per_agent_cant_schedule = {}
self.physnet_per_network = {}
del self.num_networks_on_agents[:]
def add_agent(self, agent_id):
self.network_ids_per_agent[agent_id] = list()
self.num_dhcp_agents += 1
def get_current_dhcp_agent(self):
agent_id = self.dhcp_agents[self.dhcpagent_idx]['id']
host_name = self.dhcp_agents[self.dhcpagent_idx]['host']
return agent_id, host_name
def update_current_dhcp_agent(self, key, value):
self.dhcp_agents[self.dhcpagent_idx][key] = value
def add_network_to_agent(self, agent_id, network_id, physical_network):
self.network_ids_per_agent[agent_id].append(network_id)
self.physnet_per_network[network_id] = physical_network
def agent_networks_done(self):
agent_id = self.dhcp_agents[self.dhcpagent_idx]['id']
_DHCPRebalance.num_networks_on_agents.append(
len(_DHCPRebalance.network_ids_per_agent[agent_id]))
self.dhcpagent_idx += 1
return self.dhcpagent_idx == self.num_dhcp_agents
def get_host_id_of_current_dhcp_agent(self):
return self.dhcp_agents[self.dhcpagent_idx]['host_uuid']
def update_datanetworks(self, datanetwork_name):
if not self.dhcp_agents[self.dhcpagent_idx].get('datanets', False):
self.dhcp_agents[self.dhcpagent_idx]['datanets'] = list()
self.dhcp_agents[self.dhcpagent_idx]['datanets'].append(
datanetwork_name)
def datanetworks_done(self):
self.dhcpagent_idx += 1
if self.dhcpagent_idx == self.num_dhcp_agents:
return True
else:
return False
def get_next_network_to_move(self):
# If there are any networks on the down agent, then
# return the next network and its networks.
agent_id = self.get_down_agent_id()
if len(self.network_ids_per_agent[agent_id]) > 0:
network_to_move = self.network_ids_per_agent[agent_id][0]
network_to_move_physical_network = \
self.physnet_per_network[network_to_move]
return network_to_move, network_to_move_physical_network
else:
return None, None
def find_network_with_physical_networks(self,
agent_idx,
physical_networks):
agent_networks = \
self.network_ids_per_agent[self.dhcp_agents[agent_idx]['id']]
network_match = False
for network_id in agent_networks:
physnet = self.physnet_per_network[network_id]
DLOG.debug("network_id: %s physnet:%s, physical_networks:%s" %
(network_id, physnet, physical_networks))
if physnet in physical_networks:
network_match = True
network_id_matched = network_id
break
if network_match:
return network_id_matched
else:
# we couldn't find a network with networks matching the
# requirements
return None
def populate_dhcp_agents(self, result_data):
for agent in result_data:
if agent['agent_type'] == AGENT_TYPE.DHCP:
agent_info_dict = dict()
agent_info_dict['host'] = agent['host']
agent_info_dict['id'] = agent['id']
# For simplicity and easy of access, place the down host
# (if applicable) first in the list.
if agent['host'] == self.get_working_host():
self.dhcp_agents.insert(0, agent_info_dict)
elif agent['alive'] and agent['admin_state_up']:
self.dhcp_agents.append(agent_info_dict)
self.add_agent(agent['id'])
DLOG.debug("self.dhcp_agents = %s" % self.dhcp_agents)
return len(self.dhcp_agents)
def get_down_agent_id(self):
return self.dhcp_agents[0]['id']
def get_agent_id_from_index(self, agent_index):
return self.dhcp_agents[agent_index]['id']
def get_host_physical_networks(self, agent_index):
return self.dhcp_agents[agent_index]['datanets']
def move_agent_network(self, network_to_move, source_agent, target_agent):
target_agent_id = _DHCPRebalance.get_agent_id_from_index(target_agent)
source_agent_id = _DHCPRebalance.get_agent_id_from_index(source_agent)
_DHCPRebalance.num_networks_on_agents[target_agent] += 1
_DHCPRebalance.num_networks_on_agents[source_agent] -= 1
self.network_ids_per_agent[source_agent_id].remove(network_to_move)
self.network_ids_per_agent[target_agent_id].append(network_to_move)
def move_agent_network_to_cant_schedule(self, network_to_move,
agent_index):
source_agent_id = _DHCPRebalance.get_agent_id_from_index(agent_index)
_DHCPRebalance.num_networks_on_agents[agent_index] -= 1
self.network_ids_per_agent[source_agent_id].remove(network_to_move)
if self.network_ids_per_agent_cant_schedule.get(source_agent_id,
None) is None:
self.network_ids_per_agent_cant_schedule[source_agent_id] = list()
self.network_ids_per_agent_cant_schedule[source_agent_id].append(
network_to_move)
def get_working_host(self):
# working_host will be None if we are doing a rebalance
# due to a new dhcp agent becoming available.
return self.working_host
def set_working_host(self, host_name=None):
# working_host will be None if we are doing a rebalance
# due to a new dhcp agent becoming available.
self.working_host = host_name
def networks_are_balanced(self):
possible_agent_targets = range(0, len(self.num_networks_on_agents))
# find the agent with the least amount of networks.
agent_with_least_networks = \
min(possible_agent_targets,
key=self.num_networks_on_agents.__getitem__)
min_networks = self.num_networks_on_agents[agent_with_least_networks]
agent_with_most_networks = \
max(possible_agent_targets,
key=self.num_networks_on_agents.__getitem__)
max_networks = self.num_networks_on_agents[agent_with_most_networks]
if ((max_networks - min_networks) <=
_DHCPRebalance.network_diff_threshold):
DLOG.info("DHCP Agent networks balanced, max:%s "
"min:%s threshold:%s" %
(max_networks, min_networks,
_DHCPRebalance.network_diff_threshold))
return True
return False
def no_networks_on_down_host(self):
return self.num_networks_on_agents[0] == 0
def set_state(self, state):
# Set up state for next tick.
self.state = state
self.dhcpagent_idx = 0
self.state_machine_in_progress = False
if ((state == DHCP_REBALANCE_STATE.RESCHEDULE_DOWN_AGENT) or
(state == DHCP_REBALANCE_STATE.RESCHEDULE_NEW_AGENT)):
self.create_agent_list()
elif state == DHCP_REBALANCE_STATE.DONE:
self.debug_dump()
elif state == DHCP_REBALANCE_STATE.HOLD_OFF:
self.current_hold_off_count = 0
def get_state(self):
return self.state
def add_rebalance_work(self, host_name, host_is_going_down):
if host_is_going_down:
self.host_down_queue.append(host_name)
else:
self.host_up_queue.append(host_name)
def create_agent_list(self):
del self.agent_list[:]
for index, agent in enumerate(self.dhcp_agents):
agent_list_entry = (index, self.num_networks_on_agents[index])
self.agent_list.append(agent_list_entry)
def get_min_agent_list_data(self):
agent_with_least_networks_entry = min(self.agent_list,
key=lambda t: t[1])
return agent_with_least_networks_entry[0], \
agent_with_least_networks_entry[1]
def get_max_agent_list_data(self):
agent_with_most_networks_entry = max(self.agent_list,
key=lambda t: t[1])
return agent_with_most_networks_entry[0], \
agent_with_most_networks_entry[1]
def get_agent_list_scheduling_info(self):
possible_agent_targets = list()
num_networks_on_agents = list()
for entry in self.agent_list:
possible_agent_targets.append(entry[0])
num_networks_on_agents.append(entry[1])
return num_networks_on_agents, possible_agent_targets
def agent_list_remove(self, agent_list_tuple):
self.agent_list.remove(agent_list_tuple)
def agent_list_increment(self, agent_index):
for idx, val in enumerate(self.agent_list):
if val[0] == agent_index:
self.agent_list[idx] = (val[0], val[1] + 1)
break
def agent_list_decrement(self, agent_index):
for idx, val in enumerate(self.agent_list):
if val[0] == agent_index:
self.agent_list[idx] = (val[0], val[1] - 1)
break
def hold_off_is_done(self):
self.current_hold_off_count += 1
return self.current_hold_off_count >= self.hold_off
def debug_dump(self):
DLOG.debug("_DHCPRebalance.dhcp_agents = %s" %
_DHCPRebalance.dhcp_agents)
DLOG.debug("_DHCPRebalance.network_ids_per_agent= %s" %
_DHCPRebalance.network_ids_per_agent)
DLOG.debug("_DHCPRebalance.physnet_per_network= %s" %
_DHCPRebalance.physnet_per_network)
DLOG.debug("_DHCPRebalance.state_machine_in_progress= %s" %
_DHCPRebalance.state_machine_in_progress)
DLOG.debug("_DHCPRebalance.dhcpagent_idx= %s" %
_DHCPRebalance.dhcpagent_idx)
DLOG.debug("_DHCPRebalance.num_dhcp_agents= %s" %
_DHCPRebalance.num_dhcp_agents)
DLOG.debug("_DHCPRebalance.num_networks_on_agents= %s" %
_DHCPRebalance.num_networks_on_agents)
_DHCPRebalance = DHCPAgentRebalance()
def add_rebalance_work_dhcp(host_name, host_is_going_down):
"""
API for external use to launch a rebalance operation.
host_is_going_down is boolean indicating if the host is
coming up (rebalance networks, moving some to newly available host),
or going down (move networks off this host, distributing amongst rest)
"""
global _DHCPRebalance
_DHCPRebalance.add_rebalance_work(host_name, host_is_going_down)
@coroutine
def _add_network_to_dhcp_agent_callback():
"""
Add network to dhcp agent callback
"""
response = (yield)
_add_network_to_dhcp_agent_callback_body(response)
def _add_network_to_dhcp_agent_callback_body(response):
"""
Add network to dhcp agent callback body
"""
global _DHCPRebalance
_DHCPRebalance.state_machine_in_progress = False
DLOG.debug("_add_network_to_dhcp_agent_callback, response = %s" % response)
if not response['completed']:
# Nothing we can really do except log this and resume
# our state machine.
DLOG.warn("Unable to add network to dhcp agent, response = %s" %
response)
@coroutine
def _remove_network_from_dhcp_agent_callback(to_agent_id, network_id):
"""
Remove network from agent callback
"""
response = (yield)
_remove_network_from_dhcp_agent_callback_body(to_agent_id, network_id,
response)
def _remove_network_from_dhcp_agent_callback_body(to_agent_id, network_id,
response):
"""
Remove network from agent callback body
"""
global _DHCPRebalance
DLOG.debug("_remove_network_from_dhcp_agent_callback , response = %s" %
response)
if response['completed']:
# After successfully detaching network from agent, attach
# to target agent.
nfvi.nfvi_add_network_to_dhcp_agent(
to_agent_id, network_id,
_add_network_to_dhcp_agent_callback())
else:
# Couldn't detach the network, no sense trying to attach.
# Just resume state machine.
_DHCPRebalance.state_machine_in_progress = False
DLOG.warn("Unable to remove network from dhcp agent, response = %s" %
response)
@coroutine
def _get_datanetworks_callback(host_id):
"""
Get data networks callback
"""
response = (yield)
_get_datanetworks_callback_body(host_id, response)
def _get_datanetworks_callback_body(host_id, response):
"""
Get data networks callback body
"""
global _DHCPRebalance
_DHCPRebalance.state_machine_in_progress = False
DLOG.debug("_get_datanetworks_callback, response = %s" % response)
if response['completed']:
result_data = response.get('result-data', None)
for data_net in result_data:
_DHCPRebalance.update_datanetworks(data_net['datanetwork_name'])
if _DHCPRebalance.datanetworks_done():
# Make the choice of which state to enter here
if _DHCPRebalance.get_working_host() is not None:
_DHCPRebalance.set_state(
DHCP_REBALANCE_STATE.RESCHEDULE_DOWN_AGENT)
else:
_DHCPRebalance.set_state(
DHCP_REBALANCE_STATE.RESCHEDULE_NEW_AGENT)
else:
DLOG.error("Unable to retrieve data networks for host: %s" % host_id)
# TODO(KSMITH) Is this error recoverable? For now, abort.
_DHCPRebalance.set_state(DHCP_REBALANCE_STATE.DONE)
def _get_host_data_networks():
"""
Get the physical networks supported by a host.
"""
host_id = _DHCPRebalance.get_host_id_of_current_dhcp_agent()
nfvi.nfvi_get_datanetworks(host_id, _get_datanetworks_callback(host_id))
@coroutine
def _get_dhcp_agent_networks_callback(agent_id):
"""
Get DHCP Agent Networks callback
"""
response = (yield)
_get_dhcp_agent_networks_callback_body(agent_id, response)
def _get_dhcp_agent_networks_callback_body(agent_id, response):
"""
Get DHCP Agent Networks callback body
"""
global _DHCPRebalance
_DHCPRebalance.state_machine_in_progress = False
DLOG.debug("_get_dhcp_agent_networks_callback, response = %s" % response)
if response['completed']:
result_data = response.get('result-data', None)
for network in result_data:
_DHCPRebalance.add_network_to_agent(
agent_id, network['id'],
network['provider:physical_network'])
DLOG.debug("_DHCPRebalance.dhcpagent_idx = %s, "
"_DHCPRebalance.num_dhcp_agents = %s" %
(_DHCPRebalance.dhcpagent_idx,
_DHCPRebalance.num_dhcp_agents))
if _DHCPRebalance.agent_networks_done():
_DHCPRebalance.set_state(
DHCP_REBALANCE_STATE.GET_HOST_PHYSICAL_NETWORKS)
# Do this check here to save us from going through the rest
# of the state machine
if _DHCPRebalance.get_working_host() is None:
if _DHCPRebalance.networks_are_balanced():
_DHCPRebalance.set_state(DHCP_REBALANCE_STATE.DONE)
return
if _DHCPRebalance.get_working_host() is not None:
if _DHCPRebalance.no_networks_on_down_host():
# Check to see if there are no networks on the
# down host in the first place.
_DHCPRebalance.set_state(DHCP_REBALANCE_STATE.DONE)
else:
DLOG.error("Could not get networks on agent: %s" % agent_id)
_DHCPRebalance.set_state(DHCP_REBALANCE_STATE.DONE)
def _get_networks_on_agents():
"""
Get Networks hosted by an DHCP Agent
Note paging is not supported by the dhcp-agent api.
"""
from nfv_vim import tables
global _DHCPRebalance
# Agent of interest is first in the list.
# In the case of an agent going down, this will be important
agent_id, host_name = _DHCPRebalance.get_current_dhcp_agent()
host_table = tables.tables_get_host_table()
host = host_table.get(host_name, None)
if host is not None:
_DHCPRebalance.update_current_dhcp_agent('host_uuid', host.uuid)
else:
DLOG.error("Cannot find rebalance host: %s" % host_name)
_DHCPRebalance.set_state(DHCP_REBALANCE_STATE.DONE)
nfvi.nfvi_get_dhcp_agent_networks(
agent_id, _get_dhcp_agent_networks_callback(agent_id))
@coroutine
def _get_network_agents_callback():
"""
Get Network Agents callback
"""
response = (yield)
_get_network_agents_callback_body(response)
def _get_network_agents_callback_body(response):
"""
Get Network Agents callback
"""
global _DHCPRebalance
_DHCPRebalance.state_machine_in_progress = False
DLOG.debug("_get_network_agents_callback, response = %s" % response)
if response['completed']:
result_data = response.get('result-data', None)
num_agents = _DHCPRebalance.populate_dhcp_agents(result_data)
if num_agents < 2:
# no sense doing anything if less than 2 agents
DLOG.debug("Less than 2 dhcp agents, no rebalancing required")
_DHCPRebalance.set_state(DHCP_REBALANCE_STATE.DONE)
else:
_DHCPRebalance.set_state(
DHCP_REBALANCE_STATE.GET_NETWORKS_HOSTED_ON_AGENT)
else:
DLOG.error("Failed to get network agents, aborting dhcp agent "
"rebalance")
_DHCPRebalance.set_state(DHCP_REBALANCE_STATE.DONE)
def _get_network_agents():
"""
Get Network Agents
Note paging is not supported for getting network agents.
"""
global _DHCPRebalance
nfvi.nfvi_get_network_agents(_get_network_agents_callback())
def _reschedule_down_agent():
"""
Reschedule down agent
"""
# For each network on the down agent, schedule it to the host with the
# least amount of networks that also hosts the required physical networks.
global _DHCPRebalance
found_network_to_move = False
network_to_move = ''
num_networks_on_agents, possible_agent_targets = \
_DHCPRebalance.get_agent_list_scheduling_info()
# Remove the agent going down from consideration.
possible_agent_targets.pop(0)
num_networks_on_agents.pop(0)
while not found_network_to_move:
network_to_move, network_to_move_physical_network = \
_DHCPRebalance.get_next_network_to_move()
if network_to_move is None:
# we're done...
break
agent_with_least_networks = 0
while len(possible_agent_targets) > 0:
min_networks = min(num_networks_on_agents)
agent_with_least_networks_index = \
num_networks_on_agents.index(min_networks)
agent_with_least_networks = \
possible_agent_targets[agent_with_least_networks_index]
host_physical_networks = \
_DHCPRebalance.get_host_physical_networks(
agent_with_least_networks)
# Does the host of this agent have the needed physical network?
if network_to_move_physical_network not in host_physical_networks:
# Check next agent/host
possible_agent_targets.pop(agent_with_least_networks_index)
num_networks_on_agents.pop(agent_with_least_networks_index)
else:
# This target agent/host is good.
break
if len(possible_agent_targets) == 0:
_DHCPRebalance.move_agent_network_to_cant_schedule(network_to_move,
0)
DLOG.debug("Unable to reschedule network: %s, no valid"
" target found" % network_to_move)
found_network_to_move = False
else:
found_network_to_move = True
_DHCPRebalance.move_agent_network(network_to_move, 0,
agent_with_least_networks)
_DHCPRebalance.agent_list_increment(agent_with_least_networks)
_DHCPRebalance.agent_list_decrement(0)
source_agent_id = _DHCPRebalance.get_agent_id_from_index(0)
target_agent_id = \
_DHCPRebalance.get_agent_id_from_index(
agent_with_least_networks)
DLOG.debug("Rescheduling network:%s to agent: %s" %
(network_to_move, target_agent_id))
nfvi.nfvi_remove_network_from_dhcp_agent(
source_agent_id,
network_to_move,
_remove_network_from_dhcp_agent_callback(
target_agent_id,
network_to_move))
if not found_network_to_move:
_DHCPRebalance.set_state(DHCP_REBALANCE_STATE.DONE)
def _reschedule_new_agent():
"""
Reschedule for a new agent coming up.
Try to achieve a balance of networks hosted by the DHCP agents.
"""
global _DHCPRebalance
agent_with_least_networks, min_networks = \
_DHCPRebalance.get_min_agent_list_data()
agent_with_most_networks, max_networks = \
_DHCPRebalance.get_max_agent_list_data()
if (max_networks - min_networks) <= _DHCPRebalance.network_diff_threshold:
DLOG.debug("Threshold exit")
_DHCPRebalance.set_state(DHCP_REBALANCE_STATE.DONE)
return
num_networks_on_agents = list()
possible_agent_targets = list()
num_networks_on_agents, possible_agent_targets = \
_DHCPRebalance.get_agent_list_scheduling_info()
# Remove our current max network agent from consideration.
agent_with_most_networks_index = \
possible_agent_targets.index(agent_with_most_networks)
possible_agent_targets.pop(agent_with_most_networks_index)
num_networks_on_agents.pop(agent_with_most_networks_index)
while (True):
min_networks = min(num_networks_on_agents)
agent_with_least_networks_index = \
num_networks_on_agents.index(min_networks)
agent_with_least_networks = \
possible_agent_targets[agent_with_least_networks_index]
host_physical_networks = _DHCPRebalance.get_host_physical_networks(
agent_with_least_networks)
# find a network on the agent with most networks whose corresponding
# physical network is supported of the agent with least networks.
network_to_move = _DHCPRebalance.find_network_with_physical_networks(
agent_with_most_networks,
host_physical_networks)
if network_to_move is None:
# Couldn't find a match, eliminate the current least network agent
# as a candidate.
DLOG.debug("Could not find a network to move to agent %s" %
agent_with_least_networks)
agent_with_least_networks_index = \
possible_agent_targets.index(agent_with_least_networks)
possible_agent_targets.pop(agent_with_least_networks_index)
num_networks_on_agents.pop(agent_with_least_networks_index)
if len(possible_agent_targets) == 0:
# no more agents left to try, we can't move any networks off
# the current max network agent. Remove it from consideration.
DLOG.debug("No more agents to try for max network agent:%s" %
agent_with_most_networks)
_DHCPRebalance.agent_list_remove((agent_with_most_networks,
max_networks))
# keep same state so we will come back, clear the below flag
# as no callback will do it for us.
_DHCPRebalance.state_machine_in_progress = False
return
else:
# before we move this network, it is possible that due to
# incompatible networks, we now are looking at an agent that
# doesn't meet our threshold requirements if that is the case,
# do not move the network. We are done trying to move networks
# off this agent
if (max_networks - min_networks) <= \
_DHCPRebalance.network_diff_threshold:
DLOG.debug("No more agents to try for max network agent:%s "
"and threshold not met, cannot balance." %
agent_with_most_networks)
_DHCPRebalance.agent_list_remove((agent_with_most_networks,
max_networks))
# clear the below flag as no callback will do it for us.
_DHCPRebalance.state_machine_in_progress = False
return
_DHCPRebalance.move_agent_network(network_to_move,
agent_with_most_networks,
agent_with_least_networks)
_DHCPRebalance.agent_list_increment(agent_with_least_networks)
_DHCPRebalance.agent_list_decrement(agent_with_most_networks)
source_agent_id = \
_DHCPRebalance.get_agent_id_from_index(
agent_with_most_networks)
target_agent_id = \
_DHCPRebalance.get_agent_id_from_index(
agent_with_least_networks)
DLOG.debug("Rescheduling network:%s from agent: %s to agent: %s" %
(network_to_move, source_agent_id, target_agent_id))
nfvi.nfvi_remove_network_from_dhcp_agent(
source_agent_id,
network_to_move,
_remove_network_from_dhcp_agent_callback(
target_agent_id,
network_to_move))
return
_DHCPRebalance.set_state(DHCP_REBALANCE_STATE.DONE)
@coroutine
def _dr_timer():
"""
DHCP Network Rebalance timer
"""
from nfv_vim import dor
while True:
(yield)
if dor.dor_is_complete():
_run_state_machine()
def _run_state_machine():
"""
DHCP Network Rebalance state machine
"""
global _DHCPRebalance
if not _DHCPRebalance.state_machine_in_progress:
_DHCPRebalance.state_machine_in_progress = True
my_state = _DHCPRebalance.get_state()
DLOG.debug("Network Rebalance State %s" % my_state)
if my_state == DHCP_REBALANCE_STATE.GET_DHCP_AGENTS:
_DHCPRebalance.reinit()
_get_network_agents()
elif my_state == DHCP_REBALANCE_STATE.GET_NETWORKS_HOSTED_ON_AGENT:
_get_networks_on_agents()
elif my_state == DHCP_REBALANCE_STATE.GET_HOST_PHYSICAL_NETWORKS:
_get_host_data_networks()
elif my_state == DHCP_REBALANCE_STATE.RESCHEDULE_DOWN_AGENT:
_reschedule_down_agent()
elif my_state == DHCP_REBALANCE_STATE.RESCHEDULE_NEW_AGENT:
_reschedule_new_agent()
elif my_state == DHCP_REBALANCE_STATE.DONE:
_DHCPRebalance.state_machine_in_progress = False
# Check for work...
if ((len(_DHCPRebalance.host_up_queue) > 0) or
(len(_DHCPRebalance.host_down_queue) > 0)):
_DHCPRebalance.set_state(DHCP_REBALANCE_STATE.HOLD_OFF)
elif my_state == DHCP_REBALANCE_STATE.HOLD_OFF:
_DHCPRebalance.state_machine_in_progress = False
if _DHCPRebalance.hold_off_is_done():
if len(_DHCPRebalance.host_down_queue) > 0:
# A reschedule for every down host is required.
# Do the down hosts rescheduling before handling
# the up hosts, as if both are pending, we don't
# want to move networks to agents that are about to
# go down.
down_host = _DHCPRebalance.host_down_queue.pop(0)
_DHCPRebalance.set_working_host(down_host)
DLOG.info("Triggering DHCP Agent reschedule for "
"disabled dhcp agent host: %s" %
down_host)
elif len(_DHCPRebalance.host_up_queue) > 0:
# Even if multiple hosts come up, we only need to
# reschedule once
_DHCPRebalance.set_working_host(None)
DLOG.info("Triggering DHCP Agent reschedule for "
"enabled dhcp agent host(s): %s" %
_DHCPRebalance.host_up_queue)
del _DHCPRebalance.host_up_queue[:]
_DHCPRebalance.set_state(DHCP_REBALANCE_STATE.GET_DHCP_AGENTS)
else:
DLOG.error("Unknown state: %s, resetting" % my_state)
_DHCPRebalance.set_state(DHCP_REBALANCE_STATE.DONE)
def dr_initialize():
"""
Initialize DHCP Network Rebalance handling
"""
global _DHCPRebalance
_DHCPRebalance.set_state(DHCP_REBALANCE_STATE.DONE)
if config.section_exists('dhcp-agent-rebalance'):
section = config.CONF['dhcp-agent-rebalance']
_dr_timer_interval = int(section.get('timer_interval', 10))
_DHCPRebalance.network_diff_threshold = \
int(section.get('network_diff_threshold', 3))
_DHCPRebalance.hold_off = int(section.get('hold_off', 3))
else:
_dr_timer_interval = 10
_DHCPRebalance.network_diff_threshold = 3
_DHCPRebalance.hold_off = 3
timers.timers_create_timer('nr', 1, _dr_timer_interval, _dr_timer)
def dr_finalize():
"""
Finalize DHCP Network Rebalance handling
"""
pass

View File

@ -38,7 +38,8 @@ class L3RebalanceState(Constants):
GET_NETWORK_AGENTS = Constant('GET_NETWORK_AGENTS')
GET_ROUTERS_HOSTED_ON_AGENT = Constant('GET_ROUTERS_HOSTED_ON_AGENT')
GET_ROUTER_PORT_NETWORKS = Constant('GET_ROUTER_PORT_NETWORKS')
GET_PHYSICAL_NETWORK_FROM_NETWORKS = Constant('GET_PHYSICAL_NETWORK_FROM_NETWORKS')
GET_PHYSICAL_NETWORK_FROM_NETWORKS = \
Constant('GET_PHYSICAL_NETWORK_FROM_NETWORKS')
GET_HOST_PHYSICAL_NETWORKS = Constant('GET_HOST_PHYSICAL_NETWORKS')
RESCHEDULE_DOWN_AGENT = Constant('RESCHEDULE_DOWN_AGENT')
RESCHEDULE_NEW_AGENT = Constant('RESCHEDULE_NEW_AGENT')
@ -143,8 +144,8 @@ class L3AgentRebalance(object):
# move on to next one.
self.router_idx = 0
self.l3agent_idx += 1
if (((self.working_host is not None) and (self.l3agent_idx == 1)) or
(self.l3agent_idx == self.num_l3agents)):
if (((self.working_host is not None) and (self.l3agent_idx == 1))
or (self.l3agent_idx == self.num_l3agents)):
# We have router port info for all routers on all agents
# that we care about. Get the Physical Network info for these.
return True
@ -153,7 +154,8 @@ class L3AgentRebalance(object):
return False
def get_current_working_router(self):
agent_routers = self.router_ids_per_agent[self.l3agents[self.l3agent_idx]['id']]
agent_routers = \
self.router_ids_per_agent[self.l3agents[self.l3agent_idx]['id']]
self.num_routers = len(agent_routers)
if self.num_routers > 0:
working_router = agent_routers[self.router_idx]
@ -163,11 +165,13 @@ class L3AgentRebalance(object):
return None
def get_current_working_network(self):
agent_routers = self.router_ids_per_agent[self.l3agents[self.l3agent_idx]['id']]
agent_routers = \
self.router_ids_per_agent[self.l3agents[self.l3agent_idx]['id']]
self.num_routers = len(agent_routers)
if self.num_routers > 0:
working_router = agent_routers[self.router_idx]
working_network = self.networks_per_router[working_router][self.net_idx]
working_network = \
self.networks_per_router[working_router][self.net_idx]
return working_network
else:
return None
@ -184,10 +188,12 @@ class L3AgentRebalance(object):
physical_network.
Returns True if there are no more networks to gather, False otherwise
"""
agent_routers = self.router_ids_per_agent[self.l3agents[self.l3agent_idx]['id']]
agent_routers = \
self.router_ids_per_agent[self.l3agents[self.l3agent_idx]['id']]
working_router = agent_routers[self.router_idx]
# overwrite the netid with that of the physical network.
self.networks_per_router[working_router][self.net_idx] = physical_network
self.networks_per_router[working_router][self.net_idx] = \
physical_network
self.net_idx += 1
if self.net_idx == len(self.networks_per_router[working_router]):
@ -196,11 +202,14 @@ class L3AgentRebalance(object):
if self.router_idx >= len(agent_routers):
self.router_idx = 0
self.l3agent_idx += 1
if (self.l3agent_idx >= self.num_l3agents) or self.get_working_host():
if (self.l3agent_idx >= self.num_l3agents) or \
self.get_working_host():
return True
# Iterate until we find an agent with routers, or until we've run out of agents
while (len(self.router_ids_per_agent[self.l3agents[self.l3agent_idx]['id']]) == 0):
# Iterate until we find an agent with routers, or until
# we've run out of agents
while (len(self.router_ids_per_agent[
self.l3agents[self.l3agent_idx]['id']]) == 0):
self.l3agent_idx += 1
if (self.l3agent_idx >= self.num_l3agents):
return True
@ -225,7 +234,8 @@ class L3AgentRebalance(object):
agent_id = self.get_down_agent_id()
if len(self.router_ids_per_agent[agent_id]) > 0:
router_to_move = self.router_ids_per_agent[agent_id][0]
router_to_move_physical_networks = self.networks_per_router[router_to_move]
router_to_move_physical_networks = \
self.networks_per_router[router_to_move]
return router_to_move, router_to_move_physical_networks
else:
return None, None
@ -233,14 +243,16 @@ class L3AgentRebalance(object):
def find_router_with_physical_networks(self,
agent_idx,
physical_networks):
agent_routers = self.router_ids_per_agent[self.l3agents[agent_idx]['id']]
agent_routers = \
self.router_ids_per_agent[self.l3agents[agent_idx]['id']]
all_networks_found = False
router_id = None
for router_id in agent_routers:
router_networks = self.networks_per_router[router_id]
DLOG.debug("router_networks = %s, physical_networks = %s" % (router_networks, physical_networks))
DLOG.debug("router_networks = %s, physical_networks = %s" %
(router_networks, physical_networks))
all_networks_found = True
for network in router_networks:
if network not in physical_networks:
@ -299,10 +311,12 @@ class L3AgentRebalance(object):
_L3Rebalance.num_routers_on_agents[agent_index] -= 1
self.router_ids_per_agent[source_agent_id].remove(router_to_move)
if self.router_ids_per_agent_cant_schedule.get(source_agent_id, None) is None:
if self.router_ids_per_agent_cant_schedule.get(source_agent_id, None) \
is None:
self.router_ids_per_agent_cant_schedule[source_agent_id] = list()
self.router_ids_per_agent_cant_schedule[source_agent_id].append(router_to_move)
self.router_ids_per_agent_cant_schedule[source_agent_id].append(
router_to_move)
def get_working_host(self):
# working_host will be None if we are doing a rebalance
@ -319,17 +333,20 @@ class L3AgentRebalance(object):
possible_agent_targets = range(0, len(self.num_routers_on_agents))
# find the agent with the least amount of routers.
agent_with_least_routers = min(possible_agent_targets,
key=self.num_routers_on_agents.__getitem__)
agent_with_least_routers = \
min(possible_agent_targets,
key=self.num_routers_on_agents.__getitem__)
min_routers = self.num_routers_on_agents[agent_with_least_routers]
agent_with_most_routers = max(possible_agent_targets,
key=self.num_routers_on_agents.__getitem__)
agent_with_most_routers = \
max(possible_agent_targets,
key=self.num_routers_on_agents.__getitem__)
max_routers = self.num_routers_on_agents[agent_with_most_routers]
if ((max_routers - min_routers) <= _L3Rebalance.router_diff_threshold):
DLOG.info("L3 Agent routers balanced, max:%s min:%s threshold:%s" %
(max_routers, min_routers, _L3Rebalance.router_diff_threshold))
(max_routers, min_routers,
_L3Rebalance.router_diff_threshold))
return True
return False
@ -373,12 +390,16 @@ class L3AgentRebalance(object):
self.agent_list.append(agent_list_entry)
def get_min_agent_list_data(self):
agent_with_least_routers_entry = min(self.agent_list, key=lambda t: t[1])
return agent_with_least_routers_entry[0], agent_with_least_routers_entry[1]
agent_with_least_routers_entry = \
min(self.agent_list, key=lambda t: t[1])
return agent_with_least_routers_entry[0], \
agent_with_least_routers_entry[1]
def get_max_agent_list_data(self):
agent_with_most_routers_entry = max(self.agent_list, key=lambda t: t[1])
return agent_with_most_routers_entry[0], agent_with_most_routers_entry[1]
agent_with_most_routers_entry = \
max(self.agent_list, key=lambda t: t[1])
return agent_with_most_routers_entry[0], \
agent_with_most_routers_entry[1]
def get_agent_list_scheduling_info(self):
possible_agent_targets = list()
@ -410,19 +431,26 @@ class L3AgentRebalance(object):
def debug_dump(self):
DLOG.debug("_L3Rebalance.l3agents = %s" % _L3Rebalance.l3agents)
DLOG.debug("_L3Rebalance.router_ids_per_agent= %s" % _L3Rebalance.router_ids_per_agent)
DLOG.debug("_L3Rebalance.networks_per_router= %s" % _L3Rebalance.networks_per_router)
DLOG.debug("_L3Rebalance.state_machine_in_progress= %s" % _L3Rebalance.state_machine_in_progress)
DLOG.debug("_L3Rebalance.l3agent_idx= %s" % _L3Rebalance.l3agent_idx)
DLOG.debug("_L3Rebalance.num_l3agents= %s" % _L3Rebalance.num_l3agents)
DLOG.debug("_L3Rebalance.router_idx= %s" % _L3Rebalance.router_idx)
DLOG.debug("_L3Rebalance.num_routers_on_agents= %s" % _L3Rebalance.num_routers_on_agents)
DLOG.debug("_L3Rebalance.router_ids_per_agent= %s" %
_L3Rebalance.router_ids_per_agent)
DLOG.debug("_L3Rebalance.networks_per_router= %s" %
_L3Rebalance.networks_per_router)
DLOG.debug("_L3Rebalance.state_machine_in_progress= %s" %
_L3Rebalance.state_machine_in_progress)
DLOG.debug("_L3Rebalance.l3agent_idx= %s" %
_L3Rebalance.l3agent_idx)
DLOG.debug("_L3Rebalance.num_l3agents= %s" %
_L3Rebalance.num_l3agents)
DLOG.debug("_L3Rebalance.router_idx= %s" %
_L3Rebalance.router_idx)
DLOG.debug("_L3Rebalance.num_routers_on_agents= %s" %
_L3Rebalance.num_routers_on_agents)
_L3Rebalance = L3AgentRebalance()
def add_rebalance_work(host_name, host_is_going_down):
def add_rebalance_work_l3(host_name, host_is_going_down):
"""
API for external use to launch a rebalance operation.
host_is_going_down is boolean indicating if the host is
@ -453,7 +481,8 @@ def _add_router_to_agent_callback_body(response):
_L3Rebalance.state_machine_in_progress = False
DLOG.debug("_add_router_to_agent_callback, response = %s" % response)
if not response['completed']:
# Nothing we can really do except log this and resume our state machine..
# Nothing we can really do except log this and resume our
# state machine.
DLOG.warn("Unable to add router to l3 agent, response = %s" % response)
@ -477,12 +506,14 @@ def _remove_router_from_agent_callback_body(to_agent_id, router_id, response):
if response['completed']:
# After successfully detaching router from agent, attach
# to target agent.
nfvi.nfvi_add_router_to_agent(to_agent_id, router_id, _add_router_to_agent_callback())
nfvi.nfvi_add_router_to_agent(to_agent_id, router_id,
_add_router_to_agent_callback())
else:
# Couldn't detach the router, no sense trying to attach.
# Just resume state machine.
_L3Rebalance.state_machine_in_progress = False
DLOG.warn("Unable to remove router from l3 agent, response = %s" % response)
DLOG.warn("Unable to remove router from l3 agent, response = %s" %
response)
@coroutine
@ -511,9 +542,11 @@ def _get_datanetworks_callback_body(host_id, response):
if _L3Rebalance.datanetworks_done():
# Make the choice of which state to enter here
if _L3Rebalance.get_working_host() is not None:
_L3Rebalance.set_state(L3_REBALANCE_STATE.RESCHEDULE_DOWN_AGENT)
_L3Rebalance.set_state(
L3_REBALANCE_STATE.RESCHEDULE_DOWN_AGENT)
else:
_L3Rebalance.set_state(L3_REBALANCE_STATE.RESCHEDULE_NEW_AGENT)
_L3Rebalance.set_state(
L3_REBALANCE_STATE.RESCHEDULE_NEW_AGENT)
else:
DLOG.error("Unable to retrieve data networks for host: %s" % host_id)
# TODO(KSMITH) Is this error recoverable? For now, abort.
@ -549,10 +582,13 @@ def _get_physical_network_callback_body(network_id, response):
DLOG.debug("_get_physical_network_callback, response = %s" % response)
if response['completed']:
result_data = response.get('result-data', None)
if _L3Rebalance.update_network(result_data['provider:physical_network']):
_L3Rebalance.set_state(L3_REBALANCE_STATE.GET_HOST_PHYSICAL_NETWORKS)
if _L3Rebalance.update_network(
result_data['provider:physical_network']):
_L3Rebalance.set_state(
L3_REBALANCE_STATE.GET_HOST_PHYSICAL_NETWORKS)
else:
DLOG.error("Unable to get physical network for network: %s" % network_id)
DLOG.error("Unable to get physical network for network: %s" %
network_id)
# TODO(KSMITH) Is this error recoverable? For now, abort.
_L3Rebalance.set_state(L3_REBALANCE_STATE.DONE)
@ -565,7 +601,9 @@ def _get_physical_networks():
network_id = _L3Rebalance.get_current_working_network()
DLOG.debug("Current working network: %s" % network_id)
if network_id is not None:
nfvi.nfvi_get_physical_network(network_id, _get_physical_network_callback(network_id))
nfvi.nfvi_get_physical_network(network_id,
_get_physical_network_callback(
network_id))
else:
# We get here if there are no routers on this agent,
# Stay in same state, but advance to next agent
@ -608,9 +646,11 @@ def _get_router_ports_callback_body(router, response):
if _L3Rebalance.router_ports_done():
# we're done getting routers for this agent.
_L3Rebalance.set_state(L3_REBALANCE_STATE.GET_PHYSICAL_NETWORK_FROM_NETWORKS)
_L3Rebalance.set_state(
L3_REBALANCE_STATE.GET_PHYSICAL_NETWORK_FROM_NETWORKS)
DLOG.debug("_L3Rebalance.networks_per_router = %s" % _L3Rebalance.networks_per_router)
DLOG.debug("_L3Rebalance.networks_per_router = %s" %
_L3Rebalance.networks_per_router)
else:
DLOG.error("Unable to get ports for router: %s" % router)
# TODO(KSMITH) Is this error recoverable? For now, abort.
@ -633,7 +673,8 @@ def _get_router_port_networks():
elif _L3Rebalance.router_ports_done():
# we're done getting routers port networks,
# advance to next state
_L3Rebalance.set_state(L3_REBALANCE_STATE.GET_PHYSICAL_NETWORK_FROM_NETWORKS)
_L3Rebalance.set_state(
L3_REBALANCE_STATE.GET_PHYSICAL_NETWORK_FROM_NETWORKS)
else:
# We get here if there are no routers on this agent,
# Stay in same state, but advance to next agent
@ -664,7 +705,8 @@ def _get_agent_routers_callback_body(agent_id, response):
for router in result_data:
_L3Rebalance.add_router_to_agent(agent_id, router['id'])
DLOG.debug("_L3Rebalance.l3agent_idx = %s, _L3Rebalance.num_l3agents = %s" %
DLOG.debug("_L3Rebalance.l3agent_idx = %s, "
"_L3Rebalance.num_l3agents = %s" %
(_L3Rebalance.l3agent_idx, _L3Rebalance.num_l3agents))
if _L3Rebalance.agent_routers_done():
@ -712,7 +754,8 @@ def _get_routers_on_agents():
DLOG.error("Cannot find rebalance host: %s" % host_name)
_L3Rebalance.set_state(L3_REBALANCE_STATE.DONE)
nfvi.nfvi_get_agent_routers(agent_id, _get_agent_routers_callback(agent_id))
nfvi.nfvi_get_agent_routers(agent_id,
_get_agent_routers_callback(agent_id))
@coroutine
@ -743,7 +786,8 @@ def _get_network_agents_callback_body(response):
DLOG.info("Less than 2 l3agents, no rebalancing required")
_L3Rebalance.set_state(L3_REBALANCE_STATE.DONE)
else:
_L3Rebalance.set_state(L3_REBALANCE_STATE.GET_ROUTERS_HOSTED_ON_AGENT)
_L3Rebalance.set_state(
L3_REBALANCE_STATE.GET_ROUTERS_HOSTED_ON_AGENT)
else:
DLOG.error("Failed to get network agents, aborting l3 agent rebalance")
@ -795,10 +839,14 @@ def _reschedule_down_agent():
min_routers = min(num_routers_on_agents)
agent_with_least_routers_index = num_routers_on_agents.index(min_routers)
agent_with_least_routers = possible_agent_targets[agent_with_least_routers_index]
agent_with_least_routers_index = \
num_routers_on_agents.index(min_routers)
agent_with_least_routers = \
possible_agent_targets[agent_with_least_routers_index]
host_physical_networks = _L3Rebalance.get_host_physical_networks(agent_with_least_routers)
host_physical_networks = \
_L3Rebalance.get_host_physical_networks(
agent_with_least_routers)
# Does the host of this agent have the needed physical networks?
target_good = True
@ -821,19 +869,23 @@ def _reschedule_down_agent():
found_router_to_move = False
else:
found_router_to_move = True
_L3Rebalance.move_agent_router(router_to_move, 0, agent_with_least_routers)
_L3Rebalance.move_agent_router(router_to_move, 0,
agent_with_least_routers)
_L3Rebalance.agent_list_increment(agent_with_least_routers)
_L3Rebalance.agent_list_decrement(0)
source_agent_id = _L3Rebalance.get_agent_id_from_index(0)
target_agent_id = _L3Rebalance.get_agent_id_from_index(agent_with_least_routers)
DLOG.debug("Rescheduling router:%s to agent: %s" % (router_to_move, target_agent_id))
nfvi.nfvi_remove_router_from_agent(source_agent_id,
router_to_move,
_remove_router_from_agent_callback(
target_agent_id,
router_to_move))
target_agent_id = \
_L3Rebalance.get_agent_id_from_index(agent_with_least_routers)
DLOG.debug("Rescheduling router:%s to agent: %s" %
(router_to_move, target_agent_id))
nfvi.nfvi_remove_router_from_agent(
source_agent_id,
router_to_move,
_remove_router_from_agent_callback(
target_agent_id,
router_to_move))
if not found_router_to_move:
_L3Rebalance.set_state(L3_REBALANCE_STATE.DONE)
@ -846,8 +898,10 @@ def _reschedule_new_agent():
"""
global _L3Rebalance
agent_with_least_routers, min_routers = _L3Rebalance.get_min_agent_list_data()
agent_with_most_routers, max_routers = _L3Rebalance.get_max_agent_list_data()
agent_with_least_routers, min_routers = \
_L3Rebalance.get_min_agent_list_data()
agent_with_most_routers, max_routers = \
_L3Rebalance.get_max_agent_list_data()
if (max_routers - min_routers) <= _L3Rebalance.router_diff_threshold:
DLOG.debug("Threshold exit")
@ -860,7 +914,8 @@ def _reschedule_new_agent():
_L3Rebalance.get_agent_list_scheduling_info()
# Remove our current max router agent from consideration.
agent_with_most_routers_index = possible_agent_targets.index(agent_with_most_routers)
agent_with_most_routers_index = \
possible_agent_targets.index(agent_with_most_routers)
possible_agent_targets.pop(agent_with_most_routers_index)
num_routers_on_agents.pop(agent_with_most_routers_index)
@ -868,8 +923,10 @@ def _reschedule_new_agent():
min_routers = min(num_routers_on_agents)
agent_with_least_routers_index = num_routers_on_agents.index(min_routers)
agent_with_least_routers = possible_agent_targets[agent_with_least_routers_index]
agent_with_least_routers_index = \
num_routers_on_agents.index(min_routers)
agent_with_least_routers = \
possible_agent_targets[agent_with_least_routers_index]
host_physical_networks = _L3Rebalance.get_host_physical_networks(
agent_with_least_routers)
@ -881,10 +938,12 @@ def _reschedule_new_agent():
host_physical_networks)
if router_to_move is None:
# Couldn't find a match, eliminate the current least router agent
# as a candidate.
DLOG.debug("Could not find a router to move to agent %s" % agent_with_least_routers)
agent_with_least_routers_index = possible_agent_targets.index(agent_with_least_routers)
# Couldn't find a match, eliminate the current least router
# agent as a candidate.
DLOG.debug("Could not find a router to move to agent %s" %
agent_with_least_routers)
agent_with_least_routers_index = \
possible_agent_targets.index(agent_with_least_routers)
possible_agent_targets.pop(agent_with_least_routers_index)
num_routers_on_agents.pop(agent_with_least_routers_index)
@ -893,21 +952,25 @@ def _reschedule_new_agent():
# the current max router agent. Remove it from consideration.
DLOG.debug("No more agents to try for max router agent")
_L3Rebalance.agent_list_remove((agent_with_most_routers, max_routers))
# keep same state so we will come back, clear the below flag as no callback
# will do it for us.
_L3Rebalance.agent_list_remove((agent_with_most_routers,
max_routers))
# keep same state so we will come back, clear the below flag
# as no callback will do it for us.
_L3Rebalance.state_machine_in_progress = False
return
else:
# before we move this router, it is possible that due to incompatible networks,
# we now are looking at an agent that doesn't meet our threshold requirements
# if that is the case, do not move the router. We are done trying to move
# before we move this router, it is possible that due to
# incompatible networks, we now are looking at an agent that
# doesn't meet our threshold requirements if that is the case,
# do not move the router. We are done trying to move
# routers off this agent
if (max_routers - min_routers) <= _L3Rebalance.router_diff_threshold:
if (max_routers - min_routers) <= \
_L3Rebalance.router_diff_threshold:
DLOG.debug("No more agents to try for max router agent "
"and threshold not met, cannot balance.")
_L3Rebalance.agent_list_remove((agent_with_most_routers, max_routers))
_L3Rebalance.agent_list_remove((agent_with_most_routers,
max_routers))
# clear the below flag as no callback will do it for us.
_L3Rebalance.state_machine_in_progress = False
return
@ -919,16 +982,19 @@ def _reschedule_new_agent():
_L3Rebalance.agent_list_increment(agent_with_least_routers)
_L3Rebalance.agent_list_decrement(agent_with_most_routers)
source_agent_id = _L3Rebalance.get_agent_id_from_index(agent_with_most_routers)
target_agent_id = _L3Rebalance.get_agent_id_from_index(agent_with_least_routers)
source_agent_id = \
_L3Rebalance.get_agent_id_from_index(agent_with_most_routers)
target_agent_id = \
_L3Rebalance.get_agent_id_from_index(agent_with_least_routers)
DLOG.debug("Rescheduling router:%s from agent: %s to agent: %s" %
(router_to_move, source_agent_id, target_agent_id))
nfvi.nfvi_remove_router_from_agent(source_agent_id,
router_to_move,
_remove_router_from_agent_callback(
target_agent_id,
router_to_move))
nfvi.nfvi_remove_router_from_agent(
source_agent_id,
router_to_move,
_remove_router_from_agent_callback(
target_agent_id,
router_to_move))
return
@ -1018,11 +1084,12 @@ def _nr_timer():
"""
Network Rebalance timer
"""
global _L3Rebalance
from nfv_vim import dor
while True:
(yield)
_run_state_machine()
if dor.dor_is_complete():
_run_state_machine()
def nr_initialize():
@ -1036,10 +1103,12 @@ def nr_initialize():
if config.section_exists('l3agent-rebalance'):
section = config.CONF['l3agent-rebalance']
_nr_timer_interval = int(section.get('timer_interval', 10))
_L3Rebalance.router_diff_threshold = int(section.get('router_diff_threshold', 3))
_L3Rebalance.router_diff_threshold = \
int(section.get('router_diff_threshold', 3))
_L3Rebalance.hold_off = int(section.get('hold_off', 3))
if _L3Rebalance.router_diff_threshold < 1:
DLOG.warn("Invalid setting for router_diff_threshold: %s, forcing to 1" %
DLOG.warn("Invalid setting for router_diff_threshold: %s, "
"forcing to 1" %
_L3Rebalance.router_diff_threshold)
_L3Rebalance.router_diff_threshold = 1
if _nr_timer_interval < 1:

View File

@ -125,6 +125,7 @@ from nfv_vim.nfvi._nfvi_module import nfvi_finalize # noqa: F401
from nfv_vim.nfvi._nfvi_module import nfvi_initialize # noqa: F401
from nfv_vim.nfvi._nfvi_module import nfvi_reinitialize # noqa: F401
from nfv_vim.nfvi._nfvi_network_module import nfvi_add_network_to_dhcp_agent # noqa: F401
from nfv_vim.nfvi._nfvi_network_module import nfvi_add_router_to_agent # noqa: F401
from nfv_vim.nfvi._nfvi_network_module import nfvi_create_network # noqa: F401
from nfv_vim.nfvi._nfvi_network_module import nfvi_create_network_host_services # noqa: F401
@ -134,6 +135,7 @@ from nfv_vim.nfvi._nfvi_network_module import nfvi_delete_network_host_services
from nfv_vim.nfvi._nfvi_network_module import nfvi_delete_subnet # noqa: F401
from nfv_vim.nfvi._nfvi_network_module import nfvi_enable_network_host_services # noqa: F401
from nfv_vim.nfvi._nfvi_network_module import nfvi_get_agent_routers # noqa: F401
from nfv_vim.nfvi._nfvi_network_module import nfvi_get_dhcp_agent_networks # noqa: F401
from nfv_vim.nfvi._nfvi_network_module import nfvi_get_network # noqa: F401
from nfv_vim.nfvi._nfvi_network_module import nfvi_get_network_agents # noqa: F401
from nfv_vim.nfvi._nfvi_network_module import nfvi_get_networks # noqa: F401
@ -144,6 +146,7 @@ from nfv_vim.nfvi._nfvi_network_module import nfvi_get_subnets # noqa: F401
from nfv_vim.nfvi._nfvi_network_module import nfvi_network_plugin_disabled # noqa: F401
from nfv_vim.nfvi._nfvi_network_module import nfvi_notify_network_host_disabled # noqa: F401
from nfv_vim.nfvi._nfvi_network_module import nfvi_query_network_host_services # noqa: F401
from nfv_vim.nfvi._nfvi_network_module import nfvi_remove_network_from_dhcp_agent # noqa: F401
from nfv_vim.nfvi._nfvi_network_module import nfvi_remove_router_from_agent # noqa: F401
from nfv_vim.nfvi._nfvi_network_module import nfvi_update_network # noqa: F401
from nfv_vim.nfvi._nfvi_network_module import nfvi_update_subnet # noqa: F401

View File

@ -150,6 +150,15 @@ def nfvi_get_network_agents(callback):
return cmd_id
def nfvi_get_dhcp_agent_networks(agent_id, callback):
"""
Get networks hosted on a dhcp agent
"""
cmd_id = _network_plugin.invoke_plugin('get_dhcp_agent_networks',
agent_id, callback=callback)
return cmd_id
def nfvi_get_agent_routers(agent_id, callback):
"""
Get routers hosted on a l3 agent
@ -168,6 +177,24 @@ def nfvi_get_router_ports(router_id, callback):
return cmd_id
def nfvi_add_network_to_dhcp_agent(agent_id, network_id, callback):
"""
Add a network to a DHCP agent
"""
cmd_id = _network_plugin.invoke_plugin('add_network_to_dhcp_agent',
agent_id, network_id, callback=callback)
return cmd_id
def nfvi_remove_network_from_dhcp_agent(agent_id, network_id, callback):
"""
Remove a network from a DHCP Agent
"""
cmd_id = _network_plugin.invoke_plugin('remove_network_from_dhcp_agent',
agent_id, network_id, callback=callback)
return cmd_id
def nfvi_add_router_to_agent(agent_id, router_id, callback):
"""
Add a router to an L3 agent

View File

@ -137,6 +137,13 @@ class NFVINetworkAPI(object):
"""
pass
@abc.abstractmethod
def get_dhcp_agent_networks(self, future, agent_id, callback):
"""
Get networks hosted by a dhcp agent using the plugin
"""
pass
@abc.abstractmethod
def get_agent_routers(self, future, agent_id, callback):
"""
@ -151,6 +158,20 @@ class NFVINetworkAPI(object):
"""
pass
@abc.abstractmethod
def add_network_to_dhcp_agent(self, future, agent_id, network_id, callback):
"""
Add a network to a dhcp agent using the plugin
"""
pass
@abc.abstractmethod
def remove_network_from_dhcp_agent(self, future, agent_id, network_id, callback):
"""
Remove a network from a dhcp agent using the plugin
"""
pass
@abc.abstractmethod
def add_router_to_agent(self, future, agent_id, router_id, callback):
"""

View File

@ -85,8 +85,9 @@ def process_initialize():
directors.directors_initialize()
events.events_initialize()
audits.audits_initialize()
dor.dor_initialize()
network_rebalance.nr_initialize()
network_rebalance.dr_initialize()
dor.dor_initialize()
return init_complete
@ -109,6 +110,7 @@ def process_finalize():
Virtual Infrastructure Manager - Finalize
"""
dor.dor_finalize()
network_rebalance.dr_finalize()
network_rebalance.nr_finalize()
audits.audits_finalize()
events.events_finalize()