Merge "Standalone servers for keepalive and receiver"
This commit is contained in:
commit
66bd881f81
@ -233,7 +233,7 @@ def action_run(params):
|
||||
if params.config_file:
|
||||
settings.update_from_file(params.config_file)
|
||||
from nailgun.wsgi import appstart
|
||||
appstart(keepalive=params.keepalive)
|
||||
appstart()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
@ -11,8 +11,3 @@
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
|
||||
from watcher import KeepAliveThread
|
||||
|
||||
keep_alive = KeepAliveThread()
|
61
nailgun/nailgun/assassin/assassind.py
Normal file
61
nailgun/nailgun/assassin/assassind.py
Normal file
@ -0,0 +1,61 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# Copyright 2013 Mirantis, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import os
|
||||
import sys
|
||||
|
||||
sys.path.insert(0, os.path.dirname(__file__))
|
||||
|
||||
import time
|
||||
|
||||
from datetime import datetime
|
||||
from datetime import timedelta
|
||||
from sqlalchemy.sql import not_
|
||||
|
||||
from nailgun import notifier
|
||||
|
||||
from nailgun.db import db
|
||||
from nailgun.db.sqlalchemy.models import Node
|
||||
from nailgun.logger import logger
|
||||
from nailgun.settings import settings
|
||||
|
||||
|
||||
def update_nodes_status(timeout):
|
||||
to_update = db().query(Node).filter(
|
||||
not_(Node.status == 'provisioning')
|
||||
).filter(
|
||||
datetime.now() > (Node.timestamp + timedelta(seconds=timeout))
|
||||
).filter_by(online=True)
|
||||
for node_db in to_update:
|
||||
notifier.notify(
|
||||
"error",
|
||||
u"Node '{0}' has gone away".format(
|
||||
node_db.human_readable_name),
|
||||
node_id=node_db.id
|
||||
)
|
||||
to_update.update({"online": False})
|
||||
db().commit()
|
||||
|
||||
|
||||
def run():
|
||||
logger.info('Running Assassind...')
|
||||
try:
|
||||
while True:
|
||||
update_nodes_status(settings.KEEPALIVE['timeout'])
|
||||
time.sleep(settings.KEEPALIVE['interval'])
|
||||
except (KeyboardInterrupt, SystemExit):
|
||||
logger.info('Stopping Assassind...')
|
||||
sys.exit(1)
|
@ -1,85 +0,0 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# Copyright 2013 Mirantis, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from datetime import datetime
|
||||
from datetime import timedelta
|
||||
from itertools import repeat
|
||||
from sqlalchemy.sql import not_
|
||||
import threading
|
||||
import time
|
||||
import traceback
|
||||
|
||||
from nailgun import notifier
|
||||
|
||||
from nailgun.db import db
|
||||
from nailgun.db.sqlalchemy.models import Node
|
||||
from nailgun.logger import logger
|
||||
from nailgun.settings import settings
|
||||
|
||||
|
||||
class KeepAliveThread(threading.Thread):
|
||||
|
||||
def __init__(self, interval=None, timeout=None):
|
||||
super(KeepAliveThread, self).__init__()
|
||||
self.stop_status_checking = threading.Event()
|
||||
self.interval = interval or settings.KEEPALIVE['interval']
|
||||
self.timeout = timeout or settings.KEEPALIVE['timeout']
|
||||
|
||||
def reset_nodes_timestamp(self):
|
||||
db().query(Node).update({'timestamp': datetime.now()})
|
||||
db().commit()
|
||||
|
||||
def join(self, timeout=None):
|
||||
self.stop_status_checking.set()
|
||||
super(KeepAliveThread, self).join(timeout)
|
||||
|
||||
def sleep(self, interval=None):
|
||||
map(
|
||||
lambda i: not self.stop_status_checking.isSet() and time.sleep(i),
|
||||
repeat(1, interval or self.interval)
|
||||
)
|
||||
|
||||
def run(self):
|
||||
while True:
|
||||
try:
|
||||
self.reset_nodes_timestamp()
|
||||
while not self.stop_status_checking.isSet():
|
||||
self.update_status_nodes()
|
||||
self.sleep()
|
||||
except Exception:
|
||||
logger.error(traceback.format_exc())
|
||||
time.sleep(1)
|
||||
|
||||
if self.stop_status_checking.isSet():
|
||||
break
|
||||
|
||||
def update_status_nodes(self):
|
||||
to_update = db().query(Node).filter(
|
||||
not_(Node.status == 'provisioning')
|
||||
).filter(
|
||||
datetime.now() > (Node.timestamp + timedelta(seconds=self.timeout))
|
||||
).filter_by(
|
||||
online=True
|
||||
)
|
||||
for node_db in to_update:
|
||||
notifier.notify(
|
||||
"error",
|
||||
u"Node '{0}' has gone away".format(
|
||||
node_db.human_readable_name),
|
||||
node_id=node_db.id
|
||||
)
|
||||
to_update.update({"online": False})
|
||||
db().commit()
|
66
nailgun/nailgun/rpc/receiverd.py
Normal file
66
nailgun/nailgun/rpc/receiverd.py
Normal file
@ -0,0 +1,66 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# Copyright 2013 Mirantis, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import os
|
||||
import sys
|
||||
|
||||
sys.path.insert(0, os.path.dirname(__file__))
|
||||
|
||||
import traceback
|
||||
|
||||
from kombu import Connection
|
||||
from kombu.mixins import ConsumerMixin
|
||||
|
||||
from nailgun.db import db
|
||||
from nailgun.errors import errors
|
||||
from nailgun.logger import logger
|
||||
import nailgun.rpc as rpc
|
||||
from nailgun.rpc.receiver import NailgunReceiver
|
||||
|
||||
|
||||
class RPCConsumer(ConsumerMixin):
|
||||
|
||||
def __init__(self, connection, receiver):
|
||||
self.connection = connection
|
||||
self.receiver = receiver
|
||||
|
||||
def get_consumers(self, Consumer, channel):
|
||||
return [Consumer(queues=[rpc.nailgun_queue],
|
||||
callbacks=[self.consume_msg])]
|
||||
|
||||
def consume_msg(self, body, msg):
|
||||
callback = getattr(self.receiver, body["method"])
|
||||
try:
|
||||
callback(**body["args"])
|
||||
db().commit()
|
||||
except errors.CannotFindTask as e:
|
||||
logger.warn(str(e))
|
||||
db().rollback()
|
||||
except Exception:
|
||||
logger.error(traceback.format_exc())
|
||||
db().rollback()
|
||||
finally:
|
||||
msg.ack()
|
||||
db().expire_all()
|
||||
|
||||
|
||||
def run():
|
||||
logger.info("Starting standalone RPC consumer...")
|
||||
with Connection(rpc.conn_str) as conn:
|
||||
try:
|
||||
RPCConsumer(conn, NailgunReceiver).run()
|
||||
except (KeyboardInterrupt, SystemExit):
|
||||
logger.info("Stopping standalone RPC consumer...")
|
44
nailgun/nailgun/test/integration/test_assassin.py
Normal file
44
nailgun/nailgun/test/integration/test_assassin.py
Normal file
@ -0,0 +1,44 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# Copyright 2013 Mirantis, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from nailgun.assassin import assassind
|
||||
from nailgun.test.base import BaseIntegrationTest
|
||||
|
||||
|
||||
class TestKeepalive(BaseIntegrationTest):
|
||||
|
||||
VERY_LONG_TIMEOUT = 60 * 60 # 1 hour
|
||||
ZERO_TIMEOUT = 0
|
||||
|
||||
def test_node_becomes_offline(self):
|
||||
node = self.env.create_node(
|
||||
status="discover",
|
||||
roles=["controller"],
|
||||
name="Dead or alive"
|
||||
)
|
||||
assassind.update_nodes_status(self.VERY_LONG_TIMEOUT)
|
||||
self.assertEqual(node.online, True)
|
||||
assassind.update_nodes_status(self.ZERO_TIMEOUT)
|
||||
self.assertEqual(node.online, False)
|
||||
|
||||
def test_provisioning_node_not_becomes_offline(self):
|
||||
node = self.env.create_node(
|
||||
status="provisioning",
|
||||
roles=["controller"],
|
||||
name="Dead or alive"
|
||||
)
|
||||
assassind.update_nodes_status(self.ZERO_TIMEOUT)
|
||||
self.assertEqual(node.online, True)
|
@ -1,60 +0,0 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# Copyright 2013 Mirantis, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import time
|
||||
|
||||
from nailgun.keepalive.watcher import KeepAliveThread
|
||||
from nailgun.test.base import BaseIntegrationTest
|
||||
|
||||
|
||||
class TestKeepalive(BaseIntegrationTest):
|
||||
|
||||
def setUp(self):
|
||||
super(TestKeepalive, self).setUp()
|
||||
self.watcher = KeepAliveThread(
|
||||
interval=2,
|
||||
timeout=1
|
||||
)
|
||||
self.watcher.start()
|
||||
self.timeout = self.watcher.interval + 10
|
||||
|
||||
def tearDown(self):
|
||||
self.watcher.join()
|
||||
super(TestKeepalive, self).tearDown()
|
||||
|
||||
def check_online(self, node, online):
|
||||
self.env.refresh_nodes()
|
||||
return node.online == online
|
||||
|
||||
def test_node_becomes_offline(self):
|
||||
node = self.env.create_node(status="discover",
|
||||
roles=["controller"],
|
||||
name="Dead or alive")
|
||||
|
||||
self.assertEquals(node.online, True)
|
||||
self.env.wait_for_true(
|
||||
self.check_online,
|
||||
args=[node, False],
|
||||
timeout=self.timeout)
|
||||
|
||||
def test_provisioning_node_not_becomes_offline(self):
|
||||
node = self.env.create_node(status="provisioning",
|
||||
roles=["controller"],
|
||||
name="Dead or alive")
|
||||
|
||||
time.sleep(self.watcher.interval + 2)
|
||||
self.env.refresh_nodes()
|
||||
self.assertEqual(node.online, True)
|
@ -67,7 +67,7 @@ def run_server(func, server_address=('0.0.0.0', 8080)):
|
||||
server.stop()
|
||||
|
||||
|
||||
def appstart(keepalive=False):
|
||||
def appstart():
|
||||
logger.info("Fuel version: %s", str(settings.VERSION))
|
||||
if not engine.dialect.has_table(engine.connect(), "nodes"):
|
||||
logger.error(
|
||||
@ -77,33 +77,10 @@ def appstart(keepalive=False):
|
||||
|
||||
app = build_app()
|
||||
|
||||
from nailgun.keepalive import keep_alive
|
||||
from nailgun.rpc import threaded
|
||||
|
||||
if keepalive:
|
||||
logger.info("Running KeepAlive watcher...")
|
||||
keep_alive.start()
|
||||
|
||||
if not settings.FAKE_TASKS:
|
||||
if not keep_alive.is_alive() \
|
||||
and not settings.FAKE_TASKS_AMQP:
|
||||
logger.info("Running KeepAlive watcher...")
|
||||
keep_alive.start()
|
||||
rpc_process = threaded.RPCKombuThread()
|
||||
logger.info("Running RPC consumer...")
|
||||
rpc_process.start()
|
||||
logger.info("Running WSGI app...")
|
||||
|
||||
wsgifunc = build_middleware(app.wsgifunc)
|
||||
|
||||
run_server(wsgifunc,
|
||||
(settings.LISTEN_ADDRESS, int(settings.LISTEN_PORT)))
|
||||
|
||||
logger.info("Stopping WSGI app...")
|
||||
if keep_alive.is_alive():
|
||||
logger.info("Stopping KeepAlive watcher...")
|
||||
keep_alive.join()
|
||||
if not settings.FAKE_TASKS:
|
||||
logger.info("Stopping RPC consumer...")
|
||||
rpc_process.join()
|
||||
logger.info("Done")
|
||||
|
@ -69,7 +69,9 @@ if __name__ == "__main__":
|
||||
'nailgun_fixtures = \
|
||||
nailgun.db.sqlalchemy.fixman:upload_fixtures',
|
||||
'nailgund = nailgun.wsgi:appstart',
|
||||
'nailgun_dump = nailgun.task.task:dump'
|
||||
'nailgun_dump = nailgun.task.task:dump',
|
||||
'assassind = nailgun.assassin.assassind:run',
|
||||
'receiverd = nailgun.rpc.receiverd:run'
|
||||
],
|
||||
},
|
||||
data_files=recursive_data_files([('share/nailgun', 'static')])
|
||||
|
Loading…
Reference in New Issue
Block a user