Region is added as part of os_namos

The registration info now carries the service's region_name; when it is
not provided, the conductor falls back to its own [os_namos] region_name.
The region is created (or looked up) and used for the service node, the
conductor options move from [conductor] to [os_manager], and dead service
workers are cleaned up after each registration.

Change-Id: I54e8f5ca592ae3bc91d35a6997d601e92fd83b04
Kanagaraj Manickam 2016-03-28 14:44:53 +05:30
parent 59a3041e70
commit 97d84d1b45
9 changed files with 160 additions and 65 deletions

View File

@@ -1,10 +1,6 @@
 [DEFAULT]
 rpc_backend = rabbit
 debug=True
-logging_exception_prefix = %(color)s%(asctime)s.%(msecs)03d TRACE %(name)s %(instance)s
-logging_debug_format_suffix = from (pid=%(process)d) %(funcName)s %(pathname)s:%(lineno)d
-logging_default_format_string = %(asctime)s.%(msecs)03d %(color)s%(levelname)s %(name)s [-%(color)s] %(instance)s%(color)s%(message)s
-logging_context_format_string = %(asctime)s.%(msecs)03d %(color)s%(levelname)s %(name)s [%(request_id)s %(user)s %(tenant)s%(color)s] %(instance)s%(color)s%(message)s
 
 [oslo_messaging_rabbit]
 rabbit_userid = stackrabbit
@@ -14,5 +10,8 @@ rabbit_hosts = 172.241.0.101
 [database]
 connection = mysql+pymysql://root:password@172.241.0.101/namos?charset=utf8
 
-[conductor]
+[os_manager]
 workers=3
+
+[os_namos]
+region_name=RegionTwo
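
For context, the conductor reads this new group through oslo.config and uses it as the fallback region for services that register without one; a minimal sketch under that assumption (the group and option names are the ones used in this commit, the default value and helper below are illustrative):

from oslo_config import cfg

CONF = cfg.CONF

# os-namos is assumed to register this group; the conductor reads it as
# cfg.CONF.os_namos.region_name (see the manager changes below).
os_namos_opts = [
    cfg.StrOpt('region_name',
               default='RegionOne',  # illustrative default
               help='Keystone region this service belongs to'),
]
CONF.register_opts(os_namos_opts, group='os_namos')


def effective_region(registration_info):
    # A registering service that does not report a region is placed in
    # the conductor's own region.
    return registration_info.get('region_name') or CONF.os_namos.region_name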

View File

@@ -1,6 +1,7 @@
 # List of config generator conf files for syncing the conf with namos
 heat=/opt/stack/heat/config-generator.conf
 namos=/home/manickan/workspace/namos/openstack/namos/config-generator.conf
+os_namos=/home/manickan/workspace/namos/openstack/os-namos/config-generator.conf
 keystone=/opt/stack/keystone/config-generator/keystone.conf
 neutron-bgp-dragent=/opt/stack/neutron/etc/oslo-config-generator/bgp_dragent.ini
 neutron-dhcp-agent=/opt/stack/neutron/etc/oslo-config-generator/dhcp_agent.ini

View File

@@ -43,15 +43,15 @@ def main():
     from namos import conductor  # noqa
 
     mgr = service.RPCService(
-        CONF.conductor.name,
+        CONF.os_manager.name,
         config.PROJECT_NAME,
         manager.ConductorManager())
 
-    launcher = os_service.launch(CONF, mgr, CONF.conductor.workers)
+    launcher = os_service.launch(CONF, mgr, CONF.os_manager.workers)
 
     # TODO(mrkanag) Namos is not registering the RPC backend, fix it !
-    import os_namos
-    os_namos.register_myself()
+    # import os_namos
+    # os_namos.register_myself()
 
     launcher.wait()

View File

@@ -15,10 +15,10 @@
 import sys
 
 from oslo_config import cfg
-from oslo_utils import timeutils
 
 from namos.common import config
 from namos.common import exception
+from namos.common import utils
 from namos.db import api
 from namos.db import sample
 from namos.db.sqlalchemy import migration
@@ -29,24 +29,18 @@ MANAGE_COMMAND_NAME = 'namos-manage'
 class HeartBeat(object):
-    def find_status(self, sw, report_interval=60):
-        status = False
-        if sw.updated_at is not None:
-            if ((timeutils.utcnow() - sw.updated_at).total_seconds()
-                    <= report_interval):
-                status = True
-        else:
-            if ((timeutils.utcnow() - sw.created_at).total_seconds()
-                    <= report_interval):
-                status = True
-
-        return status
-
     def report_status(self):
         # TODO(mrkanag) Make like Node: Service: worker: status
         for sw in api.service_worker_get_all(None):
-            msg = '[%s] %s' % ('T' if self.find_status(sw) else 'F',
-                               sw.name)
+            # TODO(mrkanag) Move this to db layer and query non deleted entries
+            if sw.deleted_at is not None:
+                continue
+
+            msg = '[%s] [%s] %s %s' % (
+                'T' if sw.is_launcher else 'F',
+                'T' if utils.find_status(sw) else 'F',
+                sw.name,
+                sw.host)
             print (msg)
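
With this change each worker line carries a launcher flag and a liveness flag in addition to the name and host; an illustrative output line (the worker name and host below are made up) would look like:

[F] [T] 2301@namos-conductor node-1.example.org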

View File

@@ -35,7 +35,7 @@ conductor_opts = [
 
 
 def register_conductor_opts():
-    CONF.register_opts(conductor_opts, 'conductor')
+    CONF.register_opts(conductor_opts, 'os_manager')
 
 
 def init_conf(prog):
@@ -52,4 +52,4 @@ def init_log(project=PROJECT_NAME):
 
 
 def list_opts():
-    yield 'conductor', conductor_opts
+    yield 'os_manager', conductor_opts
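
list_opts() is what oslo-config-generator consumes, so the rename shows up in generated sample configs as well; a quick sketch of inspecting it (only name and workers are confirmed by this diff, other option names are not shown):

from namos.common import config

# Iterate whatever namos exposes to oslo-config-generator.
for group, opts in config.list_opts():
    print(group, [opt.name for opt in opts])
# Expected to include: os_manager ['name', 'workers', ...]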

namos/common/utils.py Normal file (29 lines added)
View File

@@ -0,0 +1,29 @@
# -*- coding: utf-8 -*-

# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

from oslo_utils import timeutils


def find_status(sw, report_interval=60):
    status = False
    if sw.updated_at is not None:
        if ((timeutils.utcnow() - sw.updated_at).total_seconds()
                <= report_interval):
            status = True
    else:
        if ((timeutils.utcnow() - sw.created_at).total_seconds()
                <= report_interval):
            status = True

    return status
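
For reference, a minimal usage sketch of the new helper, mirroring how namos-manage calls it above (the 60-second report interval is the default from the signature):

from namos.common import utils
from namos.db import api as db_api

# Print liveness for every known service worker.
for sw in db_api.service_worker_get_all(None):
    alive = utils.find_status(sw, report_interval=60)
    print('%s is %s' % (sw.name, 'live' if alive else 'dead'))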

View File

@@ -22,6 +22,7 @@ from oslo_utils import timeutils
 from namos.common import config
 from namos.common import exception
 from namos.common import messaging
+from namos.common import utils
 from namos.db import api as db_api
 from namos.db import openstack_drivers
@@ -68,8 +69,10 @@ class ConductorManager(object):
         ))
 
         # Service processing
-        sp = ServiceProcessor(registration_info)
-        service_worker_id = sp.process_service(context)
+        sp = ServiceProcessor(context,
+                              self,
+                              registration_info)
+        service_component_id, service_worker_id = sp.process_service(context)
 
         # Device Driver processing
         dp = DriverProcessor(service_worker_id,
@@ -82,6 +85,8 @@ class ConductorManager(object):
         ))
         self._regisgration_ackw(context,
                                 registration_info['identification'])
+
+        sp.cleanup(service_component_id)
         return service_worker_id
 
     def _regisgration_ackw(self, context, identification):
@@ -94,6 +99,22 @@
             identification=identification)
         LOG.info("REGISTER [%s] ACK" % identification)
 
+    def _ping(self, context, identification):
+        client = messaging.get_rpc_client(
+            topic='namos.CONF.%s' % identification,
+            version=self.RPC_API_VERSION,
+            exchange=config.PROJECT_NAME)
+        try:
+            client.call(context,
+                        'ping_me',
+                        identification=identification)
+
+            LOG.debug("PING [%s] SUCCESSFUL" % identification)
+            return True
+        except:  # noqa
+            LOG.debug("PING [%s] FAILED" % identification)
+            return False
+
     @request_context
     def heart_beat(self, context, identification, dieing=False):
         try:
@@ -164,8 +185,13 @@
 class ServiceProcessor(object):
-    def __init__(self, registration_info):
+    def __init__(self,
+                 context,
+                 manager,
+                 registration_info):
         self.registration_info = registration_info
+        self.manager = manager
+        self.context = context
 
     def file_to_configs(self, file_content):
         tmp_file_path = '/tmp/sample-namos-config.conf'
@@ -191,15 +217,33 @@ class ServiceProcessor(object):
         return conf_dict
 
     def process_service(self, context):
+        # region
+        # If region is not provided, make it as belongs to namos's region
+        if not self.registration_info.get('region_name'):
+            self.registration_info[
+                'region_name'] = cfg.CONF.os_namos.region_name
+
+        try:
+            region = db_api.region_create(
+                context,
+                dict(name=self.registration_info.get('region_name'))
+            )
+            LOG.info('Region %s is created' % region)
+        except exception.AlreadyExist:
+            region = db_api.region_get_by_name(
+                context,
+                name=self.registration_info.get('region_name')
+            )
+            LOG.info('Region %s is existing' % region)
+
         # Service Node
         try:
-            # TODO(mrkanag) region_id is hard-coded, fix it !
-            # user proper node name instead of fqdn
+            # TODO(mrkanag) user proper node name instead of fqdn
             node = db_api.service_node_create(
                 context,
                 dict(name=self.registration_info.get('fqdn'),
                      fqdn=self.registration_info.get('fqdn'),
-                     region_id='f7dcd175-27ef-46b5-997f-e6e572f320b0'))
+                     region_id=region.id))
 
             LOG.info('Service node %s is created' % node)
         except exception.AlreadyExist:
@@ -258,35 +302,17 @@ class ServiceProcessor(object):
                     pid=self.registration_info['identification'],
                     host=self.registration_info['host'],
                     service_component_id=service_component.id,
-                    deleted_at=None
+                    deleted_at=None,
+                    is_launcher=self.registration_info['i_am_launcher']
                 ))
             LOG.info('Service Worker %s is created' % service_worker)
         except exception.AlreadyExist:
-            # TODO(mrkanag) Find a way to purge the dead service worker
-            # Once each service is enabled with heart beating namos
-            # purging can be done once heart beat stopped. this can be
-            # done from openstack.common.service.py
-            service_workers = \
-                db_api.service_worker_get_by_host_for_service_component(
-                    context,
-                    service_component_id=service_component.id,
-                    host=self.registration_info['host']
-                )
-            if len(service_workers) == 1:
-                service_worker = \
-                    db_api.service_worker_update(
-                        context,
-                        service_workers[0].id,
-                        dict(
-                            deleted_at=None,
-                            pid=self.registration_info['identification'],
-                            name='%s@%s' % (self.registration_info['pid'],
-                                            service_component.name)
-                        ))
-                LOG.info('Service Worker %s is existing and is updated'
-                         % service_worker)
-            # TODO(mrkanag) what to do when service_workers size is > 1
+            LOG.info('Service Worker %s is existing' %
+                     db_api.service_worker_get_all_by(
+                         context,
+                         pid=self.registration_info['identification'],
+                         service_component_id=service_component.id
+                     )[0])
 
         # config file
         conf_files = dict()
@@ -398,7 +424,40 @@ class ServiceProcessor(object):
                     cfg_obj_)
                 LOG.debug("Config %s is existing and is updated" % config)
 
-        return service_worker.id
+        return service_component.id, service_worker.id
+
+    def cleanup(self, service_component_id):
+        # clean up the dead service workers
+        # TODO(mrkanag) Make this into thread
+        service_workers = \
+            db_api.service_worker_get_all_by(
+                context,
+                service_component_id=service_component_id
+            )
+
+        for srv_wkr in service_workers:
+            # TODO(mrkanag) Move this to db layer and query non deleted entries
+            if srv_wkr.deleted_at is not None:
+                continue
+
+            if utils.find_status(srv_wkr):
+                LOG.info('Service Worker %s is live'
+                         % srv_wkr.id)
+                continue
+            else:
+                confs = db_api.config_get_by_name_for_service_worker(
+                    self.context,
+                    service_worker_id=srv_wkr.id
+                )
+
+                for conf in confs:
+                    db_api.config_delete(self.context, conf.id)
+                    LOG.debug('Config %s is deleted'
+                              % conf.id)
+
+                db_api.service_worker_delete(self.context, srv_wkr.id)
+                LOG.info('Service Worker %s is deleted'
+                         % srv_wkr.id)
 
 
 class DriverProcessor(object):
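
For orientation, the registration payload the conductor consumes now also carries the region and the launcher flag; an illustrative payload using only the keys read in this file (all values below are made up):

registration_info = {
    'identification': 'c3d9a1f0-0000-0000-0000-000000000000',
    'fqdn': 'node-1.example.org',
    'host': 'node-1',
    'pid': 2301,
    'i_am_launcher': False,
    # Optional; when missing, the conductor substitutes its own
    # cfg.CONF.os_namos.region_name before creating the Region row.
    'region_name': 'RegionTwo',
}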

View File

@@ -48,6 +48,7 @@ def get_backend():
 def _model_query(context, *args):
     session = _session(context)
     query = session.query(*args)
+
     return query
@@ -98,9 +99,14 @@ def _get_all_by(context, cls, **kwargs):
     return results
 
 
-def _delete(context, cls, _id):
+def _delete(context, cls, _id, soft=True):
     result = _get(context, cls, _id)
     if result is not None:
+        if soft and hasattr(result, 'soft_delete'):
+            result.soft_delete(_session(context))
+            return
+        # TODO(mrkanag) is it ok to hard delete when soft =True and soft_delete
+        # is missing
         result.delete(_session(context))
@@ -588,7 +594,7 @@ def config_get_by_name_for_service_worker(context,
         query = query.filter_by(name=name)
     elif only_configured:
         query = query.filter(
-            models.OsloConfig.value != models.OsloConfig.default_value)
+            models.OsloConfig.oslo_config_file_id is not None)
 
     return query.all()
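
The delete helper now defaults to a soft delete when the model supports it; soft_delete() here is the oslo.db SoftDeleteMixin method, which stamps deleted/deleted_at instead of removing the row. A small usage sketch inside this module, assuming ServiceWorker mixes it in (context and worker_id are placeholders):

# Soft delete: row is kept, deleted_at is set, so status/cleanup paths
# can still inspect it until it is purged.
_delete(context, models.ServiceWorker, worker_id)

# Hard delete: row is removed outright.
_delete(context, models.ServiceWorker, worker_id, soft=False)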

View File

@@ -102,9 +102,10 @@ class Region(BASE,
     __tablename__ = 'region'
 
     # Its of type String to match with keystone region id
+    # TODO(mrkanag) make this as non nullable
     keystone_region_id = sqlalchemy.Column(
         sqlalchemy.String(255),
-        nullable=False)
+        nullable=True)
 
 
 class Device(BASE,
@@ -211,9 +212,10 @@ class Service(BASE,
               Extra):
     __tablename__ = 'service'
 
+    # TODO(mrkanag) make this as non nullable
     keystone_service_id = sqlalchemy.Column(
         Uuid,
-        nullable=False)
+        nullable=True)
 
 
 class ServiceNode(BASE,
@@ -264,7 +266,7 @@ class ServiceWorker(BASE,
     __tablename__ = 'service_worker'
 
     __table_args__ = (
-        UniqueConstraint("host", "service_component_id"),
+        UniqueConstraint("pid", "service_component_id"),
     )
 
     name = sqlalchemy.Column(sqlalchemy.String(255),
@@ -281,6 +283,11 @@
         sqlalchemy.String(248),
         nullable=False
     )
+    is_launcher = sqlalchemy.Column(
+        sqlalchemy.Boolean,
+        nullable=False,
+        default=False
+    )
     service_component_id = sqlalchemy.Column(
         Uuid,
         sqlalchemy.ForeignKey('service_component.id'),