One of the most important challenges in programming today is concurrency. If we do not learn to write programs that can run on multiple cores, the progress in hardware will be pointless. But when you run multiple threads for various kinds of processing, you may find yourself writing the same or similar code over and over again: creating the threads, setting up their parameters, joining them, checking the result, cleaning up, and so on.
In this post I will show how you can create some helpers in C++ to simplify this process. This is not going to be a full solution, nor one that fits all needs, but it can be a start.
What I would like to have is a helper class that will take care of:
- finding how many threads can run (considering each available core can run a thread)
- creating and starting the threads
- joining the threads
- checking the result of the threads execution
- cleaning up
The class shown below does just that.
#include <windows.h>

class ThreadHelper
{
    LPVOID* m_Params;   // one user-supplied parameter per thread (not owned)
    int m_ThreadsNo;    // number of threads to create

private:
    // number of logical processors reported by the system
    int GetProcessorsCount()
    {
        SYSTEM_INFO info;
        ::GetSystemInfo(&info);
        return info.dwNumberOfProcessors;
    }

public:
    ThreadHelper()
    {
        m_ThreadsNo = GetProcessorsCount();
        m_Params = new LPVOID[m_ThreadsNo];
        for(int i = 0; i < m_ThreadsNo; ++i)
            m_Params[i] = NULL;
    }

    ThreadHelper(int threadsNo)
    {
        if(threadsNo < 1)
            m_ThreadsNo = GetProcessorsCount();
        else
            m_ThreadsNo = threadsNo;

        m_Params = new LPVOID[m_ThreadsNo];
        for(int i = 0; i < m_ThreadsNo; ++i)
            m_Params[i] = NULL;
    }

    ~ThreadHelper()
    {
        delete [] m_Params;
    }

    int GetThreadsNo() const {return m_ThreadsNo;}

    bool SetThreadParams(int threadIndex, LPVOID lpData)
    {
        if(threadIndex >= 0 && threadIndex < m_ThreadsNo)
        {
            m_Params[threadIndex] = lpData;
            return true;
        }
        return false;
    }

    bool Run(LPTHREAD_START_ROUTINE threadProc, BOOL startImmediatelly, DWORD timeout = INFINITE)
    {
        bool success = false;
        HANDLE* hThreads = new HANDLE[m_ThreadsNo];
        DWORD* dwThreadIds = new DWORD[m_ThreadsNo];
        bool allThreadsOK = true;

        // create the threads
        for(int i = 0; i < m_ThreadsNo && allThreadsOK; ++i)
        {
            hThreads[i] = ::CreateThread(
                NULL,
                0,
                threadProc,
                m_Params[i],
                startImmediatelly ? 0 : CREATE_SUSPENDED,
                &dwThreadIds[i]);

            if(hThreads[i] == NULL)
            {
                // creation failed: release the handles obtained so far
                for(int j = 0; j < i; ++j)
                {
                    ::CloseHandle(hThreads[j]);
                }
                allThreadsOK = false;
            }
        }

        if(allThreadsOK)
        {
            // start the threads if they were suspended first
            if(!startImmediatelly)
            {
                for(int i = 0; i < m_ThreadsNo; ++i)
                {
                    ::ResumeThread(hThreads[i]);
                }
            }

            // wait for all threads
            // (WaitForMultipleObjects accepts at most MAXIMUM_WAIT_OBJECTS handles)
            DWORD joinret = ::WaitForMultipleObjects(
                m_ThreadsNo,
                hThreads,
                TRUE,
                timeout);

            if(joinret == WAIT_FAILED)
            {
                // the wait itself failed; ::GetLastError() gives the reason
            }
            else if(joinret == WAIT_TIMEOUT)
            {
                // not all threads finished within the timeout
            }
            else if(joinret >= WAIT_OBJECT_0 && joinret < WAIT_OBJECT_0 + m_ThreadsNo)
            {
                // all threads finished
                success = true;
            }
            else if(joinret >= WAIT_ABANDONED_0 && joinret < WAIT_ABANDONED_0 + m_ThreadsNo)
            {
                // reported for abandoned mutexes; not expected for thread handles
            }

            // close the thread handles
            for(int i = 0; i < m_ThreadsNo; ++i)
            {
                ::CloseHandle(hThreads[i]);
            }
        }

        delete [] hThreads;
        delete [] dwThreadIds;
        return success;
    }
};
This helper class contains:
- one parameter-less constructor that identifies the number of available processors and sets the threads count equal to the processors count
- one constructor that takes the number of threads that should be created
- one method (SetThreadParams) for setting the parameters for each thread that will be created
- one method (Run) that creates and runs the threads, waits for them and checks the result of the execution
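For readers who are not on Windows, the same responsibilities (find a thread count, start the threads, join them) can be sketched with the C++11 standard library. This is only a rough portable illustration, not the class above: it swaps the Win32 calls for std::thread, assumes a C++11 compiler, and the names DefaultThreadCount and RunOnThreads are hypothetical.

```cpp
#include <cassert>
#include <functional>
#include <thread>
#include <vector>

// Hypothetical helper: number of hardware threads, never less than 1.
unsigned DefaultThreadCount()
{
    const unsigned n = std::thread::hardware_concurrency(); // 0 means "unknown"
    return n == 0 ? 1 : n;
}

// Hypothetical helper: runs work(i) on `count` threads and joins them all.
// A count of 0 means "one thread per hardware thread".
void RunOnThreads(unsigned count, const std::function<void(unsigned)>& work)
{
    assert(static_cast<bool>(work)); // an empty std::function cannot be called
    if (count == 0)
        count = DefaultThreadCount();

    std::vector<std::thread> threads;
    threads.reserve(count);
    try
    {
        for (unsigned i = 0; i < count; ++i)
            threads.emplace_back(work, i);
    }
    catch (...)
    {
        // a thread could not be created: join the ones already running
        for (std::thread& t : threads)
            t.join();
        throw;
    }

    // join every thread before returning
    for (std::thread& t : threads)
        t.join();
}
```

Each thread receives its own index, which plays the role of the per-thread parameter set with SetThreadParams. Unlike Run(), this sketch has no timeout: it always waits until every thread has finished.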
As you can see, the Run() method is simplistic. It does not, for instance, do anything specific when the wait times out or reports an abandoned object; it simply returns false. It also joins all the threads, waiting until every one of them has finished. A more flexible method could wait only until the first thread finishes and then perhaps stop the others. But as I said, this is a sample and not a complete solution.
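If you want Run() to report more than true or false, one option is to translate the wait result into a small enumeration. The sketch below is only an illustration of that idea: JoinOutcome and ClassifyWait are hypothetical names, and the constants repeat the documented values of the Win32 macros under different names so that the snippet compiles without windows.h.

```cpp
#include <cassert>

// Documented values of the Win32 wait results, repeated here under
// hypothetical names so the sketch does not depend on <windows.h>.
const unsigned long kWaitObject0    = 0x00000000UL; // WAIT_OBJECT_0
const unsigned long kWaitAbandoned0 = 0x00000080UL; // WAIT_ABANDONED_0
const unsigned long kWaitTimeout    = 0x00000102UL; // WAIT_TIMEOUT
const unsigned long kWaitFailed     = 0xFFFFFFFFUL; // WAIT_FAILED

enum class JoinOutcome { Completed, Abandoned, TimedOut, Failed };

// Classifies the value returned by a wait on `count` handles.
JoinOutcome ClassifyWait(unsigned long ret, unsigned long count)
{
    assert(count > 0 && count <= 64); // MAXIMUM_WAIT_OBJECTS is 64

    if (ret == kWaitFailed)
        return JoinOutcome::Failed;
    if (ret == kWaitTimeout)
        return JoinOutcome::TimedOut;
    if (ret < kWaitObject0 + count)   // kWaitObject0 is 0, so no lower bound check
        return JoinOutcome::Completed;
    if (ret >= kWaitAbandoned0 && ret < kWaitAbandoned0 + count)
        return JoinOutcome::Abandoned;
    return JoinOutcome::Failed;       // anything else is treated as a failure
}
```

Note that every comparison uses ==. Writing = by mistake in one of these tests would turn it into an assignment that is always true, and the later branches would never be reached.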
Having this helper set up, I will start several threads to find the prime numbers in a sequence and print them in the console.
The following function checks whether a number is prime.
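#include <cmath>

bool IsPrime(int number)
{
    // 0, 1 and negative values are not prime
    if (number < 2) return false;

    // a composite number has a divisor no larger than its square root
    const int max = static_cast<int>(
        std::sqrt(static_cast<double>(number))) + 1;
    for (int i = 2; i < max; ++i)
    {
        if (number % i == 0) return false;
    }
    return true;
}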
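A primality test like this is easy to get wrong at the edges (0, 1, 2 and perfect squares), so a quick sanity check is worthwhile. The sketch below repeats the trial division under the hypothetical name IsPrimeChecked so that it is self-contained; CountPrimesUpTo and CheckIsPrime are also names made up for this illustration.

```cpp
#include <cassert>
#include <cmath>

// Trial division up to the square root, with a guard for values below 2.
bool IsPrimeChecked(int number)
{
    if (number < 2)
        return false;
    const int max = static_cast<int>(std::sqrt(static_cast<double>(number)));
    for (int i = 2; i <= max; ++i)
    {
        if (number % i == 0)
            return false;
    }
    return true;
}

// Counts the primes in [0, limit].
int CountPrimesUpTo(int limit)
{
    int count = 0;
    for (int n = 0; n <= limit; ++n)
    {
        if (IsPrimeChecked(n))
            ++count;
    }
    return count;
}

// Edge cases that a primality test should get right.
void CheckIsPrime()
{
    assert(!IsPrimeChecked(0));
    assert(!IsPrimeChecked(1));
    assert(IsPrimeChecked(2));
    assert(IsPrimeChecked(3));
    assert(!IsPrimeChecked(4));
    assert(!IsPrimeChecked(49)); // perfect square
    assert(IsPrimeChecked(97));
}
```

There are 25 primes below 100, which gives one more cheap check: CountPrimesUpTo(100) should return 25.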
The thread procedure will run through a sub-sequence of a vector of integers and verify if each element is prime. I will use the following structure to pass the sequence bounds to the thread procedure:
#include <vector>

struct vector_bounds
{
    std::vector<int>::const_iterator begin;   // first element of the sub-sequence
    std::vector<int>::const_iterator end;     // one past the last element
};
The thread procedure could look like this:
#include <iostream>

static CRITICAL_SECTION cs;   // guards std::cout; initialized before the threads start

DWORD WINAPI FindPrimes(LPVOID lpData)
{
    vector_bounds* bounds = static_cast<vector_bounds*>(lpData);
    if(bounds == NULL)
        return 1;

    for(std::vector<int>::const_iterator cit = bounds->begin;
        cit != bounds->end; ++cit)
    {
        if(IsPrime(*cit))
        {
            // serialize access to the console
            EnterCriticalSection(&cs);
            std::cout << *cit << std::endl;
            LeaveCriticalSection(&cs);
        }
    }
    return 0;
}
Printing to the console requires a locking mechanism; otherwise the output from two different threads could interleave. The critical section will be initialized before the threads are started.
What remains to be done is generating a sequence of integers, setting up the parameters with the sequence bounds for each thread, and running the threads using the helper.
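The same locking idea can be sketched in portable C++11, where std::mutex plays the role of the critical section and std::lock_guard releases the lock automatically. This is an illustration under that substitution, not the Win32 code used in this post; WriteLines and WriteFromThreads are hypothetical names, and the output goes to a string stream instead of the console so that the result can be inspected.

```cpp
#include <cassert>
#include <mutex>
#include <ostream>
#include <sstream>
#include <string>
#include <thread>
#include <vector>

// Writes `lines` copies of `text` to `out`, holding the lock for each write.
void WriteLines(std::ostream& out, std::mutex& guard,
                const std::string& text, int lines)
{
    for (int i = 0; i < lines; ++i)
    {
        std::lock_guard<std::mutex> lock(guard); // released at the end of each iteration
        out << text << '\n';
    }
}

// Starts `threads` writers on one shared stream and returns what they wrote.
std::string WriteFromThreads(int threads, int linesPerThread)
{
    assert(threads > 0);
    std::ostringstream out;
    std::mutex guard;
    std::vector<std::thread> workers;
    for (int t = 0; t < threads; ++t)
    {
        workers.emplace_back([&out, &guard, linesPerThread] {
            WriteLines(out, guard, "prime found", linesPerThread);
        });
    }
    for (std::thread& w : workers)
        w.join();
    return out.str();
}
```

Because each line is written while the lock is held, the lines from different threads can appear in any order but are never mixed within a line.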
#include <algorithm>
#include <cstdlib>
#include <ctime>
#include <iterator>

int main()
{
    // generate some random numbers
    srand(static_cast<unsigned int>(time(NULL)));
    std::vector<int> numbers;
    std::generate_n(std::back_inserter(numbers), 1000, rand);

    // create the thread helper
    ThreadHelper helper(4);
    int threads = helper.GetThreadsNo();

    // create the parameters for the threads
    std::vector<vector_bounds> params;
    std::vector<int>::const_iterator begin = numbers.begin();
    size_t partitionsize = numbers.size()/threads;
    for(int i = 0; i < threads; ++i)
    {
        vector_bounds bound;
        bound.begin = begin;
        // the last partition also takes the remainder
        bound.end = (i == threads - 1) ? numbers.end() : begin + partitionsize;
        params.push_back(bound);
        begin = bound.end;
    }

    // params must not change from here on: the threads receive pointers into it
    for(int i = 0; i < threads; ++i)
        helper.SetThreadParams(i, &params[i]);

    // run the threads
    InitializeCriticalSection(&cs);
    std::cout << "start running..." << std::endl;
    bool success = helper.Run(FindPrimes, FALSE);
    std::cout << "finished " << (success? "successfully" : "failed") << std::endl;
    DeleteCriticalSection(&cs);
    return 0;
}
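The partitioning loop in main could also be moved into a small function of its own, which makes it easier to check that the pieces cover the whole vector. The following is a minimal sketch of that idea; IntRange (a stand-in for vector_bounds) and MakePartitions are hypothetical names used only here.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Stand-in for vector_bounds, renamed so the sketch is self-contained.
struct IntRange
{
    std::vector<int>::const_iterator begin;
    std::vector<int>::const_iterator end;
};

// Splits `data` into `parts` consecutive ranges of equal size;
// the last range also takes whatever remains after the division.
std::vector<IntRange> MakePartitions(const std::vector<int>& data, int parts)
{
    assert(parts > 0);
    std::vector<IntRange> result;
    const std::size_t chunk = data.size() / parts;
    std::vector<int>::const_iterator it = data.begin();
    for (int i = 0; i < parts; ++i)
    {
        IntRange range;
        range.begin = it;
        range.end = (i == parts - 1) ? data.end() : it + chunk;
        result.push_back(range);
        it = range.end;
    }
    return result;
}
```

For 10 elements and 4 parts, for example, this yields three ranges of 2 elements and a last range of 4. When the remainder is large, the last thread gets noticeably more work than the others, which a more careful split would even out.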
With this thread helper class in place, what I need to do to run some processing on several threads is:
- set up the thread parameters (if needed)
- write the thread procedure
- create a ThreadHelper object and initialize it
- run the threads and collect the results
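The example in this post prints the primes instead of collecting them. One possible way to collect results is to give every thread its own output slot, so that no locking is needed, and to combine the slots after the join. The sketch below shows this with std::thread rather than with the helper, assuming a C++11 compiler; IsPrimeSketch and CountPrimesParallel are hypothetical names.

```cpp
#include <cassert>
#include <cmath>
#include <cstddef>
#include <thread>
#include <vector>

// Trial division, repeated here so the sketch is self-contained.
bool IsPrimeSketch(int number)
{
    if (number < 2)
        return false;
    const int max = static_cast<int>(std::sqrt(static_cast<double>(number)));
    for (int i = 2; i <= max; ++i)
    {
        if (number % i == 0)
            return false;
    }
    return true;
}

// Counts the primes in `numbers` using `threadCount` threads.
int CountPrimesParallel(const std::vector<int>& numbers, unsigned threadCount)
{
    assert(threadCount > 0);
    std::vector<int> partial(threadCount, 0); // one slot per thread, so no lock
    std::vector<std::thread> workers;
    const std::size_t chunk = numbers.size() / threadCount;
    std::size_t begin = 0;
    for (unsigned i = 0; i < threadCount; ++i)
    {
        // the last thread also takes the remainder
        const std::size_t end =
            (i == threadCount - 1) ? numbers.size() : begin + chunk;
        workers.emplace_back([&numbers, &partial, i, begin, end] {
            int count = 0;
            for (std::size_t k = begin; k < end; ++k)
            {
                if (IsPrimeSketch(numbers[k]))
                    ++count;
            }
            partial[i] = count; // each thread writes only its own slot
        });
        begin = end;
    }
    for (std::thread& w : workers)
        w.join();

    // combine the per-thread results after all threads have finished
    int total = 0;
    for (int c : partial)
        total += c;
    return total;
}
```

With the Win32 helper, the same effect could be had by adding a result field to the structure passed through SetThreadParams and reading it after Run() returns.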
The helper class saves me from writing the same code over and over again and lets me focus on the most important task: writing the thread procedure. As I said earlier, it is not a full solution, nor one that fits all scenarios, but you can develop it to suit your needs.