Merge "Wait for pods to terminate when host is locked"

changes/11/688011/1
Zuul 3 years ago committed by Gerrit Code Review
commit c5f174263a
  1. 2
      nfv/centos/build_srpm.data
  2. 23
      nfv/nfv-plugins/nfv_plugins/nfvi_plugins/clients/kubernetes_client.py
  3. 30
      nfv/nfv-plugins/nfv_plugins/nfvi_plugins/nfvi_infrastructure_api.py
  4. 122
      nfv/nfv-tests/nfv_unit_tests/tests/test_plugin_kubernetes_client.py
  5. 138
      nfv/nfv-vim/nfv_vim/host_fsm/_host_task_work.py
  6. 3
      nfv/nfv-vim/nfv_vim/host_fsm/_host_tasks.py
  7. 1
      nfv/nfv-vim/nfv_vim/nfvi/__init__.py
  8. 9
      nfv/nfv-vim/nfv_vim/nfvi/_nfvi_infrastructure_module.py

@ -1 +1 @@
TIS_PATCH_VER=76
TIS_PATCH_VER=77

@ -175,3 +175,26 @@ def mark_all_pods_not_ready(node_name, reason):
pod.metadata.namespace))
break
return
def get_terminating_pods(node_name):
"""
Get all pods on a node that are terminating
"""
# Get the client.
kube_client = get_client()
# Retrieve the pods on the specified node.
response = kube_client.list_namespaced_pod(
"", field_selector="spec.nodeName=%s" % node_name)
terminating_pods = list()
pods = response.items
if pods is not None:
for pod in pods:
# The presence of the deletion_timestamp indicates the pod is
# terminating.
if pod.metadata.deletion_timestamp is not None:
terminating_pods.append(pod.metadata.name)
return Result(','.join(terminating_pods))

@ -2110,6 +2110,36 @@ class NFVIInfrastructureAPI(nfvi.api.v1.NFVIInfrastructureAPI):
callback.send(response)
callback.close()
def get_terminating_pods(self, future, host_name, callback):
"""
Get list of terminating pods on a host
"""
response = dict()
response['completed'] = False
response['reason'] = ''
try:
future.set_timeouts(config.CONF.get('nfvi-timeouts', None))
future.work(kubernetes_client.get_terminating_pods, host_name)
future.result = (yield)
if not future.result.is_complete():
DLOG.error("Kubernetes get_terminating_pods failed, operation "
"did not complete, host_name=%s" % host_name)
return
response['result-data'] = future.result.data
response['completed'] = True
except Exception as e:
DLOG.exception("Caught exception while trying to get "
"terminating pods on %s, error=%s." % (host_name, e))
finally:
callback.send(response)
callback.close()
def host_rest_api_get_handler(self, request_dispatch):
"""
Host Rest-API GET handler callback

@ -310,3 +310,125 @@ class TestNFVPluginsK8SMarkAllPodsNotReady(testcase.NFVTestCase):
self.mock_patch_namespaced_pod_status.assert_called_with(
"test-pod-ready", "test-namespace-1", mock.ANY)
self.mock_patch_namespaced_pod_status.assert_called_once()
@mock.patch('kubernetes.config.load_kube_config', mock_load_kube_config)
class TestNFVPluginsK8SGetTerminatingPods(testcase.NFVTestCase):
list_namespaced_pod_result = {
'test-node-1': kubernetes.client.V1PodList(
api_version="v1",
items=[
kubernetes.client.V1Pod(
api_version="v1",
kind="Pod",
metadata=kubernetes.client.V1ObjectMeta(
name="test-pod-not-terminating",
namespace="test-namespace-1",
deletion_timestamp=None)
),
kubernetes.client.V1Pod(
api_version="v1",
kind="Pod",
metadata=kubernetes.client.V1ObjectMeta(
name="test-pod-terminating",
namespace="test-namespace-1",
deletion_timestamp="2019-10-03T16:54:25Z")
),
kubernetes.client.V1Pod(
api_version="v1",
kind="Pod",
metadata=kubernetes.client.V1ObjectMeta(
name="test-pod-not-terminating-2",
namespace="test-namespace-1",
deletion_timestamp=None)
)
]
),
'test-node-2': kubernetes.client.V1PodList(
api_version="v1",
items=[
kubernetes.client.V1Pod(
api_version="v1",
kind="Pod",
metadata=kubernetes.client.V1ObjectMeta(
name="test-pod-not-terminating",
namespace="test-namespace-1",
deletion_timestamp=None)
),
kubernetes.client.V1Pod(
api_version="v1",
kind="Pod",
metadata=kubernetes.client.V1ObjectMeta(
name="test-pod-not-terminating-2",
namespace="test-namespace-1",
deletion_timestamp=None)
)
]
),
'test-node-3': kubernetes.client.V1PodList(
api_version="v1",
items=[
kubernetes.client.V1Pod(
api_version="v1",
kind="Pod",
metadata=kubernetes.client.V1ObjectMeta(
name="test-pod-not-terminating",
namespace="test-namespace-1",
deletion_timestamp=None)
),
kubernetes.client.V1Pod(
api_version="v1",
kind="Pod",
metadata=kubernetes.client.V1ObjectMeta(
name="test-pod-terminating",
namespace="test-namespace-1",
deletion_timestamp="2019-10-03T16:54:25Z")
),
kubernetes.client.V1Pod(
api_version="v1",
kind="Pod",
metadata=kubernetes.client.V1ObjectMeta(
name="test-pod-terminating-2",
namespace="test-namespace-1",
deletion_timestamp="2019-10-03T16:55:25Z")
)
]
)
}
def setUp(self):
super(TestNFVPluginsK8SGetTerminatingPods, self).setUp()
def mock_list_namespaced_pod(obj, namespace, field_selector=""):
node_name = field_selector.split('spec.nodeName=', 1)[1]
return self.list_namespaced_pod_result[node_name]
self.mocked_list_namespaced_pod = mock.patch(
'kubernetes.client.CoreV1Api.list_namespaced_pod',
mock_list_namespaced_pod)
self.mocked_list_namespaced_pod.start()
def tearDown(self):
super(TestNFVPluginsK8SGetTerminatingPods, self).tearDown()
self.mocked_list_namespaced_pod.stop()
def test_get_terminating_with_terminating(self):
result = kubernetes_client.get_terminating_pods("test-node-1")
assert result.result_data == 'test-pod-terminating'
def test_get_terminating_no_terminating(self):
result = kubernetes_client.get_terminating_pods("test-node-2")
assert result.result_data == ''
def test_get_terminating_with_two_terminating(self):
result = kubernetes_client.get_terminating_pods("test-node-3")
assert result.result_data == \
'test-pod-terminating,test-pod-terminating-2'

@ -981,6 +981,144 @@ class DisableHostServicesTaskWork(state_machine.StateTaskWork):
return state_machine.STATE_TASK_WORK_RESULT.WAIT, empty_reason
class WaitHostServicesDisabledTaskWork(state_machine.StateTaskWork):
"""
Wait Host Services Disabled Task Work
"""
def __init__(self, task, host, service):
super(WaitHostServicesDisabledTaskWork, self).__init__(
'wait-host-services-disabled_%s_%s' % (host.name, service), task,
timeout_in_secs=180)
self._host_reference = weakref.ref(host)
self._service = service
self._query_inprogress = False
self._start_timestamp = None
@property
def _host(self):
"""
Returns the host
"""
host = self._host_reference()
return host
def timeout(self):
"""
Handle task work timeout
"""
if self._host.is_force_lock():
DLOG.info("Wait-Host-Services-Disabled timeout for %s, "
"force-locking, passing." % self._host.name)
return state_machine.STATE_TASK_WORK_RESULT.SUCCESS, empty_reason
if not self._host.has_reason():
self._host.update_failure_reason(
"Terminating pods on disabled host %s timed out." %
self._host.name)
return state_machine.STATE_TASK_WORK_RESULT.TIMED_OUT, empty_reason
@coroutine
def _get_callback(self):
"""
Callback for get terminating pods
"""
response = (yield)
self._query_inprogress = False
if self.task is not None:
DLOG.verbose("Get-Terminating-Pods callback for service: "
"%s %s, response=%s." %
(self._service, self._host.name, response))
if response['completed']:
if response['result-data'] != '':
DLOG.info("Get-Terminating-Pods callback for %s, "
"pods %s terminating" %
(self._service, response['result-data']))
return
# An empty response means no terminating pods exist.
DLOG.info("Get-Terminating-Pods callback for %s, "
"no pods are terminating" % self._service)
self.task.task_work_complete(
state_machine.STATE_TASK_WORK_RESULT.SUCCESS,
empty_reason)
else:
DLOG.info("Get-Terminating-Pods callback for %s, "
"failed" % self._host.name)
@coroutine
def _extend_callback(self):
"""
Callback for host services disable extend
"""
response = (yield)
if response['completed']:
DLOG.info("Extended host services disable timeout for host %s."
% self._host.name)
self._host.disable_extend_timestamp = \
timers.get_monotonic_timestamp_in_ms()
else:
DLOG.error("Failed to extend host services disable timeout for "
"host %s." % self._host.name)
def run(self):
"""
Run wait host services disabled
"""
from nfv_vim import objects
DLOG.verbose("Wait-Host-Services-Disabled for %s for service %s." %
(self._host.name, self._service))
now_ms = timers.get_monotonic_timestamp_in_ms()
self._host.disable_extend_timestamp = now_ms
self._start_timestamp = now_ms
if self._service == objects.HOST_SERVICES.CONTAINER:
# We will wait for a bit before doing our first query to ensure
# kubernetes has had time to start terminating the pods.
return state_machine.STATE_TASK_WORK_RESULT.WAIT, empty_reason
else:
reason = ("Trying to wait for unknown host service %s" %
self._service)
DLOG.error(reason)
self._host.update_failure_reason(reason)
return state_machine.STATE_TASK_WORK_RESULT.FAILED, reason
def handle_event(self, event, event_data=None):
"""
Handle events while waiting for host services to be disabled
"""
from nfv_vim import objects
handled = False
if HOST_EVENT.PERIODIC_TIMER == event:
if self._service == objects.HOST_SERVICES.CONTAINER:
if not self._query_inprogress:
now_ms = timers.get_monotonic_timestamp_in_ms()
elapsed_secs = (now_ms - self._start_timestamp) / 1000
# Wait 10s before doing our first query
if 10 <= elapsed_secs:
DLOG.verbose("Wait-Host-Services-Disabled for %s for "
"service %s. Doing query." %
(self._host.name, self._service))
self._query_inprogress = True
nfvi.nfvi_get_terminating_pods(self._host.name,
self._get_callback())
handled = True
elif HOST_EVENT.AUDIT == event:
now_ms = timers.get_monotonic_timestamp_in_ms()
elapsed_secs = (now_ms - self._host.disable_extend_timestamp) / 1000
if 120 <= elapsed_secs:
nfvi.nfvi_notify_host_services_disable_extend(
self._host.uuid, self._host.name, self._extend_callback())
return handled
class NotifyHostServicesEnabledTaskWork(state_machine.StateTaskWork):
"""
Notify Host Services Enabled Task Work

@ -28,6 +28,7 @@ from nfv_vim.host_fsm._host_task_work import NotifyInstancesHostDisabledTaskWork
from nfv_vim.host_fsm._host_task_work import NotifyInstancesHostDisablingTaskWork
from nfv_vim.host_fsm._host_task_work import QueryHypervisorTaskWork
from nfv_vim.host_fsm._host_task_work import WaitHostServicesCreatedTaskWork
from nfv_vim.host_fsm._host_task_work import WaitHostServicesDisabledTaskWork
DLOG = debug.debug_get_logger('nfv_vim.state_machine.host_task')
@ -245,6 +246,8 @@ class DisableHostTask(state_machine.StateTask):
if not sw_mgmt_director.single_controller:
task_work_list.append(DisableHostServicesTaskWork(
self, host, objects.HOST_SERVICES.CONTAINER))
task_work_list.append(WaitHostServicesDisabledTaskWork(
self, host, objects.HOST_SERVICES.CONTAINER))
task_work_list.append(notify_host_services_task(
self, host, force_pass=True))
if host.host_service_configured(objects.HOST_SERVICES.COMPUTE):

@ -101,6 +101,7 @@ from nfv_vim.nfvi._nfvi_infrastructure_module import nfvi_get_hosts # noqa: F40
from nfv_vim.nfvi._nfvi_infrastructure_module import nfvi_get_logs # noqa: F401
from nfv_vim.nfvi._nfvi_infrastructure_module import nfvi_get_system_info # noqa: F401
from nfv_vim.nfvi._nfvi_infrastructure_module import nfvi_get_system_state # noqa: F401
from nfv_vim.nfvi._nfvi_infrastructure_module import nfvi_get_terminating_pods # noqa: F401
from nfv_vim.nfvi._nfvi_infrastructure_module import nfvi_get_upgrade # noqa: F401
from nfv_vim.nfvi._nfvi_infrastructure_module import nfvi_lock_host # noqa: F401
from nfv_vim.nfvi._nfvi_infrastructure_module import nfvi_notify_host_failed # noqa: F401

@ -278,6 +278,15 @@ def nfvi_get_alarm_history(start_period, end_period, callback):
return cmd_id
def nfvi_get_terminating_pods(host_name, callback):
"""
Get terminating pods
"""
cmd_id = _infrastructure_plugin.invoke_plugin('get_terminating_pods',
host_name, callback=callback)
return cmd_id
def nfvi_register_host_add_callback(callback):
"""
Register for host add notifications

Loading…
Cancel
Save