0%

Futures, Promises, Packaged Task, Async in C++11

這篇是Concurrency in C++11的部份筆記,先寫有關C++11標準的部份。
future, promise被加入C++11,大概等同於C#的Task或是Java8的Future

最簡單的情況

以讀寫一個檔案作為範例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
vector<char> readFile(const string& inPath)
{
ifstream file(inPath, ios::binary | ios::ate);
size_t length = (size_t)file.tellg();
vector<char> buffer(length);
file.seekg(0, std::ios::beg);
file.read(&buffer[0], length);
return buffer;
}
size_t writeFile(const vector<char>& buffer, const string& outPath)
{
ofstream file(outPath, ios::binary);
file.write(&buffer[0], buffer.size());
return (size_t)file.tellp();
}

如果要拷貝一個檔案,可以這樣寫。

1
2
3
4
size_t sync_copyFile(const string& inFile, const string& outFile)
{
return writeFile(readFile(inFile), outFile);
}

這樣寫得問題在於,讀寫都是同步動作。如果檔案一大,就什麼事都不用做了。如何使用Multithread來感善動作。

Future and Promise

future 代表的是呼叫者未來可能得到的結果,而promise代表的是被呼叫者未來會算出一個結果,然後告知呼叫者
改寫上面的範例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
size_t future_copyFile(const string& inFile, const string& outFile)
{
promise<vector<char>> prom1;
future<vector<char>> fut1 = prom1.get_future();
thread th1([&prom1, inFile](){
prom1.set_value(readFile(inFile));
});
promise<int> prom2;
future<int> fut2 = prom2.get_future();
thread th2([&fut1, &prom2, outFile](){
prom2.set_value(writeFile(fut1.get(), outFile));
});
size_t result = fut2.get();
th1.join();
th2.join();
return result;
}

從上面這段Code可以知道幾件事情。

  • future是從promise衍生出來,如上面的future<vector<char>> fut1 = prom1.get_future()
  • 上面有三個thread,th1負責讀取,th2負責寫入,另外一個thread就是目前的main thread。
  • 當th1得到結果的時候,會透過set_value將結果送出。
  • 而th2在等待th1的結果,一旦拿到結果,會進行寫入的動作,並且將結果送回main thread。
  • 一旦main thread收到th2的結果的話,事情就結束了。

不過這段Code沒有善用到任何優勢,只是作為future跟promise的示範。

Packaged task

STL裡面也提供了packaged_task,可以稍微簡化上面的程式碼,不過看起來效果不大。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
size_t packagedtask_copyFile(const string& inFile, const string& outFile)
{
using Task_Type_Read = vector<char>(const string&);
packaged_task<Task_Type_Read> pt1(readFile);
future<vector<char>> fut1 = pt1.get_future();
thread th1(move(pt1), inFile);
using Task_Type_Write = size_t(const string&);
packaged_task<Task_Type_Write> pt2([&fut1](const string& path) {
return writeFile(fut1.get(), path);
});
future<size_t> fut2(pt2.get_future());
thread th2(move(pt2), outFile);
size_t result = fut2.get();
th1.join();
th2.join();
return result;
}

packaged_task幾乎等同於promise跟function的綜合體,比較不同的是。promisepackaged_task都沒有copy-semantics,所以要傳志另外一個thread時需要使用std::move明確標示要控制權轉移。

Async

上面的packaged_task還是要手動控制Thread的生成和結束,如何妥善利用系統資源(假設需要建立新Thread,就建立一個新的,不然就用之前就建立好,但目前沒人使用的Thread)。這時候std::async就是一個解法,以下是個範例。

1
2
3
4
5
6
7
8
9
size_t async_copyFile(const string& inFile, const string& outFile)
{
future<vector<char>> fut1 = async(readFile, inFile);
future<size_t> fut2 = async([&fut1](const string& path) {
return writeFile(fut1.get(), path);
}, outFile);

return fut2.get();
}

比起上面幾個版本,這個版本更容易讀懂。
在Windows平台下,std::async的實做是透過Parallel Patterns Library