Release v0.4.2:

- WAMP alive RPC refactored
 - WSTUN tunnels monitoring: added recovery procedure
 - NGINX proxy updated
 - Iotronic oslo conf refactored
 - Pyinotify requirement added (fixed)

Change-Id: I877d6b8e8e401efaca315a4e8917d73ebbeb593c
This commit is contained in:
Nicola Peditto 2018-12-05 18:25:38 +01:00
parent 9bf78aa2da
commit 84678996b0
9 changed files with 242 additions and 105 deletions

View File

@ -2,7 +2,13 @@
lightningrod_home = /var/lib/iotronic lightningrod_home = /var/lib/iotronic
skip_cert_verify = True skip_cert_verify = True
debug = True debug = True
proxy = nginx
log_file = /var/log/iotronic/lightning-rod.log log_file = /var/log/iotronic/lightning-rod.log
connection_timer = 10
alive_timer = 600 alive_timer = 600
rpc_alive_timer = 3 rpc_alive_timer = 3
[services]
wstun_bin = /usr/bin/wstun
[webservices]
proxy = nginx

View File

@ -41,26 +41,10 @@ def manageTimeout(error_message, action):
LOG.warning("Iotronic RPC-ALIVE timeout error: " + str(e)) LOG.warning("Iotronic RPC-ALIVE timeout error: " + str(e))
else: else:
LOG.warning("Board connection call timeout: " + str(details)) LOG.warning("Board connection call timeout ["
+ str(action) + "]: " + str(details))
utils.LR_restart() utils.LR_restart()
"""
def manageTimeoutALIVE(error_message, action):
try:
raise TimeoutError(error_message, action)
except TimeoutError as err:
details = err.args[0]
LOG.warning("Iotronic RPC-ALIVE timeout details: " + str(details))
try:
utils.destroyWampSocket()
except Exception as e:
LOG.warning("Iotronic RPC-ALIVE timeout error: " + str(e))
"""
class NginxError(Exception): class NginxError(Exception):

View File

@ -57,6 +57,9 @@ lr_opts = [
default=True, default=True,
help=('Flag for skipping the verification of the server cert ' help=('Flag for skipping the verification of the server cert '
'(for the auto-signed ones)')), '(for the auto-signed ones)')),
cfg.IntOpt('connection_timer',
default=10,
help=('IoTronic connection RPC timer')),
cfg.IntOpt('alive_timer', cfg.IntOpt('alive_timer',
default=600, default=600,
help=('Wamp websocket check time')), help=('Wamp websocket check time')),
@ -65,18 +68,8 @@ lr_opts = [
help=('RPC alive response time threshold')), help=('RPC alive response time threshold')),
] ]
proxy_opts = [
cfg.StrOpt(
'proxy',
choices=[('nginx', ('nginx proxy')), ],
help=('Proxy for WebServices Manager')
),
]
CONF = cfg.CONF CONF = cfg.CONF
CONF.register_opts(lr_opts) CONF.register_opts(lr_opts)
CONF.register_opts(proxy_opts)
SESSION = None SESSION = None
global board global board
@ -194,9 +187,9 @@ async def wamp_checks(session):
with timeoutALIVE(seconds=CONF.rpc_alive_timer, action="ws_alive"): with timeoutALIVE(seconds=CONF.rpc_alive_timer, action="ws_alive"):
res = await session.call( res = await session.call(
str(board.agent) + u'.stack4things.alive' str(board.agent) + u'.stack4things.wamp_alive',
# board_uuid=board.uuid, board_uuid=board.uuid,
# board_name=board.name board_name=board.name
) )
LOG.debug("WampCheck attempt " + str(res)) LOG.debug("WampCheck attempt " + str(res))
@ -237,7 +230,7 @@ async def IotronicLogin(board, session, details):
rpc = str(board.agent) + u'.stack4things.connection' rpc = str(board.agent) + u'.stack4things.connection'
with timeoutRPC(seconds=5, action=rpc): with timeoutRPC(seconds=CONF.connection_timer, action=rpc):
res = await session.call( res = await session.call(
rpc, rpc,
uuid=board.uuid, uuid=board.uuid,
@ -515,7 +508,7 @@ def wampConnect(wamp_conf):
rpc = str(board.agent) + u'.stack4things.connection' rpc = str(board.agent) + u'.stack4things.connection'
with timeoutRPC(seconds=5, action=rpc): with timeoutRPC(seconds=CONF.connection_timer, action=rpc):
res = await session.call( res = await session.call(
rpc, rpc,
uuid=board.uuid, uuid=board.uuid,

View File

@ -51,22 +51,54 @@ class ProxyManager(Proxy.Proxy):
nginxMsg = {} nginxMsg = {}
try: try:
stat = subprocess.Popen('systemctl status nginx.service',
shell=True, stdout=subprocess.PIPE)
stdout_list = str(stat.communicate()[0]).split('\n')
for line in stdout_list:
if 'Active:' in line:
nginxMsg['log'] = line.split('\\n')[2].replace(" ", "") get_service = 'pidof systemd > /dev/null ' \
'&& echo "systemd" || echo "init.d"'
service_cmd = subprocess.Popen(get_service,
shell=True, stdout=subprocess.PIPE)
if '(running)' in line: service_mng = \
nginxMsg['status'] = True service_cmd.communicate()[0].decode("utf-8").split("\n")[0]
else:
nginxMsg['status'] = False
nginxMsg = json.dumps(nginxMsg) if service_mng == 'init.d':
# print('INIT')
stat = subprocess.Popen('service nginx status',
shell=True, stdout=subprocess.PIPE)
stdout_list = stat.communicate()[0].decode("utf-8").split("\n")
return nginxMsg for line in stdout_list:
if 'running' in line:
nginxMsg['log'] = stdout_list[0]
if 'running' in line:
nginxMsg['status'] = True
else:
nginxMsg['status'] = False
nginxMsg = json.dumps(nginxMsg)
return nginxMsg
elif service_mng == 'systemd':
# print('SYSTEMD')
stat = subprocess.Popen('systemctl status nginx.service',
shell=True, stdout=subprocess.PIPE)
stdout_list = str(stat.communicate()[0]).split('\n')
for line in stdout_list:
if 'Active:' in line:
nginxMsg['log'] = \
line.split('\\n')[2].replace(" ", "")
if '(running)' in line:
nginxMsg['status'] = True
else:
nginxMsg['status'] = False
nginxMsg = json.dumps(nginxMsg)
return nginxMsg
except Exception as err: except Exception as err:
LOG.error("Error check NGINX status: " + str(err)) LOG.error("Error check NGINX status: " + str(err))
@ -109,6 +141,8 @@ class ProxyManager(Proxy.Proxy):
nginxMsg = {} nginxMsg = {}
stat = None
try: try:
stat = subprocess.call('service nginx reload', shell=True) stat = subprocess.call('service nginx reload', shell=True)

View File

@ -19,10 +19,10 @@ import errno
import json import json
import os import os
import psutil import psutil
import pyinotify
import signal import signal
import subprocess import subprocess
import time import time
import traceback
from datetime import datetime from datetime import datetime
from threading import Thread from threading import Thread
@ -40,7 +40,24 @@ from oslo_config import cfg
from oslo_log import log as logging from oslo_log import log as logging
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
wstun_opts = [
cfg.StrOpt(
'wstun_bin',
default='/usr/bin/wstun',
help=('WSTUN bin for Services Manager')
),
]
CONF = cfg.CONF CONF = cfg.CONF
service_group = cfg.OptGroup(
name='services', title='Services options'
)
CONF.register_group(service_group)
CONF.register_opts(wstun_opts, group=service_group)
SERVICES_CONF_FILE = CONF.lightningrod_home + "/services.json" SERVICES_CONF_FILE = CONF.lightningrod_home + "/services.json"
@ -49,6 +66,8 @@ class ServiceManager(Module.Module):
def __init__(self, board, session): def __init__(self, board, session):
super(ServiceManager, self).__init__("ServiceManager", board) super(ServiceManager, self).__init__("ServiceManager", board)
print("\nWSTUN bin path: " + str(CONF.services.wstun_bin))
self.wstun_ip = urlparse(board.wamp_config["url"])[1].split(':')[0] self.wstun_ip = urlparse(board.wamp_config["url"])[1].split(':')[0]
self.wstun_port = "8080" self.wstun_port = "8080"
@ -126,6 +145,7 @@ class ServiceManager(Module.Module):
wstun = self._startWstun(public_port, local_port, event="boot") wstun = self._startWstun(public_port, local_port, event="boot")
if wstun != None: if wstun != None:
service_pid = wstun.pid service_pid = wstun.pid
# 3. Update services.json file # 3. Update services.json file
@ -155,6 +175,52 @@ class ServiceManager(Module.Module):
signal.signal(signal.SIGCHLD, self._zombie_hunter) signal.signal(signal.SIGCHLD, self._zombie_hunter)
def restore(self):
LOG.info("Cloud service tunnels to restore:")
# Load services.json configuration file
services_conf = self._loadServicesConf()
if len(services_conf['services']) != 0:
wstun_process_list = []
# No zombie alert activation
lightningrod.zombie_alert = False
LOG.debug("[WSTUN-RESTORE] - Restore zombie_alert: " + str(
lightningrod.zombie_alert))
# Collect all alive WSTUN proccesses
for p in psutil.process_iter():
if (p.name() == "node"):
if (p.status() == psutil.STATUS_ZOMBIE):
LOG.warning("WSTUN ZOMBIE: " + str(p))
wstun_process_list.append(p)
elif ("wstun" in p.cmdline()[1]):
LOG.warning("WSTUN ALIVE: " + str(p))
wstun_process_list.append(p)
psutil.Process(p.pid).kill()
LOG.warning(" --> PID " + str(p.pid) + " killed!")
LOG.debug("[WSTUN-RESTORE] - WSTUN processes to restore:\n"
+ str(wstun_process_list))
for service_uuid in services_conf['services']:
Thread(
target=self._restoreWSTUN,
args=(services_conf, service_uuid,)
).start()
time.sleep(2)
# Reactivate zombies monitoring
if not lightningrod.zombie_alert:
lightningrod.zombie_alert = True
else:
LOG.info(" --> No service tunnels to restore.")
def _zombie_hunter(self, signum, frame): def _zombie_hunter(self, signum, frame):
wstun_found = False wstun_found = False
@ -282,52 +348,6 @@ class ServiceManager(Module.Module):
+ " service tunnel!" + " service tunnel!"
LOG.error(" - " + message) LOG.error(" - " + message)
def restore(self):
LOG.info("Cloud service tunnels to restore:")
# Load services.json configuration file
services_conf = self._loadServicesConf()
if len(services_conf['services']) != 0:
wstun_process_list = []
# No zombie alert activation
lightningrod.zombie_alert = False
LOG.debug("[WSTUN-RESTORE] - Restore zombie_alert: " + str(
lightningrod.zombie_alert))
# Collect all alive WSTUN proccesses
for p in psutil.process_iter():
if (p.name() == "node"):
if (p.status() == psutil.STATUS_ZOMBIE):
LOG.warning("WSTUN ZOMBIE: " + str(p))
wstun_process_list.append(p)
elif ("wstun" in p.cmdline()[1]):
LOG.warning("WSTUN ALIVE: " + str(p))
wstun_process_list.append(p)
psutil.Process(p.pid).kill()
LOG.warning(" --> PID " + str(p.pid) + " killed!")
LOG.debug("[WSTUN-RESTORE] - WSTUN processes to restore:\n"
+ str(wstun_process_list))
for service_uuid in services_conf['services']:
Thread(
target=self._restoreWSTUN,
args=(services_conf, service_uuid,)
).start()
time.sleep(2)
# Reactivate zombies monitoring
if not lightningrod.zombie_alert:
lightningrod.zombie_alert = True
else:
LOG.info(" --> No service tunnels to restore.")
def _loadServicesConf(self): def _loadServicesConf(self):
"""Load services.json JSON configuration. """Load services.json JSON configuration.
@ -347,24 +367,106 @@ class ServiceManager(Module.Module):
return services_conf return services_conf
def _wstunMon(self, wstun):
wfd_check = True
while (wfd_check):
try:
wp = psutil.Process(int(wstun.pid))
wstun_fd = wp.connections()[0].fd
if len(wp.connections()) != 0:
LOG.debug("WSTUN alive socket: " + str(wp.connections()))
wfd_check = False
except IndexError as err:
# LOG.error(str(err) + " - RETRY...")
pass
time.sleep(1)
class EventProcessor(pyinotify.ProcessEvent):
_methods = [
# "IN_CREATE",
# "IN_OPEN",
# "IN_ACCESS",
# "IN_ATTRIB",
"IN_CLOSE_NOWRITE",
"IN_CLOSE_WRITE",
"IN_DELETE",
"IN_DELETE_SELF",
# "IN_IGNORED",
# "IN_MODIFY",
# "IN_MOVE_SELF",
# "IN_MOVED_FROM",
# "IN_MOVED_TO",
# "IN_Q_OVERFLOW",
# "IN_UNMOUNT",
"default"
]
def process_generator(cls, method):
def _method_name(self, event):
if(event.maskname == "IN_CLOSE_WRITE"):
LOG.info("WSTUN FD SOCKET CLOSED: " + str(event.pathname))
LOG.debug(
"\nMethod name: process_{}()\n"
"Path name: {}\n"
"Event Name: {}\n".format(
method, event.pathname, event.maskname
)
)
os.kill(wstun.pid, signal.SIGKILL)
_method_name.__name__ = "process_{}".format(method)
setattr(cls, _method_name.__name__, _method_name)
for method in EventProcessor._methods:
process_generator(EventProcessor, method)
watch_manager = pyinotify.WatchManager()
event_notifier = pyinotify.ThreadedNotifier(
watch_manager, EventProcessor()
)
watch_this = os.path.abspath(
"/proc/" + str(wstun.pid) + "/fd/" + str(wstun_fd)
)
watch_manager.add_watch(watch_this, pyinotify.ALL_EVENTS)
event_notifier.start()
def _startWstun(self, public_port, local_port, event="no-set"): def _startWstun(self, public_port, local_port, event="no-set"):
opt_reverse = "-r" + str(public_port) + ":127.0.0.1:" + str(local_port) opt_reverse = "-r" + str(public_port) + ":127.0.0.1:" + str(local_port)
try: try:
wstun = subprocess.Popen( wstun = subprocess.Popen(
['/usr/bin/wstun', opt_reverse, self.wstun_url], [CONF.services.wstun_bin, opt_reverse, self.wstun_url],
stdout=subprocess.PIPE stdout=subprocess.PIPE
) )
if(event != "boot"): if(event != "boot"):
print("WSTUN start event:") print("WSTUN start event:")
cmd_print = 'WSTUN exec: /usr/bin/wstun ' \ cmd_print = 'WSTUN exec: ' + str(CONF.services.wstun_bin) \
+ opt_reverse + ' ' + self.wstun_url + opt_reverse + ' ' + self.wstun_url
print(" - " + str(cmd_print)) print(" - " + str(cmd_print))
LOG.debug(cmd_print) LOG.debug(cmd_print)
# WSTUN MON
# ##############################################################
Thread(
target=self._wstunMon,
args=(wstun,)
).start()
# self._wstunMon(wstun)
# ##############################################################
except Exception as err: except Exception as err:
LOG.error("Error spawning WSTUN process: " + str(err)) LOG.error("Error spawning WSTUN process: " + str(err))
wstun = None wstun = None

View File

@ -22,11 +22,6 @@ from iotronic_lightningrod.modules import Module
from iotronic_lightningrod.modules import utils from iotronic_lightningrod.modules import utils
import iotronic_lightningrod.wampmessage as WM import iotronic_lightningrod.wampmessage as WM
from oslo_config import cfg
from oslo_log import log as logging
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
import importlib as imp import importlib as imp
import inspect import inspect
@ -35,16 +30,36 @@ import OpenSSL.crypto
import os import os
import time import time
from oslo_config import cfg
from oslo_log import log as logging
LOG = logging.getLogger(__name__)
proxy_opts = [
cfg.StrOpt(
'proxy',
choices=[('nginx', ('nginx proxy')), ],
help=('Proxy for WebServices Manager')
),
]
CONF = cfg.CONF
webservice_group = cfg.OptGroup(
name='webservices', title='WebServices options'
)
CONF.register_group(webservice_group)
CONF.register_opts(proxy_opts, group=webservice_group)
class WebServiceManager(Module.Module): class WebServiceManager(Module.Module):
def __init__(self, board, session): def __init__(self, board, session):
super(WebServiceManager, self).__init__("WebServiceManager", board) super(WebServiceManager, self).__init__("WebServiceManager", board)
LOG.info(" - Proxy used: " + CONF.proxy.upper()) LOG.info(" - Proxy used: " + CONF.webservices.proxy.upper())
try: try:
proxy_type = CONF.proxy proxy_type = CONF.webservices.proxy
path = package_path + "/modules/proxies/" + proxy_type + ".py" path = package_path + "/modules/proxies/" + proxy_type + ".py"
if os.path.exists(path): if os.path.exists(path):

View File

@ -9,3 +9,4 @@ httplib2>=0.9.1 # MIT
psutil>=5.4.7 # BSD psutil>=5.4.7 # BSD
oslo.config>=5.1.0 # Apache-2.0 oslo.config>=5.1.0 # Apache-2.0
oslo.log>=3.36.0 # Apache-2.0 oslo.log>=3.36.0 # Apache-2.0
pyinotify>=0.9.6;sys_platform!='win32' and sys_platform!='darwin' and sys_platform!='sunos5' # MIT

View File

@ -55,7 +55,9 @@ else:
+ 'etc/iotronic/iotronic.conf /etc/iotronic/iotronic.conf') + 'etc/iotronic/iotronic.conf /etc/iotronic/iotronic.conf')
print(' - iotronic.conf - Created.') print(' - iotronic.conf - Created.')
else: else:
print(' - iotronic.conf - Already exists.') os.system('cp ' + py_dist_pack + '/iotronic_lightningrod/'
+ 'etc/iotronic/iotronic.conf /etc/iotronic/iotronic.conf')
print(' - iotronic.conf - Overwritten.')
if not os.path.exists('/var/lib/iotronic/'): if not os.path.exists('/var/lib/iotronic/'):

View File

@ -51,4 +51,4 @@ basepython = python3.5
show-source = True show-source = True
builtins = _ builtins = _
ignore = E711,E712,H404,H405,E123,E125,E901 ignore = E711,E712,H404,H405,E123,E125,E901
exclude = .venv,.git,.tox,dist,doc,etc,*lib/python*,*egg,build,iotronic_lightningrod/plugins/plugins_examples/ exclude = .venv,.git,.tox,dist,doc,etc,*lib/python*,*egg,build,iotronic_lightningrod/plugins/plugins_examples/,STUFF