Browse Source

Refactored Webservice Manager:

- webservices exposed via subdomanins
- added monitoring wstun tunnels (zombie processes managed)
- "proxies" module moved
- nginx management improved
- requirements updated

Added RPCs in Device Manager

Added "serializers" to Autobahn "Component"
Change-Id: Ie5f780c4cdcf854fd4c8af2d4ef6c3c52f68da10
changes/06/619706/4 0.4.1
Nicola Peditto 4 years ago
parent
commit
9bf78aa2da
  1. 3
      .gitignore
  2. 2
      README.rst
  3. 0
      __init__.py
  4. 4
      doc/installation/arduino_yun.rst
  5. 88
      doc/installation/raspberry_pi_3.rst
  6. 83
      doc/installation/ubuntu1604.rst
  7. 2
      etc/iotronic/iotronic.conf
  8. 1
      etc/logrotate.d/lightning-rod.log
  9. 57
      iotronic_lightningrod/common/exception.py
  10. 85
      iotronic_lightningrod/common/utils.py
  11. 503
      iotronic_lightningrod/lightningrod.py
  12. 73
      iotronic_lightningrod/modules/device_manager.py
  13. 13
      iotronic_lightningrod/modules/network_manager.py
  14. 3
      iotronic_lightningrod/modules/plugin_manager.py
  15. 0
      iotronic_lightningrod/modules/plugins/Plugin.py
  16. 0
      iotronic_lightningrod/modules/plugins/PluginSerializer.py
  17. 0
      iotronic_lightningrod/modules/plugins/__init__.py
  18. 0
      iotronic_lightningrod/modules/plugins/pluginApis.py
  19. 0
      iotronic_lightningrod/modules/plugins/plugins_examples/arduino_yun/demo.py
  20. 0
      iotronic_lightningrod/modules/plugins/plugins_examples/arduino_yun/led.json
  21. 0
      iotronic_lightningrod/modules/plugins/plugins_examples/arduino_yun/led.py
  22. 0
      iotronic_lightningrod/modules/plugins/plugins_examples/generics/echo.json
  23. 4
      iotronic_lightningrod/modules/plugins/plugins_examples/generics/echo.py
  24. 0
      iotronic_lightningrod/modules/plugins/plugins_examples/generics/runner.json
  25. 4
      iotronic_lightningrod/modules/plugins/plugins_examples/generics/runner.py
  26. 0
      iotronic_lightningrod/modules/proxies/Proxy.py
  27. 0
      iotronic_lightningrod/modules/proxies/__init__.py
  28. 0
      iotronic_lightningrod/modules/proxies/configs/default
  29. 0
      iotronic_lightningrod/modules/proxies/configs/iotronic
  30. 186
      iotronic_lightningrod/modules/proxies/nginx.py
  31. 299
      iotronic_lightningrod/modules/service_manager.py
  32. 43
      iotronic_lightningrod/modules/test.py
  33. 164
      iotronic_lightningrod/modules/vfs_library.py
  34. 509
      iotronic_lightningrod/modules/vfs_manager.py
  35. 144
      iotronic_lightningrod/modules/webservice_manager.py
  36. 4
      requirements.txt
  37. 2
      scripts/lr_configure
  38. 3
      setup.cfg

3
.gitignore vendored

@ -12,5 +12,6 @@ ChangeLog
*.md
.eggs
dist
STUFF/
iotronic_lightningrod/modules/test.py
iotronic_lightningrod/modules/vfs_*
iotronic_lightningrod/modules/vfs_*

2
README.rst

@ -18,5 +18,3 @@ Installation guides
* `Raspberry Pi 3 <https://github.com/MDSLab/iotronic-lightning-rod-agent/blob/master/doc/installation/raspberry_pi_3.rst>`_.
* `Ubuntu 16.04 <https://github.com/MDSLab/iotronic-lightning-rod-agent/blob/master/doc/installation/ubuntu1604.rst>`_.
* `Arduino YUN <https://github.com/MDSLab/iotronic-lightning-rod-agent/blob/master/doc/installation/arduino_yun.rst>`_.

0
iotronic_lightningrod/plugins/__init__.py → __init__.py

4
doc/installation/arduino_yun.rst

@ -1,5 +1,5 @@
IoTronic Lightning-rod installation guide for Arduino YUN
=========================================================
[DEPRECATED] IoTronic Lightning-rod installation guide for Arduino YUN
======================================================================
We tested this procedure on a Arduino YUN board with OpenWRT LininoIO image.

88
doc/installation/raspberry_pi_3.rst

@ -1,100 +1,42 @@
IoTronic Lightning-rod installation guide for Raspberry Pi 3
============================================================
We tested this procedure on a Raspberry Pi 3 board.
We tested this procedure on a Raspberry Pi 3 board (Raspbian).
Install from source code
------------------------
Install requirements
~~~~~~~~~~~~~~~~~~~~
::
pip install oslo.config oslo.log asyncio autobahn httplib2 psutil six
Set up environment:
~~~~~~~~~~~~~~~~~~~
::
mkdir -p /var/lib/iotronic
mkdir /var/lib/iotronic/plugins
mkdir /var/log/iotronic/
mkdir /etc/iotronic
Install Lightning-rod
~~~~~~~~~~~~~~~~~~~~~
Get source code
'''''''''''''''
::
cd /var/lib/iotronic
git clone git://github.com/MDSLab/iotronic-lightning-rod-agent.git
mv iotronic-lightning-rod-agent/ iotronic-lightning-rod/
pip3 install iotronic-lightningrod
Deployment
''''''''''
::
lr_install
cd iotronic-lightning-rod/
cp etc/iotronic/iotronic.conf /etc/iotronic/
cp settings.example.json /var/lib/iotronic/settings.json
cp plugins.example.json /var/lib/iotronic/plugins.json
cp services.example.json /var/lib/iotronic/services.json
cp etc/systemd/system/s4t-lightning-rod.service /etc/systemd/system/lightning-rod.service
chmod +x /etc/systemd/system/lightning-rod.service
systemctl daemon-reload
- Edit configuration file:
- nano /var/lib/iotronic/settings.json
::
{
"iotronic": {
"board": {
"token": "<REGISTRATION-TOKEN>"
},
"wamp": {
"registration-agent": {
"url": "ws://<WAMP-SERVER>:<WAMP-PORT>/",
"realm": "<IOTRONIC-REALM>"
}
}
}
}
- setup logrotate:
- nano /etc/logrotate.d/lightning-rod.log
::
/var/log/iotronic/lightning-rod.log {
weekly
rotate = 3
compress
su root root
maxsize 5M
}
Building
''''''''
Iotronic setup
''''''''''''''
::
lr_configure
Arguments required:
<REGISTRATION-TOKEN> : token released by IoTronic registration procedure
<WAMP-REG-AGENT-URL> : IoTronic Crossbar server URL
cd /var/lib/iotronic/iotronic-lightning-rod/
python setup.py install
e.g.
::
lr_configure 000001 ws(s)://<IOTRONIC-CROSSBAR-IP>:<IOTRONIC-CROSSBAR-PORT>/
Execution:
~~~~~~~~~~
::
systemctl restart lightning-rod.service
systemctl start lightning-rod.service
tail -f /var/log/iotronic/lightning-rod.log

83
doc/installation/ubuntu1604.rst

@ -4,98 +4,39 @@ IoTronic Lightning-rod installation guide for Ubuntu 16.04
We tested this procedure on a Ubuntu 16.04 (also within a LXD
container). Everything needs to be run as root.
Install from source code via Git
--------------------------------
Install requirements
~~~~~~~~~~~~~~~~~~~~
::
pip install oslo.config oslo.log asyncio autobahn httplib2 psutil six
Set up environment:
~~~~~~~~~~~~~~~~~~~
::
mkdir -p /var/lib/iotronic
mkdir /var/lib/iotronic/plugins
mkdir /var/log/iotronic/
mkdir /etc/iotronic
Install Lightning-rod
~~~~~~~~~~~~~~~~~~~~~
Get source code
'''''''''''''''
::
cd /var/lib/iotronic
git clone git://github.com/MDSLab/iotronic-lightning-rod-agent.git
mv iotronic-lightning-rod-agent/ iotronic-lightning-rod/
pip3 install iotronic-lightningrod
Deployment
''''''''''
::
lr_install
cd iotronic-lightning-rod/
cp etc/iotronic/iotronic.conf /etc/iotronic/
cp settings.example.json /var/lib/iotronic/settings.json
cp plugins.example.json /var/lib/iotronic/plugins.json
cp services.example.json /var/lib/iotronic/services.json
cp etc/systemd/system/s4t-lightning-rod.service /etc/systemd/system/lightning-rod.service
chmod +x /etc/systemd/system/lightning-rod.service
systemctl daemon-reload
- Edit configuration file:
- nano /var/lib/iotronic/settings.json
::
{
"iotronic": {
"board": {
"token": "<REGISTRATION-TOKEN>"
},
"wamp": {
"registration-agent": {
"url": "ws://<WAMP-SERVER>:<WAMP-PORT>/",
"realm": "<IOTRONIC-REALM>"
}
}
}
}
Iotronic setup
''''''''''''''
- setup logrotate:
- nano /etc/logrotate.d/lightning-rod.log
::
/var/log/iotronic/lightning-rod.log {
weekly
rotate = 3
compress
su root root
maxsize 5M
}
::
lr_configure
Building
''''''''
Arguments required:
<REGISTRATION-TOKEN> : token released by IoTronic registration procedure
<WAMP-REG-AGENT-URL> : IoTronic Crossbar server URL
e.g.
::
cd /var/lib/iotronic/iotronic-lightning-rod/
python setup.py install
lr_configure 000001 ws(s)://<IOTRONIC-CROSSBAR-IP>:<IOTRONIC-CROSSBAR-PORT>/
Execution:
~~~~~~~~~~
::
systemctl restart lightning-rod.service
systemctl start lightning-rod.service
tail -f /var/log/iotronic/lightning-rod.log

2
etc/iotronic/iotronic.conf

@ -4,3 +4,5 @@ skip_cert_verify = True
debug = True
proxy = nginx
log_file = /var/log/iotronic/lightning-rod.log
alive_timer = 600
rpc_alive_timer = 3

1
etc/logrotate.d/lightning-rod.log

@ -1,6 +1,5 @@
/var/log/iotronic/lightning-rod.log /var/log/wstun/wstun.log{
copytruncate
create
missingok
weekly
rotate = 3

57
iotronic_lightningrod/common/exception.py

@ -15,12 +15,13 @@
__author__ = "Nicola Peditto <n.peditto@gmail.com>"
import os
import signal
from oslo_log import log as logging
LOG = logging.getLogger(__name__)
from iotronic_lightningrod.common import utils
def manageTimeout(error_message, action):
try:
@ -29,8 +30,36 @@ def manageTimeout(error_message, action):
except TimeoutError as err:
details = err.args[0]
LOG.warning("Board connection call timeout: " + str(details))
os._exit(1)
if (action == "ws_alive"):
LOG.warning("Iotronic RPC-ALIVE timeout details: " + str(details))
try:
utils.destroyWampSocket()
except Exception as e:
LOG.warning("Iotronic RPC-ALIVE timeout error: " + str(e))
else:
LOG.warning("Board connection call timeout: " + str(details))
utils.LR_restart()
"""
def manageTimeoutALIVE(error_message, action):
try:
raise TimeoutError(error_message, action)
except TimeoutError as err:
details = err.args[0]
LOG.warning("Iotronic RPC-ALIVE timeout details: " + str(details))
try:
utils.destroyWampSocket()
except Exception as e:
LOG.warning("Iotronic RPC-ALIVE timeout error: " + str(e))
"""
class NginxError(Exception):
@ -67,15 +96,31 @@ class timeout(object):
class timeoutRPC(object):
def __init__(self, seconds=1, error_message='Timeout', action=None):
def __init__(self, seconds=1, error_message='Timeout-RPC', action=None):
self.seconds = seconds
self.error_message = error_message
self.action = action
def handle_timeout(self, signum, frame):
manageTimeout(self.error_message, self.action)
def __enter__(self):
signal.signal(signal.SIGALRM, self.handle_timeout)
signal.alarm(self.seconds)
def __exit__(self, type, value, traceback):
signal.alarm(0)
class timeoutALIVE(object):
def __init__(self, seconds=1, error_message='Timeout-Alive', action=None):
self.seconds = seconds
self.error_message = error_message
self.action = action
def handle_timeout(self, signum, frame):
manageTimeout(self.error_message, self.action)
# LOG.warning("RPC timeout: " + str(self.error_message))
# os._exit(1)
def __enter__(self):
signal.signal(signal.SIGALRM, self.handle_timeout)

85
iotronic_lightningrod/common/utils.py

@ -0,0 +1,85 @@
# Copyright 2018 MDSLAB - University of Messina
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
__author__ = "Nicola Peditto <n.peditto@gmail.com>"
from oslo_log import log as logging
LOG = logging.getLogger(__name__)
import os
import pkg_resources
import psutil
import subprocess
import sys
def LR_restart():
try:
LOG.warning("Lightning-rod RESTARTING...")
python = sys.executable
os.execl(python, python, *sys.argv)
except Exception as err:
LOG.error("Lightning-rod restarting error" + str(err))
def checkIotronicConf(lr_CONF):
try:
if(lr_CONF.log_file == None):
LOG.warning("'log_file' is not specified!")
return False
else:
print("View logs in " + lr_CONF.log_file)
return True
except Exception as err:
print(err)
return False
def destroyWampSocket():
LR_PID = os.getpid()
try:
process = subprocess.Popen(
["gdb", "-p", str(LR_PID)],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE
)
proc = psutil.Process()
print("WAMP RECOVERY: " + str(proc.connections()[0]))
ws_fd = proc.connections()[0].fd
first = b"call shutdown("
fd = str(ws_fd).encode('ascii')
last = b"u,0)\nquit\ny"
commands = b"%s%s%s" % (first, fd, last)
process.communicate(input=commands)[0]
msg = "Websocket-Zombie closed! Restoring..."
LOG.warning(msg)
print(msg)
except Exception as e:
LOG.warning("RPC-ALIVE - destroyWampSocket error: " + str(e))
def get_version(package):
package = package.lower()
return next((p.version for p in pkg_resources.working_set if
p.project_name.lower() == package), "No version")

503
iotronic_lightningrod/lightningrod.py

@ -15,28 +15,33 @@
__author__ = "Nicola Peditto <n.peditto@gmail.com>"
# Autobahn imports
import asyncio
from autobahn.asyncio.component import Component
from autobahn.wamp import exception
import txaio
# OSLO imports
from oslo_config import cfg
from oslo_log import log as logging
# MODULES imports
import asyncio
import inspect
import os
import pkg_resources
import psutil
import signal
import ssl
from stevedore import extension
import sys
import txaio
from pip._vendor import pkg_resources
# IoTronic imports
from iotronic_lightningrod.Board import Board
from iotronic_lightningrod.common.exception import timeoutALIVE
from iotronic_lightningrod.common.exception import timeoutRPC
from iotronic_lightningrod.common import utils
from iotronic_lightningrod.common.utils import get_version
import iotronic_lightningrod.wampmessage as WM
@ -52,6 +57,12 @@ lr_opts = [
default=True,
help=('Flag for skipping the verification of the server cert '
'(for the auto-signed ones)')),
cfg.IntOpt('alive_timer',
default=600,
help=('Wamp websocket check time')),
cfg.IntOpt('rpc_alive_timer',
default=3,
help=('RPC alive response time threshold')),
]
proxy_opts = [
@ -74,6 +85,7 @@ reconnection = False
RPC = {}
RPC_devices = {}
RPC_proxies = {}
zombie_alert = True
# ASYNCIO
loop = None
@ -86,128 +98,122 @@ global MODULES
MODULES = {}
def moduleReloadInfo(session):
"""This function is used in the reconnection stage to register
again the RPCs of each module and for device.
:param session: WAMP session object.
"""
class LightningRod(object):
LOG.info("\n\nModules reloading after WAMP recovery...\n\n")
def __init__(self):
try:
LogoLR()
# Call module restore procedures and
# register RPCs for each Lightning-rod module
for mod_name in MODULES:
LOG.info("- Registering RPCs for module " + str(mod_name))
moduleWampRegister(session, RPC[mod_name])
LOG.info(' - version: ' +
str(get_version("iotronic-lightningrod")))
LOG.info(' - PID: ' + str(os.getpid()))
LOG.info("- Restoring module " + str(mod_name))
MODULES[mod_name].restore()
LOG.info("LR available modules: ")
for ep in pkg_resources.iter_entry_points(group='s4t.modules'):
LOG.info(" - " + str(ep))
# Register RPCs for the device
for dev in RPC_devices:
LOG.info("- Registering RPCs for device " + str(dev))
moduleWampRegister(session, RPC_devices[dev])
logging.register_options(CONF)
DOMAIN = "s4t-lightning-rod"
CONF(project='iotronic')
logging.setup(CONF, DOMAIN)
except Exception as err:
LOG.warning("Board modules reloading error: " + str(err))
Bye()
if (utils.checkIotronicConf(CONF)):
if CONF.debug:
txaio.start_logging(level="debug")
def moduleWampRegister(session, meth_list):
"""This function register for each module methods the relative RPC.
signal.signal(signal.SIGINT, self.stop_handler)
:param session:
:param meth_list:
LogoLR()
"""
LOG.info('Lightning-rod: ')
LOG.info(' - version: ' +
str(get_version("iotronic-lightningrod")))
LOG.info(' - PID: ' + str(os.getpid()))
LOG.info(' - Logs: ' + CONF.log_file)
LOG.info(" - Home: " + CONF.lightningrod_home)
LOG.info(" - Alive Check timer: " + str(CONF.alive_timer) +
" seconds")
LOG.info(" - RPC-Alive Check timer: " + str(CONF.rpc_alive_timer) +
" seconds")
if len(meth_list) == 2:
global board
board = Board()
LOG.info(" - No procedures to register!")
self.w = WampManager(board.wamp_config)
else:
self.w.start()
for meth in meth_list:
# We don't considere the "__init__", "finalize" and
# "restore" methods
if (meth[0] != "__init__") & (meth[0] != "finalize") \
& (meth[0] != "restore"):
rpc_addr = u'iotronic.' + board.uuid + '.' + meth[0]
else:
Bye()
if not meth[0].startswith('_'):
session.register(meth[1], rpc_addr)
LOG.info(" --> " + str(meth[0]))
def stop_handler(self, signum, frame):
try:
# No zombie alert activation
zombie_alert = False
LOG.info("LR is shutting down...")
self.w.stop()
Bye()
except Exception as e:
LOG.error("Error closing LR")
def modulesLoader(session):
"""Modules loader method thorugh stevedore libraries.
:param session:
class WampManager(object):
"""WAMP Manager: through this LR manages the connection to Crossbar server.
"""
LOG.info("Available modules: ")
ep = []
for ep in pkg_resources.iter_entry_points(group='s4t.modules'):
LOG.info(" - " + str(ep))
if not ep:
LOG.info("No modules available!")
sys.exit()
else:
modules = extension.ExtensionManager(
namespace='s4t.modules',
# invoke_on_load=True,
# invoke_args=(session,),
)
def __init__(self, wamp_conf):
LOG.info('Modules to load:')
# wampConnect configures and manages the connection to Crossbar server.
wampConnect(wamp_conf)
for ext in modules.extensions:
def start(self):
LOG.info(" - starting Lightning-rod WAMP server...")
# LOG.debug(ext.name)
global loop
loop = asyncio.get_event_loop()
component.start(loop)
loop.run_forever()
if (ext.name == 'gpio') & (board.type == 'server'):
LOG.info("- GPIO module disabled for 'server' devices")
def stop(self):
LOG.info("Stopping WAMP agent server...")
# Canceling pending tasks and stopping the loop
asyncio.gather(*asyncio.Task.all_tasks()).cancel()
LOG.info("WAMP server stopped!")
else:
mod = ext.plugin(board, session)
global MODULES
MODULES[mod.name] = mod
async def wamp_checks(session):
# Methods list for each module
meth_list = inspect.getmembers(mod, predicate=inspect.ismethod)
while (True):
global RPC
RPC[mod.name] = meth_list
try:
if len(meth_list) == 3:
# there are at least two methods for each module:
# "__init__" and "finalize"
# LOG.debug("ALIVE sending...")
LOG.info(" - No RPC to register for "
+ str(ext.name) + " module!")
with timeoutALIVE(seconds=CONF.rpc_alive_timer, action="ws_alive"):
res = await session.call(
str(board.agent) + u'.stack4things.alive'
# board_uuid=board.uuid,
# board_name=board.name
)
else:
LOG.info(" - RPC list of " + str(mod.name) + ":")
moduleWampRegister(SESSION, meth_list)
LOG.debug("WampCheck attempt " + str(res))
# Call the finalize procedure for each module
mod.finalize()
except exception.ApplicationError as e:
LOG.error(" - Iotronic Connection RPC error: " + str(e))
# Iotronic is offline the board can not call
# the "stack4things.alive" RPC.
# The board will disconnect from WAMP agent and retry later.
global reconnection
reconnection = True
utils.destroyWampSocket()
LOG.info("Lightning-rod modules loaded.")
LOG.info("\n\nListening...")
try:
await asyncio.sleep(CONF.alive_timer)
except Exception as e:
LOG.warning(" - asyncio alert: " + str(e))
async def IotronicLogin(board, session, details):
@ -231,33 +237,37 @@ async def IotronicLogin(board, session, details):
rpc = str(board.agent) + u'.stack4things.connection'
with timeoutRPC(seconds=3, action=rpc):
with timeoutRPC(seconds=5, action=rpc):
res = await session.call(
rpc,
uuid=board.uuid,
session=details.session
)
w_msg = WM.deserialize(res)
w_msg = WM.deserialize(res)
if w_msg.result == WM.SUCCESS:
if w_msg.result == WM.SUCCESS:
LOG.info(" - Access granted to Iotronic.")
LOG.info(" - Access granted to Iotronic.")
# LOADING BOARD MODULES
try:
# WS ALIVE
asyncio.run_coroutine_threadsafe(wamp_checks(session), loop)
modulesLoader(session)
# LOADING BOARD MODULES
try:
except Exception as e:
LOG.warning("WARNING - Could not load modules: " + str(e))
modulesLoader(session)
# Reset flag to False
# reconnection = False
except Exception as e:
LOG.warning("WARNING - Could not load modules: " + str(e))
utils.LR_restart()
else:
LOG.error(" - Access denied to Iotronic.")
Bye()
# Reset flag to False
# reconnection = False
else:
LOG.error(" - Access denied to Iotronic.")
Bye()
except exception.ApplicationError as e:
LOG.error(" - Iotronic Connection RPC error: " + str(e))
@ -266,8 +276,9 @@ async def IotronicLogin(board, session, details):
# The board will disconnect from WAMP agent and retry later.
reconnection = True
# We stop the Component in order to trigger the onDisconnect event
component.stop()
# We restart Lightning-rod if RPC 'stack4things.connection' is not
# available, this means Wagent is unreachable
utils.LR_restart()
except Exception as e:
LOG.warning("Iotronic board connection error: " + str(e))
@ -284,11 +295,12 @@ def wampConnect(wamp_conf):
try:
LOG.info("WAMP status @ boot:" +
"\n- board = " + str(board.status) +
"\n- reconnection = " + str(reconnection) +
"\n- connected = " + str(connected)
)
LOG.info(
"WAMP status @ boot:" +
"\n- board = " + str(board.status) +
"\n- reconnection = " + str(reconnection) +
"\n- connected = " + str(connected)
)
wamp_transport = wamp_conf['url']
wurl_list = wamp_transport.split(':')
@ -306,6 +318,7 @@ def wampConnect(wamp_conf):
wamp_transport = [
{
"url": wamp_transport,
"serializers": ["json"],
"endpoint": {
"type": "tcp",
"host": whost,
@ -339,6 +352,8 @@ def wampConnect(wamp_conf):
"""
print("WAMP SOCKET: " + str(psutil.Process().connections()[0]))
global connected
connected = True
@ -368,7 +383,10 @@ def wampConnect(wamp_conf):
LOG.info(" - Joined in realm " + board.wamp_config['realm'] + ":")
LOG.info(" - WAMP Agent: " + str(board.agent))
print(" - WAMP Agent: " + str(board.agent) + " - "
+ str(wamp_conf['url']))
LOG.info(" - Session ID: " + str(board.session_id))
print(" - Session ID: " + str(board.session_id))
LOG.info(" - Board status: " + str(board.status))
if reconnection is False:
@ -389,7 +407,7 @@ def wampConnect(wamp_conf):
rpc = u'stack4things.register'
with timeoutRPC(seconds=3, action=rpc):
with timeoutRPC(seconds=5, action=rpc):
res = await session.call(
rpc,
code=board.code,
@ -418,9 +436,10 @@ def wampConnect(wamp_conf):
"\n\nDisconnecting from Registration Agent "
"to load new settings...\n\n")
# We stop the Component in order to trigger the
# onDisconnect event
component.stop()
# We restart Lightning-rod if RPC
# 'stack4things.connection' is not available,
# this means Wagent is unreachable
utils.LR_restart()
else:
LOG.error("Registration denied by Iotronic: "
@ -435,9 +454,11 @@ def wampConnect(wamp_conf):
# TO ACTIVE BOOT CONNECTION RECOVERY MODE
reconnection = True
# We stop the Component in order to trigger the
# onDisconnect event
component.stop()
# We restart Lightning-rod if RPC
# 'stack4things.connection' is not available,
# this means Wagent is unreachable
utils.LR_restart()
except Exception as e:
LOG.warning(
@ -494,7 +515,7 @@ def wampConnect(wamp_conf):
rpc = str(board.agent) + u'.stack4things.connection'
with timeoutRPC(seconds=3, action=rpc):
with timeoutRPC(seconds=5, action=rpc):
res = await session.call(
rpc,
uuid=board.uuid,
@ -505,7 +526,7 @@ def wampConnect(wamp_conf):
if w_msg.result == WM.SUCCESS:
LOG.info(" - Access granted to Iotronic.")
LOG.info(" - Access granted to Iotronic (recovery).")
# LOADING BOARD MODULES
# If the board is in WAMP connection recovery state
@ -521,6 +542,12 @@ def wampConnect(wamp_conf):
LOG.info("\n\nListening...\n\n")
# WS ALIVE
asyncio.run_coroutine_threadsafe(
wamp_checks(session),
loop
)
except Exception as e:
LOG.warning(
"WARNING - Could not reload modules: "
@ -540,9 +567,10 @@ def wampConnect(wamp_conf):
# TO ACTIVE WAMP CONNECTION RECOVERY MODE
reconnection = False
# We stop the Component in order to trigger the
# onDisconnect event
component.stop()
# We restart Lightning-rod if RPC 'stack4things.connection'
# is not available, this means Wagent is unreachable
utils.LR_restart()
except Exception as e:
LOG.warning("Board connection error after WAMP recovery: "
@ -551,16 +579,14 @@ def wampConnect(wamp_conf):
@component.on_leave
async def onLeave(session, details):
LOG.warning('WAMP Session Left: ' + str(details))
LOG.warning("WAMP Session Left: reason = " + str(details.reason))
@component.on_disconnect
async def onDisconnect(session, was_clean):
"""Procedure triggered on WAMP connection lost, for istance
when we call component.stop().
:param connector: WAMP connector object
:param reason: WAMP connection failure reason
"""Procedure triggered on WAMP connection lost.
:param session:
:param was_clean:
:return:
"""
LOG.warning('WAMP Transport Left: was_clean = ' + str(was_clean))
@ -569,11 +595,12 @@ def wampConnect(wamp_conf):
global reconnection
LOG.info("WAMP status on disconnect:" +
"\n- board = " + str(board.status) +
"\n- reconnection = " + str(reconnection) +
"\n- connected = " + str(connected)
)
LOG.info(
"WAMP status on disconnect:" +
"\n- board = " + str(board.status) +
"\n- reconnection = " + str(reconnection) +
"\n- connected = " + str(connected)
)
if board.status == "operative" and reconnection is False:
@ -582,13 +609,21 @@ def wampConnect(wamp_conf):
#################
# we need to recover wamp session and
# we set reconnection flag to True in order to activate
# the RPCs module registration procedure for each module
# the module-RPCs registration procedure for each module
reconnection = True
# LR needs to reconncet to WAMP
if not connected:
component.start(loop)
LOG.warning(".............WAMP DISCONNECTION.............")
LOG.info(
"WAMP status on disconnect:" +
"\n- board = " + str(board.status) +
"\n- reconnection = " + str(reconnection) +
"\n- connected = " + str(connected)
)
# component.start(loop)
elif board.status == "operative" and reconnection is True:
@ -601,15 +636,21 @@ def wampConnect(wamp_conf):
# calling "stack4things.connection" RPC...
# it means IoTronic is offline!
# We need to reset the recconnection flag to False in order to
# do not enter in RPCs module registration procedure...
# We need to reset the reconnection flag to False in order to
# do not enter in module-RPCs registration procedure...
# At this stage the board tries to reconnect to
# IoTronic until it will come online again.
reconnection = False
# LR needs to reconncet to WAMP
if not connected:
component.start(loop)
LOG.warning(".............WAMP DISCONNECTION.............")
LOG.info("WAMP status on disconnect:" +
"\n- board = " + str(board.status) +
"\n- reconnection = " + str(reconnection) +
"\n- connected = " + str(connected)
)
# component.start(loop)
elif (board.status == "registered"):
######################
@ -635,117 +676,141 @@ def wampConnect(wamp_conf):
Bye()
class WampManager(object):
"""WAMP Manager: through this LR manages the connection to Crossbar server.
def moduleWampRegister(session, meth_list):
"""This function register for each module methods the relative RPC.
:param session:
:param meth_list:
"""
def __init__(self, wamp_conf):
if len(meth_list) == 2:
# wampConnect configures and manages the connection to Crossbar server.
wampConnect(wamp_conf)
LOG.info(" - No procedures to register!")
def start(self):
LOG.info(" - starting Lightning-rod WAMP server...")
else:
global loop
loop = asyncio.get_event_loop()
component.start(loop)
loop.run_forever()
for meth in meth_list:
# We don't considere the "__init__", "finalize" and
# "restore" methods
if (meth[0] != "__init__") & (meth[0] != "finalize") \
& (meth[0] != "restore"):
"""
# TEMPORARY ------------------------------------------------------
from subprocess import call
LOG.debug("Unmounting...")
rpc_addr = u'iotronic.' + str(board.session_id) + '.' + \
board.uuid + '.' + meth[0]
try:
mountPoint = "/opt/BBB"
# errorCode = self.libc.umount(mountPoint, None)
errorCode = call(["umount", "-l", mountPoint])
if not meth[0].startswith('_'):
session.register(meth[1], rpc_addr)
LOG.info(" --> " + str(meth[0]))
LOG.debug("Unmount " + mountPoint + " result: " + str(errorCode))
except Exception as msg:
result = "Unmounting error:", msg
LOG.debug(result)
# ------------------------------------------------------------------
"""
def modulesLoader(session):
"""Modules loader method thorugh stevedore libraries.
def stop(self):
LOG.info("Stopping WAMP agent server...")
# Canceling pending tasks and stopping the loop
asyncio.gather(*asyncio.Task.all_tasks()).cancel()
LOG.info("WAMP server stopped!")
:param session:
"""
def Bye():
LOG.info("Bye!")
os._exit(1)
LOG.info("Available modules: ")
ep = []
def LogoLR():
LOG.info('##############################')
LOG.info(' Stack4Things Lightning-rod')
LOG.info('##############################')
for ep in pkg_resources.iter_entry_points(group='s4t.modules'):
LOG.info(" - " + str(ep))
if not ep:
def checkIotronicConf(lr_CONF):
try:
if(lr_CONF.log_file == None):
LOG.warning("'log_file' is not specified!")
return False
else:
print("View logs in " + lr_CONF.log_file)
return True
except Exception as err:
print(err)
return False
LOG.info("No modules available!")
sys.exit()
else:
class LightningRod(object):
modules = extension.ExtensionManager(
namespace='s4t.modules',
# invoke_on_load=True,
# invoke_args=(session,),
)
def __init__(self):
LOG.info('Modules to load:')
LogoLR()
for ext in modules.extensions:
LOG.info("LR available modules: ")
for ep in pkg_resources.iter_entry_points(group='s4t.modules'):
LOG.info(" - " + str(ep))
# LOG.debug(ext.name)
logging.register_options(CONF)
DOMAIN = "s4t-lightning-rod"
CONF(project='iotronic')
logging.setup(CONF, DOMAIN)
if (ext.name == 'gpio') & (board.type == 'server'):
LOG.info("- GPIO module disabled for 'server' devices")
if (checkIotronicConf(CONF)):
else:
mod = ext.plugin(board, session)
if CONF.debug:
txaio.start_logging(level="debug")
global MODULES
MODULES[mod.name] = mod
signal.signal(signal.SIGINT, self.stop_handler)
# Methods list for each module
meth_list = inspect.getmembers(mod, predicate=inspect.ismethod)
LogoLR()
global RPC
RPC[mod.name] = meth_list
global board
board = Board()
if len(meth_list) == 3:
# there are at least two methods for each module:
# "__init__" and "finalize"
LOG.info('Lightning-rod configurations:')
LOG.info(' - Logs: ' + CONF.log_file)
LOG.info(" - Current time: " + board.getTimestamp())
LOG.info(" - Home: " + CONF.lightningrod_home)
LOG.info(" - WebServices Proxy: " + CONF.proxy)
LOG.info(" - No RPC to register for "
+ str(ext.name) + " module!")
self.w = WampManager(board.wamp_config)
else:
LOG.info(" - RPC list of " + str(mod.name) + ":")
moduleWampRegister(SESSION, meth_list)
self.w.start()
# Call the finalize procedure for each module
mod.finalize()
else:
Bye()
LOG.info("Lightning-rod modules loaded.")
LOG.info("\n\nListening...")
def stop_handler(self, signum, frame):
LOG.info("LR is shutting down...")
self.w.stop()
Bye()
def moduleReloadInfo(session):
"""This function is used in the reconnection stage to register
again the RPCs of each module and for device.
:param session: WAMP session object.
"""
LOG.info("\n\nModules reloading after WAMP recovery...\n\n")
try:
# Call module restore procedures and
# register RPCs for each Lightning-rod module
for mod_name in MODULES:
LOG.info("- Registering RPCs for module " + str(mod_name))
moduleWampRegister(session, RPC[mod_name])
LOG.info("- Restoring module " + str(mod_name))
MODULES[mod_name].restore()
# Register RPCs for the device
for dev in RPC_devices:
LOG.info("- Registering RPCs for device " + str(dev))
moduleWampRegister(session, RPC_devices[dev])
except Exception as err:
LOG.warning("Board modules reloading error: " + str(err))
utils.LR_restart()
def Bye():
LOG.info("Bye!")
os._exit(1)
def LogoLR():
LOG.info('##############################')
LOG.info(' Stack4Things Lightning-rod')
LOG.info('##############################')
def main():

73
iotronic_lightningrod/modules/device_manager.py

@ -19,6 +19,9 @@ import importlib as imp
import inspect
import os
import subprocess
import sys
import threading
import time
from datetime import datetime
@ -27,6 +30,7 @@ from iotronic_lightningrod.lightningrod import RPC_devices
from iotronic_lightningrod.lightningrod import SESSION
from iotronic_lightningrod.modules import Module
from iotronic_lightningrod.modules import utils
import iotronic_lightningrod.wampmessage as WM
from oslo_log import log as logging
@ -85,7 +89,10 @@ class DeviceManager(Module.Module):
if (meth[0] != "__init__") & (meth[0] != "finalize"):
# LOG.info(" - " + str(meth[0]))
rpc_addr = u'iotronic.' + board.uuid + '.' + meth[0]
# rpc_addr = u'iotronic.' + board.uuid + '.' + meth[0]
rpc_addr = u'iotronic.' + str(board.session_id) + '.' + \
board.uuid + '.' + meth[0]
# LOG.debug(" --> " + str(rpc_addr))
SESSION.register(meth[1], rpc_addr)
@ -94,23 +101,50 @@ class DeviceManager(Module.Module):
async def DevicePing(self):
rpc_name = utils.getFuncName()
LOG.info("RPC " + rpc_name + " CALLED")
return datetime.now().strftime('%Y-%m-%dT%H:%M:%S.%f')
message = datetime.now().strftime('%Y-%m-%dT%H:%M:%S.%f')
w_msg = WM.WampSuccess(message)
return w_msg.serialize()
async def DeviceReboot(self):
rpc_name = utils.getFuncName()
LOG.info("RPC " + rpc_name + " CALLED")
command = "reboot"
subprocess.call(command, shell=True)
def delayBoardReboot():
time.sleep(3)
subprocess.call("reboot", shell=True)
threading.Thread(target=delayBoardReboot).start()
message = "Rebooting board in few seconds @" + \
str(datetime.now().strftime('%Y-%m-%dT%H:%M:%S.%f'))
w_msg = WM.WampSuccess(message)
return w_msg.serialize()
async def DeviceRestartLR(self):
rpc_name = utils.getFuncName()
LOG.info("RPC " + rpc_name + " CALLED")
def delayLRrestarting():
time.sleep(2)
python = sys.executable
os.execl(python, python, *sys.argv)
return datetime.now().strftime('%Y-%m-%dT%H:%M:%S.%f')
threading.Thread(target=delayLRrestarting).start()
message = "Restarting LR in 5 seconds (" + \
datetime.now().strftime('%Y-%m-%dT%H:%M:%S.%f') + ")..."
w_msg = WM.WampSuccess(message)
return w_msg.serialize()
async def DeviceHostname(self):
rpc_name = utils.getFuncName()
LOG.info("RPC " + rpc_name + " CALLED")
command = "hostname"
# subprocess.call(command, shell=True)
out = subprocess.Popen(
command,
@ -119,23 +153,28 @@ class DeviceManager(Module.Module):
)
output = out.communicate()[0].decode('utf-8').strip()
print(output)
return str(output) + "@" + \
message = str(output) + "@" + \
str(datetime.now().strftime('%Y-%m-%dT%H:%M:%S.%f'))
w_msg = WM.WampSuccess(message)
return w_msg.serialize()
"""
async def DeviceWampDisconnect(self):
async def DeviceNetConfig(self):
rpc_name = utils.getFuncName()
LOG.info("RPC " + rpc_name + " CALLED")
import threading, time
command = "ifconfig"
out = subprocess.Popen(
command,
shell=True,
stdout=subprocess.PIPE
)
def delayDisconnection():
time.sleep(5)
SESSION.disconnect()
output = out.communicate()[0].decode('utf-8').strip()
threading.Thread(target=delayDisconnection).start()
message = str(output)
w_msg = WM.WampSuccess(message)
return "Device disconnection in 5 seconds..."
"""
return w_msg.serialize()

13
iotronic_lightningrod/modules/network_manager.py

@ -42,19 +42,6 @@ class NetworkManager(Module.Module):
def restore(self):
pass
async def test_function(self):
import random
s = random.uniform(0.5, 1.5)
await asyncio.sleep(s)
result = "DEVICE test result: TEST!"
LOG.info(result)
return result
async def add(self, x, y):
c = x + y
LOG.info("DEVICE add result: " + str(c))
return c