Updated Module class: restore method was added and all modules was upgraded. Service Manager upgraded: services restore management added.

Change-Id: I1ca9bb8dfe28f5678e213a2af42390ec7136194d
This commit is contained in:
Nicola Peditto 2018-03-07 11:14:30 +01:00
parent b817d23bd5
commit e884ed07cb
13 changed files with 673 additions and 321 deletions

View File

@ -15,9 +15,8 @@ board-side probe.
Installation guides Installation guides
------------------- -------------------
* `Arduino YUN <https://github.com/MDSLab/iotronic-lightning-rod-agent/blob/master/doc/installation/arduino_yun.rst>`_. * `Raspberry Pi 3 <https://github.com/MDSLab/iotronic-lightning-rod-agent/blob/master/doc/installation/raspberry_pi_3.rst>`_.
* `Ubuntu 16.04 <https://github.com/MDSLab/iotronic-lightning-rod-agent/blob/master/doc/installation/ubuntu1604.rst>`_. * `Ubuntu 16.04 <https://github.com/MDSLab/iotronic-lightning-rod-agent/blob/master/doc/installation/ubuntu1604.rst>`_.
* `Raspberry Pi 3 <https://github.com/MDSLab/iotronic-lightning-rod-agent/blob/master/doc/installation/raspberry_pi_3.rst>`_. * `Arduino YUN <https://github.com/MDSLab/iotronic-lightning-rod-agent/blob/master/doc/installation/arduino_yun.rst>`_.

View File

@ -1,8 +1,7 @@
IoTronic Lightning-rod installation guide for Arduino YUN IoTronic Lightning-rod installation guide for Arduino YUN
========================================================= =========================================================
We tested this procedure on a Arduino YUN board with OpenWRT LininoIO We tested this procedure on a Arduino YUN board with OpenWRT LininoIO image.
image.
Install from source code Install from source code
------------------------ ------------------------
@ -25,28 +24,13 @@ Install dependencies
:: ::
opkg install git bzip2 python-netifaces opkg install git bzip2 python-netifaces
pip install --no-cache-dir zope.interface pyserial Babel oslo.config oslo.log pip install --no-cache-dir zope.interface pyserial Babel oslo.config
oslo.log
easy_install httplib2 easy_install httplib2
Install Autobahn: Install Autobahn:
''''''''''''''''' '''''''''''''''''
::
# Install Twisted:
wget --no-check-certificate https://pypi.python.org/packages/source/T/Twisted/Twisted-14.0.2.tar.bz2
bzip2 -d Twisted-14.0.2.tar.bz2
tar -xvf Twisted-14.0.2.tar
cd Twisted-14.0.2/
vi setup.py
comment line 63:
#conditionalExtensions=getExtensions(),
python setup.py install
cd /opt/
rm -rf /opt/Twisted-14.0.2*
:: ::
easy_install autobahn easy_install autobahn
@ -82,6 +66,7 @@ Deployment
cp etc/iotronic/iotronic.conf /etc/iotronic/ cp etc/iotronic/iotronic.conf /etc/iotronic/
cp settings.example.json /var/lib/iotronic/settings.json cp settings.example.json /var/lib/iotronic/settings.json
cp plugins.example.json /var/lib/iotronic/plugins.json cp plugins.example.json /var/lib/iotronic/plugins.json
cp services.example.json /var/lib/iotronic/services.json
cp etc/init.d/lightning-rod /etc/init.d/lightning-rod cp etc/init.d/lightning-rod /etc/init.d/lightning-rod
chmod +x /etc/init.d/lightning-rod chmod +x /etc/init.d/lightning-rod
touch /var/log/iotronic/lightning-rod.log touch /var/log/iotronic/lightning-rod.log

View File

@ -11,7 +11,7 @@ Install requirements
:: ::
pip install oslo-config oslo_log twisted autobahn httplib2 pip install oslo.config oslo.log asyncio autobahn httplib2 psutil six
Set up environment: Set up environment:
~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~
@ -44,6 +44,7 @@ Deployment
cp etc/iotronic/iotronic.conf /etc/iotronic/ cp etc/iotronic/iotronic.conf /etc/iotronic/
cp settings.example.json /var/lib/iotronic/settings.json cp settings.example.json /var/lib/iotronic/settings.json
cp plugins.example.json /var/lib/iotronic/plugins.json cp plugins.example.json /var/lib/iotronic/plugins.json
cp services.example.json /var/lib/iotronic/services.json
cp etc/systemd/system/s4t-lightning-rod.service /etc/systemd/system/lightning-rod.service cp etc/systemd/system/s4t-lightning-rod.service /etc/systemd/system/lightning-rod.service
chmod +x /etc/systemd/system/lightning-rod.service chmod +x /etc/systemd/system/lightning-rod.service
systemctl daemon-reload systemctl daemon-reload

View File

@ -12,7 +12,7 @@ Install requirements
:: ::
pip install oslo-config oslo_log twisted autobahn httplib2 pip install oslo.config oslo.log asyncio autobahn httplib2 psutil six
Set up environment: Set up environment:
~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~
@ -45,6 +45,7 @@ Deployment
cp etc/iotronic/iotronic.conf /etc/iotronic/ cp etc/iotronic/iotronic.conf /etc/iotronic/
cp settings.example.json /var/lib/iotronic/settings.json cp settings.example.json /var/lib/iotronic/settings.json
cp plugins.example.json /var/lib/iotronic/plugins.json cp plugins.example.json /var/lib/iotronic/plugins.json
cp services.example.json /var/lib/iotronic/services.json
cp etc/systemd/system/s4t-lightning-rod.service /etc/systemd/system/lightning-rod.service cp etc/systemd/system/s4t-lightning-rod.service /etc/systemd/system/lightning-rod.service
chmod +x /etc/systemd/system/lightning-rod.service chmod +x /etc/systemd/system/lightning-rod.service
systemctl daemon-reload systemctl daemon-reload

View File

@ -1,4 +1,4 @@
[DEFAULT] [DEFAULT]
debug = True debug = True
log_file = /var/log/iotronic/lightning-rod.log log_file = /var/log/iotronic/lightning-rod.log
lightningrod_home = /var/lib/iotronic lightningrod_home = /var/lib/iotronic

View File

@ -66,6 +66,9 @@ txaio.start_logging(level="info")
RUNNER = None RUNNER = None
connected = False connected = False
global MODULES
MODULES = {}
def moduleReloadInfo(session): def moduleReloadInfo(session):
"""This function is used in the reconnection stage to register """This function is used in the reconnection stage to register
@ -76,18 +79,22 @@ def moduleReloadInfo(session):
""" """
LOG.info("Modules reloading after WAMP recovery...") LOG.info("\n\nModules reloading after WAMP recovery...\n\n")
try: try:
# Register RPCs for each Lightning-rod module # Call module restore procedures and
for mod in RPC: # register RPCs for each Lightning-rod module
LOG.info("- Reloading module RPcs for " + str(mod)) for mod_name in MODULES:
moduleWampRegister(session, RPC[mod]) LOG.info("- Registering RPCs for module " + str(mod_name))
moduleWampRegister(session, RPC[mod_name])
LOG.info("- Restoring module " + str(mod_name))
MODULES[mod_name].restore()
# Register RPCs for the device # Register RPCs for the device
for dev in RPC_devices: for dev in RPC_devices:
LOG.info("- Reloading device RPCs for " + str(dev)) LOG.info("- Registering RPCs for device " + str(dev))
moduleWampRegister(session, RPC_devices[dev]) moduleWampRegister(session, RPC_devices[dev])
except Exception as err: except Exception as err:
@ -110,13 +117,15 @@ def moduleWampRegister(session, meth_list):
else: else:
for meth in meth_list: for meth in meth_list:
# We don't considere the __init__ and finalize methods # We don't considere the "__init__", "finalize" and
if (meth[0] != "__init__") & (meth[0] != "finalize"): # "restore" methods
if (meth[0] != "__init__") & (meth[0] != "finalize") \
& (meth[0] != "restore"):
rpc_addr = u'iotronic.' + board.uuid + '.' + meth[0] rpc_addr = u'iotronic.' + board.uuid + '.' + meth[0]
session.register(meth[1], rpc_addr) if not meth[0].startswith('_'):
session.register(meth[1], rpc_addr)
LOG.info(" --> " + str(meth[0])) LOG.info(" --> " + str(meth[0]))
def modulesLoader(session): def modulesLoader(session):
@ -158,13 +167,16 @@ def modulesLoader(session):
else: else:
mod = ext.plugin(board, session) mod = ext.plugin(board, session)
global MODULES
MODULES[mod.name] = mod
# Methods list for each module # Methods list for each module
meth_list = inspect.getmembers(mod, predicate=inspect.ismethod) meth_list = inspect.getmembers(mod, predicate=inspect.ismethod)
global RPC global RPC
RPC[mod.name] = meth_list RPC[mod.name] = meth_list
if len(meth_list) == 2: if len(meth_list) == 3:
# there are at least two methods for each module: # there are at least two methods for each module:
# "__init__" and "finalize" # "__init__" and "finalize"
@ -222,8 +234,7 @@ async def IotronicLogin(board, session, details):
modulesLoader(session) modulesLoader(session)
except Exception as e: except Exception as e:
LOG.warning("WARNING - Could not register procedures: " LOG.warning("WARNING - Could not load modules: " + str(e))
+ str(e))
# Reset flag to False # Reset flag to False
# reconnection = False # reconnection = False
@ -257,10 +268,10 @@ def wampConnect(wamp_conf):
try: try:
LOG.info("WAMP status:" + LOG.info("WAMP status @ boot:" +
"\n- board = " + str(board.status) + "\n- board = " + str(board.status) +
"\n- reconnection = " + str(reconnection) + "\n- reconnection = " + str(reconnection) +
"\n- connection = " + str(connected) "\n- connected = " + str(connected)
) )
# LR creates the Autobahn Asyncio Component that points to the # LR creates the Autobahn Asyncio Component that points to the
@ -406,10 +417,10 @@ def wampConnect(wamp_conf):
LOG.info("\n\n\nBoard is becoming operative...\n\n\n") LOG.info("\n\n\nBoard is becoming operative...\n\n\n")
board.updateStatus("operative") board.updateStatus("operative")
board.loadSettings() board.loadSettings()
LOG.info("WAMP status:" + LOG.info("WAMP status @ firt connection:" +
"\n- board = " + str(board.status) + "\n- board = " + str(board.status) +
"\n- reconnection = " + str(reconnection) + "\n- reconnection = " + str(reconnection) +
"\n- connection = " + str(connected) "\n- connected = " + str(connected)
) )
await IotronicLogin(board, session, details) await IotronicLogin(board, session, details)
@ -471,7 +482,7 @@ def wampConnect(wamp_conf):
except Exception as e: except Exception as e:
LOG.warning( LOG.warning(
"WARNING - Could not register procedures: " "WARNING - Could not reload modules: "
+ str(e)) + str(e))
Bye() Bye()
@ -481,7 +492,7 @@ def wampConnect(wamp_conf):
Bye() Bye()
except exception.ApplicationError as e: except exception.ApplicationError as e:
LOG.error("IoTronic connection error: " + str(e)) LOG.error("IoTronic connection error:\n" + str(e))
# Iotronic is offline the board can not call # Iotronic is offline the board can not call
# the "stack4things.connection" RPC. # the "stack4things.connection" RPC.
# The board will disconnect from WAMP agent and retry later # The board will disconnect from WAMP agent and retry later
@ -517,10 +528,10 @@ def wampConnect(wamp_conf):
global reconnection global reconnection
LOG.info("WAMP status:" + LOG.info("WAMP status on disconnect:" +
"\n- board = " + str(board.status) + "\n- board = " + str(board.status) +
"\n- reconnection = " + str(reconnection) + "\n- reconnection = " + str(reconnection) +
"\n- connection = " + str(connected) "\n- connected = " + str(connected)
) )
if board.status == "operative" and reconnection is False: if board.status == "operative" and reconnection is False:
@ -579,7 +590,7 @@ def wampConnect(wamp_conf):
LOG.error("Reconnection wrong status!") LOG.error("Reconnection wrong status!")
except Exception as err: except Exception as err:
LOG.error(" - URI validation error: " + str(err)) LOG.error(" - WAMP connection error: " + str(err))
Bye() Bye()

View File

@ -28,8 +28,6 @@ class Module(object):
""" """
# __metaclass__ = abc.ABCMeta
def __init__(self, name, board): def __init__(self, name, board):
self.name = name self.name = name
@ -40,3 +38,7 @@ class Module(object):
@abc.abstractmethod @abc.abstractmethod
def finalize(self): def finalize(self):
pass pass
@abc.abstractmethod
def restore(self):
pass

View File

@ -29,22 +29,6 @@ from oslo_log import log as logging
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
def deviceWampRegister(dev_meth_list, board):
LOG.info(" - " + str(board.type).capitalize()
+ " device registering RPCs:")
for meth in dev_meth_list:
if (meth[0] != "__init__") & (meth[0] != "finalize"):
# LOG.info(" - " + str(meth[0]))
rpc_addr = u'iotronic.' + board.uuid + '.' + meth[0]
# LOG.debug(" --> " + str(rpc_addr))
SESSION.register(meth[1], rpc_addr)
LOG.info(" --> " + str(meth[0]) + " registered!")
class DeviceManager(Module.Module): class DeviceManager(Module.Module):
def __init__(self, board, session): def __init__(self, board, session):
@ -71,7 +55,7 @@ class DeviceManager(Module.Module):
RPC_devices[device_type] = dev_meth_list RPC_devices[device_type] = dev_meth_list
deviceWampRegister(dev_meth_list, board) self._deviceWampRegister(dev_meth_list, board)
board.device = device board.device = device
@ -80,3 +64,21 @@ class DeviceManager(Module.Module):
def finalize(self): def finalize(self):
pass pass
def restore(self):
pass
def _deviceWampRegister(self, dev_meth_list, board):
LOG.info(" - " + str(board.type).capitalize()
+ " device registering RPCs:")
for meth in dev_meth_list:
if (meth[0] != "__init__") & (meth[0] != "finalize"):
# LOG.info(" - " + str(meth[0]))
rpc_addr = u'iotronic.' + board.uuid + '.' + meth[0]
# LOG.debug(" --> " + str(rpc_addr))
SESSION.register(meth[1], rpc_addr)
LOG.info(" --> " + str(meth[0]) + " registered!")

View File

@ -18,7 +18,6 @@ __author__ = "Nicola Peditto <npeditto@unime.it"
from datetime import datetime from datetime import datetime
import imp import imp
import inspect
import json import json
import os import os
import queue import queue
@ -27,9 +26,11 @@ import time
from iotronic_lightningrod.modules import Module from iotronic_lightningrod.modules import Module
from iotronic_lightningrod.modules import utils
from iotronic_lightningrod.plugins import PluginSerializer from iotronic_lightningrod.plugins import PluginSerializer
import iotronic_lightningrod.wampmessage as WM import iotronic_lightningrod.wampmessage as WM
from oslo_config import cfg from oslo_config import cfg
from oslo_log import log as logging from oslo_log import log as logging
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -39,162 +40,6 @@ PLUGINS_THRS = {}
PLUGINS_CONF_FILE = CONF.lightningrod_home + "/plugins.json" PLUGINS_CONF_FILE = CONF.lightningrod_home + "/plugins.json"
def getFuncName():
return inspect.stack()[1][3]
def createPluginsConf():
"""Create plugins.json file if it does not exist.
"""
if not os.path.exists(PLUGINS_CONF_FILE):
LOG.debug("plugins.json does not exist: creating...")
plugins_conf = {'plugins': {}}
with open(PLUGINS_CONF_FILE, 'w') as f:
json.dump(plugins_conf, f, indent=4)
def loadPluginsConf():
"""Load plugins.json JSON configuration.
:return: JSON Plugins configuration
"""
try:
with open(PLUGINS_CONF_FILE) as settings:
plugins_conf = json.load(settings)
except Exception as err:
LOG.error("Parsing error in " + PLUGINS_CONF_FILE + ": " + str(err))
plugins_conf = None
return plugins_conf
def getEnabledPlugins():
"""This function gets the list of all asynchronous plugins.
We considered only those plugins with 'callable' flag set to False
and 'onboot' flag set to True.
:return: enabledPlugins List
"""
enabledPlugins = []
plugins_conf = loadPluginsConf()
for plugin in plugins_conf['plugins']:
if plugins_conf['plugins'][plugin]['callable'] is False:
if plugins_conf['plugins'][plugin]['onboot'] is True:
if plugins_conf['plugins'][plugin]['status'] == "operative":
enabledPlugins.append(plugin)
if len(enabledPlugins) != 0:
LOG.info(" - Enabled plugins list: " + str(enabledPlugins))
return enabledPlugins
def makeNothing():
"""Sandbox function.
"""
pass
def RebootOnBootPlugins():
"""Reboot at boot each enabled asynchronous plugin
:return:
"""
rpc_name = getFuncName()
LOG.info("Rebooting enabled plugins:")
enabledPlugins = getEnabledPlugins()
if enabledPlugins.__len__() == 0:
message = "No plugin to reboot!"
LOG.info(" - " + message)
else:
for plugin_uuid in enabledPlugins:
plugins_conf = loadPluginsConf()
plugin_name = plugins_conf['plugins'][plugin_uuid]['name']
# plugin_status = plugins_conf['plugins'][plugin_uuid]['status']
try:
if (plugin_uuid in PLUGINS_THRS) and (
PLUGINS_THRS[plugin_uuid].isAlive()
):
LOG.warning(" - Plugin "
+ plugin_uuid + " already started!")
else:
LOG.info(" - Rebooting plugin " + plugin_uuid)
plugin_home = CONF.lightningrod_home + "/plugins/" \
+ plugin_uuid
plugin_filename = plugin_home + "/" + plugin_uuid + ".py"
plugin_params_file = \
plugin_home + "/" + plugin_uuid + ".json"
if os.path.exists(plugin_filename):
task = imp.load_source("plugin", plugin_filename)
if os.path.exists(plugin_params_file):
with open(plugin_params_file) as conf:
plugin_params = json.load(conf)
worker = task.Worker(
plugin_uuid,
plugin_name,
q_result=None,
params=plugin_params
)
PLUGINS_THRS[plugin_uuid] = worker
LOG.info(" - Starting plugin " + str(worker))
worker.start()
else:
message = "ERROR " \
+ plugin_params_file + " does not exist!"
LOG.error(" - "
+ worker.complete(rpc_name, message))
else:
message = "ERROR " \
+ plugin_filename + " does not exist!"
LOG.error(" - " + worker.complete(rpc_name, message))
message = "rebooted!"
LOG.info(" - " + worker.complete(rpc_name, message))
except Exception as err:
message = "Error rebooting plugin " \
+ plugin_uuid + ": " + str(err)
LOG.error(" - " + message)
class PluginManager(Module.Module): class PluginManager(Module.Module):
"""Plugin module to manage board plugins. """Plugin module to manage board plugins.
@ -213,7 +58,7 @@ class PluginManager(Module.Module):
super(PluginManager, self).__init__("PluginManager", board) super(PluginManager, self).__init__("PluginManager", board)
# Creation of plugins.json configuration file # Creation of plugins.json configuration file
createPluginsConf() self._createPluginsConf()
def finalize(self): def finalize(self):
"""Function called at the end of module loading. """Function called at the end of module loading.
@ -224,7 +69,154 @@ class PluginManager(Module.Module):
""" """
# Reboot boot enabled plugins # Reboot boot enabled plugins
RebootOnBootPlugins() self._rebootOnBootPlugins()
def restore(self):
pass
def _loadPluginsConf(self):
"""Load plugins.json JSON configuration.
:return: JSON Plugins configuration
"""
try:
with open(PLUGINS_CONF_FILE) as settings:
plugins_conf = json.load(settings)
except Exception as err:
LOG.error(
"Parsing error in " + PLUGINS_CONF_FILE + ": " + str(err))
plugins_conf = None
return plugins_conf
def _getEnabledPlugins(self):
"""This function gets the list of all asynchronous plugins.
We considered only those plugins with 'callable' flag set to False
and 'onboot' flag set to True.
:return: enabledPlugins List
"""
enabledPlugins = []
plugins_conf = self._loadPluginsConf()
for plugin in plugins_conf['plugins']:
if plugins_conf['plugins'][plugin]['callable'] is False:
if plugins_conf['plugins'][plugin]['onboot'] is True:
if plugins_conf['plugins'][plugin]['status'] == \
"operative":
enabledPlugins.append(plugin)
if len(enabledPlugins) != 0:
LOG.info(" - Enabled plugins list: " + str(enabledPlugins))
return enabledPlugins
def _rebootOnBootPlugins(self):
"""Reboot at boot each enabled asynchronous plugin
:return:
"""
f_name = utils.getFuncName()
LOG.info("Rebooting enabled plugins:")
enabledPlugins = self._getEnabledPlugins()
if enabledPlugins.__len__() == 0:
message = "No plugin to reboot!"
LOG.info(" - " + message)
else:
for plugin_uuid in enabledPlugins:
plugins_conf = self._loadPluginsConf()
plugin_name = plugins_conf['plugins'][plugin_uuid]['name']
try:
if (plugin_uuid in PLUGINS_THRS) and (
PLUGINS_THRS[plugin_uuid].isAlive()
):
LOG.warning(" - Plugin "
+ plugin_uuid + " already started!")
else:
LOG.info(" - Rebooting plugin " + plugin_uuid)
plugin_home = \
CONF.lightningrod_home + "/plugins/" + plugin_uuid
plugin_filename = \
plugin_home + "/" + plugin_uuid + ".py"
plugin_params_file = \
plugin_home + "/" + plugin_uuid + ".json"
if os.path.exists(plugin_filename):
task = imp.load_source("plugin", plugin_filename)
if os.path.exists(plugin_params_file):
with open(plugin_params_file) as conf:
plugin_params = json.load(conf)
worker = task.Worker(
plugin_uuid,
plugin_name,
q_result=None,
params=plugin_params
)
PLUGINS_THRS[plugin_uuid] = worker
LOG.info(" - Starting plugin " + str(worker))
worker.start()
else:
message = "ERROR " + plugin_params_file \
+ " does not exist!"
LOG.error(" - "
+ worker.complete(f_name, message))
else:
message = "ERROR " \
+ plugin_filename + " does not exist!"
LOG.error(
" - " + worker.complete(f_name, message))
message = "rebooted!"
LOG.info(" - " + worker.complete(f_name, message))
except Exception as err:
message = "Error rebooting plugin " \
+ plugin_uuid + ": " + str(err)
LOG.error(" - " + message)
def _createPluginsConf(self):
"""Create plugins.json file if it does not exist.
"""
if not os.path.exists(PLUGINS_CONF_FILE):
LOG.debug("plugins.json does not exist: creating...")
plugins_conf = {'plugins': {}}
with open(PLUGINS_CONF_FILE, 'w') as f:
json.dump(plugins_conf, f, indent=4)
async def PluginInject(self, plugin, onboot): async def PluginInject(self, plugin, onboot):
"""Plugin injection procedure into the board: """Plugin injection procedure into the board:
@ -239,7 +231,7 @@ class PluginManager(Module.Module):
""" """
rpc_name = getFuncName() rpc_name = utils.getFuncName()
try: try:
@ -269,7 +261,7 @@ class PluginManager(Module.Module):
pluginfile.write(loaded) pluginfile.write(loaded)
# Load plugins.json configuration file # Load plugins.json configuration file
plugins_conf = loadPluginsConf() plugins_conf = self._loadPluginsConf()
# LOG.debug("Plugin setup:\n" # LOG.debug("Plugin setup:\n"
# + json.dumps(plugin, indent=4, sort_keys=True)) # + json.dumps(plugin, indent=4, sort_keys=True))
@ -287,7 +279,7 @@ class PluginManager(Module.Module):
plugins_conf['plugins'][plugin_uuid]['updated_at'] = "" plugins_conf['plugins'][plugin_uuid]['updated_at'] = ""
plugins_conf['plugins'][plugin_uuid]['status'] = "injected" plugins_conf['plugins'][plugin_uuid]['status'] = "injected"
LOG.info("Plugin " + plugin_name + " created!") LOG.info(" - Plugin '" + plugin_name + "' created!")
message = rpc_name + " result: INJECTED" message = rpc_name + " result: INJECTED"
else: else:
@ -299,11 +291,11 @@ class PluginManager(Module.Module):
datetime.now().strftime('%Y-%m-%dT%H:%M:%S.%f') datetime.now().strftime('%Y-%m-%dT%H:%M:%S.%f')
plugins_conf['plugins'][plugin_uuid]['status'] = "updated" plugins_conf['plugins'][plugin_uuid]['status'] = "updated"
LOG.info("Plugin " + plugin_name LOG.info("Plugin '" + plugin_name
+ " (" + str(plugin_uuid) + ") updated!") + "' (" + str(plugin_uuid) + ") updated!")
message = rpc_name + " result: UPDATED" message = rpc_name + " result: UPDATED"
LOG.info("Plugin setup:\n" + json.dumps( LOG.info(" - Plugin setup:\n" + json.dumps(
plugins_conf['plugins'][plugin_uuid], plugins_conf['plugins'][plugin_uuid],
indent=4, indent=4,
sort_keys=True sort_keys=True
@ -314,12 +306,12 @@ class PluginManager(Module.Module):
json.dump(plugins_conf, f, indent=4) json.dump(plugins_conf, f, indent=4)
LOG.info(" - " + message) LOG.info(" - " + message)
w_msg = await WM.WampSuccess(message) w_msg = WM.WampSuccess(message)
except Exception as err: except Exception as err:
message = "Plugin injection error: " + str(err) message = "Plugin injection error: " + str(err)
LOG.error(" - " + message) LOG.error(" - " + message)
w_msg = await WM.WampError(str(err)) w_msg = WM.WampError(str(err))
return w_msg.serialize() return w_msg.serialize()
@ -338,11 +330,11 @@ class PluginManager(Module.Module):
try: try:
rpc_name = getFuncName() rpc_name = utils.getFuncName()
LOG.info("RPC " + rpc_name + " called for '" LOG.info("RPC " + rpc_name + " called for '"
+ plugin_uuid + "' plugin:") + plugin_uuid + "' plugin:")
plugins_conf = loadPluginsConf() plugins_conf = self._loadPluginsConf()
if plugin_uuid in plugins_conf['plugins']: if plugin_uuid in plugins_conf['plugins']:
@ -356,7 +348,7 @@ class PluginManager(Module.Module):
message = "ALREADY STARTED!" message = "ALREADY STARTED!"
LOG.warning(" - Plugin " LOG.warning(" - Plugin "
+ plugin_uuid + " already started!") + plugin_uuid + " already started!")
w_msg = await WM.WampError(message) w_msg = WM.WampError(message)
else: else:
@ -408,27 +400,27 @@ class PluginManager(Module.Module):
response = "STARTED" response = "STARTED"
LOG.info(" - " + worker.complete(rpc_name, response)) LOG.info(" - " + worker.complete(rpc_name, response))
w_msg = await WM.WampSuccess(response) w_msg = WM.WampSuccess(response)
else: else:
message = \ message = \
rpc_name + " - ERROR " \ rpc_name + " - ERROR " \
+ plugin_filename + " does not exist!" + plugin_filename + " does not exist!"
LOG.error(" - " + message) LOG.error(" - " + message)
w_msg = await WM.WampError(message) w_msg = WM.WampError(message)
else: else:
message = "Plugin " + plugin_uuid \ message = "Plugin " + plugin_uuid \
+ " does not exist in this board!" + " does not exist in this board!"
LOG.warning(" - " + message) LOG.warning(" - " + message)
w_msg = await WM.WampError(message) w_msg = WM.WampError(message)
except Exception as err: except Exception as err:
message = \ message = \
rpc_name + " - ERROR - plugin (" + plugin_uuid + ") - " \ rpc_name + " - ERROR - plugin (" + plugin_uuid + ") - " \
+ str(err) + str(err)
LOG.error(" - " + message) LOG.error(" - " + message)
w_msg = await WM.WampError(str(err)) w_msg = WM.WampError(str(err))
return w_msg.serialize() return w_msg.serialize()
@ -442,7 +434,7 @@ class PluginManager(Module.Module):
:return: return a response to RPC request :return: return a response to RPC request
""" """
rpc_name = getFuncName() rpc_name = utils.getFuncName()
LOG.info("RPC " + rpc_name + " CALLED for '" LOG.info("RPC " + rpc_name + " CALLED for '"
+ plugin_uuid + "' plugin:") + plugin_uuid + "' plugin:")
@ -464,13 +456,13 @@ class PluginManager(Module.Module):
if 'delay' in parameters: if 'delay' in parameters:
time.sleep(delay) time.sleep(delay)
await worker.stop() worker.stop()
del PLUGINS_THRS[plugin_uuid] del PLUGINS_THRS[plugin_uuid]
message = "STOPPED" message = "STOPPED"
LOG.info(" - " + worker.complete(rpc_name, message)) LOG.info(" - " + worker.complete(rpc_name, message))
w_msg = await WM.WampSuccess(message) w_msg = WM.WampSuccess(message)
else: else:
message = \ message = \
@ -478,21 +470,21 @@ class PluginManager(Module.Module):
+ " - ERROR - plugin (" + plugin_uuid \ + " - ERROR - plugin (" + plugin_uuid \
+ ") is instantiated but is not running anymore!" + ") is instantiated but is not running anymore!"
LOG.error(" - " + message) LOG.error(" - " + message)
w_msg = await WM.WampError(message) w_msg = WM.WampError(message)
else: else:
message = \ message = \
rpc_name + " - WARNING " \ rpc_name + " - WARNING " \
+ plugin_uuid + " is not running!" + plugin_uuid + " is not running!"
LOG.warning(" - " + message) LOG.warning(" - " + message)
w_msg = await WM.WampWarning(message) w_msg = WM.WampWarning(message)
except Exception as err: except Exception as err:
message = \ message = \
rpc_name \ rpc_name \
+ " - ERROR - plugin (" + plugin_uuid + ") - " + str(err) + " - ERROR - plugin (" + plugin_uuid + ") - " + str(err)
LOG.error(" - " + message) LOG.error(" - " + message)
w_msg = await WM.WampError(str(err)) w_msg = WM.WampError(str(err))
return w_msg.serialize() return w_msg.serialize()
@ -507,7 +499,7 @@ class PluginManager(Module.Module):
""" """
rpc_name = getFuncName() rpc_name = utils.getFuncName()
LOG.info("RPC " + rpc_name + " CALLED for " + plugin_uuid + " plugin:") LOG.info("RPC " + rpc_name + " CALLED for " + plugin_uuid + " plugin:")
try: try:
@ -527,7 +519,7 @@ class PluginManager(Module.Module):
plugin_filename = plugin_home + "/" + plugin_uuid + ".py" plugin_filename = plugin_home + "/" + plugin_uuid + ".py"
plugin_params_file = plugin_home + "/" + plugin_uuid + ".json" plugin_params_file = plugin_home + "/" + plugin_uuid + ".json"
plugins_conf = loadPluginsConf() plugins_conf = self._loadPluginsConf()
plugin_name = plugins_conf['plugins'][plugin_uuid]['name'] plugin_name = plugins_conf['plugins'][plugin_uuid]['name']
# Import plugin (as python module) # Import plugin (as python module)
@ -619,7 +611,7 @@ class PluginManager(Module.Module):
""" """
rpc_name = getFuncName() rpc_name = utils.getFuncName()
LOG.info("RPC " + rpc_name + " for plugin " + plugin_uuid) LOG.info("RPC " + rpc_name + " for plugin " + plugin_uuid)
@ -630,7 +622,7 @@ class PluginManager(Module.Module):
message = "Plugin paths or files do not exist!" message = "Plugin paths or files do not exist!"
LOG.error(message) LOG.error(message)
w_msg = await WM.WampError(message) w_msg = WM.WampError(message)
return w_msg.serialize() return w_msg.serialize()
@ -652,14 +644,14 @@ class PluginManager(Module.Module):
message = "Removing plugin's files error in " \ message = "Removing plugin's files error in " \
+ plugin_path + ": " + str(err) + plugin_path + ": " + str(err)
LOG.error(" - " + message) LOG.error(" - " + message)
w_msg = await WM.WampError(str(err)) w_msg = WM.WampError(str(err))
return w_msg.serialize() return w_msg.serialize()
# Remove from plugins.json file its configuration # Remove from plugins.json file its configuration
try: try:
plugins_conf = loadPluginsConf() plugins_conf = self._loadPluginsConf()
if plugin_uuid in plugins_conf['plugins']: if plugin_uuid in plugins_conf['plugins']:
@ -674,8 +666,8 @@ class PluginManager(Module.Module):
if plugin_uuid in PLUGINS_THRS: if plugin_uuid in PLUGINS_THRS:
worker = PLUGINS_THRS[plugin_uuid] worker = PLUGINS_THRS[plugin_uuid]
if worker.isAlive(): if worker.isAlive():
LOG.info(" - Plugin " LOG.info(" - Plugin '"
+ plugin_name + " is running...") + plugin_name + "' is running...")
worker.stop() worker.stop()
LOG.info(" ...stopped!") LOG.info(" ...stopped!")
@ -690,21 +682,21 @@ class PluginManager(Module.Module):
+ plugin_uuid + " already removed!" + plugin_uuid + " already removed!"
LOG.warning(" - " + message) LOG.warning(" - " + message)
w_msg = await WM.WampSuccess(message) w_msg = WM.WampSuccess(message)
return w_msg.serialize() return w_msg.serialize()
except Exception as err: except Exception as err:
message = "Updating plugins.json error: " + str(err) message = "Updating plugins.json error: " + str(err)
LOG.error(" - " + message) LOG.error(" - " + message)
w_msg = await WM.WampError(str(err)) w_msg = WM.WampError(str(err))
return w_msg.serialize() return w_msg.serialize()
except Exception as err: except Exception as err:
message = "Plugin removing error: {0}".format(err) message = "Plugin removing error: {0}".format(err)
LOG.error(" - " + message) LOG.error(" - " + message)
w_msg = await WM.WampError(str(err)) w_msg = WM.WampError(str(err))
return w_msg.serialize() return w_msg.serialize()
@ -715,7 +707,7 @@ class PluginManager(Module.Module):
""" """
rpc_name = getFuncName() rpc_name = utils.getFuncName()
LOG.info("RPC " + rpc_name + " CALLED for '" LOG.info("RPC " + rpc_name + " CALLED for '"
+ plugin_uuid + "' plugin:") + plugin_uuid + "' plugin:")
@ -729,7 +721,7 @@ class PluginManager(Module.Module):
plugin_filename = plugin_home + "/" + plugin_uuid + ".py" plugin_filename = plugin_home + "/" + plugin_uuid + ".py"
plugin_params_file = plugin_home + "/" + plugin_uuid + ".json" plugin_params_file = plugin_home + "/" + plugin_uuid + ".json"
plugins_conf = loadPluginsConf() plugins_conf = self._loadPluginsConf()
plugin_name = plugins_conf['plugins'][plugin_uuid]['name'] plugin_name = plugins_conf['plugins'][plugin_uuid]['name']
callable = plugins_conf['plugins'][plugin_uuid]['callable'] callable = plugins_conf['plugins'][plugin_uuid]['callable']
@ -789,18 +781,18 @@ class PluginManager(Module.Module):
message = "REBOOTED" message = "REBOOTED"
LOG.info(" - " + worker.complete(rpc_name, message)) LOG.info(" - " + worker.complete(rpc_name, message))
w_msg = await WM.WampSuccess(message) w_msg = WM.WampSuccess(message)
else: else:
message = "ERROR '" + plugin_filename + "' does not exist!" message = "ERROR '" + plugin_filename + "' does not exist!"
LOG.error(" - " + message) LOG.error(" - " + message)
w_msg = await WM.WampError(message) w_msg = WM.WampError(message)
except Exception as err: except Exception as err:
message = "Error rebooting plugin '" \ message = "Error rebooting plugin '" \
+ plugin_uuid + "': " + str(err) + plugin_uuid + "': " + str(err)
LOG.error(" - " + message) LOG.error(" - " + message)
w_msg = await WM.WampError(str(err)) w_msg = WM.WampError(str(err))
return w_msg.serialize() return w_msg.serialize()
@ -814,7 +806,7 @@ class PluginManager(Module.Module):
""" """
rpc_name = getFuncName() rpc_name = utils.getFuncName()
LOG.info("RPC " + rpc_name + " CALLED for '" LOG.info("RPC " + rpc_name + " CALLED for '"
+ plugin_uuid + "' plugin:") + plugin_uuid + "' plugin:")
@ -830,20 +822,20 @@ class PluginManager(Module.Module):
result = "DEAD" result = "DEAD"
LOG.info(" - " + worker.complete(rpc_name, result)) LOG.info(" - " + worker.complete(rpc_name, result))
w_msg = await WM.WampSuccess(result) w_msg = WM.WampSuccess(result)
else: else:
result = "DEAD" result = "DEAD"
LOG.info(" - " + rpc_name + " result for " LOG.info(" - " + rpc_name + " result for "
+ plugin_uuid + ": " + result) + plugin_uuid + ": " + result)
w_msg = await WM.WampSuccess(result) w_msg = WM.WampSuccess(result)
except Exception as err: except Exception as err:
message = \ message = \
rpc_name \ rpc_name \
+ " - ERROR - plugin (" + plugin_uuid + ") - " + str(err) + " - ERROR - plugin (" + plugin_uuid + ") - " + str(err)
LOG.error(" - " + message) LOG.error(" - " + message)
w_msg = await WM.WampError(str(err)) w_msg = WM.WampError(str(err))
return w_msg.serialize() return w_msg.serialize()

View File

@ -15,8 +15,11 @@
__author__ = "Nicola Peditto <npeditto@unime.it" __author__ = "Nicola Peditto <npeditto@unime.it"
import inspect from datetime import datetime
import errno
import json
import os import os
import psutil
import signal import signal
import subprocess import subprocess
from urllib.parse import urlparse from urllib.parse import urlparse
@ -26,109 +29,454 @@ from iotronic_lightningrod.modules import utils
import iotronic_lightningrod.wampmessage as WM import iotronic_lightningrod.wampmessage as WM
from oslo_config import cfg
from oslo_log import log as logging from oslo_log import log as logging
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
CONF = cfg.CONF
SERVICES_CONF_FILE = CONF.lightningrod_home + "/services.json"
class ServiceManager(Module.Module): class ServiceManager(Module.Module):
def __init__(self, board, session): def __init__(self, board, session):
super(ServiceManager, self).__init__("ServiceManager", board) super(ServiceManager, self).__init__("ServiceManager", board)
self.url_ip = urlparse(board.wamp_config["url"])[1].split(':')[0]
self.wagent_url = "ws://" + self.url_ip + ":8080"
def finalize(self): def finalize(self):
pass LOG.info("Cloud service tunnels to initialization:")
async def ServiceEnable(self, name, public_port, local_port): # Load services.json configuration file
services_conf = self._loadServicesConf()
LOG.info("RPC " + utils.getFuncName() if len(services_conf['services']) != 0:
+ " CALLED for " + name + " service:")
wstun_process_list = []
for p in psutil.process_iter():
if len(p.cmdline()) != 0:
if (p.name() == "node" and "wstun" in p.cmdline()[1]):
wstun_process_list.append(p)
for service_uuid in services_conf['services']:
service_name = services_conf['services'][service_uuid]['name']
service_pid = services_conf['services'][service_uuid]['pid']
LOG.info(" - " + service_name)
if len(wstun_process_list) != 0:
for wp in wstun_process_list:
if service_pid == wp.pid:
LOG.info(" --> the tunnel for '" + service_name +
"' already exists; killing...")
# 1. Kill wstun process (if exists)
try:
os.kill(service_pid, signal.SIGKILL)
LOG.info(" --> service '" + service_name
+ "' with PID " + str(service_pid)
+ " was killed; creating new one...")
except OSError:
LOG.warning(" - WSTUN process already killed, "
"creating new one...")
break
# 2. Create the reverse tunnel
public_port = \
services_conf['services'][service_uuid]['public_port']
local_port = \
services_conf['services'][service_uuid]['local_port']
wstun = self._startWstun(public_port, local_port)
if wstun != None:
service_pid = wstun.pid
# 3. Update services.json file
services_conf['services'][service_uuid]['pid'] = \
service_pid
services_conf['services'][service_uuid]['updated_at'] = \
datetime.now().strftime('%Y-%m-%dT%H:%M:%S.%f')
self._updateServiceConf(services_conf, service_uuid,
output=True)
LOG.info(" --> Cloud service '" + service_name
+ "' tunnel established.")
else:
message = "Error spawning " + str(service_name) \
+ " service tunnel!"
LOG.error(" - " + message)
else:
LOG.info(" --> No service tunnels to establish.")
def restore(self):
LOG.info("Cloud service tunnels to restore:")
# Load services.json configuration file
services_conf = self._loadServicesConf()
if len(services_conf['services']) != 0:
wstun_process_list = []
# Collect all alive WSTUN proccesses
for p in psutil.process_iter():
if len(p.cmdline()) != 0:
if (p.name() == "node") and ("wstun" in p.cmdline()[1]):
wstun_process_list.append(p)
for service_uuid in services_conf['services']:
service_name = services_conf['services'][service_uuid]['name']
service_pid = services_conf['services'][service_uuid]['pid']
LOG.info(" - " + service_name)
s_alive = False
# WSTUN is still alive
if len(wstun_process_list) != 0:
for wp in wstun_process_list:
if service_pid == wp.pid:
LOG.warning(" --> the tunnel for '" + service_name
+ "' is still established.")
s_alive = True
break
if not s_alive:
# Create the reverse tunnel again
public_port = services_conf['services'][service_uuid]
['public_port']
local_port = services_conf['services'][service_uuid]
['local_port']
wstun = self._startWstun(public_port, local_port)
if wstun != None:
service_pid = wstun.pid
# 3. Update services.json file
services_conf['services'][service_uuid]['pid'] = \
service_pid
services_conf['services'][service_uuid]['updated_at'] = \
datetime.now().strftime('%Y-%m-%dT%H:%M:%S.%f')
self._updateServiceConf(services_conf,
service_uuid, output=True)
LOG.info(" --> Cloud service '" + service_name
+ "' tunnel restored.")
else:
message = "Error spawning " + str(service_name) \
+ " service tunnel!"
LOG.error(" - " + message)
else:
LOG.info(" --> No service tunnels to restore.")
def _loadServicesConf(self):
"""Load services.json JSON configuration.
:return: JSON Services configuration
"""
try: try:
url_ip = urlparse(self.board.wamp_config["url"])[1].split(':')[0] with open(SERVICES_CONF_FILE) as settings:
services_conf = json.load(settings)
# "wstun -r6030:127.0.0.1:22 ws://192.168.17.103:8080" except Exception as err:
opt_reverse = "-r" + str(public_port) + ":127.0.0.1:" \ LOG.error(
+ str(local_port) "Parsing error in " + SERVICES_CONF_FILE + ": " + str(err))
wagent_url = "ws://" + url_ip + ":8080" services_conf = None
return services_conf
def _startWstun(self, public_port, local_port):
opt_reverse = "-r" + str(
public_port) + ":127.0.0.1:" + str(local_port)
try:
wstun = subprocess.Popen( wstun = subprocess.Popen(
['/usr/bin/wstun', opt_reverse, wagent_url], ['/usr/bin/wstun', opt_reverse, self.wagent_url],
stdout=subprocess.PIPE stdout=subprocess.PIPE
) )
except Exception as err:
LOG.error("Error spawning WSTUN process: " + str(err))
wstun = None
LOG.debug(" - WSTUN stdout: " + str(wstun.stdout)) return wstun
message = "Cloud service " + str(name) + " exposed on port " \ def _updateServiceConf(self, services_conf, service_uuid, output=True):
+ str(public_port) + " on " + url_ip # Apply the changes to services.json
with open(SERVICES_CONF_FILE, 'w') as f:
json.dump(services_conf, f, indent=4)
LOG.info(" - " + message + " with PID " + str(wstun.pid)) if output:
LOG.info(" - service updated:\n" + json.dumps(
services_conf['services'][service_uuid],
indent=4,
sort_keys=True
))
else:
LOG.info(" - services.json file updated!")
w_msg = WM.WampSuccess([wstun.pid, message]) async def ServiceEnable(self, service, public_port):
rpc_name = utils.getFuncName()
service_name = service['name']
service_uuid = service['uuid']
local_port = service['port']
LOG.info("RPC " + rpc_name + " CALLED for '" + service_name
+ "' (" + service_uuid + ") service:")
try:
wstun = self._startWstun(public_port, local_port)
if wstun != None:
service_pid = wstun.pid
LOG.debug(" - WSTUN stdout: " + str(wstun.stdout))
# Update services.json file
# Load services.json configuration file
services_conf = self._loadServicesConf()
# Save plugin settings in services.json
if service_uuid not in services_conf['services']:
# It is a new plugin
services_conf['services'][service_uuid] = {}
services_conf['services'][service_uuid]['name'] = \
service_name
services_conf['services'][service_uuid]['public_port'] = \
public_port
services_conf['services'][service_uuid]['local_port'] = \
local_port
services_conf['services'][service_uuid]['pid'] = \
service_pid
services_conf['services'][service_uuid]['enabled_at'] = \
datetime.now().strftime('%Y-%m-%dT%H:%M:%S.%f')
services_conf['services'][service_uuid]['updated_at'] = ""
else:
# The service was already added and we are updating it
services_conf['services'][service_uuid]['updated_at'] = \
datetime.now().strftime('%Y-%m-%dT%H:%M:%S.%f')
LOG.info(" - services.json file updated!")
# Apply the changes to services.json
self._updateServiceConf(services_conf, service_uuid,
output=True)
message = "Cloud service '" + str(service_name) \
+ "' exposed on port " \
+ str(public_port) + " on " + self.url_ip
LOG.info(" - " + message + " with PID " + str(service_pid))
w_msg = WM.WampSuccess(message)
else:
message = "Error spawning " + str(service_name) \
+ " service tunnel!"
LOG.error(" - " + message)
w_msg = WM.WampError(message)
except Exception as err: except Exception as err:
message = "Error exposing " + str(name) + " service: " + str(err) message = "Error exposing " + str(service_name) \
+ " service: " + str(err)
LOG.error(" - " + message) LOG.error(" - " + message)
w_msg = WM.WampError(message) w_msg = WM.WampError(message)
return w_msg.serialize() return w_msg.serialize()
async def ServiceDisable(self, name, pid): async def ServiceDisable(self, service):
LOG.info("RPC " + utils.getFuncName() + " CALLED for " rpc_name = utils.getFuncName()
+ name + " service:")
service_name = service['name']
service_uuid = service['uuid']
LOG.info("RPC " + rpc_name
+ " CALLED for '" + service_name
+ "' (" + service_uuid + ") service:")
# Remove from services.json file
try: try:
os.kill(pid, signal.SIGKILL) # Load services.json configuration file
services_conf = self._loadServicesConf()
message = "Cloud service " + str(name) + " disabled." if service_uuid in services_conf['services']:
LOG.info(" - " + message) service_pid = services_conf['services'][service_uuid]['pid']
w_msg = WM.WampSuccess(message)
try:
os.kill(service_pid, signal.SIGKILL)
message = "Cloud service '" \
+ str(service_name) + "' tunnel disabled."
del services_conf['services'][service_uuid]
self._updateServiceConf(services_conf, service_uuid,
output=False)
LOG.info(" - " + message)
w_msg = WM.WampSuccess(message)
except Exception as err:
if err.errno == errno.ESRCH: # ESRCH == No such process
message = "Service '" + str(
service_name) + "' WSTUN process is not running!"
LOG.warning(" - " + message)
del services_conf['services'][service_uuid]
self._updateServiceConf(services_conf, service_uuid,
output=False)
w_msg = WM.WampWarning(message)
else:
message = "Error disabling '" + str(
service_name) + "' service tunnel: " + str(err)
LOG.error(" - " + message)
w_msg = WM.WampError(message)
else:
message = rpc_name + " result: " + service_uuid \
+ " already removed!"
LOG.error(" - " + message)
w_msg = WM.WampError(message)
except Exception as err: except Exception as err:
message = "Error disabling " + str(name) + " service: " + str(err) message = "Updating services.json error: " + str(err)
LOG.error(" - " + message) LOG.error(" - " + message)
w_msg = WM.WampError(message) w_msg = WM.WampError(message)
return w_msg.serialize() return w_msg.serialize()
async def ServiceRestore(self, name, public_port, local_port, pid): async def ServiceRestore(self, service, public_port):
LOG.info("RPC " + utils.getFuncName() + " CALLED for " rpc_name = utils.getFuncName()
+ name + " service:")
try: service_name = service['name']
service_uuid = service['uuid']
LOG.info("RPC " + rpc_name
+ " CALLED for '" + service_name
+ "' (" + service_uuid + ") service:")
# Load services.json configuration file
services_conf = self._loadServicesConf()
if service_uuid in services_conf['services']:
local_port = \
services_conf['services'][service_uuid]['local_port']
service_pid = \
services_conf['services'][service_uuid]['pid']
# 1. Kill wstun process (if exists)
try: try:
os.kill(pid, signal.SIGKILL)
LOG.info(" - service " + name + " with PID " + str(pid)
+ " killed.")
except OSError:
LOG.warning(" - WSTUN process already killed: "
"creating new one...")
# 2. Create the reverse tunnel # 1. Kill wstun process (if exists)
url_ip = urlparse(self.board.wamp_config["url"])[1].split(':')[0] try:
opt_reverse = "-r" + str(public_port) + ":127.0.0.1:" + str( os.kill(service_pid, signal.SIGKILL)
local_port) LOG.info(" - service '" + service_name
wagent_url = "ws://" + url_ip + ":8080" + "' with PID " + str(service_pid)
wstun = subprocess.Popen( + " was killed.")
['/usr/bin/wstun', opt_reverse, wagent_url], except OSError:
stdout=subprocess.PIPE LOG.warning(" - WSTUN process already killed: "
) "creating new one...")
message = "service " + str(name) + " restored on port " \ # 2. Create the reverse tunnel
+ str(public_port) + " on " + url_ip wstun = self._startWstun(public_port, local_port)
LOG.info(" - " + message + " with PID " + str(wstun.pid))
w_msg = WM.WampSuccess([wstun.pid, message])
except Exception as err: if wstun != None:
message = "Error restoring " + str(name) + " service: " + str(err) service_pid = wstun.pid
LOG.error(" - " + message)
w_msg = WM.WampError(message) # UPDATE services.json file
services_conf['services'][service_uuid]['pid'] = \
service_pid
services_conf['services'][service_uuid]['updated_at'] = \
datetime.now().strftime('%Y-%m-%dT%H:%M:%S.%f')
self._updateServiceConf(services_conf, service_uuid,
output=True)
message = "service " + str(service_name) \
+ " restored on port " \
+ str(public_port) + " on " + self.url_ip
LOG.info(" - " + message + " with PID " + str(service_pid))
w_msg = WM.WampSuccess(message)
else:
message = "Error spawning " + str(service_name) \
+ " service tunnel!"
LOG.error(" - " + message)
w_msg = WM.WampError(message)
except Exception as err:
message = "Error restoring '" + str(service_name) \
+ "' service tunnel: " + str(err)
LOG.error(" - " + message)
w_msg = WM.WampError(message)
else:
local_port = service['port']
wstun = self._startWstun(public_port, local_port)
if wstun != None:
service_pid = wstun.pid
services_conf['services'][service_uuid] = {}
services_conf['services'][service_uuid]['name'] = \
service_name
services_conf['services'][service_uuid]['public_port'] = \
public_port
services_conf['services'][service_uuid]['local_port'] = \
local_port
services_conf['services'][service_uuid]['pid'] = \
service_pid
services_conf['services'][service_uuid]['enabled_at'] = \
datetime.now().strftime('%Y-%m-%dT%H:%M:%S.%f')
services_conf['services'][service_uuid]['updated_at'] = ""
self._updateServiceConf(services_conf, service_uuid,
output=True)
message = "service " + str(service_name) \
+ " restored on port " \
+ str(public_port) + " on " + self.url_ip
LOG.info(" - " + message + " with PID " + str(service_pid))
w_msg = WM.WampSuccess(message)
else:
message = "Error spawning " + str(service_name) \
+ " service tunnel!"
LOG.error(" - " + message)
w_msg = WM.WampError(message)
return w_msg.serialize() return w_msg.serialize()

View File

@ -67,6 +67,9 @@ class Utility(Module.Module):
def finalize(self): def finalize(self):
pass pass
def restore(self):
pass
async def hello(self, client_name, message): async def hello(self, client_name, message):
import random import random
s = random.uniform(0.5, 3.0) s = random.uniform(0.5, 3.0)

View File

@ -57,6 +57,9 @@ class VfsManager(Module.Module):
def finalize(self): def finalize(self):
pass pass
def restore(self):
pass
def mountLocal(self, mountSource, mountPoint): def mountLocal(self, mountSource, mountPoint):
try: try:

5
services.example.json Normal file
View File

@ -0,0 +1,5 @@
{
"services": {
}
}