线程安全的对象生命周期管理
- 存在的问题
- 线程安全的class的定义
- 对象的创建
- 对象的销毁
- 可能会死锁的例子
- 线程安全的Observer
- 解决方案 shared_ptr/weak_ptr
- 关于shared_ptr的线程安全
- shared_ptr技术与陷阱
存在的问题
对于C++来说, 对象的生命周期需要程序员自己管理, 那么当一个对象可以被多个线程看到时, 对象的析构就会存在竞态条件(race condition):
- 在析构一个对象时, 会不会存在其他线程在使用该对象
- 在使用一个对象时, 对象会不会在其他线程被析构
- 在使用某个对象之前, 如何判断这个对象还活着, 它的析构函数会不会恰好只执行了一半
线程安全的class的定义
在解决上述问题之前, 先对线程安全的类的要求进行定义:
- 多个线程同时访问时, 表现出正确的行为
- 无论操作系统如何调度这些线程, 无论线程的执行顺序如何交织
- 调用端代码无需额外的同步或其他协调操作
根据上面的定义, STL里大多数容器都不是线程安全的, 例如: std::string, std::vector, std::map等, 需要在外部加锁才能供多个线程同时访问.
对象的创建
对象的创建要保证线程安全比较简单, 只要不在构造期间泄露this指针即可. 具体做法是 (1)不要在构造函数中注册任何回调.(2)不要在构造函数中把this指针传给跨线程对象.(3)即使在构造函数最后一行也不要.
之所以要这样, 是因为构造函数执行完之前, 对象的初始化动作还没有完成, 如果泄露了this指针, 有可能其他线程会访问一个半成品对象. 之所以在构造函数的最后一行也不能泄露是因为该类可能作为基类使用, 它的构造函数执行完后还需要继续执行子类的构造函数.
对于需要上述需求的, 可以通过二段构造(构造函数+initialize()), 并且可以使得构造函数不必主动抛出异常, 在initialize()的返回值判断是否构造成功, 简化错误处理.
对于二段式构造可以看Bjarne Stroustrup 在Standard-Library Exception Safety 中提到的Delayed acquisition, 也就是在设计类的时候思考清楚, 是否真的有必要在构造函数中申请内存. 如果需要, 可以参考vector::resize()的思路
对象的销毁
对于析构函数来说, mutex无法保证安全. 对于互斥语义来说, 只能保证对于临界资源只有一个线程访问, 因此可能会出现一些问题, 考虑以下代码
1 2 3 4 5 6 7 8 9
| Foo::~Foo() { MutexLockGuard lock(mutex_); }
void Foo::update() { MutexLockGuard lock(mutex_); }
|
1 2 3 4 5 6 7 8 9
|
delete x; x = NULL;
if (x) { x->update(); }
|
上面代码段在 x = NULL; 处的注释是 helpless, 也就是这里是存在问题的, 当线程A 执行到(1)处时, 此时必然已经持有互斥锁, 如果此时线程B通过了 if 检测, 在(2)处阻塞, 这时候接下来得行为是未定义的, 因为构造函数会把mutex析构掉. 也就是说, 如果mutex作为class的数据成员不能保护析构, 只能用来保护正常生命期其他数据成员的读和写, 而析构行为是发生在对象生命期结束时(后)的. 对于基类对象来说, 当执行到基类的析构函数, 表示子类的成员已经被析构了, 那么仍然没有保护整个析构过程.
可能会死锁的例子
读写同一个class两个对象存在死锁的例子:
1 2 3 4 5
| void swap(Object& lhs, Object& rhs) { MutexLockGuard lLock(lhs.mutex_); MutexLockGuard rLokc(rhs.mutex_); }
|
这样当线程A执行 swap(a, b) 线程B执行 swap(b, a)时就可能会产生死锁.
同样的道理在 operator= 也会存在
假设一个用锁实现的Counter是这样的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| class Counter: boost::noncopyable { public: Counter(): value_(0) {} int64_t value() const;
private: int64_t value_; mutable MutexLock mutex_; };
int64_t Counter::value() const { MutexLockGuard lock(mutex_); return value_; }
int64_t Counter::getAndIncrease() { MutexLockGuard lock(mutex_); int64_t ret = value_++; return ret; }
|
它的operator= 实现如下:
1 2 3 4 5 6 7 8 9 10
| Counter& Counter::operator=(const Counter& rhs) { if (this == &rhs) { return *this; }
MutexLockGuard myLock(mutex_); MutexLockGuard itsLock(rhs.mutex_);
value_ = rhs.value_;
return *this; }
|
线程安全的Observer
在面向对象设计中, 对象的关系主要有三种: composition, aggregation, association.
这三种关系中, composition(组合)在多线程下比较安全, 对象x的生命周期为拥有者owner控制, owner析构的时候, 会把x也析构掉, 所以一半不会造成内存泄漏活着重复释放的问题.
对于aggregation(聚合)和association(关联)来说会存在一定的问题. association表示一个对象持有另一个对象的引用(或指针), 但是另一个对象的生命周期不受该对象控制, 所以可能会出现上述说过的问题.
考虑使用对象池的方式解决: 只创建不销毁, 使用一个对象池来暂存使用过的对象, 申请新对象时, 如果对象池中有存货, 就重复利用现有的对象, 当对象使用完后放回池子而不释放.
这种方法可以避免失效对象的访问, 但还是带来一些问题:
- 对象池的线程安全, 如何安全完整的把对象放回池子里, 防止部分放回(线程A认为已经放回了, 线程B认为对象还活着)
- 全局对象引发lock contention, 这个集中化的对象池会不会把多线程并发操作串行化
- 如果共享对象的类型不止一种, 那么是重复实现对象池还是使用类模版
- 会不会造成内存的泄露与分片? 因为对象池占用的内存只增不减,而且多个对象池不能共享内存(这里是指不能从不同的对象池获取对象, 会割裂heap/free store) heap vs free stroe
看一个注册非静态成员函数回调成为association的例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| class Observer { public: virtual ~Observer(); virtual void update() = 0; };
class Observable { public: void register_(Observer* x); void unregister(Observer* x);
void notifyObservers() { for (Observer* x : observers_) { x->update(); } } private: std::vector<Observer*> observers_; };
|
这里存在的问题在于, 通知每一个Observer对象时, 需要知道该对象是否还存活, 这里可以考虑在Observer的析构函数中解除注册, 但有引来了新的问题.
如下代码:
1 2 3 4 5 6 7 8 9 10 11 12
| class Observer { void observe(Observer* s) { s->register_(this); subject_ = s; } virtual ~Observer() { subject_->unregister(this); }
Observable* subject_; };
|
这里有两个race conditions,(1)unregister如何知道subject_还活着 (2)当线程A执行到unregister之前, 线程B执行到update()之前, x正好就是线程A正在析构的对象.
这里存在的问题可以通过加锁来实现, 但是在哪加锁, 谁来持有锁又是难题, 所以需要引入一个活着的对象来帮助管理对象的生命周期, 可以查看是否存活.
解决方案 shared_ptr/weak_ptr
shared_ptr的几个要点:
- shared_ptr是强引用, 只要有一个指向x的shared_ptr存在, x就不会析构, 当指向x的最后一个shared_ptr析构或reset(), x保证会被销毁
- weak_ptr不控制对象的生命期, 但它可以知道对象是否还活着,如果对象还活着可以提升为有效的shared_ptr, 如果对象已经被销毁那么就会返回一个空shared_ptr. 这个提升行为是线程安全的
- shared_ptr/weak_ptr的计数在主流平台上是原子操作, 没有用锁
- shared_ptr/weak_ptr的线程安全级别与std::string和STL容器一样
孟岩《垃圾收集机制批判》中对智能指针的优势: “C++利用智能指针达成的效果是: 一旦对象不再被引用, 系统刻不容缓, 立刻回收内存. 这通常发生在关键任务完成后的清理(clean up)时期, 不会影响关键任务的实时性, 同时, 内存里所有的对象都是有用的, 绝对没有垃圾空占内存.”
应用到Observer上:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| class Observable { public: void register_(const weak_ptr<Observer>& x); void notifyObservers();
private: mutable MutexLock mutex_; std::vector<weak_ptr<Observer>> observers_; using Iterator = std::vector<weak_ptr<Observer>>::iterator; };
void Observable::notifyObservers() { MutexLockGuard lock(mutex_); Iterator it = observers_.begin();
while (it != observers_.end()) { shared_ptr<Observer> obj(it->lock()); if (obj) { obj->update(); ++it; } else { it = observers_.erase(it); } } }
|
上述方案部分解决了Observer模式的线程安全, 但又引来了新的问题:
- 侵入型: 强制要求Observer必须以shared_ptr来管理
- 不是完全线程安全: Observer析构函数会执行subject_->unregister(this), 但有可能subject_已经被析构掉了, 为了解决这个问题, 又需要 Observable 本身是shared_ptr管理, 并且subject_多半是个 weak_ptr.
- 锁争用: Observable的三个成员函数都是用mutex来同步, 会造成register_()和unregister()等待notifyObservers(), 但这个notifyObservers()的执行时间是无上限的, 因为会同步调用各个update()函数. 我们希望register_()和uniregister()的执行时间不会超过某个固定的上限.
- 死锁: 如果notifyObservers()中的update虚函数调用了(un)register, 这样会有两种情况, 如果mutex_是不可重入的, 就会死锁; 如果mutex_是可重入的程序就会迭代器失效(vector 在遍历期间被修改了). 这个问题只能通过双方协商好解决,例如使用可重入的mutex_, 把容器换成std::list, 并把迭代器往前挪一行.
关于shared_ptr的线程安全
shared_ptr本身并不是100%线程安全的, 它的引用计数本身是安全且无锁的, 但对象的读写不是, 因为它有两个成员, 读写操作不能原子化. shared_ptr的线程安全和内建类型,标准库容器, std::string一样, 即:
- 一个shared_ptr对象实体可以被多个线程同时读取
- 两个shared_ptr对象实体可以被两个线程同时写入, “析构”算写操作
- 如果要从多个线程读写同一个shared_ptr对象, 那么需要加锁
这是shared_ptr本身的线程安全级别, 不是管理的对象的线程安全级别. 如果在多个线程中同时访问同一个shared_ptr, 正确做法是mutex保护:
1 2 3 4 5
| MutexLock mutex; shared_ptr<Foo> globalPtr;
void doit(const shared_ptr<Foo>& pFoo);
|
globalPtr可以被多个线程看到, 所以读写需要加锁, 为了性能只需要使用互斥锁就可以. 因为临界区很小, 所以互斥锁也不会阻塞并发读.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| void read() { shared_ptr<Foo> localPtr; { MutexLockGuard lock(mutex); locakPtr = globalPtr; } doit(localPtr); }
void write() { shared_ptr<Foo> newPtr(new Foo); { MutexLockGuard lock(mutex); globalPtr = newPtr; } doit(newPtr); }
|
上面的read()和write()在临界区外都没有再访问globalPtr, 而是利用了拷贝一个新的栈上对象, 然后在访问这个栈上对象.
shared_ptr技术与陷阱
shared_ptr是强引用, 只要有一个指向x对象的shared_ptr存在, 该对象就确保不会被析构. 因为shared_ptr的拷贝构造和拷贝赋值, 如果不小心遗留了一份拷贝, 那么该对象就永世长存了. 另外std::bind,也会导致这个问题, std::bind会把实参拷贝一份, 如果参数是shared_ptr, 那么拷贝的对象的生命周期就不会短于std::function对象:
1 2 3 4 5 6
| class Foo { void doit(); };
shared_ptr<Foo> pFoo(new Foo); std::function<void()> func = std::bind(&Foo::doit, pFoo);
|
因为要修改引用计数(拷贝的时候通常要加锁), shared_ptr的拷贝开销要比拷贝原始指针要高, 但是多数情况都可以pass by const reference, 一个线程只要最外层函数有一个实体对象, 之后就可以pass by const reference来使用这个shared_ptr. 例如几个函数都用到Foo对象:
1 2 3 4 5 6
| void save(const shared_ptr<Foo>& pFoo); void validateAccount(const Foo& foo);
bool validate(const shared_ptr<Foo>& pFoo) { validateAccount(*pFoo); }
|
通常情况下, 我们可以pass by const reference:
1 2 3 4 5 6
| void onMessage(const string& msg) { shared_ptr<Foo> pFoo(new Foo(msg)); if (validate(pFoo)) { save(pFoo); } }
|
析构动作在创建时被捕获,这意味着:
- 虚析构不再是必要的
- shared_ptr 可以持有任何对象, 而且能安全的释放
- shared_ptr对象可以安全的跨越边界模块, 比如从DLL里返回, 而不会造成从模块A分配的的内存在模块B里被释放这种错误
- 二进制兼容性, 即便Foo对象的大小变了, 那么旧的客户代码仍然可以使用新的动态库, 而无需重新编译. 前提是Foo的头文件中不出现访问对象的成员的inline函数, 而且Foo对象是由动态库中Factory构造, 返回其shared_ptr
- 析构动作可以定制
析构所在的线程
对象x的析构是在最后一个指向x的shared_ptr离开其作用域(shared_ptr被析构)的同一个线程析构的, 这个线程不一定是对象诞生的线程. 这个特性是个双刃剑: 如果对象的析构比较耗时, 就会拖慢关键线程的速度; 可以用一个单独的线程专门做析构, 通过一个BlockingQueue<shared_ptr>把对象析构都转移到那个专用线程, 解放关键线程.
初学C++的教条是new和delete成对出现, 如果使用RAII, 要改成”每一个明确的资源配置动作都应该在单一语句中执行,并在该语句中立刻将配置获得的资源交给handle对象, 程序一般不出现delete”. shared_ptr是管理共享资源的利器, 需要注意避免循环引用, 通常的做法是owner持有指向child的shared_ptr, child持有指向owner的weak_ptr.
script>