Fully support sync/async execution
Change-Id: I008025f710f8d61eb6f8a4998fe6990abdbc7203
This commit is contained in:
parent
49c4c10adb
commit
d23551c465
@ -40,11 +40,9 @@ class ExecutionsController(rest.RestController):
|
|||||||
)
|
)
|
||||||
def post(self, execution):
|
def post(self, execution):
|
||||||
params = execution.to_dict()
|
params = execution.to_dict()
|
||||||
LOG.info("Creating execution. [execution=%s]", params)
|
LOG.info("Creating execution. [params=%s]", params)
|
||||||
|
|
||||||
db_model = executions.create_execution(self.engine_client, params)
|
db_model = executions.create_execution(self.engine_client, params)
|
||||||
if params.get('sync', True):
|
|
||||||
db_model = db_api.get_execution(db_model.id)
|
|
||||||
|
|
||||||
return resources.Execution.from_dict(db_model.to_dict())
|
return resources.Execution.from_dict(db_model.to_dict())
|
||||||
|
|
||||||
|
@ -54,8 +54,11 @@ def transaction():
|
|||||||
yield
|
yield
|
||||||
|
|
||||||
|
|
||||||
# A helper function for test.
|
|
||||||
def delete_all():
|
def delete_all():
|
||||||
|
"""A helper function for testing."""
|
||||||
|
delete_jobs(insecure=True)
|
||||||
|
delete_executions(insecure=True)
|
||||||
|
delete_functions(insecure=True)
|
||||||
delete_runtimes(insecure=True)
|
delete_runtimes(insecure=True)
|
||||||
|
|
||||||
|
|
||||||
@ -88,7 +91,11 @@ def update_function(id, values):
|
|||||||
|
|
||||||
|
|
||||||
def delete_function(id):
|
def delete_function(id):
|
||||||
IMPL.delete_function(id)
|
return IMPL.delete_function(id)
|
||||||
|
|
||||||
|
|
||||||
|
def delete_functions(**kwargs):
|
||||||
|
return IMPL.delete_functions(**kwargs)
|
||||||
|
|
||||||
|
|
||||||
def create_runtime(values):
|
def create_runtime(values):
|
||||||
@ -135,6 +142,10 @@ def update_execution(id, values):
|
|||||||
return IMPL.update_execution(id, values)
|
return IMPL.update_execution(id, values)
|
||||||
|
|
||||||
|
|
||||||
|
def delete_executions(**kwargs):
|
||||||
|
return IMPL.delete_executions(**kwargs)
|
||||||
|
|
||||||
|
|
||||||
def create_function_service_mapping(values):
|
def create_function_service_mapping(values):
|
||||||
return IMPL.create_function_service_mapping(values)
|
return IMPL.create_function_service_mapping(values)
|
||||||
|
|
||||||
@ -169,3 +180,7 @@ def delete_job(id):
|
|||||||
|
|
||||||
def get_jobs():
|
def get_jobs():
|
||||||
return IMPL.get_jobs()
|
return IMPL.get_jobs()
|
||||||
|
|
||||||
|
|
||||||
|
def delete_jobs(**kwargs):
|
||||||
|
return IMPL.delete_jobs(**kwargs)
|
||||||
|
@ -252,6 +252,11 @@ def delete_function(id, session=None):
|
|||||||
session.delete(function)
|
session.delete(function)
|
||||||
|
|
||||||
|
|
||||||
|
@db_base.session_aware()
|
||||||
|
def delete_functions(session=None, insecure=False, **kwargs):
|
||||||
|
return _delete_all(models.Function, insecure=insecure, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
@db_base.session_aware()
|
@db_base.session_aware()
|
||||||
def create_runtime(values, session=None):
|
def create_runtime(values, session=None):
|
||||||
runtime = models.Runtime()
|
runtime = models.Runtime()
|
||||||
@ -339,6 +344,11 @@ def delete_execution(id, session=None):
|
|||||||
session.delete(execution)
|
session.delete(execution)
|
||||||
|
|
||||||
|
|
||||||
|
@db_base.session_aware()
|
||||||
|
def delete_executions(session=None, insecure=False, **kwargs):
|
||||||
|
return _delete_all(models.Execution, insecure=insecure, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
@db_base.session_aware()
|
@db_base.session_aware()
|
||||||
def create_function_service_mapping(values, session=None):
|
def create_function_service_mapping(values, session=None):
|
||||||
mapping = models.FunctionServiceMapping()
|
mapping = models.FunctionServiceMapping()
|
||||||
@ -432,3 +442,8 @@ def get_next_jobs(before, session=None):
|
|||||||
@db_base.session_aware()
|
@db_base.session_aware()
|
||||||
def get_jobs(session=None, **kwargs):
|
def get_jobs(session=None, **kwargs):
|
||||||
return _get_collection_sorted_by_time(models.Job, **kwargs)
|
return _get_collection_sorted_by_time(models.Job, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
|
@db_base.session_aware()
|
||||||
|
def delete_jobs(session=None, insecure=False, **kwargs):
|
||||||
|
return _delete_all(models.Job, insecure=insecure, **kwargs)
|
||||||
|
@ -60,6 +60,7 @@ def upgrade():
|
|||||||
sa.Column('count', sa.Integer, nullable=False),
|
sa.Column('count', sa.Integer, nullable=False),
|
||||||
sa.PrimaryKeyConstraint('id'),
|
sa.PrimaryKeyConstraint('id'),
|
||||||
sa.UniqueConstraint('name', 'project_id'),
|
sa.UniqueConstraint('name', 'project_id'),
|
||||||
|
sa.ForeignKeyConstraint(['runtime_id'], [u'runtime.id']),
|
||||||
info={"check_ifexists": True}
|
info={"check_ifexists": True}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -20,6 +20,19 @@ from qinling.db.sqlalchemy import types as st
|
|||||||
from qinling.utils import common
|
from qinling.utils import common
|
||||||
|
|
||||||
|
|
||||||
|
class Runtime(model_base.QinlingSecureModelBase):
|
||||||
|
__tablename__ = 'runtime'
|
||||||
|
|
||||||
|
__table_args__ = (
|
||||||
|
sa.UniqueConstraint('image', 'project_id'),
|
||||||
|
)
|
||||||
|
|
||||||
|
name = sa.Column(sa.String(255))
|
||||||
|
description = sa.Column(sa.String(255))
|
||||||
|
image = sa.Column(sa.String(255), nullable=False)
|
||||||
|
status = sa.Column(sa.String(32), nullable=False)
|
||||||
|
|
||||||
|
|
||||||
class Function(model_base.QinlingSecureModelBase):
|
class Function(model_base.QinlingSecureModelBase):
|
||||||
__tablename__ = 'function'
|
__tablename__ = 'function'
|
||||||
|
|
||||||
@ -29,7 +42,10 @@ class Function(model_base.QinlingSecureModelBase):
|
|||||||
|
|
||||||
name = sa.Column(sa.String(255), nullable=False)
|
name = sa.Column(sa.String(255), nullable=False)
|
||||||
description = sa.Column(sa.String(255))
|
description = sa.Column(sa.String(255))
|
||||||
runtime_id = sa.Column(sa.String(36), nullable=True)
|
runtime_id = sa.Column(
|
||||||
|
sa.String(36), sa.ForeignKey(Runtime.id), nullable=True
|
||||||
|
)
|
||||||
|
runtime = relationship('Runtime', back_populates="functions")
|
||||||
memory_size = sa.Column(sa.Integer)
|
memory_size = sa.Column(sa.Integer)
|
||||||
timeout = sa.Column(sa.Integer)
|
timeout = sa.Column(sa.Integer)
|
||||||
code = sa.Column(st.JsonLongDictType(), nullable=False)
|
code = sa.Column(st.JsonLongDictType(), nullable=False)
|
||||||
@ -49,20 +65,10 @@ class FunctionServiceMapping(model_base.QinlingModelBase):
|
|||||||
sa.String(36),
|
sa.String(36),
|
||||||
sa.ForeignKey(Function.id, ondelete='CASCADE'),
|
sa.ForeignKey(Function.id, ondelete='CASCADE'),
|
||||||
)
|
)
|
||||||
service_url = sa.Column(sa.String(255), nullable=False)
|
function = relationship(
|
||||||
|
'Function', uselist=False, back_populates="service"
|
||||||
|
|
||||||
class Runtime(model_base.QinlingSecureModelBase):
|
|
||||||
__tablename__ = 'runtime'
|
|
||||||
|
|
||||||
__table_args__ = (
|
|
||||||
sa.UniqueConstraint('image', 'project_id'),
|
|
||||||
)
|
)
|
||||||
|
service_url = sa.Column(sa.String(255), nullable=False)
|
||||||
name = sa.Column(sa.String(255))
|
|
||||||
description = sa.Column(sa.String(255))
|
|
||||||
image = sa.Column(sa.String(255), nullable=False)
|
|
||||||
status = sa.Column(sa.String(32), nullable=False)
|
|
||||||
|
|
||||||
|
|
||||||
class Execution(model_base.QinlingSecureModelBase):
|
class Execution(model_base.QinlingSecureModelBase):
|
||||||
@ -94,7 +100,7 @@ class Job(model_base.QinlingSecureModelBase):
|
|||||||
sa.String(36),
|
sa.String(36),
|
||||||
sa.ForeignKey(Function.id)
|
sa.ForeignKey(Function.id)
|
||||||
)
|
)
|
||||||
function = relationship('Function', lazy='joined')
|
function = relationship('Function', back_populates="jobs")
|
||||||
function_input = sa.Column(st.JsonDictType())
|
function_input = sa.Column(st.JsonDictType())
|
||||||
trust_id = sa.Column(sa.String(80))
|
trust_id = sa.Column(sa.String(80))
|
||||||
|
|
||||||
@ -103,3 +109,10 @@ class Job(model_base.QinlingSecureModelBase):
|
|||||||
common.datetime_to_str(d, 'first_execution_time')
|
common.datetime_to_str(d, 'first_execution_time')
|
||||||
common.datetime_to_str(d, 'next_execution_time')
|
common.datetime_to_str(d, 'next_execution_time')
|
||||||
return d
|
return d
|
||||||
|
|
||||||
|
|
||||||
|
Function.service = relationship("FunctionServiceMapping",
|
||||||
|
uselist=False,
|
||||||
|
back_populates="function")
|
||||||
|
Runtime.functions = relationship("Function", back_populates="runtime")
|
||||||
|
Function.jobs = relationship("Job", back_populates="function")
|
||||||
|
@ -13,6 +13,7 @@
|
|||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
from oslo_log import log as logging
|
from oslo_log import log as logging
|
||||||
|
import requests
|
||||||
|
|
||||||
from qinling.db import api as db_api
|
from qinling.db import api as db_api
|
||||||
from qinling import status
|
from qinling import status
|
||||||
@ -87,6 +88,18 @@ class DefaultEngine(object):
|
|||||||
execution = db_api.get_execution(execution_id)
|
execution = db_api.get_execution(execution_id)
|
||||||
function = db_api.get_function(function_id)
|
function = db_api.get_function(function_id)
|
||||||
|
|
||||||
|
if function.service:
|
||||||
|
func_url = '%s/execute' % function.service.service_url
|
||||||
|
LOG.debug(
|
||||||
|
'Found service url for function: %s, url: %s',
|
||||||
|
function_id, func_url
|
||||||
|
)
|
||||||
|
|
||||||
|
r = requests.post(func_url, json=input)
|
||||||
|
execution.status = status.SUCCESS
|
||||||
|
execution.output = {'result': r.json()}
|
||||||
|
return
|
||||||
|
|
||||||
source = function.code['source']
|
source = function.code['source']
|
||||||
image = None
|
image = None
|
||||||
identifier = None
|
identifier = None
|
||||||
@ -111,7 +124,6 @@ class DefaultEngine(object):
|
|||||||
input=input,
|
input=input,
|
||||||
entry=function.entry
|
entry=function.entry
|
||||||
)
|
)
|
||||||
|
|
||||||
output = self.orchestrator.run_execution(
|
output = self.orchestrator.run_execution(
|
||||||
function_id,
|
function_id,
|
||||||
input=input,
|
input=input,
|
||||||
@ -124,10 +136,10 @@ class DefaultEngine(object):
|
|||||||
execution_id,
|
execution_id,
|
||||||
output
|
output
|
||||||
)
|
)
|
||||||
|
|
||||||
execution.output = output
|
execution.output = output
|
||||||
execution.status = 'success'
|
execution.status = status.SUCCESS
|
||||||
|
|
||||||
|
# No service is created in orchestrator for single container.
|
||||||
if not image:
|
if not image:
|
||||||
mapping = {
|
mapping = {
|
||||||
'function_id': function_id,
|
'function_id': function_id,
|
||||||
|
@ -127,13 +127,13 @@ def handle_job(engine_client):
|
|||||||
job.function_id, job.id
|
job.function_id, job.id
|
||||||
)
|
)
|
||||||
|
|
||||||
exe_param = {
|
params = {
|
||||||
'function_id': job.function_id,
|
'function_id': job.function_id,
|
||||||
'input': job.function_input,
|
'input': job.function_input,
|
||||||
'sync': False,
|
'sync': False,
|
||||||
'description': constants.EXECUTION_BY_JOB % job.id
|
'description': constants.EXECUTION_BY_JOB % job.id
|
||||||
}
|
}
|
||||||
executions.create_execution(engine_client, exe_param)
|
executions.create_execution(engine_client, params)
|
||||||
except Exception:
|
except Exception:
|
||||||
LOG.exception("Failed to process job %s", job.id)
|
LOG.exception("Failed to process job %s", job.id)
|
||||||
finally:
|
finally:
|
||||||
|
@ -19,3 +19,4 @@ ERROR = 'error'
|
|||||||
DELETING = 'deleting'
|
DELETING = 'deleting'
|
||||||
RUNNING = 'running'
|
RUNNING = 'running'
|
||||||
DONE = 'done'
|
DONE = 'done'
|
||||||
|
SUCCESS = 'success'
|
||||||
|
@ -12,59 +12,38 @@
|
|||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
from oslo_log import log as logging
|
|
||||||
import requests
|
|
||||||
|
|
||||||
from qinling.db import api as db_api
|
from qinling.db import api as db_api
|
||||||
from qinling import exceptions as exc
|
from qinling import exceptions as exc
|
||||||
from qinling import status
|
from qinling import status
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
|
def create_execution(engine_client, params):
|
||||||
def create_execution(engine_client, execution):
|
function_id = params['function_id']
|
||||||
function_id = execution['function_id']
|
is_sync = params.get('sync', True)
|
||||||
is_sync = execution.get('sync', True)
|
|
||||||
func_url = None
|
|
||||||
|
|
||||||
with db_api.transaction():
|
with db_api.transaction():
|
||||||
func_db = db_api.get_function(function_id)
|
func_db = db_api.get_function(function_id)
|
||||||
|
runtime_db = func_db.runtime
|
||||||
|
if runtime_db and runtime_db.status != status.AVAILABLE:
|
||||||
|
raise exc.RuntimeNotAvailableException(
|
||||||
|
'Runtime %s is not available.' % func_db.runtime_id
|
||||||
|
)
|
||||||
|
|
||||||
# Increase function invoke count, the updated_at field will be also
|
# Increase function invoke count, the updated_at field will be also
|
||||||
# updated.
|
# updated.
|
||||||
func_db.count = func_db.count + 1
|
func_db.count = func_db.count + 1
|
||||||
|
|
||||||
try:
|
params.update({'status': status.RUNNING})
|
||||||
# Check if the service url is existing.
|
db_model = db_api.create_execution(params)
|
||||||
mapping_db = db_api.get_function_service_mapping(function_id)
|
|
||||||
LOG.info('Found Service url for function: %s', function_id)
|
|
||||||
|
|
||||||
func_url = '%s/execute' % mapping_db.service_url
|
|
||||||
LOG.info('Invoke function %s, url: %s', function_id, func_url)
|
|
||||||
except exc.DBEntityNotFoundError:
|
|
||||||
pass
|
|
||||||
|
|
||||||
if func_url:
|
|
||||||
r = requests.post(func_url, json=execution.get('input'))
|
|
||||||
execution.update(
|
|
||||||
{'status': 'success', 'output': {'result': r.json()}}
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
runtime_id = func_db.runtime_id
|
|
||||||
runtime_db = db_api.get_runtime(runtime_id)
|
|
||||||
if runtime_db.status != status.AVAILABLE:
|
|
||||||
raise exc.RuntimeNotAvailableException(
|
|
||||||
'Runtime %s is not available.' % runtime_id
|
|
||||||
)
|
|
||||||
|
|
||||||
execution.update({'status': status.RUNNING})
|
|
||||||
|
|
||||||
db_model = db_api.create_execution(execution)
|
|
||||||
|
|
||||||
if not func_url:
|
|
||||||
engine_client.create_execution(
|
engine_client.create_execution(
|
||||||
db_model.id, function_id, runtime_id,
|
db_model.id, function_id, func_db.runtime_id,
|
||||||
input=execution.get('input'),
|
input=params.get('input'), is_sync=is_sync
|
||||||
is_sync=is_sync
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if is_sync:
|
||||||
|
# The execution should already be updated by engine service for sync
|
||||||
|
# execution.
|
||||||
|
db_model = db_api.get_execution(db_model.id)
|
||||||
|
|
||||||
return db_model
|
return db_model
|
||||||
|
Loading…
Reference in New Issue
Block a user