diff --git a/doc/source/api-ref/v1/parameters.yaml b/doc/source/api-ref/v1/parameters.yaml index a412cfa..d29ed58 100644 --- a/doc/source/api-ref/v1/parameters.yaml +++ b/doc/source/api-ref/v1/parameters.yaml @@ -188,7 +188,9 @@ migration-type: Own action is create new and delete old instance. Note! VNF need to obey resource_mitigation with own action This affects to order of delete old and create new to not over - commit the resources. + commit the resources. In Kubernetes also EVICTION supported. There admin + will delete instance and VNF automation like ReplicaSet will make a new + instance in: body required: true type: string diff --git a/doc/source/user/notifications.rst b/doc/source/user/notifications.rst index 317a87f..ea2bcc9 100644 --- a/doc/source/user/notifications.rst +++ b/doc/source/user/notifications.rst @@ -114,7 +114,9 @@ payload | | | 'MIGRATE', 'LIVE_MIGRATE' and 'OWN_ACTION'. 'OWN_ACTION' means | | | | an action project manager can do itself. Usually this could be | | | | re-instantiation even with a new flavor. Other actions are done by | -| | | Fenix as they need the admin privileges. Valid for states: | +| | | Fenix as they need the admin privileges. In Kubernetes also 'EVICTION' | +| | | supported. There admin will delete instance and VNF automation like | +| | | ReplicaSet will make a new instance. Valid for states: | | | | 'SCALE_IN', 'PREPARE_MAINTENANCE' and 'PLANNED_MAINTENANCE'. | +-----------------+------------+------------------------------------------------------------------------+ | instance_ids | string | Link to Fenix maintenance session and project specific API to get | @@ -176,7 +178,7 @@ Example of notification for many instances: "metadata": {"openstack_release": "Queens"} } -Example of notification for single instances. Note the instance specific +Example of notification for single instance. Note the instance specific 'reply_url': .. code-block:: json @@ -194,5 +196,23 @@ Example of notification for single instances. Note the instance specific "metadata": {"openstack_release": "Queens"} } +Example of notification for single instance in Kubernetes. Note the instance +specific 'reply_url' and allowed actions for Kubernetes: + +.. code-block:: json + + { + "service": "fenix", + "allowed_actions": ["OWN_ACTION", "EVICTION"], + "instance_ids": ["28d226f3-8d06-444f-a3f1-c586d2e7cb39"], + "reply_url": "http://0.0.0.0:12347/v1/maintenance/76e55df8-1c51-11e8-9928-0242ac110002/ead0dbcaf3564cbbb04842e3e54960e3/28d226f3-8d06-444f-a3f1-c586d2e7cb39", + "state": "PREPARE_MAINTENANCE", + "session_id": "76e55df8-1c51-11e8-9928-0242ac110002", + "reply_at": "2018-02-28T06:40:16", + "actions_at": "2018-02-29T00:00:00", + "project_id": "ead0dbcaf3564cbbb04842e3e54960e3", + "metadata": {"openstack_release": "Queens"} + } + .. [1] http://docs.openstack.org/developer/oslo.messaging/notifier.html .. 
[2] https://docs.openstack.org/aodh/latest/admin/telemetry-alarms.html#event-based-alarm diff --git a/fenix/api/v1/controllers/maintenance.py b/fenix/api/v1/controllers/maintenance.py index 19e244b..19dc0dc 100644 --- a/fenix/api/v1/controllers/maintenance.py +++ b/fenix/api/v1/controllers/maintenance.py @@ -20,18 +20,49 @@ from pecan import expose from pecan import request from pecan import response from pecan import rest +import six from oslo_log import log +from oslo_messaging import RemoteError from oslo_serialization import jsonutils from fenix.api.v1 import maintenance from fenix.api.v1 import schema +import fenix.db.exceptions as db_exceptions +import fenix.exceptions as exceptions from fenix import policy LOG = log.getLogger(__name__) -class ProjectController(rest.RestController): +def _format_ex_message(ex): + if len(ex.path) > 0: + return ("Invalid input for field/attribute %(path)s." + " Value: %(value)s. %(message)s" % {'path': ex.path.pop(), + 'value': ex.instance, + 'message': ex.message}) + else: + return ex.message + return + + +class BaseController(rest.RestController): + + def handle_remote_error(self, e): + cls = getattr(db_exceptions, e.exc_type, None) + cls = cls or getattr(exceptions, e.exc_type, None) + if cls is not None: + if e.value: + description = e.value + elif "msg_fmt" in vars(cls).keys(): + description = cls.msg_fmt + else: + description = "" + abort(cls.code, description) + abort(500) + + +class ProjectController(BaseController): name = 'project' @@ -49,8 +80,9 @@ class ProjectController(rest.RestController): jsonschema.validate(session_id, schema.uid) jsonschema.validate(project_id, schema.uid) except jsonschema.exceptions.ValidationError as e: - LOG.error(str(e.message)) - abort(422) + description = _format_ex_message(e) + LOG.error(description) + abort(422, six.text_type(description)) engine_data = self.engine_rpcapi.project_get_session(session_id, project_id) try: @@ -68,18 +100,22 @@ class ProjectController(rest.RestController): jsonschema.validate(project_id, schema.uid) jsonschema.validate(data, schema.maintenance_session_project_put) except jsonschema.exceptions.ValidationError as e: - LOG.error(str(e.message)) - abort(422) - engine_data = self.engine_rpcapi.project_update_session(session_id, - project_id, - data) + description = _format_ex_message(e) + LOG.error(description) + abort(422, six.text_type(description)) + try: + engine_data = self.engine_rpcapi.project_update_session(session_id, + project_id, + data) + except RemoteError as e: + self.handle_remote_error(e) try: response.text = jsonutils.dumps(engine_data) except TypeError: response.body = jsonutils.dumps(engine_data) -class ProjectInstanceController(rest.RestController): +class ProjectInstanceController(BaseController): name = 'project_instance' @@ -99,20 +135,24 @@ class ProjectInstanceController(rest.RestController): data, schema.maintenance_session_project_instance_put) except jsonschema.exceptions.ValidationError as e: - LOG.error(str(e.message)) - abort(422) - engine_data = ( - self.engine_rpcapi.project_update_session_instance(session_id, - project_id, - instance_id, - data)) + description = _format_ex_message(e) + LOG.error(description) + abort(422, six.text_type(description)) + try: + engine_data = ( + self.engine_rpcapi.project_update_session_instance(session_id, + project_id, + instance_id, + data)) + except RemoteError as e: + self.handle_remote_error(e) try: response.text = jsonutils.dumps(engine_data) except TypeError: response.body = jsonutils.dumps(engine_data) -class 
SessionController(rest.RestController): +class SessionController(BaseController): name = 'session' @@ -126,15 +166,20 @@ class SessionController(rest.RestController): try: jsonschema.validate(session_id, schema.uid) except jsonschema.exceptions.ValidationError as e: - LOG.error(str(e.message)) - abort(422) + description = _format_ex_message(e) + LOG.error(description) + abort(422, six.text_type(description)) if request.body: LOG.error("Unexpected data") abort(400) - session = self.engine_rpcapi.admin_get_session(session_id) + try: + session = self.engine_rpcapi.admin_get_session(session_id) + except RemoteError as e: + self.handle_remote_error(e) if session is None: - LOG.error("Invalid session") - abort(404) + description = "Invalid session" + LOG.error(description) + abort(422, six.text_type(description)) try: response.text = jsonutils.dumps(session) except TypeError: @@ -149,9 +194,14 @@ class SessionController(rest.RestController): jsonschema.validate(session_id, schema.uid) jsonschema.validate(data, schema.maintenance_session_put) except jsonschema.exceptions.ValidationError as e: - LOG.error(str(e.message)) - abort(422) - engine_data = self.engine_rpcapi.admin_update_session(session_id, data) + description = _format_ex_message(e) + LOG.error(description) + abort(422, six.text_type(description)) + try: + engine_data = self.engine_rpcapi.admin_update_session(session_id, + data) + except RemoteError as e: + self.handle_remote_error(e) try: response.text = jsonutils.dumps(engine_data) except TypeError: @@ -164,19 +214,23 @@ class SessionController(rest.RestController): try: jsonschema.validate(session_id, schema.uid) except jsonschema.exceptions.ValidationError as e: - LOG.error(str(e.message)) - abort(422) + description = _format_ex_message(e) + LOG.error(description) + abort(422, six.text_type(description)) if request.body: LOG.error("Unexpected data") abort(400) - engine_data = self.engine_rpcapi.admin_delete_session(session_id) + try: + engine_data = self.engine_rpcapi.admin_delete_session(session_id) + except RemoteError as e: + self.handle_remote_error(e) try: response.text = jsonutils.dumps(engine_data) except TypeError: response.body = jsonutils.dumps(engine_data) -class MaintenanceController(rest.RestController): +class MaintenanceController(BaseController): name = 'maintenance' @@ -190,7 +244,10 @@ class MaintenanceController(rest.RestController): if request.body: LOG.error("Unexpected data") abort(400) - sessions = self.engine_rpcapi.admin_get() + try: + sessions = self.engine_rpcapi.admin_get() + except RemoteError as e: + self.handle_remote_error(e) try: response.text = jsonutils.dumps(sessions) except TypeError: @@ -204,9 +261,13 @@ class MaintenanceController(rest.RestController): try: jsonschema.validate(data, schema.maintenance_post) except jsonschema.exceptions.ValidationError as e: - LOG.error(str(e.message)) - abort(422) - session = self.engine_rpcapi.admin_create_session(data) + description = _format_ex_message(e) + LOG.error(description) + abort(422, six.text_type(description)) + try: + session = self.engine_rpcapi.admin_create_session(data) + except RemoteError as e: + self.handle_remote_error(e) if session is None: LOG.error("Too many sessions") abort(509) @@ -216,7 +277,7 @@ class MaintenanceController(rest.RestController): response.body = jsonutils.dumps(session) -class InstanceController(rest.RestController): +class InstanceController(BaseController): name = 'instance' @@ -230,15 +291,20 @@ class InstanceController(rest.RestController): try: 
jsonschema.validate(instance_id, schema.uid) except jsonschema.exceptions.ValidationError as e: - LOG.error(str(e.message)) - abort(422) + description = _format_ex_message(e) + LOG.error(description) + abort(422, six.text_type(description)) if request.body: LOG.error("Unexpected data") abort(400) - instance = self.engine_rpcapi.get_instance(instance_id) + try: + instance = self.engine_rpcapi.get_instance(instance_id) + except RemoteError as e: + self.handle_remote_error(e) if instance is None: - LOG.error("Invalid instance: %s" % instance_id) - abort(404) + description = "Invalid instance: %s" % instance_id + LOG.error(description) + abort(422, six.text_type(description)) try: response.text = jsonutils.dumps(instance) except TypeError: @@ -253,10 +319,14 @@ class InstanceController(rest.RestController): jsonschema.validate(instance_id, schema.uid) jsonschema.validate(data, schema.instance_put) except jsonschema.exceptions.ValidationError as e: - LOG.error(str(e.message)) - abort(422) - engine_data = self.engine_rpcapi.update_instance(instance_id, - data) + description = _format_ex_message(e) + LOG.error(description) + abort(422, six.text_type(description)) + try: + engine_data = self.engine_rpcapi.update_instance(instance_id, + data) + except RemoteError as e: + self.handle_remote_error(e) try: response.text = jsonutils.dumps(engine_data) except TypeError: @@ -269,19 +339,23 @@ class InstanceController(rest.RestController): try: jsonschema.validate(instance_id, schema.uid) except jsonschema.exceptions.ValidationError as e: - LOG.error(str(e.message)) - abort(422) + description = _format_ex_message(e) + LOG.error(description) + abort(422, six.text_type(description)) if request.body: LOG.error("Unexpected data") abort(400) - engine_data = self.engine_rpcapi.delete_instance(instance_id) + try: + engine_data = self.engine_rpcapi.delete_instance(instance_id) + except RemoteError as e: + self.handle_remote_error(e) try: response.text = jsonutils.dumps(engine_data) except TypeError: response.body = jsonutils.dumps(engine_data) -class InstanceGroupController(rest.RestController): +class InstanceGroupController(BaseController): name = 'instance_group' @@ -295,15 +369,20 @@ class InstanceGroupController(rest.RestController): try: jsonschema.validate(group_id, schema.uid) except jsonschema.exceptions.ValidationError as e: - LOG.error(str(e.message)) - abort(422) + description = _format_ex_message(e) + LOG.error(description) + abort(422, six.text_type(description)) if request.body: LOG.error("Unexpected data") abort(400) - group = self.engine_rpcapi.get_instance_group(group_id) + try: + group = self.engine_rpcapi.get_instance_group(group_id) + except RemoteError as e: + self.handle_remote_error(e) if group is None: - LOG.error("Invalid instance_group: %s" % group_id) - abort(404) + description = "Invalid instance_group: %s" % group_id + LOG.error(description) + abort(422, six.text_type(description)) try: response.text = jsonutils.dumps(group) except TypeError: @@ -318,10 +397,14 @@ class InstanceGroupController(rest.RestController): jsonschema.validate(group_id, schema.uid) jsonschema.validate(data, schema.instance_group_put) except jsonschema.exceptions.ValidationError as e: - LOG.error(str(e.message)) - abort(422) - engine_data = ( - self.engine_rpcapi.update_instance_group(group_id, data)) + description = _format_ex_message(e) + LOG.error(description) + abort(422, six.text_type(description)) + try: + engine_data = ( + self.engine_rpcapi.update_instance_group(group_id, data)) + except RemoteError as 
e: + self.handle_remote_error(e) try: response.text = jsonutils.dumps(engine_data) except TypeError: @@ -334,13 +417,17 @@ class InstanceGroupController(rest.RestController): try: jsonschema.validate(group_id, schema.uid) except jsonschema.exceptions.ValidationError as e: - LOG.error(str(e.message)) - abort(422) + description = _format_ex_message(e) + LOG.error(description) + abort(422, six.text_type(description)) if request.body: LOG.error("Unexpected data") abort(400) - engine_data = ( - self.engine_rpcapi.delete_instance_group(group_id)) + try: + engine_data = ( + self.engine_rpcapi.delete_instance_group(group_id)) + except RemoteError as e: + self.handle_remote_error(e) try: response.text = jsonutils.dumps(engine_data) except TypeError: diff --git a/fenix/api/v1/schema.py b/fenix/api/v1/schema.py index 2e46fb8..d4934fb 100644 --- a/fenix/api/v1/schema.py +++ b/fenix/api/v1/schema.py @@ -40,7 +40,7 @@ reply_states = ['ACK_MAINTENANCE', 'NACK_PLANNED_MAINTENANCE', 'NACK_MAINTENANCE_COMPLETE'] -allowed_actions = ['MIGRATE', 'LIVE_MIGRATE', 'OWN_ACTION'] +allowed_actions = ['MIGRATE', 'LIVE_MIGRATE', 'OWN_ACTION', 'EVICTION'] maintenance_session_project_put = { 'type': 'object', diff --git a/fenix/db/exceptions.py b/fenix/db/exceptions.py index 0dbae2f..966bb5c 100644 --- a/fenix/db/exceptions.py +++ b/fenix/db/exceptions.py @@ -25,11 +25,11 @@ class FenixDBException(exceptions.FenixException): msg_fmt = 'An unknown database exception occurred' -class FenixDBDuplicateEntry(FenixDBException): +class FenixDBDuplicateEntry(exceptions.Duplicate): msg_fmt = 'Duplicate entry for %(columns)s in %(model)s model was found' -class FenixDBNotFound(FenixDBException): +class FenixDBNotFound(exceptions.NotFound): msg_fmt = '%(id)s %(model)s was not found' diff --git a/fenix/db/sqlalchemy/api.py b/fenix/db/sqlalchemy/api.py index 20c5590..42be499 100644 --- a/fenix/db/sqlalchemy/api.py +++ b/fenix/db/sqlalchemy/api.py @@ -516,11 +516,12 @@ def project_instance_get(instance_id): def update_project_instance(values): values = values.copy() - minstance = models.ProjectInstance() - minstance.update(values) - session = get_session() with session.begin(): + minstance = _project_instance_get(session, values['instance_id']) + if not minstance: + minstance = models.ProjectInstance() + minstance.update(values) try: minstance.save(session=session) except common_db_exc.DBDuplicateEntry as e: @@ -553,6 +554,15 @@ def instance_group_get(group_id): return _instance_group_get(get_session(), group_id) +def _instance_groups_get(session): + query = model_query(models.InstanceGroup, session) + return query.all() + + +def instance_groups_get(): + return _instance_groups_get(get_session()) + + def _group_instances_get(session, group_id): query = model_query(models.ProjectInstance, session) return query.filter_by(group_id=group_id).all() @@ -564,28 +574,29 @@ def group_instances_get(group_id): def update_instance_group(values): values = values.copy() - minstance = models.InstanceGroup() - minstance.update(values) - session = get_session() with session.begin(): + ig = _instance_group_get(session, values['group_id']) + if not ig: + ig = models.InstanceGroup() + ig.update(values) try: - minstance.save(session=session) + ig.save(session=session) except common_db_exc.DBDuplicateEntry as e: # raise exception about duplicated columns (e.columns) raise db_exc.FenixDBDuplicateEntry( - model=minstance.__class__.__name__, columns=e.columns) + model=ig.__class__.__name__, columns=e.columns) - return instance_group_get(minstance.group_id) + 
return instance_group_get(ig.group_id) def remove_instance_group(group_id): session = get_session() with session.begin(): - minstance = _instance_group_get(session, group_id) - if not minstance: + ig = _instance_group_get(session, group_id) + if not ig: # raise not found error raise db_exc.FenixDBNotFound(session, group_id=group_id, model='instance_groups') - session.delete(minstance) + session.delete(ig) diff --git a/fenix/exceptions.py b/fenix/exceptions.py index 21eb61e..cdf8541 100644 --- a/fenix/exceptions.py +++ b/fenix/exceptions.py @@ -55,6 +55,12 @@ class NotFound(FenixException): code = 404 +class Duplicate(FenixException): + """Object not found exception.""" + msg_fmt = "Object with %(object)s not found" + code = 409 + + class NotAuthorized(FenixException): msg_fmt = "Not authorized" code = 403 diff --git a/fenix/tools/README.md b/fenix/tools/README.md new file mode 100644 index 0000000..37aa9f9 --- /dev/null +++ b/fenix/tools/README.md @@ -0,0 +1,216 @@ +# fenix.tools + +This directory contains tools and instructions to test Fenix workflows. +Currently OPNFV Doctor has been used to test OpenStack related workflows. +As Doctor is at the moment only for OpenStack and Fenix itself needs a way to be +tested, the Kubernetes workflow (fenix/workflow/workflows/k8s.py) testing +is implemented here. + +Files: + +- 'demo-ha.yaml': demo-ha ReplicaSet to make 2 anti-affinity PODS. +- 'demo-nonha.yaml': demo-nonha ReplicaSet to make n nonha PODS. +- 'vnfm.py': VNFM to test k8s.py workflow. + +## Kubernetes workflow (k8s.py) + +First version of workflow towards Kubeernetes use cases. + +### Requirements for testing + +This workflow assumes ReplicaSet used for PODs. A Kubernetes cluster with +1 master and at least 3 workers are required for testing. Master node needs +DevStack to have Fenix and OpenStack services it still uses. Later on there +can be a version of Fenix not needing Keystone and AODH event alarming, but +using native Kubernetes services for RBAC and events. + +As in Doctor testing, there is a pair of anti-affinity PODs (demo-ha) and rest +of the worker node capacity is filled with (demo-nonha) PODs. Scaling of PODs +is done via ReplicaSet number of replicas. Idea is that each POD is taking +("number of worker node CPUs" - 2) / 2. That will make sure scheduler fits +2 PODs on each node and 2 CPUs capacity is assumed for other node services. +This should require at least 6 CPUs for each node to work. + +### Install Kubernetes cluster with 1 Manager and 3 Worker nodes. + +Here is instructions: +https://docs.openstack.org/openstack-helm/latest/install/kubernetes-gate.html +https://phoenixnap.com/kb/how-to-install-kubernetes-on-a-bare-metal-server +https://phoenixnap.com/kb/how-to-install-kubernetes-on-centos + +### On Manager node, install DevStack including Fenix and its minimum services + +Note! There is no conflict with Kubernetes as limiting to only Fenix needed +services. + +Clone devstack. Tested to work with latest stable release Train. + +```sh +git clone https://github.com/openstack/devstack -b stable/train +``` + +Make local.conf. 'HOST_IP' should bemaster node IP. 
+ +```sh +cd devstack vi local.conf +``` +```sh +[[local|localrc]] +GIT_BASE=https://git.openstack.org +HOST_IP=192.0.2.4 +ADMIN_PASSWORD=admin +DATABASE_PASSWORD=admin +RABBIT_PASSWORD=admin +SERVICE_PASSWORD=admin +LOGFILE=/opt/stack/stack.sh.log + +PUBLIC_INTERFACE=eth0 + +CEILOMETER_EVENT_ALARM=True + +ENABLED_SERVICES=key,rabbit,mysql,fenix-engine,fenix-api,aodh-evaluator,aodh-notifier,aodh-api + +enable_plugin ceilometer https://git.openstack.org/openstack/ceilometer stable/train +enable_plugin aodh https://git.openstack.org/openstack/aodh stable/train +enable_plugin gnocchi https://github.com/openstack/gnocchi +enable_plugin fenix https://opendev.org/x/fenix master +``` + +Deploy needed OpenStack services with Fenix + +```sh +./stack.sh +``` + +Now you should have Kubernetes cluster and Fenix via DevStack. Any hacking of +Fenix can be done under '/opt/stack/fenix'. + +### Running test + +Use 3 terminal windows (Term1, Term2 and Term3) to test Fenix with Kubernetes +kluster. Under here is what you can run in different terminals. Terminals +should be running in master node. Here is short description: + +- Term1: Used for logging Fenix +- Term2: Infrastructure admin commands +- Term3: VNFM logging for testing and setting up the VNF + +#### Term1: Fenix-engine logging + +If any changes to Fenix make them under '/opt/stack/fenix'; restart fenix and +see logs + +```sh +sudo systemctl restart devstack@fenix*;sudo journalctl -f --unit devstack@fenix-engine +``` + +API logs can also be seen + +```sh +sudo journalctl -f --unit devstack@fenix-api +``` + +Debugging and other configuration changes to conf files under '/etc/fenix' + +#### Term2: Infrastructure admin window + +Use DevStack admin as user. Set your variables needed accordingly + +```sh +. ~/devstack/operc admin admin +USER_ID=`openstack user list | grep admin | awk '{print $2}' +HOST=192.0.2.4 +PORT=12347 +``` + +Authenticate to Keystone as admin user before calling Fenix. If you will have +some not authorized error later on, you need to do this again. + +```sh +OS_AUTH_TOKEN=`openstack token issue | grep " id " |awk '{print $4}'` +``` + +After you have first: Fenix running in Term1; Next: VNF created a in Term3 +Next: VNFM running in Term3, you can create maintenance session utilizing those + +```sh +DATE=`date -d "+15 sec" "+%Y-%m-%d %H:%M:%S"`;MSESSION=`curl -g -i -X POST http://$HOST:$PORT/v1/maintenance -H "Accept: application/json" -H "Content-Type: application/json" -d '{"workflow": "k8s", "state": "MAINTENANCE","metadata": {} ,"maintenance_at": "'"$DATE"'"}' -H "X-Auth-Token: $OS_AUTH_TOKEN" -H "X-User-Id: $USER_ID" | grep session_id | jq -r '.session_id'` +``` + +After maintenance workflow is 'MAINTENANCE_DONE', you should first press +"ctrl + c" in VNFM window (Term3), so it removes constraints from Fenix and +dies. Then you can remove the finished session from fenix + +```sh +curl -g -i -X DELETE http://$HOST:$PORT/v1/maintenance/$MSESSION -H "Accept: application/json" -H "Content-Type: application/json" -H "X-Auth-Token: $OS_AUTH_TOKEN" -H "X-User-Id: $USER_ID" +``` +If maintenance run till the end with 'MAINTENANCE_DONE', you are ready to run it +again if you wish. 'MAINTENANCE_FAILED' or in case of exceptions, you should +recover system before trying to test again. This is covered in Term3 below. 
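+To see where the workflow is, you can poll the maintenance session state and
+wait for 'MAINTENANCE_DONE' or 'MAINTENANCE_FAILED'. A minimal sketch, using
+the variables set above and assuming the session GET reply carries the
+workflow 'state' field:
+
+```sh
+curl -g -s -X GET http://$HOST:$PORT/v1/maintenance/$MSESSION -H "Accept: application/json" -H "X-Auth-Token: $OS_AUTH_TOKEN" -H "X-User-Id: $USER_ID" | jq -r '.state'
+```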
+ +#### Term3: VNFM (fenix/tools/vnfm.py) + +Go to Fenix Kubernetes tool directory for testing + +```sh +cd /opt/stack/fenix/fenix/tools +``` + +Create demo namespace (we use demo namespace and demo user and project in +Keystone) + +```sh +kubectl create namespace demo +``` + +Start VNFM (when done in this order, we make sure demo-ha has nodes for antiaffinity): + +```sh +kubectl apply -f demo-ha.yaml --namespace=demo;sleep 1;kubectl apply -f demo-nonha.yaml --namespace=demo +``` + +Note you should modify above yaml files so that "cpu:" has value of +'(workernode.status.capacity["cpu"] - 2) / 2'. Default is expecting that there +is 32 cpus, so value is "15" in both yaml files. Replicas can be changed in +demo-nonha.yaml. Minimum 2 (if minimum of 3 worker nodes) to maximum +'(amount_of_worker_nodes-1)*2'. Greater amount means more scaling needed and +longer maintenance window as less parallel actions possible. Surely constraints +in vnfm.py also can be changed for different behavior. + +You can delete pods used like this + +```sh +kubectl delete replicaset.apps demo-ha demo-nonha --namespace=demo +``` + +Start Kubernetes VNFM that we need for testing + +```sh +python vnfm.py +``` + +Now you can start maintenance session in Term2. When workflow failed or +completed; you first kill vnfm.py with "ctrl+c" and delete maintenance session +in Term2. + +If workflow failed something might need to be manually fixed. Here you +uncordon your 3 worker nodes, if maintenance workflow did not run to the end. + +```sh +kubectl uncordon worker-node3 worker-node2 worker-node1 +``` + +You can check your pods matches to amount of replicas mentioned in +demo-nonha.yaml and demo-ha.yaml: + +```sh +kubectl get pods --all-namespaces --output=wide +``` + +If not matching, delete and create again as easiest solution + +```sh +kubectl delete replicaset.apps demo-ha demo-nonha --namespace=demo;sleep 15;kubectl apply -f demo-ha.yaml --namespace=demo;sleep 1;kubectl apply -f demo-nonha.yaml --namespace=demo +``` + + diff --git a/fenix/tools/__init__.py b/fenix/tools/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/fenix/tools/demo-ha.yaml b/fenix/tools/demo-ha.yaml new file mode 100644 index 0000000..a63aa35 --- /dev/null +++ b/fenix/tools/demo-ha.yaml @@ -0,0 +1,54 @@ +apiVersion: apps/v1 +kind: ReplicaSet +metadata: + name: demo-ha + labels: + app: demo-ha +spec: + replicas: 2 + selector: + matchLabels: + app: demo-ha + template: + metadata: + labels: + app: demo-ha + active: None + spec: + affinity: + podAntiAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + - labelSelector: + matchExpressions: + - key: app + operator: In + values: + - demo-ha + topologyKey: "kubernetes.io/hostname" + spec: + containers: + - name: nginx + image: nginx + resources: + requests: + cpu: "15" + ports: + - containerPort: 80 + volumeMounts: + - name: workdir + mountPath: /usr/share/nginx/html + initContainers: + - name: install + image: busybox + command: + - wget + - "-O" + - "/work-dir/index.html" + - http://kubernetes.io + volumeMounts: + - name: workdir + mountPath: "/work-dir" + dnsPolicy: Default + volumes: + - name: workdir + emptyDir: {} diff --git a/fenix/tools/demo-nonha.yaml b/fenix/tools/demo-nonha.yaml new file mode 100644 index 0000000..8f49a1d --- /dev/null +++ b/fenix/tools/demo-nonha.yaml @@ -0,0 +1,42 @@ +apiVersion: apps/v1 +kind: ReplicaSet +metadata: + name: demo-nonha + labels: + app: demo-nonha +spec: + replicas: 3 + selector: + matchLabels: + app: demo-nonha + template: + metadata: + 
labels: + app: demo-nonha + spec: + containers: + - name: nginx + image: nginx + resources: + requests: + cpu: "15" + ports: + - containerPort: 80 + volumeMounts: + - name: workdir + mountPath: /usr/share/nginx/html + initContainers: + - name: install + image: busybox + command: + - wget + - "-O" + - "/work-dir/index.html" + - http://kubernetes.io + volumeMounts: + - name: workdir + mountPath: "/work-dir" + dnsPolicy: Default + volumes: + - name: workdir + emptyDir: {} diff --git a/fenix/tools/vnfm.py b/fenix/tools/vnfm.py new file mode 100644 index 0000000..e5573f8 --- /dev/null +++ b/fenix/tools/vnfm.py @@ -0,0 +1,545 @@ +# Copyright (c) 2020 Nokia Corporation. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import aodhclient.client as aodhclient +import datetime +from flask import Flask +from flask import request +import json +from keystoneclient import client as ks_client +from kubernetes import client +from kubernetes import config +import logging as lging +from oslo_config import cfg +from oslo_log import log as logging +import requests +import sys +from threading import Thread +import time +import yaml + +try: + import fenix.utils.identity_auth as identity_auth +except ValueError: + sys.path.append('../utils') + import identity_auth + +LOG = logging.getLogger(__name__) +streamlog = lging.StreamHandler(sys.stdout) +LOG.logger.addHandler(streamlog) +LOG.logger.setLevel(logging.INFO) + +opts = [ + cfg.StrOpt('ip', + default='127.0.0.1', + help='the ip of VNFM', + required=True), + cfg.IntOpt('port', + default='12348', + help='the port of VNFM', + required=True), +] + +CONF = cfg.CONF +CONF.register_opts(opts) +CONF.register_opts(identity_auth.os_opts, group='service_user') + + +class VNFM(object): + + def __init__(self, conf, log): + self.conf = conf + self.log = log + self.app = None + + def start(self): + LOG.info('VNFM start......') + self.app = VNFManager(self.conf, self.log) + self.app.start() + + def stop(self): + LOG.info('VNFM stop......') + if not self.app: + return + self.app.headers['X-Auth-Token'] = self.app.session.get_token() + self.app.delete_constraints() + headers = { + 'Content-Type': 'application/json', + 'Accept': 'application/json', + } + url = 'http://%s:%d/shutdown'\ + % (self.conf.ip, + self.conf.port) + requests.post(url, data='', headers=headers) + + +class VNFManager(Thread): + + def __init__(self, conf, log): + Thread.__init__(self) + self.conf = conf + self.log = log + self.port = self.conf.port + self.intance_ids = None + # VNFM is started with OS_* exported as admin user + # We need that to query Fenix endpoint url + # Still we work with our tenant/poroject/vnf as demo + self.project = "demo" + LOG.info('VNFM project: %s' % self.project) + self.auth = identity_auth.get_identity_auth(conf, project=self.project) + self.session = identity_auth.get_session(auth=self.auth) + self.ks = ks_client.Client(version='v3', session=self.session) + self.aodh = aodhclient.Client(2, self.session) + # Subscribe to mainenance event alarm from Fenix via 
AODH + self.create_alarm() + config.load_kube_config() + self.kaapi = client.AppsV1Api() + self.kapi = client.CoreV1Api() + self.headers = { + 'Content-Type': 'application/json', + 'Accept': 'application/json'} + self.headers['X-Auth-Token'] = self.session.get_token() + self.orig_number_of_instances = self.number_of_instances() + # List of instances + self.ha_instances = [] + self.nonha_instances = [] + # Different instance_id specific constraints {instanse_id: {},...} + self.instance_constraints = None + # Update existing instances to instance lists + self.update_instances() + # How many instances needs to exists (with current VNF load) + # max_impacted_members need to be updated accordingly + # if number of instances is scaled. example for demo-ha: + # max_impacted_members = len(self.ha_instances) - ha_group_limit + self.ha_group_limit = 2 + self.nonha_group_limit = 2 + # Different instance groups constraints dict + self.ha_group = None + self.nonha_group = None + # VNF project_id (VNF ID) + self.project_id = None + # HA instance_id that is active has active label + self.active_instance_id = self.active_instance_id() + + services = self.ks.services.list() + for service in services: + if service.type == 'maintenance': + LOG.info('maintenance service: %s:%s type %s' + % (service.name, service.id, service.type)) + maint_id = service.id + self.maint_endpoint = [ep.url for ep in self.ks.endpoints.list() + if ep.service_id == maint_id and + ep.interface == 'public'][0] + LOG.info('maintenance endpoint: %s' % self.maint_endpoint) + self.update_constraints_lock = False + self.update_constraints() + # Instances waiting action to be done + self.pending_actions = {} + + def create_alarm(self): + alarms = {alarm['name']: alarm for alarm in self.aodh.alarm.list()} + alarm_name = "%s_MAINTENANCE_ALARM" % self.project + if alarm_name in alarms: + return + alarm_request = dict( + name=alarm_name, + description=alarm_name, + enabled=True, + alarm_actions=[u'http://%s:%d/maintenance' + % (self.conf.ip, + self.conf.port)], + repeat_actions=True, + severity=u'moderate', + type=u'event', + event_rule=dict(event_type=u'maintenance.scheduled')) + self.aodh.alarm.create(alarm_request) + + def delete_remote_instance_constraints(self, instance_id): + url = "%s/instance/%s" % (self.maint_endpoint, instance_id) + LOG.info('DELETE: %s' % url) + ret = requests.delete(url, data=None, headers=self.headers) + if ret.status_code != 200 and ret.status_code != 204: + if ret.status_code == 404: + LOG.info('Already deleted: %s' % instance_id) + else: + raise Exception(ret.text) + + def update_remote_instance_constraints(self, instance): + url = "%s/instance/%s" % (self.maint_endpoint, instance["instance_id"]) + LOG.info('PUT: %s' % url) + ret = requests.put(url, data=json.dumps(instance), + headers=self.headers) + if ret.status_code != 200 and ret.status_code != 204: + raise Exception(ret.text) + + def delete_remote_group_constraints(self, instance_group): + url = "%s/instance_group/%s" % (self.maint_endpoint, + instance_group["group_id"]) + LOG.info('DELETE: %s' % url) + ret = requests.delete(url, data=None, headers=self.headers) + if ret.status_code != 200 and ret.status_code != 204: + raise Exception(ret.text) + + def update_remote_group_constraints(self, instance_group): + url = "%s/instance_group/%s" % (self.maint_endpoint, + instance_group["group_id"]) + LOG.info('PUT: %s' % url) + ret = requests.put(url, data=json.dumps(instance_group), + headers=self.headers) + if ret.status_code != 200 and ret.status_code != 204: + 
raise Exception(ret.text) + + def delete_constraints(self): + for instance_id in self.instance_constraints: + self.delete_remote_instance_constraints(instance_id) + self.delete_remote_group_constraints(self.nonha_group) + self.delete_remote_group_constraints(self.ha_group) + + def update_constraints(self): + while self.update_constraints_lock: + LOG.info('Waiting update_constraints_lock...') + time.sleep(1) + self.update_constraints_lock = True + LOG.info('Update constraints') + if self.project_id is None: + self.project_id = self.ks.projects.list(name=self.project)[0].id + # Pods groupped by ReplicaSet, so we use that id + rs = {r.metadata.name: r.metadata.uid for r in + self.kaapi.list_namespaced_replica_set('demo').items} + max_impacted_members = len(self.nonha_instances) - 1 + nonha_group = { + "group_id": rs['demo-nonha'], + "project_id": self.project_id, + "group_name": "demo-nonha", + "anti_affinity_group": False, + "max_instances_per_host": 0, + "max_impacted_members": max_impacted_members, + "recovery_time": 10, + "resource_mitigation": True} + LOG.info('create demo-nonha constraints: %s' + % nonha_group) + ha_group = { + "group_id": rs['demo-ha'], + "project_id": self.project_id, + "group_name": "demo-ha", + "anti_affinity_group": True, + "max_instances_per_host": 1, + "max_impacted_members": 1, + "recovery_time": 10, + "resource_mitigation": True} + LOG.info('create demo-ha constraints: %s' + % ha_group) + + instance_constraints = {} + for ha_instance in self.ha_instances: + instance = { + "instance_id": ha_instance.metadata.uid, + "project_id": self.project_id, + "group_id": ha_group["group_id"], + "instance_name": ha_instance.metadata.name, + "max_interruption_time": 120, + "migration_type": "EVICTION", + "resource_mitigation": True, + "lead_time": 40} + LOG.info('create ha instance constraints: %s' % instance) + instance_constraints[ha_instance.metadata.uid] = instance + for nonha_instance in self.nonha_instances: + instance = { + "instance_id": nonha_instance.metadata.uid, + "project_id": self.project_id, + "group_id": nonha_group["group_id"], + "instance_name": nonha_instance.metadata.name, + "max_interruption_time": 120, + "migration_type": "EVICTION", + "resource_mitigation": True, + "lead_time": 40} + LOG.info('create nonha instance constraints: %s' % instance) + instance_constraints[nonha_instance.metadata.uid] = instance + if not self.instance_constraints: + # Initial instance constraints + LOG.info('create initial instances constraints...') + for instance in [instance_constraints[i] for i + in instance_constraints]: + self.update_remote_instance_constraints(instance) + self.instance_constraints = instance_constraints.copy() + else: + LOG.info('check instances constraints changes...') + added = [i for i in instance_constraints.keys() + if i not in self.instance_constraints] + deleted = [i for i in self.instance_constraints.keys() + if i not in instance_constraints] + modified = [i for i in instance_constraints.keys() + if (i not in added and i not in deleted and + instance_constraints[i] != + self.instance_constraints[i])] + for instance_id in deleted: + self.delete_remote_instance_constraints(instance_id) + updated = added + modified + for instance in [instance_constraints[i] for i in updated]: + self.update_remote_instance_constraints(instance) + if updated or deleted: + # Some instance constraints have changed + self.instance_constraints = instance_constraints.copy() + if not self.ha_group or self.ha_group != ha_group: + LOG.info('ha instance group need update') + 
self.update_remote_group_constraints(ha_group) + self.ha_group = ha_group.copy() + if not self.nonha_group or self.nonha_group != nonha_group: + LOG.info('nonha instance group need update') + self.update_remote_group_constraints(nonha_group) + self.nonha_group = nonha_group.copy() + self.update_constraints_lock = False + + def active_instance_id(self): + # We digtate the active in the beginning + instance = self.ha_instances[0] + LOG.info('Initially Active instance: %s %s' % + (instance.metadata.name, instance.metadata.uid)) + name = instance.metadata.name + namespace = instance.metadata.namespace + body = {"metadata": {"labels": {"active": "True"}}} + self.kapi.patch_namespaced_pod(name, namespace, body) + self.active_instance_id = instance.metadata.uid + + def switch_over_ha_instance(self, instance_id): + if instance_id == self.active_instance_id: + # Need to switchover as instance_id will be affected and is active + for instance in self.ha_instances: + if instance_id == instance.metadata.uid: + LOG.info('Active to Standby: %s %s' % + (instance.metadata.name, instance.metadata.uid)) + name = instance.metadata.name + namespace = instance.metadata.namespace + body = client.UNKNOWN_BASE_TYPE() + body.metadata.labels = {"ative": None} + self.kapi.patch_namespaced_pod(name, namespace, body) + else: + LOG.info('Standby to Active: %s %s' % + (instance.metadata.name, instance.metadata.uid)) + name = instance.metadata.name + namespace = instance.metadata.namespace + body = client.UNKNOWN_BASE_TYPE() + body.metadata.labels = {"ative": "True"} + self.kapi.patch_namespaced_pod(name, namespace, body) + self.active_instance_id = instance.metadata.uid + self.update_instances() + + def get_instance_ids(self): + instances = self.kapi.list_pod_for_all_namespaces().items + return [i.metadata.uid for i in instances + if i.metadata.name.startswith("demo-") + and i.metadata.namespace == "demo"] + + def update_instances(self): + instances = self.kapi.list_pod_for_all_namespaces().items + self.ha_instances = [i for i in instances + if i.metadata.name.startswith("demo-ha") + and i.metadata.namespace == "demo"] + self.nonha_instances = [i for i in instances + if i.metadata.name.startswith("demo-nonha") + and i.metadata.namespace == "demo"] + + def _alarm_data_decoder(self, data): + if "[" in data or "{" in data: + # string to list or dict removing unicode + data = yaml.load(data.replace("u'", "'")) + return data + + def _alarm_traits_decoder(self, data): + return ({str(t[0]): self._alarm_data_decoder(str(t[2])) + for t in data['reason_data']['event']['traits']}) + + def get_session_instance_ids(self, url, session_id): + ret = requests.get(url, data=None, headers=self.headers) + if ret.status_code != 200: + raise Exception(ret.text) + LOG.info('get_instance_ids %s' % ret.json()) + return ret.json()['instance_ids'] + + def scale_instances(self, scale_instances): + number_of_instances_before = len(self.nonha_instances) + replicas = number_of_instances_before + scale_instances + + # We only scale nonha apps + namespace = "demo" + name = "demo-nonha" + body = {'spec': {"replicas": replicas}} + self.kaapi.patch_namespaced_replica_set_scale(name, namespace, body) + time.sleep(3) + + # Let's check if scale has taken effect + self.update_instances() + number_of_instances_after = len(self.nonha_instances) + check = 20 + while number_of_instances_after == number_of_instances_before: + if check == 0: + LOG.error('scale_instances with: %d failed, still %d instances' + % (scale_instances, number_of_instances_after)) + raise 
Exception('scale_instances failed') + check -= 1 + time.sleep(1) + self.update_instances() + number_of_instances_after = len(self.nonha_instances) + + LOG.info('scaled instances from %d to %d' % + (number_of_instances_before, number_of_instances_after)) + + def number_of_instances(self): + instances = self.kapi.list_pod_for_all_namespaces().items + return len([i for i in instances + if i.metadata.name.startswith("demo-")]) + + def instance_action(self, instance_id, allowed_actions): + # We should keep instance constraint in our internal structur + # and match instance_id specific allowed action. Now we assume EVICTION + if 'EVICTION' not in allowed_actions: + LOG.error('Action for %s not foudn from %s' % + (instance_id, allowed_actions)) + return None + return 'EVICTION' + + def instance_action_started(self, instance_id, action): + time_now = datetime.datetime.utcnow() + max_interruption_time = ( + self.instance_constraints[instance_id]['max_interruption_time']) + self.pending_actions[instance_id] = { + 'started': time_now, + 'max_interruption_time': max_interruption_time, + 'action': action} + + def was_instance_action_in_time(self, instance_id): + time_now = datetime.datetime.utcnow() + started = self.pending_actions[instance_id]['started'] + limit = self.pending_actions[instance_id]['max_interruption_time'] + action = self.pending_actions[instance_id]['action'] + td = time_now - started + if td.total_seconds() > limit: + LOG.error('%s %s took too long: %ds' % + (instance_id, action, td.total_seconds())) + LOG.error('%s max_interruption_time %ds might be too short' % + (instance_id, limit)) + raise Exception('%s %s took too long: %ds' % + (instance_id, action, td.total_seconds())) + else: + LOG.info('%s %s with recovery time took %ds' % + (instance_id, action, td.total_seconds())) + del self.pending_actions[instance_id] + + def run(self): + app = Flask('VNFM') + + @app.route('/maintenance', methods=['POST']) + def maintenance_alarm(): + data = json.loads(request.data.decode('utf8')) + try: + payload = self._alarm_traits_decoder(data) + except Exception: + payload = ({t[0]: t[2] for t in + data['reason_data']['event']['traits']}) + LOG.error('cannot parse alarm data: %s' % payload) + raise Exception('VNFM cannot parse alarm.' 
+ 'Possibly trait data over 256 char') + + LOG.info('VNFM received data = %s' % payload) + + state = payload['state'] + reply_state = None + reply = dict() + + LOG.info('VNFM state: %s' % state) + + if state == 'MAINTENANCE': + self.headers['X-Auth-Token'] = self.session.get_token() + instance_ids = (self.get_session_instance_ids( + payload['instance_ids'], + payload['session_id'])) + reply['instance_ids'] = instance_ids + reply_state = 'ACK_MAINTENANCE' + + elif state == 'SCALE_IN': + # scale down only nonha instances + nonha_instances = len(self.nonha_instances) + scale_in = nonha_instances / 2 + self.scale_instances(-scale_in) + self.update_constraints() + reply['instance_ids'] = self.get_instance_ids() + reply_state = 'ACK_SCALE_IN' + + elif state == 'MAINTENANCE_COMPLETE': + # possibly need to upscale + number_of_instances = self.number_of_instances() + if self.orig_number_of_instances > number_of_instances: + scale_instances = (self.orig_number_of_instances - + number_of_instances) + self.scale_instances(scale_instances) + self.update_constraints() + reply_state = 'ACK_MAINTENANCE_COMPLETE' + + elif (state == 'PREPARE_MAINTENANCE' + or state == 'PLANNED_MAINTENANCE'): + instance_id = payload['instance_ids'][0] + instance_action = (self.instance_action(instance_id, + payload['allowed_actions'])) + if not instance_action: + raise Exception('Allowed_actions not supported for %s' % + instance_id) + + LOG.info('VNFM got instance: %s' % instance_id) + self.switch_over_ha_instance(instance_id) + + reply['instance_action'] = instance_action + reply_state = 'ACK_%s' % state + self.instance_action_started(instance_id, instance_action) + + elif state == 'INSTANCE_ACTION_DONE': + # TBD was action done in max_interruption_time (live migration) + # NOTE, in EVICTION instance_id reported that was in evicted + # node. New instance_id might be different + LOG.info('%s' % payload['instance_ids']) + self.was_instance_action_in_time(payload['instance_ids'][0]) + self.update_instances() + self.update_constraints() + else: + raise Exception('VNFM received event with' + ' unknown state %s' % state) + + if reply_state: + reply['session_id'] = payload['session_id'] + reply['state'] = reply_state + url = payload['reply_url'] + LOG.info('VNFM reply: %s' % reply) + requests.put(url, data=json.dumps(reply), headers=self.headers) + + return 'OK' + + @app.route('/shutdown', methods=['POST']) + def shutdown(): + LOG.info('shutdown VNFM server at %s' % time.time()) + func = request.environ.get('werkzeug.server.shutdown') + if func is None: + raise RuntimeError('Not running with the Werkzeug Server') + func() + return 'VNFM shutting down...' 
+ + app.run(host="0.0.0.0", port=self.port) + +if __name__ == '__main__': + app_manager = VNFM(CONF, LOG) + app_manager.start() + try: + LOG.info('Press CTRL + C to quit') + while True: + time.sleep(2) + except KeyboardInterrupt: + app_manager.stop() diff --git a/fenix/utils/identity_auth.py b/fenix/utils/identity_auth.py index 910eb36..5499b53 100644 --- a/fenix/utils/identity_auth.py +++ b/fenix/utils/identity_auth.py @@ -42,14 +42,14 @@ os_opts = [ ] -def get_identity_auth(conf): +def get_identity_auth(conf, project=None): loader = loading.get_plugin_loader('password') return loader.load_from_options( auth_url=conf.service_user.os_auth_url, username=conf.service_user.os_username, password=conf.service_user.os_password, user_domain_name=conf.service_user.os_user_domain_name, - project_name=conf.service_user.os_project_name, + project_name = (project or conf.service_user.os_project_name), tenant_name=conf.service_user.os_project_name, project_domain_name=conf.service_user.os_project_domain_name) diff --git a/fenix/utils/service.py b/fenix/utils/service.py index 1325e84..b1ad738 100644 --- a/fenix/utils/service.py +++ b/fenix/utils/service.py @@ -37,6 +37,7 @@ from uuid import uuid1 as generate_uuid from fenix import context from fenix.db import api as db_api +from fenix import exceptions from fenix.utils.download import download_url import fenix.utils.identity_auth @@ -159,6 +160,8 @@ class EngineEndpoint(object): def admin_delete_session(self, ctx, session_id): """Delete maintenance workflow session thread""" LOG.info("EngineEndpoint: admin_delete_session") + if session_id not in self.workflow_sessions: + raise exceptions.NotFound("session_id not found") self.workflow_sessions[session_id].cleanup() self.workflow_sessions[session_id].stop() self.workflow_sessions.pop(session_id) diff --git a/fenix/workflow/workflow.py b/fenix/workflow/workflow.py index 489844c..7e48263 100644 --- a/fenix/workflow/workflow.py +++ b/fenix/workflow/workflow.py @@ -398,13 +398,27 @@ class BaseWorkflow(Thread): def maintenance(self): LOG.error("%s: maintenance method not implemented!" % self.session_id) + def maintenance_done(self): + LOG.error("%s: maintenance_done method not implemented!" % + self.session_id) + def maintenance_failed(self): LOG.error("%s: maintenance_failed method not implemented!" % self.session_id) def state(self, state): + # TBD we could notify admin for workflow state change self.session.prev_state = self.session.state self.session.state = state + if state in ["MAINTENANCE_DONE", "MAINTENANCE_FAILED"]: + try: + statefunc = (getattr(self, + self.states_methods[self.session.state])) + statefunc() + except Exception as e: + LOG.error("%s: %s Raised exception: %s" % (self.session_id, + statefunc, e), exc_info=True) + self.state("MAINTENANCE_FAILED") def run(self): LOG.info("%s: started" % self.session_id) @@ -555,3 +569,10 @@ class BaseWorkflow(Thread): LOG.error('%s: timer %s expired' % (self.session_id, timer_name)) return False + + def project_ids_with_instance_group(self): + igs = db_api.instance_groups_get() + project_ids = list() + [project_ids.append(ig.project_id) for ig in igs + if ig.project_id not in project_ids] + return project_ids diff --git a/fenix/workflow/workflows/k8s.py b/fenix/workflow/workflows/k8s.py new file mode 100644 index 0000000..50ad1ac --- /dev/null +++ b/fenix/workflow/workflows/k8s.py @@ -0,0 +1,984 @@ +# Copyright (c) 2020 Nokia Corporation. +# All Rights Reserved. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import datetime +from importlib import import_module +try: + from importlib.machinery import SourceFileLoader + + def mod_loader_action_instance(mname, mpath, session_instance, + ap_db_instance): + mi = SourceFileLoader(mname, mpath).load_module() + return mi.ActionPlugin(session_instance, ap_db_instance) +except ImportError: + from imp import load_source + + def mod_loader_action_instance(mname, mpath, session_instance, + ap_db_instance): + mi = load_source(mname, mpath) + return mi.ActionPlugin(session_instance, ap_db_instance) + +from keystoneclient import client as ks_client +from kubernetes import client +from kubernetes.client.rest import ApiException +from kubernetes import config +import os +from oslo_log import log as logging +import time + +from fenix.db import api as db_api +from fenix.db import exceptions as db_exc +from fenix.utils.thread import run_async +from fenix.utils.time import datetime_to_str +from fenix.utils.time import is_time_after_time +from fenix.utils.time import reply_time_str +from fenix.utils.time import time_now_str + + +from fenix.workflow.workflow import BaseWorkflow + +LOG = logging.getLogger(__name__) + + +class Workflow(BaseWorkflow): + + def __init__(self, conf, session_id, data): + super(Workflow, self).__init__(conf, session_id, data) + + config.load_kube_config() + v_api = client.VersionApi() + self.kapi = client.CoreV1Api() + self.ks = ks_client.Client(version='v3', session=self.auth_session) + LOG.info("%s: initialized with Kubernetes: %s" % + (self.session_id, + v_api.get_code_with_http_info()[0].git_version)) + + self.hosts = self._init_hosts_by_services() + + LOG.info('%s: Execute pre action plugins' % (self.session_id)) + self.maintenance_by_plugin_type("localhost", "pre") + self.group_impacted_members = {} + + def _init_hosts_by_services(self): + LOG.info("%s: Dicovering hosts by services" % self.session_id) + nodes = self.kapi.list_node().items + hosts = [] + for node in nodes: + host = {} + host['hostname'] = node.metadata.name + if 'node-role.kubernetes.io/master' in node.metadata.labels.keys(): + host['type'] = 'controller' + else: + host['type'] = 'compute' + + if node.spec.unschedulable: + host['disabled'] = True + else: + host['disabled'] = False + host['maintained'] = False + hosts.append(host) + + return db_api.create_hosts_by_details(self.session_id, hosts) + + def get_worker_nodes(self): + nodes = self.kapi.list_node().items + worker_hosts = self.get_compute_hosts() + return [n for n in nodes if n.metadata.name in worker_hosts] + + def is_node_cordoned(self, node_name): + host = self.get_host_by_name(node_name) + return host.disabled + + def cordon(self, node_name): + LOG.info("%s: cordon %s" % (self.session_id, node_name)) + host = self.get_host_by_name(node_name) + body = {"apiVersion": "v1", "spec": {"unschedulable": True}} + self.kapi.patch_node(node_name, body) + host.disabled = True + + def uncordon(self, node_name): + LOG.info("%s: uncordon %s" % (self.session_id, node_name)) + 
host = self.get_host_by_name(node_name) + body = {"apiVersion": "v1", "spec": {"unschedulable": None}} + self.kapi.patch_node(node_name, body) + host.disabled = False + + def _pod_by_id(self, pod_id): + return [p for p in self.kapi.list_pod_for_all_namespaces().items + if p.metadata.uid == pod_id][0] + + def _pods_by_node_and_controller(self, node_name, contoller): + return [p for p in self.kapi.list_pod_for_all_namespaces().items + if p.metadata.owner_references[0].kind == contoller and + p.spec.node_name == node_name and + p.metadata.namespace != 'kube-system'] + + def _pods_by_nodes_and_controller(self, node_names, contoller): + return [p for p in self.kapi.list_pod_for_all_namespaces().items + if p.metadata.owner_references[0].kind == contoller and + p.spec.node_name in node_names and + p.metadata.namespace != 'kube-system'] + + def _get_pod_by_name_and_namespace(self, name, namespace): + try: + pod = self.kapi.read_namespaced_pod(name, namespace) + except ApiException: + pod = None + return pod + + # TBD remove as deprecated + def _get_pod_host_and_state(self, name): + return [(p.spec.node_name, p.status.phase) for p in + self.kapi.list_pod_for_all_namespaces().items + if p.metadata.name == name][0] + + # TBD remove as deprecated + def wait_pod_evicted(self, name, orig_host, orig_state): + host, state = self._get_pod_host_and_state(name) + check = 60 + last_state = orig_state + last_host = orig_host + while host == orig_host or state != orig_state: + if host != last_host or state != last_state: + # log only if either value changed since last round + LOG.info("%s: pod: %s %s on host %s" % + (self.session_id, name, state, host)) + last_state = state + last_host = host + if check == 0: + raise Exception('Pod %s eviction timout' % name) + check -= 1 + time.sleep(1) + host, state = self._get_pod_host_and_state(name) + + # TBD remove as deprecated + def drain(self, node_name): + LOG.info("%s: drain %s" % (self.session_id, node_name)) + if not self.is_node_cordoned(node_name): + self.cordon(node_name) + for pod in self._pods_by_node_and_controller(node_name, + 'ReplicaSet'): + namespace = pod.metadata.namespace + name = pod.metadata.name + orig_host = pod.spec.node_name + orig_state = pod.status.phase + # For now k8s namespace will be the user and project in OpenStack + # keystone. Keycloak or webhook for keystone should be used + body = client.V1beta1Eviction() + body.api_version = "policy/v1beta1" + body.kind = "Eviction" + body.metadata = {"name": name, "namespace": namespace} + LOG.info("%s: Evicting pod: %s %s on host %s" % + (self.session_id, name, orig_state, orig_host)) + try: + self.kapi.create_namespaced_pod_eviction(name, + namespace, + body) + except ApiException as e: + LOG.error("Exception when calling create_namespaced_pod_" + "eviction: %s\n" % e) + # self.wait_pod_evicted(name, orig_host, orig_state) + LOG.info("%s: Evicted pod: %s" % (self.session_id, name)) + # VNFM should keep track of constraints, not Fenix + # db_api.remove_project_instance(pod_id) + # self.notify_action_done(self.instance_by_id(pod_id)) + LOG.info("%s: drained %s" % (self.session_id, node_name)) + + def evict(self, pod, recovery_time): + namespace = pod.metadata.namespace + name = pod.metadata.name + pod_id = pod.metadata.uid + LOG.info("%s: Evict: %s: %s" % (self.session_id, pod_id, name)) + orig_host = pod.spec.node_name + orig_state = pod.status.phase + # For now k8s namespace will be the user and project in OpenStack + # keystone. 
Keycloak or webhook for keystone should be used + body = client.V1beta1Eviction() + body.api_version = "policy/v1beta1" + body.kind = "Eviction" + body.metadata = {"name": name, + "namespace": namespace} + + LOG.info("%s: Evicting pod: %s %s on host %s" % + (self.session_id, name, orig_state, orig_host)) + try: + self.kapi.create_namespaced_pod_eviction(name, + namespace, + body) + except ApiException as e: + LOG.error("Exception when calling create_namespaced_pod_" + "eviction: %s\n" % e) + # Need to start timer to wait new POD initialization with recovery time + # TBD this might first check new POD STATUS == running and then + # still wait instance_group.recovery_time. This might be tricky as we + # do not know new POD. We might check new pods, but there might be more + # than one becasue parallel actions. Somehow we would need to be able + # to map evicted POD for new to make this enhancement + # tried adding "labels": {"previous_pod_id": pod_id} in above body. + # that did not result to this label to be in new POD + timer = 'RECOVERY_%s_TIMEOUT' % pod_id + self.start_timer(recovery_time, timer) + time.sleep(1) + pod = self._get_pod_by_name_and_namespace(name, namespace) + check = 40 + LOG.info("%s: Waiting pod: %s eviction from host %s ..." % + (self.session_id, name, orig_host)) + while pod: + if check == 0: + raise Exception('Pod %s still not deleted in eviction' % name) + check -= 1 + time.sleep(1) + pod = self._get_pod_by_name_and_namespace(name, namespace) + LOG.info("%s: Evicted pod: %s: %s" % (self.session_id, pod_id, name)) + return True + + def _fenix_instance(self, project_id, instance_id, instance_name, host, + state, details=None, action=None, project_state=None, + action_done=False): + instance = {'session_id': self.session_id, + 'instance_id': instance_id, + 'action': action, + 'project_id': project_id, + 'instance_id': instance_id, + 'project_state': project_state, + 'state': state, + 'instance_name': instance_name, + 'action_done': action_done, + 'host': host, + 'details': details} + return instance + + def initialize_server_info(self): + project_ids = {} + instances = [] + worker_hosts = self.get_compute_hosts() + pods = self._pods_by_nodes_and_controller(worker_hosts, 'ReplicaSet') + for pod in pods: + host = pod.spec.node_name + # Map K8S namespace as user and project in keystone + if pod.metadata.namespace not in project_ids.keys(): + project_id = str(self.ks.projects.list( + name=pod.metadata.namespace)[0].id) + project_ids[pod.metadata.namespace] = project_id + else: + project_id = project_ids[pod.metadata.namespace] + instance_name = pod.metadata.name + instance_id = pod.metadata.uid + state = pod.status.phase # Running + instances.append(self._fenix_instance(project_id, instance_id, + instance_name, host, state)) + if project_ids: + self.projects = self.init_projects(project_ids.values()) + else: + LOG.info('%s: No projects on nodes under maintenance' % + self.session_id) + if len(instances): + self.instances = self.add_instances(instances) + else: + LOG.info('%s: No instances on nodes under maintenance' % + self.session_id) + LOG.info(str(self)) + + def update_instance(self, project_id, instance_id, instance_name, host, + state, details=None): + if self.instance_id_found(instance_id): + # TBD Might need to update instance variables here if not done + # somewhere else + return + elif self.instance_name_found(instance_name): + # Project has made re-instantiation, remove old add new + old_instance = self.instance_by_name(instance_name) + instance = 
self._fenix_instance(project_id, instance_id, + instance_name, host, + state, details, + old_instance.action, + old_instance.project_state, + old_instance.action_done) + self.instances.append(self.add_instance(instance)) + self.remove_instance(old_instance) + else: + # Instance new, as project has added instances + instance = self._fenix_instance(project_id, instance_id, + instance_name, host, + state, details) + self.instances.append(self.add_instance(instance)) + + def remove_non_existing_instances(self, instance_ids): + remove_instances = [instance for instance in + self.instances if instance.instance_id not in + instance_ids] + for instance in remove_instances: + # Instance deleted, as project possibly scaled down + self.remove_instance(instance) + + def update_server_info(self): + # TBD This keeps internal instance information up-to-date and prints + # it out. Same could be done by updating the information when changed + # Anyhow this also double checks information against K8S + project_ids = {} + instance_ids = [] + worker_hosts = self.get_compute_hosts() + pods = self._pods_by_nodes_and_controller(worker_hosts, 'ReplicaSet') + for pod in pods: + host = pod.spec.node_name + # Map K8S namespace as user and project in keystone + if pod.metadata.namespace not in project_ids.keys(): + project_id = self.ks.projects.list( + name=pod.metadata.namespace)[0].id + project_ids[pod.metadata.namespace] = project_id + else: + project_id = project_ids[pod.metadata.namespace] + instance_name = pod.metadata.name + instance_id = pod.metadata.uid + state = pod.status.phase # Running + details = None + self.update_instance(project_id, instance_id, instance_name, host, + state, details) + instance_ids.append(instance_id) + self.remove_non_existing_instances(instance_ids) + LOG.info(str(self)) + + def projects_with_constraints(self): + project_ids = self.project_ids_with_instance_group() + for project_id in self.projects(): + if project_id not in project_ids: + LOG.error('%s: project_id %s not ' + 'set any instance_group' % + (self.session_id, project_id)) + return False + return True + + def confirm_maintenance(self): + allowed_actions = [] + actions_at = self.session.maintenance_at + state = 'MAINTENANCE' + self.set_projets_state(state) + all_replied = False + project_not_replied = None + retry = 2 + while not all_replied: + for project in self.project_names(): + if (project_not_replied is not None and project not in + project_not_replied): + continue + LOG.info('\nMAINTENANCE to project %s\n' % project) + instance_ids = '%s/v1/maintenance/%s/%s' % (self.url, + self.session_id, + project) + reply_at = reply_time_str(self.conf.project_maintenance_reply) + if is_time_after_time(reply_at, actions_at): + LOG.error('%s: No time for project to answer in state: %s' + % (self.session_id, state)) + self.state("MAINTENANCE_FAILED") + return False + metadata = self.session.meta + self._project_notify(project, instance_ids, allowed_actions, + actions_at, reply_at, state, metadata) + self.start_timer(self.conf.project_maintenance_reply, + 'MAINTENANCE_TIMEOUT') + + all_replied = self.wait_projects_state(state, '%s_TIMEOUT' % state) + if not all_replied: + if retry == 0: + LOG.info('confirm_maintenance failed after retries') + break + else: + LOG.info('confirm_maintenance retry') + projects = self.get_projects_with_state() + project_not_replied = ( + self._project_names_in_state(projects, state)) + retry -= 1 + return all_replied + + def worker_nodes_cpu_info(self, system_reserved): + # TBD system_reserved is now 
just questimated according to what + # flannel, kubelet... needs on top of pods + workers_info = {} + worker_hosts = self.get_compute_hosts() + workers = self.get_worker_nodes() + pods = self._pods_by_nodes_and_controller(worker_hosts, 'ReplicaSet') + for worker in workers: + cpus = int(worker.status.capacity[u'cpu']) - system_reserved + name = worker.metadata.name + workers_info[name] = {'cpus_used': 0, + 'cpus': cpus, + 'name': name} + for pod in [p for p in pods if p.spec.node_name == name]: + + cpus_used = 0 + for container in pod.spec.containers: + try: + cpus_used += int(container.resources.requests[u'cpu']) + except AttributeError: + # container does not need to have + # resources.requests.cpu + pass + if cpus_used > 0: + workers_info[name]['cpus_used'] += cpus_used + if workers_info[name]['cpus_used'] > workers_info[name]['cpus']: + LOG.error('%s overbooked: %s' % + (name, workers_info[name])) + raise Exception('%s overbooked: %s' % + (name, workers_info[name])) + LOG.info('workers_info:\n%s' % workers_info) + return workers_info + + def confirm_scale_in(self): + allowed_actions = [] + actions_at = reply_time_str(self.conf.project_scale_in_reply) + reply_at = actions_at + state = 'SCALE_IN' + self.set_projets_state(state) + all_replied = False + project_not_replied = None + retry = 2 + while not all_replied: + for project in self.project_names(): + if (project_not_replied is not None and project not in + project_not_replied): + continue + LOG.info('\nSCALE_IN to project %s\n' % project) + instance_ids = '%s/v1/maintenance/%s/%s' % (self.url, + self.session_id, + project) + metadata = self.session.meta + self._project_notify(project, instance_ids, allowed_actions, + actions_at, reply_at, state, metadata) + self.start_timer(self.conf.project_scale_in_reply, + 'SCALE_IN_TIMEOUT') + + all_replied = self.wait_projects_state(state, '%s_TIMEOUT' % state) + if not all_replied: + if retry == 0: + LOG.info('confirm_scale_in failed after retries') + break + else: + LOG.info('confirm_scale_in retry') + projects = self.get_projects_with_state() + project_not_replied = ( + self._project_names_in_state(projects, state)) + retry -= 1 + return all_replied + + def need_scale_in(self): + # TBD see if there is enough free capacity, so we do not need to scale + # TBD this should be calculated according to instance and + # instance_group constraints + workers_info = self.worker_nodes_cpu_info(2) + prev_cpus = 0 + free_cpus = 0 + prev_hostname = '' + LOG.info('checking workers CPU capacity') + for worker in workers_info.values(): + hostname = worker['name'] + cpus = worker['cpus'] + cpus_used = worker['cpus_used'] + if prev_cpus != 0 and prev_cpus != cpus: + raise Exception('%s: %d cpus on %s does not match to' + '%d on %s' + % (self.session_id, cpus, hostname, + prev_cpus, prev_hostname)) + free_cpus += cpus - cpus_used + prev_cpus = cpus + prev_hostname = hostname + if free_cpus >= cpus: + # TBD cpu capacity might be too scattered so moving instances from + # one host to other host still might not succeed. 
At least with
+            # NUMA and CPU pinning, one should calculate and ask specific
+            # instances
+            return False
+        else:
+            return True
+
+    def find_host_to_be_empty(self, need_empty, weighted_hosts):
+        LOG.info("need_empty: %s" % need_empty)
+        hosts_to_be_empty = []
+        for instances in sorted(weighted_hosts.keys()):
+            LOG.info("instances in weighted_hosts: %s" % instances)
+            weighted_candidates = weighted_hosts[instances]
+            if len(weighted_candidates) == need_empty:
+                # Exact match to the number of hosts needed
+                hosts_to_be_empty = weighted_hosts[instances]
+                LOG.info("hosts to be empty: %s" % hosts_to_be_empty)
+            elif len(weighted_candidates) > need_empty:
+                # More candidates than needed, pick hosts until we have enough
+                for host in weighted_candidates:
+                    LOG.info("host to be empty: %s" % host)
+                    hosts_to_be_empty.append(host)
+                    if len(hosts_to_be_empty) == need_empty:
+                        break
+            if len(hosts_to_be_empty) == need_empty:
+                break
+        if len(hosts_to_be_empty) != need_empty:
+            LOG.error("failed to find enough hosts to be emptied")
+        return hosts_to_be_empty
+
+    def make_empty_hosts(self, state):
+        # TBD calculate how many nodes can be made empty; for now use a very
+        # simple approach based on which hosts have the fewest pods
+        weighted_hosts = {}
+        empty_hosts = []
+        for host in self.get_compute_hosts():
+            instances = len(self.instances_by_host(host))
+            if instances == 0:
+                empty_hosts.append(host)
+                self.cordon(host)
+                LOG.info("host %s empty" % host)
+            else:
+                if instances not in weighted_hosts:
+                    weighted_hosts[instances] = [host]
+                else:
+                    weighted_hosts[instances].append(host)
+        if len(empty_hosts):
+            # TBD We just need one empty host for initial POC testing
+            return True
+        else:
+            need_empty = 1
+            hosts_to_be_empty = self.find_host_to_be_empty(need_empty,
+                                                           weighted_hosts)
+            thrs = []
+            for host in hosts_to_be_empty:
+                thrs.append(self.actions_to_have_empty_host(host, state))
+                # self._wait_host_empty(host)
+            for thr in thrs:
+                thr.join()
+        return True
+
+    @run_async
+    def instance_action(self, instance, state, target_host=None):
+        if not self.confirm_instance_action(instance, state):
+            raise Exception('%s: instance %s action %s '
+                            'confirmation failed' %
+                            (self.session_id, instance.instance_id,
+                             instance.action))
+        # TBD from constraints or override in instance.action
+        LOG.info('Action %s instance %s ' % (instance.action,
+                                             instance.instance_id))
+        try:
+            instance_constraints = (
+                db_api.project_instance_get(instance.instance_id))
+            group_id = instance_constraints.group_id
+            instance_group = db_api.instance_group_get(group_id)
+            if group_id not in self.group_impacted_members:
+                self.group_impacted_members[group_id] = 0
+            max_parallel = instance_group.max_impacted_members
+            LOG.info("%s - instance_group: %s max_impacted_members: %s "
+                     "recovery_time: %s" %
+                     (instance.instance_id, instance_group.group_name,
+                      max_parallel, instance_group.recovery_time))
+        except db_exc.FenixDBNotFound:
+            raise Exception('failed to get %s constraints' %
+                            (instance.instance_id))
+        while max_parallel < self.group_impacted_members[group_id]:
+            LOG.info('%s waiting in group queue / max_parallel %s/%s' %
+                     (instance.instance_id,
+                      self.group_impacted_members[group_id],
+                      max_parallel))
+            time.sleep(5)
+        self.group_impacted_members[group_id] += 1
+        LOG.debug("%s Reserved / max_impacted_members: %s/%s" %
+                  (instance.instance_id, self.group_impacted_members[group_id],
+                   max_parallel))
+        if instance.action == 'OWN_ACTION':
+            pass
+        elif instance.action == 'EVICTION':
+            pod = self._pod_by_id(instance.instance_id)
+            if not self.evict(pod, 
instance_group.recovery_time): + self.group_impacted_members[group_id] -= 1 + LOG.debug("%s Reservation freed. remain / " + "max_impacted_members:%s/%s" + % (instance.instance_id, + self.group_impacted_members[group_id], + max_parallel)) + raise Exception('%s: instance %s action ' + '%s failed' % + (self.session_id, instance.instance_id, + instance.action)) + else: + self.group_impacted_members[group_id] -= 1 + LOG.debug("%s Reservation freed. remain / " + "max_impacted_members:%s/%s" + % (instance.instance_id, + self.group_impacted_members[group_id], + max_parallel)) + raise Exception('%s: instance %s action ' + '%s not supported' % + (self.session_id, instance.instance_id, + instance.action)) + # We need to obey recovery time for instance group before + # decrease self.group_impacted_members[group_id] to allow + # one more instances of same group to be affected by any move + # operation + if instance_group.recovery_time > 0: + timer = 'RECOVERY_%s_TIMEOUT' % instance.instance_id + LOG.info("%s wait POD to recover from move..." + % instance.instance_id) + while not self.is_timer_expired(timer): + time.sleep(1) + self.notify_action_done(instance) + self.group_impacted_members[group_id] -= 1 + LOG.debug("%s Reservation freed. remain / max_impacted_members: %s/%s" + % (instance.instance_id, + self.group_impacted_members[group_id], + max_parallel)) + + @run_async + def actions_to_have_empty_host(self, host, state, target_host=None): + # TBD we only support EVICTION of all pods with drain(host) + # Need parallel hosts and make_empty_hosts to calculate + thrs = [] + LOG.info('actions_to_have_empty_host %s' % host) + instances = self.instances_by_host(host) + if not instances: + raise Exception('No instances on host: %s' % host) + self.cordon(host) + for instance in instances: + LOG.info('move %s from %s' % (instance.instance_name, host)) + thrs.append(self.instance_action(instance, state, + target_host)) + # thrs.append(self.confirm_instance_action(instance, state)) + for thr in thrs: + thr.join() + if state == 'PLANNED_MAINTENANCE': + self.host_maintenance(host) + + def confirm_instance_action(self, instance, state): + instance_id = instance.instance_id + LOG.info('%s to instance %s' % (state, instance_id)) + allowed_actions = ['EVICTION', 'OWN_ACTION'] + try: + instance_constraints = db_api.project_instance_get(instance_id) + wait_time = instance_constraints.lead_time + LOG.info("%s actions_at from constraints lead_time: %s" % + (instance_id, wait_time)) + except db_exc.FenixDBNotFound: + wait_time = self.conf.project_maintenance_reply + actions_at = reply_time_str(wait_time) + reply_at = actions_at + instance.project_state = state + metadata = self.session.meta + retry = 2 + replied = False + while not replied: + metadata = self.session.meta + self._project_notify(instance.project_id, [instance_id], + allowed_actions, actions_at, reply_at, + state, metadata) + timer = '%s_%s_TIMEOUT' % (state, instance_id) + self.start_timer(self.conf.project_maintenance_reply, timer) + replied = self.wait_instance_reply_state(state, instance, timer) + if not replied: + if retry == 0: + LOG.info('confirm_instance_action for %s failed after ' + 'retries' % instance.instance_id) + break + else: + LOG.info('confirm_instance_action for %s retry' + % instance.instance_id) + else: + break + retry -= 1 + return replied + + def confirm_maintenance_complete(self): + state = 'MAINTENANCE_COMPLETE' + metadata = self.session.meta + actions_at = reply_time_str(self.conf.project_scale_in_reply) + reply_at = actions_at + 
self.set_projets_state(state) + all_replied = False + project_not_replied = None + retry = 2 + while not all_replied: + for project in self.project_names(): + if (project_not_replied is not None and project not in + project_not_replied): + continue + LOG.info('%s to project %s' % (state, project)) + instance_ids = '%s/v1/maintenance/%s/%s' % (self.url, + self.session_id, + project) + allowed_actions = [] + self._project_notify(project, instance_ids, allowed_actions, + actions_at, reply_at, state, metadata) + self.start_timer(self.conf.project_scale_in_reply, + '%s_TIMEOUT' % state) + + all_replied = self.wait_projects_state(state, '%s_TIMEOUT' % state) + if not all_replied: + if retry == 0: + LOG.info('confirm_maintenance_complete failed after ' + 'retries') + break + else: + LOG.info('confirm_maintenance_complete retry') + projects = self.get_projects_with_state() + project_not_replied = ( + self._project_names_in_state(projects, state)) + retry -= 1 + return all_replied + + def notify_action_done(self, instance): + instance_ids = [instance.instance_id] + project = instance.project_id + allowed_actions = [] + actions_at = None + reply_at = None + state = "INSTANCE_ACTION_DONE" + instance.project_state = state + metadata = "{}" + self._project_notify(project, instance_ids, allowed_actions, + actions_at, reply_at, state, metadata) + + def maintenance_by_plugin_type(self, hostname, plugin_type): + aps = self.get_action_plugins_by_type(plugin_type) + session_dir = "%s/%s" % (self.conf.engine.local_cache_dir, + self.session_id) + download_plugin_dir = session_dir + "/actions/" + if aps: + LOG.info("%s: Calling action plug-ins with type %s" % + (self.session_id, plugin_type)) + for ap in aps: + ap_name = "fenix.workflow.actions.%s" % ap.plugin + LOG.info("%s: Calling action plug-in module: %s" % + (self.session_id, ap_name)) + ap_db_instance = self._create_action_plugin_instance(ap.plugin, + hostname) + try: + action_plugin = getattr(import_module(ap_name), + 'ActionPlugin') + ap_instance = action_plugin(self, ap_db_instance) + except ImportError: + download_plugin_file = "%s/%s.py" % (download_plugin_dir, + ap.plugin) + LOG.info("%s: Trying from: %s" % (self.session_id, + download_plugin_file)) + if os.path.isfile(download_plugin_file): + ap_instance = ( + mod_loader_action_instance(ap_name, + download_plugin_file, + self, + ap_db_instance)) + else: + raise Exception('%s: could not find action plugin %s' % + (self.session_id, ap.plugin)) + + ap_instance.run() + if ap_db_instance.state: + LOG.info('%s: %s finished with %s host %s' % + (self.session_id, ap.plugin, + ap_db_instance.state, hostname)) + if 'FAILED' in ap_db_instance.state: + raise Exception('%s: %s finished with %s host %s' % + (self.session_id, ap.plugin, + ap_db_instance.state, hostname)) + else: + raise Exception('%s: %s reported no state for host %s' % + (self.session_id, ap.plugin, hostname)) + # If ap_db_instance failed, we keep it for state + db_api.remove_action_plugin_instance(ap_db_instance) + else: + LOG.info("%s: No action plug-ins with type %s" % + (self.session_id, plugin_type)) + + def _wait_host_empty(self, host): + check = 60 + pods = self._pods_by_node_and_controller(host, 'ReplicaSet') + while pods: + if check == 0: + raise Exception('Wait empty host %s timout' % host) + elif not check % 5: + LOG.info('...waiting host %s empty' % host) + check -= 1 + time.sleep(1) + pods = self._pods_by_node_and_controller(host, 'ReplicaSet') + LOG.info('Host %s empty' % host) + + @run_async + def host_maintenance_async(self, 
hostname):
+        self.host_maintenance(hostname)
+
+    def host_maintenance(self, hostname):
+        host = self.get_host_by_name(hostname)
+        if host.type == "compute":
+            self._wait_host_empty(hostname)
+        LOG.info('IN_MAINTENANCE %s' % hostname)
+        self._admin_notify(self.conf.service_user.os_project_name,
+                           hostname,
+                           'IN_MAINTENANCE',
+                           self.session_id)
+        for plugin_type in ["host", host.type]:
+            LOG.info('%s: Execute %s action plugins' % (self.session_id,
+                                                        plugin_type))
+            self.maintenance_by_plugin_type(hostname, plugin_type)
+        self._admin_notify(self.conf.service_user.os_project_name,
+                           hostname,
+                           'MAINTENANCE_COMPLETE',
+                           self.session_id)
+        if host.type == "compute":
+            self.uncordon(hostname)
+        LOG.info('MAINTENANCE_COMPLETE %s' % hostname)
+        host.maintained = True
+
+    def maintenance(self):
+        LOG.info("%s: maintenance called" % self.session_id)
+        self.initialize_server_info()
+        time.sleep(1)
+        self.state('START_MAINTENANCE')
+
+        if not self.projects_with_constraints():
+            self.state('MAINTENANCE_FAILED')
+            return
+
+        if not self.confirm_maintenance():
+            self.state('MAINTENANCE_FAILED')
+            return
+
+        maintenance_empty_hosts = self.get_empty_computes()
+
+        if len(maintenance_empty_hosts) == 0:
+            if self.need_scale_in():
+                LOG.info('%s: Need to scale in to get capacity for '
+                         'empty host' % (self.session_id))
+                self.state('SCALE_IN')
+            else:
+                LOG.info('%s: Free capacity, but need empty host' %
+                         (self.session_id))
+                self.state('PREPARE_MAINTENANCE')
+        else:
+            LOG.info('Empty host found')
+            self.state('START_MAINTENANCE')
+
+        if self.session.maintenance_at > datetime.datetime.utcnow():
+            time_now = time_now_str()
+            LOG.info('Time now: %s maintenance starts: %s....' %
+                     (time_now, datetime_to_str(self.session.maintenance_at)))
+            td = self.session.maintenance_at - datetime.datetime.utcnow()
+            self.start_timer(td.total_seconds(), 'MAINTENANCE_START_TIMEOUT')
+            while not self.is_timer_expired('MAINTENANCE_START_TIMEOUT'):
+                time.sleep(1)
+
+        time_now = time_now_str()
+        LOG.info('Time to start maintenance: %s' % time_now)
+
+    def scale_in(self):
+        LOG.info("%s: scale in" % self.session_id)
+        # TBD we just blindly ask to scale_in to have at least one
+        # empty compute. With NUMA and CPU pinning and together with
+        # how many instances can be affected at the same time, we should
+        # calculate and ask scaling of specific instances
+        if not self.confirm_scale_in():
+            self.state('MAINTENANCE_FAILED')
+            return
+        # TBD it takes time to have proper information updated about free
+        # capacity. 
Should make sure instances removed by other means than + # sleeping here + time.sleep(4) + self.update_server_info() + maintenance_empty_hosts = self.get_empty_computes() + + if len(maintenance_empty_hosts) == 0: + if self.need_scale_in(): + LOG.info('%s: Need to scale in more to get capacity for ' + 'empty host' % (self.session_id)) + self.state('SCALE_IN') + else: + LOG.info('%s: Free capacity, but need empty host' % + (self.session_id)) + self.state('PREPARE_MAINTENANCE') + else: + LOG.info('Empty host found') + self.state('START_MAINTENANCE') + + def prepare_maintenance(self): + LOG.info("%s: prepare_maintenance called" % self.session_id) + if not self.make_empty_hosts('PREPARE_MAINTENANCE'): + LOG.error('make_empty_hosts failed') + self.state('MAINTENANCE_FAILED') + else: + self.state('START_MAINTENANCE') + self.update_server_info() + + def start_maintenance(self): + LOG.info("%s: start_maintenance called" % self.session_id) + empty_hosts = self.get_empty_computes() + if not empty_hosts: + LOG.error("%s: No empty host to be maintained" % self.session_id) + self.state('MAINTENANCE_FAILED') + return + for host_name in self.get_compute_hosts(): + self.cordon(host_name) + thrs = [] + for host_name in empty_hosts: + # LOG.info("%s: Maintaining %s" % (self.session_id, host_name)) + thrs.append(self.host_maintenance_async(host_name)) + # LOG.info("%s: Maintained %s" % (self.session_id, host_name)) + for thr in thrs: + thr.join() + time.sleep(1) + self.update_server_info() + self.state('PLANNED_MAINTENANCE') + + def planned_maintenance(self): + LOG.info("%s: planned_maintenance called" % self.session_id) + maintained_hosts = self.get_maintained_hosts_by_type('compute') + compute_hosts = self.get_compute_hosts() + not_maintained_hosts = ([host for host in compute_hosts if host + not in maintained_hosts]) + empty_compute_hosts = self.get_empty_computes() + parallel = len(empty_compute_hosts) + not_maintained = len(not_maintained_hosts) + while not_maintained: + if not_maintained < parallel: + parallel = not_maintained + thrs = [] + for index in range(parallel): + shost = not_maintained_hosts[index] + thost = empty_compute_hosts[index] + thrs.append( + self.actions_to_have_empty_host(shost, + 'PLANNED_MAINTENANCE', + thost)) + for thr in thrs: + thr.join() + empty_compute_hosts = self.get_empty_computes() + del not_maintained_hosts[:parallel] + parallel = len(empty_compute_hosts) + not_maintained = len(not_maintained_hosts) + self.update_server_info() + + LOG.info("%s: planned_maintenance done" % self.session_id) + self.state('MAINTENANCE_COMPLETE') + + def maintenance_complete(self): + LOG.info("%s: maintenance_complete called" % self.session_id) + LOG.info('%s: Execute post action plugins' % self.session_id) + self.maintenance_by_plugin_type("localhost", "post") + LOG.info('Projects may still need to up scale back to full ' + 'capcity') + if not self.confirm_maintenance_complete(): + self.state('MAINTENANCE_FAILED') + return + self.update_server_info() + self.state('MAINTENANCE_DONE') + + def maintenance_done(self): + LOG.info("%s: MAINTENANCE_DONE" % self.session_id) + + def maintenance_failed(self): + LOG.info("%s: MAINTENANCE_FAILED" % self.session_id) + + def cleanup(self): + LOG.info("%s: cleanup" % self.session_id) + db_api.remove_session(self.session_id) diff --git a/fenix/workflow/workflows/vnf.py b/fenix/workflow/workflows/vnf.py index 27a2de0..93500dc 100644 --- a/fenix/workflow/workflows/vnf.py +++ b/fenix/workflow/workflows/vnf.py @@ -170,18 +170,6 @@ class Workflow(BaseWorkflow): 
self.nova.services.enable(hostname, "nova-compute") host.disabled = False - def get_compute_hosts(self): - return [host.hostname for host in self.hosts - if host.type == 'compute'] - - def get_empty_computes(self): - all_computes = self.get_compute_hosts() - instance_computes = [] - for instance in self.instances: - if instance.host not in instance_computes: - instance_computes.append(instance.host) - return [host for host in all_computes if host not in instance_computes] - def get_instance_details(self, instance): network_interfaces = next(iter(instance.addresses.values())) for network_interface in network_interfaces: @@ -401,6 +389,8 @@ class Workflow(BaseWorkflow): return all_replied def need_scale_in(self): + # TBD this should be calculated according to instance and + # instance_group constraints hvisors = self.nova.hypervisors.list(detailed=True) prev_vcpus = 0 free_vcpus = 0 @@ -991,7 +981,7 @@ class Workflow(BaseWorkflow): def scale_in(self): LOG.info("%s: scale in" % self.session_id) # TBD we just blindly ask to scale_in to have at least one - # empty compute. With NUMA and CPI pinning and together with + # empty compute. With NUMA and CPU pinning and together with # how many instances can be affected at the same time, we should # calculate and ask scaling of specific instances if not self.confirm_scale_in():
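
For reviewers who want to exercise the eviction flow added in this patch outside
of Fenix, a minimal standalone sketch of the same kubernetes-client call pattern
follows. It assumes the ``kubernetes`` Python client and a reachable kubeconfig;
the helper name ``evict_pod`` and the example pod name are illustrative only,
and newer client releases replace ``V1beta1Eviction`` with ``V1Eviction``.

.. code-block:: python

    # Illustrative sketch only - not part of this patch. Assumes the
    # "kubernetes" Python client is installed and a kubeconfig is available.
    from kubernetes import client, config
    from kubernetes.client.rest import ApiException


    def evict_pod(name, namespace):
        """Ask the API server to evict one pod.

        A controller such as a ReplicaSet is expected to recreate the pod,
        which is what the 'EVICTION' instance action relies on.
        """
        config.load_kube_config()  # or config.load_incluster_config()
        kapi = client.CoreV1Api()
        # Newer kubernetes clients use client.V1Eviction (policy/v1) instead
        body = client.V1beta1Eviction(
            api_version="policy/v1beta1",
            kind="Eviction",
            metadata=client.V1ObjectMeta(name=name, namespace=namespace))
        try:
            kapi.create_namespaced_pod_eviction(name, namespace, body)
        except ApiException as e:
            print("Eviction of %s/%s failed: %s" % (namespace, name, e))
            return False
        return True


    if __name__ == '__main__':
        # Hypothetical pod name and namespace for demonstration
        evict_pod("demo-deployment-5c7588df-abcde", "demo")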