# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import datetime

from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import excutils
import sqlalchemy
from stevedore import driver as stevedore_driver

from octavia.common import constants
from octavia.common import stats
from octavia.controller.healthmanager import update_serializer
from octavia.db import api as db_api
from octavia.db import repositories as repo

LOG = logging.getLogger(__name__)


class UpdateHealthDb(object):
    """Apply an amphora health message to the database.

    For every listener, pool and member named in a health message the
    reported status is translated into an operating status, written to the
    database when it changed, and emitted through the configured event
    streamer driver.
    """

    def __init__(self):
        super(UpdateHealthDb, self).__init__()
        # Event streamer driver, loaded by name from configuration.
        self.event_streamer = stevedore_driver.DriverManager(
            namespace='octavia.controller.queues',
            name=cfg.CONF.health_manager.event_streamer_driver,
            invoke_on_load=True).driver
        # One repository per entity type touched by update_health().
        self.amphora_repo = repo.AmphoraRepository()
        self.amphora_health_repo = repo.AmphoraHealthRepository()
        self.listener_repo = repo.ListenerRepository()
        self.loadbalancer_repo = repo.LoadBalancerRepository()
        self.member_repo = repo.MemberRepository()
        self.pool_repo = repo.PoolRepository()
        # When set, every emitted event also carries provisioning_status.
        self.sync_prv_status = cfg.CONF.health_manager.sync_provisioning_status

    def emit(self, info_type, info_id, info_obj):
        """Wrap the payload in an InfoContainer and hand it to the streamer.

        :param info_type: kind of entity the event is about
        :param info_id: id of that entity
        :param info_obj: event payload
        """
        cnt = update_serializer.InfoContainer(info_type, info_id, info_obj)
        self.event_streamer.emit(cnt)

    def _update_status_and_emit_event(self, session, repo, entity_type,
                                      entity_id, new_op_status):
        """Persist a changed operating status and emit the matching event.

        The database is only written when the stored operating status
        differs (case-insensitively) from ``new_op_status``. An event is
        emitted when the operating status changed and/or when provisioning
        status syncing is enabled.

        :param session: database session used for the lookup and update
        :param repo: repository of the entity (shadows the module-level
                     ``repo`` import inside this method)
        :param entity_type: entity kind, used for logging and the event
        :param entity_id: id of the entity to update
        :param new_op_status: operating status derived from the heartbeat
        """
        entity = repo.get(session, id=entity_id)
        message = {}
        if entity.operating_status.lower() != new_op_status.lower():
            LOG.debug("%s %s status has changed from %s to "
                      "%s. Updating db and sending event.",
                      entity_type, entity_id, entity.operating_status,
                      new_op_status)
            repo.update(session, entity_id, operating_status=new_op_status)
            # DRAINING is stored in the database but reported as ONLINE in
            # the emitted event.
            if new_op_status == constants.DRAINING:
                new_op_status = constants.ONLINE
            message.update({constants.OPERATING_STATUS: new_op_status})
        if self.sync_prv_status:
            LOG.debug("%s %s provisioning_status %s. Updating db and sending"
                      " event.", entity_type, entity_id,
                      entity.provisioning_status)
            message.update(
                {constants.PROVISIONING_STATUS: entity.provisioning_status})
        if message:
            self.emit(entity_type, entity_id, message)

    def update_health(self, health):
        """Update the database based on an amphora health message.

        The amphora health record is refreshed only when the number of
        reported listeners equals the number of listeners the database
        expects on the amphora. Listener, pool, member and load balancer
        operating statuses are then derived from the reported statuses.

        :param health: health message with the amphora id and its
                       listeners, pools and members
        :type health: dict
        :returns: None

        The input health data structure is shown as below::

            health = {
                "id": self.FAKE_UUID_1,
                "listeners": {
                    "listener-id-1": {"status": constants.OPEN, "pools": {
                        "pool-id-1": {"status": constants.UP,
                                      "members": {
                                          "member-id-1": constants.UP}
                                      }
                    }
                    }
                }
            }

        """
        session = db_api.get_session()

        # We need to see if all of the listeners are reporting in
        expected_listener_count = 0
        lbs_on_amp = self.amphora_repo.get_all_lbs_on_amphora(session,
                                                              health['id'])
        for lb in lbs_on_amp:
            listener_count = self.listener_repo.count(session,
                                                      load_balancer_id=lb.id)
            expected_listener_count += listener_count

        listeners = health['listeners']

        # Do not update amphora health if the reporting listener count
        # does not match the expected listener count
        if len(listeners) == expected_listener_count:

            lock_session = db_api.get_session(autocommit=False)

            # if the input amphora is healthy, we update its db info
            try:
                self.amphora_health_repo.replace(
                    lock_session, health['id'],
                    last_update=(datetime.datetime.utcnow()))
                lock_session.commit()
            except Exception:
                # Roll back, then let the original exception propagate.
                with excutils.save_and_reraise_exception():
                    lock_session.rollback()
        else:
            LOG.warning('Amphora %(id)s health message reports %(found)i '
                        'listeners when %(expected)i expected',
                        {'id': health['id'], 'found': len(listeners),
                         'expected': expected_listener_count})

        # We got a heartbeat so lb is healthy until proven otherwise
        # TODO(johnsom) Fix this if we have more than one LB on an amp
        # Guard the index: an amphora with no load balancers yields an empty
        # result, and indexing it unconditionally would raise IndexError.
        if lbs_on_amp and lbs_on_amp[0].enabled is False:
            lb_status = constants.OFFLINE
        else:
            lb_status = constants.ONLINE

        # update listener and nodes db information
        for listener_id, listener in listeners.items():

            listener_status = None
            # OPEN = HAProxy listener status nbconn < maxconn
            if listener.get('status') == constants.OPEN:
                listener_status = constants.ONLINE
            # FULL = HAProxy listener status not nbconn < maxconn
            elif listener.get('status') == constants.FULL:
                listener_status = constants.DEGRADED
                if lb_status == constants.ONLINE:
                    lb_status = constants.DEGRADED
            else:
                LOG.warning(('Listener %(list)s reported status of '
                             '%(status)s'),
                            {'list': listener_id,
                             'status': listener.get('status')})

            try:
                if listener_status is not None:
                    self._update_status_and_emit_event(
                        session, self.listener_repo, constants.LISTENER,
                        listener_id, listener_status
                    )
            except sqlalchemy.orm.exc.NoResultFound:
                LOG.error("Listener %s is not in DB", listener_id)

            pools = listener['pools']
            for pool_id, pool in pools.items():

                pool_status = None
                # UP = HAProxy backend has working or no servers
                if pool.get('status') == constants.UP:
                    pool_status = constants.ONLINE
                # DOWN = HAProxy backend has no working servers
                elif pool.get('status') == constants.DOWN:
                    pool_status = constants.ERROR
                    lb_status = constants.ERROR
                else:
                    LOG.warning(('Pool %(pool)s reported status of '
                                 '%(status)s'),
                                {'pool': pool_id,
                                 'status': pool.get('status')})

                # Deal with the members that are reporting from the Amphora
                members = pool['members']
                for member_id, status in members.items():

                    member_status = None
                    # Member status can be "UP" or "UP #/#" (transitional)
                    if status.startswith(constants.UP):
                        member_status = constants.ONLINE
                    # Member status can be "DOWN" or "DOWN #/#" (transitional)
                    elif status.startswith(constants.DOWN):
                        member_status = constants.ERROR
                        # A failed member degrades an otherwise healthy
                        # pool, and through it the load balancer.
                        if pool_status == constants.ONLINE:
                            pool_status = constants.DEGRADED
                            if lb_status == constants.ONLINE:
                                lb_status = constants.DEGRADED
                    elif status == constants.DRAIN:
                        member_status = constants.DRAINING
                    elif status == constants.MAINT:
                        member_status = constants.OFFLINE
                    elif status == constants.NO_CHECK:
                        member_status = constants.NO_MONITOR
                    else:
                        LOG.warning('Member %(mem)s reported status of '
                                    '%(status)s',
                                    {'mem': member_id,
                                     'status': status})

                    try:
                        if member_status is not None:
                            self._update_status_and_emit_event(
                                session, self.member_repo, constants.MEMBER,
                                member_id, member_status
                            )
                    except sqlalchemy.orm.exc.NoResultFound:
                        LOG.error("Member %s is not able to update "
                                  "in DB", member_id)

                # Now deal with the members that didn't report from the Amphora
                # The lookups below get the same NoResultFound handling as
                # every other lookup in this method, so one unknown pool or
                # member does not abort the rest of the heartbeat.
                try:
                    db_pool = self.pool_repo.get(session, id=pool_id)
                except sqlalchemy.orm.exc.NoResultFound:
                    LOG.error("Pool %s is not in DB", pool_id)
                    continue

                missing_members = (
                    {member.id for member in db_pool.members} - set(members))
                for member_id in missing_members:
                    try:
                        self._update_status_and_emit_event(
                            session, self.member_repo, constants.MEMBER,
                            member_id, constants.OFFLINE
                        )
                    except sqlalchemy.orm.exc.NoResultFound:
                        LOG.error("Member %s is not able to update "
                                  "in DB", member_id)

                try:
                    if pool_status is not None:
                        self._update_status_and_emit_event(
                            session, self.pool_repo, constants.POOL,
                            pool_id, pool_status
                        )
                except sqlalchemy.orm.exc.NoResultFound:
                    LOG.error("Pool %s is not in DB", pool_id)

        # Update the load balancer status last
        # TODO(sbalukoff): This logic will need to be adjusted if we
        # start supporting multiple load balancers per amphora
        lb_id = self.amphora_repo.get(
            session, id=health['id']).load_balancer_id
        if lb_id is not None:
            try:
                self._update_status_and_emit_event(
                    session, self.loadbalancer_repo,
                    constants.LOADBALANCER, lb_id, lb_status
                )
            except sqlalchemy.orm.exc.NoResultFound:
                LOG.error("Load balancer %s is not in DB", lb_id)


class UpdateStatsDb(stats.StatsMixin):
    """Store listener statistics from health messages and emit events."""

    def __init__(self):
        super(UpdateStatsDb, self).__init__()
        # Event streamer driver, loaded by name from configuration.
        self.event_streamer = stevedore_driver.DriverManager(
            namespace='octavia.controller.queues',
            name=cfg.CONF.health_manager.event_streamer_driver,
            invoke_on_load=True).driver
        self.repo_listener = repo.ListenerRepository()

    def emit(self, info_type, info_id, info_obj):
        """Wrap the payload in an InfoContainer and hand it to the streamer.

        :param info_type: kind of statistics event
        :param info_id: id of the entity the statistics belong to
        :param info_obj: event payload
        """
        cnt = update_serializer.InfoContainer(info_type, info_id, info_obj)
        self.event_streamer.emit(cnt)

    def update_stats(self, health_message):
        """Update the database with the listener stats of a health message.

        For every listener that reports stats, the counters are stored for
        the (listener, amphora) pair, then a ``listener_stats`` event and a
        ``loadbalancer_stats`` event are emitted. Listeners without a
        ``stats`` entry are skipped with a warning.

        :param health_message: The health message containing the listener
                               stats
        :type health_message: dict
        :returns: None

        Example::

            health = {
                "id": self.FAKE_UUID_1,
                "listeners": {
                    "listener-id-1": {
                        "status": constants.OPEN,
                        "stats": {
                            "ereq":0,
                            "conns": 0,
                            "totconns": 0,
                            "rx": 0,
                            "tx": 0,
                        },
                        "pools": {
                            "pool-id-1": {
                                "status": constants.UP,
                                "members": {"member-id-1": constants.UP}
                            }
                        }
                    }
                }
            }

        """
        session = db_api.get_session()

        amphora_id = health_message['id']
        listeners = health_message['listeners']
        for listener_id, listener in listeners.items():

            # Named ``reported`` so the imported ``stats`` module is not
            # shadowed inside this method.
            reported = listener.get('stats')
            if reported is None:
                LOG.warning('Listener %s on amphora %s reported no stats',
                            listener_id, amphora_id)
                continue

            # Translate the reported counter names into the keyword names
            # passed to the listener stats repository.
            listener_counters = {
                'bytes_in': reported['rx'],
                'bytes_out': reported['tx'],
                'active_connections': reported['conns'],
                'total_connections': reported['totconns'],
                'request_errors': reported['ereq'],
            }
            LOG.debug("Updating listener stats in db and sending event.")
            LOG.debug("Listener %s / Amphora %s stats: %s",
                      listener_id, amphora_id, listener_counters)
            self.listener_stats_repo.replace(
                session, listener_id, amphora_id, **listener_counters)

            listener_stats = self.get_listener_stats(session, listener_id)
            self.emit(
                'listener_stats', listener_id, listener_stats.get_stats())

            listener_db = self.repo_listener.get(session, id=listener_id)
            lb_stats = self.get_loadbalancer_stats(
                session, listener_db.load_balancer_id)
            self.emit('loadbalancer_stats',
                      listener_db.load_balancer_id, lb_stats.get_stats())