#
# Copyright (c) 2015-2016,2024 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
from nfv_common import debug

from nfv_common.tasks._task_work import TaskWork

DLOG = debug.debug_get_logger('nfv_common.tasks.task_future')


class TaskFuture(object):
    """
    Task Future

    Handle through which code schedules work, timers and I/O waits on the
    scheduler's currently running task, and through which a result can be
    stored and read back.  Every scheduling call raises LookupError when
    the scheduler has no running task.
    """
    # WARNING: Any change to the default timeout must be reflected in
    # the timeouts used for any work being done.
    # The timeout to be changed back to 20sec when the start
    # software-api is async for patch-release
    _DEFAULT_TIMEOUT_IN_SECS = 600

    # Extra seconds added on top of the effective timeout so that the
    # work's own timeout mechanism gets the chance to fire first.
    _TIMEOUT_GRACE_IN_SECS = 5

    def __init__(self, scheduler):
        """
        Create a task future

        Parameter scheduler is the object whose running_task attribute is
        consulted each time something is scheduled.
        """
        self._scheduler = scheduler
        self._result = None
        self._timeouts = None

    def set_timeouts(self, timeouts):
        """
        Set the timeout values to be used when work is to be done

        Parameter timeouts is a dictionary of target and the timeout in
        seconds.  Keys are looked up as "<module>.<function>" first and
        then as "<module>", where <module> is the last dotted component of
        the target's module name.
        """
        self._timeouts = timeouts

    def _get_running_task(self):
        """
        Return the scheduler's running task, raising LookupError if there
        is none
        """
        running_task = self._scheduler.running_task
        if running_task is None:
            raise LookupError("Running task no longer running")
        return running_task

    def _get_configured_timeout(self, target):
        """
        Return the timeout configured for target through set_timeouts as
        an int, or None when nothing is configured for it
        """
        if self._timeouts is None:
            return None

        module_name = target.__module__.split('.')[-1]

        # A target specific timeout takes precedence over a module level
        # timeout.
        timeout_names = ("%s.%s" % (module_name, target.__name__),
                         "%s" % module_name)

        for timeout_name in timeout_names:
            timeout_in_secs = self._timeouts.get(timeout_name, None)
            if timeout_in_secs is not None:
                return int(timeout_in_secs)

        return None

    def work(self, target, *args, **kwargs):
        """
        Schedule work in the future

        The timeout applied to the work is chosen in this order: a timeout
        configured through set_timeouts for the target, the caller's
        timeout_in_secs keyword argument, then the default timeout.  A
        value of zero or less means no timeout is wanted.

        The timeout_in_secs keyword argument is reserved for this method;
        it is always removed from kwargs and never handed to target.

        Returns the identifier of the task work that was created.
        Raises LookupError if the scheduler has no running task.
        """
        # Always consume the reserved keyword.  Removing it only when it
        # ends up being used would let it leak into the keyword arguments
        # of target whenever a configured timeout exists.
        requested_timeout_in_secs = kwargs.pop('timeout_in_secs', None)

        timeout_in_secs = self._get_configured_timeout(target)
        if timeout_in_secs is None:
            timeout_in_secs = requested_timeout_in_secs

        if timeout_in_secs is None:
            timeout_in_secs = self._DEFAULT_TIMEOUT_IN_SECS

        elif 0 >= timeout_in_secs:
            timeout_in_secs = None  # No timeout wanted, wait forever

        # Note about timeouts. When the timeout expires, the VIM will
        # terminate the worker process doing the work. Unfortunately, the
        # python multiprocessing library used to manage these processes
        # results in leaked file descriptors each time a process is
        # terminated. That means this timeout should be a last resort - the
        # work being done (e.g. sending a REST API request) must have its
        # own timeout mechanism to ensure it completes before the worker
        # process times out. Adding 5 seconds to the configured (or default)
        # timeout to ensure the underlying timeout mechanism has the
        # opportunity to abort the work being done.
        if timeout_in_secs is not None:
            timeout_in_secs += self._TIMEOUT_GRACE_IN_SECS

        running_task = self._get_running_task()
        task_work = TaskWork(timeout_in_secs, target, *args, **kwargs)
        running_task.add_task_work(task_work)

        # Any previously stored result is stale once new work is queued.
        self._result = None
        return task_work.id

    def timer(self, name, interval_secs):
        """
        Schedule a timer on the running task, callback is a co-routine
        that is sent the timer identifier that has fired

        Returns the timer identifier handed back by the running task.
        Raises LookupError if the scheduler has no running task.

        NOTE(review): this docstring used to say the timer fires after so
        many milliseconds while the parameter is named interval_secs;
        confirm the unit against the running task's add_timer.
        """
        return self._get_running_task().add_timer(name, interval_secs)

    def cancel_timer(self, timer_id):
        """
        Cancel a scheduled timer

        Raises LookupError if the scheduler has no running task.
        """
        self._get_running_task().cancel_timer(timer_id)

    def io_read_wait(self, select_obj):
        """
        Wait on a read selection object

        Raises LookupError if the scheduler has no running task.
        """
        self._get_running_task().add_io_read_wait(select_obj)

    def io_read_wait_cancel(self, select_obj):
        """
        Cancel a wait on a read selection object

        Raises LookupError if the scheduler has no running task.
        """
        self._get_running_task().cancel_io_read_wait(select_obj)

    def io_write_wait(self, select_obj):
        """
        Wait on a write selection object

        Raises LookupError if the scheduler has no running task.
        """
        self._get_running_task().add_io_write_wait(select_obj)

    def io_write_wait_cancel(self, select_obj):
        """
        Cancel a wait on a write selection object

        Raises LookupError if the scheduler has no running task.
        """
        self._get_running_task().cancel_io_write_wait(select_obj)

    @property
    def result(self):
        """
        Returns the result of a future

        The stored result is reset to None each time work is scheduled.
        """
        return self._result

    @result.setter
    def result(self, result):
        """
        Set the result of a future
        """
        self._result = result