C++ Concurrent Programming and Multithreading
C++11 introduced standardized multithreading support, providing rich concurrent programming tools including threads, mutexes, condition variables, atomic operations, etc.
Thread Basics
Creating threads:
#include <thread>
#include <iostream>

// Function executed on the spawned thread.
void hello() {
    std::cout << "Hello from thread!" << std::endl;
}

int main() {
    // Launch a thread running hello()
    std::thread worker(hello);
    // Block until it finishes; a joinable thread must be joined
    // (or detached) before its std::thread object is destroyed.
    worker.join();
    return 0;
}
Threads with parameters:
// Prints `message` to stdout `count` times.
void printMessage(const std::string& message, int count) {
    for (int n = 0; n < count; ++n) {
        std::cout << message << std::endl;
    }
}

int main() {
    std::string msg = "Hello";
    // Arguments are copied into thread-owned storage; the thread
    // receives its own copy of msg, not a reference to the local.
    std::thread printer(printMessage, msg, 3);
    printer.join();
    return 0;
}
Using lambda expressions:
int main() {
    int value = 42;
    // Capture `value` by copy so the closure owns its own copy,
    // independent of the local variable's lifetime.
    auto task = [value]() {
        std::cout << "Value: " << value << std::endl;
    };
    std::thread t(task);
    t.join();
    return 0;
}
Mutex
Basic usage:
#include <mutex>
#include <thread>
#include <iostream>

std::mutex mtx;
int counter = 0;

// Adds 10000 to the shared counter, one locked increment at a time.
void increment() {
    for (int n = 0; n < 10000; ++n) {
        // RAII guard: mtx is held for exactly one increment.
        std::lock_guard<std::mutex> lock(mtx);
        ++counter;
    }
}

int main() {
    std::thread first(increment);
    std::thread second(increment);
    first.join();
    second.join();
    std::cout << "Counter: " << counter << std::endl; // 20000
    return 0;
}
unique_lock:
#include <mutex>
#include <condition_variable>

// Shared state for both examples. The original snippet declared `mtx`
// twice, which is a redefinition error in one translation unit; a
// single mutex serves both demonstrations.
std::mutex mtx;
std::condition_variable cv;
bool ready = false;

// Demonstrates manual unlock/re-lock with std::unique_lock
// (std::lock_guard cannot do this).
void process() {
    std::unique_lock<std::mutex> lock(mtx);
    // Can manually unlock
    lock.unlock();
    // Perform operations that don't need lock
    // Re-lock
    lock.lock();
}   // unique_lock releases mtx here if still held

// Used with condition variables: blocks until `ready` becomes true.
void worker() {
    std::unique_lock<std::mutex> lock(mtx);
    // Predicate form re-checks the condition, guarding against
    // spurious wakeups.
    cv.wait(lock, []{ return ready; });
    // Execute work
}
try_lock:
std::mutex mtx1, mtx2;

// Attempts to acquire both mutexes without blocking.
void process() {
    // defer_lock: construct the guards without locking yet.
    std::unique_lock<std::mutex> lock1(mtx1, std::defer_lock);
    std::unique_lock<std::mutex> lock2(mtx2, std::defer_lock);
    // std::try_lock returns -1 when every lock was acquired,
    // otherwise the 0-based index of the first lock that failed
    // (anything already taken is released again).
    const int failedIndex = std::try_lock(lock1, lock2);
    if (failedIndex == -1) {
        // Successfully acquired both locks
        // Perform operations
    } else {
        // Failed to acquire all locks
    }
}
Condition Variable
Basic usage:
#include <condition_variable>
#include <mutex>
#include <thread>
#include <queue>
#include <iostream>

std::mutex mtx;
std::condition_variable cv;
std::queue<int> dataQueue;
bool finished = false;

// Pushes 0..9 into the queue (one every 100 ms), then signals completion.
void producer() {
    for (int i = 0; i < 10; ++i) {
        {
            // Hold the lock only while touching shared state.
            std::lock_guard<std::mutex> lock(mtx);
            dataQueue.push(i);
            std::cout << "Produced: " << i << std::endl;
        }
        cv.notify_one();   // wake one waiting consumer
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
    {
        std::lock_guard<std::mutex> lock(mtx);
        finished = true;
    }
    cv.notify_all();       // wake everyone so they can observe `finished`
}

// Drains the queue until the producer reports it is done.
void consumer() {
    for (;;) {
        std::unique_lock<std::mutex> lock(mtx);
        // Predicate re-checked after every wakeup (spurious-safe).
        cv.wait(lock, []{ return !dataQueue.empty() || finished; });
        if (dataQueue.empty() && finished) {
            break;
        }
        int item = dataQueue.front();
        dataQueue.pop();
        lock.unlock();     // print outside the critical section
        std::cout << "Consumed: " << item << std::endl;
    }
}

int main() {
    std::thread p(producer);
    std::thread c(consumer);
    p.join();
    c.join();
    return 0;
}
Atomic Operations
Basic types:
#include <atomic>
#include <thread>
#include <iostream>

std::atomic<int> atomicCounter(0);

// 10000 lock-free increments; relaxed ordering suffices for a pure
// counter where no other data depends on the updates.
void increment() {
    for (int n = 0; n < 10000; ++n) {
        atomicCounter.fetch_add(1, std::memory_order_relaxed);
    }
}

int main() {
    std::thread a(increment);
    std::thread b(increment);
    a.join();
    b.join();
    std::cout << "Atomic counter: " << atomicCounter << std::endl;
    return 0;
}
Memory ordering:
std::atomic<bool> flag(false);
std::atomic<int> data(0);

// Publishes data first, then raises the flag. The release store on
// `flag` pairs with the reader's acquire load, making the data store
// visible once the flag is observed.
void writer() {
    data.store(42, std::memory_order_release);
    flag.store(true, std::memory_order_release);
}

void reader() {
    // Spin until the writer signals.
    for (;;) {
        if (flag.load(std::memory_order_acquire)) {
            break;
        }
        // Wait
    }
    int observed = data.load(std::memory_order_acquire);
    std::cout << "Data: " << observed << std::endl;
}
CAS (Compare-And-Swap):
#include <atomic>
#include <iostream>

std::atomic<int> value(0);

// Atomically sets `value` to `desired` iff it currently equals `expected`.
// Uses compare_exchange_strong: the weak form may fail spuriously even
// when the comparison holds, which is only acceptable inside a retry
// loop — not in a one-shot helper like this.
bool updateIfEqual(int expected, int desired) {
    return value.compare_exchange_strong(expected, desired);
}

// Usage (wrapped in a function — an `if` statement cannot appear at
// namespace scope, as the original snippet implied).
void casDemo() {
    if (updateIfEqual(0, 1)) {
        std::cout << "Updated successfully" << std::endl;
    } else {
        std::cout << "Update failed" << std::endl;
    }
}
Thread Local Storage
thread_local:
#include <thread>
#include <iostream>

// Each thread gets its own independent instance of this variable.
thread_local int threadLocalVar = 0;

// Increments this thread's copy and prints it alongside the thread id.
void printThreadId() {
    ++threadLocalVar;   // every thread sees its own 0 -> 1 transition
    std::cout << "Thread ID: " << std::this_thread::get_id()
              << ", Value: " << threadLocalVar << std::endl;
}

int main() {
    std::thread t1(printThreadId);
    std::thread t2(printThreadId);
    std::thread t3(printThreadId);
    t1.join();
    t2.join();
    t3.join();
    return 0;
}
Async Operations (Future and Promise)
Basic usage:
#include <future>
#include <thread>    // std::this_thread — not guaranteed to come via <future>
#include <chrono>    // std::chrono::seconds
#include <iostream>

// Simulates a slow computation.
int calculate() {
    std::this_thread::sleep_for(std::chrono::seconds(2));
    return 42;
}

int main() {
    // std::launch::async forces execution on a new thread immediately;
    // the default policy may defer the call until get().
    std::future<int> result = std::async(std::launch::async, calculate);
    std::cout << "Doing other work..." << std::endl;
    int value = result.get(); // Wait for result
    std::cout << "Result: " << value << std::endl;
    return 0;
}
Promise:
#include <future>
#include <thread>
#include <chrono>    // std::chrono::seconds — was missing from the snippet
#include <iostream>

// Fulfils the promise after a delay; takes ownership of it by value
// (std::promise is move-only).
void setValue(std::promise<int> prom) {
    std::this_thread::sleep_for(std::chrono::seconds(1));
    prom.set_value(100);
}

int main() {
    std::promise<int> prom;
    // Obtain the future BEFORE moving the promise into the thread.
    std::future<int> fut = prom.get_future();
    std::thread t(setValue, std::move(prom));
    int value = fut.get();   // blocks until set_value() runs
    std::cout << "Value: " << value << std::endl;
    t.join();
    return 0;
}
Packaged Task:
#include <future>
#include <functional>
#include <thread>     // std::thread — was missing from the snippet
#include <iostream>   // std::cout — was missing from the snippet

// Plain function to be wrapped in a packaged_task.
int add(int a, int b) {
    return a + b;
}

int main() {
    // packaged_task binds a callable to a future for its result.
    std::packaged_task<int(int, int)> task(add);
    std::future<int> result = task.get_future();
    // packaged_task is move-only: transfer it into the thread.
    std::thread t(std::move(task), 10, 20);
    t.join();
    std::cout << "Result: " << result.get() << std::endl;
    return 0;
}
Thread Pool Implementation
Simple thread pool:
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <functional>
#include <vector>
#include <future>      // std::future / std::packaged_task — was missing
#include <memory>      // std::make_shared — was missing
#include <stdexcept>   // std::runtime_error — was missing
#include <iostream>    // std::cout in main — was missing

// Fixed-size pool of worker threads consuming tasks from a shared queue.
class ThreadPool {
private:
    std::vector<std::thread> workers;          // worker threads
    std::queue<std::function<void()>> tasks;   // pending tasks
    std::mutex queueMutex;                     // guards `tasks` and `stop`
    std::condition_variable condition;         // signals new work / shutdown
    bool stop;                                 // set by the destructor

public:
    // Launches `threads` workers; each loops pulling one task at a time.
    ThreadPool(size_t threads) : stop(false) {
        for (size_t i = 0; i < threads; ++i) {
            workers.emplace_back([this] {
                while (true) {
                    std::function<void()> task;
                    {
                        std::unique_lock<std::mutex> lock(this->queueMutex);
                        this->condition.wait(lock, [this] {
                            return this->stop || !this->tasks.empty();
                        });
                        // Exit only after the queue is drained, so
                        // tasks enqueued before shutdown still run.
                        if (this->stop && this->tasks.empty()) {
                            return;
                        }
                        task = std::move(this->tasks.front());
                        this->tasks.pop();
                    }
                    task();   // run outside the lock
                }
            });
        }
    }

    // Schedules f(args...) and returns a future for its result.
    // Throws std::runtime_error if called after destruction has begun.
    // NOTE: std::result_of keeps this C++11-compatible; it is deprecated
    // in C++17 (removed in C++20) — use std::invoke_result_t there.
    template <class F, class... Args>
    auto enqueue(F&& f, Args&&... args)
        -> std::future<typename std::result_of<F(Args...)>::type> {
        using return_type = typename std::result_of<F(Args...)>::type;
        // packaged_task is move-only; shared_ptr ownership lets the
        // copyable std::function wrapper below carry it.
        auto task = std::make_shared<std::packaged_task<return_type()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
        );
        std::future<return_type> res = task->get_future();
        {
            std::unique_lock<std::mutex> lock(queueMutex);
            if (stop) {
                throw std::runtime_error("enqueue on stopped ThreadPool");
            }
            tasks.emplace([task]() { (*task)(); });
        }
        condition.notify_one();
        return res;
    }

    // Signals shutdown and joins all workers; queued tasks still finish.
    ~ThreadPool() {
        {
            std::unique_lock<std::mutex> lock(queueMutex);
            stop = true;
        }
        condition.notify_all();
        for (std::thread& worker : workers) {
            worker.join();
        }
    }
};

// Usage
int main() {
    ThreadPool pool(4);
    auto result1 = pool.enqueue([](int a, int b) { return a + b; }, 10, 20);
    auto result2 = pool.enqueue([](int a, int b) { return a * b; }, 5, 6);
    std::cout << "Result 1: " << result1.get() << std::endl;
    std::cout << "Result 2: " << result2.get() << std::endl;
    return 0;
}
Best Practices
1. Avoid data races
// Wrong: data race — both threads mutate the counter with no
// synchronization, which is undefined behavior. (Each example lives in
// its own function: the original snippet redefined sharedData/t1/t2 at
// namespace scope and never joined the threads.)
void dataRaceExample() {
    int sharedData = 0;
    std::thread t1([&]() { ++sharedData; });
    std::thread t2([&]() { ++sharedData; });
    t1.join();   // threads must be joined before going out of scope
    t2.join();
}

// Correct: use mutex — every access to the shared counter happens
// under the same lock.
std::mutex mtx;
int sharedData = 0;

void synchronizedExample() {
    std::thread t1([]() { std::lock_guard<std::mutex> lock(mtx); ++sharedData; });
    std::thread t2([]() { std::lock_guard<std::mutex> lock(mtx); ++sharedData; });
    t1.join();
    t2.join();
}
2. Avoid deadlocks
// Wrong: may cause deadlock — the two threads take the same pair of
// mutexes in opposite order, so each can end up holding one lock while
// waiting forever for the other.
std::mutex mtx1, mtx2;

void thread1() {
    std::lock_guard<std::mutex> lock1(mtx1);
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
    std::lock_guard<std::mutex> lock2(mtx2);
}

void thread2() {
    std::lock_guard<std::mutex> lock2(mtx2);
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
    std::lock_guard<std::mutex> lock1(mtx1);
}

// Correct: use std::lock — it acquires both mutexes with a
// deadlock-avoidance algorithm; adopt_lock then hands ownership of the
// already-locked mutexes to the RAII guards.
void thread1Safe() {
    std::lock(mtx1, mtx2);
    std::lock_guard<std::mutex> guardA(mtx1, std::adopt_lock);
    std::lock_guard<std::mutex> guardB(mtx2, std::adopt_lock);
}

void thread2Safe() {
    std::lock(mtx1, mtx2);
    std::lock_guard<std::mutex> guardA(mtx1, std::adopt_lock);
    std::lock_guard<std::mutex> guardB(mtx2, std::adopt_lock);
}
3. Use RAII to manage locks
std::mutex mtx;   // was used but never declared in the original snippet

// Recommended: the guard's destructor releases the lock on every exit
// path, including exceptions.
void safeFunction() {
    std::lock_guard<std::mutex> lock(mtx);
    // Critical section code
} // Automatically release lock

// Not recommended: manual lock/unlock.
void unsafeFunction() {
    mtx.lock();
    // Critical section code
    mtx.unlock(); // May be forgotten or exception causes non-release
}
4. Prefer atomic operations
// Recommended: atomic operations — lock-free and sufficient for a
// simple counter. (Each example is wrapped in a function with a
// distinct counter name: the original snippet defined `counter` twice
// and placed `++counter;` at namespace scope, neither of which compiles.)
std::atomic<int> atomicCounter(0);

void incrementAtomic() {
    ++atomicCounter;
}

// Not recommended for simple operations: a mutex is heavier than an
// atomic increment.
std::mutex mtx;
int mutexCounter = 0;

void incrementWithMutex() {
    std::lock_guard<std::mutex> lock(mtx);
    ++mutexCounter;
}
Notes
- Always ensure threads are properly joined or detached
- Avoid using mutexes in destructors
- Be aware of spurious wakeups with condition variables
- Choose memory ordering carefully: default to memory_order_seq_cst for correctness, and relax to acquire/release or relaxed only where you can justify it and profiling shows a benefit
- Avoid overly fine-grained locks, which may lead to performance degradation
- Use thread pools to manage many short-lived tasks
- Pay attention to exception safety, ensure resources are properly released
- Avoid using global variables in multithreaded environments