diff --git a/nailgun/db.py b/nailgun/db.py index 007bd8f3c..b0a103d92 100644 --- a/nailgun/db.py +++ b/nailgun/db.py @@ -1,12 +1,23 @@ # -*- coding: utf-8 -*- + import web from sqlalchemy.orm import scoped_session, sessionmaker +from sqlalchemy.orm.query import Query + from api.models import engine +class Query(Query): + def __init__(self, *args, **kwargs): + self._populate_existing = True + super(Query, self).__init__(*args, **kwargs) + + def load_db_driver(handler): - web.ctx.orm = scoped_session(sessionmaker(bind=engine)) + web.ctx.orm = scoped_session( + sessionmaker(bind=engine, query_cls=Query) + ) try: return handler() except web.HTTPError: diff --git a/nailgun/manage.py b/nailgun/manage.py index 829d91b7c..bd6ddb060 100755 --- a/nailgun/manage.py +++ b/nailgun/manage.py @@ -76,6 +76,10 @@ if __name__ == "__main__": else: parser.print_help() elif params.action in ("run", "runwsgi"): + from rpc import threaded + q = threaded.rpc_queue + rpc_thread = threaded.RPCThread() + if params.action == "run": app = web.application(urls, locals(), autoreload=True) else: @@ -94,9 +98,11 @@ if __name__ == "__main__": app.wsgifunc() ) try: + rpc_thread.start() server.start() except KeyboardInterrupt: logging.info("Stopping WSGI app...") + q.put("exit") server.stop() logging.info("Done") elif params.action == "shell": diff --git a/nailgun/rpc/threaded.py b/nailgun/rpc/threaded.py new file mode 100644 index 000000000..c74ea927c --- /dev/null +++ b/nailgun/rpc/threaded.py @@ -0,0 +1,97 @@ +# -*- coding: utf-8 -*- + +import time +import Queue +import logging +import threading + +from sqlalchemy.orm import scoped_session, sessionmaker + +import rpc +from db import Query +from api.models import engine + +rpc_queue = Queue.Queue() + + +class TestReceiver(object): + """Simple Proxy class so the consumer has methods to call. + + Uses static methods because we aren't actually storing any state. + + """ + + @staticmethod + def echo(value): + """Simply returns whatever value is sent in.""" + return value + + @staticmethod + def multicall_three_nones(value): + yield None + yield None + yield None + + @staticmethod + def echo_three_times_yield(value): + yield value + yield value + 1 + yield value + 2 + + @staticmethod + def fail(value): + """Raises an exception with the value sent in.""" + raise Exception(value) + + @staticmethod + def block(value): + time.sleep(2) + + +import eventlet +eventlet.monkey_patch() + + +class RPCThread(threading.Thread): + def __init__(self, testmode=False): + super(RPCThread, self).__init__() + self.queue = rpc_queue + self.db = scoped_session( + sessionmaker(bind=engine, query_cls=Query) + ) + self.conn = rpc.create_connection(True) + self.testmode = testmode + if not self.testmode: + self.receiver = TestReceiver() + self.conn.create_consumer('test', self.receiver, False) + self.conn.consume_in_thread() + + def run(self): + logging.info("Starting RPC thread...") + self.running = True + + while self.running: + try: + msg = self.queue.get_nowait() + getattr(self, msg)() + if not self.running: + break + except Queue.Empty: + pass + try: + ans = rpc.call('test', { + "method": "echo", + "args": { + "value": 1 + } + }) + # update db here with data + except Exception as error: + # update db here with error + logging.info("ERROR!!!!") + time.sleep(5) + self.conn.close() + + def exit(self): + logging.info("Stopping RPC thread...") + self.running = False diff --git a/nailgun/test/test_db_refresh.py b/nailgun/test/test_db_refresh.py new file mode 100644 index 000000000..2dcda4e83 --- /dev/null +++ b/nailgun/test/test_db_refresh.py @@ -0,0 +1,47 @@ +# -*- coding: utf-8 -*- + +from unittest import TestCase + +from paste.fixture import TestApp +from sqlalchemy.orm.events import orm + +from api.models import engine, Node +from db import dropdb, syncdb, flush, Query +from manage import app + + +class TestDBRefresh(TestCase): + @classmethod + def setUpClass(cls): + dropdb() + syncdb() + + def setUp(self): + self.app = TestApp(app.wsgifunc()) + self.db = orm.scoped_session( + orm.sessionmaker(bind=engine, query_cls=Query) + )() + self.db2 = orm.scoped_session( + orm.sessionmaker(bind=engine, query_cls=Query) + )() + self.default_headers = { + "Content-Type": "application/json" + } + flush() + + def test_session_update(self): + node = Node() + node.mac = u"ASDFGHJKLMNOPR" + self.db.add(node) + self.db.commit() + + node2 = self.db2.query(Node).filter( + Node.id == node.id + ).first() + node2.mac = u"12345678" + self.db2.add(node2) + self.db2.commit() + node1 = self.db.query(Node).filter( + Node.id == node.id + ).first() + self.assertEquals(node.mac, u"12345678") diff --git a/nailgun/test/test_rpc.py b/nailgun/test/test_rpc.py index 3909e6205..dad483d37 100644 --- a/nailgun/test/test_rpc.py +++ b/nailgun/test/test_rpc.py @@ -1,3 +1,5 @@ +# -*- coding: utf-8 -*- + from datetime import time import eventlet eventlet.monkey_patch()