rpc thread created
This commit is contained in:
parent
ece293b1c5
commit
df2ffdacc9
@ -1,12 +1,23 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
|
||||
import web
|
||||
from sqlalchemy.orm import scoped_session, sessionmaker
|
||||
from sqlalchemy.orm.query import Query
|
||||
|
||||
from api.models import engine
|
||||
|
||||
|
||||
class Query(Query):
|
||||
def __init__(self, *args, **kwargs):
|
||||
self._populate_existing = True
|
||||
super(Query, self).__init__(*args, **kwargs)
|
||||
|
||||
|
||||
def load_db_driver(handler):
|
||||
web.ctx.orm = scoped_session(sessionmaker(bind=engine))
|
||||
web.ctx.orm = scoped_session(
|
||||
sessionmaker(bind=engine, query_cls=Query)
|
||||
)
|
||||
try:
|
||||
return handler()
|
||||
except web.HTTPError:
|
||||
|
@ -76,6 +76,10 @@ if __name__ == "__main__":
|
||||
else:
|
||||
parser.print_help()
|
||||
elif params.action in ("run", "runwsgi"):
|
||||
from rpc import threaded
|
||||
q = threaded.rpc_queue
|
||||
rpc_thread = threaded.RPCThread()
|
||||
|
||||
if params.action == "run":
|
||||
app = web.application(urls, locals(), autoreload=True)
|
||||
else:
|
||||
@ -94,9 +98,11 @@ if __name__ == "__main__":
|
||||
app.wsgifunc()
|
||||
)
|
||||
try:
|
||||
rpc_thread.start()
|
||||
server.start()
|
||||
except KeyboardInterrupt:
|
||||
logging.info("Stopping WSGI app...")
|
||||
q.put("exit")
|
||||
server.stop()
|
||||
logging.info("Done")
|
||||
elif params.action == "shell":
|
||||
|
97
nailgun/rpc/threaded.py
Normal file
97
nailgun/rpc/threaded.py
Normal file
@ -0,0 +1,97 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import time
|
||||
import Queue
|
||||
import logging
|
||||
import threading
|
||||
|
||||
from sqlalchemy.orm import scoped_session, sessionmaker
|
||||
|
||||
import rpc
|
||||
from db import Query
|
||||
from api.models import engine
|
||||
|
||||
rpc_queue = Queue.Queue()
|
||||
|
||||
|
||||
class TestReceiver(object):
|
||||
"""Simple Proxy class so the consumer has methods to call.
|
||||
|
||||
Uses static methods because we aren't actually storing any state.
|
||||
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def echo(value):
|
||||
"""Simply returns whatever value is sent in."""
|
||||
return value
|
||||
|
||||
@staticmethod
|
||||
def multicall_three_nones(value):
|
||||
yield None
|
||||
yield None
|
||||
yield None
|
||||
|
||||
@staticmethod
|
||||
def echo_three_times_yield(value):
|
||||
yield value
|
||||
yield value + 1
|
||||
yield value + 2
|
||||
|
||||
@staticmethod
|
||||
def fail(value):
|
||||
"""Raises an exception with the value sent in."""
|
||||
raise Exception(value)
|
||||
|
||||
@staticmethod
|
||||
def block(value):
|
||||
time.sleep(2)
|
||||
|
||||
|
||||
import eventlet
|
||||
eventlet.monkey_patch()
|
||||
|
||||
|
||||
class RPCThread(threading.Thread):
|
||||
def __init__(self, testmode=False):
|
||||
super(RPCThread, self).__init__()
|
||||
self.queue = rpc_queue
|
||||
self.db = scoped_session(
|
||||
sessionmaker(bind=engine, query_cls=Query)
|
||||
)
|
||||
self.conn = rpc.create_connection(True)
|
||||
self.testmode = testmode
|
||||
if not self.testmode:
|
||||
self.receiver = TestReceiver()
|
||||
self.conn.create_consumer('test', self.receiver, False)
|
||||
self.conn.consume_in_thread()
|
||||
|
||||
def run(self):
|
||||
logging.info("Starting RPC thread...")
|
||||
self.running = True
|
||||
|
||||
while self.running:
|
||||
try:
|
||||
msg = self.queue.get_nowait()
|
||||
getattr(self, msg)()
|
||||
if not self.running:
|
||||
break
|
||||
except Queue.Empty:
|
||||
pass
|
||||
try:
|
||||
ans = rpc.call('test', {
|
||||
"method": "echo",
|
||||
"args": {
|
||||
"value": 1
|
||||
}
|
||||
})
|
||||
# update db here with data
|
||||
except Exception as error:
|
||||
# update db here with error
|
||||
logging.info("ERROR!!!!")
|
||||
time.sleep(5)
|
||||
self.conn.close()
|
||||
|
||||
def exit(self):
|
||||
logging.info("Stopping RPC thread...")
|
||||
self.running = False
|
47
nailgun/test/test_db_refresh.py
Normal file
47
nailgun/test/test_db_refresh.py
Normal file
@ -0,0 +1,47 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from unittest import TestCase
|
||||
|
||||
from paste.fixture import TestApp
|
||||
from sqlalchemy.orm.events import orm
|
||||
|
||||
from api.models import engine, Node
|
||||
from db import dropdb, syncdb, flush, Query
|
||||
from manage import app
|
||||
|
||||
|
||||
class TestDBRefresh(TestCase):
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
dropdb()
|
||||
syncdb()
|
||||
|
||||
def setUp(self):
|
||||
self.app = TestApp(app.wsgifunc())
|
||||
self.db = orm.scoped_session(
|
||||
orm.sessionmaker(bind=engine, query_cls=Query)
|
||||
)()
|
||||
self.db2 = orm.scoped_session(
|
||||
orm.sessionmaker(bind=engine, query_cls=Query)
|
||||
)()
|
||||
self.default_headers = {
|
||||
"Content-Type": "application/json"
|
||||
}
|
||||
flush()
|
||||
|
||||
def test_session_update(self):
|
||||
node = Node()
|
||||
node.mac = u"ASDFGHJKLMNOPR"
|
||||
self.db.add(node)
|
||||
self.db.commit()
|
||||
|
||||
node2 = self.db2.query(Node).filter(
|
||||
Node.id == node.id
|
||||
).first()
|
||||
node2.mac = u"12345678"
|
||||
self.db2.add(node2)
|
||||
self.db2.commit()
|
||||
node1 = self.db.query(Node).filter(
|
||||
Node.id == node.id
|
||||
).first()
|
||||
self.assertEquals(node.mac, u"12345678")
|
@ -1,3 +1,5 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from datetime import time
|
||||
import eventlet
|
||||
eventlet.monkey_patch()
|
||||
|
Loading…
Reference in New Issue
Block a user