diff --git a/qinling/api/app.py b/qinling/api/app.py index f5969f99..aa1a1fa4 100644 --- a/qinling/api/app.py +++ b/qinling/api/app.py @@ -47,8 +47,9 @@ def setup_app(config=None): db_api.setup_db() - LOG.info('Starting periodic tasks...') - periodics.start_job_handler() + if cfg.CONF.api.enable_job_handler: + LOG.info('Starting periodic tasks...') + periodics.start_job_handler() app = pecan.make_app( app_conf.pop('root'), diff --git a/qinling/api/controllers/v1/execution.py b/qinling/api/controllers/v1/execution.py index 310ba094..7cf91184 100644 --- a/qinling/api/controllers/v1/execution.py +++ b/qinling/api/controllers/v1/execution.py @@ -75,7 +75,7 @@ class ExecutionsController(rest.RestController): @rest_utils.wrap_wsme_controller_exception @wsme_pecan.wsexpose(resources.Execution, types.uuid) def get(self, id): - LOG.info("Fetch resource.", resource={'type': self.type, 'id': id}) + LOG.info("Get resource.", resource={'type': self.type, 'id': id}) execution_db = db_api.get_execution(id) diff --git a/qinling/api/controllers/v1/function.py b/qinling/api/controllers/v1/function.py index 9b5302b1..0170c024 100644 --- a/qinling/api/controllers/v1/function.py +++ b/qinling/api/controllers/v1/function.py @@ -69,7 +69,7 @@ class FunctionsController(rest.RestController): @rest_utils.wrap_pecan_controller_exception @pecan.expose() def get(self, id): - LOG.info("Fetch resource.", resource={'type': self.type, 'id': id}) + LOG.info("Get resource.", resource={'type': self.type, 'id': id}) download = strutils.bool_from_string( pecan.request.GET.get('download', False) diff --git a/qinling/api/controllers/v1/job.py b/qinling/api/controllers/v1/job.py index 3cdec0a5..1d1695b3 100644 --- a/qinling/api/controllers/v1/job.py +++ b/qinling/api/controllers/v1/job.py @@ -84,7 +84,7 @@ class JobsController(rest.RestController): @rest_utils.wrap_wsme_controller_exception @wsme_pecan.wsexpose(resources.Job, types.uuid) def get(self, id): - LOG.info("Fetch resource.", resource={'type': self.type, 'id': id}) + LOG.info("Get resource.", resource={'type': self.type, 'id': id}) job_db = db_api.get_job(id) return resources.Job.from_dict(job_db.to_dict()) diff --git a/qinling/api/controllers/v1/runtime.py b/qinling/api/controllers/v1/runtime.py index fc963104..e4c47e55 100644 --- a/qinling/api/controllers/v1/runtime.py +++ b/qinling/api/controllers/v1/runtime.py @@ -42,7 +42,7 @@ class RuntimesController(rest.RestController): @rest_utils.wrap_wsme_controller_exception @wsme_pecan.wsexpose(resources.Runtime, types.uuid) def get(self, id): - LOG.info("Fetch resource.", resource={'type': self.type, 'id': id}) + LOG.info("Get resource.", resource={'type': self.type, 'id': id}) runtime_db = db_api.get_runtime(id) diff --git a/qinling/config.py b/qinling/config.py index 3731a710..816bb4e6 100644 --- a/qinling/config.py +++ b/qinling/config.py @@ -41,7 +41,12 @@ api_opts = [ help='Number of workers for Qinling API service ' 'default is equal to the number of CPUs available if that can ' 'be determined, else a default worker count of 1 is returned.' - ) + ), + cfg.BoolOpt( + 'enable_job_handler', + default=True, + help='Enable job handler.' + ), ] PECAN_GROUP = 'pecan' diff --git a/qinling/db/sqlalchemy/api.py b/qinling/db/sqlalchemy/api.py index ee69dd7d..49dc0755 100644 --- a/qinling/db/sqlalchemy/api.py +++ b/qinling/db/sqlalchemy/api.py @@ -19,6 +19,7 @@ import threading from oslo_config import cfg from oslo_db import exception as oslo_db_exc from oslo_db.sqlalchemy import utils as db_utils +from oslo_log import log as logging import sqlalchemy as sa from qinling import context @@ -30,6 +31,7 @@ from qinling import exceptions as exc from qinling import status CONF = cfg.CONF +LOG = logging.getLogger(__name__) _SCHEMA_LOCK = threading.RLock() _initialized = False @@ -366,14 +368,11 @@ def create_function_service_mapping(values, session=None): mapping = models.FunctionServiceMapping() mapping.update(values.copy()) + # Ignore duplicate error for FunctionServiceMapping try: mapping.save(session=session) - except oslo_db_exc.DBDuplicateEntry as e: - raise exc.DBError( - "Duplicate entry for FunctionServiceMapping: %s" % e.columns - ) - - return mapping + except oslo_db_exc.DBDuplicateEntry: + session.close() @db_base.session_aware() @@ -412,14 +411,11 @@ def create_function_worker(values, session=None): mapping = models.FunctionWorkers() mapping.update(values.copy()) + # Ignore duplicate error for FunctionWorkers try: mapping.save(session=session) - except oslo_db_exc.DBDuplicateEntry as e: - raise exc.DBError( - "Duplicate entry for FunctionWorkers: %s" % e.columns - ) - - return mapping + except oslo_db_exc.DBDuplicateEntry: + session.close() @db_base.session_aware() diff --git a/qinling/db/sqlalchemy/migration/alembic_migrations/versions/001_pike.py b/qinling/db/sqlalchemy/migration/alembic_migrations/versions/001_pike.py index 5e5d3656..4ab02d7c 100644 --- a/qinling/db/sqlalchemy/migration/alembic_migrations/versions/001_pike.py +++ b/qinling/db/sqlalchemy/migration/alembic_migrations/versions/001_pike.py @@ -102,6 +102,7 @@ def upgrade(): sa.Column('function_id', sa.String(length=36), nullable=False), sa.Column('worker_name', sa.String(length=255), nullable=False), sa.PrimaryKeyConstraint('id'), + sa.UniqueConstraint('function_id', 'worker_name'), sa.ForeignKeyConstraint( ['function_id'], [u'functions.id'], ondelete='CASCADE' ), diff --git a/qinling/db/sqlalchemy/models.py b/qinling/db/sqlalchemy/models.py index de194071..54bb686f 100644 --- a/qinling/db/sqlalchemy/models.py +++ b/qinling/db/sqlalchemy/models.py @@ -38,7 +38,10 @@ class Function(model_base.QinlingSecureModelBase): runtime_id = sa.Column( sa.String(36), sa.ForeignKey(Runtime.id), nullable=True ) - runtime = relationship('Runtime', back_populates="functions") + # We want to get runtime info when we query function + runtime = relationship( + 'Runtime', back_populates="functions", innerjoin=True, lazy='joined' + ) memory_size = sa.Column(sa.Integer) timeout = sa.Column(sa.Integer) code = sa.Column(st.JsonLongDictType(), nullable=False) @@ -65,6 +68,10 @@ class FunctionServiceMapping(model_base.QinlingModelBase): class FunctionWorkers(model_base.QinlingModelBase): __tablename__ = 'function_workers' + __table_args__ = ( + sa.UniqueConstraint('function_id', 'worker_name'), + ) + id = model_base.id_column() function_id = sa.Column( sa.String(36), diff --git a/qinling/engine/default_engine.py b/qinling/engine/default_engine.py index 4944e27b..bb22c8a5 100644 --- a/qinling/engine/default_engine.py +++ b/qinling/engine/default_engine.py @@ -16,6 +16,7 @@ from oslo_config import cfg from oslo_log import log as logging import requests +from qinling import context from qinling.db import api as db_api from qinling.engine import utils from qinling import status @@ -99,12 +100,35 @@ class DefaultEngine(object): if function.service: func_url = '%s/execute' % function.service.service_url + LOG.debug( 'Found service url for function: %s, url: %s', function_id, func_url ) - data = {'input': input, 'execution_id': execution_id} + download_url = ( + 'http://%s:%s/v1/functions/%s?download=true' % + (CONF.kubernetes.qinling_service_address, + CONF.api.port, function_id) + ) + data = { + 'execution_id': execution_id, + 'input': input, + 'function_id': function_id, + 'entry': function.entry, + 'download_url': download_url, + } + if CONF.pecan.auth_enable: + data.update( + { + 'token': context.get_ctx().auth_token, + 'auth_url': CONF.keystone_authtoken.auth_uri, + 'username': CONF.keystone_authtoken.username, + 'password': CONF.keystone_authtoken.password, + 'trust_id': function.trust_id + } + ) + success, res = utils.url_request( self.session, func_url, body=data ) @@ -141,8 +165,6 @@ class DefaultEngine(object): identifier=identifier, labels=labels, input=input, - entry=function.entry, - trust_id=function.trust_id ) success, res = self.orchestrator.run_execution( execution_id, @@ -150,6 +172,8 @@ class DefaultEngine(object): input=input, identifier=identifier, service_url=service_url, + entry=function.entry, + trust_id=function.trust_id ) logs = '' @@ -171,18 +195,18 @@ class DefaultEngine(object): execution.logs = logs execution.status = status.SUCCESS if success else status.FAILED - # No service is created in orchestrator for single container. - if not image: - mapping = { - 'function_id': function_id, - 'service_url': service_url, - } - db_api.create_function_service_mapping(mapping) - worker = { - 'function_id': function_id, - 'worker_name': worker_name - } - db_api.create_function_worker(worker) + # No service is created in orchestrator for single container. + if not image: + mapping = { + 'function_id': function_id, + 'service_url': service_url, + } + db_api.create_function_service_mapping(mapping) + worker = { + 'function_id': function_id, + 'worker_name': worker_name + } + db_api.create_function_worker(worker) def delete_function(self, ctx, function_id): """Deletes underlying resources allocated for function.""" @@ -195,12 +219,9 @@ class DefaultEngine(object): LOG.info('Deleted.', resource=resource) def scaleup_function(self, ctx, function_id, runtime_id, count=1): - function = db_api.get_function(function_id) - worker_names = self.orchestrator.scaleup_function( function_id, identifier=runtime_id, - entry=function.entry, count=count ) diff --git a/qinling/engine/utils.py b/qinling/engine/utils.py index 138cb893..169b6450 100644 --- a/qinling/engine/utils.py +++ b/qinling/engine/utils.py @@ -11,18 +11,33 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. +import time from oslo_log import log as logging +import requests +import six LOG = logging.getLogger(__name__) def url_request(request_session, url, body=None): """Send request to a service url.""" - try: - # Default execution duration is 3min, could be configurable - r = request_session.post(url, json=body, timeout=(3, 180)) - return True, r.json() - except Exception: - LOG.exception("Failed to request url %s", url) - return False, {'error': 'Function execution timeout.'} + exception = None + + for a in six.moves.xrange(10): + try: + # Default execution duration is 3min, could be configurable + r = request_session.post(url, json=body, timeout=(3, 180)) + return True, r.json() + except requests.ConnectionError as e: + exception = e + LOG.warning("Could not connect to service. Retrying.") + time.sleep(1) + except Exception: + LOG.exception("Failed to request url %s", url) + return False, {'error': 'Function execution timeout.'} + + LOG.exception("Could not connect to function service. Reason: %s", + exception) + + return False, {'error': 'Internal service error.'} diff --git a/qinling/orchestrator/kubernetes/manager.py b/qinling/orchestrator/kubernetes/manager.py index 4a0b010b..ded947f9 100644 --- a/qinling/orchestrator/kubernetes/manager.py +++ b/qinling/orchestrator/kubernetes/manager.py @@ -20,7 +20,6 @@ import jinja2 from kubernetes import client from oslo_log import log as logging import requests -import six import tenacity import yaml @@ -234,19 +233,18 @@ class KubernetesManager(base.OrchestratorBase): return ret.items[-count:] - def _prepare_pod(self, pod, deployment_name, function_id, labels=None, - entry=None, trust_id=None, actual_function=None): + def _prepare_pod(self, pod, deployment_name, function_id, labels=None): """Pod preparation. 1. Update pod labels. - 2. Expose service and trigger package download. + 2. Expose service. """ - name = pod.metadata.name - actual_function = actual_function or function_id + pod_name = pod.metadata.name + labels = labels or {} LOG.info( 'Prepare pod %s in deployment %s for function %s', - name, deployment_name, function_id + pod_name, deployment_name, function_id ) # Update pod label. @@ -255,6 +253,8 @@ class KubernetesManager(base.OrchestratorBase): # Create service for the chosen pod. service_name = "service-%s" % function_id labels.update({'function_id': function_id}) + + # TODO(kong): Make the service type configurable. service_body = self.service_template.render( { "service_name": service_name, @@ -262,19 +262,32 @@ class KubernetesManager(base.OrchestratorBase): "selector": pod_labels } ) - ret = self.v1.create_namespaced_service( - self.conf.kubernetes.namespace, yaml.safe_load(service_body) - ) - node_port = ret.spec.ports[0].node_port - - LOG.debug( - 'Service created for pod %s, service name: %s, node port: %s', - name, service_name, node_port - ) + try: + ret = self.v1.create_namespaced_service( + self.conf.kubernetes.namespace, yaml.safe_load(service_body) + ) + LOG.debug( + 'Service created for pod %s, service name: %s', + pod_name, service_name + ) + except Exception as e: + # Service already exists + if e.status == 409: + LOG.debug( + 'Service already exists for pod %s, service name: %s', + pod_name, service_name + ) + time.sleep(1) + ret = self.v1.read_namespaced_service( + service_name, self.conf.kubernetes.namespace + ) + else: + raise # Get external ip address for an arbitrary node. - ret = self.v1.list_node() - addresses = ret.items[0].status.addresses + node_port = ret.spec.ports[0].node_port + nodes = self.v1.list_node() + addresses = nodes.items[0].status.addresses node_ip = None for addr in addresses: if addr.type == 'ExternalIP': @@ -286,53 +299,9 @@ class KubernetesManager(base.OrchestratorBase): if addr.type == 'InternalIP': node_ip = addr.address - # Download code package into container. pod_service_url = 'http://%s:%s' % (node_ip, node_port) - request_url = '%s/download' % pod_service_url - download_url = ( - 'http://%s:%s/v1/functions/%s?download=true' % - (self.conf.kubernetes.qinling_service_address, - self.conf.api.port, actual_function) - ) - data = { - 'download_url': download_url, - 'function_id': actual_function, - 'entry': entry, - 'trust_id': trust_id - } - if self.conf.pecan.auth_enable: - data.update( - { - 'token': context.get_ctx().auth_token, - 'auth_url': self.conf.keystone_authtoken.auth_uri, - 'username': self.conf.keystone_authtoken.username, - 'password': self.conf.keystone_authtoken.password, - } - ) - - LOG.info( - 'Send request to pod %s, request_url: %s', name, request_url - ) - - exception = None - for a in six.moves.xrange(10): - try: - r = self.session.post(request_url, json=data) - if r.status_code != requests.codes.ok: - raise exc.OrchestratorException( - 'Failed to download function code package.' - ) - - return name, pod_service_url - except (requests.ConnectionError, requests.Timeout) as e: - exception = e - LOG.warning("Could not connect to service. Retrying.") - time.sleep(1) - - raise exc.OrchestratorException( - 'Could not connect to function service. Reason: %s', exception - ) + return pod_name, pod_service_url def _create_pod(self, image, pod_name, labels, input): pod_body = self.pod_template.render( @@ -372,8 +341,7 @@ class KubernetesManager(base.OrchestratorBase): return pod_labels def prepare_execution(self, function_id, image=None, identifier=None, - labels=None, input=None, entry='main.main', - trust_id=None): + labels=None, input=None): """Prepare service URL for function. For image function, create a single pod with input, so the function @@ -395,7 +363,7 @@ class KubernetesManager(base.OrchestratorBase): try: pod_name, url = self._prepare_pod( - pod[0], identifier, function_id, labels, entry, trust_id + pod[0], identifier, function_id, labels ) return pod_name, url except Exception: @@ -404,14 +372,38 @@ class KubernetesManager(base.OrchestratorBase): raise exc.OrchestratorException('Execution preparation failed.') def run_execution(self, execution_id, function_id, input=None, - identifier=None, service_url=None): + identifier=None, service_url=None, entry='main.main', + trust_id=None): if service_url: func_url = '%s/execute' % service_url + download_url = ( + 'http://%s:%s/v1/functions/%s?download=true' % + (self.conf.kubernetes.qinling_service_address, + self.conf.api.port, function_id) + ) + data = { - 'input': input, 'execution_id': execution_id, + 'input': input, + 'function_id': function_id, + 'entry': entry, + 'download_url': download_url, + 'trust_id': trust_id } - LOG.debug('Invoke function %s, url: %s', function_id, func_url) + if self.conf.pecan.auth_enable: + data.update( + { + 'token': context.get_ctx().auth_token, + 'auth_url': self.conf.keystone_authtoken.auth_uri, + 'username': self.conf.keystone_authtoken.username, + 'password': self.conf.keystone_authtoken.password, + } + ) + + LOG.debug( + 'Invoke function %s, url: %s, data: %s', + function_id, func_url, data + ) return utils.url_request(self.session, func_url, body=data) else: @@ -425,7 +417,6 @@ class KubernetesManager(base.OrchestratorBase): self.conf.kubernetes.namespace ) status = pod.status.phase - time.sleep(0.5) output = self.v1.read_namespaced_pod_log( @@ -453,47 +444,26 @@ class KubernetesManager(base.OrchestratorBase): label_selector=selector ) - def scaleup_function(self, function_id, identifier=None, - entry='main.main', count=1): + def scaleup_function(self, function_id, identifier=None, count=1): pod_names = [] labels = {'runtime_id': identifier} - pods = self._choose_available_pod( - labels, count=count - ) + pods = self._choose_available_pod(labels, count=count) if not pods: raise exc.OrchestratorException('Not enough workers available.') - temp_function = '%s-temp' % function_id for pod in pods: - self._prepare_pod(pod, identifier, temp_function, labels, entry, - actual_function=function_id) - - # Delete temporary service - selector = common.convert_dict_to_string( - {'function_id': temp_function} + pod_name, _ = self._prepare_pod( + pod, identifier, function_id, labels ) - ret = self.v1.list_namespaced_service( - self.conf.kubernetes.namespace, label_selector=selector - ) - svc_names = [i.metadata.name for i in ret.items] - for svc_name in svc_names: - self.v1.delete_namespaced_service( - svc_name, - self.conf.kubernetes.namespace, - ) - - # Modify pod labels to fit into correct service - self._update_pod_label(pod, {'function_id': function_id}) - - pod_names.append(pod.metadata.name) + pod_names.append(pod_name) LOG.info('Pods scaled up for function %s: %s', function_id, pod_names) return pod_names - def delete_worker(self, worker_name, **kwargs): + def delete_worker(self, pod_name, **kwargs): self.v1.delete_namespaced_pod( - worker_name, + pod_name, self.conf.kubernetes.namespace, {} ) diff --git a/qinling/orchestrator/kubernetes/templates/deployment.j2 b/qinling/orchestrator/kubernetes/templates/deployment.j2 index e40f2064..2b92c8dc 100644 --- a/qinling/orchestrator/kubernetes/templates/deployment.j2 +++ b/qinling/orchestrator/kubernetes/templates/deployment.j2 @@ -28,8 +28,8 @@ spec: - containerPort: 9090 resources: limits: - cpu: "0.5" - memory: 512Mi + cpu: "0.3" + memory: 256Mi requests: cpu: "0.1" - memory: 128Mi + memory: 64Mi diff --git a/qinling/orchestrator/kubernetes/templates/pod.j2 b/qinling/orchestrator/kubernetes/templates/pod.j2 index 4391c56c..182501b7 100644 --- a/qinling/orchestrator/kubernetes/templates/pod.j2 +++ b/qinling/orchestrator/kubernetes/templates/pod.j2 @@ -19,9 +19,9 @@ spec: {% endif %} restartPolicy: Never resources: - limits: - cpu: "0.5" - memory: 512Mi - requests: - cpu: "0.1" - memory: 128Mi + limits: + cpu: "0.3" + memory: 256Mi + requests: + cpu: "0.1" + memory: 64Mi diff --git a/qinling/utils/executions.py b/qinling/utils/executions.py index 2693c230..21989f83 100644 --- a/qinling/utils/executions.py +++ b/qinling/utils/executions.py @@ -12,32 +12,61 @@ # See the License for the specific language governing permissions and # limitations under the License. +from oslo_log import log as logging + from qinling.db import api as db_api +from qinling.db.sqlalchemy import models from qinling import exceptions as exc from qinling import status +LOG = logging.getLogger(__name__) + + +def _update_function_db(function_id): + # NOTE(kong): Store function info in cache? + func_db = db_api.get_function(function_id) + runtime_db = func_db.runtime + if runtime_db and runtime_db.status != status.AVAILABLE: + raise exc.RuntimeNotAvailableException( + 'Runtime %s is not available.' % func_db.runtime_id + ) + + # Function update is done using UPDATE ... FROM ... WHERE + # non-locking clause. + while func_db: + count = func_db.count + modified = db_api.conditional_update( + models.Function, + { + 'count': count + 1, + }, + { + 'id': function_id, + 'count': count + }, + insecure=True, + ) + if not modified: + LOG.warning("Retrying to update function count.") + func_db = db_api.get_function(function_id) + continue + else: + break + + return func_db.runtime_id + def create_execution(engine_client, params): function_id = params['function_id'] is_sync = params.get('sync', True) - with db_api.transaction(): - func_db = db_api.get_function(function_id) - runtime_db = func_db.runtime - if runtime_db and runtime_db.status != status.AVAILABLE: - raise exc.RuntimeNotAvailableException( - 'Runtime %s is not available.' % func_db.runtime_id - ) + runtime_id = _update_function_db(function_id) - # Increase function invoke count, the updated_at field will be also - # updated. - func_db.count = func_db.count + 1 - - params.update({'status': status.RUNNING}) - db_model = db_api.create_execution(params) + params.update({'status': status.RUNNING}) + db_model = db_api.create_execution(params) engine_client.create_execution( - db_model.id, function_id, func_db.runtime_id, + db_model.id, function_id, runtime_id, input=params.get('input'), is_sync=is_sync ) diff --git a/requirements.txt b/requirements.txt index 5d97470a..ce724245 100644 --- a/requirements.txt +++ b/requirements.txt @@ -29,3 +29,4 @@ python-swiftclient>=3.2.0 # Apache-2.0 croniter>=0.3.4 # MIT License python-dateutil>=2.4.2 # BSD tenacity>=3.2.1 # Apache-2.0 +PyMySQL>=0.7.6 # MIT License diff --git a/runtimes/python2/server.py b/runtimes/python2/server.py index 9ebbaae5..b8d0ad81 100644 --- a/runtimes/python2/server.py +++ b/runtimes/python2/server.py @@ -23,7 +23,6 @@ import time import traceback import zipfile -from flask import abort from flask import Flask from flask import request from flask import Response @@ -32,60 +31,20 @@ from keystoneauth1 import session import requests app = Flask(__name__) -zip_file = '' -function_module = 'main' -function_method = 'main' -auth_url = None -username = None -password = None -trust_id = None +downloaded = False +downloading = False -@app.route('/download', methods=['POST']) -def download(): - params = request.get_json() or {} - - download_url = params.get('download_url') - function_id = params.get('function_id') - entry = params.get('entry') - - global auth_url - global username - global password - global trust_id - token = params.get('token') - auth_url = params.get('auth_url') - username = params.get('username') - password = params.get('password') - trust_id = params.get('trust_id') - - headers = {} - if token: - headers = {'X-Auth-Token': token} - - global zip_file - zip_file = '%s.zip' % function_id - - app.logger.info( - 'Request received, download_url:%s, headers: %s, entry: %s' % - (download_url, headers, entry) +def setup_logger(loglevel): + global app + root = logging.getLogger() + root.setLevel(loglevel) + ch = logging.StreamHandler(sys.stdout) + ch.setLevel(loglevel) + ch.setFormatter( + logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') ) - - # Get function code package from Qinling service. - r = requests.get(download_url, headers=headers, stream=True) - with open(zip_file, 'wb') as fd: - for chunk in r.iter_content(chunk_size=65535): - fd.write(chunk) - - if not zipfile.is_zipfile(zip_file): - abort(500) - app.logger.info('Code package downloaded to %s' % zip_file) - - global function_module - global function_method - function_module, function_method = tuple(entry.rsplit('.', 1)) - - return 'success' + app.logger.addHandler(ch) def _print_trace(): @@ -108,13 +67,13 @@ def _invoke_function(execution_id, zip_file, module_name, method, input, return_dict['result'] = func(**input) return_dict['success'] = True except Exception as e: + _print_trace() + if isinstance(e, OSError) and 'Resource' in str(e): sys.exit(1) return_dict['result'] = str(e) return_dict['success'] = False - - _print_trace() finally: print('Finished execution: %s' % execution_id) @@ -130,25 +89,73 @@ def execute(): reason, e.g. unlimited memory allocation) - Deal with os error for process (e.g. Resource temporarily unavailable) """ - - global zip_file - global function_module - global function_method - global auth_url - global username - global password - global trust_id + global downloading + global downloaded params = request.get_json() or {} input = params.get('input') or {} execution_id = params['execution_id'] + download_url = params.get('download_url') + function_id = params.get('function_id') + entry = params.get('entry') + trust_id = params.get('trust_id') + auth_url = params.get('auth_url') + username = params.get('username') + password = params.get('password') + zip_file = '%s.zip' % function_id + + function_module, function_method = 'main', 'main' + if entry: + function_module, function_method = tuple(entry.rsplit('.', 1)) app.logger.info( - 'Request received, execution_id:%s, input: %s, auth_url: %s, ' - 'username: %s, trust_id: %s' % - (execution_id, input, auth_url, username, trust_id) + 'Request received, execution_id:%s, input: %s, auth_url: %s' % + (execution_id, input, auth_url) ) + while downloading: + # wait + time.sleep(3) + + # download function package + if not downloading and not downloaded: + downloading = True + token = params.get('token') + + headers = {} + if token: + headers = {'X-Auth-Token': token} + + app.logger.info( + 'Downloading function, download_url:%s, entry: %s' % + (download_url, entry) + ) + + # Get function code package from Qinling service. + r = requests.get(download_url, headers=headers, stream=True) + with open(zip_file, 'wb') as fd: + for chunk in r.iter_content(chunk_size=65535): + fd.write(chunk) + + app.logger.info('Downloaded function package to %s' % zip_file) + downloading = False + downloaded = True + + if downloaded: + if not zipfile.is_zipfile(zip_file): + return Response( + response=json.dumps( + { + 'output': 'The function package is incorrect.', + 'duration': 0, + 'logs': '', + 'success': False + } + ), + status=500, + mimetype='application/json' + ) + # Provide an openstack session to user's function os_session = None if auth_url: @@ -160,7 +167,6 @@ def execute(): user_domain_name='Default' ) os_session = session.Session(auth=auth, verify=False) - input.update({'context': {'os_session': os_session}}) manager = Manager() @@ -207,18 +213,6 @@ def execute(): ) -def setup_logger(loglevel): - global app - root = logging.getLogger() - root.setLevel(loglevel) - ch = logging.StreamHandler(sys.stdout) - ch.setLevel(loglevel) - ch.setFormatter( - logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') - ) - app.logger.addHandler(ch) - - setup_logger(logging.DEBUG) app.logger.info("Starting server")