diff --git a/etc/sample_libra.cfg b/etc/sample_libra.cfg index 9b25fa46..00398c74 100644 --- a/etc/sample_libra.cfg +++ b/etc/sample_libra.cfg @@ -77,6 +77,7 @@ poll_timeout_retry = 30 db_sections=mysql1 ssl_certfile=certfile.crt ssl_keyfile=keyfile.key +expire_days=7 [api] host=0.0.0.0 @@ -88,7 +89,6 @@ swift_basepath=lbaaslogs swift_endpoint=https://host.com:443/v1/ ssl_certfile=certfile.crt ssl_keyfile=keyfile.key -expire_days=7 ip_filters=192.168.0.0/24 [mysql1] diff --git a/libra/admin_api/app.py b/libra/admin_api/app.py index 3faee4b0..b2313f86 100644 --- a/libra/admin_api/app.py +++ b/libra/admin_api/app.py @@ -20,8 +20,11 @@ import grp import pwd import pecan import sys +import signal import os from libra.admin_api.stats.scheduler import Stats +from libra.admin_api.device_pool.manage_pool import Pool +from libra.admin_api.expunge.expunge import ExpungeScheduler from libra.admin_api import config as api_config from libra.admin_api import model from libra.admin_api.stats.drivers.base import known_drivers @@ -72,6 +75,33 @@ def setup_app(pecan_config, args): return app +class MaintThreads(object): + def __init__(self, logger, args, drivers): + self.classes = [] + self.logger = logger + self.args = args + self.drivers = drivers + signal.signal(signal.SIGINT, self.exit_handler) + signal.signal(signal.SIGTERM, self.exit_handler) + self.run_threads() + + def run_threads(self): + stats = Stats(self.logger, self.args, self.drivers) + pool = Pool(self.logger, self.args) + expunge = ExpungeScheduler(self.logger, self.args) + self.classes.append(stats) + self.classes.append(pool) + self.classes.append(expunge) + + def exit_handler(self, signum, frame): + signal.signal(signal.SIGINT, signal.SIG_IGN) + signal.signal(signal.SIGTERM, signal.SIG_IGN) + for function in self.classes: + function.shutdown() + self.logger.info("Safely shutting down") + sys.exit() + + class LogStdout(object): def __init__(self, logger): self.logger = logger.info @@ -160,6 +190,18 @@ def main(): 
'--datadog_env', default='unknown', help='Server enironment' ) + options.parser.add_argument( + '--node_pool_size', default=10, type=int, + help='Number of hot spare devices to keep in the pool' + ) + options.parser.add_argument( + '--vip_pool_size', default=10, type=int, + help='Number of hot spare vips to keep in the pool' + ) + options.parser.add_argument( + '--expire_days', default=0, + help='Number of days until deleted load balancers are expired' + ) args = options.run() @@ -218,7 +260,7 @@ def main(): drivers.append(importutils.import_class( known_drivers[driver] )) - Stats(logger, args, drivers) + MaintThreads(logger, args, drivers) sys.stderr = LogStdout(logger) # TODO: set ca_certs and cert_reqs=CERT_REQUIRED ssl_sock = eventlet.wrap_ssl( diff --git a/libra/admin_api/controllers/devices.py b/libra/admin_api/controllers/devices.py index c713461f..6be6bb9c 100644 --- a/libra/admin_api/controllers/devices.py +++ b/libra/admin_api/controllers/devices.py @@ -20,8 +20,8 @@ from pecan.rest import RestController import wsmeext.pecan as wsme_pecan from wsme.exc import ClientSideError from libra.admin_api.model.validators import DeviceResp, DevicePost, DevicePut -from libra.admin_api.model.lbaas import LoadBalancer, Device, db_session -from libra.admin_api.model.lbaas import loadbalancers_devices +from libra.common.api.lbaas import LoadBalancer, Device, db_session +from libra.common.api.lbaas import loadbalancers_devices class DevicesController(RestController): diff --git a/libra/mgm/drivers/hp_rest/__init__.py b/libra/admin_api/device_pool/__init__.py similarity index 90% rename from libra/mgm/drivers/hp_rest/__init__.py rename to libra/admin_api/device_pool/__init__.py index 582348cb..92bd912f 100644 --- a/libra/mgm/drivers/hp_rest/__init__.py +++ b/libra/admin_api/device_pool/__init__.py @@ -1,4 +1,4 @@ -# Copyright 2012 Hewlett-Packard Development Company, L.P. +# Copyright 2013 Hewlett-Packard Development Company, L.P. 
# # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain diff --git a/libra/admin_api/device_pool/manage_pool.py b/libra/admin_api/device_pool/manage_pool.py new file mode 100644 index 00000000..3a9569d5 --- /dev/null +++ b/libra/admin_api/device_pool/manage_pool.py @@ -0,0 +1,350 @@ +# Copyright 2013 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import threading +import ipaddress +from datetime import datetime +from libra.common.json_gearman import JSONGearmanClient +from gearman.constants import JOB_UNKNOWN +from sqlalchemy import func +from libra.common.api.lbaas import Device, PoolBuilding, Vip, db_session + +#TODO: Lots of duplication of code here, need to cleanup + + +class Pool(object): + + DELETE_SECONDS = 5 + PROBE_SECONDS = 30 + VIPS_SECONDS = 50 + + def __init__(self, logger, args): + self.logger = logger + self.args = args + self.probe_timer = None + self.delete_timer = None + self.vips_timer = None + + self.start_delete_sched() + self.start_probe_sched() + self.start_vips_sched() + + def shutdown(self): + if self.probe_timer: + self.probe_timer.cancel() + if self.delete_timer: + self.delete_timer.cancel() + if self.vips_timer: + self.vips_timer.cancel() + + def delete_devices(self): + """ Searches for all devices in the DELETED state and removes them """ + minute = datetime.now().minute + if self.args.server_id !=
minute % self.args.number_of_servers: + self.logger.info('Not our turn to run delete check, sleeping') + self.start_delete_sched() + return + self.logger.info('Running device delete check') + try: + message = [] + with db_session() as session: + devices = session.query(Device).\ + filter(Device.status == 'DELETED').all() + + for device in devices: + job_data = { + 'action': 'DELETE_DEVICE', + 'name': device.name + } + message.append(dict(task='libra_pool_mgm', data=job_data)) + session.commit() + if not message: + self.logger.info("No devices to delete") + else: + gear = GearmanWork(self.args, self.logger) + gear.send_delete_message(message) + except: + self.logger.exception("Exception when deleting devices") + + self.start_delete_sched() + + def probe_vips(self): + minute = datetime.now().minute + if self.args.server_id != minute % self.args.number_of_servers: + self.logger.info('Not our turn to run vips check, sleeping') + self.start_vips_sched() + return + self.logger.info('Running vips count probe check') + try: + with db_session() as session: + NULL = None # For pep8 + vip_count = session.query(Vip).\ + filter(Vip.device == NULL).count() + if vip_count >= self.args.vip_pool_size: + self.logger.info("Enough vips exist, no work to do") + session.commit() + self.start_vips_sched() + return + + build_count = self.args.vip_pool_size - vip_count + self._build_vips(build_count) + except: + self.logger.exception( + "Uncaught exception during vip pool expansion" + ) + self.start_vips_sched() + + def probe_devices(self): + minute = datetime.now().minute + if self.args.server_id != minute % self.args.number_of_servers: + self.logger.info('Not our turn to run probe check, sleeping') + self.start_probe_sched() + return + self.logger.info('Running device count probe check') + try: + with db_session() as session: + # Double check we have no outstanding builds assigned to us + session.query(PoolBuilding).\ + filter(PoolBuilding.server_id == self.args.server_id).\ + delete()
+ session.flush() + dev_count = session.query(Device).\ + filter(Device.status == 'OFFLINE').count() + if dev_count >= self.args.node_pool_size: + self.logger.info("Enough devices exist, no work to do") + session.commit() + self.start_probe_sched() + return + + build_count = self.args.node_pool_size - dev_count + built = session.query(func.sum(PoolBuilding.qty)).first() + if not built[0]: + built = 0 + else: + built = built[0] + if build_count - built <= 0: + self.logger.info( + "Other servers are building enough nodes" + ) + session.commit() + self.start_probe_sched() + return + build_count -= built + building = PoolBuilding() + building.server_id = self.args.server_id + building.qty = build_count + session.add(building) + session.commit() + + # Closed the DB session because we don't want it hanging around + # for a long time locking tables + self._build_nodes(build_count) + with db_session() as session: + session.query(PoolBuilding).\ + filter(PoolBuilding.server_id == self.args.server_id).\ + delete() + session.commit() + except: + self.logger.exception("Uncaught exception during pool expansion") + self.start_probe_sched() + + def _build_nodes(self, count): + message = [] + it = 0 + job_data = {'action': 'BUILD_DEVICE'} + while it < count: + message.append(dict(task='libra_pool_mgm', data=job_data)) + it += 1 + gear = GearmanWork(self.args, self.logger) + gear.send_create_message(message) + + def _build_vips(self, count): + message = [] + it = 0 + job_data = {'action': 'BUILD_IP'} + while it < count: + message.append(dict(task='libra_pool_mgm', data=job_data)) + it += 1 + gear = GearmanWork(self.args, self.logger) + gear.send_vips_message(message) + + def start_probe_sched(self): + seconds = datetime.now().second + if seconds < self.PROBE_SECONDS: + sleeptime = self.PROBE_SECONDS - seconds + else: + sleeptime = 60 - (seconds - self.PROBE_SECONDS) + + self.logger.info('Pool probe check timer sleeping for {secs} seconds' + .format(secs=sleeptime)) + 
self.probe_timer = threading.Timer(sleeptime, self.probe_devices, ()) + self.probe_timer.start() + + def start_vips_sched(self): + seconds = datetime.now().second + if seconds < self.VIPS_SECONDS: + sleeptime = self.VIPS_SECONDS - seconds + else: + sleeptime = 60 - (seconds - self.VIPS_SECONDS) + + self.logger.info('Pool vips check timer sleeping for {secs} seconds' + .format(secs=sleeptime)) + self.vips_timer = threading.Timer(sleeptime, self.probe_vips, ()) + self.vips_timer.start() + + def start_delete_sched(self): + seconds = datetime.now().second + if seconds < self.DELETE_SECONDS: + sleeptime = self.DELETE_SECONDS - seconds + else: + sleeptime = 60 - (seconds - self.DELETE_SECONDS) + + self.logger.info('Pool delete check timer sleeping for {secs} seconds' + .format(secs=sleeptime)) + self.delete_timer = threading.Timer(sleeptime, self.delete_devices, ()) + self.delete_timer.start() + + +class GearmanWork(object): + + def __init__(self, args, logger): + self.logger = logger + if all([args.gearman_ssl_key, args.gearman_ssl_cert, + args.gearman_ssl_ca]): + # Use SSL connections to each Gearman job server. 
+ ssl_server_list = [] + for server in args.gearman: + ghost, gport = server.split(':') + ssl_server_list.append({'host': ghost, + 'port': int(gport), + 'keyfile': args.gearman_ssl_key, + 'certfile': args.gearman_ssl_cert, + 'ca_certs': args.gearman_ssl_ca}) + self.gearman_client = JSONGearmanClient(ssl_server_list) + else: + self.gearman_client = JSONGearmanClient(args.gearman) + + def send_delete_message(self, message): + self.logger.info("Sending {0} gearman messages".format(len(message))) + job_status = self.gearman_client.submit_multiple_jobs( + message, background=False, wait_until_complete=True, + max_retries=10, poll_timeout=30.0 + ) + delete_count = 0 + for status in job_status: + if status.state == JOB_UNKNOWN: + self.logger.error('Gearman Job server fail') + continue + if status.timed_out: + self.logger.error('Gearman timeout whilst deleting device') + continue + if status.result['response'] == 'FAIL': + self.logger.error( + 'Pool manager failed to delete a device, removing from DB' + ) + + delete_count += 1 + with db_session() as session: + session.query(Device).\ + filter(Device.name == status.result['name']).delete() + session.commit() + + self.logger.info( + '{nodes} freed devices delete from pool'.format(nodes=delete_count) + ) + + def send_vips_message(self, message): + # TODO: make this gearman part more async, not wait for all builds + self.logger.info("Sending {0} gearman messages".format(len(message))) + job_status = self.gearman_client.submit_multiple_jobs( + message, background=False, wait_until_complete=True, + max_retries=10, poll_timeout=3600.0 + ) + built_count = 0 + for status in job_status: + if status.state == JOB_UNKNOWN: + self.logger.error('Gearman Job server fail') + continue + if status.timed_out: + self.logger.error('Gearman timeout whilst building vip') + continue + if status.result['response'] == 'FAIL': + self.logger.error('Pool manager failed to build a vip') + continue + + built_count += 1 + try: + 
self._add_vip(status.result) + except: + self.logger.exception( + 'Could not add vip to DB, node data: {0}' + .format(status.result) + ) + self.logger.info( + '{vips} vips built and added to pool'.format(vips=built_count) + ) + + def send_create_message(self, message): + # TODO: make this gearman part more async, not wait for all builds + self.logger.info("Sending {0} gearman messages".format(len(message))) + job_status = self.gearman_client.submit_multiple_jobs( + message, background=False, wait_until_complete=True, + max_retries=10, poll_timeout=3600.0 + ) + built_count = 0 + for status in job_status: + if status.state == JOB_UNKNOWN: + self.logger.error('Gearman Job server fail') + continue + if status.timed_out: + self.logger.error('Gearman timeout whilst building device') + continue + if status.result['response'] == 'FAIL': + self.logger.error('Pool manager failed to build a device') + continue + + built_count += 1 + try: + self._add_node(status.result) + except: + self.logger.exception( + 'Could not add node to DB, node data: {0}' + .format(status.result) + ) + self.logger.info( + '{nodes} devices built and added to pool'.format(nodes=built_count) + ) + + def _add_vip(self, data): + self.logger.info('Adding vip {0} to DB'.format(data['ip'])) + vip = Vip() + vip.ip = int(ipaddress.IPv4Address(unicode(data['ip']))) + with db_session() as session: + session.add(vip) + session.commit() + + def _add_node(self, data): + self.logger.info('Adding device {0} to DB'.format(data['name'])) + device = Device() + device.name = data['name'] + device.publicIpAddr = data['addr'] + # TODO: kill this field, make things use publicIpAddr instead + device.floatingIpAddr = data['addr'] + device.az = data['az'] + device.type = data['type'] + device.status = 'OFFLINE' + device.created = None + with db_session() as session: + session.add(device) + session.commit() diff --git a/libra/mgm/schedulers/__init__.py b/libra/admin_api/expunge/__init__.py similarity index 71% rename from 
libra/mgm/schedulers/__init__.py rename to libra/admin_api/expunge/__init__.py index cd95310b..92bd912f 100644 --- a/libra/mgm/schedulers/__init__.py +++ b/libra/admin_api/expunge/__init__.py @@ -1,4 +1,4 @@ -# Copyright 2012 Hewlett-Packard Development Company, L.P. +# Copyright 2013 Hewlett-Packard Development Company, L.P. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -11,9 +11,3 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. - -known_modules = { - 'build': 'libra.mgm.schedulers.build.BuildNodes', - 'submit': 'libra.mgm.schedulers.submit.SubmitNodes' -} -modules = ['build', 'submit'] diff --git a/libra/api/library/expunge.py b/libra/admin_api/expunge/expunge.py similarity index 77% rename from libra/api/library/expunge.py rename to libra/admin_api/expunge/expunge.py index a1ff8f4a..1c38275d 100644 --- a/libra/api/library/expunge.py +++ b/libra/admin_api/expunge/expunge.py @@ -13,37 +13,35 @@ # under the License. 
import threading -import signal -import sys from datetime import datetime, timedelta -from pecan import conf -from libra.api.model.lbaas import LoadBalancer, db_session +from libra.common.api.lbaas import LoadBalancer, db_session class ExpungeScheduler(object): - def __init__(self, logger): - if not conf.expire_days: + def __init__(self, logger, args): + self.expunge_timer = None + if not args.expire_days: logger.info('Expunge not configured, disabled') return self.logger = logger - self.expunge_timer = None - signal.signal(signal.SIGINT, self.exit_handler) - signal.signal(signal.SIGTERM, self.exit_handler) + self.args = args self.run_expunge() - def exit_handler(self, signum, frame): - signal.signal(signal.SIGINT, signal.SIG_IGN) - signal.signal(signal.SIGTERM, signal.SIG_IGN) + def shutdown(self): if self.expunge_timer: self.expunge_timer.cancel() - self.logger.info('Safely shutting down') - sys.exit(0) def run_expunge(self): + day = datetime.now().day + if self.args.server_id != day % self.args.number_of_servers: + self.logger.info('Not our turn to run expunge check, sleeping') + self.expunge_timer = threading.Timer( + 24*60*60, self.run_expunge, ()) + return self.expunge_timer.start() with db_session() as session: try: exp = datetime.now() - timedelta( - days=int(conf.expire_days) + days=int(self.args.expire_days) ) exp_time = exp.strftime('%Y-%m-%d %H:%M:%S') self.logger.info( diff --git a/libra/admin_api/model/lbaas.py b/libra/admin_api/model/lbaas.py deleted file mode 100644 index fc4fbfd1..00000000 --- a/libra/admin_api/model/lbaas.py +++ /dev/null @@ -1,191 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 -# Copyright 2013 Hewlett-Packard Development Company, L.P. -# -# Licensed under the Apache License, Version 2.0 (the 'License'); you may -# not use this file except in compliance with the License.
You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an 'AS IS' BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -from sqlalchemy import Table, Column, Integer, ForeignKey, create_engine -from sqlalchemy import INTEGER, VARCHAR, BIGINT -from sqlalchemy.ext.declarative import declarative_base -from sqlalchemy.orm import relationship, backref, sessionmaker, Session -import sqlalchemy.types as types -import random -import time -import ConfigParser -from pecan import conf -import logging - - -DeclarativeBase = declarative_base() -metadata = DeclarativeBase.metadata - -loadbalancers_devices = Table( - 'loadbalancers_devices', - metadata, - Column('loadbalancer', Integer, ForeignKey('loadbalancers.id')), - Column('device', Integer, ForeignKey('devices.id')) -) - - -class FormatedDateTime(types.TypeDecorator): - '''formats date to match iso 8601 standards - ''' - - impl = types.DateTime - - def process_result_value(self, value, dialect): - return value.strftime('%Y-%m-%dT%H:%M:%S') - - -class Limits(DeclarativeBase): - __tablename__ = 'global_limits' - id = Column(u'id', Integer, primary_key=True, nullable=False) - name = Column(u'name', VARCHAR(length=128), nullable=False) - value = Column(u'value', BIGINT(), nullable=False) - - -class Device(DeclarativeBase): - """device model""" - __tablename__ = 'devices' - #column definitions - az = Column(u'az', INTEGER(), nullable=False) - created = Column(u'created', FormatedDateTime(), nullable=False) - floatingIpAddr = Column( - u'floatingIpAddr', VARCHAR(length=128), nullable=False - ) - id = Column(u'id', BIGINT(), primary_key=True, nullable=False) - name = Column(u'name', VARCHAR(length=128), nullable=False) - publicIpAddr = 
Column(u'publicIpAddr', VARCHAR(length=128), nullable=False) - status = Column(u'status', VARCHAR(length=128), nullable=False) - type = Column(u'type', VARCHAR(length=128), nullable=False) - updated = Column(u'updated', FormatedDateTime(), nullable=False) - - -class LoadBalancer(DeclarativeBase): - """load balancer model""" - __tablename__ = 'loadbalancers' - #column definitions - algorithm = Column(u'algorithm', VARCHAR(length=80), nullable=False) - errmsg = Column(u'errmsg', VARCHAR(length=128)) - id = Column(u'id', BIGINT(), primary_key=True, nullable=False) - name = Column(u'name', VARCHAR(length=128), nullable=False) - port = Column(u'port', INTEGER(), nullable=False) - protocol = Column(u'protocol', VARCHAR(length=128), nullable=False) - status = Column(u'status', VARCHAR(length=50), nullable=False) - tenantid = Column(u'tenantid', VARCHAR(length=128), nullable=False) - updated = Column(u'updated', FormatedDateTime(), nullable=False) - created = Column(u'created', FormatedDateTime(), nullable=False) - - nodes = relationship( - 'Node', backref=backref('loadbalancers', order_by='Node.id') - ) - devices = relationship( - 'Device', secondary=loadbalancers_devices, backref='loadbalancers', - lazy='joined' - ) - - -class Node(DeclarativeBase): - """node model""" - __tablename__ = 'nodes' - #column definitions - address = Column(u'address', VARCHAR(length=128), nullable=False) - enabled = Column(u'enabled', Integer(), nullable=False) - id = Column(u'id', BIGINT(), primary_key=True, nullable=False) - lbid = Column( - u'lbid', BIGINT(), ForeignKey('loadbalancers.id'), nullable=False - ) - port = Column(u'port', INTEGER(), nullable=False) - status = Column(u'status', VARCHAR(length=128), nullable=False) - weight = Column(u'weight', INTEGER(), nullable=False) - - -class RoutingSession(Session): - """ If an engine is already in use, re-use it. 
Otherwise we can end up - with deadlocks in Galera, see http://tinyurl.com/9h6qlly - switch engines every 60 seconds of idle time """ - - engines = [] - last_engine = None - last_engine_time = 0 - - def get_bind(self, mapper=None, clause=None): - if not RoutingSession.engines: - self._build_engines() - - if ( - RoutingSession.last_engine - and time.time() < RoutingSession.last_engine_time + 60 - ): - RoutingSession.last_engine_time = time.time() - return RoutingSession.last_engine - engine = random.choice(RoutingSession.engines) - RoutingSession.last_engine = engine - RoutingSession.last_engine_time = time.time() - return engine - - def _build_engines(self): - config = ConfigParser.SafeConfigParser() - config.read([conf.conffile]) - for section in conf.database: - db_conf = config._sections[section] - - conn_string = '''mysql://%s:%s@%s:%d/%s''' % ( - db_conf['username'], - db_conf['password'], - db_conf['host'], - db_conf.get('port', 3306), - db_conf['schema'] - ) - - if 'ssl_key' in db_conf: - ssl_args = {'ssl': { - 'cert': db_conf['ssl_cert'], - 'key': db_conf['ssl_key'], - 'ca': db_conf['ssl_ca'] - }} - - engine = create_engine( - conn_string, isolation_level="READ COMMITTED", - pool_size=20, connect_args=ssl_args, pool_recycle=3600 - ) - else: - engine = create_engine( - conn_string, isolation_level="READ COMMITTED", - pool_size=20, pool_recycle=3600 - ) - RoutingSession.engines.append(engine) - - -class db_session(object): - def __init__(self): - self.session = None - self.logger = logging.getLogger(__name__) - - def __enter__(self): - for x in xrange(10): - try: - self.session = sessionmaker(class_=RoutingSession)() - self.session.execute("SELECT 1") - return self.session - except: - self.logger.error( - 'Could not connect to DB server: {0}'.format( - RoutingSession.last_engine.url - ) - ) - RoutingSession.last_engine = None - self.logger.error('Could not connect to any DB server') - return None - - def __exit__(self, type, value, traceback): - 
self.session.close() - return False diff --git a/libra/admin_api/stats/drivers/database/driver.py b/libra/admin_api/stats/drivers/database/driver.py index d1c0d550..5336abc5 100644 --- a/libra/admin_api/stats/drivers/database/driver.py +++ b/libra/admin_api/stats/drivers/database/driver.py @@ -11,8 +11,11 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations -from libra.admin_api.model.lbaas import Device, LoadBalancer, db_session -from libra.admin_api.model.lbaas import loadbalancers_devices +import logging +import ipaddress +from libra.common.api.lbaas import Device, LoadBalancer, db_session +from libra.common.api.lbaas import loadbalancers_devices, Vip +from libra.common.api.gearman_client import submit_job, submit_vip_job from libra.admin_api.stats.drivers.base import AlertDriver @@ -34,7 +37,7 @@ class DbDriver(AlertDriver): errmsg = "Load Balancer has recovered" lb_status = 'ACTIVE' elif status == 'ERROR': - errmsg = "Load Balancer has failed" + errmsg = "Load Balancer has failed, attempting rebuild" lb_status = status else: # This shouldnt happen @@ -55,6 +58,7 @@ class DbDriver(AlertDriver): session.flush() session.commit() + self._rebuild_device(device_id) def send_node_change(self, message, lbid, degraded): @@ -78,3 +82,61 @@ class DbDriver(AlertDriver): synchronize_session='fetch') session.commit() + + def _rebuild_device(self, device_id): + logger = logging.getLogger(__name__) + new_device_id = None + new_device_name = None + with db_session() as session: + new_device = session.query(Device).\ + filter(~Device.id.in_( + session.query(loadbalancers_devices.c.device) + )).\ + filter(Device.status == "OFFLINE").\ + with_lockmode('update').\ + first() + if new_device is None: + session.rollback() + logger.error( + 'No spare devices when trying to rebuild device {0}' + .format(device_id) + ) + return + new_device_id = new_device.id + new_device_name = 
new_device.name + logger.info( + "Moving device {0} to device {1}" + .format(device_id, new_device_id) + ) + lbs = session.query(LoadBalancer).\ + join(LoadBalancer.devices).\ + filter(Device.id == device_id).all() + for lb in lbs: + lb.devices = [new_device] + lb.status = "ERROR(REBUILDING)" + submit_job( + 'UPDATE', new_device.name, new_device.id, lbs[0].id + ) + new_device.status = 'ONLINE' + session.commit() + with db_session() as session: + vip = session.query(Vip).filter(Vip.device == device_id).first() + vip.device = new_device_id + device = session.query(Device).\ + filter(Device.id == device_id).first() + device.status = 'DELETED' + lbs = session.query(LoadBalancer).\ + join(LoadBalancer.devices).\ + filter(Device.id == device_id).all() + for lb in lbs: + lb.devices = [new_device] + lb.status = "ACTIVE" + lb.errmsg = "Load Balancer rebuild on new device" + logger.info( + "Moving IP {0} and marking device {1} for deletion" + .format(str(ipaddress.IPv4Address(vip.ip)), device_id) + ) + submit_vip_job( + 'ASSIGN', new_device_name, str(ipaddress.IPv4Address(vip.ip)) + ) + session.commit() diff --git a/libra/admin_api/stats/scheduler.py b/libra/admin_api/stats/scheduler.py index d6736c49..b254177e 100644 --- a/libra/admin_api/stats/scheduler.py +++ b/libra/admin_api/stats/scheduler.py @@ -13,10 +13,8 @@ # under the License. 
import threading -import signal -import sys from datetime import datetime -from libra.admin_api.model.lbaas import LoadBalancer, Device, Node, db_session +from libra.common.api.lbaas import LoadBalancer, Device, Node, db_session from libra.admin_api.stats.stats_gearman import GearJobs @@ -36,31 +34,18 @@ class Stats(object): self.ping_timer = None self.repair_timer = None - signal.signal(signal.SIGINT, self.exit_handler) - signal.signal(signal.SIGTERM, self.exit_handler) logger.info("Selected stats drivers: {0}".format(args.stats_driver)) self.start_ping_sched() - self.start_repair_sched() + # TODO: completely remove repaid sched, rebuild instead + #self.start_repair_sched() - def exit_handler(self, signum, frame): - signal.signal(signal.SIGINT, signal.SIG_IGN) - signal.signal(signal.SIGTERM, signal.SIG_IGN) - self.shutdown(False) - - def shutdown(self, error): + def shutdown(self): if self.ping_timer: self.ping_timer.cancel() if self.repair_timer: self.repair_timer.cancel() - if not error: - self.logger.info('Safely shutting down') - sys.exit(0) - else: - self.logger.info('Shutting down due to error') - sys.exit(1) - def repair_lbs(self): # Work out if it is our turn to run minute = datetime.now().minute @@ -115,6 +100,8 @@ class Stats(object): gearman = GearJobs(self.logger, self.args) failed_lbs, node_status = gearman.send_pings(node_list) failed = len(failed_lbs) + # TODO: if failed over a threshold (5?) 
error instead of rebuild, + # something bad probably happened if failed > 0: self._send_fails(failed_lbs, session) session.commit() @@ -130,9 +117,11 @@ class Stats(object): node_list = [] self.logger.info('Running repair check') with db_session() as session: + # Join to ensure device is in-use devices = session.query( Device.id, Device.name - ).filter(Device.status == 'ERROR').all() + ).join(LoadBalancer.devices).\ + filter(Device.status == 'ERROR').all() tested = len(devices) if tested == 0: diff --git a/libra/api/app.py b/libra/api/app.py index 2a3ede51..10a055b8 100644 --- a/libra/api/app.py +++ b/libra/api/app.py @@ -22,7 +22,6 @@ import pecan import sys import os import wsme_overrides -from libra.api.library.expunge import ExpungeScheduler from libra.api import config as api_config from libra.api import model from libra.api import acl @@ -59,7 +58,6 @@ def setup_app(pecan_config, args): 'ssl_cert': args.gearman_ssl_cert, 'ssl_ca': args.gearman_ssl_ca } - config['expire_days'] = args.expire_days config['ip_filters'] = args.ip_filters if args.debug: config['wsme'] = {'debug': True} @@ -150,10 +148,6 @@ def main(): '--ssl_keyfile', help='Path to an SSL key file' ) - options.parser.add_argument( - '--expire_days', default=0, - help='Number of days until deleted load balancers are expired' - ) options.parser.add_argument( '--ip_filters', action='append', default=[], help='IP filters for backend nodes in the form xxx.xxx.xxx.xxx/yy' @@ -215,7 +209,6 @@ def main(): logger = setup_logging('', args) logger.info('Starting on {0}:{1}'.format(args.host, args.port)) api = setup_app(pc, args) - ExpungeScheduler(logger) sys.stderr = LogStdout(logger) ssl_sock = eventlet.wrap_ssl( eventlet.listen((args.host, args.port)), diff --git a/libra/api/controllers/health_monitor.py b/libra/api/controllers/health_monitor.py index 3f83aad5..c66a93ae 100644 --- a/libra/api/controllers/health_monitor.py +++ b/libra/api/controllers/health_monitor.py @@ -18,11 +18,11 @@ from pecan.rest 
import RestController import wsmeext.pecan as wsme_pecan from wsme.exc import ClientSideError from wsme import Unset -from libra.api.model.lbaas import LoadBalancer, db_session -from libra.api.model.lbaas import Device, HealthMonitor +from libra.common.api.lbaas import LoadBalancer, db_session +from libra.common.api.lbaas import Device, HealthMonitor from libra.api.acl import get_limited_to_project from libra.api.model.validators import LBMonitorPut, LBMonitorResp -from libra.api.library.gearman_client import submit_job +from libra.common.api.gearman_client import submit_job from libra.api.library.exp import NotFound @@ -220,7 +220,7 @@ class HealthMonitorController(RestController): with db_session() as session: query = session.query( LoadBalancer, HealthMonitor - ).outerjoin(LoadBalancer.monitors).\ + ).outerjoin(LoadBalancer.monitors).\ filter(LoadBalancer.tenantid == tenant_id).\ filter(LoadBalancer.id == self.lbid).\ filter(LoadBalancer.status != 'DELETED').\ diff --git a/libra/api/controllers/limits.py b/libra/api/controllers/limits.py index 677e1926..84ff6fca 100644 --- a/libra/api/controllers/limits.py +++ b/libra/api/controllers/limits.py @@ -15,7 +15,7 @@ from pecan import expose from pecan.rest import RestController -from libra.api.model.lbaas import Limits, db_session +from libra.common.api.lbaas import Limits, db_session class LimitsController(RestController): diff --git a/libra/api/controllers/load_balancers.py b/libra/api/controllers/load_balancers.py index 10097961..e82a4c57 100644 --- a/libra/api/controllers/load_balancers.py +++ b/libra/api/controllers/load_balancers.py @@ -12,6 +12,7 @@ # License for the specific language governing permissions and limitations # under the License. 
+import ipaddress # pecan imports from pecan import expose, abort, response, request from pecan.rest import RestController @@ -25,11 +26,11 @@ from health_monitor import HealthMonitorController from logs import LogsController # models -from libra.api.model.lbaas import LoadBalancer, Device, Node, db_session -from libra.api.model.lbaas import loadbalancers_devices, Limits +from libra.common.api.lbaas import LoadBalancer, Device, Node, db_session +from libra.common.api.lbaas import loadbalancers_devices, Limits, Vip from libra.api.model.validators import LBPut, LBPost, LBResp, LBVipResp from libra.api.model.validators import LBRespNode -from libra.api.library.gearman_client import submit_job +from libra.common.api.gearman_client import submit_job, submit_vip_job from libra.api.acl import get_limited_to_project from libra.api.library.exp import OverLimit, IPOutOfRange, NotFound from libra.api.library.ip_filter import ipfilter @@ -96,8 +97,10 @@ class LoadBalancersController(RestController): LoadBalancer.name, LoadBalancer.id, LoadBalancer.protocol, LoadBalancer.port, LoadBalancer.algorithm, LoadBalancer.status, LoadBalancer.created, - LoadBalancer.updated, LoadBalancer.statusDescription + LoadBalancer.updated, LoadBalancer.statusDescription, + Vip.id.label('vipid'), Vip.ip ).join(LoadBalancer.devices).\ + join(Device.vip).\ filter(LoadBalancer.tenantid == tenant_id).\ filter(LoadBalancer.id == self.lbid).\ first() @@ -109,21 +112,17 @@ class LoadBalancersController(RestController): load_balancers = load_balancers._asdict() load_balancers['nodeCount'] = session.query(Node).\ filter(Node.lbid == load_balancers['id']).count() - virtualIps = session.query( - Device.id, Device.floatingIpAddr - ).join(LoadBalancer.devices).\ - filter(LoadBalancer.tenantid == tenant_id).\ - filter(LoadBalancer.id == self.lbid).\ - all() - load_balancers['virtualIps'] = [] - for item in virtualIps: - vip = item._asdict() - vip['type'] = 'PUBLIC' - vip['ipVersion'] = 'IPV4' - vip['address'] = 
vip['floatingIpAddr']
-                del(vip['floatingIpAddr'])
-                load_balancers['virtualIps'].append(vip)
+            load_balancers['virtualIps'] = [{
+                "id": load_balancers['vipid'],
+                "type": "PUBLIC",
+                "ipVersion": "IPV4",
+                "address": str(ipaddress.IPv4Address(
+                    load_balancers['ip']
+                )),
+            }]
+            del(load_balancers['ip'])
+            del(load_balancers['vipid'])
             nodes = session.query(
                 Node.id, Node.address, Node.port, Node.status, Node.enabled
@@ -259,18 +258,38 @@ class LoadBalancersController(RestController):
                 filter(Device.status == "OFFLINE").\
                 with_lockmode('update').\
                 first()
+            NULL = None  # For pep8
+            vip = session.query(Vip).\
+                filter(Vip.device == NULL).\
+                with_lockmode('update').\
+                first()
+            if device is None:
+                session.rollback()
+                raise RuntimeError('No devices available')
+            if vip is None:
+                session.rollback()
+                raise RuntimeError('No virtual IPs available')
+            vip.device = device.id
+            submit_vip_job(
+                'ASSIGN', device.name, str(ipaddress.IPv4Address(vip.ip))
+            )
         else:
             virtual_id = body.virtualIps[0].id
             # This is an additional load balancer
             device = session.query(
                 Device
-            ).filter(Device.id == virtual_id).\
+            ).join(Device.vip).\
+                filter(Vip.id == virtual_id).\
                 first()
             old_lb = session.query(
                 LoadBalancer
             ).join(LoadBalancer.devices).\
+                join(Device.vip).\
                 filter(LoadBalancer.tenantid == tenant_id).\
-                filter(Device.id == virtual_id).\
+                filter(Vip.id == virtual_id).\
+                first()
+            vip = session.query(Vip).\
+                filter(Vip.device == device.id).\
                 first()
             if old_lb is None:
                 session.rollback()
@@ -290,10 +310,6 @@ class LoadBalancersController(RestController):
                     'Only one load balancer per port allowed per device'
                 )
-            if device is None:
-                session.rollback()
-                raise RuntimeError('No devices available')
-
             if body.algorithm:
                 lb.algorithm = body.algorithm.upper()
             else:
@@ -333,8 +349,8 @@ class LoadBalancersController(RestController):
         return_data.created = lb.created
         return_data.updated = lb.updated
         vip_resp = LBVipResp(
-            address=device.floatingIpAddr, 
id=str(device.id), - type='PUBLIC', ipVersion='IPV4' + address=str(ipaddress.IPv4Address(vip.ip)), + id=str(vip.id), type='PUBLIC', ipVersion='IPV4' ) return_data.virtualIps = [vip_resp] return_data.nodes = [] @@ -346,12 +362,9 @@ class LoadBalancersController(RestController): return_data.nodes.append(out_node) session.commit() # trigger gearman client to create new lb - result = submit_job( + submit_job( 'UPDATE', device.name, device.id, lb.id ) - # do something with result - if result: - pass return return_data @wsme_pecan.wsexpose(None, body=LBPut, status_code=202) @@ -429,11 +442,10 @@ class LoadBalancersController(RestController): ).join(LoadBalancer.devices).\ filter(LoadBalancer.id == load_balancer_id).\ first() - session.flush() - session.commit() submit_job( 'DELETE', device.name, device.id, lb.id ) + session.commit() return None def usage(self, load_balancer_id): diff --git a/libra/api/controllers/logs.py b/libra/api/controllers/logs.py index 5e46bf59..d16a54a8 100644 --- a/libra/api/controllers/logs.py +++ b/libra/api/controllers/logs.py @@ -19,10 +19,10 @@ from pecan.rest import RestController import wsmeext.pecan as wsme_pecan from wsme.exc import ClientSideError from wsme import Unset -from libra.api.model.lbaas import LoadBalancer, Device, db_session +from libra.common.api.lbaas import LoadBalancer, Device, db_session from libra.api.acl import get_limited_to_project from libra.api.model.validators import LBLogsPost -from libra.api.library.gearman_client import submit_job +from libra.common.api.gearman_client import submit_job from libra.api.library.exp import NotFound diff --git a/libra/api/controllers/nodes.py b/libra/api/controllers/nodes.py index 7ef25a1f..e6af915e 100644 --- a/libra/api/controllers/nodes.py +++ b/libra/api/controllers/nodes.py @@ -19,12 +19,12 @@ import wsmeext.pecan as wsme_pecan from wsme.exc import ClientSideError from wsme import Unset #default response objects -from libra.api.model.lbaas import LoadBalancer, Node, 
db_session, Limits -from libra.api.model.lbaas import Device +from libra.common.api.lbaas import LoadBalancer, Node, db_session, Limits +from libra.common.api.lbaas import Device from libra.api.acl import get_limited_to_project from libra.api.model.validators import LBNodeResp, LBNodePost, NodeResp from libra.api.model.validators import LBNodePut -from libra.api.library.gearman_client import submit_job +from libra.common.api.gearman_client import submit_job from libra.api.library.exp import OverLimit, IPOutOfRange, NotFound from libra.api.library.ip_filter import ipfilter from pecan import conf diff --git a/libra/api/controllers/virtualips.py b/libra/api/controllers/virtualips.py index 20250550..0d2af7a4 100644 --- a/libra/api/controllers/virtualips.py +++ b/libra/api/controllers/virtualips.py @@ -13,9 +13,10 @@ # License for the specific language governing permissions and limitations # under the License. +import ipaddress from pecan import response, expose, request from pecan.rest import RestController -from libra.api.model.lbaas import LoadBalancer, Device, db_session +from libra.common.api.lbaas import LoadBalancer, Vip, Device, db_session from libra.api.acl import get_limited_to_project @@ -42,13 +43,14 @@ class VipsController(RestController): details="Load Balancer ID not provided" ) with db_session() as session: - device = session.query( - Device.id, Device.floatingIpAddr + vip = session.query( + Vip.id, Vip.ip ).join(LoadBalancer.devices).\ + join(Device.vip).\ filter(LoadBalancer.id == self.lbid).\ filter(LoadBalancer.tenantid == tenant_id).first() - if not device: + if not vip: session.rollback() response.status = 404 return dict( @@ -57,8 +59,8 @@ class VipsController(RestController): ) resp = { "virtualIps": [{ - "id": device.id, - "address": device.floatingIpAddr, + "id": vip.id, + "address": str(ipaddress.IPv4Address(vip.ip)), "type": "PUBLIC", "ipVersion": "IPV4" }] diff --git a/libra/mgm/drivers/__init__.py b/libra/common/api/__init__.py similarity 
index 100% rename from libra/mgm/drivers/__init__.py rename to libra/common/api/__init__.py diff --git a/libra/api/library/gearman_client.py b/libra/common/api/gearman_client.py similarity index 85% rename from libra/api/library/gearman_client.py rename to libra/common/api/gearman_client.py index 6e7e1469..fa068dd7 100644 --- a/libra/api/library/gearman_client.py +++ b/libra/common/api/gearman_client.py @@ -15,10 +15,11 @@ import eventlet eventlet.monkey_patch() import logging +import ipaddress from libra.common.json_gearman import JSONGearmanClient -from libra.api.model.lbaas import LoadBalancer, db_session, Device, Node -from libra.api.model.lbaas import HealthMonitor -from libra.api.model.lbaas import loadbalancers_devices +from libra.common.api.lbaas import LoadBalancer, db_session, Device, Node, Vip +from libra.common.api.lbaas import HealthMonitor +from libra.common.api.lbaas import loadbalancers_devices from sqlalchemy.exc import OperationalError from pecan import conf @@ -39,6 +40,13 @@ def submit_job(job_type, host, data, lbid): eventlet.spawn_n(client_job, logger, job_type, host, data, lbid) +def submit_vip_job(job_type, device, vip): + logger = logging.getLogger(__name__) + eventlet.spawn_n( + client_job, logger, job_type, "libra_pool_mgm", device, vip + ) + + def client_job(logger, job_type, host, data, lbid): for x in xrange(5): try: @@ -54,6 +62,10 @@ def client_job(logger, job_type, host, data, lbid): client.send_delete(data) if job_type == 'ARCHIVE': client.send_archive(data) + if job_type == 'ASSIGN': + client.send_assign(data) + if job_type == 'REMOVE': + client.send_remove(data) return except OperationalError: # Auto retry on galera locking error @@ -87,6 +99,31 @@ class GearmanClientThread(object): else: self.gearman_client = JSONGearmanClient(conf.gearman.server) + def send_assign(self, data): + job_data = { + 'action': 'ASSIGN_IP', + 'name': data, + 'ip': self.lbid + } + status, response = self._send_message(job_data, 'response') + if not 
status: + self.logger.error( + "Failed to assign IP {0} to device {1}".format(self.lbid, data) + ) + + def send_remove(self, data): + job_data = { + 'action': 'REMOVE_IP', + 'name': data, + 'ip': self.lbid + } + status, response = self._send_message(job_data, 'response') + if not status: + self.logger.error( + "Failed to remove IP {0} from device {1}" + .format(self.lbid, data) + ) + def send_delete(self, data): with db_session() as session: count = session.query( @@ -129,9 +166,17 @@ class GearmanClientThread(object): job_data['loadBalancers'][0]['nodes'].append(node_data) else: # This is a delete + dev = session.query(Device.name).\ + filter(Device.id == data).first() + vip = session.query(Vip).\ + filter(Vip.device == data).first() + submit_vip_job( + 'REMOVE', dev.name, str(ipaddress.IPv4Address(vip.ip)) + ) + vip.device = None job_data = {"hpcs_action": "DELETE"} - status, response = self._send_message(job_data) + status, response = self._send_message(job_data, 'hpcs_response') lb = session.query(LoadBalancer).\ filter(LoadBalancer.id == self.lbid).\ first() @@ -145,9 +190,7 @@ class GearmanClientThread(object): # Device should never be used again device = session.query(Device).\ filter(Device.id == data).first() - #TODO: change this to 'DELETED' when pool mgm deletes - if device.status != 'ERROR': - device.status = 'OFFLINE' + device.status = 'DELETED' # Remove LB-device join session.execute(loadbalancers_devices.delete().where( loadbalancers_devices.c.loadbalancer == lb.id @@ -191,7 +234,7 @@ class GearmanClientThread(object): 'protocol': lb.protocol }] } - status, response = self._send_message(job_data) + status, response = self._send_message(job_data, 'hpcs_response') device = session.query(Device).\ filter(Device.id == data['deviceid']).\ first() @@ -266,7 +309,7 @@ class GearmanClientThread(object): job_data['loadBalancers'].append(lb_data) # Update the worker - status, response = self._send_message(job_data) + status, response = 
self._send_message(job_data, 'hpcs_response') lb = session.query(LoadBalancer).\ filter(LoadBalancer.id == self.lbid).\ first() @@ -279,7 +322,7 @@ class GearmanClientThread(object): session.commit() - def _send_message(self, message): + def _send_message(self, message, response_name): job_status = self.gearman_client.submit_job( self.host, message, background=False, wait_until_complete=True, max_retries=10, poll_timeout=120.0 @@ -298,7 +341,7 @@ class GearmanClientThread(object): if 'badRequest' in job_status.result: error = job_status.result['badRequest']['validationErrors'] return False, error['message'] - if job_status.result['hpcs_response'] == 'FAIL': + if job_status.result[response_name] == 'FAIL': # Worker says 'no' if 'hpcs_error' in job_status.result: error = job_status.result['hpcs_error'] diff --git a/libra/api/model/lbaas.py b/libra/common/api/lbaas.py similarity index 93% rename from libra/api/model/lbaas.py rename to libra/common/api/lbaas.py index 7aaaad65..0b815229 100644 --- a/libra/api/model/lbaas.py +++ b/libra/common/api/lbaas.py @@ -52,6 +52,20 @@ class Limits(DeclarativeBase): value = Column(u'value', BIGINT(), nullable=False) +class PoolBuilding(DeclarativeBase): + __tablename__ = 'pool_building' + id = Column(u'id', Integer, primary_key=True, nullable=False) + server_id = Column(u'server_id', Integer, nullable=False) + qty = Column(u'qty', Integer, nullable=False) + + +class Vip(DeclarativeBase): + __tablename__ = 'vips' + id = Column(u'id', Integer, primary_key=True, nullable=False) + ip = Column(u'ip', Integer, nullable=True) + device = Column(u'device', Integer, ForeignKey('devices.id')) + + class Device(DeclarativeBase): """device model""" __tablename__ = 'devices' @@ -67,6 +81,7 @@ class Device(DeclarativeBase): status = Column(u'status', VARCHAR(length=128), nullable=False) type = Column(u'type', VARCHAR(length=128), nullable=False) updated = Column(u'updated', FormatedDateTime(), nullable=False) + vip = relationship("Vip", 
uselist=False, backref="devices") class LoadBalancer(DeclarativeBase): @@ -91,7 +106,7 @@ class LoadBalancer(DeclarativeBase): 'HealthMonitor', backref=backref( 'loadbalancers', order_by='HealthMonitor.lbid') - ) + ) devices = relationship( 'Device', secondary=loadbalancers_devices, backref='loadbalancers', lazy='joined' diff --git a/libra/api/model/lbaas.sql b/libra/common/api/lbaas.sql similarity index 92% rename from libra/api/model/lbaas.sql rename to libra/common/api/lbaas.sql index 7ea3167c..84c22d82 100644 --- a/libra/api/model/lbaas.sql +++ b/libra/common/api/lbaas.sql @@ -75,4 +75,18 @@ CREATE TABLE monitors ( PRIMARY KEY (lbid) # ids are unique accross all Nodes ) DEFAULT CHARSET utf8 DEFAULT COLLATE utf8_general_ci; - +CREATE TABLE `pool_building` ( + `id` int(11) NOT NULL AUTO_INCREMENT, + `server_id` int(11) NOT NULL, + `qty` int(11) NOT NULL, + PRIMARY KEY (`id`), + KEY `server_id` (`server_id`) +) ENGINE=InnoDB DEFAULT CHARSET=latin1; + +CREATE TABLE `vips` ( + `id` int(11) NOT NULL AUTO_INCREMENT, + `ip` int(11) DEFAULT NULL, + `device` int(11) DEFAULT NULL, + PRIMARY KEY (`id`), + KEY `device` (`device`) +) ENGINE=InnoDB DEFAULT CHARSET=latin1; diff --git a/libra/mgm/__init__.py b/libra/mgm/__init__.py index e69de29b..582348cb 100644 --- a/libra/mgm/__init__.py +++ b/libra/mgm/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2012 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
diff --git a/libra/mgm/drivers/dummy/__init__.py b/libra/mgm/controllers/__init__.py similarity index 100% rename from libra/mgm/drivers/dummy/__init__.py rename to libra/mgm/controllers/__init__.py diff --git a/libra/mgm/controllers/build.py b/libra/mgm/controllers/build.py new file mode 100644 index 00000000..dfc722a6 --- /dev/null +++ b/libra/mgm/controllers/build.py @@ -0,0 +1,118 @@ +# Copyright 2013 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+ +from time import sleep +from novaclient import exceptions +from libra.mgm.nova import Node, BuildError, NotFound + + +class BuildController(object): + + RESPONSE_FIELD = 'response' + RESPONSE_SUCCESS = 'PASS' + RESPONSE_FAILURE = 'FAIL' + + def __init__(self, logger, args, msg): + self.logger = logger + self.msg = msg + self.args = args + + def run(self): + try: + nova = Node(self.args) + except Exception: + self.logger.exception("Error initialising Nova connection") + self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE + return self.msg + + self.logger.info("Building a requested Nova instance") + try: + node_id = nova.build() + self.logger.info("Build command sent to Nova") + except BuildError as exc: + self.logger.exception( + "{0}, node {1}".format(exc.msg, exc.node_name) + ) + name = exc.node_name + # Node may have built despite error + try: + node_id = self.get_node(name) + except NotFound: + self.logger.error( + "No node found for {0}, giving up on it".format(name) + ) + self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE + return self.msg + except exceptions.ClientException: + self.logger.exception( + 'Error getting failed node info from Nova for {0}' + .format(name) + ) + self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE + return self.msg + if node_id > 0: + return self._wait_until_node_ready(nova, node_id) + else: + self.logger.error( + 'Node build did not return an ID, cannot find it' + ) + self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE + return self.msg + + def _wait_until_node_ready(self, nova, node_id): + for x in xrange(1, 10): + try: + resp, status = nova.status(node_id) + except NotFound: + self.logger.error( + 'Node {0} can no longer be found'.format(node_id) + ) + self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE + return self.msg + except exceptions.ClientException: + self.logger.exception( + 'Error getting status from Nova' + ) + self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE + return self.msg + if resp.status_code 
not in(200, 203):
+                self.logger.error(
+                    'Error getting status from Nova, error {0}'
+                    .format(resp.status_code)
+                )
+                self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
+                return self.msg
+            status = status['server']
+            if status['status'] == 'ACTIVE':
+                self.msg['name'] = status['name']
+                addresses = status['addresses']['private']
+                for address in addresses:
+                    if not address['addr'].startswith('10.'):
+                        break
+                self.msg['addr'] = address['addr']
+                self.msg['type'] = "basename: {0}, image: {1}".format(
+                    self.args.node_basename, self.args.nova_image
+                )
+                self.msg['az'] = self.args.az
+                self.msg[self.RESPONSE_FIELD] = self.RESPONSE_SUCCESS
+                self.logger.info('Node {0} returned'.format(status['name']))
+                return self.msg
+            sleep(60)
+
+        nova.delete(node_id)
+        self.logger.error(
+            "Node {0} didn't come up after 10 minutes, deleted"
+            .format(node_id))
+        self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
+        return self.msg
diff --git a/libra/mgm/controllers/delete.py b/libra/mgm/controllers/delete.py
new file mode 100644
index 00000000..b64b0888
--- /dev/null
+++ b/libra/mgm/controllers/delete.py
@@ -0,0 +1,53 @@
+# Copyright 2013 Hewlett-Packard Development Company, L.P.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License. 
+ +from libra.mgm.nova import Node, NotFound + + +class DeleteController(object): + + RESPONSE_FIELD = 'response' + RESPONSE_SUCCESS = 'PASS' + RESPONSE_FAILURE = 'FAIL' + + def __init__(self, logger, args, msg): + self.logger = logger + self.msg = msg + self.args = args + + def run(self): + try: + nova = Node(self.args) + except Exception: + self.logger.exception("Error initialising Nova connection") + self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE + return self.msg + + self.logger.info( + "Deleting a requested Nova instance {0}".format(self.msg['name']) + ) + try: + node_id = nova.get_node(self.msg['name']) + except NotFound: + self.logger.error( + "No node found for {0}".format(self.msg['name']) + ) + self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE + return self.msg + nova.delete(node_id) + self.msg[self.RESPONSE_FIELD] = self.RESPONSE_SUCCESS + self.logger.info( + 'Deleted node {0}, id {1}'.format(self.msg['name'], node_id) + ) + return self.msg diff --git a/libra/mgm/controllers/root.py b/libra/mgm/controllers/root.py new file mode 100644 index 00000000..65a07ad7 --- /dev/null +++ b/libra/mgm/controllers/root.py @@ -0,0 +1,70 @@ +# Copyright 2013 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+
+from libra.mgm.controllers.build import BuildController
+from libra.mgm.controllers.delete import DeleteController
+from libra.mgm.controllers.vip import BuildIpController, AssignIpController
+from libra.mgm.controllers.vip import RemoveIpController
+
+
+class PoolMgmController(object):
+
+    ACTION_FIELD = 'action'
+    RESPONSE_FIELD = 'response'
+    RESPONSE_SUCCESS = 'PASS'
+    RESPONSE_FAILURE = 'FAIL'
+
+    def __init__(self, logger, args, json_msg):
+        self.logger = logger
+        self.msg = json_msg
+        self.args = args
+
+    def run(self):
+        if self.ACTION_FIELD not in self.msg:
+            self.logger.error("Missing `{0}` value".format(self.ACTION_FIELD))
+            self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
+            return self.msg
+
+        action = self.msg[self.ACTION_FIELD].upper()
+
+        try:
+            if action == 'BUILD_DEVICE':
+                controller = BuildController(self.logger, self.args, self.msg)
+            elif action == 'DELETE_DEVICE':
+                controller = DeleteController(self.logger, self.args, self.msg)
+            elif action == 'BUILD_IP':
+                controller = BuildIpController(
+                    self.logger, self.args, self.msg
+                )
+            elif action == 'ASSIGN_IP':
+                controller = AssignIpController(
+                    self.logger, self.args, self.msg
+                )
+            elif action == 'REMOVE_IP':
+                controller = RemoveIpController(
+                    self.logger, self.args, self.msg
+                )
+            else:
+                self.logger.error(
+                    "Invalid `{0}` value: {1}".format(
+                        self.ACTION_FIELD, action
+                    )
+                )
+                self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
+                return self.msg
+            return controller.run()
+        except Exception:
+            self.logger.exception("Controller exception")
+            self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
+            return self.msg
diff --git a/libra/mgm/controllers/vip.py b/libra/mgm/controllers/vip.py
new file mode 100644
index 00000000..15e4899f
--- /dev/null
+++ b/libra/mgm/controllers/vip.py
@@ -0,0 +1,127 @@
+# Copyright 2013 Hewlett-Packard Development Company, L.P. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from novaclient import exceptions +from libra.mgm.nova import Node + + +class BuildIpController(object): + + RESPONSE_FIELD = 'response' + RESPONSE_SUCCESS = 'PASS' + RESPONSE_FAILURE = 'FAIL' + + def __init__(self, logger, args, msg): + self.logger = logger + self.msg = msg + self.args = args + + def run(self): + try: + nova = Node(self.args) + except Exception: + self.logger.exception("Error initialising Nova connection") + self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE + return self.msg + + self.logger.info("Creating a requested floating IP") + try: + ip_info = nova.vip_create() + except exceptions.ClientException: + self.logger.exception( + 'Error getting a Floating IP' + ) + self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE + return self.msg + self.logger.info("Floating IP {0} created".format(ip_info['id'])) + self.msg['id'] = ip_info['id'] + self.msg['ip'] = ip_info['ip'] + self.msg[self.RESPONSE_FIELD] = self.RESPONSE_SUCCESS + return self.msg + + +class AssignIpController(object): + + RESPONSE_FIELD = 'response' + RESPONSE_SUCCESS = 'PASS' + RESPONSE_FAILURE = 'FAIL' + + def __init__(self, logger, args, msg): + self.logger = logger + self.msg = msg + self.args = args + + def run(self): + try: + nova = Node(self.args) + except Exception: + self.logger.exception("Error initialising Nova connection") + self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE + return self.msg + + self.logger.info( + 
"Assigning Floating IP {0} to {1}" + .format(self.msg['ip'], self.msg['name']) + ) + try: + node_id = nova.get_node(self.msg['name']) + nova.vip_assign(node_id, self.msg['ip']) + except: + self.logger.exception( + 'Error assigning Floating IP {0} to {1}' + .format(self.msg['ip'], self.msg['name']) + ) + self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE + return self.msg + + self.msg[self.RESPONSE_FIELD] = self.RESPONSE_SUCCESS + return self.msg + + +class RemoveIpController(object): + + RESPONSE_FIELD = 'response' + RESPONSE_SUCCESS = 'PASS' + RESPONSE_FAILURE = 'FAIL' + + def __init__(self, logger, args, msg): + self.logger = logger + self.msg = msg + self.args = args + + def run(self): + try: + nova = Node(self.args) + except Exception: + self.logger.exception("Error initialising Nova connection") + self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE + return self.msg + + self.logger.info( + "Removing Floating IP {0} from {1}" + .format(self.msg['ip'], self.msg['name']) + ) + try: + node_id = nova.get_node(self.msg['name']) + nova.vip_remove(node_id, self.msg['ip']) + except: + self.logger.exception( + 'Error removing Floating IP {0} from {1}' + .format(self.msg['ip'], self.msg['name']) + ) + self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE + return self.msg + + self.msg[self.RESPONSE_FIELD] = self.RESPONSE_SUCCESS + return self.msg diff --git a/libra/mgm/drivers/base.py b/libra/mgm/drivers/base.py deleted file mode 100644 index 4e70117b..00000000 --- a/libra/mgm/drivers/base.py +++ /dev/null @@ -1,45 +0,0 @@ -# Copyright 2012 Hewlett-Packard Development Company, L.P. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. 
You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations - - -# Mapping of --driver options to a class -known_drivers = { - 'hp_rest': 'libra.mgm.drivers.hp_rest.driver.HPRestDriver', - 'dummy': 'libra.mgm.drivers.dummy.driver.DummyDriver' -} - - -class MgmDriver(object): - """ - Pool manager device driver base class. - - This defines the API for interacting with various APIs. - Drivers for these appliances should inherit from this class and implement - the relevant API methods that it can support. - """ - - def get_free_count(self): - """ Get a count of how many nodes are free. """ - raise NotImplementedError() - - def add_node(self, name, address): - """ Add a node to a device. """ - raise NotImplementedError() - - def is_online(self): - """ Returns false if no API server is available """ - raise NotImplementedError() - - def get_url(self): - """ Gets the URL we are currently connected to """ - raise NotImplementedError() diff --git a/libra/mgm/drivers/dummy/driver.py b/libra/mgm/drivers/dummy/driver.py deleted file mode 100644 index fcb07a4b..00000000 --- a/libra/mgm/drivers/dummy/driver.py +++ /dev/null @@ -1,35 +0,0 @@ -# Copyright 2012 Hewlett-Packard Development Company, L.P. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the -# License for the specific language governing permissions and limitations - -from libra.mgm.drivers.base import MgmDriver - - -class DummyDriver(MgmDriver): - """ - Pool manager dummy driver for testing - """ - def __init__(self, addresses, logger): - self.logger = logger - - def get_free_count(self): - return 5 - - def is_online(self): - return True - - def add_node(self, node_data): - self.logger.info('Dummy API send of {0}'.format(node_data)) - return True, 'test response' - - def get_url(self): - return 'Dummy Connection' diff --git a/libra/mgm/drivers/hp_rest/driver.py b/libra/mgm/drivers/hp_rest/driver.py deleted file mode 100644 index 035905da..00000000 --- a/libra/mgm/drivers/hp_rest/driver.py +++ /dev/null @@ -1,112 +0,0 @@ -# Copyright 2012 Hewlett-Packard Development Company, L.P. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. 
- -import requests -import json -import random -import sys - -from libra.mgm.drivers.base import MgmDriver - -API_VERSION = 'v1' - - -class HPRestDriver(MgmDriver): - - def __init__(self, addresses, logger): - self.logger = logger - self.headers = {'Content-type': 'application/json'} - random.shuffle(addresses) - for address in addresses: - self.url = 'https://{0}/{1}'.format(address, API_VERSION) - self.logger.info('Trying {url}'.format(url=self.url)) - status, data = self._get('{url}/devices/usage' - .format(url=self.url)) - if status: - self.logger.info('API Server is online') - self.online = True - return - - # if we get this far all API servers are down - self.online = False - - def get_url(self): - return self.url - - def get_free_count(self): - status, usage = self.get_usage() - if not status: - return None - return usage['free'] - - def is_online(self): - return self.online - - def get_node_list(self, limit, marker): - return self._get('{url}/devices'.format(url=self.url)) - - def get_usage(self): - return self._get('{url}/devices/usage'.format(url=self.url)) - - def get_node(self, node_id): - return self._get( - '{url}/devices/{nid}'.format(url=self.url, nid=node_id) - ) - - def add_node(self, node_data): - return self._post('{url}/devices'.format(url=self.url), node_data) - - def delete_node(self, node_id): - requests.delete( - '{url}/devices/{nid}'.format(url=self.url, nid=node_id), - timeout=30 - ) - - def update_node(self, node_id, node_data): - requests.put( - '{url}/devices/{nid}'.format(url=self.url, nid=node_id), - json.dumps(node_data), - timeout=30 - ) - - def _get(self, url): - try: - r = requests.get(url, verify=False, timeout=30) - except requests.exceptions.RequestException: - self.logger.error('Exception communicating to server: {exc}' - .format(exc=sys.exc_info()[0])) - return False, None - - if r.status_code != 200: - self.logger.error('Server returned error {code}' - .format(code=r.status_code)) - return False, r.json() - return True, 
r.json() - - def _post(self, url, node_data): - try: - r = requests.post( - url, data=json.dumps(node_data), verify=False, - headers=self.headers, timeout=30 - ) - except requests.exceptions.RequestException: - self.logger.error('Exception communicating to server: {exc}' - .format(exc=sys.exc_info()[0])) - return False, None - - if r.status_code != 200: - self.logger.error('Server returned error {code}' - .format(code=r.status_code)) - return False, r.json() - return True, r.json() diff --git a/libra/mgm/gearman_worker.py b/libra/mgm/gearman_worker.py new file mode 100644 index 00000000..880bb716 --- /dev/null +++ b/libra/mgm/gearman_worker.py @@ -0,0 +1,70 @@ +# Copyright 2013 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+ +import gearman.errors +import json +import socket +import time + +from libra.common.json_gearman import JSONGearmanWorker +from libra.mgm.controllers.root import PoolMgmController + + +def handler(worker, job): + logger = worker.logger + logger.debug("Received JSON message: {0}".format(json.dumps(job.data))) + controller = PoolMgmController(logger, worker.args, job.data) + response = controller.run() + logger.debug("Return JSON message: {0}".format(json.dumps(response))) + return response + + +def worker_thread(logger, args): + logger.info("Registering task libra_pool_mgm") + hostname = socket.gethostname() + + if all([args.gearman_ssl_key, args.gearman_ssl_cert, args.gearman_ssl_ca]): + ssl_server_list = [] + for host_port in args.server: + host, port = host_port.split(':') + ssl_server_list.append({'host': host, + 'port': int(port), + 'keyfile': args.gearman_ssl_key, + 'certfile': args.gearman_ssl_cert, + 'ca_certs': args.gearman_ssl_ca}) + worker = JSONGearmanWorker(ssl_server_list) + else: + worker = JSONGearmanWorker(args.gearman) + + worker.set_client_id(hostname) + worker.register_task('libra_pool_mgm', handler) + worker.logger = logger + worker.args = args + + retry = True + + while (retry): + try: + worker.work(args.gearman_poll) + except KeyboardInterrupt: + retry = False + except gearman.errors.ServerUnavailable: + logger.error("Job server(s) went away. Reconnecting.") + time.sleep(args.reconnect_sleep) + retry = True + except Exception: + logger.exception("Exception in worker") + retry = False + + logger.debug("Pool manager process terminated.") diff --git a/libra/mgm/mgm.py b/libra/mgm/mgm.py index e5b86ba4..1ce956bf 100644 --- a/libra/mgm/mgm.py +++ b/libra/mgm/mgm.py @@ -12,85 +12,34 @@ # License for the specific language governing permissions and limitations # under the License. 
+import eventlet +eventlet.monkey_patch() import daemon import daemon.pidfile import daemon.runner import grp import pwd -import signal -import time import sys import os -import threading -from libra.mgm.schedulers import modules, known_modules -from libra.openstack.common import importutils from libra.common.options import Options, setup_logging -from libra.mgm.drivers.base import known_drivers -from libra.mgm.node_list import NodeList, AccessDenied +from libra.mgm.gearman_worker import worker_thread class Server(object): def __init__(self, args): self.args = args - self.ft = None - self.api = None - self.driver_class = None - self.schedulers = [] - try: - self.node_list = NodeList(self.args.datadir) - except AccessDenied as exc: - print(str(exc)) - self.shutdown(True) + self.logger = None def main(self): self.logger = setup_logging('libra_mgm', self.args) self.logger.info( - 'Libra Pool Manager started with a float of {nodes} nodes' - .format(nodes=self.args.nodes) + 'Libra Pool Manager worker started' ) - signal.signal(signal.SIGINT, self.exit_handler) - signal.signal(signal.SIGTERM, self.exit_handler) - - self.logger.info("Selected driver: {0}".format(self.args.driver)) - self.driver_class = importutils.import_class( - known_drivers[self.args.driver] - ) - - # NOTE(LinuxJedi): Threading lock is due to needing more than one - # timer and we don't want them to execute their trigger - # at the same time. 
- self.rlock = threading.RLock() - - # Load all the schedulers - for module in modules: - mod = importutils.import_class(known_modules[module]) - instance = mod( - self.driver_class, self.rlock, self.logger, self.node_list, - self.args - ) - self.schedulers.append(instance) - instance.run() - - while True: - time.sleep(1) - - def exit_handler(self, signum, frame): - signal.signal(signal.SIGINT, signal.SIG_IGN) - signal.signal(signal.SIGTERM, signal.SIG_IGN) - self.shutdown(False) - - def shutdown(self, error): - for sched in self.schedulers: - sched.timer.cancel() - - if not error: - self.logger.info('Safely shutting down') - sys.exit(0) - else: - self.logger.info('Shutting down due to error') - sys.exit(1) + thd = eventlet.spawn(worker_thread, self.logger, self.args) + thd.wait() + self.logger.info("Shutting down") def main(): @@ -99,33 +48,11 @@ def main(): '--api_server', action='append', metavar='HOST:PORT', default=[], help='a list of API servers to connect to (for HP REST API driver)' ) - options.parser.add_argument( - '--datadir', dest='datadir', - help='directory to store data files' - ) options.parser.add_argument( '--az', type=int, help='The az number the node will reside in (to be passed to the API' ' server)' ) - options.parser.add_argument( - '--nodes', type=int, default=1, - help='number of nodes' - ) - options.parser.add_argument( - '--check_interval', type=int, default=5, - help='how often to check if new nodes are needed (in minutes)' - ) - options.parser.add_argument( - '--submit_interval', type=int, default=15, - help='how often to test nodes for submission to the API' - ' server (in minutes)' - ) - options.parser.add_argument( - '--driver', dest='driver', - choices=known_drivers.keys(), default='hp_rest', - help='type of device to use' - ) options.parser.add_argument( '--node_basename', dest='node_basename', help='prepend the name of all nodes with this' @@ -168,11 +95,32 @@ def main(): help='the image size ID (flavor ID) or name to use for new 
nodes spun' ' up in the Nova API' ) + options.parser.add_argument( + '--gearman', action='append', metavar='HOST:PORT', default=[], + help='Gearman job servers' + ) + options.parser.add_argument( + '--gearman_ssl_ca', metavar='FILE', + help='Gearman SSL certificate authority' + ) + options.parser.add_argument( + '--gearman_ssl_cert', metavar='FILE', + help='Gearman SSL certificate' + ) + options.parser.add_argument( + '--gearman_ssl_key', metavar='FILE', + help='Gearman SSL key' + ) + options.parser.add_argument( + '--gearman-poll', + dest='gearman_poll', type=int, metavar='TIME', + default=1, help='Gearman worker polling timeout' + ) args = options.run() required_args = [ - 'datadir', 'az', + 'az', 'nova_image', 'nova_image_size', 'nova_secgroup', 'nova_keyname', 'nova_tenant', 'nova_region', 'nova_user', 'nova_pass', 'nova_auth_url' ] @@ -191,16 +139,16 @@ def main(): if missing_args: return 2 - if not args.api_server: + if not args.gearman: # NOTE(shrews): Can't set a default in argparse method because the # value is appended to the specified default. - args.api_server.append('localhost:8889') - elif not isinstance(args.api_server, list): + args.gearman.append('localhost:4730') + elif not isinstance(args.gearman, list): # NOTE(shrews): The Options object cannot intelligently handle # creating a list from an option that may have multiple values. # We convert it to the expected type here. - svr_list = args.api_server.split() - args.api_server = svr_list + svr_list = args.gearman.split() + args.gearman = svr_list server = Server(args) diff --git a/libra/mgm/node_list.py b/libra/mgm/node_list.py deleted file mode 100644 index c0914290..00000000 --- a/libra/mgm/node_list.py +++ /dev/null @@ -1,49 +0,0 @@ -# Copyright 2012 Hewlett-Packard Development Company, L.P. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. 
You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import pickle -import os - - -class AccessDenied(Exception): - pass - - -class NodeList(object): - def __init__(self, path): - if not os.access(path, os.W_OK): - msg = 'Do not have permission to write to {0}'.format(path) - raise AccessDenied(msg) - - self.file_name = '{0}/node_log.dat'.format(path) - - def add(self, item): - data = self.get() - data.append(item) - self.put(data) - - def delete(self, item): - data = self.get() - data.remove(item) - self.put(data) - - def get(self): - # Attribute error is thrown if file is non-existent - try: - return pickle.load(open(self.file_name, "rb")) - except IOError: - return [] - - def put(self, data): - pickle.dump(data, open(self.file_name, "wb")) diff --git a/libra/mgm/nova.py b/libra/mgm/nova.py index df68e7b2..2960ea3b 100644 --- a/libra/mgm/nova.py +++ b/libra/mgm/nova.py @@ -35,33 +35,32 @@ class BuildError(Exception): class Node(object): - def __init__(self, username, password, tenant, auth_url, region, keyname, - secgroup, image, node_type, node_basename=None): + def __init__(self, args): self.nova = client.HTTPClient( - username, - password, - tenant, - auth_url, - region_name=region, + args.nova_user, + args.nova_pass, + args.nova_tenant, + args.nova_auth_url, + region_name=args.nova_region, no_cache=True, service_type='compute' ) - self.keyname = keyname - self.secgroup = secgroup - self.node_basename = node_basename + self.keyname = args.nova_keyname + self.secgroup = args.nova_secgroup + self.node_basename = args.node_basename # Replace '_' with '-' in basename if self.node_basename: 
self.node_basename = self.node_basename.replace('_', '-') - if image.isdigit(): - self.image = image + if args.nova_image.isdigit(): + self.image = args.nova_image else: - self.image = self._get_image(image) + self.image = self._get_image(args.nova_image) - if node_type.isdigit(): - self.node_type = node_type + if args.nova_image_size.isdigit(): + self.node_type = args.nova_image_size else: - self.node_type = self._get_flavor(node_type) + self.node_type = self._get_flavor(args.nova_image_size) def build(self): """ create a node, test it is running """ @@ -76,6 +75,43 @@ class Node(object): return body['server']['id'] + def vip_create(self): + """ create a virtual IP """ + url = '/os-floating-ips' + body = {"pool": None} + resp, body = self.nova.post(url, body=body) + return body['floating_ip'] + + def vip_assign(self, node_id, vip): + """ assign a virtual IP to a Nova instance """ + url = '/servers/{0}/action'.format(node_id) + body = { + "addFloatingIp": { + "address": vip + } + } + resp, body = self.nova.post(url, body=body) + if resp.status_code != 202: + raise Exception( + 'Response code {0}, message {1} when assigning vip' + .format(resp.status_code, body) + ) + + def vip_remove(self, node_id, vip): + """ assign a virtual IP to a Nova instance """ + url = '/servers/{0}/action'.format(node_id) + body = { + "removeFloatingIp": { + "address": vip + } + } + resp, body = self.nova.post(url, body=body) + if resp.status_code != 202: + raise Exception( + 'Response code {0}, message {1} when assigning vip' + .format(resp.status_code, body) + ) + def delete(self, node_id): """ delete a node """ try: diff --git a/libra/mgm/schedulers/build.py b/libra/mgm/schedulers/build.py deleted file mode 100644 index e8abc1df..00000000 --- a/libra/mgm/schedulers/build.py +++ /dev/null @@ -1,143 +0,0 @@ -# Copyright 2012 Hewlett-Packard Development Company, L.P. 
-# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import threading -from novaclient import exceptions -from libra.mgm.nova import Node, BuildError, NotFound - - -class BuildNodes(object): - def __init__(self, driver, lock, logger, node_list, args): - self.driver = driver - self.lock = lock - self.args = args - self.logger = logger - self.node_list = node_list - self.timer = None - - def run(self): - """ check if known nodes are used """ - with self.lock: - try: - self.logger.info('Checking if new nodes are needed') - api = self.driver(self.args.api_server, self.logger) - if api.is_online(): - self.logger.info( - 'Connected to {url}'.format(url=api.get_url()) - ) - free_count = api.get_free_count() - if free_count is None: - self.scheduler() - return - if free_count < self.args.nodes: - # we need to build new nodes - nodes_required = self.args.nodes - free_count - self.logger.info( - '{nodes} nodes required' - .format(nodes=nodes_required) - ) - self.build_nodes(nodes_required, api) - else: - self.logger.info('No new nodes required') - else: - self.logger.error('No working API server found') - except Exception: - self.logger.exception('Uncaught exception during node check') - - self.scheduler() - - def build_nodes(self, count, api): - try: - nova = Node( - self.args.nova_user, - self.args.nova_pass, - self.args.nova_tenant, - self.args.nova_auth_url, - self.args.nova_region, - self.args.nova_keyname, - self.args.nova_secgroup, - self.args.nova_image, - 
self.args.nova_image_size, - node_basename=self.args.node_basename - ) - except Exception as exc: - self.logger.error('Error initialising Nova connection {exc}' - .format(exc=exc) - ) - return - # Remove number of nodes we are still waiting on build status from - build_count = len(self.node_list.get()) - count = count - build_count - if count > 0: - self.logger.info( - '{0} nodes already building, attempting to build {1} more' - .format(build_count, count) - ) - else: - self.logger.info( - '{0} nodes already building, no more needed' - .format(build_count) - ) - while count > 0: - count = count - 1 - try: - node_id = nova.build() - except BuildError as exc: - self.logger.exception('{0}, node {1}' - .format(exc.msg, exc.node_name) - ) - self.logger.info( - 'Node build did not return ID for {0}, trying to find' - .format(exc.node_name) - ) - self.find_unknown(exc.node_name, nova) - continue - - if node_id > 0: - self.logger.info( - 'Storing node {0} to add later'.format(node_id) - ) - self.node_list.add(node_id) - else: - self.logger.error( - 'Node build did not return ID, cannot find it' - ) - - def find_unknown(self, name, nova): - """ - Nova can tell us a node failed to build when it didn't - This does a check and if it did start to build adds it to the - failed node list. 
- """ - try: - node_id = nova.get_node(name) - self.logger.info('Storing node {0} to add later'.format(node_id)) - self.node_list.add(node_id) - except NotFound: - # Node really didn't build - self.logger.info( - 'No node found for {0}, giving up on it'.format(name) - ) - return - except exceptions.ClientException: - self.logger.exception( - 'Error getting failed node info from Nova for {0}'.format(name) - ) - - def scheduler(self): - self.logger.info('Node check timer sleeping for {mins} minutes' - .format(mins=self.args.check_interval)) - self.timer = threading.Timer(60 * int(self.args.check_interval), - self.run, ()) - self.timer.start() diff --git a/libra/mgm/schedulers/submit.py b/libra/mgm/schedulers/submit.py deleted file mode 100644 index 2d5eec3b..00000000 --- a/libra/mgm/schedulers/submit.py +++ /dev/null @@ -1,148 +0,0 @@ -# Copyright 2012 Hewlett-Packard Development Company, L.P. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. 
- -import sys -import threading -from novaclient import exceptions -from libra.mgm.nova import Node, NotFound - - -class SubmitNodes(object): - def __init__(self, driver, lock, logger, node_list, args): - self.driver = driver - self.lock = lock - self.args = args - self.logger = logger - self.node_list = node_list - self.timer = None - - def run(self): - """ check/submit list of nodes to be added """ - with self.lock: - try: - self.logger.info('Checking log of nova builds') - nodes = self.node_list.get() - if len(nodes) == 0: - self.logger.info('Node log empty') - else: - api = self.driver(self.args.api_server, self.logger) - if api.is_online(): - self.logger.info( - 'Connected to {url}'.format(url=api.get_url()) - ) - for node in nodes: - self.test_node(node, api) - else: - self.logger.error('No working API server found') - except Exception: - self.logger.exception( - 'Uncaught exception during node check' - ) - self.scheduler() - - def test_node(self, node_id, api): - try: - nova = Node( - self.args.nova_user, - self.args.nova_pass, - self.args.nova_tenant, - self.args.nova_auth_url, - self.args.nova_region, - self.args.nova_keyname, - self.args.nova_secgroup, - self.args.nova_image, - self.args.nova_image_size, - node_basename=self.args.node_basename - ) - except Exception as exc: - self.logger.error( - 'Error initialising Nova connection {exc}' - .format(exc=exc) - ) - return - self.logger.info('Testing readiness node {0}'.format(node_id)) - try: - resp, status = nova.status(node_id) - except NotFound: - self.logger.info( - 'Node {0} no longer exists, removing from list' - .format(node_id) - ) - self.node_list.delete(node_id) - return - except exceptions.ClientException as exc: - self.logger.error( - 'Error getting status from Nova, exception {exc}' - .format(exc=sys.exc_info()[0]) - ) - return - - if resp.status_code not in(200, 203): - self.logger.error( - 'Error geting status from Nova, error {0}' - .format(resp.status_code) - ) - return - status = 
status['server'] - if status['status'] == 'ACTIVE': - name = status['name'] - body = self.build_node_data(status) - status, response = api.add_node(body) - if not status: - self.logger.error( - 'Could not upload node {name} to API server' - .format(name=name) - ) - else: - self.node_list.delete(node_id) - self.logger.info('Node {0} added to API server'.format(name)) - return - elif status['status'].startswith('BUILD'): - self.logger.info( - 'Node {0} still building, ignoring'.format(node_id) - ) - return - else: - self.logger.info( - 'Node {0} is bad, deleting'.format(node_id) - ) - status, msg = nova.delete(node_id) - if not status: - self.logger.error(msg) - else: - self.logger.info('Delete successful') - self.node_list.delete(node_id) - - def build_node_data(self, data): - """ Build the API data from the node data """ - body = {} - body['name'] = data['name'] - addresses = data['addresses']['private'] - for address in addresses: - if not address['addr'].startswith('10.'): - break - body['publicIpAddr'] = address['addr'] - body['floatingIpAddr'] = address['addr'] - body['az'] = self.args.az - body['type'] = "basename: {0}, image: {1}".format( - self.args.node_basename, self.args.nova_image - ) - return body - - def scheduler(self): - self.logger.info('Node submit timer sleeping for {mins} minutes' - .format(mins=self.args.submit_interval)) - self.timer = threading.Timer(60 * int(self.args.submit_interval), - self.run, ()) - self.timer.start() diff --git a/libra/tests/test_lbaas_mgm.py b/libra/tests/test_lbaas_mgm.py index 77c10fbc..f8a5c230 100644 --- a/libra/tests/test_lbaas_mgm.py +++ b/libra/tests/test_lbaas_mgm.py @@ -11,6 +11,19 @@ fake_body = open( os.path.join(os.path.dirname(__file__), "fake_body.json"), 'r').read() +class Args(object): + nova_user = "password" + nova_pass = "auth_test" + nova_tenant = "tenant1" + nova_region = "region1" + nova_keyname = "default" + nova_secgroup = "default" + nova_image = '1234' + nova_image_size = '100' + nova_auth_url 
= '' + node_basename = '' + + class TestResponse(requests.Response): """ Class used to wrap requests.Response and provide some @@ -56,10 +69,8 @@ class TestLBaaSMgmTask(testtools.TestCase): class TestLBaaSMgmNova(testtools.TestCase): def setUp(self): super(TestLBaaSMgmNova, self).setUp() - self.api = Node( - "username", "password", "auth_test", "tenant1", "region1", - "default", "default", '1234', '100' - ) + args = Args() + self.api = Node(args) self.api.nova.management_url = "http://example.com" self.api.nova.auth_token = "token"