Create worker thread
This change list is to create worker thread mechanism for the upcoming one time tasks to run in separated thread. Story: 2003577 Task: 24898 Change-Id: I5378b80763b104bcf0af95cb083de0cf61463788 Signed-off-by: Bin Qian <bin.qian@windriver.com>
This commit is contained in:
parent
41a543c346
commit
a0c243b4a3
@ -104,6 +104,7 @@ SRCS+=fm_api_wrapper.c
|
||||
SRCS+=sm_failover.c
|
||||
SRCS+=sm_failover_thread.c
|
||||
SRCS+=sm_swact_state.c
|
||||
SRCS+=sm_worker_thread.cpp
|
||||
SRCS+=sm_task_affining_thread.c
|
||||
SRCS+=sm_node_swact_monitor.cpp
|
||||
SRCS+=sm_service_domain_interface_not_in_use_state.c
|
||||
|
@ -53,6 +53,7 @@
|
||||
#include "sm_failover.h"
|
||||
#include "sm_failover_thread.h"
|
||||
#include "sm_task_affining_thread.h"
|
||||
#include "sm_worker_thread.h"
|
||||
|
||||
#define SM_PROCESS_DB_CHECKPOINT_INTERVAL_IN_MS 30000
|
||||
#define SM_PROCESS_TICK_INTERVAL_IN_MS 200
|
||||
@ -192,6 +193,14 @@ static SmErrorT sm_process_initialize( void )
|
||||
return( SM_FAILED );
|
||||
}
|
||||
|
||||
error = SmWorkerThread::initialize();
|
||||
if( SM_OKAY != error )
|
||||
{
|
||||
DPRINTFE( "Failed to initialize worker thread, error=%s.",
|
||||
sm_error_str( error ) );
|
||||
return( SM_FAILED );
|
||||
}
|
||||
|
||||
error = sm_msg_initialize();
|
||||
if( SM_OKAY != error )
|
||||
{
|
||||
@ -317,7 +326,7 @@ static SmErrorT sm_process_initialize( void )
|
||||
DPRINTFE( "Failed to initialize service heartbeat api module, "
|
||||
"error=%s.", sm_error_str( error ) );
|
||||
return( SM_FAILED );
|
||||
}
|
||||
}
|
||||
|
||||
error = sm_service_heartbeat_thread_start();
|
||||
if( SM_OKAY != error )
|
||||
@ -326,7 +335,7 @@ static SmErrorT sm_process_initialize( void )
|
||||
sm_error_str(error) );
|
||||
return( error );
|
||||
}
|
||||
|
||||
|
||||
error = sm_main_event_handler_initialize();
|
||||
if( SM_OKAY != error )
|
||||
{
|
||||
@ -360,7 +369,7 @@ static SmErrorT sm_process_initialize( void )
|
||||
DPRINTFE( "Failed to start the task affining thread, error=%s.",
|
||||
sm_error_str( error ) );
|
||||
return( SM_FAILED );
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return( SM_OKAY );
|
||||
@ -418,7 +427,7 @@ static SmErrorT sm_process_finalize( void )
|
||||
{
|
||||
DPRINTFE( "Failed to finalize service heartbeat api module, "
|
||||
"error=%s.", sm_error_str( error ) );
|
||||
}
|
||||
}
|
||||
|
||||
error = sm_service_api_finalize();
|
||||
if( SM_OKAY != error )
|
||||
@ -511,6 +520,13 @@ static SmErrorT sm_process_finalize( void )
|
||||
sm_error_str( error ) );
|
||||
}
|
||||
|
||||
error = SmWorkerThread::finalize();
|
||||
if( SM_OKAY != error )
|
||||
{
|
||||
DPRINTFE( "Failed to finalize worker thread, error=%s.",
|
||||
sm_error_str( error ) );
|
||||
}
|
||||
|
||||
error = sm_hw_finalize();
|
||||
if( SM_OKAY != error )
|
||||
{
|
||||
@ -720,7 +736,7 @@ SmErrorT sm_process_main( int argc, char *argv[], char *envp[] )
|
||||
error = sm_process_initialize();
|
||||
if( SM_OKAY != error )
|
||||
{
|
||||
DPRINTFE( "Failed initialize process, error=%s.",
|
||||
DPRINTFE( "Failed initialize process, error=%s.",
|
||||
sm_error_str(error) );
|
||||
return( error );
|
||||
}
|
||||
@ -843,7 +859,7 @@ SmErrorT sm_process_main( int argc, char *argv[], char *envp[] )
|
||||
error = sm_process_finalize();
|
||||
if( SM_OKAY != error )
|
||||
{
|
||||
DPRINTFE( "Failed finalize process, error=%s.",
|
||||
DPRINTFE( "Failed finalize process, error=%s.",
|
||||
sm_error_str(error) );
|
||||
}
|
||||
|
||||
|
174
service-mgmt/sm-1.0.0/src/sm_worker_thread.cpp
Normal file
174
service-mgmt/sm-1.0.0/src/sm_worker_thread.cpp
Normal file
@ -0,0 +1,174 @@
|
||||
//
|
||||
// Copyright (c) 2018 Wind River Systems, Inc.
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
#include "sm_worker_thread.h"
|
||||
#include <stdlib.h>
|
||||
#include <stdio.h>
|
||||
#include <errno.h>
|
||||
#include <unistd.h>
|
||||
#include "sm_util_types.h"
|
||||
#include "sm_debug.h"
|
||||
|
||||
SmWorkerThread SmWorkerThread::_the_worker;
|
||||
|
||||
// ****************************************************************************
|
||||
// SmWorkerThread::get_worker get the thread singleton object
|
||||
// ****************************************************************************
|
||||
SmWorkerThread& SmWorkerThread::get_worker()
|
||||
{
|
||||
return SmWorkerThread::_the_worker;
|
||||
}
|
||||
// ****************************************************************************
|
||||
|
||||
// ****************************************************************************
|
||||
// SmWorkerThread::initialize initialize the worker thread
|
||||
// ****************************************************************************
|
||||
SmErrorT SmWorkerThread::initialize()
|
||||
{
|
||||
SmWorkerThread::get_worker().go();
|
||||
return SM_OKAY;
|
||||
}
|
||||
// ****************************************************************************
|
||||
|
||||
// ****************************************************************************
|
||||
// SmWorkerThread::finalize finalize the worker thread
|
||||
// ****************************************************************************
|
||||
SmErrorT SmWorkerThread::finalize()
|
||||
{
|
||||
SmWorkerThread::get_worker().stop();
|
||||
return SM_OKAY;
|
||||
}
|
||||
// ****************************************************************************
|
||||
|
||||
// ****************************************************************************
|
||||
// SmWorkerThread::SmWorkerThread
|
||||
// ****************************************************************************
|
||||
SmWorkerThread::SmWorkerThread() : _priority_queue(), _regular_queue()
|
||||
{
|
||||
this->_mutex = PTHREAD_MUTEX_INITIALIZER;
|
||||
this->_goon = true;
|
||||
this->_thread_created = false;
|
||||
}
|
||||
// ****************************************************************************
|
||||
|
||||
// ****************************************************************************
|
||||
// SmWorkerThread::~SmWorkerThread
|
||||
// ****************************************************************************
|
||||
SmWorkerThread::~SmWorkerThread()
|
||||
{
|
||||
sem_destroy(&this->_sem);
|
||||
}
|
||||
// ****************************************************************************
|
||||
|
||||
// ****************************************************************************
|
||||
// SmWorkerThread::thread_helper helper function for the thread entry
|
||||
// ****************************************************************************
|
||||
void* SmWorkerThread::thread_helper(SmWorkerThread* workerThread)
|
||||
{
|
||||
workerThread->thread_run();
|
||||
return 0;
|
||||
}
|
||||
// ****************************************************************************
|
||||
|
||||
// ****************************************************************************
|
||||
// SmWorkerThread::go prepare and start the thread
|
||||
// ****************************************************************************
|
||||
SmErrorT SmWorkerThread::go()
|
||||
{
|
||||
if(this->_thread_created)
|
||||
{
|
||||
DPRINTFE("Worker thread has already been created");
|
||||
return SM_FAILED;
|
||||
}
|
||||
|
||||
this->_thread_created = true;
|
||||
if( 0 != sem_init(&this->_sem, 0, MAX_QUEUED_ACTIONS) )
|
||||
{
|
||||
DPRINTFE("Cannot init semaphore");
|
||||
return SM_FAILED;
|
||||
}
|
||||
|
||||
int result = pthread_create( &this->_thread, NULL,
|
||||
(void*(*)(void*))SmWorkerThread::thread_helper, (void*)this );
|
||||
if( 0 != result )
|
||||
{
|
||||
DPRINTFE("Failed to create thread. Error %d", errno);
|
||||
return SM_FAILED;
|
||||
}
|
||||
return SM_OKAY;
|
||||
}
|
||||
// ****************************************************************************
|
||||
|
||||
// ****************************************************************************
|
||||
// SmWorkerThread::stop stop the worker thread
|
||||
// ****************************************************************************
|
||||
SmErrorT SmWorkerThread::stop()
|
||||
{
|
||||
this->_goon = false;
|
||||
void* result = NULL;
|
||||
int res = pthread_join(this->_thread, &result);
|
||||
|
||||
if(0 != res)
|
||||
{
|
||||
DPRINTFE("pthread_join failed. Error %d", res);
|
||||
}
|
||||
if(NULL != result)
|
||||
{
|
||||
free(result);
|
||||
}
|
||||
return SM_OKAY;
|
||||
}
|
||||
// ****************************************************************************
|
||||
|
||||
// ****************************************************************************
|
||||
// SmWorkerThread::add_action add a regular priority action
|
||||
// ****************************************************************************
|
||||
void SmWorkerThread::add_action(SmAction& action)
|
||||
{
|
||||
mutex_holder(&this->_mutex);
|
||||
this->_regular_queue.push(&action);
|
||||
}
|
||||
// ****************************************************************************
|
||||
|
||||
// ****************************************************************************
|
||||
// SmWorkerThread::add_priority_action add a high priority action
|
||||
// ****************************************************************************
|
||||
void SmWorkerThread::add_priority_action(SmAction& action)
|
||||
{
|
||||
mutex_holder(&this->_mutex);
|
||||
this->_priority_queue.push(&action);
|
||||
}
|
||||
// ****************************************************************************
|
||||
|
||||
// ****************************************************************************
|
||||
// SmWorkerThread::thread_run main loop of the worker thread
|
||||
// ****************************************************************************
|
||||
void SmWorkerThread::thread_run()
|
||||
{
|
||||
while(this->_goon)
|
||||
{
|
||||
if(0 == sem_trywait(&this->_sem))
|
||||
{
|
||||
SmAction* action = NULL;
|
||||
if(!this->_priority_queue.empty())
|
||||
{
|
||||
action = this->_priority_queue.front();
|
||||
this->_priority_queue.pop();
|
||||
}else if(!this->_regular_queue.empty())
|
||||
{
|
||||
action = this->_regular_queue.front();
|
||||
this->_regular_queue.pop();
|
||||
}
|
||||
if( NULL != action )
|
||||
{
|
||||
action->action();
|
||||
}
|
||||
}else if(EAGAIN != errno)
|
||||
{
|
||||
DPRINTFE("Semaphore wait failed. Error %d", errno);
|
||||
}
|
||||
}
|
||||
}
|
||||
// ****************************************************************************
|
81
service-mgmt/sm-1.0.0/src/sm_worker_thread.h
Normal file
81
service-mgmt/sm-1.0.0/src/sm_worker_thread.h
Normal file
@ -0,0 +1,81 @@
|
||||
//
|
||||
// Copyright (c) 2018 Wind River Systems, Inc.
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
#ifndef __SM_WORKER_THREAD_H__
|
||||
#define __SM_WORKER_THREAD_H__
|
||||
|
||||
#include "sm_types.h"
|
||||
#include <queue>
|
||||
#include <pthread.h>
|
||||
#include <semaphore.h>
|
||||
|
||||
#define MAX_QUEUED_ACTIONS 24
|
||||
// ****************************************************************************
|
||||
// SmAction interface, action to process in a worker thread
|
||||
// ****************************************************************************
|
||||
class SmAction
|
||||
{
|
||||
public:
|
||||
virtual void action() = 0;
|
||||
};
|
||||
|
||||
typedef std::queue<SmAction*> SmActionQueueT;
|
||||
class SmWorkerThread;
|
||||
|
||||
// ****************************************************************************
|
||||
// SmWorkerThread work thread class
|
||||
// ****************************************************************************
|
||||
class SmWorkerThread
|
||||
{
|
||||
public:
|
||||
SmWorkerThread();
|
||||
virtual ~SmWorkerThread();
|
||||
|
||||
/* Add an action to the normal priority FCFS queue.
|
||||
A normal priority action will be scheduled after all
|
||||
high priority actions.
|
||||
*/
|
||||
void add_action(SmAction& action);
|
||||
/*
|
||||
Add an action to high priority FCFS queue.
|
||||
A high priority action is nonpreemptive. It will
|
||||
be scheduled after the current action.
|
||||
*/
|
||||
void add_priority_action(SmAction& action);
|
||||
|
||||
|
||||
// retrieve singleton object
|
||||
static SmWorkerThread& get_worker();
|
||||
// initialize worker thread
|
||||
static SmErrorT initialize();
|
||||
// stop worker thread
|
||||
static SmErrorT finalize();
|
||||
|
||||
private:
|
||||
pthread_mutex_t _mutex;
|
||||
SmActionQueueT _priority_queue;
|
||||
SmActionQueueT _regular_queue;
|
||||
sem_t _sem;
|
||||
pthread_t _thread;
|
||||
bool _thread_created;
|
||||
bool _goon;
|
||||
|
||||
// create worker thread and run tasks
|
||||
SmErrorT go();
|
||||
// stop processing tasks and stop the worker thread
|
||||
SmErrorT stop();
|
||||
|
||||
// run the thread
|
||||
void thread_run();
|
||||
/* help function to provide function pointer for
|
||||
creating the worker thread
|
||||
*/
|
||||
static void* thread_helper(SmWorkerThread* me);
|
||||
|
||||
static SmWorkerThread _the_worker;
|
||||
|
||||
};
|
||||
|
||||
#endif //__SM_WORKER_THREAD_H__
|
Loading…
Reference in New Issue
Block a user