Saturday, January 11, 2014

Chapter:12 Std::Thread Support

Support of multi threading was incorporated in the C++11. This is the best feature in C++11 as per my opinion. It has very clean interface and very easy to use. We all know how difficult it is to write multi threading programming using the system specific libraries. I mean to pass the  argument to thread function, we have to typecast to some system specific types. We can not directly pass more than one argument to the thread function. If we had to, we had to create some structure and club all those argument and then pass those to function.


All those steps are source of error and we sometime forget to do these things in our program. On the other hand C++11 thread has very very  clean and easy interface. It does not impose any constraints about how and how many argument we can pass to the thread function. In addition to this, 'std::thread' are well written class so encapsulate many useful information related to that particular thread in very elegant manner.


'std::thread' also provides the interface by which you can use the system specific libraries which we have been using till now. So this is kind of two layer abstraction where you have option to write your code which  is not dependent on any system specific calls and if whenever you want to write system specific calls, there is provision for that as well.


C++11 standard has also incorporated the tools for the synchronization. There are rich set of classes available which we can use to synchronize our code/data. There is also features for writing the lock free programming. There is also task based concurrency support available in the new standard.


These features are not just good they are great. Once you would start using them you would realize the benefits of this standard library feature.


This example demonstrate all the key concepts which is required to use  the 'std::thread'.  If we want to executive any function under a particular thread, all we need to pass its address and argument in the constructor of 'std::thread'. Here my thread function needs 'string' data type so that it can use it inside the function.


Once we create the thread object, thread function would start executing as soon  as system schedules this particular thread. However as we do not know  how much time the new thread would take to complete its execution. So as usual parent thread needs to wait for the completion of its launched  thread. I feel this is absolutely necessary due to couple of reasons. First if you have launched something, ideally you would like to make use of that work, otherwise in normal circumstances it does not make much sense. The second reason is more technical. We could verify that all necessary infrastructure like variables and thread object itself gets created on the stack of parent  thread. So if parent thread would not wait and its continue its execution and finishes before the child thread finishes its work, we are in big big trouble. Sooner something nasty is bound to happen. So to avoid those,  system throws and error and terminate the program if parent thread do not wait for its child thread. By this way it maintains the integrity of  the system and also provide the appropriate error message close to the problem. Hopefully user would be able to identify the problem and fix it.


The 'join()' thread class function does this for us. So always remember to use this in the parent thread context to ensure that parent does not finish before the child thread is completed.


'hardware concurrency' is another useful static type thread class function which provides us the number of cores of our machine has. This is very important function  provided by standard as we do not have to write any system specific  code to fetch such low level information. Number of core is very important and would be very important information in multi core,  multi threading environment. We can make use of these while  maintaining the thread pool or in some sort of load balance or how many threads should be created in a particular program.  Its priceless information and we get it in free.



//Code Begins

#include<vector>
#include<iostream>
#include<thread>
#include<string>
#include<chrono>
using namespace std::chrono;

template<typename T>
void display(const T& val) {
    std::cout<<val<<std::endl;
}

int tfunction(std::string n) {
    display(n);
    auto sz=n.size();
    return sz;
}

void learn_std_thread(void) {
    std::string arg_x{"Sachin 10Dulkar"};
    std::thread t_x{tfunction,arg_x};
    t_x.join();
}

int main(int argc, const char* argv[]) {
    std::size_t ncore=std::thread::hardware_concurrency();
    display("Number of Core On This Machine Is:");
    display(ncore);
    learn_std_thread();
    return 0;
}

//Code Ends



When I run the above program, I get the following output which is expected. Please remember that the output may vary as we run this program on different machine. On my machine, number of core is 4.


//Console Output Begins
$ ./test
Number of Core On This Machine Is:
4
Sachin 10Dulkar
//Console Output Ends


The below example looks complicated at the first glance. However complete program can be broken into the following sections:

How we find out the number of core on your machine?
Thread library 'hardware concurrency' provide the no.of cores.

How to create the empty thread pool with size equal to no of core?
We can use create the vector of 'std::thread' of size equal to no.of core.

How we can check  a particular thread is associated with a function or not?
Standard has provided the one static variable 'thread::id{}' which is kind of denotes the invalid id/handle. If your thread id matches with this variable, that
indicates that that thread is not associated with any task.

How we can assign a task to a particular thread from the thread pool?
Once we find the empty thread slot in our thread pool, we can create one local thread object and after that we can call 'std::move' to that particular thread slot. This would associate a particular task with a thread. Due to above call now  thread pool would have valid thread handle so we can do whatever we want to fetch from this thread handle.

How we can use system specific thread library using native handle of thread?
Standard provides the 'native handle()' member function in the 'std::thread' class. We all know that all system specific libraries work on some kind of handle. So we can fetch the 'native handle()' our 'std::thread' and pass this information to system specific libraries. Those would start working. This is big big bonus with the 'std::thread' as it provide us the two layer abstraction. However until its absolutely must, I think we should avoid to mix the both uses as it would lead to non-portable code. So think before using 'native handle()' information in your code.

How we parent thread can wait for its child completion?
As usual 'join()' would do this for us. It would block the parent thread until its child finishes its task and return to it.



//Code Begins

#include<vector>
#include<iostream>
#include<thread>
#include<string>
#include<pthread.h>
using namespace std::chrono;
std::size_t ncore{};

template<typename T>
void display(const T& val) {
    std::cout<<val<<std::endl;
}

int tfunction(std::string n) {
    display(n);
    auto sz=n.size();
    return sz;
}

void learn_std_thread(void) {
    std::vector<std::thread> tpool(ncore);
    for(const auto& x: tpool) {
        if(x.get_id()==std::thread::id{}) {
            display("Current Thread is Not Running");
        }
    }
    std::string arg_x{"Hello, C++11"};
    std::thread t_x{tfunction,arg_x};
    tpool[0] = std::move(t_x);
    if(tpool[0].get_id()==std::thread::id{}) {
        display("It should not happen at this point");
    }
    tpool[0].join();

    std::string arg_y{"C++11, Thread Library"};
    std::thread t_y{tfunction,arg_y};
    tpool[1] = std::move(t_y);
    auto nhandle = tpool[1].native_handle();
    void* value_ptr{nullptr};
    auto ret = pthread_join(nhandle, &value_ptr);
    if(ret != 0) {
        display("pthread_join calls is unsucessfull");
    }
}


int main(int argc, const char* argv[]) {
    ncore= std::thread::hardware_concurrency();
    display("Number of Core On This Machine Is: ");
    display(ncore);
    learn_std_thread();
    return 0;
}

//Code Ends




When I run the above program,I get the following output. I do not know why this program is throwing an exception at the point where I have used system specific library call. It may be bug in my code or it could be possible that current gcc4.8.1 has not implemented this feature yet. Nevertheless I wanted to show how we can use the system specific call to 'std::thread' native handle to execute such calls. Hope this program fulfill that intention.




//Console Output Begins

$ ./test
Number of Core On This Machine Is:
4
Current Thread is Not Running
Current Thread is Not Running
Current Thread is Not Running
Current Thread is Not Running
Hello, C++11
C++11, Thread Library
terminate called without an active exception
Aborted (core dumped)

//Console Output Ends






In this section, I would be covering bit controversial interface(detach) provided by the 'std::thread'. By 'thread::detach()' we can ignore the parent thread to wait for its child thread. I mean once we detach the thread from the parent thread, both are free to run independently.


However there is potential problem in this approach. If there is some resource sharing between the 'child/parent' thread, then there could be scenario where child thread keep using the already released resource by its parent thread. This is very much possible as both threads can execute and finish before waiting for each other. These can lead to most deadliest bug in your program in the form of memory corruption, data racing and in many unknown type. The problem with these bugs are that they would mostly be of non deterministic type, so it is quite possible that it works for 99.99 percent time.


I do not know when we can use this functionality with complete safety and  correctness. The real problem is that we can never know completely about whether parent thread has some shared resource with its child thread. Even though there is no explicit sharing of resources between these threads, but there could be implicit sharing of some resources by system which we(user mode) program can not know.


The below program creates two child threads. The parent thread allocate the heap memory where it stores the 'index' value. This would be shared by value in the thread  where thread function just read this value. 'index' address is shared with the thread2 where thread function writes something into this. Both threads execute some loop and execute  something and then go for 'sleep()'.


Now in parent thread, we have 'join()' call for the thread1 which is correct. However we  have called 'detach()' for the thread2 due to which now parent thread would not wait for  thread2 completion. Just after that I have released the memory as parent thread is about to finish. Clearly, this is real problem and we have nasty nasty heap corruption as thread2 would be writing into the memory which has already been released by parent thread.


Just to aggravate this scenario, I have written the infinite for loop in the program main thread context. By this program would never terminate and hopefully we could see the real problem by ourself. This program has been written to show the reader about the impact of usage of deadly interfaces. Never write such program in your production environment.




//Code Begins

#include<iostream>
#include<thread>
#include<string>
#include<chrono>
using namespace std::chrono;

template<typename T>
void display(const T& val) {
    std::cout<<val<<std::endl;
}

void tfunction_one(std::size_t counter) {
    for(std::size_t index{0};index<counter;++index) {
        display("Inside tfunction_one");
        std::this_thread::sleep_for(seconds(2));
    }
}


void tfunction_two(std::size_t* counter){
    for(;;) {
        display("Inside tfunction_two");
        std::this_thread::sleep_for(seconds(5));
        *counter = 100;
        display(*counter);
    }
}


void learn_std_thread(void){
    std::size_t* index = new std::size_t;
    *index = 2;
    std::thread t_x{tfunction_one,index};
    std::thread t_y{tfunction_two,&index};
    t_x.join();
    t_y.detach();
    delete index;
}


int main(int argc, const char* argv[]) {
    learn_std_thread();
    for(;;) {
        std::this_thread::sleep_for(seconds(5));
    }
    return 0;
}

//Code Ends



When I run the above program, I get the following output on my machine. It could vary depending on the system load, scheduler. From the output it  looks everything is running fine. However this is not the real case.



//Console Output Begins
$ ./test
Inside tfunction_two
Inside tfunction_one
Inside tfunction_one
100
.....................
.....................
//Console Output Ends




However let us run the same program using very very powerful dynamic tool: 'valgrind'. This tool captures almost all set of memory/thread related problem with best possible manner. Here we get the problem information. As we have discussed, that thread2 would be writing into the released memory which leads to heap corruption scenario. The same is reported by 'valgrind'.



//Console Output Begins

$ ./test
...........................
==5552== Thread 3:
==5552== Invalid write of size 8
==5552==    at 0x4010F1: tfunction_two(unsigned long*)
==5552==    by 0x5353E99: start_thread (pthread_create.c:308)
==5552==    by 0x565D3FC: clone (clone.S:112)
==5552==  Address 0x5c25040 is 0 bytes inside a block of size 8 free'd
==5552==    at 0x4C2A4BC: operator delete(void*) (in /usr/lib/valgrind/..)
==5552==    by 0x40117D: learn_std_thread() (test_3.cpp:37)
==5552==    by 0x4011DF: main (test_3.cpp:42)
..............................

//Console Output Ends






In this program we will discuss about the 'thread local' data types. In the simplest form we can think 'thread local' is private data types which lives with the course of complete thread execution. Private here means no other thread would be accessible unless owner thread share the address of this  particular variable to other thread.


This is completely different than the global variable type. The following  program actually uses 'global' as well as 'thread local' data type to explain that both are different. We have created the two child thread and and each thread executes the same function 'tfunction one'.  Inside the function, program first update the global variable and print the address of as well. I have also written 'function' where I have declared 'count' variable to be of 'thread local' type. Here also we have printed the address of 'count'. Hopefully this would explain the difference between these two data types.



//Code Begins

#include<iostream>
#include<thread>
#include<string>
#include<chrono>
using namespace std::chrono;
int counter{0};

template<typename T>
void display(const T& val) {
    std::cout<<val<<std::endl;
}

void function(void){
    thread_local int count{0};
    int tmp=count;
    std::cout<<"Count(Thread Local) Address:"<<&count<<std::endl;
}

void tfunction_one(void) {
    display("Inside tfunction_one");
    counter++;
    std::cout<<"Counter(Global) Address:"<<&counter<<std::endl;
    std::this_thread::sleep_for(seconds(2));
    function();
}


void learn_std_thread(void){
    std::thread t_x{tfunction_one};
    std::thread t_y{tfunction_one};
    t_x.join();
    t_y.join();
}


int main(int argc, const char* argv[]) {
    learn_std_thread();
    return 0;
}

//Code Ends



When I run the above program, I get the following output. We see that global variable address is same when the function is getting executed under two threads. However 'thread local' data type address is different in both thread  execution. This was expected as per the definition of 'thread local' data type.




//Console Output Begins

$ ./test
Inside tfunction_oneInside tfunction_one
Counter(Global) Address: 0x6052bc
Counter(Global) Address: 0x6052bc
Count(Thread Local) Address:0x7f85b5df46fc
Count(Thread Local) Address:0x7f85b55f36fc

//Console Output Ends



This is the last program which we would discuss in this chapter. Till now  if you have noticed that, the the output of different threads to console were kind of mixed. This is not unusual for the previous programs as we have not used any form of synchronization mechanism in those. If there are one  resource which can be used by different threads at one point, we should and must synchronize those.


standard threading mechanism also provides the 'std::mutex' and many of its variants which we can use in our program. Standard also provides the recursive mutex, which allows one thread to acquire the same mutex for more than once. Of course that thread needs to release that mutex number of time it has acquired.


The following program basically synchronize the  global variable 'count' update and also while writing on to the console. So we have defined two objects of the 'std::mutex'  class and used to handle these two synchronization. The rest are self explanatory and  has been written for completeness. 'std::mutex' also provide the 'native handle' functionality like 'std::thread' which can be used to in case you want to use the system specific library. I have not used these in the current program, however it can be used quite easily if you want to.





//Code Begins

#include<iostream>
#include<thread>
#include<mutex>
#include<string>
#include<chrono>
using namespace std::chrono;

int count{0};
std::mutex cout_mutx;
std::mutex count_mutx;

template<typename T>
void display(const T& val) {
    cout_mutx.lock();
    std::cout<<val<<std::endl;
    cout_mutx.unlock();
}

void tfunction_one(std::size_t counter) {
    for(std::size_t index{0};index<counter;++index) {
        display("Inside tfunction_one");
        count_mutx.lock();
        count++;
        count_mutx.unlock();
        std::this_thread::sleep_for(seconds(2));
    }
}

void tfunction_two(std::size_t counter) {
    for(std::size_t index{0};index<counter;++index) {
        display("Inside tfunction_two");
        count_mutx.lock();
        count++;
        count_mutx.unlock();
        std::this_thread::sleep_for(seconds(4));
    }
}

void tfunction_three(std::size_t counter) {
    for(std::size_t index{0};index<counter;++index) {
        display("Inside tfunction_three");
        count_mutx.lock();
        count++;
        count_mutx.unlock();
        std::this_thread::sleep_for(seconds(8));
    }
}

void learn_std_thread(void){
    std::size_t index{2};
    std::thread t_x{tfunction_one,index};
    std::thread t_y{tfunction_two,index*index};
    std::thread t_z{tfunction_three,index*index*index};
    t_x.join();
    t_y.join();
    t_z.join();
    display(count);
}


int main(int argc,const char* argv[]) {
    learn_std_thread();
    return 0;
}

//Code Ends






When I run the above program, I get the following output.




//Console Output Begins

$ ./test
Inside tfunction_two
Inside tfunction_one
Inside tfunction_three
Inside tfunction_one
Inside tfunction_two
Inside tfunction_three
Inside tfunction_two
Inside tfunction_two
Inside tfunction_three
Inside tfunction_three
Inside tfunction_three
Inside tfunction_three
Inside tfunction_three
Inside tfunction_three
14

//Console Output Ends



No comments:

Post a Comment