octavia/octavia/cmd/driver_agent.py

169 lines
6.0 KiB
Python

# Copyright 2018 Rackspace, US Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from functools import partial
import multiprocessing
import os
import signal
import sys
import time
from oslo_config import cfg
from oslo_log import log as logging
from oslo_reports import guru_meditation_report as gmr
import setproctitle
from stevedore import enabled as stevedore_enabled
from octavia.api.drivers.driver_agent import driver_listener
from octavia.common import service
from octavia import version
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
# Provider agent child processes started by _start_provider_agents();
# main()'s cleanup handler waits on (and if necessary kills) each of them.
PROVIDER_AGENT_PROCESSES = []
def _mutate_config(*args, **kwargs):
    """Re-read the mutable options from the configuration files.

    All positional and keyword arguments are accepted and discarded. This
    lets the function be installed directly as a signal handler, which is
    invoked with ``(signum, frame)``.
    """
    CONF.mutate_config_files()
def _handle_mutate_config(status_proc_pid, stats_proc_pid, get_proc_pid,
                          *args, **kwargs):
    """SIGHUP handler for the parent: reload config and notify listeners.

    main() binds the three listener PIDs with ``functools.partial``. The
    ``(signum, frame)`` arguments supplied by the signal module therefore
    arrive in ``*args`` and are ignored.

    Previously only two PID parameters existed. The get-listener PID was
    swallowed by ``*args``, so that process never received SIGHUP and never
    reloaded its configuration.

    :param status_proc_pid: PID of the status listener process.
    :param stats_proc_pid: PID of the statistics listener process.
    :param get_proc_pid: PID of the get listener process.
    """
    LOG.info("Driver agent received HUP signal, mutating config.")
    _mutate_config()
    # Each listener child installs its own SIGHUP -> _mutate_config handler,
    # so forwarding the signal makes it reload its configuration too.
    for pid in (status_proc_pid, stats_proc_pid, get_proc_pid):
        os.kill(pid, signal.SIGHUP)
def _check_if_provider_agent_enabled(extension):
    """Stevedore ``check_func``: report whether a provider agent is enabled.

    :param extension: A stevedore extension. Only its ``name`` is read.
    :returns: True if ``extension.name`` is contained in
              ``CONF.driver_agent.enabled_provider_agents``, else False.
    """
    enabled_agents = CONF.driver_agent.enabled_provider_agents
    return extension.name in enabled_agents
def _process_wrapper(exit_event, proc_name, function, agent_name=None):
    """Run ``function`` inside a child process, restarting it if it raises.

    The child ignores SIGINT and reloads configuration on SIGHUP. It also
    sets a descriptive process title.

    ``function(exit_event)`` is re-invoked for as long as ``exit_event`` is
    unset:
    - A normal return ends the loop.
    - An ``Exception`` is logged, and the function is retried after a
      one-second pause.

    :param exit_event: multiprocessing Event that signals shutdown.
    :param proc_name: Role name used in the process title and log messages.
    :param function: Callable that takes ``exit_event``.
    :param agent_name: Provider agent name, or None for built-in listeners.
    """
    # The parent catches KeyboardInterrupt and coordinates shutdown, so
    # children ignore Ctrl-C.
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    signal.signal(signal.SIGHUP, _mutate_config)

    title_template = 'octavia-driver-agent - {}'
    title_args = [proc_name]
    if agent_name:
        title_template += ' -- {}'
        title_args.append(agent_name)
    setproctitle.setproctitle(title_template.format(*title_args))

    while not exit_event.is_set():
        try:
            function(exit_event)
        except Exception as err:
            if agent_name:
                LOG.exception('Provider agent "%s" raised exception: %s. '
                              'Restarting the "%s" provider agent.',
                              agent_name, str(err), agent_name)
            else:
                LOG.exception('%s raised exception: %s. '
                              'Restarting %s.',
                              proc_name, str(err), proc_name)
            time.sleep(1)
        else:
            # The wrapped function returned normally; do not restart it.
            break
def _start_provider_agents(exit_event):
    """Launch one supervised child process per enabled provider agent.

    Agents are discovered through stevedore's
    ``octavia.driver_agent.provider_agents`` namespace and filtered by
    ``_check_if_provider_agent_enabled``. Each agent's plugin is run under
    ``_process_wrapper``. Every started process is recorded in
    ``PROVIDER_AGENT_PROCESSES`` so shutdown can wait on it.

    :param exit_event: multiprocessing Event shared with the children to
                       signal shutdown.
    """
    enabled_agents = stevedore_enabled.EnabledExtensionManager(
        namespace='octavia.driver_agent.provider_agents',
        check_func=_check_if_provider_agent_enabled)
    for agent in enabled_agents:
        agent_proc = multiprocessing.Process(
            target=_process_wrapper,
            name=agent.name,
            kwargs={'agent_name': agent.name},
            args=(exit_event, 'provider_agent', agent.plugin))
        PROVIDER_AGENT_PROCESSES.append(agent_proc)
        agent_proc.start()
        LOG.info('Started enabled provider agent: "%s" with PID: %d.',
                 agent.name, agent_proc.pid)
def _stop_provider_agents():
    """Wait for each provider agent process to exit, killing stragglers.

    Each process in PROVIDER_AGENT_PROCESSES is joined for up to
    ``CONF.driver_agent.provider_agent_shutdown_timeout`` seconds. If it
    is still alive after that, it is sent SIGKILL. Unexpected errors are
    logged and shutdown continues with the next agent.
    """
    for proc in PROVIDER_AGENT_PROCESSES:
        timeout = CONF.driver_agent.provider_agent_shutdown_timeout
        LOG.info('Waiting up to %s seconds for provider agent "%s" to '
                 'shutdown.', timeout, proc.name)
        try:
            proc.join(timeout)
            # exitcode stays None while the process is still running.
            force_killed = proc.exitcode is None
            if force_killed:
                # TODO(johnsom) Change to proc.kill() once
                # python 3.7 or newer only
                os.kill(proc.pid, signal.SIGKILL)
                LOG.warning(
                    'Forcefully killed "%s" provider agent because it '
                    'failed to shutdown in %s seconds.', proc.name,
                    timeout)
        except Exception as e:
            LOG.warning('Unknown error "%s" while shutting down "%s", '
                        'ignoring and continuing shutdown process.',
                        str(e), proc.name)
        else:
            # Only report a clean shutdown when the agent exited by itself.
            # The original logged this even right after a SIGKILL.
            if not force_killed:
                LOG.info('Provider agent "%s" has succesfully shutdown.',
                         proc.name)


def main():
    """Entry point for the octavia driver agent.

    Startup:
    - Starts the status, statistics and get listener processes.
    - Starts one process per enabled provider agent.

    Signal handling:
    - SIGTERM (and Ctrl-C) triggers an orderly shutdown.
    - SIGHUP triggers a configuration reload.

    The function then blocks until the listener processes exit.
    """
    service.prepare_service(sys.argv)
    gmr.TextGuruMeditation.setup_autorun(version)

    processes = []
    exit_event = multiprocessing.Event()

    status_listener_proc = multiprocessing.Process(
        name='status_listener', target=_process_wrapper,
        args=(exit_event, 'status_listener', driver_listener.status_listener))
    processes.append(status_listener_proc)
    LOG.info("Driver agent status listener process starts:")
    status_listener_proc.start()

    stats_listener_proc = multiprocessing.Process(
        name='stats_listener', target=_process_wrapper,
        args=(exit_event, 'stats_listener', driver_listener.stats_listener))
    processes.append(stats_listener_proc)
    LOG.info("Driver agent statistics listener process starts:")
    stats_listener_proc.start()

    get_listener_proc = multiprocessing.Process(
        name='get_listener', target=_process_wrapper,
        args=(exit_event, 'get_listener', driver_listener.get_listener))
    processes.append(get_listener_proc)
    LOG.info("Driver agent get listener process starts:")
    get_listener_proc.start()

    _start_provider_agents(exit_event)

    def process_cleanup(*args, **kwargs):
        """Signal all children to stop, then wait for them to exit."""
        LOG.info("Driver agent exiting due to signal.")
        exit_event.set()
        # Same three listener processes, in the order they were started.
        for listener_proc in processes:
            listener_proc.join()
        _stop_provider_agents()

    signal.signal(signal.SIGTERM, process_cleanup)
    # The three listener PIDs are bound positionally ahead of the
    # (signum, frame) arguments that the signal module passes in.
    signal.signal(signal.SIGHUP, partial(
        _handle_mutate_config, status_listener_proc.pid,
        stats_listener_proc.pid, get_listener_proc.pid))

    try:
        for process in processes:
            process.join()
    except KeyboardInterrupt:
        process_cleanup()