diff --git a/qinling/api/controllers/v1/function.py b/qinling/api/controllers/v1/function.py index 547b7de1..fc6e5dd5 100644 --- a/qinling/api/controllers/v1/function.py +++ b/qinling/api/controllers/v1/function.py @@ -31,6 +31,7 @@ from qinling.db import api as db_api from qinling import exceptions as exc from qinling import rpc from qinling.storage import base as storage_base +from qinling.utils.openstack import keystone as keystone_util from qinling.utils.openstack import swift as swift_util from qinling.utils import rest_utils @@ -146,6 +147,15 @@ class FunctionsController(rest.RestController): if not swift_util.check_object(container, object): raise exc.InputException('Object does not exist in Swift.') + if cfg.CONF.pecan.auth_enable: + try: + values['trust_id'] = keystone_util.create_trust().id + LOG.debug('Trust %s created', values['trust_id']) + except Exception: + raise exc.TrustFailedException( + 'Trust creation failed for function.' + ) + with db_api.transaction(): func_db = db_api.create_function(values) @@ -191,6 +201,10 @@ class FunctionsController(rest.RestController): # Delete all resources created by orchestrator asynchronously. self.engine_client.delete_function(id) + # Delete trust if needed + if func_db.trust_id: + keystone_util.delete_trust(func_db.trust_id) + # This will also delete function service mapping as well. db_api.delete_function(id) diff --git a/qinling/api/controllers/v1/job.py b/qinling/api/controllers/v1/job.py index 821a9725..3cdec0a5 100644 --- a/qinling/api/controllers/v1/job.py +++ b/qinling/api/controllers/v1/job.py @@ -27,7 +27,6 @@ from qinling.db import api as db_api from qinling import exceptions as exc from qinling import status from qinling.utils import jobs -from qinling.utils.openstack import keystone as keystone_util from qinling.utils import rest_utils LOG = logging.getLogger(__name__) @@ -72,16 +71,7 @@ class JobsController(rest.RestController): 'function_input': params.get('function_input') or {}, 'status': status.RUNNING } - - if cfg.CONF.pecan.auth_enable: - values['trust_id'] = keystone_util.create_trust().id - - try: - db_job = db_api.create_job(values) - except Exception: - # Delete trust before raising exception. - keystone_util.delete_trust(values.get('trust_id')) - raise + db_job = db_api.create_job(values) return resources.Job.from_dict(db_job.to_dict()) @@ -89,7 +79,7 @@ class JobsController(rest.RestController): @wsme_pecan.wsexpose(None, types.uuid, status_code=204) def delete(self, id): LOG.info("Delete resource.", resource={'type': self.type, 'id': id}) - jobs.delete_job(id) + return db_api.delete_job(id) @rest_utils.wrap_wsme_controller_exception @wsme_pecan.wsexpose(resources.Job, types.uuid) diff --git a/qinling/db/sqlalchemy/migration/alembic_migrations/versions/001_pike.py b/qinling/db/sqlalchemy/migration/alembic_migrations/versions/001_pike.py index ee666126..5e5d3656 100644 --- a/qinling/db/sqlalchemy/migration/alembic_migrations/versions/001_pike.py +++ b/qinling/db/sqlalchemy/migration/alembic_migrations/versions/001_pike.py @@ -73,6 +73,7 @@ def upgrade(): sa.Column('code', st.JsonLongDictType(), nullable=False), sa.Column('entry', sa.String(length=80), nullable=False), sa.Column('count', sa.Integer, nullable=False), + sa.Column('trust_id', sa.String(length=80), nullable=True), sa.PrimaryKeyConstraint('id'), sa.ForeignKeyConstraint(['runtime_id'], [u'runtimes.id']), info={"check_ifexists": True} @@ -138,7 +139,6 @@ def upgrade(): sa.Column('first_execution_time', sa.DateTime(), nullable=True), sa.Column('next_execution_time', sa.DateTime(), nullable=False), sa.Column('count', sa.Integer(), nullable=True), - sa.Column('trust_id', sa.String(length=80), nullable=True), sa.PrimaryKeyConstraint('id'), sa.ForeignKeyConstraint(['function_id'], [u'functions.id']), info={"check_ifexists": True} diff --git a/qinling/db/sqlalchemy/models.py b/qinling/db/sqlalchemy/models.py index 00ce486d..de194071 100644 --- a/qinling/db/sqlalchemy/models.py +++ b/qinling/db/sqlalchemy/models.py @@ -44,6 +44,7 @@ class Function(model_base.QinlingSecureModelBase): code = sa.Column(st.JsonLongDictType(), nullable=False) entry = sa.Column(sa.String(80), nullable=False) count = sa.Column(sa.Integer, default=0) + trust_id = sa.Column(sa.String(80)) class FunctionServiceMapping(model_base.QinlingModelBase): @@ -99,7 +100,6 @@ class Job(model_base.QinlingSecureModelBase): ) function = relationship('Function', back_populates="jobs") function_input = sa.Column(st.JsonDictType()) - trust_id = sa.Column(sa.String(80)) def to_dict(self): d = super(Job, self).to_dict() diff --git a/qinling/engine/default_engine.py b/qinling/engine/default_engine.py index 493d0066..0bf097a9 100644 --- a/qinling/engine/default_engine.py +++ b/qinling/engine/default_engine.py @@ -16,7 +16,6 @@ from oslo_config import cfg from oslo_log import log as logging import requests -from qinling import context from qinling.db import api as db_api from qinling import status from qinling.utils import common @@ -104,14 +103,6 @@ class DefaultEngine(object): ) data = {'input': input, 'execution_id': execution_id} - if CONF.pecan.auth_enable: - data.update( - { - 'token': context.get_ctx().auth_token, - 'trust_id': context.get_ctx().trust_id - } - ) - r = self.session.post(func_url, json=data) res = r.json() @@ -145,7 +136,8 @@ class DefaultEngine(object): identifier=identifier, labels=labels, input=input, - entry=function.entry + entry=function.entry, + trust_id=function.trust_id ) output = self.orchestrator.run_execution( execution_id, diff --git a/qinling/exceptions.py b/qinling/exceptions.py index 582d5881..a0f9d31e 100644 --- a/qinling/exceptions.py +++ b/qinling/exceptions.py @@ -101,3 +101,8 @@ class StorageProviderException(QinlingException): class OrchestratorException(QinlingException): http_code = 500 message = "Orchestrator error." + + +class TrustFailedException(QinlingException): + http_code = 500 + message = "Trust operation failed." diff --git a/qinling/orchestrator/kubernetes/manager.py b/qinling/orchestrator/kubernetes/manager.py index 44cc0b4c..ba3a0de6 100644 --- a/qinling/orchestrator/kubernetes/manager.py +++ b/qinling/orchestrator/kubernetes/manager.py @@ -234,7 +234,7 @@ class KubernetesManager(base.OrchestratorBase): return ret.items[-count:] def _prepare_pod(self, pod, deployment_name, function_id, labels=None, - entry=None, actual_function=None): + entry=None, trust_id=None, actual_function=None): """Pod preparation. 1. Update pod labels. @@ -298,6 +298,7 @@ class KubernetesManager(base.OrchestratorBase): 'download_url': download_url, 'function_id': actual_function, 'entry': entry, + 'trust_id': trust_id } if self.conf.pecan.auth_enable: data.update( @@ -370,7 +371,8 @@ class KubernetesManager(base.OrchestratorBase): return pod_labels def prepare_execution(self, function_id, image=None, identifier=None, - labels=None, input=None, entry='main.main'): + labels=None, input=None, entry='main.main', + trust_id=None): """Prepare service URL for function. For image function, create a single pod with input, so the function @@ -391,7 +393,7 @@ class KubernetesManager(base.OrchestratorBase): raise exc.OrchestratorException('No pod available.') return self._prepare_pod(pod[0], identifier, function_id, labels, - entry) + entry, trust_id) def run_execution(self, execution_id, function_id, input=None, identifier=None, service_url=None): @@ -401,14 +403,6 @@ class KubernetesManager(base.OrchestratorBase): 'input': input, 'execution_id': execution_id, } - if self.conf.pecan.auth_enable: - data.update( - { - 'token': context.get_ctx().auth_token, - 'trust_id': context.get_ctx().trust_id - } - ) - LOG.info('Invoke function %s, url: %s', function_id, func_url) r = self.session.post(func_url, json=data) diff --git a/qinling/services/periodics.py b/qinling/services/periodics.py index 25c01b65..174def70 100644 --- a/qinling/services/periodics.py +++ b/qinling/services/periodics.py @@ -71,10 +71,13 @@ def handle_job(engine_client): for job in db_api.get_next_jobs(timeutils.utcnow() + timedelta(seconds=3)): LOG.debug("Processing job: %s, function: %s", job.id, job.function_id) + func_db = db_api.get_function(job.function_id) + trust_id = func_db.trust_id + try: # Setup context before schedule job. ctx = keystone_utils.create_trust_context( - job.trust_id, job.project_id + trust_id, job.project_id ) context.set_ctx(ctx) diff --git a/qinling/utils/jobs.py b/qinling/utils/jobs.py index 54ff2775..7594bb59 100644 --- a/qinling/utils/jobs.py +++ b/qinling/utils/jobs.py @@ -18,9 +18,7 @@ from dateutil import parser from oslo_utils import timeutils import six -from qinling.db import api as db_api from qinling import exceptions as exc -from qinling.utils.openstack import keystone as keystone_utils def validate_next_time(next_execution_time): @@ -82,18 +80,6 @@ def validate_job(params): return first_time, next_time, count -def delete_job(id, trust_id=None): - if not trust_id: - trust_id = db_api.get_job(id).trust_id - - modified_count = db_api.delete_job(id) - if modified_count: - # Delete trust only together with deleting trigger. - keystone_utils.delete_trust(trust_id) - - return 0 != modified_count - - def get_next_execution_time(pattern, start_time): return croniter.croniter(pattern, start_time).get_next( datetime.datetime diff --git a/qinling/utils/openstack/keystone.py b/qinling/utils/openstack/keystone.py index 3a264723..f609746f 100644 --- a/qinling/utils/openstack/keystone.py +++ b/qinling/utils/openstack/keystone.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from keystoneauth1.identity import generic +from keystoneauth1.identity import v3 from keystoneauth1 import session from keystoneclient.v3 import client as ks_client from oslo_config import cfg @@ -29,7 +29,7 @@ CONF = cfg.CONF def _get_user_keystone_session(): ctx = context.get_ctx() - auth = generic.Token( + auth = v3.Token( auth_url=CONF.keystone_authtoken.auth_uri, token=ctx.auth_token, ) @@ -47,39 +47,35 @@ def get_swiftclient(): @common.disable_ssl_warnings -def get_keystone_client(use_session=True): - if use_session: - session = _get_user_keystone_session() - keystone = ks_client.Client(session=session) - else: - ctx = context.get_ctx() - auth_url = CONF.keystone_authtoken.auth_uri - keystone = ks_client.Client( - user_id=ctx.user, - token=ctx.auth_token, - tenant_id=ctx.projectid, - auth_url=auth_url - ) - keystone.management_url = auth_url +def get_user_client(): + ctx = context.get_ctx() + auth_url = CONF.keystone_authtoken.auth_uri + client = ks_client.Client( + user_id=ctx.user, + token=ctx.auth_token, + tenant_id=ctx.projectid, + auth_url=auth_url + ) + client.management_url = auth_url - return keystone + return client @common.disable_ssl_warnings -def _get_admin_user_id(): - auth_url = CONF.keystone_authtoken.auth_uri +def get_service_client(): client = ks_client.Client( username=CONF.keystone_authtoken.username, password=CONF.keystone_authtoken.password, project_name=CONF.keystone_authtoken.project_name, - auth_url=auth_url, + auth_url=CONF.keystone_authtoken.auth_uri, + user_domain_name=CONF.keystone_authtoken.user_domain_name, + project_domain_name=CONF.keystone_authtoken.project_domain_name ) - - return client.user_id + return client @common.disable_ssl_warnings -def _get_trust_client(trust_id): +def get_trust_client(trust_id): """Get project keystone client using admin credential.""" client = ks_client.Client( username=CONF.keystone_authtoken.username, @@ -87,18 +83,17 @@ def _get_trust_client(trust_id): auth_url=CONF.keystone_authtoken.auth_uri, trust_id=trust_id ) - client.management_url = CONF.keystone_authtoken.auth_uri return client @common.disable_ssl_warnings def create_trust(): - client = get_keystone_client() ctx = context.get_ctx() - trustee_id = _get_admin_user_id() + user_client = get_user_client() + trustee_id = get_service_client().user_id - return client.trusts.create( + return user_client.trusts.create( trustor_user=ctx.user, trustee_user=trustee_id, impersonation=True, @@ -117,7 +112,7 @@ def delete_trust(trust_id): return try: - client = get_keystone_client() + client = get_user_client() client.trusts.delete(trust_id) LOG.debug('Trust %s deleted.', trust_id) except Exception: @@ -127,7 +122,7 @@ def delete_trust(trust_id): def create_trust_context(trust_id, project_id): """Creates Qinling context on behalf of the project.""" if CONF.pecan.auth_enable: - client = _get_trust_client(trust_id) + client = get_trust_client(trust_id) return context.Context( user=client.user_id, diff --git a/qinling_tempest_plugin/tests/api/test_executions.py b/qinling_tempest_plugin/tests/api/test_executions.py index 7fc72615..a78a6774 100644 --- a/qinling_tempest_plugin/tests/api/test_executions.py +++ b/qinling_tempest_plugin/tests/api/test_executions.py @@ -98,7 +98,7 @@ class ExecutionsTest(base.BaseQinlingTest): self.function_id, ignore_notfound=True) @decorators.idempotent_id('2a93fab0-2dae-4748-b0d4-f06b735ff451') - def test_create_list_get_delete_execution(self): + def test_crud_execution(self): resp, body = self.client.create_execution(self.function_id, input={'name': 'Qinling'}) diff --git a/qinling_tempest_plugin/tests/api/test_functions.py b/qinling_tempest_plugin/tests/api/test_functions.py index 3d6f0c53..f66ac3e9 100644 --- a/qinling_tempest_plugin/tests/api/test_functions.py +++ b/qinling_tempest_plugin/tests/api/test_functions.py @@ -79,7 +79,7 @@ class FunctionsTest(base.BaseQinlingTest): zf.close() @decorators.idempotent_id('9c36ac64-9a44-4c44-9e44-241dcc6b0933') - def test_create_list_get_delete_function(self): + def test_crud_function(self): # Create function function_name = data_utils.rand_name('function', prefix=self.name_prefix) diff --git a/qinling_tempest_plugin/tests/api/test_runtimes.py b/qinling_tempest_plugin/tests/api/test_runtimes.py index 1432d306..49bafb43 100644 --- a/qinling_tempest_plugin/tests/api/test_runtimes.py +++ b/qinling_tempest_plugin/tests/api/test_runtimes.py @@ -21,7 +21,7 @@ class RuntimesTest(base.BaseQinlingTest): name_prefix = 'RuntimesTest' @decorators.idempotent_id('fdc2f07f-dd1d-4981-86d3-5bc7908d9a9b') - def test_create_list_get_delete_runtime(self): + def test_crud_runtime(self): name = data_utils.rand_name('runtime', prefix=self.name_prefix) resp, body = self.admin_client.create_runtime( diff --git a/qinling_tempest_plugin/tests/scenario/test_basic_ops.py b/qinling_tempest_plugin/tests/scenario/test_basic_ops.py index 5c1436f1..2adbb5ea 100644 --- a/qinling_tempest_plugin/tests/scenario/test_basic_ops.py +++ b/qinling_tempest_plugin/tests/scenario/test_basic_ops.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import os +import pkg_resources import tempfile import zipfile @@ -27,13 +28,9 @@ class BasicOpsTest(base.BaseQinlingTest): def setUp(self): super(BasicOpsTest, self).setUp() - python_file_path = os.path.abspath( - os.path.join( - os.path.dirname(__file__), - os.pardir, - os.pardir, - 'functions/python_test.py' - ) + python_file_path = pkg_resources.resource_filename( + 'qinling_tempest_plugin', + "functions/python_test.py" ) base_name, extention = os.path.splitext(python_file_path) diff --git a/runtimes/python2/server.py b/runtimes/python2/server.py index 42027900..4e842eb6 100644 --- a/runtimes/python2/server.py +++ b/runtimes/python2/server.py @@ -38,6 +38,7 @@ function_method = 'main' auth_url = None username = None password = None +trust_id = None @app.route('/download', methods=['POST']) @@ -51,10 +52,12 @@ def download(): global auth_url global username global password + global trust_id token = params.get('token') auth_url = params.get('auth_url') username = params.get('username') password = params.get('password') + trust_id = params.get('trust_id') headers = {} if token: @@ -117,26 +120,28 @@ def execute(): global auth_url global username global password + global trust_id params = request.get_json() or {} input = params.get('input') or {} execution_id = params['execution_id'] - token = params.get('token') - trust_id = params.get('trust_id') + + app.logger.info( + 'Request received, execution_id:%s, input: %s, auth_url: %s, ' + 'username: %s, trust_id: %s' % + (execution_id, input, auth_url, username, trust_id) + ) # Provide an openstack session to user's function os_session = None if auth_url: - if not trust_id: - auth = generic.Token(auth_url=auth_url, token=token) - else: - auth = generic.Password( - username=username, - password=password, - auth_url=auth_url, - trust_id=trust_id, - user_domain_name='Default' - ) + auth = generic.Password( + username=username, + password=password, + auth_url=auth_url, + trust_id=trust_id, + user_domain_name='Default' + ) os_session = session.Session(auth=auth, verify=False) input.update({'context': {'os_session': os_session}})