Reduced database overhead from agents

- added a dedicated handler for node agents' update-only requests
- cached data from agents to avoid DB updates when the data is unchanged
- nailgun now responds with appropriate HTTP statuses
- changed the agent update logic: the agent now tries to update first and
  respects nailgun's response statuses

Change-Id: I2658cf7561cd8c9116acced2443d072d471f3bdb
Implements: blueprint nailgun-agent-handler
Closes-Bug: #1274614
This commit is contained in:
demon.mhm 2014-02-27 15:38:21 +04:00
parent 06d22c86f0
commit 59ed8c081d
10 changed files with 219 additions and 85 deletions

View File

@ -26,6 +26,7 @@ require 'optparse'
require 'yaml'
require 'ipaddr'
require 'rethtool'
require 'digest'
unless Process.euid == 0
puts "You must be root"
@ -45,6 +46,20 @@ REMOVABLE_VENDORS = [
"Adaptec",
]
def digest(body)
if body.is_a? Hash
digest body.map { |k,v| [digest(k),digest(v)].join("=>") }.sort
elsif body.is_a? Array
body.map{ |v| digest v }.join('|')
else
[body.class.to_s, body.to_s].join(":")
end
end
# Returns the SHA1 hex checksum of the structural digest of +body+;
# used as the agent_checksum sent to nailgun for change detection.
def createsig(body)
  Digest::SHA1.hexdigest(digest(body))
end
class McollectiveConfig
def initialize(logger)
@logger = logger
@ -134,11 +149,9 @@ class NodeAgent
def put
headers = {"Content-Type" => "application/json"}
@logger.debug("Trying to put host info into #{@api_url}")
@logger.debug("Headers: #{headers}")
@logger.debug("Data: #{[_data].to_json}")
res = htclient.put("#{@api_url}/nodes/", [_data].to_json, headers)
res = htclient.put("#{@api_url}/nodes/agent/", _data.to_json, headers)
@logger.debug("Response: status: #{res.status} body: #{res.body}")
if res.status < 200 or res.status >= 300
if res.status < 200 or res.status >= 400
@logger.error("HTTP PUT failed: #{res.inspect}")
end
res
@ -147,8 +160,6 @@ class NodeAgent
def post
headers = {"Content-Type" => "application/json"}
@logger.debug("Trying to create host using #{@api_url}")
@logger.debug("Headers: #{headers}")
@logger.debug("Data: #{_data.to_json}")
res = htclient.post("#{@api_url}/nodes/", _data.to_json, headers)
@logger.debug("Response: status: #{res.status} body: #{res.body}")
res
@ -461,7 +472,7 @@ class NodeAgent
res = {
:mac => (@os[:macaddress] rescue nil),
:ip => (@os[:ipaddress] rescue nil),
:os_platform => (@os[:platform] rescue nil)
:os_platform => (@os[:platform] rescue nil),
}
begin
detailed_data = _detailed
@ -479,6 +490,7 @@ class NodeAgent
res[:status] = @node_state if @node_state
res[:is_agent] = true
res[:agent_checksum] = createsig(res)
res
end
@ -515,7 +527,6 @@ def write_data_to_file(logger, filename, data)
end
end
logger = Logger.new(STDOUT)
logger.level = Logger::DEBUG
@ -540,14 +551,20 @@ agent = NodeAgent.new(logger, url)
agent.update_state
begin
post_res = agent.post
if post_res.status == 409
put_res = agent.put
new_id = JSON.parse(put_res.body)[0]['id']
elsif post_res.status == 201
new_id = JSON.parse(post_res.body)['id']
put_res = agent.put
# nailgun returns 'Invalid MAC specified' for unregistered nodes
if [404, 400].include? put_res.status
post_res = agent.post
if post_res.status == 201
new_id = JSON.parse(post_res.body)[0]['id']
else
logger.error post_res.body
exit 1
end
elsif put_res.status == 200
new_id = JSON.parse(put_res.body)['id']
else
logger.error post_res.body
logger.error put_res.body
exit 1
end
mc_config = McollectiveConfig.new(logger)

View File

@ -57,13 +57,16 @@ def forbid_client_caching(handler):
@decorator
def content_json(func, *args, **kwargs):
web.header('Content-Type', 'application/json')
try:
data = func(*args, **kwargs)
except web.notmodified:
raise
except web.HTTPError as http_error:
web.header('Content-Type', 'application/json')
if isinstance(http_error.data, (dict, list)):
http_error.data = build_json_response(http_error.data)
raise
web.header('Content-Type', 'application/json')
return build_json_response(data)

View File

@ -332,17 +332,6 @@ class NodeCollectionHandler(BaseHandler):
else:
node = q.get(nd["id"])
is_agent = nd.pop("is_agent") if "is_agent" in nd else False
if is_agent:
node.timestamp = datetime.now()
if not node.online:
node.online = True
msg = u"Node '{0}' is back online".format(
node.human_readable_name)
logger.info(msg)
notifier.notify("discover", msg, node_id=node.id)
db().commit()
old_cluster_id = node.cluster_id
if nd.get("pending_roles") == [] and node.cluster:
@ -364,13 +353,6 @@ class NodeCollectionHandler(BaseHandler):
))
for key, value in nd.iteritems():
if is_agent and (key, value) == ("status", "discover") \
and node.status in ('provisioning', 'error'):
# We don't update provisioning and error back to discover
logger.debug(
"Node has provisioning or error status - "
"status not updated by agent")
continue
if key == "meta":
node.update_meta(value)
# don't update node ID
@ -419,11 +401,6 @@ class NodeCollectionHandler(BaseHandler):
network_manager = NetworkManager
if is_agent:
# Update node's NICs.
network_manager.update_interfaces_info(node)
db().commit()
nodes_updated.append(node.id)
if 'cluster_id' in nd and nd['cluster_id'] != old_cluster_id:
if old_cluster_id:
@ -443,6 +420,109 @@ class NodeCollectionHandler(BaseHandler):
return self.render(nodes)
class NodeAgentHandler(BaseHandler):
    """Dedicated handler for update-only requests from the node agent.

    Exposed at /nodes/agent/ so agent traffic bypasses the heavier
    NodeCollectionHandler; uses the agent-supplied ``agent_checksum``
    to skip the full update path when the posted data is unchanged.
    """

    validator = NodeValidator

    @content_json
    def PUT(self):
        """Update a single node from agent-supplied data.

        :returns: node id (plus ``cached: True`` when data was unchanged).
        :http: * 200 (node is successfully updated)
               * 304 (node data not changed since last request)
               * 400 (invalid node data specified)
               * 404 (node not found)
        """
        # The agent sends one node dict; wrap it in a JSON list so the
        # collection validator can be reused, then take the single item.
        nd = self.checked_data(
            self.validator.validate_collection_update,
            data=u'[{0}]'.format(web.data())
        )[0]

        # Locate the node by MAC when provided, otherwise by id.
        q = db().query(Node)
        if nd.get("mac"):
            node = (
                q.filter_by(mac=nd["mac"]).first()
                or self.validator.validate_existent_node_mac_update(nd)
            )
        else:
            node = q.get(nd["id"])
        if not node:
            raise web.notfound()

        # Any agent request proves the node is alive: refresh the
        # heartbeat timestamp and flip it back online if needed.
        node.timestamp = datetime.now()
        if not node.online:
            node.online = True
            msg = u"Node '{0}' is back online".format(node.human_readable_name)
            logger.info(msg)
            notifier.notify("discover", msg, node_id=node.id)
        db().commit()

        # Caching: when the payload checksum matches the stored one the
        # data is unchanged, so the expensive update below is skipped.
        # NOTE(review): despite the 304 listed in the docstring, this
        # path returns a 200 with 'cached': True — confirm intent.
        if 'agent_checksum' in nd and (
            node.agent_checksum == nd['agent_checksum']
        ):
            return {'id': node.id, 'cached': True}

        for key, value in nd.iteritems():
            if (
                (key, value) == ("status", "discover")
                and node.status in ('provisioning', 'error')
            ):
                # We don't update provisioning and error back to discover
                logger.debug(
                    u"Node {0} has provisioning or error status - "
                    u"status not updated by agent".format(
                        node.human_readable_name
                    )
                )
                continue
            if key == "meta":
                node.update_meta(value)
            # don't update node ID
            elif key != "id":
                setattr(node, key, value)
        db().commit()

        # Ensure the node has attribute records and volume info.
        if not node.attributes:
            node.attributes = NodeAttributes()
            db().commit()
        if not node.attributes.volumes:
            node.attributes.volumes = node.volume_manager.gen_volumes_info()
            db().commit()

        # Regenerate volume info when the reported disk count no longer
        # matches the stored volume layout — but never while the node
        # is being provisioned or deployed.
        if node.status not in ('provisioning', 'deploying'):
            variants = (
                "disks" in node.meta and len(node.meta["disks"]) != len(
                    filter(
                        lambda d: d["type"] == "disk",
                        node.attributes.volumes
                    )
                ),
            )
            if any(variants):
                try:
                    node.attributes.volumes = (
                        node.volume_manager.gen_volumes_info()
                    )
                    if node.cluster:
                        node.cluster.add_pending_changes(
                            "disks", node_id=node.id
                        )
                except Exception as exc:
                    # Best-effort: log and notify, keep handling the PUT.
                    msg = (
                        "Failed to generate volumes info for node '{0}': '{1}'"
                    ).format(
                        node.human_readable_name,
                        str(exc) or "see logs for details"
                    )
                    logger.warning(traceback.format_exc())
                    notifier.notify("error", msg, node_id=node.id)
            db().commit()

        # Sync NIC records with the interfaces the agent reported.
        NetworkManager.update_interfaces_info(node)
        db().commit()
        return {"id": node.id}
class NodeNICsHandler(BaseHandler):
"""Node network interfaces handler
"""

View File

@ -49,6 +49,7 @@ from nailgun.api.handlers.network_configuration \
from nailgun.api.handlers.network_configuration \
import NovaNetworkConfigurationVerifyHandler
from nailgun.api.handlers.node import NodeAgentHandler
from nailgun.api.handlers.node import NodeCollectionHandler
from nailgun.api.handlers.node import NodeHandler
from nailgun.api.handlers.node import NodesAllocationStatsHandler
@ -137,6 +138,8 @@ urls = (
r'/nodes/?$',
NodeCollectionHandler,
r'/nodes/agent/?$',
NodeAgentHandler,
r'/nodes/(?P<node_id>\d+)/?$',
NodeHandler,
r'/nodes/(?P<node_id>\d+)/disks/?$',

View File

@ -178,6 +178,10 @@ def upgrade():
new_task_names_options # new options
)
op.add_column('nodes', sa.Column(
'agent_checksum', sa.String(40), nullable=True
))
### end Alembic commands ###
@ -308,4 +312,5 @@ def downgrade():
)
op.drop_table('net_bond_assignments')
op.drop_table('node_bond_interfaces')
op.drop_column('nodes', 'agent_checksum')
### end Alembic commands ###

View File

@ -122,6 +122,8 @@ class Node(Base):
bond_interfaces = relationship("NodeBondInterface", backref="node",
cascade="delete",
order_by="NodeBondInterface.name")
# hash function from raw node agent request data - for caching purposes
agent_checksum = Column(String(40), nullable=True)
@property
def interfaces(self):

View File

@ -252,11 +252,11 @@ class TestHandlers(BaseIntegrationTest):
)
node_db = self.env.nodes[0]
resp = self.app.put(
reverse('NodeCollectionHandler'),
json.dumps([
{'mac': node_db.mac, 'is_agent': True,
reverse('NodeAgentHandler'),
json.dumps(
{'mac': node_db.mac,
'status': 'discover', 'manufacturer': 'new'}
]),
),
headers=self.default_headers
)
self.assertEquals(resp.status_code, 200)
@ -283,17 +283,42 @@ class TestHandlers(BaseIntegrationTest):
self.assertEquals(node.timestamp, timestamp)
resp = self.app.put(
reverse('NodeCollectionHandler'),
json.dumps([
reverse('NodeAgentHandler'),
json.dumps(
{'mac': node.mac, 'status': 'discover',
'manufacturer': 'new', 'is_agent': True}
]),
'manufacturer': 'new'}
),
headers=self.default_headers)
self.assertEquals(resp.status_code, 200)
node = self.db.query(Node).get(node.id)
self.assertNotEquals(node.timestamp, timestamp)
self.assertEquals('new', node.manufacturer)
def test_agent_caching(self):
node = self.env.create_node(api=False)
resp = self.app.put(
reverse('NodeAgentHandler'),
json.dumps({
'mac': node.mac,
'manufacturer': 'new',
'agent_checksum': 'test'
}),
headers=self.default_headers)
response = json.loads(resp.body)
self.assertEquals(resp.status_code, 200)
self.assertFalse('cached' in response and response['cached'])
resp = self.app.put(
reverse('NodeAgentHandler'),
json.dumps({
'mac': node.mac,
'manufacturer': 'new',
'agent_checksum': 'test'
}),
headers=self.default_headers)
response = json.loads(resp.body)
self.assertEquals(resp.status_code, 200)
self.assertTrue('cached' in response and response['cached'])
def test_node_create_ip_not_in_admin_range(self):
node = self.env.create_node(api=False)

View File

@ -458,10 +458,9 @@ class TestNodeNICAdminAssigning(BaseIntegrationTest):
interface['ip'] = admin_ip
resp = self.app.put(
reverse('NodeCollectionHandler'),
json.dumps([{'id': node_db.id,
'meta': meta,
'is_agent': True}]),
reverse('NodeAgentHandler'),
json.dumps({'id': node_db.id,
'meta': meta}),
headers=self.default_headers
)
self.assertEquals(resp.status_code, 200)

View File

@ -251,11 +251,10 @@ class TestNodeDefaultsDisksHandler(BaseIntegrationTest):
'disk': 'disk/id/b00b135'})
self.app.put(
reverse('NodeCollectionHandler'),
json.dumps([{
reverse('NodeAgentHandler'),
json.dumps({
"mac": node_db.mac,
"meta": new_meta,
"is_agent": True}]),
"meta": new_meta}),
headers=self.default_headers)
self.env.refresh_nodes()
@ -579,11 +578,10 @@ class TestVolumeManager(BaseIntegrationTest):
'disk': 'disk/id/a00b135'}]
self.app.put(
reverse('NodeCollectionHandler'),
json.dumps([{
reverse('NodeAgentHandler'),
json.dumps({
'mac': node.mac,
'meta': new_meta,
'is_agent': True}]),
'meta': new_meta}),
headers=self.default_headers)
def add_disk_to_node(self, node, size):
@ -599,11 +597,10 @@ class TestVolumeManager(BaseIntegrationTest):
'disk': 'disk/id/%s00b135' % string.letters[new_disk]})
self.app.put(
reverse('NodeCollectionHandler'),
json.dumps([{
reverse('NodeAgentHandler'),
json.dumps({
'mac': node.mac,
'meta': new_meta,
'is_agent': True}]),
'meta': new_meta}),
headers=self.default_headers)
def test_check_disk_space_for_deployment(self):

View File

@ -40,11 +40,10 @@ class TestHandlers(BaseIntegrationTest):
for nic_meta in meta_list:
meta = self.env.default_metadata()
meta.update(nic_meta)
node_data = {'mac': node['mac'], 'is_agent': True,
'meta': meta}
node_data = {'mac': node['mac'], 'meta': meta}
resp = self.app.put(
reverse('NodeCollectionHandler'),
json.dumps([node_data]),
reverse('NodeAgentHandler'),
json.dumps(node_data),
expect_errors=True,
headers=self.default_headers
)
@ -71,16 +70,19 @@ class TestHandlers(BaseIntegrationTest):
for nic_meta in meta_clean_list:
meta = self.env.default_metadata()
meta.update(nic_meta)
node_data = {'mac': node['mac'], 'is_agent': True,
'meta': meta}
node_data = {'mac': node['mac'], 'meta': meta}
resp = self.app.put(
reverse('NodeCollectionHandler'),
json.dumps([node_data]),
reverse('NodeAgentHandler'),
json.dumps(node_data),
expect_errors=True,
headers=self.default_headers
)
self.assertEquals(resp.status_code, 200)
ifaces = json.loads(resp.body)[0]["meta"]["interfaces"]
resp = self.app.get(
reverse('NodeNICsHandler', kwargs={'node_id': node['id']}),
headers=self.default_headers
)
ifaces = json.loads(resp.body)
self.assertEquals(ifaces, [])
def test_get_handler_with_invalid_speed_data(self):
@ -105,16 +107,19 @@ class TestHandlers(BaseIntegrationTest):
for nic_meta in meta_clean_list:
meta = self.env.default_metadata()
meta.update(nic_meta)
node_data = {'mac': node['mac'], 'is_agent': True,
'meta': meta}
node_data = {'mac': node['mac'], 'meta': meta}
resp = self.app.put(
reverse('NodeCollectionHandler'),
json.dumps([node_data]),
reverse('NodeAgentHandler'),
json.dumps(node_data),
expect_errors=True,
headers=self.default_headers
)
self.assertEquals(resp.status_code, 200)
ifaces = json.loads(resp.body)[0]["meta"]["interfaces"]
resp = self.app.get(
reverse('NodeHandler', kwargs={'node_id': node['id']}),
headers=self.default_headers
)
ifaces = json.loads(resp.body)['meta']['interfaces']
self.assertEquals(
ifaces,
[
@ -175,11 +180,10 @@ class TestHandlers(BaseIntegrationTest):
self.env.set_interfaces_in_meta(new_meta, [
{'name': 'new_nic', 'mac': '12345', 'current_speed': 10,
'max_speed': 10, 'state': 'down'}])
node_data = {'mac': node['mac'], 'is_agent': True,
'meta': new_meta}
node_data = {'mac': node['mac'], 'meta': new_meta}
resp = self.app.put(
reverse('NodeCollectionHandler'),
json.dumps([node_data]),
reverse('NodeAgentHandler'),
json.dumps(node_data),
headers=self.default_headers)
self.assertEquals(resp.status_code, 200)
@ -206,11 +210,10 @@ class TestHandlers(BaseIntegrationTest):
node = self.env.create_node(api=True, meta=meta)
meta['interfaces'].append({'name': 'new_nic', 'mac': '643'})
node_data = {'mac': node['mac'], 'is_agent': True,
'meta': meta}
node_data = {'mac': node['mac'], 'meta': meta}
resp = self.app.put(
reverse('NodeCollectionHandler'),
json.dumps([node_data]),
reverse('NodeAgentHandler'),
json.dumps(node_data),
headers=self.default_headers)
self.assertEquals(resp.status_code, 200)