[API][ADMIN_API][MGM]: Auto-failover code

* Refactor pool manager into a gearman worker
* Add delete and floating IP functions to pool manager
* Fix flake8 for API server
* Add new table so that admin APIs can track current builders
* Add support to Admin API to build nodes using gearman pool manager
* Add auto-delete (Nova and DB) of used devices
* Mark deleted load balancers as DELETED instead of OFFLINE/ERROR
* Move expunge handler thread from API to ADMIN_API
* Don't ERROR check all devices, just used ones
* Add vip pool scheduler
* Add vip support to node build/list/delete/floatingIP
* Use vip ID instead of device ID for API output
* Move DB and gearman from API into common section for Admin API
* Make stats thread rebuild a bad device

Change-Id: I11ee8d21610ccfdf551a0db6c4734d7fc44cced5
This commit is contained in:
Andrew Hutchings
2013-08-28 12:13:14 +01:00
parent 9ee1255d01
commit fc8d9ca1d0
37 changed files with 1185 additions and 948 deletions

View File

@@ -77,6 +77,7 @@ poll_timeout_retry = 30
db_sections=mysql1
ssl_certfile=certfile.crt
ssl_keyfile=keyfile.key
expire_days=7
[api]
host=0.0.0.0
@@ -88,7 +89,6 @@ swift_basepath=lbaaslogs
swift_endpoint=https://host.com:443/v1/
ssl_certfile=certfile.crt
ssl_keyfile=keyfile.key
expire_days=7
ip_filters=192.168.0.0/24
[mysql1]

View File

@@ -20,8 +20,11 @@ import grp
import pwd
import pecan
import sys
import signal
import os
from libra.admin_api.stats.scheduler import Stats
from libra.admin_api.device_pool.manage_pool import Pool
from libra.admin_api.expunge.expunge import ExpungeScheduler
from libra.admin_api import config as api_config
from libra.admin_api import model
from libra.admin_api.stats.drivers.base import known_drivers
@@ -72,6 +75,33 @@ def setup_app(pecan_config, args):
return app
class MaintThreads(object):
    """Launches the admin API maintenance schedulers (stats, device pool
    and expunge) and tears them all down again on SIGINT/SIGTERM.
    """
    def __init__(self, logger, args, drivers):
        self.classes = []
        self.logger = logger
        self.args = args
        self.drivers = drivers
        # Route both termination signals through our shutdown path
        for signum in (signal.SIGINT, signal.SIGTERM):
            signal.signal(signum, self.exit_handler)
        self.run_threads()

    def run_threads(self):
        """Instantiate each scheduler; each starts its own timers."""
        self.classes.extend([
            Stats(self.logger, self.args, self.drivers),
            Pool(self.logger, self.args),
            ExpungeScheduler(self.logger, self.args),
        ])

    def exit_handler(self, signum, frame):
        """Ignore further signals, stop every scheduler, then exit."""
        for ignored in (signal.SIGINT, signal.SIGTERM):
            signal.signal(ignored, signal.SIG_IGN)
        for scheduler in self.classes:
            scheduler.shutdown()
        self.logger.info("Safely shutting down")
        sys.exit()
class LogStdout(object):
def __init__(self, logger):
self.logger = logger.info
@@ -160,6 +190,18 @@ def main():
'--datadog_env', default='unknown',
help='Server enironment'
)
options.parser.add_argument(
'--node_pool_size', default=10, type=int,
help='Number of hot spare devices to keep in the pool'
)
options.parser.add_argument(
'--vip_pool_size', default=10, type=int,
help='Number of hot spare vips to keep in the pool'
)
options.parser.add_argument(
'--expire_days', default=0,
help='Number of days until deleted load balancers are expired'
)
args = options.run()
@@ -218,7 +260,7 @@ def main():
drivers.append(importutils.import_class(
known_drivers[driver]
))
Stats(logger, args, drivers)
MaintThreads(logger, args, drivers)
sys.stderr = LogStdout(logger)
# TODO: set ca_certs and cert_reqs=CERT_REQUIRED
ssl_sock = eventlet.wrap_ssl(

View File

@@ -20,8 +20,8 @@ from pecan.rest import RestController
import wsmeext.pecan as wsme_pecan
from wsme.exc import ClientSideError
from libra.admin_api.model.validators import DeviceResp, DevicePost, DevicePut
from libra.admin_api.model.lbaas import LoadBalancer, Device, db_session
from libra.admin_api.model.lbaas import loadbalancers_devices
from libra.common.api.lbaas import LoadBalancer, Device, db_session
from libra.common.api.lbaas import loadbalancers_devices
class DevicesController(RestController):

View File

@@ -1,4 +1,4 @@
# Copyright 2012 Hewlett-Packard Development Company, L.P.
# Copyright 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain

View File

@@ -0,0 +1,350 @@
# Copyright 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import threading
import ipaddress
from datetime import datetime
from libra.common.json_gearman import JSONGearmanClient
from gearman.constants import JOB_UNKNOWN
from sqlalchemy import func
from libra.common.api.lbaas import Device, PoolBuilding, Vip, db_session
#TODO: Lots of duplication of code here, need to cleanup
class Pool(object):
    """Maintains the pools of spare devices and spare vips.

    Three jobs run on ``threading.Timer`` schedules, each pinned to a
    fixed second of the minute so that multiple admin API servers stagger
    their work (who actually runs is decided by ``server_id`` against the
    current minute):

      * ``delete_devices`` -- removes devices in the DELETED state
      * ``probe_devices``  -- tops up the pool of OFFLINE spare devices
      * ``probe_vips``     -- tops up the pool of unassigned vips
    """

    # Second-of-the-minute each scheduler aims to wake at.
    DELETE_SECONDS = 5  # fix: was the py2-only octal literal ``05`` (same value)
    PROBE_SECONDS = 30
    VIPS_SECONDS = 50

    def __init__(self, logger, args):
        self.logger = logger
        self.args = args
        self.probe_timer = None
        self.delete_timer = None
        # fix: was misspelled ``self.vips_time``, so __init__ never set the
        # attribute that shutdown() reads
        self.vips_timer = None
        self.start_delete_sched()
        self.start_probe_sched()
        self.start_vips_sched()

    def shutdown(self):
        """Cancel all pending timers so the process can exit cleanly."""
        if self.probe_timer:
            self.probe_timer.cancel()
        if self.delete_timer:
            self.delete_timer.cancel()
        if self.vips_timer:
            self.vips_timer.cancel()

    def delete_devices(self):
        """ Searches for all devices in the DELETED state and removes them """
        minute = datetime.now().minute
        # Only one server per minute slot runs the check.
        if self.args.server_id != minute % self.args.number_of_servers:
            self.logger.info('Not our turn to run delete check, sleeping')
            self.start_delete_sched()
            return
        self.logger.info('Running device delete check')
        try:
            message = []
            with db_session() as session:
                devices = session.query(Device).\
                    filter(Device.status == 'DELETED').all()
                for device in devices:
                    job_data = {
                        'action': 'DELETE_DEVICE',
                        'name': device.name
                    }
                    message.append(dict(task='libra_pool_mgm', data=job_data))
                session.commit()
            if not message:
                self.logger.info("No devices to delete")
            else:
                gear = GearmanWork(self.args, self.logger)
                gear.send_delete_message(message)
        except Exception:
            # narrowed from a bare except so SystemExit/KeyboardInterrupt
            # still propagate
            self.logger.exception("Exception when deleting devices")
        # Always reschedule, even after a failure.
        self.start_delete_sched()

    def probe_vips(self):
        """Top the vip pool back up to ``vip_pool_size`` if it has drained."""
        minute = datetime.now().minute
        if self.args.server_id != minute % self.args.number_of_servers:
            # fix: was ``self.logging.info`` -- the attribute does not
            # exist, so this branch raised AttributeError every cycle
            self.logger.info('Not our turn to run vips check, sleeping')
            self.start_vips_sched()
            return
        self.logger.info('Running vips count probe check')
        try:
            with db_session() as session:
                NULL = None  # For pep8
                # Spare vips are those not yet attached to a device.
                vip_count = session.query(Vip).\
                    filter(Vip.device == NULL).count()
                if vip_count >= self.args.vip_pool_size:
                    self.logger.info("Enough vips exist, no work to do")
                    session.commit()
                    self.start_vips_sched()
                    return
                build_count = self.args.vip_pool_size - vip_count
            self._build_vips(build_count)
        except Exception:
            self.logger.exception(
                "Uncaught exception during vip pool expansion"
            )
        self.start_vips_sched()

    def probe_devices(self):
        """Top the spare-device pool back up to ``node_pool_size``.

        Uses the PoolBuilding table to coordinate with other admin API
        servers so the pool is not over-built concurrently.
        """
        minute = datetime.now().minute
        if self.args.server_id != minute % self.args.number_of_servers:
            self.logger.info('Not our turn to run probe check, sleeping')
            self.start_probe_sched()
            return
        self.logger.info('Running device count probe check')
        try:
            with db_session() as session:
                # Double check we have no outstanding builds assigned to us
                session.query(PoolBuilding).\
                    filter(PoolBuilding.server_id == self.args.server_id).\
                    delete()
                session.flush()
                dev_count = session.query(Device).\
                    filter(Device.status == 'OFFLINE').count()
                if dev_count >= self.args.node_pool_size:
                    self.logger.info("Enough devices exist, no work to do")
                    session.commit()
                    self.start_probe_sched()
                    return
                build_count = self.args.node_pool_size - dev_count
                # Subtract whatever other servers are already building.
                built = session.query(func.sum(PoolBuilding.qty)).first()
                if not built[0]:
                    built = 0
                else:
                    built = built[0]
                if build_count - built <= 0:
                    self.logger.info(
                        "Other servers are building enough nodes"
                    )
                    session.commit()
                    self.start_probe_sched()
                    return
                build_count -= built
                # Record our build intent so other servers can see it.
                building = PoolBuilding()
                building.server_id = self.args.server_id
                building.qty = build_count
                session.add(building)
                session.commit()
            # Closed the DB session because we don't want it hanging around
            # for a long time locking tables
            self._build_nodes(build_count)
            # Build finished: clear our PoolBuilding marker.
            with db_session() as session:
                session.query(PoolBuilding).\
                    filter(PoolBuilding.server_id == self.args.server_id).\
                    delete()
                session.commit()
        except Exception:
            self.logger.exception("Uncaught exception during pool expansion")
        self.start_probe_sched()

    def _build_nodes(self, count):
        """Queue ``count`` BUILD_DEVICE jobs with the gearman pool manager."""
        job_data = {'action': 'BUILD_DEVICE'}
        message = [dict(task='libra_pool_mgm', data=job_data)
                   for _ in range(count)]
        gear = GearmanWork(self.args, self.logger)
        gear.send_create_message(message)

    def _build_vips(self, count):
        """Queue ``count`` BUILD_IP jobs with the gearman pool manager."""
        job_data = {'action': 'BUILD_IP'}
        message = [dict(task='libra_pool_mgm', data=job_data)
                   for _ in range(count)]
        gear = GearmanWork(self.args, self.logger)
        gear.send_vips_message(message)

    def _sleep_until(self, target_second):
        """Seconds until the wall clock next reads ``target_second``.

        Shared by the three schedulers below (was triplicated).
        """
        seconds = datetime.now().second
        if seconds < target_second:
            return target_second - seconds
        return 60 - (seconds - target_second)

    def start_probe_sched(self):
        """Arm the device-probe timer for the next PROBE_SECONDS mark."""
        sleeptime = self._sleep_until(self.PROBE_SECONDS)
        self.logger.info('Pool probe check timer sleeping for {secs} seconds'
                         .format(secs=sleeptime))
        self.probe_timer = threading.Timer(sleeptime, self.probe_devices, ())
        self.probe_timer.start()

    def start_vips_sched(self):
        """Arm the vip-probe timer for the next VIPS_SECONDS mark."""
        sleeptime = self._sleep_until(self.VIPS_SECONDS)
        self.logger.info('Pool vips check timer sleeping for {secs} seconds'
                         .format(secs=sleeptime))
        self.vips_timer = threading.Timer(sleeptime, self.probe_vips, ())
        self.vips_timer.start()

    def start_delete_sched(self):
        """Arm the delete-check timer for the next DELETE_SECONDS mark."""
        sleeptime = self._sleep_until(self.DELETE_SECONDS)
        self.logger.info('Pool delete check timer sleeping for {secs} seconds'
                         .format(secs=sleeptime))
        self.delete_timer = threading.Timer(sleeptime, self.delete_devices, ())
        self.delete_timer.start()
class GearmanWork(object):
    """Submits pool-manager jobs to gearman and records results in the DB.

    Each instance builds its own JSONGearmanClient; per-server SSL
    connections are used when all three gearman SSL options are set.
    """

    def __init__(self, args, logger):
        self.logger = logger
        if all([args.gearman_ssl_key, args.gearman_ssl_cert,
                args.gearman_ssl_ca]):
            # Use SSL connections to each Gearman job server.
            ssl_server_list = []
            for server in args.gearman:
                ghost, gport = server.split(':')
                ssl_server_list.append({'host': ghost,
                                        'port': int(gport),
                                        'keyfile': args.gearman_ssl_key,
                                        'certfile': args.gearman_ssl_cert,
                                        'ca_certs': args.gearman_ssl_ca})
            self.gearman_client = JSONGearmanClient(ssl_server_list)
        else:
            self.gearman_client = JSONGearmanClient(args.gearman)

    def send_delete_message(self, message):
        """Send DELETE_DEVICE jobs and purge the devices from the DB.

        The FAIL branch deliberately does not ``continue``: a device the
        pool manager could not delete is still removed from the DB so the
        row does not linger forever.
        """
        self.logger.info("Sending {0} gearman messages".format(len(message)))
        job_status = self.gearman_client.submit_multiple_jobs(
            message, background=False, wait_until_complete=True,
            max_retries=10, poll_timeout=30.0
        )
        delete_count = 0
        for status in job_status:
            if status.state == JOB_UNKNOWN:
                self.logger.error('Gearman Job server fail')
                continue
            if status.timed_out:
                self.logger.error('Gearman timeout whilst deleting device')
                continue
            if status.result['response'] == 'FAIL':
                self.logger.error(
                    'Pool manager failed to delete a device, removing from DB'
                )
            delete_count += 1
            with db_session() as session:
                session.query(Device).\
                    filter(Device.name == status.result['name']).delete()
                session.commit()
        self.logger.info(
            # fix: message read "devices delete from pool"
            '{nodes} freed devices deleted from pool'.format(
                nodes=delete_count)
        )

    def send_vips_message(self, message):
        """Send BUILD_IP jobs and add each successfully built vip to the DB."""
        self._send_build_messages(
            message,
            timeout_msg='Gearman timeout whilst building vip',
            fail_msg='Pool manager failed to build a vip',
            add_func=self._add_vip,
            add_fail_msg='Could not add vip to DB, node data: {0}',
            summary='{count} vips built and added to pool'
        )

    def send_create_message(self, message):
        """Send BUILD_DEVICE jobs and add each built device to the DB."""
        self._send_build_messages(
            message,
            timeout_msg='Gearman timeout whilst building device',
            fail_msg='Pool manager failed to build a device',
            add_func=self._add_node,
            add_fail_msg='Could not add node to DB, node data: {0}',
            summary='{count} devices built and added to pool'
        )

    def _send_build_messages(self, message, timeout_msg, fail_msg, add_func,
                             add_fail_msg, summary):
        """Shared submit-and-record loop (was duplicated per job type).

        TODO: make this gearman part more async, not wait for all builds
        """
        self.logger.info("Sending {0} gearman messages".format(len(message)))
        job_status = self.gearman_client.submit_multiple_jobs(
            message, background=False, wait_until_complete=True,
            max_retries=10, poll_timeout=3600.0
        )
        built_count = 0
        for status in job_status:
            if status.state == JOB_UNKNOWN:
                self.logger.error('Gearman Job server fail')
                continue
            if status.timed_out:
                self.logger.error(timeout_msg)
                continue
            if status.result['response'] == 'FAIL':
                self.logger.error(fail_msg)
                continue
            built_count += 1
            try:
                add_func(status.result)
            except Exception:
                self.logger.exception(add_fail_msg.format(status.result))
        self.logger.info(summary.format(count=built_count))

    def _add_vip(self, data):
        """Store a freshly built vip; the address is kept as an integer."""
        self.logger.info('Adding vip {0} to DB'.format(data['ip']))
        vip = Vip()
        # unicode() is required by the py2 ipaddress backport
        vip.ip = int(ipaddress.IPv4Address(unicode(data['ip'])))
        with db_session() as session:
            session.add(vip)
            session.commit()

    def _add_node(self, data):
        """Store a freshly built device as an OFFLINE (spare) pool member."""
        self.logger.info('Adding device {0} to DB'.format(data['name']))
        device = Device()
        device.name = data['name']
        device.publicIpAddr = data['addr']
        # TODO: kill this field, make things use publicIpAddr instead
        device.floatingIpAddr = data['addr']
        device.az = data['az']
        device.type = data['type']
        device.status = 'OFFLINE'
        # presumably left NULL so the DB populates the timestamp -- TODO
        # confirm against the schema default
        device.created = None
        with db_session() as session:
            session.add(device)
            session.commit()

View File

@@ -1,4 +1,4 @@
# Copyright 2012 Hewlett-Packard Development Company, L.P.
# Copyright 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@@ -11,9 +11,3 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
known_modules = {
'build': 'libra.mgm.schedulers.build.BuildNodes',
'submit': 'libra.mgm.schedulers.submit.SubmitNodes'
}
modules = ['build', 'submit']

View File

@@ -13,37 +13,35 @@
# under the License.
import threading
import signal
import sys
from datetime import datetime, timedelta
from pecan import conf
from libra.api.model.lbaas import LoadBalancer, db_session
from libra.common.api.lbaas import LoadBalancer, db_session
class ExpungeScheduler(object):
def __init__(self, logger):
if not conf.expire_days:
def __init__(self, logger, args):
self.expunge_timer = None
if not args.expire_days:
logger.info('Expunge not configured, disabled')
return
self.logger = logger
self.expunge_timer = None
signal.signal(signal.SIGINT, self.exit_handler)
signal.signal(signal.SIGTERM, self.exit_handler)
self.args = args
self.run_expunge()
def exit_handler(self, signum, frame):
signal.signal(signal.SIGINT, signal.SIG_IGN)
signal.signal(signal.SIGTERM, signal.SIG_IGN)
def shutdown(self):
if self.expunge_timer:
self.expunge_timer.cancel()
self.logger.info('Safely shutting down')
sys.exit(0)
def run_expunge(self):
day = datetime.now().day
if self.args.server_id != day % self.args.number_of_servers:
self.logger.info('Not our turn to run expunge check, sleeping')
self.expunge_timer = threading.Timer(
24*60*60, self.run_expunge, ()
)
with db_session() as session:
try:
exp = datetime.now() - timedelta(
days=int(conf.expire_days)
days=int(self.args.expire_days)
)
exp_time = exp.strftime('%Y-%m-%d %H:%M:%S')
self.logger.info(

View File

@@ -1,191 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the 'License'); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from sqlalchemy import Table, Column, Integer, ForeignKey, create_engine
from sqlalchemy import INTEGER, VARCHAR, BIGINT
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship, backref, sessionmaker, Session
import sqlalchemy.types as types
import random
import time
import ConfigParser
from pecan import conf
import logging
# Declarative base shared by every model in this module.
DeclarativeBase = declarative_base()
metadata = DeclarativeBase.metadata

# Association table for the many-to-many link between load balancers and
# the devices they run on.
loadbalancers_devices = Table(
    'loadbalancers_devices',
    metadata,
    Column('loadbalancer', Integer, ForeignKey('loadbalancers.id')),
    Column('device', Integer, ForeignKey('devices.id'))
)
class FormatedDateTime(types.TypeDecorator):
    '''formats date to match iso 8601 standards
    '''

    impl = types.DateTime

    def process_result_value(self, value, dialect):
        # Render DB datetimes as ISO 8601 strings on the way out of the DB.
        # NOTE(review): raises AttributeError for NULL values -- the models
        # here declare these columns nullable=False, so that should not occur
        return value.strftime('%Y-%m-%dT%H:%M:%S')
class Limits(DeclarativeBase):
    """Global limit values (name/value pairs) applied to all tenants."""
    __tablename__ = 'global_limits'
    id = Column(u'id', Integer, primary_key=True, nullable=False)
    # Limit identifier, e.g. a named quota
    name = Column(u'name', VARCHAR(length=128), nullable=False)
    # Limit value for that name
    value = Column(u'value', BIGINT(), nullable=False)
class Device(DeclarativeBase):
    """device model"""
    __tablename__ = 'devices'
    #column definitions
    # Availability zone the device was built in
    az = Column(u'az', INTEGER(), nullable=False)
    created = Column(u'created', FormatedDateTime(), nullable=False)
    # TODO elsewhere notes this field duplicates publicIpAddr and should go
    floatingIpAddr = Column(
        u'floatingIpAddr', VARCHAR(length=128), nullable=False
    )
    id = Column(u'id', BIGINT(), primary_key=True, nullable=False)
    # Name used to address the device's gearman worker
    name = Column(u'name', VARCHAR(length=128), nullable=False)
    publicIpAddr = Column(u'publicIpAddr', VARCHAR(length=128), nullable=False)
    # Lifecycle state, e.g. OFFLINE / ONLINE / ERROR / DELETED
    status = Column(u'status', VARCHAR(length=128), nullable=False)
    type = Column(u'type', VARCHAR(length=128), nullable=False)
    updated = Column(u'updated', FormatedDateTime(), nullable=False)
class LoadBalancer(DeclarativeBase):
    """load balancer model"""
    __tablename__ = 'loadbalancers'
    #column definitions
    algorithm = Column(u'algorithm', VARCHAR(length=80), nullable=False)
    # Human-readable error detail shown when status is ERROR
    errmsg = Column(u'errmsg', VARCHAR(length=128))
    id = Column(u'id', BIGINT(), primary_key=True, nullable=False)
    name = Column(u'name', VARCHAR(length=128), nullable=False)
    port = Column(u'port', INTEGER(), nullable=False)
    protocol = Column(u'protocol', VARCHAR(length=128), nullable=False)
    status = Column(u'status', VARCHAR(length=50), nullable=False)
    tenantid = Column(u'tenantid', VARCHAR(length=128), nullable=False)
    updated = Column(u'updated', FormatedDateTime(), nullable=False)
    created = Column(u'created', FormatedDateTime(), nullable=False)
    # Backend nodes served by this load balancer
    nodes = relationship(
        'Node', backref=backref('loadbalancers', order_by='Node.id')
    )
    # Devices hosting this load balancer, via the association table;
    # eagerly joined since device info is read on most queries
    devices = relationship(
        'Device', secondary=loadbalancers_devices, backref='loadbalancers',
        lazy='joined'
    )
class Node(DeclarativeBase):
    """node model"""
    __tablename__ = 'nodes'
    #column definitions
    # Backend server address the load balancer forwards to
    address = Column(u'address', VARCHAR(length=128), nullable=False)
    enabled = Column(u'enabled', Integer(), nullable=False)
    id = Column(u'id', BIGINT(), primary_key=True, nullable=False)
    # Owning load balancer
    lbid = Column(
        u'lbid', BIGINT(), ForeignKey('loadbalancers.id'), nullable=False
    )
    port = Column(u'port', INTEGER(), nullable=False)
    status = Column(u'status', VARCHAR(length=128), nullable=False)
    weight = Column(u'weight', INTEGER(), nullable=False)
class RoutingSession(Session):
    """ If an engine is already in use, re-use it. Otherwise we can end up
    with deadlocks in Galera, see http://tinyurl.com/9h6qlly
    switch engines every 60 seconds of idle time """
    # Class-level state: shared by every session in the process.
    engines = []
    last_engine = None
    last_engine_time = 0

    def get_bind(self, mapper=None, clause=None):
        # Build the engine list lazily on first use.
        if not RoutingSession.engines:
            self._build_engines()
        # Sticky choice: keep the last engine while it has been used within
        # the past 60 seconds, refreshing the timestamp on each use.
        if (
            RoutingSession.last_engine
            and time.time() < RoutingSession.last_engine_time + 60
        ):
            RoutingSession.last_engine_time = time.time()
            return RoutingSession.last_engine
        # Otherwise pick a fresh engine at random.
        engine = random.choice(RoutingSession.engines)
        RoutingSession.last_engine = engine
        RoutingSession.last_engine_time = time.time()
        return engine

    def _build_engines(self):
        # One engine per configured DB section, read from the app's
        # config file; SSL connect args are added when ssl_key is present.
        config = ConfigParser.SafeConfigParser()
        config.read([conf.conffile])
        for section in conf.database:
            # NOTE(review): reaches into ConfigParser internals
            # (_sections); values are raw strings, so a configured 'port'
            # would be a str while the default is int 3306 -- the %d format
            # only works for the default; confirm configured ports parse
            db_conf = config._sections[section]
            conn_string = '''mysql://%s:%s@%s:%d/%s''' % (
                db_conf['username'],
                db_conf['password'],
                db_conf['host'],
                db_conf.get('port', 3306),
                db_conf['schema']
            )
            if 'ssl_key' in db_conf:
                ssl_args = {'ssl': {
                    'cert': db_conf['ssl_cert'],
                    'key': db_conf['ssl_key'],
                    'ca': db_conf['ssl_ca']
                }}
                engine = create_engine(
                    conn_string, isolation_level="READ COMMITTED",
                    pool_size=20, connect_args=ssl_args, pool_recycle=3600
                )
            else:
                engine = create_engine(
                    conn_string, isolation_level="READ COMMITTED",
                    pool_size=20, pool_recycle=3600
                )
            RoutingSession.engines.append(engine)
class db_session(object):
    """Context manager yielding a SQLAlchemy session bound via
    RoutingSession.

    ``__enter__`` retries up to 10 times to obtain a working connection
    (verified with a ``SELECT 1``) and yields ``None`` when every attempt
    fails, so callers must check for that.
    """

    def __init__(self):
        self.session = None
        self.logger = logging.getLogger(__name__)

    def __enter__(self):
        for x in xrange(10):
            try:
                self.session = sessionmaker(class_=RoutingSession)()
                self.session.execute("SELECT 1")
                return self.session
            except Exception:
                # narrowed from a bare except so KeyboardInterrupt and
                # SystemExit still propagate
                # fix: last_engine can be None here (e.g. engine build
                # failed); the old log line dereferenced .url on it and
                # the resulting AttributeError aborted the retry loop
                engine = RoutingSession.last_engine
                self.logger.error(
                    'Could not connect to DB server: {0}'.format(
                        engine.url if engine else 'unknown'
                    )
                )
                # Force a different engine choice on the next attempt.
                RoutingSession.last_engine = None
        self.logger.error('Could not connect to any DB server')
        return None

    def __exit__(self, type, value, traceback):
        # fix: __enter__ may have yielded None after repeated failures;
        # the old unconditional close() then raised AttributeError
        if self.session is not None:
            self.session.close()
        # Never suppress exceptions from the with-block.
        return False

View File

@@ -11,8 +11,11 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
from libra.admin_api.model.lbaas import Device, LoadBalancer, db_session
from libra.admin_api.model.lbaas import loadbalancers_devices
import logging
import ipaddress
from libra.common.api.lbaas import Device, LoadBalancer, db_session
from libra.common.api.lbaas import loadbalancers_devices, Vip
from libra.common.api.gearman_client import submit_job, submit_vip_job
from libra.admin_api.stats.drivers.base import AlertDriver
@@ -34,7 +37,7 @@ class DbDriver(AlertDriver):
errmsg = "Load Balancer has recovered"
lb_status = 'ACTIVE'
elif status == 'ERROR':
errmsg = "Load Balancer has failed"
errmsg = "Load Balancer has failed, attempting rebuild"
lb_status = status
else:
# This shouldnt happen
@@ -55,6 +58,7 @@ class DbDriver(AlertDriver):
session.flush()
session.commit()
self._rebuild_device(device_id)
def send_node_change(self, message, lbid, degraded):
@@ -78,3 +82,61 @@ class DbDriver(AlertDriver):
synchronize_session='fetch')
session.commit()
def _rebuild_device(self, device_id):
    """Move everything off a failed device onto a spare one.

    Picks a spare (OFFLINE, unattached) device, reattaches the failed
    device's load balancers to it, pushes the config via gearman, then in
    a second transaction moves the vip and marks the old device DELETED.
    """
    logger = logging.getLogger(__name__)
    new_device_id = None
    new_device_name = None
    with db_session() as session:
        # Spare = OFFLINE and not referenced by the association table;
        # row-locked so two servers cannot claim the same spare.
        new_device = session.query(Device).\
            filter(~Device.id.in_(
                session.query(loadbalancers_devices.c.device)
            )).\
            filter(Device.status == "OFFLINE").\
            with_lockmode('update').\
            first()
        if new_device is None:
            session.rollback()
            logger.error(
                'No spare devices when trying to rebuild device {0}'
                .format(device_id)
            )
            return
        # Remember IDs for the second transaction below.
        new_device_id = new_device.id
        new_device_name = new_device.name
        logger.info(
            "Moving device {0} to device {1}"
            .format(device_id, new_device_id)
        )
        lbs = session.query(LoadBalancer).\
            join(LoadBalancer.devices).\
            filter(Device.id == device_id).all()
        for lb in lbs:
            lb.devices = [new_device]
            lb.status = "ERROR(REBUILDING)"
        # One UPDATE job pushes all LBs on the device; any LB id works.
        # NOTE(review): raises IndexError if the failed device had no LBs
        # attached -- confirm callers only rebuild in-use devices
        submit_job(
            'UPDATE', new_device.name, new_device.id, lbs[0].id
        )
        new_device.status = 'ONLINE'
        session.commit()
    with db_session() as session:
        # Move the vip over to the replacement device.
        # NOTE(review): vip is not checked for None before use -- assumes
        # every in-use device has a vip row; confirm
        vip = session.query(Vip).filter(Vip.device == device_id).first()
        vip.device = new_device_id
        device = session.query(Device).\
            filter(Device.id == device_id).first()
        # Old device is queued for removal by the pool delete scheduler.
        device.status = 'DELETED'
        lbs = session.query(LoadBalancer).\
            join(LoadBalancer.devices).\
            filter(Device.id == device_id).all()
        for lb in lbs:
            lb.devices = [new_device]
            lb.status = "ACTIVE"
            lb.errmsg = "Load Balancer rebuild on new device"
        logger.info(
            "Moving IP {0} and marking device {1} for deletion"
            .format(str(ipaddress.IPv4Address(vip.ip)), device_id)
        )
        submit_vip_job(
            'ASSIGN', new_device_name, str(ipaddress.IPv4Address(vip.ip))
        )
        session.commit()

View File

@@ -13,10 +13,8 @@
# under the License.
import threading
import signal
import sys
from datetime import datetime
from libra.admin_api.model.lbaas import LoadBalancer, Device, Node, db_session
from libra.common.api.lbaas import LoadBalancer, Device, Node, db_session
from libra.admin_api.stats.stats_gearman import GearJobs
@@ -36,31 +34,18 @@ class Stats(object):
self.ping_timer = None
self.repair_timer = None
signal.signal(signal.SIGINT, self.exit_handler)
signal.signal(signal.SIGTERM, self.exit_handler)
logger.info("Selected stats drivers: {0}".format(args.stats_driver))
self.start_ping_sched()
self.start_repair_sched()
# TODO: completely remove repaid sched, rebuild instead
#self.start_repair_sched()
def exit_handler(self, signum, frame):
signal.signal(signal.SIGINT, signal.SIG_IGN)
signal.signal(signal.SIGTERM, signal.SIG_IGN)
self.shutdown(False)
def shutdown(self, error):
def shutdown(self):
if self.ping_timer:
self.ping_timer.cancel()
if self.repair_timer:
self.repair_timer.cancel()
if not error:
self.logger.info('Safely shutting down')
sys.exit(0)
else:
self.logger.info('Shutting down due to error')
sys.exit(1)
def repair_lbs(self):
# Work out if it is our turn to run
minute = datetime.now().minute
@@ -115,6 +100,8 @@ class Stats(object):
gearman = GearJobs(self.logger, self.args)
failed_lbs, node_status = gearman.send_pings(node_list)
failed = len(failed_lbs)
# TODO: if failed over a threshold (5?) error instead of rebuild,
# something bad probably happened
if failed > 0:
self._send_fails(failed_lbs, session)
session.commit()
@@ -130,9 +117,11 @@ class Stats(object):
node_list = []
self.logger.info('Running repair check')
with db_session() as session:
# Join to ensure device is in-use
devices = session.query(
Device.id, Device.name
).filter(Device.status == 'ERROR').all()
).join(LoadBalancer.devices).\
filter(Device.status == 'ERROR').all()
tested = len(devices)
if tested == 0:

View File

@@ -22,7 +22,6 @@ import pecan
import sys
import os
import wsme_overrides
from libra.api.library.expunge import ExpungeScheduler
from libra.api import config as api_config
from libra.api import model
from libra.api import acl
@@ -59,7 +58,6 @@ def setup_app(pecan_config, args):
'ssl_cert': args.gearman_ssl_cert,
'ssl_ca': args.gearman_ssl_ca
}
config['expire_days'] = args.expire_days
config['ip_filters'] = args.ip_filters
if args.debug:
config['wsme'] = {'debug': True}
@@ -150,10 +148,6 @@ def main():
'--ssl_keyfile',
help='Path to an SSL key file'
)
options.parser.add_argument(
'--expire_days', default=0,
help='Number of days until deleted load balancers are expired'
)
options.parser.add_argument(
'--ip_filters', action='append', default=[],
help='IP filters for backend nodes in the form xxx.xxx.xxx.xxx/yy'
@@ -215,7 +209,6 @@ def main():
logger = setup_logging('', args)
logger.info('Starting on {0}:{1}'.format(args.host, args.port))
api = setup_app(pc, args)
ExpungeScheduler(logger)
sys.stderr = LogStdout(logger)
ssl_sock = eventlet.wrap_ssl(
eventlet.listen((args.host, args.port)),

View File

@@ -18,11 +18,11 @@ from pecan.rest import RestController
import wsmeext.pecan as wsme_pecan
from wsme.exc import ClientSideError
from wsme import Unset
from libra.api.model.lbaas import LoadBalancer, db_session
from libra.api.model.lbaas import Device, HealthMonitor
from libra.common.api.lbaas import LoadBalancer, db_session
from libra.common.api.lbaas import Device, HealthMonitor
from libra.api.acl import get_limited_to_project
from libra.api.model.validators import LBMonitorPut, LBMonitorResp
from libra.api.library.gearman_client import submit_job
from libra.common.api.gearman_client import submit_job
from libra.api.library.exp import NotFound
@@ -220,7 +220,7 @@ class HealthMonitorController(RestController):
with db_session() as session:
query = session.query(
LoadBalancer, HealthMonitor
).outerjoin(LoadBalancer.monitors).\
).outerjoin(LoadBalancer.monitors).\
filter(LoadBalancer.tenantid == tenant_id).\
filter(LoadBalancer.id == self.lbid).\
filter(LoadBalancer.status != 'DELETED').\

View File

@@ -15,7 +15,7 @@
from pecan import expose
from pecan.rest import RestController
from libra.api.model.lbaas import Limits, db_session
from libra.common.api.lbaas import Limits, db_session
class LimitsController(RestController):

View File

@@ -12,6 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import ipaddress
# pecan imports
from pecan import expose, abort, response, request
from pecan.rest import RestController
@@ -25,11 +26,11 @@ from health_monitor import HealthMonitorController
from logs import LogsController
# models
from libra.api.model.lbaas import LoadBalancer, Device, Node, db_session
from libra.api.model.lbaas import loadbalancers_devices, Limits
from libra.common.api.lbaas import LoadBalancer, Device, Node, db_session
from libra.common.api.lbaas import loadbalancers_devices, Limits, Vip
from libra.api.model.validators import LBPut, LBPost, LBResp, LBVipResp
from libra.api.model.validators import LBRespNode
from libra.api.library.gearman_client import submit_job
from libra.common.api.gearman_client import submit_job, submit_vip_job
from libra.api.acl import get_limited_to_project
from libra.api.library.exp import OverLimit, IPOutOfRange, NotFound
from libra.api.library.ip_filter import ipfilter
@@ -96,8 +97,10 @@ class LoadBalancersController(RestController):
LoadBalancer.name, LoadBalancer.id, LoadBalancer.protocol,
LoadBalancer.port, LoadBalancer.algorithm,
LoadBalancer.status, LoadBalancer.created,
LoadBalancer.updated, LoadBalancer.statusDescription
LoadBalancer.updated, LoadBalancer.statusDescription,
Vip.id.label('vipid'), Vip.ip
).join(LoadBalancer.devices).\
join(Device.vip).\
filter(LoadBalancer.tenantid == tenant_id).\
filter(LoadBalancer.id == self.lbid).\
first()
@@ -109,21 +112,17 @@ class LoadBalancersController(RestController):
load_balancers = load_balancers._asdict()
load_balancers['nodeCount'] = session.query(Node).\
filter(Node.lbid == load_balancers['id']).count()
virtualIps = session.query(
Device.id, Device.floatingIpAddr
).join(LoadBalancer.devices).\
filter(LoadBalancer.tenantid == tenant_id).\
filter(LoadBalancer.id == self.lbid).\
all()
load_balancers['virtualIps'] = []
for item in virtualIps:
vip = item._asdict()
vip['type'] = 'PUBLIC'
vip['ipVersion'] = 'IPV4'
vip['address'] = vip['floatingIpAddr']
del(vip['floatingIpAddr'])
load_balancers['virtualIps'].append(vip)
load_balancers['virtualIps'] = [{
"id": load_balancers['vipid'],
"type": "PUBLIC",
"ipVersion": "IPV4",
"address": str(ipaddress.IPv4Address(
load_balancers['ip']
)),
}]
del(load_balancers['ip'])
del(load_balancers['vipid'])
nodes = session.query(
Node.id, Node.address, Node.port, Node.status, Node.enabled
@@ -259,18 +258,39 @@ class LoadBalancersController(RestController):
filter(Device.status == "OFFLINE").\
with_lockmode('update').\
first()
NULL = None # For pep8
vip = session.query(Vip).\
filter(Vip.device == NULL).\
with_lockmode('update').\
first()
vip.device = device.id
if device is None:
session.rollback()
raise RuntimeError('No devices available')
if vip is None:
session.rollback()
raise RuntimeError('No virtual IPs available')
vip.device = device.id
submit_vip_job(
'ASSIGN', device.name, str(ipaddress.IPv4Address(vip.ip))
)
else:
virtual_id = body.virtualIps[0].id
# This is an additional load balancer
device = session.query(
Device
).filter(Device.id == virtual_id).\
).join(Device.vip).\
filter(Vip.id == virtual_id).\
first()
old_lb = session.query(
LoadBalancer
).join(LoadBalancer.devices).\
join(Device.vip).\
filter(LoadBalancer.tenantid == tenant_id).\
filter(Device.id == virtual_id).\
filter(Vip.id == virtual_id).\
first()
vip = session.query(Vip).\
filter(Vip.device == device.id).\
first()
if old_lb is None:
session.rollback()
@@ -290,10 +310,6 @@ class LoadBalancersController(RestController):
'Only one load balancer per port allowed per device'
)
if device is None:
session.rollback()
raise RuntimeError('No devices available')
if body.algorithm:
lb.algorithm = body.algorithm.upper()
else:
@@ -333,8 +349,8 @@ class LoadBalancersController(RestController):
return_data.created = lb.created
return_data.updated = lb.updated
vip_resp = LBVipResp(
address=device.floatingIpAddr, id=str(device.id),
type='PUBLIC', ipVersion='IPV4'
address=str(ipaddress.IPv4Address(vip.ip)),
id=str(vip.id), type='PUBLIC', ipVersion='IPV4'
)
return_data.virtualIps = [vip_resp]
return_data.nodes = []
@@ -346,12 +362,9 @@ class LoadBalancersController(RestController):
return_data.nodes.append(out_node)
session.commit()
# trigger gearman client to create new lb
result = submit_job(
submit_job(
'UPDATE', device.name, device.id, lb.id
)
# do something with result
if result:
pass
return return_data
@wsme_pecan.wsexpose(None, body=LBPut, status_code=202)
@@ -429,11 +442,10 @@ class LoadBalancersController(RestController):
).join(LoadBalancer.devices).\
filter(LoadBalancer.id == load_balancer_id).\
first()
session.flush()
session.commit()
submit_job(
'DELETE', device.name, device.id, lb.id
)
session.commit()
return None
def usage(self, load_balancer_id):

View File

@@ -19,10 +19,10 @@ from pecan.rest import RestController
import wsmeext.pecan as wsme_pecan
from wsme.exc import ClientSideError
from wsme import Unset
from libra.api.model.lbaas import LoadBalancer, Device, db_session
from libra.common.api.lbaas import LoadBalancer, Device, db_session
from libra.api.acl import get_limited_to_project
from libra.api.model.validators import LBLogsPost
from libra.api.library.gearman_client import submit_job
from libra.common.api.gearman_client import submit_job
from libra.api.library.exp import NotFound

View File

@@ -19,12 +19,12 @@ import wsmeext.pecan as wsme_pecan
from wsme.exc import ClientSideError
from wsme import Unset
#default response objects
from libra.api.model.lbaas import LoadBalancer, Node, db_session, Limits
from libra.api.model.lbaas import Device
from libra.common.api.lbaas import LoadBalancer, Node, db_session, Limits
from libra.common.api.lbaas import Device
from libra.api.acl import get_limited_to_project
from libra.api.model.validators import LBNodeResp, LBNodePost, NodeResp
from libra.api.model.validators import LBNodePut
from libra.api.library.gearman_client import submit_job
from libra.common.api.gearman_client import submit_job
from libra.api.library.exp import OverLimit, IPOutOfRange, NotFound
from libra.api.library.ip_filter import ipfilter
from pecan import conf

View File

@@ -13,9 +13,10 @@
# License for the specific language governing permissions and limitations
# under the License.
import ipaddress
from pecan import response, expose, request
from pecan.rest import RestController
from libra.api.model.lbaas import LoadBalancer, Device, db_session
from libra.common.api.lbaas import LoadBalancer, Vip, Device, db_session
from libra.api.acl import get_limited_to_project
@@ -42,13 +43,14 @@ class VipsController(RestController):
details="Load Balancer ID not provided"
)
with db_session() as session:
device = session.query(
Device.id, Device.floatingIpAddr
vip = session.query(
Vip.id, Vip.ip
).join(LoadBalancer.devices).\
join(Device.vip).\
filter(LoadBalancer.id == self.lbid).\
filter(LoadBalancer.tenantid == tenant_id).first()
if not device:
if not vip:
session.rollback()
response.status = 404
return dict(
@@ -57,8 +59,8 @@ class VipsController(RestController):
)
resp = {
"virtualIps": [{
"id": device.id,
"address": device.floatingIpAddr,
"id": vip.id,
"address": str(ipaddress.IPv4Address(vip.ip)),
"type": "PUBLIC",
"ipVersion": "IPV4"
}]

View File

@@ -15,10 +15,11 @@
import eventlet
eventlet.monkey_patch()
import logging
import ipaddress
from libra.common.json_gearman import JSONGearmanClient
from libra.api.model.lbaas import LoadBalancer, db_session, Device, Node
from libra.api.model.lbaas import HealthMonitor
from libra.api.model.lbaas import loadbalancers_devices
from libra.common.api.lbaas import LoadBalancer, db_session, Device, Node, Vip
from libra.common.api.lbaas import HealthMonitor
from libra.common.api.lbaas import loadbalancers_devices
from sqlalchemy.exc import OperationalError
from pecan import conf
@@ -39,6 +40,13 @@ def submit_job(job_type, host, data, lbid):
eventlet.spawn_n(client_job, logger, job_type, host, data, lbid)
def submit_vip_job(job_type, device, vip):
logger = logging.getLogger(__name__)
eventlet.spawn_n(
client_job, logger, job_type, "libra_pool_mgm", device, vip
)
def client_job(logger, job_type, host, data, lbid):
for x in xrange(5):
try:
@@ -54,6 +62,10 @@ def client_job(logger, job_type, host, data, lbid):
client.send_delete(data)
if job_type == 'ARCHIVE':
client.send_archive(data)
if job_type == 'ASSIGN':
client.send_assign(data)
if job_type == 'REMOVE':
client.send_remove(data)
return
except OperationalError:
# Auto retry on galera locking error
@@ -87,6 +99,31 @@ class GearmanClientThread(object):
else:
self.gearman_client = JSONGearmanClient(conf.gearman.server)
def send_assign(self, data):
job_data = {
'action': 'ASSIGN_IP',
'name': data,
'ip': self.lbid
}
status, response = self._send_message(job_data, 'response')
if not status:
self.logger.error(
"Failed to assign IP {0} to device {1}".format(self.lbid, data)
)
def send_remove(self, data):
job_data = {
'action': 'REMOVE_IP',
'name': data,
'ip': self.lbid
}
status, response = self._send_message(job_data, 'response')
if not status:
self.logger.error(
"Failed to remove IP {0} from device {1}"
.format(self.lbid, data)
)
def send_delete(self, data):
with db_session() as session:
count = session.query(
@@ -129,9 +166,17 @@ class GearmanClientThread(object):
job_data['loadBalancers'][0]['nodes'].append(node_data)
else:
# This is a delete
dev = session.query(Device.name).\
filter(Device.id == data).first()
vip = session.query(Vip).\
filter(Vip.device == data).first()
submit_vip_job(
'REMOVE', dev.name, str(ipaddress.IPv4Address(vip.ip))
)
vip.device = None
job_data = {"hpcs_action": "DELETE"}
status, response = self._send_message(job_data)
status, response = self._send_message(job_data, 'hpcs_response')
lb = session.query(LoadBalancer).\
filter(LoadBalancer.id == self.lbid).\
first()
@@ -145,9 +190,7 @@ class GearmanClientThread(object):
# Device should never be used again
device = session.query(Device).\
filter(Device.id == data).first()
#TODO: change this to 'DELETED' when pool mgm deletes
if device.status != 'ERROR':
device.status = 'OFFLINE'
device.status = 'DELETED'
# Remove LB-device join
session.execute(loadbalancers_devices.delete().where(
loadbalancers_devices.c.loadbalancer == lb.id
@@ -191,7 +234,7 @@ class GearmanClientThread(object):
'protocol': lb.protocol
}]
}
status, response = self._send_message(job_data)
status, response = self._send_message(job_data, 'hpcs_response')
device = session.query(Device).\
filter(Device.id == data['deviceid']).\
first()
@@ -266,7 +309,7 @@ class GearmanClientThread(object):
job_data['loadBalancers'].append(lb_data)
# Update the worker
status, response = self._send_message(job_data)
status, response = self._send_message(job_data, 'hpcs_response')
lb = session.query(LoadBalancer).\
filter(LoadBalancer.id == self.lbid).\
first()
@@ -279,7 +322,7 @@ class GearmanClientThread(object):
session.commit()
def _send_message(self, message):
def _send_message(self, message, response_name):
job_status = self.gearman_client.submit_job(
self.host, message, background=False, wait_until_complete=True,
max_retries=10, poll_timeout=120.0
@@ -298,7 +341,7 @@ class GearmanClientThread(object):
if 'badRequest' in job_status.result:
error = job_status.result['badRequest']['validationErrors']
return False, error['message']
if job_status.result['hpcs_response'] == 'FAIL':
if job_status.result[response_name] == 'FAIL':
# Worker says 'no'
if 'hpcs_error' in job_status.result:
error = job_status.result['hpcs_error']

View File

@@ -52,6 +52,20 @@ class Limits(DeclarativeBase):
value = Column(u'value', BIGINT(), nullable=False)
class PoolBuilding(DeclarativeBase):
__tablename__ = 'pool_building'
id = Column(u'id', Integer, primary_key=True, nullable=False)
server_id = Column(u'server_id', Integer, nullable=False)
qty = Column(u'qty', Integer, nullable=False)
class Vip(DeclarativeBase):
__tablename__ = 'vips'
id = Column(u'id', Integer, primary_key=True, nullable=False)
ip = Column(u'ip', Integer, nullable=True)
device = Column(u'device', Integer, ForeignKey('devices.id'))
class Device(DeclarativeBase):
"""device model"""
__tablename__ = 'devices'
@@ -67,6 +81,7 @@ class Device(DeclarativeBase):
status = Column(u'status', VARCHAR(length=128), nullable=False)
type = Column(u'type', VARCHAR(length=128), nullable=False)
updated = Column(u'updated', FormatedDateTime(), nullable=False)
vip = relationship("Vip", uselist=False, backref="devices")
class LoadBalancer(DeclarativeBase):
@@ -91,7 +106,7 @@ class LoadBalancer(DeclarativeBase):
'HealthMonitor', backref=backref(
'loadbalancers',
order_by='HealthMonitor.lbid')
)
)
devices = relationship(
'Device', secondary=loadbalancers_devices, backref='loadbalancers',
lazy='joined'

View File

@@ -75,4 +75,18 @@ CREATE TABLE monitors (
PRIMARY KEY (lbid) # ids are unique accross all Nodes
) DEFAULT CHARSET utf8 DEFAULT COLLATE utf8_general_ci;
CREATE TABLE `pool_building` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`server_id` int(11) NOT NULL,
`qty` int(11) NOT NULL,
PRIMARY KEY (`id`),
KEY `server_id` (`server_id`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
CREATE TABLE `vips` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`ip` int(11) DEFAULT NULL,
`device` int(11) DEFAULT NULL,
PRIMARY KEY (`id`),
KEY `device` (`device`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;

View File

@@ -0,0 +1,13 @@
# Copyright 2012 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@@ -0,0 +1,118 @@
# Copyright 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from time import sleep
from novaclient import exceptions
from libra.mgm.nova import Node, BuildError, NotFound
class BuildController(object):
    """Build a new Nova instance in response to a pool manager gearman job.

    ``run()`` returns the job message dict with ``response`` set to
    ``PASS`` (plus ``name``/``addr``/``type``/``az`` details) on success,
    or ``FAIL`` on any error.
    """

    RESPONSE_FIELD = 'response'
    RESPONSE_SUCCESS = 'PASS'
    RESPONSE_FAILURE = 'FAIL'

    def __init__(self, logger, args, msg):
        self.logger = logger
        self.msg = msg
        self.args = args

    def run(self):
        try:
            nova = Node(self.args)
        except Exception:
            self.logger.exception("Error initialising Nova connection")
            self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
            return self.msg
        self.logger.info("Building a requested Nova instance")
        try:
            node_id = nova.build()
            self.logger.info("Build command sent to Nova")
        except BuildError as exc:
            self.logger.exception(
                "{0}, node {1}".format(exc.msg, exc.node_name)
            )
            name = exc.node_name
            # Node may have built despite error
            try:
                # BUGFIX: look the node up via the Nova connection; this
                # class has no get_node() method of its own (the original
                # `self.get_node(name)` raised AttributeError here)
                node_id = nova.get_node(name)
            except NotFound:
                self.logger.error(
                    "No node found for {0}, giving up on it".format(name)
                )
                self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
                return self.msg
            except exceptions.ClientException:
                self.logger.exception(
                    'Error getting failed node info from Nova for {0}'
                    .format(name)
                )
                self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
                return self.msg
        if node_id > 0:
            return self._wait_until_node_ready(nova, node_id)
        else:
            self.logger.error(
                'Node build did not return an ID, cannot find it'
            )
            self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
            return self.msg

    def _wait_until_node_ready(self, nova, node_id):
        """Poll Nova once a minute until the node is ACTIVE or we give up.

        On success fills in the message's node details; on timeout the
        half-built node is deleted and the message is marked FAIL.
        """
        # NOTE(review): xrange(1, 10) yields 9 polls, not the 10 the
        # timeout message implies -- confirm intended duration
        for x in xrange(1, 10):
            try:
                resp, status = nova.status(node_id)
            except NotFound:
                self.logger.error(
                    'Node {0} can no longer be found'.format(node_id)
                )
                self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
                return self.msg
            except exceptions.ClientException:
                self.logger.exception(
                    'Error getting status from Nova'
                )
                self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
                return self.msg
            if resp.status_code not in (200, 203):
                self.logger.error(
                    'Error getting status from Nova, error {0}'
                    .format(resp.status_code)
                )
                self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
                return self.msg
            status = status['server']
            if status['status'] == 'ACTIVE':
                self.msg['name'] = status['name']
                addresses = status['addresses']['private']
                # Prefer the first private address not in 10.0.0.0/8;
                # falls back to the last address if all start with 10.
                for address in addresses:
                    if not address['addr'].startswith('10.'):
                        break
                self.msg['addr'] = address['addr']
                self.msg['type'] = "basename: {0}, image: {1}".format(
                    self.args.node_basename, self.args.nova_image
                )
                self.msg['az'] = self.args.az
                self.msg[self.RESPONSE_FIELD] = self.RESPONSE_SUCCESS
                self.logger.info('Node {0} returned'.format(status['name']))
                return self.msg
            sleep(60)
        nova.delete(node_id)
        # BUGFIX: format the message before logging; the original called
        # .format() on logger.error()'s None return value (AttributeError)
        self.logger.error(
            "Node {0} didn't come up after 10 minutes, deleted"
            .format(node_id)
        )
        self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
        return self.msg

View File

@@ -0,0 +1,53 @@
# Copyright 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from libra.mgm.nova import Node, NotFound
class DeleteController(object):
    """Delete the Nova instance named in the job message.

    ``run()`` hands the message dict back with ``response`` set to
    ``PASS`` or ``FAIL``.
    """

    RESPONSE_FIELD = 'response'
    RESPONSE_SUCCESS = 'PASS'
    RESPONSE_FAILURE = 'FAIL'

    def __init__(self, logger, args, msg):
        self.logger = logger
        self.msg = msg
        self.args = args

    def _fail(self):
        # Mark the job as failed and return the message to the caller
        self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
        return self.msg

    def run(self):
        try:
            nova = Node(self.args)
        except Exception:
            self.logger.exception("Error initialising Nova connection")
            return self._fail()
        name = self.msg['name']
        self.logger.info(
            "Deleting a requested Nova instance {0}".format(name)
        )
        try:
            node_id = nova.get_node(name)
        except NotFound:
            self.logger.error(
                "No node found for {0}".format(name)
            )
            return self._fail()
        nova.delete(node_id)
        self.msg[self.RESPONSE_FIELD] = self.RESPONSE_SUCCESS
        self.logger.info(
            'Deleted node {0}, id {1}'.format(name, node_id)
        )
        return self.msg

View File

@@ -0,0 +1,70 @@
# Copyright 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from libra.mgm.controllers.build import BuildController
from libra.mgm.controllers.delete import DeleteController
from libra.mgm.controllers.vip import BuildIpController, AssignIpController
from libra.mgm.controllers.vip import RemoveIpController
class PoolMgmController(object):
    """Dispatch a pool manager gearman job to the matching controller.

    Routes on the ``action`` field of the JSON message; the selected
    controller's ``run()`` result (the message with ``response`` set to
    PASS or FAIL) is returned. Missing/unknown actions and controller
    exceptions all produce a FAIL response.
    """

    ACTION_FIELD = 'action'
    RESPONSE_FIELD = 'response'
    RESPONSE_SUCCESS = 'PASS'
    RESPONSE_FAILURE = 'FAIL'

    def __init__(self, logger, args, json_msg):
        self.logger = logger
        self.msg = json_msg
        self.args = args

    def run(self):
        if self.ACTION_FIELD not in self.msg:
            self.logger.error("Missing `{0}` value".format(self.ACTION_FIELD))
            # BUGFIX: was `self.RESPONSE_FILED` (typo), which raised
            # AttributeError instead of returning a FAIL response
            self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
            return self.msg
        action = self.msg[self.ACTION_FIELD].upper()
        try:
            if action == 'BUILD_DEVICE':
                controller = BuildController(self.logger, self.args, self.msg)
            elif action == 'DELETE_DEVICE':
                controller = DeleteController(self.logger, self.args, self.msg)
            elif action == 'BUILD_IP':
                controller = BuildIpController(
                    self.logger, self.args, self.msg
                )
            elif action == 'ASSIGN_IP':
                controller = AssignIpController(
                    self.logger, self.args, self.msg
                )
            elif action == 'REMOVE_IP':
                controller = RemoveIpController(
                    self.logger, self.args, self.msg
                )
            else:
                self.logger.error(
                    "Invalid `{0}` value: {1}".format(
                        self.ACTION_FIELD, action
                    )
                )
                self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
                return self.msg
            return controller.run()
        except Exception:
            self.logger.exception("Controller exception")
            self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
            return self.msg

View File

@@ -0,0 +1,127 @@
# Copyright 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from novaclient import exceptions
from libra.mgm.nova import Node
class BuildIpController(object):
    """Allocate a new Nova floating IP for the pool.

    ``run()`` returns the message with ``id`` and ``ip`` of the new
    floating IP and ``response`` set to PASS, or FAIL on error.
    """

    RESPONSE_FIELD = 'response'
    RESPONSE_SUCCESS = 'PASS'
    RESPONSE_FAILURE = 'FAIL'

    def __init__(self, logger, args, msg):
        self.logger = logger
        self.msg = msg
        self.args = args

    def _fail(self):
        # Mark the job as failed and hand the message back
        self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
        return self.msg

    def run(self):
        try:
            nova = Node(self.args)
        except Exception:
            self.logger.exception("Error initialising Nova connection")
            return self._fail()
        self.logger.info("Creating a requested floating IP")
        try:
            details = nova.vip_create()
        except exceptions.ClientException:
            self.logger.exception(
                'Error getting a Floating IP'
            )
            return self._fail()
        self.logger.info("Floating IP {0} created".format(details['id']))
        self.msg['id'] = details['id']
        self.msg['ip'] = details['ip']
        self.msg[self.RESPONSE_FIELD] = self.RESPONSE_SUCCESS
        return self.msg
class AssignIpController(object):
    """Attach an existing floating IP to a named Nova instance.

    Reads ``ip`` and ``name`` from the job message; ``run()`` returns
    the message with ``response`` set to PASS or FAIL.
    """

    RESPONSE_FIELD = 'response'
    RESPONSE_SUCCESS = 'PASS'
    RESPONSE_FAILURE = 'FAIL'

    def __init__(self, logger, args, msg):
        self.logger = logger
        self.msg = msg
        self.args = args

    def run(self):
        try:
            nova = Node(self.args)
        except Exception:
            self.logger.exception("Error initialising Nova connection")
            self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
            return self.msg
        self.logger.info(
            "Assigning Floating IP {0} to {1}"
            .format(self.msg['ip'], self.msg['name'])
        )
        try:
            node_id = nova.get_node(self.msg['name'])
            nova.vip_assign(node_id, self.msg['ip'])
        # BUGFIX: was a bare `except:`, which also swallowed
        # SystemExit/KeyboardInterrupt and hid shutdown signals
        except Exception:
            self.logger.exception(
                'Error assigning Floating IP {0} to {1}'
                .format(self.msg['ip'], self.msg['name'])
            )
            self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
            return self.msg
        self.msg[self.RESPONSE_FIELD] = self.RESPONSE_SUCCESS
        return self.msg
class RemoveIpController(object):
    """Detach a floating IP from a named Nova instance.

    Reads ``ip`` and ``name`` from the job message; ``run()`` returns
    the message with ``response`` set to PASS or FAIL.
    """

    RESPONSE_FIELD = 'response'
    RESPONSE_SUCCESS = 'PASS'
    RESPONSE_FAILURE = 'FAIL'

    def __init__(self, logger, args, msg):
        self.logger = logger
        self.msg = msg
        self.args = args

    def run(self):
        try:
            nova = Node(self.args)
        except Exception:
            self.logger.exception("Error initialising Nova connection")
            self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
            return self.msg
        self.logger.info(
            "Removing Floating IP {0} from {1}"
            .format(self.msg['ip'], self.msg['name'])
        )
        try:
            node_id = nova.get_node(self.msg['name'])
            nova.vip_remove(node_id, self.msg['ip'])
        # BUGFIX: was a bare `except:`, which also swallowed
        # SystemExit/KeyboardInterrupt and hid shutdown signals
        except Exception:
            self.logger.exception(
                'Error removing Floating IP {0} from {1}'
                .format(self.msg['ip'], self.msg['name'])
            )
            self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
            return self.msg
        self.msg[self.RESPONSE_FIELD] = self.RESPONSE_SUCCESS
        return self.msg

View File

@@ -1,45 +0,0 @@
# Copyright 2012 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# Mapping of --driver options to a class
known_drivers = {
'hp_rest': 'libra.mgm.drivers.hp_rest.driver.HPRestDriver',
'dummy': 'libra.mgm.drivers.dummy.driver.DummyDriver'
}
class MgmDriver(object):
"""
Pool manager device driver base class.
This defines the API for interacting with various APIs.
Drivers for these appliances should inherit from this class and implement
the relevant API methods that it can support.
"""
def get_free_count(self):
""" Get a count of how many nodes are free. """
raise NotImplementedError()
def add_node(self, name, address):
""" Add a node to a device. """
raise NotImplementedError()
def is_online(self):
""" Returns false if no API server is available """
raise NotImplementedError()
def get_url(self):
""" Gets the URL we are currently connected to """
raise NotImplementedError()

View File

@@ -1,35 +0,0 @@
# Copyright 2012 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
from libra.mgm.drivers.base import MgmDriver
class DummyDriver(MgmDriver):
"""
Pool manager dummy driver for testing
"""
def __init__(self, addresses, logger):
self.logger = logger
def get_free_count(self):
return 5
def is_online(self):
return True
def add_node(self, node_data):
self.logger.info('Dummy API send of {0}'.format(node_data))
return True, 'test response'
def get_url(self):
return 'Dummy Connection'

View File

@@ -1,112 +0,0 @@
# Copyright 2012 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import requests
import json
import random
import sys
from libra.mgm.drivers.base import MgmDriver
API_VERSION = 'v1'
class HPRestDriver(MgmDriver):
def __init__(self, addresses, logger):
self.logger = logger
self.headers = {'Content-type': 'application/json'}
random.shuffle(addresses)
for address in addresses:
self.url = 'https://{0}/{1}'.format(address, API_VERSION)
self.logger.info('Trying {url}'.format(url=self.url))
status, data = self._get('{url}/devices/usage'
.format(url=self.url))
if status:
self.logger.info('API Server is online')
self.online = True
return
# if we get this far all API servers are down
self.online = False
def get_url(self):
return self.url
def get_free_count(self):
status, usage = self.get_usage()
if not status:
return None
return usage['free']
def is_online(self):
return self.online
def get_node_list(self, limit, marker):
return self._get('{url}/devices'.format(url=self.url))
def get_usage(self):
return self._get('{url}/devices/usage'.format(url=self.url))
def get_node(self, node_id):
return self._get(
'{url}/devices/{nid}'.format(url=self.url, nid=node_id)
)
def add_node(self, node_data):
return self._post('{url}/devices'.format(url=self.url), node_data)
def delete_node(self, node_id):
requests.delete(
'{url}/devices/{nid}'.format(url=self.url, nid=node_id),
timeout=30
)
def update_node(self, node_id, node_data):
requests.put(
'{url}/devices/{nid}'.format(url=self.url, nid=node_id),
json.dumps(node_data),
timeout=30
)
def _get(self, url):
try:
r = requests.get(url, verify=False, timeout=30)
except requests.exceptions.RequestException:
self.logger.error('Exception communicating to server: {exc}'
.format(exc=sys.exc_info()[0]))
return False, None
if r.status_code != 200:
self.logger.error('Server returned error {code}'
.format(code=r.status_code))
return False, r.json()
return True, r.json()
def _post(self, url, node_data):
try:
r = requests.post(
url, data=json.dumps(node_data), verify=False,
headers=self.headers, timeout=30
)
except requests.exceptions.RequestException:
self.logger.error('Exception communicating to server: {exc}'
.format(exc=sys.exc_info()[0]))
return False, None
if r.status_code != 200:
self.logger.error('Server returned error {code}'
.format(code=r.status_code))
return False, r.json()
return True, r.json()

View File

@@ -0,0 +1,70 @@
# Copyright 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import gearman.errors
import json
import socket
import time
from libra.common.json_gearman import JSONGearmanWorker
from libra.mgm.controllers.root import PoolMgmController
def handler(worker, job):
    """Gearman task entry point: dispatch one pool manager job.

    Logs the inbound and outbound JSON payloads and returns the
    controller's response message.
    """
    log = worker.logger
    log.debug("Received JSON message: {0}".format(json.dumps(job.data)))
    result = PoolMgmController(log, worker.args, job.data).run()
    log.debug("Return JSON message: {0}".format(json.dumps(result)))
    return result
def worker_thread(logger, args):
    """Run the ``libra_pool_mgm`` gearman worker loop until shutdown.

    Connects to the configured gearman job servers (over SSL only when
    key, cert and CA are all supplied), registers the task and polls
    for jobs, sleeping and reconnecting after server outages.
    """
    logger.info("Registering task libra_pool_mgm")
    hostname = socket.gethostname()
    # SSL is used only when all three credentials are configured
    if all([args.gearman_ssl_key, args.gearman_ssl_cert, args.gearman_ssl_ca]):
        ssl_server_list = []
        # NOTE(review): this branch iterates args.server while the
        # non-SSL branch uses args.gearman -- presumably both should
        # read args.gearman; confirm against the option definitions
        for host_port in args.server:
            host, port = host_port.split(':')
            ssl_server_list.append({'host': host,
                                    'port': int(port),
                                    'keyfile': args.gearman_ssl_key,
                                    'certfile': args.gearman_ssl_cert,
                                    'ca_certs': args.gearman_ssl_ca})
        worker = JSONGearmanWorker(ssl_server_list)
    else:
        worker = JSONGearmanWorker(args.gearman)
    worker.set_client_id(hostname)
    worker.register_task('libra_pool_mgm', handler)
    # Stash logger/args on the worker so handler() can reach them
    worker.logger = logger
    worker.args = args
    retry = True
    while (retry):
        try:
            worker.work(args.gearman_poll)
        except KeyboardInterrupt:
            # Operator-requested stop: exit the loop cleanly
            retry = False
        except gearman.errors.ServerUnavailable:
            # Transient outage: wait and reconnect
            logger.error("Job server(s) went away. Reconnecting.")
            time.sleep(args.reconnect_sleep)
            retry = True
        except Exception:
            # Unexpected failure: log it and terminate the worker
            logger.exception("Exception in worker")
            retry = False
    logger.debug("Pool manager process terminated.")

View File

@@ -12,85 +12,34 @@
# License for the specific language governing permissions and limitations
# under the License.
import eventlet
eventlet.monkey_patch()
import daemon
import daemon.pidfile
import daemon.runner
import grp
import pwd
import signal
import time
import sys
import os
import threading
from libra.mgm.schedulers import modules, known_modules
from libra.openstack.common import importutils
from libra.common.options import Options, setup_logging
from libra.mgm.drivers.base import known_drivers
from libra.mgm.node_list import NodeList, AccessDenied
from libra.mgm.gearman_worker import worker_thread
class Server(object):
def __init__(self, args):
self.args = args
self.ft = None
self.api = None
self.driver_class = None
self.schedulers = []
try:
self.node_list = NodeList(self.args.datadir)
except AccessDenied as exc:
print(str(exc))
self.shutdown(True)
self.logger = None
def main(self):
self.logger = setup_logging('libra_mgm', self.args)
self.logger.info(
'Libra Pool Manager started with a float of {nodes} nodes'
.format(nodes=self.args.nodes)
'Libra Pool Manager worker started'
)
signal.signal(signal.SIGINT, self.exit_handler)
signal.signal(signal.SIGTERM, self.exit_handler)
self.logger.info("Selected driver: {0}".format(self.args.driver))
self.driver_class = importutils.import_class(
known_drivers[self.args.driver]
)
# NOTE(LinuxJedi): Threading lock is due to needing more than one
# timer and we don't want them to execute their trigger
# at the same time.
self.rlock = threading.RLock()
# Load all the schedulers
for module in modules:
mod = importutils.import_class(known_modules[module])
instance = mod(
self.driver_class, self.rlock, self.logger, self.node_list,
self.args
)
self.schedulers.append(instance)
instance.run()
while True:
time.sleep(1)
def exit_handler(self, signum, frame):
signal.signal(signal.SIGINT, signal.SIG_IGN)
signal.signal(signal.SIGTERM, signal.SIG_IGN)
self.shutdown(False)
def shutdown(self, error):
for sched in self.schedulers:
sched.timer.cancel()
if not error:
self.logger.info('Safely shutting down')
sys.exit(0)
else:
self.logger.info('Shutting down due to error')
sys.exit(1)
thd = eventlet.spawn(worker_thread, self.logger, self.args)
thd.wait()
self.logger.info("Shutting down")
def main():
@@ -99,33 +48,11 @@ def main():
'--api_server', action='append', metavar='HOST:PORT', default=[],
help='a list of API servers to connect to (for HP REST API driver)'
)
options.parser.add_argument(
'--datadir', dest='datadir',
help='directory to store data files'
)
options.parser.add_argument(
'--az', type=int,
help='The az number the node will reside in (to be passed to the API'
' server)'
)
options.parser.add_argument(
'--nodes', type=int, default=1,
help='number of nodes'
)
options.parser.add_argument(
'--check_interval', type=int, default=5,
help='how often to check if new nodes are needed (in minutes)'
)
options.parser.add_argument(
'--submit_interval', type=int, default=15,
help='how often to test nodes for submission to the API'
' server (in minutes)'
)
options.parser.add_argument(
'--driver', dest='driver',
choices=known_drivers.keys(), default='hp_rest',
help='type of device to use'
)
options.parser.add_argument(
'--node_basename', dest='node_basename',
help='prepend the name of all nodes with this'
@@ -168,11 +95,32 @@ def main():
help='the image size ID (flavor ID) or name to use for new nodes spun'
' up in the Nova API'
)
options.parser.add_argument(
'--gearman', action='append', metavar='HOST:PORT', default=[],
help='Gearman job servers'
)
options.parser.add_argument(
'--gearman_ssl_ca', metavar='FILE',
help='Gearman SSL certificate authority'
)
options.parser.add_argument(
'--gearman_ssl_cert', metavar='FILE',
help='Gearman SSL certificate'
)
options.parser.add_argument(
'--gearman_ssl_key', metavar='FILE',
help='Gearman SSL key'
)
options.parser.add_argument(
'--gearman-poll',
dest='gearman_poll', type=int, metavar='TIME',
default=1, help='Gearman worker polling timeout'
)
args = options.run()
required_args = [
'datadir', 'az',
'az',
'nova_image', 'nova_image_size', 'nova_secgroup', 'nova_keyname',
'nova_tenant', 'nova_region', 'nova_user', 'nova_pass', 'nova_auth_url'
]
@@ -191,16 +139,16 @@ def main():
if missing_args:
return 2
if not args.api_server:
if not args.gearman:
# NOTE(shrews): Can't set a default in argparse method because the
# value is appended to the specified default.
args.api_server.append('localhost:8889')
elif not isinstance(args.api_server, list):
args.gearman.append('localhost:4730')
elif not isinstance(args.gearman, list):
# NOTE(shrews): The Options object cannot intelligently handle
# creating a list from an option that may have multiple values.
# We convert it to the expected type here.
svr_list = args.api_server.split()
args.api_server = svr_list
svr_list = args.gearman.split()
args.gearman = svr_list
server = Server(args)

View File

@@ -1,49 +0,0 @@
# Copyright 2012 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import pickle
import os
class AccessDenied(Exception):
    """Raised when the node-list data directory is not writable."""
    pass
class NodeList(object):
    """Persistent list of Nova node IDs awaiting submission to the API.

    The list is pickled to ``node_log.dat`` under *path* so that nodes
    still building survive a process restart.
    """

    def __init__(self, path):
        """Validate *path* is writable and compute the data file name.

        :param path: directory the pickle file is stored in
        :raises AccessDenied: if *path* is not writable by this process
        """
        # Fail fast: every mutation rewrites the file, so a read-only
        # directory would only surface later as a confusing IOError.
        if not os.access(path, os.W_OK):
            msg = 'Do not have permission to write to {0}'.format(path)
            raise AccessDenied(msg)
        self.file_name = '{0}/node_log.dat'.format(path)

    def add(self, item):
        """Append *item* to the stored list and persist it."""
        data = self.get()
        data.append(item)
        self.put(data)

    def delete(self, item):
        """Remove the first occurrence of *item* and persist the list."""
        data = self.get()
        data.remove(item)
        self.put(data)

    def get(self):
        """Return the stored list, or an empty list if no file exists yet."""
        # IOError is raised when the pickle file has not been created yet
        # (first run); treat that as an empty list.
        try:
            # 'with' ensures the file handle is closed even if unpickling
            # raises -- the previous open() call leaked the handle.
            with open(self.file_name, "rb") as handle:
                return pickle.load(handle)
        except IOError:
            return []

    def put(self, data):
        """Overwrite the stored list with *data*."""
        with open(self.file_name, "wb") as handle:
            pickle.dump(data, handle)

View File

@@ -35,33 +35,32 @@ class BuildError(Exception):
class Node(object):
def __init__(self, username, password, tenant, auth_url, region, keyname,
secgroup, image, node_type, node_basename=None):
def __init__(self, args):
self.nova = client.HTTPClient(
username,
password,
tenant,
auth_url,
region_name=region,
args.nova_user,
args.nova_pass,
args.nova_tenant,
args.nova_auth_url,
region_name=args.nova_region,
no_cache=True,
service_type='compute'
)
self.keyname = keyname
self.secgroup = secgroup
self.node_basename = node_basename
self.keyname = args.nova_keyname
self.secgroup = args.nova_secgroup
self.node_basename = args.node_basename
# Replace '_' with '-' in basename
if self.node_basename:
self.node_basename = self.node_basename.replace('_', '-')
if image.isdigit():
self.image = image
if args.nova_image.isdigit():
self.image = args.nova_image
else:
self.image = self._get_image(image)
self.image = self._get_image(args.nova_image)
if node_type.isdigit():
self.node_type = node_type
if args.nova_image_size.isdigit():
self.node_type = args.nova_image_size
else:
self.node_type = self._get_flavor(node_type)
self.node_type = self._get_flavor(args.nova_image_size)
def build(self):
""" create a node, test it is running """
@@ -76,6 +75,43 @@ class Node(object):
return body['server']['id']
def vip_create(self):
    """Allocate a new floating IP from the default Nova pool.

    Returns the ``floating_ip`` dict from the Nova response.
    """
    resp, body = self.nova.post('/os-floating-ips', body={"pool": None})
    return body['floating_ip']
def vip_assign(self, node_id, vip):
    """Attach floating IP *vip* to the Nova instance *node_id*.

    Raises a generic Exception unless Nova answers 202 Accepted.
    """
    payload = {"addFloatingIp": {"address": vip}}
    resp, body = self.nova.post(
        '/servers/{0}/action'.format(node_id), body=payload
    )
    if resp.status_code != 202:
        raise Exception(
            'Response code {0}, message {1} when assigning vip'
            .format(resp.status_code, body)
        )
def vip_remove(self, node_id, vip):
    """ remove a virtual IP from a Nova instance """
    # Fix: docstring and error message were copy-pasted from vip_assign
    # and incorrectly said "assign"/"assigning" on the remove path.
    url = '/servers/{0}/action'.format(node_id)
    body = {
        "removeFloatingIp": {
            "address": vip
        }
    }
    resp, body = self.nova.post(url, body=body)
    if resp.status_code != 202:
        raise Exception(
            'Response code {0}, message {1} when removing vip'
            .format(resp.status_code, body)
        )
def delete(self, node_id):
""" delete a node """
try:

View File

@@ -1,143 +0,0 @@
# Copyright 2012 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import threading
from novaclient import exceptions
from libra.mgm.nova import Node, BuildError, NotFound
class BuildNodes(object):
    """Periodic task that keeps a pool of free load-balancer nodes built.

    run() compares the API server's free-node count against the configured
    target (args.nodes) and spins up the difference via Nova, then
    reschedules itself with a threading.Timer.
    """

    def __init__(self, driver, lock, logger, node_list, args):
        # driver: API-server driver class (instantiated fresh on each run)
        self.driver = driver
        # lock shared with other maintenance tasks so checks don't overlap
        self.lock = lock
        self.args = args
        self.logger = logger
        # node_list: persistent store of node IDs still building
        self.node_list = node_list
        # handle of the pending threading.Timer created by scheduler()
        self.timer = None

    def run(self):
        """ check if known nodes are used """
        with self.lock:
            try:
                self.logger.info('Checking if new nodes are needed')
                api = self.driver(self.args.api_server, self.logger)
                if api.is_online():
                    self.logger.info(
                        'Connected to {url}'.format(url=api.get_url())
                    )
                    free_count = api.get_free_count()
                    if free_count is None:
                        # Couldn't determine the free count: reschedule and
                        # bail out (the early return skips the trailing
                        # scheduler() call below, so we only schedule once).
                        self.scheduler()
                        return
                    if free_count < self.args.nodes:
                        # we need to build new nodes
                        nodes_required = self.args.nodes - free_count
                        self.logger.info(
                            '{nodes} nodes required'
                            .format(nodes=nodes_required)
                        )
                        self.build_nodes(nodes_required, api)
                    else:
                        self.logger.info('No new nodes required')
                else:
                    self.logger.error('No working API server found')
            except Exception:
                # Swallow everything: this thread must keep rescheduling
                # itself even after an unexpected failure.
                self.logger.exception('Uncaught exception during node check')
        self.scheduler()

    def build_nodes(self, count, api):
        """Build up to *count* new Nova nodes, minus any still in flight.

        NOTE(review): the *api* parameter is unused here; it appears to be
        kept for signature symmetry with callers -- confirm before removing.
        """
        try:
            nova = Node(
                self.args.nova_user,
                self.args.nova_pass,
                self.args.nova_tenant,
                self.args.nova_auth_url,
                self.args.nova_region,
                self.args.nova_keyname,
                self.args.nova_secgroup,
                self.args.nova_image,
                self.args.nova_image_size,
                node_basename=self.args.node_basename
            )
        except Exception as exc:
            self.logger.error('Error initialising Nova connection {exc}'
                              .format(exc=exc)
                              )
            return
        # Remove number of nodes we are still waiting on build status from
        build_count = len(self.node_list.get())
        count = count - build_count
        if count > 0:
            self.logger.info(
                '{0} nodes already building, attempting to build {1} more'
                .format(build_count, count)
            )
        else:
            self.logger.info(
                '{0} nodes already building, no more needed'
                .format(build_count)
            )
        while count > 0:
            count = count - 1
            try:
                node_id = nova.build()
            except BuildError as exc:
                self.logger.exception('{0}, node {1}'
                                      .format(exc.msg, exc.node_name)
                                      )
                self.logger.info(
                    'Node build did not return ID for {0}, trying to find'
                    .format(exc.node_name)
                )
                # Nova sometimes reports a failure for a node that actually
                # started building; try to recover its ID by name.
                self.find_unknown(exc.node_name, nova)
                continue
            if node_id > 0:
                self.logger.info(
                    'Storing node {0} to add later'.format(node_id)
                )
                # Persist the ID so SubmitNodes can pick it up once ACTIVE
                self.node_list.add(node_id)
            else:
                self.logger.error(
                    'Node build did not return ID, cannot find it'
                )

    def find_unknown(self, name, nova):
        """
        Nova can tell us a node failed to build when it didn't
        This does a check and if it did start to build adds it to the
        failed node list.
        """
        try:
            node_id = nova.get_node(name)
            self.logger.info('Storing node {0} to add later'.format(node_id))
            self.node_list.add(node_id)
        except NotFound:
            # Node really didn't build
            self.logger.info(
                'No node found for {0}, giving up on it'.format(name)
            )
            return
        except exceptions.ClientException:
            self.logger.exception(
                'Error getting failed node info from Nova for {0}'.format(name)
            )

    def scheduler(self):
        """Arm a one-shot Timer to call run() after check_interval minutes."""
        self.logger.info('Node check timer sleeping for {mins} minutes'
                         .format(mins=self.args.check_interval))
        self.timer = threading.Timer(60 * int(self.args.check_interval),
                                     self.run, ())
        self.timer.start()

View File

@@ -1,148 +0,0 @@
# Copyright 2012 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sys
import threading
from novaclient import exceptions
from libra.mgm.nova import Node, NotFound
class SubmitNodes(object):
    """Periodic task that submits finished Nova builds to the API server.

    run() walks the persistent node list, checks each node's build status
    in Nova, uploads ACTIVE nodes to the API server, deletes failed ones,
    and reschedules itself with a threading.Timer.
    """

    def __init__(self, driver, lock, logger, node_list, args):
        # driver: API-server driver class (instantiated fresh on each run)
        self.driver = driver
        # lock shared with other maintenance tasks so checks don't overlap
        self.lock = lock
        self.args = args
        self.logger = logger
        # node_list: persistent store of node IDs still building
        self.node_list = node_list
        # handle of the pending threading.Timer created by scheduler()
        self.timer = None

    def run(self):
        """ check/submit list of nodes to be added """
        with self.lock:
            try:
                self.logger.info('Checking log of nova builds')
                nodes = self.node_list.get()
                if len(nodes) == 0:
                    self.logger.info('Node log empty')
                else:
                    api = self.driver(self.args.api_server, self.logger)
                    if api.is_online():
                        self.logger.info(
                            'Connected to {url}'.format(url=api.get_url())
                        )
                        for node in nodes:
                            self.test_node(node, api)
                    else:
                        self.logger.error('No working API server found')
            except Exception:
                # Swallow everything: this thread must keep rescheduling
                # itself even after an unexpected failure.
                self.logger.exception(
                    'Uncaught exception during node check'
                )
        self.scheduler()

    def test_node(self, node_id, api):
        """Check *node_id*'s build state in Nova and act on the result.

        ACTIVE   -> upload to the API server and drop from the list
        BUILD... -> leave in the list for the next cycle
        other    -> delete the node in Nova and drop from the list
        """
        try:
            nova = Node(
                self.args.nova_user,
                self.args.nova_pass,
                self.args.nova_tenant,
                self.args.nova_auth_url,
                self.args.nova_region,
                self.args.nova_keyname,
                self.args.nova_secgroup,
                self.args.nova_image,
                self.args.nova_image_size,
                node_basename=self.args.node_basename
            )
        except Exception as exc:
            self.logger.error(
                'Error initialising Nova connection {exc}'
                .format(exc=exc)
            )
            return
        self.logger.info('Testing readiness node {0}'.format(node_id))
        try:
            resp, status = nova.status(node_id)
        except NotFound:
            self.logger.info(
                'Node {0} no longer exists, removing from list'
                .format(node_id)
            )
            self.node_list.delete(node_id)
            return
        except exceptions.ClientException as exc:
            # Fix: the original logged sys.exc_info()[0] (just the exception
            # class) despite binding the exception; log the exception itself.
            self.logger.error(
                'Error getting status from Nova, exception {exc}'
                .format(exc=exc)
            )
            return
        if resp.status_code not in (200, 203):
            # Fix: corrected "geting" typo in the error message below.
            self.logger.error(
                'Error getting status from Nova, error {0}'
                .format(resp.status_code)
            )
            return
        status = status['server']
        if status['status'] == 'ACTIVE':
            name = status['name']
            body = self.build_node_data(status)
            status, response = api.add_node(body)
            if not status:
                # Leave the node in the list so the upload is retried on
                # the next cycle.
                self.logger.error(
                    'Could not upload node {name} to API server'
                    .format(name=name)
                )
            else:
                self.node_list.delete(node_id)
                self.logger.info('Node {0} added to API server'.format(name))
            return
        elif status['status'].startswith('BUILD'):
            self.logger.info(
                'Node {0} still building, ignoring'.format(node_id)
            )
            return
        else:
            # Any other state (ERROR etc.) is treated as a failed build
            self.logger.info(
                'Node {0} is bad, deleting'.format(node_id)
            )
            status, msg = nova.delete(node_id)
            if not status:
                self.logger.error(msg)
            else:
                self.logger.info('Delete successful')
                self.node_list.delete(node_id)

    def build_node_data(self, data):
        """ Build the API data from the node data """
        body = {}
        body['name'] = data['name']
        addresses = data['addresses']['private']
        # Pick the first private address outside 10.0.0.0/8.
        # NOTE(review): if every address starts with '10.' the loop falls
        # through and the *last* one is used; an empty address list would
        # raise NameError -- presumably callers guarantee at least one
        # address. Confirm before tightening.
        for address in addresses:
            if not address['addr'].startswith('10.'):
                break
        body['publicIpAddr'] = address['addr']
        body['floatingIpAddr'] = address['addr']
        body['az'] = self.args.az
        body['type'] = "basename: {0}, image: {1}".format(
            self.args.node_basename, self.args.nova_image
        )
        return body

    def scheduler(self):
        """Arm a one-shot Timer to call run() after submit_interval minutes."""
        self.logger.info('Node submit timer sleeping for {mins} minutes'
                         .format(mins=self.args.submit_interval))
        self.timer = threading.Timer(60 * int(self.args.submit_interval),
                                     self.run, ())
        self.timer.start()

View File

@@ -11,6 +11,19 @@ fake_body = open(
os.path.join(os.path.dirname(__file__), "fake_body.json"), 'r').read()
class Args(object):
    """Canned command-line options for exercising Node in unit tests.

    NOTE(review): nova_user = "password" / nova_pass = "auth_test" look
    swapped, but these are arbitrary fixture values -- the tests only
    check they are passed through, so the exact strings do not matter.
    """
    # Nova credentials and endpoint
    nova_user = "password"
    nova_pass = "auth_test"
    nova_tenant = "tenant1"
    nova_auth_url = ''
    nova_region = "region1"
    # Instance launch settings
    nova_keyname = "default"
    nova_secgroup = "default"
    nova_image = '1234'
    nova_image_size = '100'
    # Node naming
    node_basename = ''
class TestResponse(requests.Response):
"""
Class used to wrap requests.Response and provide some
@@ -56,10 +69,8 @@ class TestLBaaSMgmTask(testtools.TestCase):
class TestLBaaSMgmNova(testtools.TestCase):
def setUp(self):
super(TestLBaaSMgmNova, self).setUp()
self.api = Node(
"username", "password", "auth_test", "tenant1", "region1",
"default", "default", '1234', '100'
)
args = Args()
self.api = Node(args)
self.api.nova.management_url = "http://example.com"
self.api.nova.auth_token = "token"