diff --git a/iotronic/common/exception.py b/iotronic/common/exception.py index edfe3d1..1baa673 100644 --- a/iotronic/common/exception.py +++ b/iotronic/common/exception.py @@ -253,6 +253,10 @@ class NodeNotFound(NotFound): message = _("Node %(node)s could not be found.") +class NodeNotConnected(Invalid): + message = _("Node %(node)s is not connected.") + + class NodeAssociated(InvalidState): message = _("Node %(node)s is associated with instance %(instance)s.") diff --git a/iotronic/conductor/manager.py b/iotronic/conductor/manager.py index 458f6f4..c0efe31 100644 --- a/iotronic/conductor/manager.py +++ b/iotronic/conductor/manager.py @@ -67,8 +67,10 @@ from iotronic.common.i18n import _LW from iotronic.common import states # from iotronic.common import swift from iotronic.conductor import task_manager -from iotronic import objects +from iotronic.wamp.rpcwamp import RPC_Wamp +from iotronic.wamp.wampresponse import WampResponse +from iotronic import objects from iotronic.openstack.common import periodic_task @@ -623,6 +625,7 @@ class ConductorManager(periodic_task.PeriodicTasks): self.host = host self.topic = topic self.drivers = ['fake'] + self.wamp = RPC_Wamp() # self.power_state_sync_count = collections.defaultdict(int) # self.notifier = rpc.get_notifier() ''' @@ -710,9 +713,6 @@ class ConductorManager(periodic_task.PeriodicTasks): LOG.critical(_LC('Failed to start keepalive')) self.del_host() - # from iotronic.wamp.rpcwampserver import RPC_Wamp_Server - # RPC_Wamp_Server() - def _collect_periodic_tasks(self, obj): for n, method in inspect.getmembers(obj, inspect.ismethod): if getattr(method, '_periodic_enabled', False): @@ -1671,7 +1671,7 @@ class ConductorManager(periodic_task.PeriodicTasks): return ret_dict @messaging.expected_exceptions(exception.NodeLocked, - exception.NodeAssociated, + exception.NodeNotConnected, exception.InvalidState) def destroy_node(self, context, node_id): """Delete a node. @@ -1679,8 +1679,7 @@ class ConductorManager(periodic_task.PeriodicTasks): :param context: request context. :param node_id: node id or uuid. :raises: NodeLocked if node is locked by another conductor. - :raises: NodeAssociated if the node contains an instance - associated with it. + :raises: NodeNotConnected if the node is not connected :raises: InvalidState if the node is in the wrong provision state to perform deletion. @@ -1688,46 +1687,17 @@ class ConductorManager(periodic_task.PeriodicTasks): with task_manager.acquire(context, node_id) as task: node = task.node - node.destroy() - LOG.info(_LI('Successfully deleted node %(node)s.'), - {'node': node.uuid}) - # if node.instance_uuid is not None: - # raise exception.NodeAssociated(node=node.uuid, - # instance=node.instance_uuid) - - # TODO(lucasagomes): We should add ENROLLED once it's part of our - # state machine - # NOTE(lucasagomes): For the *FAIL states we users should - # move it to a safe state prior to deletion. This is because we - # should try to avoid deleting a node in a dirty/whacky state, - # e.g: A node in DEPLOYFAIL, if deleted without passing through - # tear down/cleaning may leave data from the previous tenant - # in the disk. So nodes in *FAIL states should first be moved to: - # CLEANFAIL -> MANAGEABLE - # INSPECTIONFAIL -> MANAGEABLE - # DEPLOYFAIL -> DELETING - # ZAPFAIL -> MANAGEABLE (in the future) - ''' - valid_states = (states.AVAILABLE, states.NOSTATE, - states.MANAGEABLE) - if node.provision_state not in valid_states: - msg = (_('Can not delete node "%(node)s" while it is in ' - 'provision state "%(state)s". Valid provision states ' - 'to perform deletion are: "%(valid_states)s"') % - {'node': node.uuid, 'state': node.provision_state, - 'valid_states': valid_states}) - raise exception.InvalidState(msg) - if node.console_enabled: - try: - task.driver.console.stop_console(task) - except Exception as err: - LOG.error(_LE('Failed to stop console while deleting ' - 'the node %(node)s: %(err)s.'), - {'node': node.uuid, 'err': err}) - node.destroy() - LOG.info(_LI('Successfully deleted node %(node)s.'), - {'node': node.uuid}) - ''' + r = WampResponse() + r.clearConfig() + response = self.wamp.rpc_call( + 'stack4things.' + node.uuid + '.configure', + r.getResponse()) + if response['result'] == 0: + node.destroy() + LOG.info(_LI('Successfully deleted node %(node)s.'), + {'node': node.uuid}) + else: + raise exception.NodeNotConnected(node=node.uuid) @messaging.expected_exceptions(exception.NodeLocked, exception.NodeNotFound) diff --git a/iotronic/wamp/clientwamp.py b/iotronic/wamp/clientwamp.py deleted file mode 100644 index 4c80e38..0000000 --- a/iotronic/wamp/clientwamp.py +++ /dev/null @@ -1,108 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from autobahn.twisted.util import sleep -from autobahn.twisted.wamp import ApplicationRunner -from autobahn.twisted.wamp import ApplicationSession -import multiprocessing -from twisted.internet.defer import inlineCallbacks - - -msg_queue = None - - -class Publisher(ApplicationSession): - - def onJoin(self, details): - print("Publisher session ready") - - -class Subscriber(ApplicationSession): - - @inlineCallbacks - def onJoin(self, details): - print("Subscriber session ready") - self.topic_reader = self.config.extra['topic'] - - def manage_msg(*args): - print (args) - - try: - yield self.subscribe(manage_msg, self.topic_reader) - print("subscribed to topic") - except Exception as e: - print("could not subscribe to topic: {0}".format(e)) - - global msg_queue - while True: - if not msg_queue.empty(): - msg = msg_queue.get() - self.publish(msg['topic'], msg['message']) - yield sleep(0.01) - - -class PublisherClient(object): - - def __init__(self, ip, port, realm): - self.ip = unicode(ip) - self.port = unicode(port) - self.realm = unicode(realm) - self._url = "ws://" + self.ip + ":" + self.port + "/ws" - self.runner = ApplicationRunner( - url=unicode(self._url), - realm=self.realm, - # debug=True, debug_wamp=True, - # debug_app=True - ) - - def start(self): - # Pass start_reactor=False to all runner.run() calls - self.runner.run(Publisher, start_reactor=False) - - -class SubscriberClient(object): - - def __init__(self, ip, port, realm, topic): - self.ip = unicode(ip) - self.port = unicode(port) - self.realm = unicode(realm) - self.topic = unicode(topic) - self._url = "ws://" + self.ip + ":" + self.port + "/ws" - self.runner = ApplicationRunner( - url=unicode(self._url), - realm=self.realm, - # debug=True, debug_wamp=True, - # debug_app=True - ) - - def start(self): - # Pass start_reactor=False to all runner.run() calls - self.runner.run(Subscriber, start_reactor=False) - - -class ClientWamp(object): - - def __init__(self, ip, port, realm, topic='board.connection'): - server = SubscriberClient(ip, port, realm, topic) - sendMessage = PublisherClient(ip, port, realm) - server.start() - sendMessage.start() - - from twisted.internet import reactor - global msg_queue - msg_queue = multiprocessing.Queue() - multi = multiprocessing.Process(target=reactor.run, args=()) - multi.start() - - def send(self, topic, msg): - full_msg = {'topic': unicode(topic), 'message': unicode(msg)} - msg_queue.put(full_msg) diff --git a/iotronic/wamp/functions.py b/iotronic/wamp/functions.py index df1b13d..c985a7d 100644 --- a/iotronic/wamp/functions.py +++ b/iotronic/wamp/functions.py @@ -12,41 +12,38 @@ from iotronic.common import exception from iotronic import objects +from iotronic.wamp.wampresponse import WampResponse from oslo_log import log LOG = log.getLogger(__name__) def leave_function(session_id): - LOG.debug('Node with %s disconnectd', session_id) + LOG.debug('A node with %s disconnectd', session_id) try: old_session = objects.SessionWP({}).get_by_session_id({}, session_id) old_session.valid = False old_session.save() LOG.debug('Session %s deleted', session_id) except Exception: - LOG.debug('Error in deleting session %s', session_id) + LOG.debug('session %s not found', session_id) -def test(): - LOG.debug('hello') - return u'hello!' +def echo(text): + LOG.debug(text) + return text def registration(code_node, session_num): - LOG.debug( - 'Receved registration from %s with session %s', - code_node, - session_num) - response = '' + LOG.debug('Receved registration from %s with session %s', + code_node, session_num) try: node = objects.Node.get_by_code({}, code_node) except Exception: - response = exception.NodeNotFound(node=code_node) + return exception.NodeNotFound(node=code_node) try: old_session = objects.SessionWP( - {}).get_session_by_node_uuid( - node.uuid, valid=True) + {}).get_session_by_node_uuid(node.uuid, valid=True) old_session.valid = False old_session.save() except Exception: @@ -59,4 +56,38 @@ def registration(code_node, session_num): session.create() session.save() - return unicode(response) + r = WampResponse() + + r.addSection('config', []) + r.addConfig('add', 'config:node:uuid', node.uuid, 'config') + r.addConfig('add', 'config:iotronic:command-agent', + {"url": "ws://cmd-iotronic", "port": "8181", + "realm": "s4t"}, 'config') + + return r.getResponse() + + +def registration_uuid(uuid, session_num): + LOG.debug('Receved registration from %s with session %s', + uuid, session_num) + try: + node = objects.Node.get_by_uuid({}, uuid) + except Exception: + return exception.NodeNotFound(node=uuid) + try: + old_session = objects.SessionWP( + {}).get_session_by_node_uuid(node.uuid, valid=True) + old_session.valid = False + old_session.save() + except Exception: + LOG.debug('valid session for %s not found', node.uuid) + + session = objects.SessionWP({}) + session.node_id = node.id + session.node_uuid = node.uuid + session.session_id = session_num + session.create() + session.save() + r = WampResponse() + r.addSection('result', 0) + return r.getResponse() diff --git a/iotronic/wamp/rpcwamp.py b/iotronic/wamp/rpcwamp.py new file mode 100644 index 0000000..066e557 --- /dev/null +++ b/iotronic/wamp/rpcwamp.py @@ -0,0 +1,171 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import multiprocessing + +from autobahn.twisted.wamp import ApplicationRunner +from autobahn.twisted.wamp import ApplicationSession +from twisted.internet.defer import inlineCallbacks + +from oslo_config import cfg +from oslo_log import log +from twisted.internet import reactor + + +from multiprocessing import Pipe + + +LOG = log.getLogger(__name__) + +wamp_opts = [ + cfg.StrOpt('wamp_ip', + default='127.0.0.1', + help=('URL of wamp broker')), + cfg.IntOpt('wamp_port', + default=8181, + help='port wamp broker'), + cfg.StrOpt('wamp_realm', + default='s4t', + help=('realm broker')), +] +CONF = cfg.CONF +CONF.register_opts(wamp_opts, 'wamp') + + +class RPCWampServerManager(ApplicationSession): + + def __init__(self, config=None): + ApplicationSession.__init__(self, config) + LOG.info("RPC wamp manager created") + + ''' + #unused methods + def onConnect(self): + print("transport connected") + self.join(self.config.realm) + + def onChallenge(self, challenge): + print("authentication challenge received") + + def onLeave(self, details): + print("session left") + import os, signal + os.kill(multi.pid, signal.SIGKILL) + + def onDisconnect(self): + print("transport disconnected") + ''' + + @inlineCallbacks + def onJoin(self, details): + LOG.info('RPC Wamp Session ready') + import iotronic.wamp.functions as fun + self.subscribe(fun.leave_function, 'wamp.session.on_leave') + + try: + yield self.register(fun.echo, + u'stack4things.echo') + yield self.register(fun.registration, + u'stack4things.register') + yield self.register(fun.registration_uuid, + u'stack4things.register_uuid') + + LOG.info("Procedures registered") + except Exception as e: + LOG.error("could not register procedure: {0}".format(e)) + + +class RPCWampServer(object): + + def __init__(self, ip, port, realm): + self.ip = unicode(ip) + self.port = unicode(port) + self.realm = unicode(realm) + self._url = "ws://" + self.ip + ":" + self.port + "/ws" + self.runner = ApplicationRunner( + url=unicode(self._url), + realm=self.realm, + # debug = True, debug_wamp = True, debug_app = True + ) + self.runner.run(RPCWampServerManager, start_reactor=False) + + +class RPCWampManagerClient(ApplicationSession): + """An application component calling the different backend procedures. + + """ + + @inlineCallbacks + def onJoin(self, details): + LOG.debug("session attached") + rpc = self.config.extra['rpc'] + args = self.config.extra['args'] + self.pipe_out = self.config.extra['pipe'] + res = {'response': '', 'error': ''} + try: + res['response'] = yield self.call(rpc, args) + res['error'] = 0 + except Exception as e: + LOG.error(e) + res['response'] = e + res['error'] = 1 + self.pipe_out.send(res) + self.leave() + + def onDisconnect(self): + reactor.stop() + + +class RPCWampClient(object): + + def __init__(self, ip, port, realm, rpc, args, b_a_ext): + self.ip = unicode(ip) + self.port = unicode(port) + self.realm = unicode(realm) + self._url = "ws://" + self.ip + ":" + self.port + "/ws" + + self.runner = ApplicationRunner( + url=unicode(self._url), + realm=self.realm, + extra={'rpc': rpc, 'args': args, 'pipe': b_a_ext}, + # debug = False, debug_wamp = False, debug_app = False + ) + self.runner.run(RPCWampManagerClient, start_reactor=False) + + +class RPC_Wamp(object): + + def __init__(self): + self.ip = unicode(CONF.wamp.wamp_ip) + self.port = unicode(CONF.wamp.wamp_port) + self.realm = unicode(CONF.wamp.wamp_realm) + self.server = RPCWampServer(self.ip, self.port, self.realm) + self.b_a_int, self.b_a_ext = Pipe() + server_process = multiprocessing.Process(target=reactor.run, args=()) + server_process.start() + + def rpc_call(self, rpc, *args): + res = '' + RPCWampClient( + self.ip, self.port, self.realm, rpc, args, self.b_a_ext) + client_process = multiprocessing.Process(target=reactor.run, args=()) + client_process.start() + + while True: + if self.b_a_int.poll(): + res = self.b_a_int.recv() + client_process.join() + break + if res['error'] == 0: + return res['response'] + else: + return {'result': 1} diff --git a/iotronic/wamp/rpcwampserver.py b/iotronic/wamp/rpcwampserver.py deleted file mode 100644 index 5dd0ba9..0000000 --- a/iotronic/wamp/rpcwampserver.py +++ /dev/null @@ -1,101 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from autobahn.twisted.wamp import ApplicationRunner -from autobahn.twisted.wamp import ApplicationSession -import multiprocessing -from oslo_config import cfg -from oslo_log import log -from twisted.internet.defer import inlineCallbacks -from twisted.internet import reactor - - -LOG = log.getLogger(__name__) - -wamp_opts = [ - cfg.StrOpt('wamp_ip', default='127.0.0.1', help='URL of wamp broker'), - cfg.IntOpt('wamp_port', default=8181, help='port wamp broker'), - cfg.StrOpt('wamp_realm', default='s4t', help='realm broker') -] - -CONF = cfg.CONF -CONF.register_opts(wamp_opts, 'wamp') - - -class RPCWampManager(ApplicationSession): - - def __init__(self, config=None): - ApplicationSession.__init__(self, config) - LOG.info("RPC wamp manager created") - - ''' - #unused methods - def onConnect(self): - print("transport connected") - self.join(self.config.realm) - - def onChallenge(self, challenge): - print("authentication challenge received") - - def onLeave(self, details): - print("session left") - import os, signal - os.kill(multi.pid, signal.SIGKILL) - - def onDisconnect(self): - print("transport disconnected") - ''' - - @inlineCallbacks - def onJoin(self, details): - LOG.info('RPC Wamp Session ready') - import iotronic.wamp.functions as fun - self.subscribe(fun.leave_function, 'wamp.session.on_leave') - - try: - yield self.register(fun.test, u'stack4things.test') - yield self.register(fun.registration, u'stack4things.register') - - LOG.info("Procedures registered") - except Exception as e: - print("could not register procedure: {0}".format(e)) - - -class RPCWampServer(object): - - def __init__(self, ip, port, realm): - self.ip = unicode(ip) - self.port = unicode(port) - self.realm = unicode(realm) - self._url = "ws://" + self.ip + ":" + self.port + "/ws" - self.runner = ApplicationRunner( - url=unicode(self._url), - realm=self.realm, - # debug=True, debug_wamp=True, - # debug_app=True - ) - - def start(self): - # Pass start_reactor=False to all runner.run() calls - self.runner.run(RPCWampManager, start_reactor=False) - - -class RPC_Wamp_Server(object): - - def __init__(self): - self.ip = unicode(CONF.wamp.wamp_ip) - self.port = unicode(CONF.wamp.wamp_port) - self.realm = unicode(CONF.wamp.wamp_realm) - server = RPCWampServer(self.ip, self.port, self.realm) - server.start() - multi = multiprocessing.Process(target=reactor.run, args=()) - multi.start() diff --git a/iotronic/wamp/wampresponse.py b/iotronic/wamp/wampresponse.py new file mode 100644 index 0000000..9286415 --- /dev/null +++ b/iotronic/wamp/wampresponse.py @@ -0,0 +1,55 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import json + + +class WampResponse(object): + + def __init__(self): + self.response = {} + + def getResponse(self): + return json.dumps(self.response) + + def addSection(self, name, value=''): + self.response[name] = value + + def addElement(self, position, value, section): + if isinstance(self.response[section], list): + self.response[section].append({"position": position, + "value": value}) + elif isinstance(self.response[section], dict): + self.response[section][position] = value + + def addConfig(self, action, position, value, section='config'): + if isinstance(self.response[section], list): + self.response[section].append({"action": action, + "position": position, + "value": value}) + + def removeSection(self, name): + self.response.pop(name, None) + + def clearSection(self, name): + self.response[name] = '' + + def clearConfig(self): + self.addSection('config', []) + self.addConfig('clear', 'config', {"iotronic": {"registration-agent": { + "url": "", + "port": "", + "realm": "" + }}, + "node": { + "token": ""} + })