Standalone servers for keepalive and receiver

- keepalive renamed to assasin (name conflicts) and starts as separate process controlled by supervisor
 - RPC receiver moved to separate server

Implements: blueprint nailgun-move-to-uwsgi

Change-Id: I75898a1975794a85e2335a1aa3dc56d073c3af39
This commit is contained in:
demon.mhm 2014-02-10 16:00:16 +04:00
parent 2df782e1a6
commit 44dd504911
9 changed files with 176 additions and 176 deletions

View File

@ -198,7 +198,7 @@ def action_run(params):
if params.config_file:
settings.update_from_file(params.config_file)
from nailgun.wsgi import appstart
appstart(keepalive=params.keepalive)
appstart()
if __name__ == "__main__":

View File

@ -11,8 +11,3 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from watcher import KeepAliveThread
keep_alive = KeepAliveThread()

View File

@ -0,0 +1,61 @@
# -*- coding: utf-8 -*-
# Copyright 2013 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import sys
sys.path.insert(0, os.path.dirname(__file__))
import time
from datetime import datetime
from datetime import timedelta
from sqlalchemy.sql import not_
from nailgun import notifier
from nailgun.db import db
from nailgun.db.sqlalchemy.models import Node
from nailgun.logger import logger
from nailgun.settings import settings
def update_nodes_status(timeout):
to_update = db().query(Node).filter(
not_(Node.status == 'provisioning')
).filter(
datetime.now() > (Node.timestamp + timedelta(seconds=timeout))
).filter_by(online=True)
for node_db in to_update:
notifier.notify(
"error",
u"Node '{0}' has gone away".format(
node_db.human_readable_name),
node_id=node_db.id
)
to_update.update({"online": False})
db().commit()
def run():
logger.info('Running Assassind...')
try:
while True:
update_nodes_status(settings.KEEPALIVE['timeout'])
time.sleep(settings.KEEPALIVE['interval'])
except (KeyboardInterrupt, SystemExit):
logger.info('Stopping Assassind...')
sys.exit(1)

View File

@ -1,85 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2013 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from datetime import datetime
from datetime import timedelta
from itertools import repeat
from sqlalchemy.sql import not_
import threading
import time
import traceback
from nailgun import notifier
from nailgun.db import db
from nailgun.db.sqlalchemy.models import Node
from nailgun.logger import logger
from nailgun.settings import settings
class KeepAliveThread(threading.Thread):
def __init__(self, interval=None, timeout=None):
super(KeepAliveThread, self).__init__()
self.stop_status_checking = threading.Event()
self.interval = interval or settings.KEEPALIVE['interval']
self.timeout = timeout or settings.KEEPALIVE['timeout']
def reset_nodes_timestamp(self):
db().query(Node).update({'timestamp': datetime.now()})
db().commit()
def join(self, timeout=None):
self.stop_status_checking.set()
super(KeepAliveThread, self).join(timeout)
def sleep(self, interval=None):
map(
lambda i: not self.stop_status_checking.isSet() and time.sleep(i),
repeat(1, interval or self.interval)
)
def run(self):
while True:
try:
self.reset_nodes_timestamp()
while not self.stop_status_checking.isSet():
self.update_status_nodes()
self.sleep()
except Exception:
logger.error(traceback.format_exc())
time.sleep(1)
if self.stop_status_checking.isSet():
break
def update_status_nodes(self):
to_update = db().query(Node).filter(
not_(Node.status == 'provisioning')
).filter(
datetime.now() > (Node.timestamp + timedelta(seconds=self.timeout))
).filter_by(
online=True
)
for node_db in to_update:
notifier.notify(
"error",
u"Node '{0}' has gone away".format(
node_db.human_readable_name),
node_id=node_db.id
)
to_update.update({"online": False})
db().commit()

View File

@ -0,0 +1,66 @@
# -*- coding: utf-8 -*-
# Copyright 2013 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import sys
sys.path.insert(0, os.path.dirname(__file__))
import traceback
from kombu import Connection
from kombu.mixins import ConsumerMixin
from nailgun.db import db
from nailgun.errors import errors
from nailgun.logger import logger
import nailgun.rpc as rpc
from nailgun.rpc.receiver import NailgunReceiver
class RPCConsumer(ConsumerMixin):
def __init__(self, connection, receiver):
self.connection = connection
self.receiver = receiver
def get_consumers(self, Consumer, channel):
return [Consumer(queues=[rpc.nailgun_queue],
callbacks=[self.consume_msg])]
def consume_msg(self, body, msg):
callback = getattr(self.receiver, body["method"])
try:
callback(**body["args"])
db().commit()
except errors.CannotFindTask as e:
logger.warn(str(e))
db().rollback()
except Exception:
logger.error(traceback.format_exc())
db().rollback()
finally:
msg.ack()
db().expire_all()
def run():
logger.info("Starting standalone RPC consumer...")
with Connection(rpc.conn_str) as conn:
try:
RPCConsumer(conn, NailgunReceiver).run()
except (KeyboardInterrupt, SystemExit):
logger.info("Stopping standalone RPC consumer...")

View File

@ -0,0 +1,44 @@
# -*- coding: utf-8 -*-
# Copyright 2013 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from nailgun.assassin import assassind
from nailgun.test.base import BaseIntegrationTest
class TestKeepalive(BaseIntegrationTest):
VERY_LONG_TIMEOUT = 60 * 60 # 1 hour
ZERO_TIMEOUT = 0
def test_node_becomes_offline(self):
node = self.env.create_node(
status="discover",
roles=["controller"],
name="Dead or alive"
)
assassind.update_nodes_status(self.VERY_LONG_TIMEOUT)
self.assertEqual(node.online, True)
assassind.update_nodes_status(self.ZERO_TIMEOUT)
self.assertEqual(node.online, False)
def test_provisioning_node_not_becomes_offline(self):
node = self.env.create_node(
status="provisioning",
roles=["controller"],
name="Dead or alive"
)
assassind.update_nodes_status(self.ZERO_TIMEOUT)
self.assertEqual(node.online, True)

View File

@ -1,60 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2013 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import time
from nailgun.keepalive.watcher import KeepAliveThread
from nailgun.test.base import BaseIntegrationTest
class TestKeepalive(BaseIntegrationTest):
def setUp(self):
super(TestKeepalive, self).setUp()
self.watcher = KeepAliveThread(
interval=2,
timeout=1
)
self.watcher.start()
self.timeout = self.watcher.interval + 10
def tearDown(self):
self.watcher.join()
super(TestKeepalive, self).tearDown()
def check_online(self, node, online):
self.env.refresh_nodes()
return node.online == online
def test_node_becomes_offline(self):
node = self.env.create_node(status="discover",
roles=["controller"],
name="Dead or alive")
self.assertEquals(node.online, True)
self.env.wait_for_true(
self.check_online,
args=[node, False],
timeout=self.timeout)
def test_provisioning_node_not_becomes_offline(self):
node = self.env.create_node(status="provisioning",
roles=["controller"],
name="Dead or alive")
time.sleep(self.watcher.interval + 2)
self.env.refresh_nodes()
self.assertEqual(node.online, True)

View File

@ -67,7 +67,7 @@ def run_server(func, server_address=('0.0.0.0', 8080)):
server.stop()
def appstart(keepalive=False):
def appstart():
logger.info("Fuel version: %s", str(settings.VERSION))
if not engine.dialect.has_table(engine.connect(), "nodes"):
logger.error(
@ -77,33 +77,10 @@ def appstart(keepalive=False):
app = build_app()
from nailgun.keepalive import keep_alive
from nailgun.rpc import threaded
if keepalive:
logger.info("Running KeepAlive watcher...")
keep_alive.start()
if not settings.FAKE_TASKS:
if not keep_alive.is_alive() \
and not settings.FAKE_TASKS_AMQP:
logger.info("Running KeepAlive watcher...")
keep_alive.start()
rpc_process = threaded.RPCKombuThread()
logger.info("Running RPC consumer...")
rpc_process.start()
logger.info("Running WSGI app...")
wsgifunc = build_middleware(app.wsgifunc)
run_server(wsgifunc,
(settings.LISTEN_ADDRESS, int(settings.LISTEN_PORT)))
logger.info("Stopping WSGI app...")
if keep_alive.is_alive():
logger.info("Stopping KeepAlive watcher...")
keep_alive.join()
if not settings.FAKE_TASKS:
logger.info("Stopping RPC consumer...")
rpc_process.join()
logger.info("Done")

View File

@ -69,7 +69,9 @@ if __name__ == "__main__":
'nailgun_fixtures = \
nailgun.db.sqlalchemy.fixman:upload_fixtures',
'nailgund = nailgun.wsgi:appstart',
'nailgun_dump = nailgun.task.task:dump'
'nailgun_dump = nailgun.task.task:dump',
'assassind = nailgun.assassin.assassind:run',
'receiverd = nailgun.rpc.receiverd:run'
],
},
data_files=recursive_data_files([('share/nailgun', 'static')])