Initial commit

Initial commit of predictor code.

Change-Id: Ic8aaa0728a4b936cd4c6e1ed5a0e01ba8f0fbf5b
Andreas Tsagkaropoulos 2024-01-11 17:19:45 +02:00
parent c7ea7b0db9
commit c40f480ce9
34 changed files with 7047 additions and 15 deletions

View File

@@ -33,18 +33,20 @@ spec:
           {{- toYaml .Values.securityContext | nindent 12 }}
           image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
           imagePullPolicy: {{ .Values.image.pullPolicy }}
-          ports:
-            - name: http
-              containerPort: 8080
-              protocol: TCP
+          # ports:
+          #   - name: http
+          #     containerPort: 8080
+          #     protocol: TCP
           livenessProbe:
-            httpGet:
-              path: /
-              port: http
+            exec:
+              command:
+                - ls
+                - /home
           readinessProbe:
-            httpGet:
-              path: /
-              port: http
+            exec:
+              command:
+                - ls
+                - /home
           resources:
             {{- toYaml .Values.resources | nindent 12 }}
           {{- with .Values.nodeSelector }}

View File

@ -0,0 +1,72 @@
# Copyright (c) 2023 Institute of Communication and Computer Systems
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
FROM python:3.11 as source
#RUN pip install --no-cache --upgrade pip
RUN mkdir /src
COPY ./src/ /src/
WORKDIR /src
RUN pip install --no-cache-dir -r requirements.txt && python3 setup.py sdist
FROM ubuntu:noble
RUN mkdir -p /home/r_predictions
#RUN apt-get update
ENV TZ=Europe/Athens
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
ARG DEBIAN_FRONTEND=noninteractive
RUN apt-get update && apt-get install --no-install-recommends -y \
libcurl4-openssl-dev=8.5.0-2ubuntu1 \
build-essential=12.10ubuntu1 \
r-base-core=4.3.2-1build1 \
r-base-dev=4.3.2-1build1 \
r-cran-digest=0.6.34-1 \
r-cran-boot=1.3-28.1-1 \
r-cran-class=7.3-22-2 \
r-cran-cluster=2.1.6-1 \
r-cran-codetools=0.2-19-1 \
r-cran-foreign=0.8.86-1 \
r-cran-kernsmooth=2.23-22-1 \
r-cran-lattice=0.22-5-1 \
r-cran-littler=0.3.19-1 \
r-cran-mass=7.3-60-2 \
r-cran-matrix=1.6-4-1 \
r-cran-mgcv=1.9-1-1 \
r-cran-nlme=3.1.164-1 \
r-cran-nnet=7.3-19-2 \
r-cran-pkgkitten=0.2.3-1 \
r-cran-rcpp=1.0.11-1 \
r-cran-rpart=4.1.23-1 \
r-cran-spatial=7.3-17-1 \
r-cran-survival=3.5-7-1 \
r-doc-html=4.3.2-1build1 \
r-recommended=4.3.2-1build1 \
python3=3.11.4-5ubuntu1 \
python3-pip=23.3+dfsg-1 \
python3.11-venv=3.11.7-2 \
&& rm -rf /var/lib/apt/lists/*
COPY ./src/r_predictors/r_commands.R /home/r_predictions/
RUN Rscript /home/r_predictions/r_commands.R #install prerequisite libraries
COPY --from=source ./src/dist/esm_forecaster-0.1.0.tar.gz /home/r_predictions/
COPY ./src/requirements.txt /home/r_predictions/
COPY ./src/prepare_python_dependencies.sh /home/r_predictions/
RUN bash -x /home/r_predictions/prepare_python_dependencies.sh
COPY ./src/r_predictors/forecasting_real_workload.R /home/r_predictions/
# The two commented lines below only serve for experiments with the predictive functionality
#COPY ./default_application.csv /home/r_predictions
#RUN Rscript forecasting_real_workload.R default_application.csv MinimumCores 1638878119
WORKDIR /home/r_predictions/esm_forecaster-0.1.0
CMD ["/bin/sh","-c",". /home/forecasting_env/bin/activate && python3 -u /home/r_predictions/esm_forecaster-0.1.0/runtime/Predictor.py /home/r_predictions/esm_forecaster-0.1.0/r_predictors/prediction_configuration.properties > $LOG_FILE 2>&1"]
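# Illustrative usage sketch (image tag and log path are hypothetical, not defined in this repository):
#   docker build -t exponential-smoothing-predictor .
#   docker run -e LOG_FILE=/home/r_predictions/predictor.log exponential-smoothing-predictor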

View File

@ -0,0 +1,10 @@
EXPONENTIAL_SMOOTHING_VERSION="0.1.0"
python3 -m venv /home/forecasting_env
. /home/forecasting_env/bin/activate
pip3 install --no-cache-dir -r /home/r_predictions/requirements.txt
cd /home/r_predictions
# Install the module itself (provided that the tar.gz file of the module has already been copied inside the container)
pip install esm_forecaster-$EXPONENTIAL_SMOOTHING_VERSION.tar.gz
tar -xzvf esm_forecaster-$EXPONENTIAL_SMOOTHING_VERSION.tar.gz

File diff suppressed because it is too large.

View File

@ -0,0 +1,477 @@
#!Rscript
# Copyright (c) 2023 Institute of Communication and Computer Systems
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
library(rapportools)
library(gbutils)
library(forecast)
library(ggplot2)
library(properties)
library(xts)
library(anytime)
library(purrr)
# Outline of the operation of the forecasting script
#
# This forecasting script relies on the presence of a dataset which contains the metric values to be forecasted. It is called with three main parameters - the dataset path, the metric to be forecasted and the time for which the forecast should be produced - and two optional parameters, the alpha and beta coefficients to be used during forecasting. The time for which the forecast should be produced may be omitted under some circumstances.
#
# To create the final dataset which will be used for predictions, this script creates a timeseries with all times from the beginning of the observations in the dataset until its end, using 1-second intervals (to allow for predictions based on epoch). For the exponential smoothing forecaster to operate satisfactorily, the `number_of_seconds_to_aggregate_on` variable must be set to a value which is large enough to smooth out small fluctuations, yet small enough to allow for reasonable reaction times (e.g. 300 seconds).
# Once the creation of the dataset is over, the `configuration_forecasting_horizon` configuration property is evaluated. If this value is positive, the time for which the forecast should be made must be provided as a command-line argument, and this allows the formation of a training dataset and a test dataset. If a non-positive horizon is provided, the `realtime_mode` configuration property is evaluated. If it is false, the prediction time does not need to be provided (it means we simply want to evaluate the predictive functionality based on past data), and the next prediction time will be the time of the last observation in the dataset. If realtime mode is true, the prediction time needs to be provided, and the script will try to create a prediction using the minimum of the next prediction time and the last observation time which is available in the dataset - in this case the next prediction time is also needed (TODO: perhaps this behaviour should be changed).
# Then, the final data points which will be used for the forecasting are determined, and the forecasting models are created to produce predictions. The user of the script can opt to search for the best smoothing parameters via a grid search, using the `try_to_optimize_parameters` configuration parameter.
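# Illustrative sketch (not part of this script's flow): aggregating a 1-second xts series into
# fixed-length windows with period.apply/endpoints, the same calls used further below. The dates
# and values here are made up.
#   one_second_series <- xts(runif(900), seq(as.POSIXct("2024-01-01 00:00:00"), by = "sec", length.out = 900))
#   aggregated <- period.apply(one_second_series, endpoints(one_second_series, "seconds", k = 300), mean)
#   # 'aggregated' now holds one mean value per 300-second window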
find_smape <- function(actual, forecast) {
return (1/length(actual) * sum(2*abs(forecast-actual) / (abs(actual)+abs(forecast))*100))
}
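# Worked example (illustrative values): for actual = c(100, 110) and forecast = c(90, 120),
# the terms are 2*10/190*100 ≈ 10.53 and 2*10/230*100 ≈ 8.70, so find_smape returns their mean ≈ 9.61.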
get_current_epoch_time <- function(){
return (as.integer(as.POSIXct(Sys.time())))
}
#Assumes an xts time series object as input, with each record having a 1-sec difference from the previous one, and returns the last timestamp which is (or should have been) assigned (if not present).
find_last_timestamp <- function(mydata,next_prediction_time,realtime_mode){
possible_timestamp <- as.numeric(end(mydata))
if(realtime_mode){
#return(min(c(possible_timestamp,next_prediction_time)))
if (next_prediction_time>possible_timestamp){
return(possible_timestamp)
}else{
print("Possible problem with the requested prediction time, there is already data for a timestamp newer than the time requested to predict for. Returning the newer timestamp, being aware that this will lead to this prediction returning no meaningful output")
return (possible_timestamp)
}
}else{
return (possible_timestamp)
}
}
get_time_value <- function(time_object){
time_object[1][["user.self"]]
}
####Time the execution of the prediction
start_time <- proc.time()
time_field_name <- "ems_time" # The field holding the epoch timestamp in the generated csv
time_unit_granularity <- "sec" # Handle monitoring data using this time unit granularity
endpoint_time_unit_granularity <- "seconds"
#configuration_properties <- read.properties(".\\prediction_configuration-windows.properties")
print("Reading properties from the following file:")
print(paste(getwd(),"/prediction_configuration.properties",sep=''))
configuration_properties <- read.properties(paste(getwd(),"/prediction_configuration.properties",sep=''))
realtime_mode <- as.logical(configuration_properties$realtime_mode) # whether we should use all available datapoints (TRUE), or only part of the available dataset (FALSE), e.g. to test the prediction method's performance
try_to_optimize_parameters <- as.logical(configuration_properties$try_to_optimize_parameters)
prediction_method <- configuration_properties$prediction_method
number_of_seconds_to_aggregate_on <- as.integer(configuration_properties$number_of_seconds_to_aggregate_on)
preprocessing_required <- FALSE #Required for some/all FCR datasets
write_back_clean_data_file <- FALSE
csv_has_header <- TRUE
periodic_data <- FALSE #Setting to TRUE uses gamma, else gamma is set to FALSE
if (try_to_optimize_parameters){
frequency_setting <- 12 #12 five-minute intervals per period
}else{ #downsampling to single hours
frequency_setting <- 1
}
#Parsing of command-line arguments. Providing the alpha and beta values as arguments is entirely optional. Providing the next_prediction_time may be optional or it may be needed, depending on the circumstances. Please refer to the execution flow which is outlined in the beginning of this program
args <- commandArgs(trailingOnly=TRUE)
dataset_to_process <- args[1]
attribute_to_predict <- args[2]
next_prediction_time <- as.numeric(args[3])
alpha_value_argument <- as.double(args[4])
beta_value_argument <- as.double(args[5])
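# Example invocation (the dataset, metric and epoch below come from the experiment mentioned in the Dockerfile comments; the alpha and beta values are illustrative):
#   Rscript forecasting_real_workload.R default_application.csv MinimumCores 1638878119 0.5 0.1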
#mydata <- read.csv(configuration_properties$input_data_file, sep=",", header=TRUE)
#mydata <- read.csv(dataset_to_process, sep=",", header=TRUE)
data_to_process <- read.csv(dataset_to_process, sep=",", header=TRUE)
# Sanitize data_to_process by removing any very old values which may have been accidentally introduced. For this reason we remove all data points before now - number_of_days*24hrs*3600sec/hr seconds, and we additionally subtract configuration_properties$prediction_processing_time_safety_margin_seconds to account for the time it takes to create the dataset and start the prediction process.
current_time <- get_current_epoch_time()
if (!realtime_mode){
current_time <- tail(data_to_process[time_field_name],1)
}
oldest_acceptable_time_point <- current_time -(as.numeric(configuration_properties$number_of_days_to_use_data_from)*24*3600 + as.numeric(configuration_properties$prediction_processing_time_safety_margin_seconds))
print(paste("Using data after time point",oldest_acceptable_time_point,"..."))
data_to_process <- data_to_process[data_to_process[[time_field_name]]>oldest_acceptable_time_point,]
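# Illustrative arithmetic with the default configuration values (number_of_days_to_use_data_from=3,
# prediction_processing_time_safety_margin_seconds=10):
# oldest_acceptable_time_point = current_time - (3*24*3600 + 10) = current_time - 259210 seconds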
if (length(data_to_process[,attribute_to_predict])>0){
print(paste("Constructing fine-grained data series for",attribute_to_predict,"using the requested granularity..."))
}else{
print("No valid data points remained after enforcing the number_of_days_to_use_data_from configuration option. This may mean that you are trying to predict using realtime mode, using data points older than the number of days specified in the number_of_days_to_use_data_from configuration option")
stop()
}
#Fail-safe default
df1 <- xts(as.numeric(data_to_process[,attribute_to_predict]),anytime(data_to_process[,time_field_name]))
date_time_init <- anytime(data_to_process[,time_field_name])
date_time_complete <- seq.POSIXt(
from=as.POSIXct(min(date_time_init),origin = "1970-01-01"),
to=as.POSIXct(max(date_time_init),origin = "1970-01-01"),
by=time_unit_granularity
)
df2 <- merge(df1,xts(,date_time_complete))
mydata <- na.approx(df2)
colnames(mydata)<-c(attribute_to_predict)
print(paste("The complete time series to be predicted for attribute",attribute_to_predict,"has been created"))
configuration_forecasting_horizon <- as.integer(configuration_properties$horizon)
last_timestamp_data <- 0
if (configuration_forecasting_horizon>0){
print("Using a statically defined horizon from the configuration file")
forecasting_horizon <- configuration_forecasting_horizon
last_timestamp_data <- next_prediction_time - forecasting_horizon
first_timestamp_data <- as.integer(index(mydata[1]))
#from the number of datapoints, the last 'forecasting_horizon' datapoints will be used for testing
data_points_number <- next_prediction_time - first_timestamp_data
mydata <- head(mydata,data_points_number)
number_of_periods_in_dataset <- length(mydata[,attribute_to_predict])%/%frequency_setting
#data_points_number<-length(mydata[,attribute_to_predict])
}else {
last_timestamp_data <- find_last_timestamp(mydata,next_prediction_time,realtime_mode)
number_of_periods_in_dataset <- length(mydata[,attribute_to_predict])%/%frequency_setting
data_points_number<-length(mydata[,attribute_to_predict])
if (!is.na(next_prediction_time)){
print(paste("Using value",next_prediction_time,"from command line arguments for forecasting horizon, to be derived after subtracting last timestamp which is",last_timestamp_data))
forecasting_horizon <- next_prediction_time - last_timestamp_data
if (forecasting_horizon<=0 && realtime_mode){
print("Cannot proceed with prediction as the horizon should be a positive value")
stop()
}
}else{
print("Cannot proceed as a proper prediction horizon value could not be determined")
stop()
}
}
if (configuration_properties$forecasting_data_slicing_mode == "percentage"){
forecasting_data_points_limit <- configuration_properties$forecasting_data_limit *data_points_number
forecasting_data_points_offset <- configuration_properties$forecasting_data_offset * data_points_number
number_of_data_points_used_for_training <- round(as.double(configuration_properties$forecasting_data_used_for_training) * data_points_number)
number_of_data_points_used_for_testing <- round((1-as.double(configuration_properties$forecasting_data_used_for_training))* data_points_number)
#data_used_for_training <- 0.95
#data_used_for_testing <- 1 - data_used_for_training
}else{
forecasting_data_points_limit <- data_points_number
forecasting_data_offset <- 0
# forecasting_data_offset can be from 0 to 1 - beginning to end of dataset
number_of_data_points_used_for_testing <- base::min(as.numeric(forecasting_horizon),data_points_number%/%2)
print(paste("Forecasting horizon is",forecasting_horizon))
number_of_data_points_used_for_training <- data_points_number - number_of_data_points_used_for_testing
print(paste("Data points number is",data_points_number,"- from these",number_of_data_points_used_for_testing,"will be used for testing. If the horizon is too large, only half of the data points will be used to evaluate the prediction"))
}
#TODO check the code line below for validity - maybe use head and tail
data_points <-tail(head(mydata[,attribute_to_predict],forecasting_data_points_limit),data_points_number-forecasting_data_offset)
###Load time
load_time <- proc.time() - start_time
print(load_time)
if (write_back_clean_data_file){
write.csv(mydata,configuration_properties$clean_data_file)
if(!file.exists(configuration_properties$clean_data_file)){
file.create(configuration_properties$clean_data_file)
}
}
### Preprocessing time
preprocessing_time<-proc.time() - load_time - start_time
testing_datapoints <- tail(data_points, number_of_data_points_used_for_testing)
if (number_of_seconds_to_aggregate_on<(forecasting_horizon%/%10)) {
print(paste("Setting new value for number_of_seconds_to_aggregate_on, from ",number_of_seconds_to_aggregate_on," to ",forecasting_horizon%/%10," in order not to make too far-fetched (slow) predictions"))
number_of_seconds_to_aggregate_on <- forecasting_horizon%/%10
}
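# Illustrative check: with the default number_of_seconds_to_aggregate_on=10 and a hypothetical forecasting_horizon
# of 3600 seconds, 10 < 3600%/%10 = 360, so the aggregation interval would be widened to 360 seconds here.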
mydata.test <- tail(period.apply(testing_datapoints,endpoints(testing_datapoints,endpoint_time_unit_granularity,k=number_of_seconds_to_aggregate_on),mean),forecasting_horizon%/%(number_of_seconds_to_aggregate_on))
if (length(mydata.test)<=0){
print(paste("Unable to generate predictions as a prediction is requested for a shorter time duration than the aggregation interval (requested prediction with horizon",forecasting_horizon," whereas the aggregation period is",number_of_seconds_to_aggregate_on,")"))
stop()
}
training_datapoints <- head(data_points, number_of_data_points_used_for_training)
mydata.train <- period.apply(training_datapoints,endpoints(training_datapoints,endpoint_time_unit_granularity,k=number_of_seconds_to_aggregate_on),mean)
#print(paste("length-debugging",length(mydata.train)+1,length(mydata.train)+length(mydata.test)))
mydata_trainseries <- (ts(mydata.train,start=c(1),frequency = frequency_setting))
mydata_testseries <- (ts(mydata.test, start=c(1), frequency = frequency_setting))
if (try_to_optimize_parameters){
#initialization
alpha_ticks <- 5
beta_ticks <- 5
if (periodic_data){
gamma_ticks <- 20
}else{
gamma_ticks <- -1
}
minimum_optimization_variable_value <- 10000000
optimal_alpha <- 1
optimal_beta <- 1
optimal_gamma <- 1
iterations <- 0
iteration_average_time <- 0
last_iteration_time <- proc.time()
#actual optimization
for (alpha_counter in seq(1,alpha_ticks)){
for (beta_counter in seq(-1,beta_ticks)){
for (gamma_counter in seq(-1,gamma_ticks)){
alpha_value <- alpha_counter/alpha_ticks
beta_value <- beta_counter/beta_ticks
gamma_value <- gamma_counter/gamma_ticks
if(beta_value<0){
beta_value <- FALSE
}
if(gamma_value<0 || gamma_ticks<0){
gamma_value <- FALSE
}
holt_winters_forecasting_model <- HoltWinters(mydata_trainseries,alpha=alpha_value,beta=beta_value,gamma=gamma_value)
holt_winters_forecasts <- forecast:::forecast.HoltWinters(holt_winters_forecasting_model, h=forecasting_horizon)
optimization_variable<-3 #1: Mean error #2 RMSE #3 MAE #4 MPE #5 MAPE #6 MASE #7 ACF1
optimization_variable_value <- accuracy(holt_winters_forecasts,x=mydata.test,D=0,d=1)[1,optimization_variable]
# Use [2, optimization_variable] in the above expression to evaluate with the help of the test set and [1, optimization_variable] to evaluate with the help of the training set.
# Evaluating using the test set can be useful when the quality of multiple step ahead predictions should be measured. On the other hand, evaluating using the training set tries to minimize one-step ahead predictions.
# Resampling the data can be an alternative to ensure that one-step ahead predictions are performed and therefore the training set can be used to evaluate accuracy.
#if (gamma_value==FALSE && beta_value==FALSE && alpha_value==0.75){
# print(paste(optimization_variable_value,minimum_optimization_variable_value))
#}
print(paste("Alpha,beta,gamma: ",alpha_value,beta_value,gamma_value," optimization value",optimization_variable_value," minimum value",minimum_optimization_variable_value))
if (optimization_variable_value<minimum_optimization_variable_value){
if (configuration_properties$debug_level>0){
print(paste("Replacing existing alpha, beta and gamma ",optimal_alpha,",",optimal_beta,",",optimal_gamma,"as",optimization_variable_value,"<",minimum_optimization_variable_value,"with",alpha_value,",",beta_value,",",gamma_value))
}
optimal_alpha <- alpha_value
optimal_beta <- beta_value
optimal_gamma <- gamma_value
if (configuration_properties$debug_level>1){
debug_option <- readline()
if(debug_option=="beta"){
print(paste(optimal_beta))
}
}
minimum_optimization_variable_value <- optimization_variable_value
}
iterations <- iterations+1
iteration_average_time <- iteration_average_time + ((proc.time()-last_iteration_time)-iteration_average_time)/iterations
}
}
}
}
#Override of forecasting model with custom values
#optimal_alpha <- 1
#optimal_beta <- FALSE
#optimal_gamma <- FALSE
#Creation of forecasting model
if (try_to_optimize_parameters){
holt_winters_forecasting_model <- HoltWinters(mydata_trainseries,alpha=optimal_alpha,beta=optimal_beta,gamma=optimal_gamma)
ets_forecasting_model <- tryCatch({
ets(mydata_trainseries,alpha = optimal_alpha,beta = optimal_beta,gamma = optimal_gamma) #phi is left to be optimized
}, error = function(e) {
NULL
})
}else{
if (!is.na(alpha_value_argument) && !is.na(beta_value_argument)){
if (periodic_data){
holt_winters_forecasting_model <- HoltWinters(mydata_trainseries,alpha=alpha_value_argument,beta=beta_value_argument)
ets_forecasting_model <- tryCatch({
ets(mydata_trainseries,alpha = alpha_value_argument,beta = beta_value_argument)
}, error = function(e) {
NULL
})
}else{
holt_winters_forecasting_model <- HoltWinters(mydata_trainseries,alpha=alpha_value_argument,beta=beta_value_argument,gamma=FALSE)
#ets_forecasting_model <- ets(mydata_trainseries,alpha = alpha_value_argument,beta = beta_value_argument,gamma = FALSE)
ets_forecasting_model <- tryCatch({
ets(mydata_trainseries,alpha = alpha_value_argument,beta = beta_value_argument,gamma=FALSE)
}, error = function(e) {
NULL
})
}
}else{
print("No alpha or beta values provided, so will calculate them now")
if (periodic_data){
ets_forecasting_model <- ets(mydata_trainseries)
holt_winters_forecasting_model <- HoltWinters(mydata_trainseries)
}else{
ets_forecasting_model <- tryCatch({
ets(mydata_trainseries,model="ZZN")
}, error = function(e) {
NULL
})
holt_winters_forecasting_model <- HoltWinters(mydata_trainseries,gamma=FALSE)
}
}
}
print("Starting execution, forecasting horizon, next prediction time and last timestamp data are as follows")
print(paste(forecasting_horizon,next_prediction_time,last_timestamp_data))
if (try_to_optimize_parameters){
print(paste("The optimal alpha, beta and gamma values are, respectively",optimal_alpha,",",optimal_beta,"and",optimal_gamma))
if (prediction_method=="Holt-Winters"){
holt_winters_forecasts <- forecast:::forecast.HoltWinters(holt_winters_forecasting_model, h=forecasting_horizon)
}
else if (prediction_method=="ETS"){
ets_forecasts <- forecast::forecast.ets(ets_forecasting_model, h=forecasting_horizon)
}
}else{
if (prediction_method=="Holt-Winters"){
holt_winters_forecasts <- forecast:::forecast.HoltWinters(holt_winters_forecasting_model, h=forecasting_horizon%/%(number_of_seconds_to_aggregate_on))
}else{
ets_forecasts <- forecast::forecast.ets(ets_forecasting_model, h=forecasting_horizon%/%(number_of_seconds_to_aggregate_on))
}
}
if (prediction_method == "Holt-Winters"){
holt_winters_accuracy_measures <- accuracy(holt_winters_forecasts,x=mydata.test,D=0,d=1)#d,D values only influence MASE calculation, and are chosen to reflect a non-seasonal time-series
print(paste("Holt-Winters accuracy measures"))
print(holt_winters_accuracy_measures)
print("------------------------------------------------")
}else if (prediction_method == "ETS"){
ets_accuracy_measures <- accuracy(ets_forecasts,x=mydata.test,D=0,d=1)#d,D values only influence MASE calculation, and are chosen to reflect a non-seasonal time-series
print("ETS accuracy measures:")
print(ets_accuracy_measures)
print("------------------------------------------------")
}
###prediction_time
prediction_time <- proc.time() - preprocessing_time -load_time - start_time
total_time <- proc.time() - start_time
print(paste("The load_time is:",get_time_value(load_time)))
print(paste("The preprocessing time is:",get_time_value(preprocessing_time)))
print(paste("The prediction time is:",get_time_value(prediction_time)))
print(paste("The total time is:",get_time_value(prediction_time)))
if(prediction_method=="ETS"){
forecast_object <- ets_forecasts
print(paste("Prediction:",tail(ets_forecasts[["mean"]],n=1)))
print(paste0("Confidence_interval:",tail((ets_forecasts[["lower"]]),n=1)[2],",",tail((ets_forecasts[["upper"]]),n=1)[2]))
#2,1: Mean error 2,2: RMSE 2,3 MAE 2,4 MPE 2,5 MAPE 2,6 MASE 2,7 ACF1
print(paste0("mae:",ets_accuracy_measures[2,3]))
mse<-as.numeric(ets_accuracy_measures[2,2])*as.numeric(ets_accuracy_measures[2,2])
print(paste0("mse:",mse)) #square of RMSE
print(paste0("mape:",ets_accuracy_measures[2,5]))
print(paste0("smape:",find_smape(ets_forecasts$x,ets_forecasts$fitted)))
}else if (prediction_method=="Holt-Winters"){
forecast_object <- holt_winters_forecasts
print(paste0("Prediction:",tail(holt_winters_forecasts[["mean"]],n=1)))
print(paste0("Confidence_interval:",tail((holt_winters_forecasts[["lower"]]),n=1)[2],",",tail((holt_winters_forecasts[["upper"]]),n=1)[2]))
print(paste0("mae:",holt_winters_accuracy_measures[2,3]))
mse<-as.numeric(holt_winters_accuracy_measures[2,2])*as.numeric(holt_winters_accuracy_measures[2,2])
print(paste0("mse:",mse))
print(paste0("mape:",holt_winters_accuracy_measures[2,5]))
print(paste0("smape:",find_smape(holt_winters_forecasts$x,holt_winters_forecasts$fitted)))
}
#GRAPHING DOCUMENTATION
#forecast_object contains the timeseries which is forecasted, the original time series, and the one-step-ahead predictions, along with the confidence intervals. When it is plotted on its own, with the command forecast_object %>% autoplot(), the black line shows the original values of the timeseries, and the single point at the end, together with the blue zones, shows the final prediction and the intervals which characterize it.
#To draw the predictions along with the original time series values, we can use the following code:
#x_values <- seq.int(1,length(forecast_object$x)) #This should be changed as needed
#pred_values <- forecast_object$fitted
#observed_values <- forecast_object$x
#residuals <- forecast_object$residuals
#plot(x_values,observed_values,type='l',col="red")
#lines(x_values,residuals,col="blue")
#lines(x_values,pred_values,col="green")
#plot(x=as.numeric(time(forecast_object$x)),forecast_object$x,type='l',col='blue',ylim=c(0,1000))
#lines(x=as.numeric(time(forecast_object$mean)),forecast_object$mean,type='l',col='red')
#65130 was the length of the training dataset
#lines(x=65130+as.numeric(time(mydata_testseries)),mydata_testseries,type='l',col='green')
#dev.off()
if (as.logical(configuration_properties$generate_prediction_png_output)){
print(paste("creating new figure at",configuration_properties$png_output_file))
mydata.aggregated <- period.apply(data_points,endpoints(data_points,endpoint_time_unit_granularity,k=number_of_seconds_to_aggregate_on),mean)
mydata_full_series <- ts(mydata.aggregated,start=c(1),frequency = frequency_setting)
png(filename=configuration_properties$png_output_file,
type="cairo",
units="in",
width=10,
height=6,
pointsize=1,
res=1200)
forecast_object %>%
autoplot() +
geom_line(
aes(
x = as.numeric(time(mydata_full_series)),
y = as.numeric(mydata_full_series)
),
col = "red",
size = 0.1
) +
geom_line(
aes(
x = as.numeric(time(forecast_object$mean)),
y = as.numeric(forecast_object$mean)
#Painting the actual predictions
),
col = "green",
size = 0.1
)
#goes to above line: +
# geom_line(
# aes(
# x = as.numeric(time(forecast_object$mean)),
# y = as.numeric(forecast_object$mean)
# ),
# col = "yellow",
# size = 0.1
# )
dev.off()
}

View File

@ -0,0 +1,42 @@
#AMQ_HOST=ems
#AMQ_USER=aaa
#AMQ_PASSWORD=111
#AMQ_PORT_BROKER=61610
APP_NAME=default_application
METHOD=exponential_smoothing
INFLUXDB_HOSTNAME=localhost
INFLUXDB_PORT=8086
INFLUXDB_USERNAME=morphemic
INFLUXDB_PASSWORD=password
INFLUXDB_DBNAME=morphemic
INFLUXDB_ORG=morphemic
broker_address=localhost
broker_port=61613
broker_username=admin
broker_password=admin
prediction_method=Holt-Winters
forecasting_data_slicing_mode=none
forecasting_data_offset=
forecasting_data_limit=
forecasting_data_used_for_training=0.999
path_to_datasets=C:/Users/user/Desktop/Predictions_using_R/custom_workloads/datasets
application_name=default_application
input_data_file=C:/Users/user/Desktop/Predictions_using_R/custom_workloads/datasets/demo.csv
clean_data_file=C:/Users/user/Desktop/Predictions_using_R/clean_data.csv
output_data_file=C:/Users/user/Desktop/Predictions_using_R/output_data.csv
number_of_seconds_to_aggregate_on=300
number_of_days_to_use_data_from=3
prediction_processing_time_safety_margin_seconds=10
testing_prediction_functionality=FALSE
try_to_optimize_parameters=FALSE
debug_level=0
generate_prediction_png_output=TRUE
png_output_file=C:\\Users\\user\\Desktop\\Predictions_using_R\\output.png
horizon=0
realtime_mode=TRUE

View File

@ -0,0 +1,34 @@
#Fri Jan 12 17:06:48 UTC 2024
APP_NAME=default_application
METHOD=exponential_smoothing
INFLUXDB_HOSTNAME=localhost
INFLUXDB_PORT=8086
INFLUXDB_USERNAME=morphemic
INFLUXDB_PASSWORD=password
INFLUXDB_DBNAME=morphemic
INFLUXDB_ORG=morphemic
broker_address=localhost
broker_port=61610
broker_username=morphemic
broker_password=morphemic
prediction_method=Holt-Winters
forecasting_data_slicing_mode=none
forecasting_data_offset=
forecasting_data_limit=
forecasting_data_used_for_training=
forecasting_data_used_for_testing=
path_to_datasets=.
application_name=default_application
input_data_file=/home/r_predictions/datasets/input_data.csv
clean_data_file=/home/r_predictions/datasets/clean_data.csv
output_data_file=/home/r_predictions/datasets/output_data.csv
number_of_seconds_to_aggregate_on=10
number_of_days_to_use_data_from=3
prediction_processing_time_safety_margin_seconds=10
testing_prediction_functionality=FALSE
try_to_optimize_parameters=FALSE
debug_level=0
generate_prediction_png_output=FALSE
png_output_file=/home/r_predictions/prediction_output.png
horizon=0
realtime_mode=TRUE

View File

@ -0,0 +1,15 @@
# Copyright (c) 2023 Institute of Communication and Computer Systems
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
install.packages("rapportools")
install.packages("gbutils")
install.packages("forecast")
install.packages("ggplot2")
install.packages("properties")
install.packages("xts")
install.packages("anytime")
install.packages("purrr")

View File

@ -0,0 +1,9 @@
python-slugify==8.0.1
jproperties==2.1.1
requests==2.31.0
msgpack==1.0.7
numpy==1.26.3
pandas==2.1.4
python-dotenv==1.0.0
python-qpid-proton==0.39.0
influxdb-client==1.39.0

View File

@ -0,0 +1,416 @@
# Copyright (c) 2023 Institute of Communication and Computer Systems
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
import datetime
import json
import threading
import time
import os, sys
import multiprocessing
import traceback
from subprocess import PIPE, run
from runtime.exn import core
import logging
from runtime.exn import connector
from runtime.predictions.Prediction import Prediction
from runtime.operational_status.State import State
from runtime.utilities.PredictionPublisher import PredictionPublisher
from runtime.utilities.Utilities import Utilities
print_with_time = Utilities.print_with_time
def predict_attribute(attribute, configuration_file_location,next_prediction_time):
prediction_confidence_interval_produced = False
prediction_value_produced = False
prediction_valid = False
#os.chdir(os.path.dirname(configuration_file_location))
State.prediction_data_filename = Utilities.get_prediction_data_filename(configuration_file_location,attribute)
from sys import platform
if State.testing_prediction_functionality:
print_with_time("Testing, so output will be based on the horizon setting from the properties file and the last timestamp in the data")
print_with_time("Issuing command: Rscript forecasting_real_workload.R "+str(State.prediction_data_filename)+" "+attribute)
# Windows
if platform == "win32":
command = ['Rscript', 'forecasting_real_workload.R', State.prediction_data_filename, attribute]
# linux
elif platform == "linux" or platform == "linux2":
command = ["Rscript forecasting_real_workload.R "+str(State.prediction_data_filename) + " "+ str(attribute)]
# Fall back to the Linux-style command for other platforms
else:
command = ["Rscript forecasting_real_workload.R "+str(State.prediction_data_filename) + " "+ str(attribute)]
else:
print_with_time("The current directory is "+os.path.abspath(os.getcwd()))
print_with_time("Issuing command: Rscript forecasting_real_workload.R "+str(State.prediction_data_filename)+" "+attribute+" "+next_prediction_time)
# Windows
if platform == "win32":
command = ['Rscript', 'forecasting_real_workload.R', State.prediction_data_filename, attribute, next_prediction_time]
# Linux
elif platform == "linux" or platform == "linux2":
command = ["Rscript forecasting_real_workload.R "+str(State.prediction_data_filename) + " "+ str(attribute)+" "+str(next_prediction_time) + " 2>&1"]
# Fall back to the Linux-style command for other platforms
else:
command = ["Rscript forecasting_real_workload.R "+str(State.prediction_data_filename) + " "+ str(attribute)+" "+str(next_prediction_time)]
process_output = run(command, shell=True, stdout=PIPE, stderr=PIPE, universal_newlines=True)
if (process_output.stdout==""):
print_with_time("Empty output from R predictions - the error output is the following:")
print(process_output.stderr) #There was an error during the calculation of the predicted value
process_output_string_list = process_output.stdout.replace("[1] ", "").replace("\"", "").split()
prediction_value = 0
prediction_confidence_interval = "-10000000000000000000000000,10000000000000000000000000"
prediction_mae = 0
prediction_mse = 0
prediction_mape = 0
prediction_smape = 0
for string in process_output_string_list:
if (string.startswith("Prediction:")):
prediction_value = string.replace("Prediction:", "")
prediction_value_produced = True
if (string.startswith("Confidence_interval:")):
prediction_confidence_interval = string.replace("Confidence_interval:", "")
prediction_confidence_interval_produced = True
elif (string.startswith("mae:")):
prediction_mae = string.replace("mae:", "")
elif (string.startswith("mse:")):
prediction_mse = string.replace("mse:", "")
elif (string.startswith("mape:")):
prediction_mape = string.replace("mape:", "")
elif (string.startswith("smape:")):
prediction_smape = string.replace("smape:", "")
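# Illustrative note (values made up): the R script prints its results via print()/paste0(), producing lines such as
#   [1] "Prediction:12.34"
#   [1] "Confidence_interval:10.1,14.5"
# which is why the '[1] ' prefixes and the quotes are stripped above before the prefix matching.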
if (prediction_confidence_interval_produced and prediction_value_produced):
prediction_valid = True
print_with_time("The prediction for attribute " + attribute + " is " + str(prediction_value)+ " and the confidence interval is "+prediction_confidence_interval)
else:
print_with_time("There was an error during the calculation of the predicted value for "+str(attribute)+", the error log follows")
print_with_time(process_output.stdout)
output_prediction = Prediction(prediction_value, prediction_confidence_interval,prediction_valid,prediction_mae,prediction_mse,prediction_mape,prediction_smape)
return output_prediction
def predict_attributes(attributes,next_prediction_time):
pool = multiprocessing.Pool(len(attributes))
print_with_time("Prediction thread pool size set to " + str(len(attributes)))
attribute_predictions = {}
for attribute in attributes:
print_with_time("Starting " + attribute + " prediction thread")
start_time = time.time()
attribute_predictions[attribute] = pool.apply_async(predict_attribute, args=[attribute, State.configuration_file_location,str(next_prediction_time)])
#attribute_predictions[attribute] = pool.apply_async(predict_attribute, args=[attribute, configuration_file_location,str(next_prediction_time)]).get()
for attribute in attributes:
attribute_predictions[attribute] = attribute_predictions[attribute].get() #get the results of the processing
attribute_predictions[attribute].set_last_prediction_time_needed(int(time.time() - start_time))
#prediction_time_needed[attribute])
pool.close()
pool.join()
return attribute_predictions
def update_prediction_time(epoch_start,prediction_horizon,maximum_time_for_prediction):
current_time = time.time()
prediction_intervals_since_epoch = ((current_time - epoch_start)//prediction_horizon)
estimated_time_after_prediction = current_time+maximum_time_for_prediction
earliest_time_to_predict_at = epoch_start + (prediction_intervals_since_epoch+1)*prediction_horizon #these predictions will concern the next prediction interval
if (estimated_time_after_prediction > earliest_time_to_predict_at ):
future_prediction_time_factor = 1+(estimated_time_after_prediction-earliest_time_to_predict_at)//prediction_horizon
prediction_time = earliest_time_to_predict_at+ future_prediction_time_factor*prediction_horizon
print_with_time("Due to slowness of the prediction, skipping next time point for prediction (prediction at " + str(earliest_time_to_predict_at-prediction_horizon)+" for "+ str(earliest_time_to_predict_at)+") and targeting "+str(future_prediction_time_factor)+" intervals ahead (prediction at time point "+str(prediction_time-prediction_horizon)+" for "+ str(prediction_time)+")")
else:
prediction_time = earliest_time_to_predict_at + prediction_horizon
print_with_time("Time is now "+str(current_time)+" and next prediction batch starts with prediction for time "+str(prediction_time))
return prediction_time
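# Worked example (illustrative numbers): with epoch_start=0, prediction_horizon=60, current time 130 and
# maximum_time_for_prediction=20, two full intervals have elapsed, so earliest_time_to_predict_at is 180;
# since 130+20=150 does not exceed 180, the function returns 180+60=240 as the next prediction target time.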
def calculate_and_publish_predictions(prediction_horizon,maximum_time_required_for_prediction):
while Bootstrap.start_forecasting:
print_with_time("Using " + State.configuration_file_location + " for configuration details...")
State.next_prediction_time = update_prediction_time(State.epoch_start, prediction_horizon,maximum_time_required_for_prediction)
for attribute in State.metrics_to_predict:
if ((State.previous_prediction is not None) and (State.previous_prediction[attribute] is not None) and (State.previous_prediction[attribute].last_prediction_time_needed>maximum_time_required_for_prediction)):
maximum_time_required_for_prediction = State.previous_prediction[attribute].last_prediction_time_needed
#Below we subtract one reconfiguration interval, as we cannot send a prediction for a time point later than one prediction_horizon interval
wait_time = State.next_prediction_time - prediction_horizon - time.time()
print_with_time("Waiting for "+str((int(wait_time*100))/100)+" seconds, until time "+datetime.datetime.fromtimestamp(State.next_prediction_time - prediction_horizon).strftime('%Y-%m-%d %H:%M:%S'))
if (wait_time>0):
time.sleep(wait_time)
if(not Bootstrap.start_forecasting):
break
Utilities.load_configuration()
Utilities.update_monitoring_data()
first_prediction = None
for prediction_index in range(0,State.total_time_intervals_to_predict):
prediction_time = int(State.next_prediction_time)+prediction_index*prediction_horizon
try:
print_with_time ("Initiating predictions for all metrics for next_prediction_time, which is "+str(State.next_prediction_time))
prediction = predict_attributes(State.metrics_to_predict,prediction_time)
if (prediction_time == int(State.next_prediction_time)):
first_prediction = prediction
except Exception as e:
print_with_time("Could not create a prediction for some or all of the metrics for time point "+str(State.next_prediction_time)+", proceeding to next prediction time. However, "+str(prediction_index)+" predictions were produced (out of the configured "+State.total_time_intervals_to_predict+"). The encountered exception trace follows:")
print(e)
#continue was here, to continue while loop, replaced by break
break
for attribute in State.metrics_to_predict:
if(not prediction[attribute].prediction_valid):
#continue was here, to continue while loop, replaced by break
break
if (State.disconnected or State.check_stale_connection()):
logging.info("Possible problem due to disconnection or a stale connection")
#State.connection.connect()
message_not_sent = True
current_time = int(time.time())
prediction_message_body = {
"metricValue": float(prediction[attribute].value),
"level": 3,
"timestamp": current_time,
"probability": 0.95,
"confidence_interval": [float(prediction[attribute].lower_confidence_interval_value) , float(
prediction[attribute].upper_confidence_interval_value)],
"predictionTime": prediction_time,
"refersTo": "todo",
"cloud": "todo",
"provider": "todo",
}
training_models_message_body = {
"metrics": State.metrics_to_predict,
"forecasting_method": "exponentialsmoothing",
"timestamp": current_time,
}
while (message_not_sent):
try:
#for publisher in State.broker_publishers:
# if publisher.
for publisher in State.broker_publishers:
#if publisher.address=="eu.nebulouscloud.monitoring.preliminary_predicted.exponentialsmoothing"+attribute:
if publisher.key=="publisher_"+attribute:
publisher.send(prediction_message_body)
#State.connection.send_to_topic('intermediate_prediction.%s.%s' % (id, attribute), prediction_message_body)
#State.connection.send_to_topic('training_models',training_models_message_body)
message_not_sent = False
print_with_time("Successfully sent prediction message for %s to topic eu.nebulouscloud.preliminary_predicted.%s.%s\n\n%s\n\n" % (attribute, id, attribute, prediction_message_body))
except ConnectionError as exception:
#State.connection.disconnect()
#State.connection = messaging.morphemic.Connection('admin', 'admin')
#State.connection.connect()
logging.error("Error sending intermediate prediction"+str(exception))
State.disconnected = False
if (first_prediction is not None):
State.previous_prediction = first_prediction #first_prediction is the first of the batch of the predictions which are produced. The size of this batch is set by the State.total_time_intervals_to_predict (currently set to 8)
#State.number_of_days_to_use_data_from = (prediction_horizon - State.prediction_processing_time_safety_margin_seconds) / (wait_time / State.number_of_days_to_use_data_from)
#State.number_of_days_to_use_data_from = 1 + int(
# (prediction_horizon - State.prediction_processing_time_safety_margin_seconds) /
# (wait_time / State.number_of_days_to_use_data_from)
#)
#class Listener(messaging.listener.MorphemicListener):
class Bootstrap(connector.ConnectorHandler):
start_forecasting = None # Whether the component should start (or keep on) forecasting
prediction_thread = None
def ready(self, context):
if context.has_publisher('state'):
context.publishers['state'].starting()
context.publishers['state'].started()
context.publishers['state'].custom('forecasting')
context.publishers['state'].stopping()
context.publishers['state'].stopped()
context.publishers['publisher_cpu_usage'].send({
'hello': 'world'
})
def on_message(self, key, address, body, context, **kwargs):
application_name = "default_application"
address = address.replace("topic://eu.nebulouscloud.","")
if (address).startswith(State.MONITORING_DATA_PREFIX):
address = address.replace(State.MONITORING_DATA_PREFIX+".","",1)
logging.info("New monitoring data arrived at topic "+address)
logging.info(body)
elif (address).startswith(State.FORECASTING_CONTROL_PREFIX):
address = address.replace(State.FORECASTING_CONTROL_PREFIX+".","",1)
logging.info("The address is " + address)
if address == 'metrics_to_predict':
State.initial_metric_list_received = True
print_with_time("Inside message handler for metrics_to predict")
#body = json.loads(body)
#for element in body:
# State.metrics_to_predict.append(element["metric"])
elif address == 'test.exponentialsmoothing':
State.testing_prediction_functionality = True
elif address == 'start_forecasting.exponentialsmoothing':
try:
State.metrics_to_predict = body["metrics"]
print_with_time("Received request to start predicting the following metrics: "+ ",".join(State.metrics_to_predict))
State.broker_publishers = []
for metric in State.metrics_to_predict:
State.broker_publishers.append (PredictionPublisher(metric))
State.publishing_connector = connector.EXN('publishing_exsmoothing', handler=Bootstrap(),#consumers=list(State.broker_consumers),
consumers=[],
publishers=State.broker_publishers,
url="localhost",
port="5672",
username="admin",
password="admin"
)
thread = threading.Thread(target=State.publishing_connector.start,args=())
thread.start()
except Exception as e:
print_with_time("Could not load json object to process the start forecasting message \n"+str(body))
return
#if (not State.initial_metric_list_received):
# print_with_time("The initial metric list has not been received,
#therefore no predictions are generated")
# return
try:
Bootstrap.start_forecasting = True
State.epoch_start = body["epoch_start"]
prediction_horizon = int(body["prediction_horizon"])
State.next_prediction_time = update_prediction_time(State.epoch_start,prediction_horizon,State.prediction_processing_time_safety_margin_seconds) # State.next_prediction_time was assigned the value of State.epoch_start here, but this re-initializes targeted prediction times after each start_forecasting message, which is not desired necessarily
print_with_time("A start_forecasting message has been received, epoch start and prediction horizon are "+str(State.epoch_start)+", and "+str(prediction_horizon)+ " seconds respectively")
except Exception as e:
print_with_time("Problem while retrieving epoch start and/or prediction_horizon")
return
with open(State.configuration_file_location, "r+b") as f:
State.configuration_details.load(f, "utf-8")
# Adjust the configured number_of_seconds_to_aggregate_on if it exceeds the prediction horizon
initial_seconds_aggregation_value, metadata = State.configuration_details["number_of_seconds_to_aggregate_on"]
initial_seconds_aggregation_value = int(initial_seconds_aggregation_value)
if (prediction_horizon<initial_seconds_aggregation_value):
print_with_time("Changing number_of_seconds_to_aggregate_on to "+str(prediction_horizon)+" from its initial value "+str(initial_seconds_aggregation_value))
State.configuration_details["number_of_seconds_to_aggregate_on"] = str(prediction_horizon)
f.seek(0)
f.truncate(0)
State.configuration_details.store(f, encoding="utf-8")
maximum_time_required_for_prediction = State.prediction_processing_time_safety_margin_seconds #initialization, assuming X seconds processing time to derive a first prediction
if ((self.prediction_thread is None) or (not self.prediction_thread.is_alive())):
self.prediction_thread = threading.Thread(target = calculate_and_publish_predictions, args =[prediction_horizon,maximum_time_required_for_prediction])
self.prediction_thread.start()
#waitfor(first period)
elif address == 'stop_forecasting.exponentialsmoothing':
#waitfor(first period)
print_with_time("Received message to stop predicting some of the metrics")
metrics_to_remove = json.loads(body)["metrics"]
for metric in metrics_to_remove:
if (State.metrics_to_predict.__contains__(metric)):
print_with_time("Stopping generating predictions for metric "+metric)
State.metrics_to_predict.remove(metric)
if len(State.metrics_to_predict)==0:
Bootstrap.start_forecasting = False
self.prediction_thread.join()
else:
print_with_time("The address was "+ address +" and did not match metrics_to_predict/test.exponentialsmoothing/start_forecasting.exponentialsmoothing/stop_forecasting.exponentialsmoothing")
# logging.info(f"Received {key} => {address}")
else:
print_with_time("Received message "+body+" but could not handle it")
def get_dataset_file(attribute):
pass
if __name__ == "__main__":
os.chdir("exponential-smoothing-predictor/src/r_predictors")
State.configuration_file_location = sys.argv[1]
Utilities.load_configuration()
# Subscribe to retrieve the metrics which should be used
id = "exponentialsmoothing"
State.disconnected = True
#while(True):
# State.connection = messaging.morphemic.Connection('admin', 'admin')
# State.connection.connect()
# State.connection.set_listener(id, Listener())
# State.connection.topic("test","helloid")
# State.connection.send_to_topic("test","HELLO!!!")
#exit(100)
while True:
topics_to_subscribe = ["eu.nebulouscloud.monitoring.metric_list","eu.nebulouscloud.monitoring.realtime.>","eu.nebulouscloud.forecasting.start_forecasting.exponentialsmoothing","eu.nebulouscloud.forecasting.stop_forecasting.exponentialsmoothing"]
current_consumers = []
for topic in topics_to_subscribe:
current_consumer = core.consumer.Consumer('monitoring_'+topic, topic, topic=True,fqdn=True)
State.broker_consumers.append(current_consumer)
current_consumers.append(current_consumer)
State.subscribing_connector = connector.EXN('slovid', handler=Bootstrap(),
#consumers=list(State.broker_consumers),
consumers=State.broker_consumers,
url="localhost",
port="5672",
username="admin",
password="admin"
)
#connector.start()
thread = threading.Thread(target=State.subscribing_connector.start,args=())
thread.start()
State.disconnected = False;
print_with_time("Checking (EMS) broker connectivity state, possibly ready to start")
if (State.disconnected or State.check_stale_connection()):
try:
#State.connection.disconnect() #required to avoid the already connected exception
#State.connection.connect()
State.disconnected = True
print_with_time("Possible problem in the connection")
except Exception as e:
print_with_time("Encountered exception while trying to connect to broker")
print(traceback.format_exc())
State.disconnected = True
time.sleep(5)
continue
State.disconnection_handler.acquire()
State.disconnection_handler.wait()
State.disconnection_handler.release()
#State.connector.stop()

View File

@ -0,0 +1 @@
from . import connector

View File

@ -0,0 +1,155 @@
import logging
import os
from dotenv import load_dotenv
from proton.handlers import MessagingHandler
from proton.reactor import Container
from .core import context as core_context, state_publisher, schedule_publisher
from .settings import base
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
_logger = logging.getLogger(__name__)
class ConnectorHandler:
def __init__(self):
self.initialized = False
def set_ready(self,ready, ctx:core_context.Context):
self.initialized = ready
self.ready(ctx)
def ready(self, ctx:core_context.Context):
pass
def on_message(self, key, address, body, context, **kwargs):
pass
class CoreHandler(MessagingHandler):
def __init__(self,
context,
handler: ConnectorHandler,
publishers = [],
consumers = [],
):
super(CoreHandler, self).__init__()
self.context=context
self.publishers=publishers
self.consumers=consumers
self.handler = handler
self.conn = None
def on_start(self, event) -> None:
self.conn = event.container.connect(self.context.connection)
for publisher in self.publishers:
_logger.info(f"{publisher.address} registering sender")
address = self.context.build_address_from_link(publisher)
publisher.set(event.container.create_sender(self.conn,address))
self.context.register_publisher(publisher)
_logger.debug(f"{self.context.base} Registering timer { hasattr(publisher, 'delay')}")
if hasattr(publisher, "delay"):
_logger.debug(f"{self.context.base} Registering timer")
event.reactor.schedule(publisher.delay, self)
for consumer in self.consumers:
address = self.context.build_address_from_link(consumer)
_logger.info(f"{self.context.base} Registering consumer {address}")
consumer.set(event.container.create_receiver(self.conn, address))
self.context.register_consumers(consumer)
def on_sendable(self, event):
if not self.handler.initialized:
self.handler.set_ready(True, self.context)
def on_timer_task(self, event):
_logger.debug(f"{self.context.base} On timer")
for publisher in self._delay_publishers():
publisher.send()
event.reactor.schedule(publisher.delay, self)
def on_message(self, event):
try:
for consumer in self.consumers:
if consumer.should_handle(event):
_logger.debug(f"Received message: {event.message.address}")
self.handler.on_message(consumer.key, event.message.address, event.message.body, self.context, event=event)
except Exception as e:
_logger.error(f"Received message: {e}")
def close(self):
if self.conn:
self.conn.close()
else:
_logger.warning(f"{self.context.base} No open connection")
def _delay_publishers(self):
return [p for p in self.publishers if hasattr(p,'delay')]
class EXN:
def __init__(self, component=None,
handler:ConnectorHandler = None,
publishers=[],
consumers=[],
**kwargs):
# Load .env file
load_dotenv()
# Validate and set connector
if not component:
_logger.error("Component cannot be empty or None")
raise ValueError("Component cannot be empty or None")
self.component = component
self.handler = handler
self.url = kwargs.get('url',os.getenv('NEBULOUS_BROKER_URL'))
self.port = kwargs.get('port', os.getenv('NEBULOUS_BROKER_PORT'))
self.username = kwargs.get('username',os.getenv('NEBULOUS_BROKER_USERNAME'))
self.password = kwargs.get('password', os.getenv('NEBULOUS_BROKER_PASSWORD'))
# Validate attributes
if not self.url:
_logger.error("URL cannot be empty or None")
raise ValueError("URL cannot be empty or None")
if not self.port:
_logger.error("PORT cannot be empty or None")
raise ValueError("PORT cannot be empty or None")
if not self.username:
_logger.error("USERNAME cannot be empty or None")
raise ValueError("USERNAME cannot be empty or None")
if not self.password:
_logger.error("PASSWORD cannot be empty or None")
raise ValueError("PASSWORD cannot be empty or None")
ctx = core_context.Context(
connection=f"{self.url}:{self.port}",
base=f"{base.NEBULOUS_BASE_NAME}.{self.component}",
)
if kwargs.get("enable_state",False):
publishers.append(state_publisher.Publisher())
if kwargs.get("enable_health",False):
publishers.append(schedule_publisher.Publisher(
base.NEBULOUS_DEFAULT_HEALTH_CHECK_TIMEOUT,
'health',
'health',
True))
core_handler = CoreHandler(
ctx,
handler,
publishers,
consumers
)
self.container = Container(core_handler)
def start(self):
self.container.run()

View File

@ -0,0 +1,8 @@
from . import context
from . import publisher
from . import consumer
from . import state_publisher
from . import schedule_publisher

View File

@ -0,0 +1,17 @@
import datetime
from proton import Message, Event
from . import link
import logging
_logger = logging.getLogger(__name__)
class Consumer(link.Link):
def on_message(self, body, **kwargs):
_logger.debug(f"{self.address} Got {body} ")
def should_handle(self, event: Event):
if event.link == self._link:
return True

View File

@ -0,0 +1,63 @@
from . import link
class Context:
def __init__(self, connection, base):
self.connection = connection
self.base = base
self.publishers = {}
self.consumers = {}
def get_publisher(self, key):
if key in self.publishers:
return self.publishers[key]
return None
def has_publisher(self, key):
return key in self.publishers
def has_consumer(self, key):
return key in self.consumers
def register_publisher(self, publisher):
self.publishers[publisher.key] = publisher
def register_consumers(self, consumer):
self.consumers[consumer.key] = consumer
def build_address_from_link(self, link: link.Link):
if link.fqdn:
address = link.address
if link.topic and not link.address.startswith("topic://"):
address = f"topic://{address}"
return address
address = f"{self.base}.{link.address}"
if link.topic:
address = f"topic://{address}"
return address
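# Illustrative examples (component name is hypothetical): with base='eu.nebulous.mycomponent',
# a non-fqdn topic link with address 'state' resolves to 'topic://eu.nebulous.mycomponent.state',
# while an fqdn topic link with address 'eu.nebulouscloud.monitoring.realtime.cpu_usage' resolves to
# 'topic://eu.nebulouscloud.monitoring.realtime.cpu_usage'.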
def match_address(self, l: link.Link, event):
if not event \
or not event.message \
or not event.message.address:
return False
address = self.build_address_from_link(l)
return address == event.message.address
def build_address(self, *actions, topic=False):
if len(actions) <= 0:
return self.base
address = f"{self.base}.{'.'.join(actions)}"
if topic:
address = f"topic://{address}"
return address

View File

@ -0,0 +1,18 @@
from proton import Link as pLink
class Link:
fqdn=False
def __init__(self, key, address, topic=False, fqdn=False):
self.key = key
self.address = address
self._link = None
self.topic= topic
self.fqdn= fqdn
def set(self, link:pLink):
# The proton container creates a sender
# so we just use composition instead of extension
self._link = link

View File

@ -0,0 +1,33 @@
import datetime
import logging
from proton import Message
from . import link
_logger = logging.getLogger(__name__)
class Publisher(link.Link):
def send(self, body=None):
if not body:
body = {}
_logger.debug(f"{self.address} Sending {body} ")
msg = self._prepare_message(body)
self._link.send(msg)
def _prepare_message(self, body=None):
if not body:
body = {}
send = {"when": datetime.datetime.utcnow().isoformat()}
send.update(body)
msg = Message(
address=self._link.target.address,
body=send
)
msg.content_type = "application/json"
return msg

View File

@ -0,0 +1,14 @@
import logging
from . import publisher
_logger = logging.getLogger(__name__)
class Publisher(publisher.Publisher):
send_next = False
delay = 15
def __init__(self, delay, key, address, topic=False):
super(Publisher, self).__init__(key, address, topic)
self.delay = delay

View File

@ -0,0 +1,45 @@
import datetime
import json
from enum import Enum

from proton import Message

from . import publisher

import logging

_logger = logging.getLogger(__name__)


class States(Enum):
    STARTING = "starting"
    STARTED = "started"
    READY = "ready"
    STOPPING = "stopping"
    STOPPED = "stopped"


class Publisher(publisher.Publisher):

    def __init__(self):
        super().__init__("state", "state", True)

    def _send_message(self, message_type):
        self.send({"state": message_type, "message": None})

    def starting(self):
        self._send_message(States.STARTING)

    def started(self):
        self._send_message(States.STARTED)

    def ready(self):
        self._send_message(States.READY)

    def stopping(self):
        self._send_message(States.STOPPING)

    def stopped(self):
        self._send_message(States.STOPPED)

    def custom(self, state):
        self._send_message(state)
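Illustrative only (assumes the module is importable as runtime.exn.core.state_publisher and that python-qpid-proton is installed): constructing the publisher merely records its key, address and topic flag; actually emitting the lifecycle messages requires the proton container to have attached a link first.

from runtime.exn.core import state_publisher

sp = state_publisher.Publisher()                      # key="state", address="state", topic=True
print([s.value for s in state_publisher.States])      # ['starting', 'started', 'ready', 'stopping', 'stopped']
# Once a link is attached, sp.starting() / sp.ready() / ... publish bodies of the
# form {"state": <States member>, "message": None} on the shared "state" topic.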

View File

@ -0,0 +1 @@
from . import base

View File

@ -0,0 +1,2 @@
NEBULOUS_BASE_NAME = "eu.nebulous"
NEBULOUS_DEFAULT_HEALTH_CHECK_TIMEOUT = 15

View File

@ -0,0 +1,62 @@
# Copyright (c) 2023 Institute of Communication and Computer Systems
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
import threading

from jproperties import Properties


class State:
    """
    Fail-safe default values introduced below
    """
    prediction_data_filename = "default_application.csv"
    MONITORING_DATA_PREFIX = "monitoring"
    FORECASTING_CONTROL_PREFIX = "forecasting"

    # Used to create the dataset from the InfluxDB
    application_name = "default_application"
    influxdb_bucket = "nebulous"
    influxdb_organization = "nebulous"
    influxdb_token = "tzIfpbU9b77quyvN0yHIbWltSh1c1371-o9nl_wJYaeo5TWdk5txyxXhp2iaLVMvOvf020HnEEAkE0yy5AllKQ=="
    influxdb_dbname = "nebulous"
    influxdb_password = "adminadmin"
    influxdb_username = "admin"
    influxdb_port = 8086
    influxdb_hostname = "localhost"
    path_to_datasets = "./datasets"
    dataset_file_name = "exponential_smoothing_dataset.csv"
    number_of_days_to_use_data_from = 365

    # Forecaster variables
    metrics_to_predict = []
    epoch_start = 0
    next_prediction_time = 0
    previous_prediction = None
    configuration_file_location = "prediction_configuration.properties"
    configuration_details = Properties()
    prediction_processing_time_safety_margin_seconds = 20
    disconnected = True
    disconnection_handler = threading.Condition()
    initial_metric_list_received = False
    testing_prediction_functionality = False
    total_time_intervals_to_predict = 8

    # Connection details
    subscribing_connector = None
    publishing_connector = None
    broker_publishers = []
    broker_consumers = []
    connector = None
    broker_address = "localhost"
    broker_port = 5672
    broker_username = "admin"
    broker_password = "admin"

    # TODO inspect State.connection
    @staticmethod
    def check_stale_connection():
        return (not State.subscribing_connector)
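Because every field is a class attribute, State behaves as process-wide configuration; load_configuration() in runtime.utilities later overwrites the defaults from the properties file. A small sketch, with the override values being placeholders:

from runtime.operational_status.State import State

State.broker_address = "broker.example.org"         # hypothetical override
State.influxdb_hostname = "influxdb.example.org"    # hypothetical override
print(State.check_stale_connection())               # True until a subscribing connector is assigned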

View File

@ -0,0 +1,5 @@
# Copyright (c) 2023 Institute of Communication and Computer Systems
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.

View File

@ -0,0 +1,31 @@
# Copyright (c) 2023 Institute of Communication and Computer Systems
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
class Prediction:
    value = None
    lower_confidence_interval_value = None
    upper_confidence_interval_value = None
    prediction_valid = False

    # Error metrics
    mae = None
    mse = None
    mape = None
    smape = None

    def __init__(self, value, confidence_interval_tuple, prediction_valid, prediction_mae, prediction_mse, prediction_mape, prediction_smape):
        self.value = value
        # The confidence interval arrives as a "lower,upper" string and is split into two floats.
        self.lower_confidence_interval_value, self.upper_confidence_interval_value = map(float, confidence_interval_tuple.split(","))
        self.prediction_valid = prediction_valid
        self.mae = prediction_mae
        self.mse = prediction_mse
        self.mape = prediction_mape
        self.smape = prediction_smape

    def set_last_prediction_time_needed(self, prediction_time_needed):
        self.last_prediction_time_needed = prediction_time_needed

    def get_error_metrics_string(self):
        # str() allows numeric error metrics to be joined without raising a TypeError.
        return str(self.mae) + ";" + str(self.mse) + ";" + str(self.mape) + ";" + str(self.smape)
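A quick sketch of how a prediction record is built (the module path runtime.predictions.Prediction is assumed from the setup.py package list; the "lower,upper" confidence-interval string mirrors what the constructor expects, presumably as emitted by the R forecasting script):

from runtime.predictions.Prediction import Prediction

p = Prediction(95.4, "90.1,100.7", True, 1.2, 2.9, 0.05, 0.04)
print(p.lower_confidence_interval_value, p.upper_confidence_interval_value)  # 90.1 100.7
print(p.get_error_metrics_string())                                          # 1.2;2.9;0.05;0.04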

View File

@ -0,0 +1,5 @@
# Copyright (c) 2023 Institute of Communication and Computer Systems
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.

View File

@ -0,0 +1,60 @@
from influxdb_client import InfluxDBClient, Point, WritePrecision
from datetime import datetime
from influxdb_client.client.write_api import SYNCHRONOUS
from runtime.operational_status.State import State
#import influxdb_client, os, time
#from influxdb_client import InfluxDBClient, Point, WritePrecision
#from influxdb_client.client.write_api import SYNCHRONOUS
#token = Constants.token
#org = "nebulous"
#url = "http://localhost:8086"
#write_client = influxdb_client.InfluxDBClient(url=url, token=token, org=org)
#bucket="nebulous"
#write_api = client.write_api(write_options=SYNCHRONOUS)
#
#for value in range(5):
# point = (
# Point("measurement1")
# .tag("tagname1", "tagvalue1")
# .field("field1", value)
# )
# write_api.write(bucket=bucket, org="nebulous", record=point)
# time.sleep(1) # separate points by 1 second
data = [
    {
        "measurement": "temperature",
        "tags": {"location": "Prague"},
        "fields": {"temperature": 25.3}
    }
]


class InfluxDBConnector:
    client = InfluxDBClient(url="http://" + State.influxdb_hostname + ":" + str(State.influxdb_port), token=State.influxdb_token, org=State.influxdb_organization)
    write_api = client.write_api(write_options=SYNCHRONOUS)

    def __init__(self):
        # No per-instance state; the client and write_api are shared class attributes.
        pass

    def write_data(self, data):
        self.write_api.write(bucket=State.influxdb_bucket, org=State.influxdb_organization, record=data, write_precision=WritePrecision.S)

    def get_data(self):
        query_api = self.client.query_api()
        query = """from(bucket: "nebulous")
        |> range(start: -1m)
        |> filter(fn: (r) => r._measurement == "temperature")"""
        tables = query_api.query(query, org=State.influxdb_organization)
        for table in tables:
            for record in table.records:
                print(record)
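A usage sketch, assuming an InfluxDB instance is actually reachable with the hostname, port, token, organization and bucket held in State (otherwise both calls fail):

from runtime.utilities.InfluxDBConnector import InfluxDBConnector

connector = InfluxDBConnector()
# Write one point shaped like the module-level sample data above.
connector.write_data([{
    "measurement": "temperature",
    "tags": {"location": "Prague"},
    "fields": {"temperature": 25.3},
}])
connector.get_data()   # prints every 'temperature' record written in the last minute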

View File

@ -0,0 +1,11 @@
from runtime.exn import core


class PredictionPublisher(core.publisher.Publisher):
    metric_name = ""

    def __init__(self, metric_name):
        super().__init__('publisher_' + metric_name, 'eu.nebulouscloud.preliminary_predicted.' + metric_name, True, True)
        self.metric_name = metric_name

    def send(self, body=None):
        # body defaults to None rather than a shared mutable dict; the parent send() substitutes {}.
        super(PredictionPublisher, self).send(body)

View File

@ -0,0 +1,114 @@
# Copyright (c) 2023 Institute of Communication and Computer Systems
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
import pathlib
# from morphemic.dataset import DatasetMaker
import datetime
import time, os

from dateutil import parser

from runtime.operational_status.State import State
from runtime.utilities.InfluxDBConnector import InfluxDBConnector


class DatasetMaker:
    pass


class Utilities:

    @staticmethod
    def print_with_time(x):
        now = datetime.datetime.now()
        print("[" + now.strftime('%Y-%m-%d %H:%M:%S') + "] " + str(x))

    @staticmethod
    def load_configuration():
        with open(State.configuration_file_location, 'rb') as config_file:
            State.configuration_details.load(config_file)
        # prediction_horizon = configuration_details.get("prediction_horizon")
        State.dataset_file_name = State.configuration_details.get("input_data_file").data
        State.number_of_days_to_use_data_from = int(State.configuration_details.get("number_of_days_to_use_data_from").data)
        State.prediction_processing_time_safety_margin_seconds = int(State.configuration_details.get("prediction_processing_time_safety_margin_seconds").data)
        State.testing_prediction_functionality = State.configuration_details.get("testing_prediction_functionality").data.lower() == "true"
        State.path_to_datasets = State.configuration_details.get("path_to_datasets").data
        State.broker_address = State.configuration_details.get("broker_address").data
        State.broker_port = int(State.configuration_details.get("broker_port").data)
        State.broker_username = State.configuration_details.get("broker_username").data
        State.broker_password = State.configuration_details.get("broker_password").data

        State.influxdb_hostname = State.configuration_details.get("INFLUXDB_HOSTNAME").data
        State.influxdb_port = int(State.configuration_details.get("INFLUXDB_PORT").data)
        State.influxdb_username = State.configuration_details.get("INFLUXDB_USERNAME").data
        State.influxdb_password = State.configuration_details.get("INFLUXDB_PASSWORD").data
        State.influxdb_dbname = State.configuration_details.get("INFLUXDB_DBNAME").data
        # Stored on State.influxdb_organization, the attribute InfluxDBConnector reads.
        State.influxdb_organization = State.configuration_details.get("INFLUXDB_ORG").data
        State.application_name = State.configuration_details.get("APP_NAME").data

    # This method accesses InfluxDB to retrieve the most recent metric values.
    @staticmethod
    def update_monitoring_data():
        # query(metrics_to_predict, number_of_days_for_which_data_was_retrieved)
        # save_new_file()
        Utilities.print_with_time("Starting dataset creation process...")

        try:
            """
            Deprecated functionality to retrieve dataset creation details. Relevant functionality moved inside the load_configuration method
            influxdb_hostname = os.environ.get("INFLUXDB_HOSTNAME","localhost")
            influxdb_port = int(os.environ.get("INFLUXDB_PORT","8086"))
            influxdb_username = os.environ.get("INFLUXDB_USERNAME","morphemic")
            influxdb_password = os.environ.get("INFLUXDB_PASSWORD","password")
            influxdb_dbname = os.environ.get("INFLUXDB_DBNAME","morphemic")
            influxdb_org = os.environ.get("INFLUXDB_ORG","morphemic")
            application_name = "default_application"
            """
            metric_names = ["cpu_usage", "ram_usage"]
            for metric_name in State.metrics_to_predict:
                time_interval_to_get_data_for = str(State.number_of_days_to_use_data_from) + "d"
                print_data_from_db = True
                query_string = 'from(bucket: "' + State.influxdb_bucket + '") |> range(start:-' + time_interval_to_get_data_for + ') |> filter(fn: (r) => r["_measurement"] == "' + metric_name + '")'
                influx_connector = InfluxDBConnector()
                print("performing query")
                current_time = time.time()
                result = influx_connector.client.query_api().query(query_string, State.influxdb_organization)
                elapsed_time = time.time() - current_time
                print("performed query, it took " + str(elapsed_time) + " seconds")
                # print(result.to_values())
                with open(Utilities.get_prediction_data_filename(State.configuration_file_location, metric_name), 'w') as file:
                    for table in result:
                        # print header row
                        file.write("Timestamp,ems_time," + metric_name + "\r\n")
                        for record in table.records:
                            dt = parser.isoparse(str(record.get_time()))
                            epoch_time = int(dt.timestamp())
                            metric_value = record.get_value()
                            if print_data_from_db:
                                # Write the string data to the file
                                file.write(str(epoch_time) + "," + str(epoch_time) + "," + str(metric_value) + "\r\n")
        except Exception as e:
            Utilities.print_with_time("Could not create new dataset as an exception was thrown")
            print(e)

    @staticmethod
    def get_prediction_data_filename(configuration_file_location, metric_name):
        from jproperties import Properties
        p = Properties()
        with open(configuration_file_location, "rb") as f:
            p.load(f, "utf-8")
        path_to_datasets, metadata = p["path_to_datasets"]
        application_name, metadata = p["application_name"]
        path_to_datasets = Utilities.fix_path_ending(path_to_datasets)
        return "" + str(path_to_datasets) + str(application_name) + "_" + metric_name + ".csv"

    @staticmethod
    def fix_path_ending(path):
        # Ensure the returned path ends with the platform separator.
        if path[-1] == os.sep:
            return path
        else:
            return path + os.sep
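A sketch of the round trip through the properties file (the module paths runtime.utilities.Utilities and runtime.operational_status.State are assumed; the keys below are the ones load_configuration() and get_prediction_data_filename() actually read, while the values are placeholders, not authoritative defaults):

from runtime.operational_status.State import State
from runtime.utilities.Utilities import Utilities

properties_text = """\
input_data_file=exponential_smoothing_dataset.csv
number_of_days_to_use_data_from=365
prediction_processing_time_safety_margin_seconds=20
testing_prediction_functionality=false
path_to_datasets=./datasets
application_name=default_application
broker_address=localhost
broker_port=5672
broker_username=admin
broker_password=admin
INFLUXDB_HOSTNAME=localhost
INFLUXDB_PORT=8086
INFLUXDB_USERNAME=admin
INFLUXDB_PASSWORD=adminadmin
INFLUXDB_DBNAME=nebulous
INFLUXDB_ORG=nebulous
APP_NAME=default_application
"""

# Write a minimal configuration file, then let Utilities populate State from it.
with open("prediction_configuration.properties", "w") as f:
    f.write(properties_text)

State.configuration_file_location = "prediction_configuration.properties"
Utilities.load_configuration()
print(State.broker_address, State.influxdb_port)   # localhost 8086
print(Utilities.get_prediction_data_filename(State.configuration_file_location, "cpu_usage"))
# ./datasets/default_application_cpu_usage.csv (on a POSIX filesystem)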

View File

@ -0,0 +1,5 @@
# Copyright (c) 2023 Institute of Communication and Computer Systems
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.

View File

@ -0,0 +1,40 @@
# Copyright (c) 2023 Institute of Communication and Computer Systems
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
from distutils.core import setup

setup(
    # Application name:
    name="esm_forecaster",

    # Version number (initial):
    version="0.1.0",

    # Application author details:
    author="Andreas Tsagkaropoulos",
    author_email="atsagkaropoulos@mail.ntua.gr",

    # Packages
    packages=["r_predictors", "runtime", "runtime.exn", "runtime.operational_status", "runtime.utilities", "runtime.predictions"],

    # Include additional files into the package
    include_package_data=True,

    # Details
    # url="http://pypi.python.org/pypi/exponential_smoothing_forecaster/",
    #
    # license="LICENSE.txt",
    description="A utility to generate monitoring metric predictions for the Morphemic platform using exponential smoothing.",
    # long_description=open("README.txt").read(),

    # Dependent packages (distributions)
    install_requires=[
        "python-slugify",
        "jproperties"
    ],
)

View File

@ -8,15 +8,15 @@
      - nebulous-exponential-smoothing-predictor-container-images
    description: Build the container images.
    files: &image_files
-     - ^java-spring-boot-demo/
+     - ^exponential-smoothing-predictor/
    vars: &image_vars
      promote_container_image_job: nebulous-exponential-smoothing-predictor-upload-container-images
      container_images:
-       - context: java-spring-boot-demo
+       - context: exponential-smoothing-predictor
          registry: quay.io
-         repository: quay.io/nebulous/exponential-smoothing-predictor-java-spring-boot-demo
+         repository: quay.io/nebulous/exponential-smoothing-predictor-exponential-smoothing-predictor
          namespace: nebulous
-         repo_shortname: exponential-smoothing-predictor-java-spring-boot-demo
+         repo_shortname: exponential-smoothing-predictor-exponential-smoothing-predictor
          repo_description: ""

- job:
@ -44,7 +44,7 @@
    description: Run Hadolint on Dockerfile(s).
    vars:
      dockerfiles:
-       - java-spring-boot-demo/Dockerfile
+       - exponential-smoothing-predictor/Dockerfile

- job:
    name: nebulous-exponential-smoothing-predictor-helm-lint