线程安全的对象生命周期管理

  1. 存在的问题
  2. 线程安全的class的定义
  3. 对象的创建
  4. 对象的销毁
  5. 可能会死锁的例子
  6. 线程安全的Observer
  7. 解决方案 shared_ptr/weak_ptr
  8. 关于shared_ptr的线程安全
  9. shared_ptr技术与陷阱

存在的问题

对于C++来说, 对象的生命周期需要程序员自己管理, 那么当一个对象可以被多个线程看到时, 对象的析构就会存在竞态条件(race condition):

线程安全的class的定义

在解决上述问题之前, 先对线程安全的类的要求进行定义:

根据上面的定义, STL里大多数容器都不是线程安全的, 例如: std::string, std::vector, std::map等, 需要在外部加锁才能供多个线程同时访问.

对象的创建

对象的创建要保证线程安全比较简单, 只要不在构造期间泄露this指针即可. 具体做法是 (1)不要在构造函数中注册任何回调.(2)不要在构造函数中把this指针传给跨线程对象.(3)即使在构造函数最后一行也不要.

之所以要这样, 是因为构造函数执行完之前, 对象的初始化动作还没有完成, 如果泄露了this指针, 有可能其他线程会访问一个半成品对象. 之所以在构造函数的最后一行也不能泄露是因为该类可能作为基类使用, 它的构造函数执行完后还需要继续执行子类的构造函数.

对于需要上述需求的, 可以通过二段构造(构造函数+initialize()), 并且可以使得构造函数不必主动抛出异常, 在initialize()的返回值判断是否构造成功, 简化错误处理.

对于二段式构造可以看Bjarne Stroustrup 在Standard-Library Exception Safety 中提到的Delayed acquisition, 也就是在设计类的时候思考清楚, 是否真的有必要在构造函数中申请内存. 如果需要, 可以参考vector::resize()的思路

对象的销毁

对于析构函数来说, mutex无法保证安全. 对于互斥语义来说, 只能保证对于临界资源只有一个线程访问, 因此可能会出现一些问题, 考虑以下代码

1
2
3
4
5
6
7
8
9
Foo::~Foo() {
MutexLockGuard lock(mutex_);
// free internal state (1)
}

void Foo::update() {
MutexLockGuard lock(mutex_); // (2)
// make use of internal state
}
1
2
3
4
5
6
7
8
9

// thread A
delete x;
x = NULL; // helpless

// thread B
if (x) {
x->update();
}

上面代码段在 x = NULL; 处的注释是 helpless, 也就是这里是存在问题的, 当线程A 执行到(1)处时, 此时必然已经持有互斥锁, 如果此时线程B通过了 if 检测, 在(2)处阻塞, 这时候接下来得行为是未定义的, 因为构造函数会把mutex析构掉. 也就是说, 如果mutex作为class的数据成员不能保护析构, 只能用来保护正常生命期其他数据成员的读和写, 而析构行为是发生在对象生命期结束时(后)的. 对于基类对象来说, 当执行到基类的析构函数, 表示子类的成员已经被析构了, 那么仍然没有保护整个析构过程.

可能会死锁的例子

读写同一个class两个对象存在死锁的例子:

1
2
3
4
5
void swap(Object& lhs, Object& rhs) {
MutexLockGuard lLock(lhs.mutex_);
MutexLockGuard rLokc(rhs.mutex_);
// swap
}

这样当线程A执行 swap(a, b) 线程B执行 swap(b, a)时就可能会产生死锁.

同样的道理在 operator= 也会存在

假设一个用锁实现的Counter是这样的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class Counter: boost::noncopyable {
public:
Counter(): value_(0) {}
int64_t value() const;

private:
int64_t value_;
mutable MutexLock mutex_;
};

int64_t Counter::value() const {
MutexLockGuard lock(mutex_);
return value_;
}

int64_t Counter::getAndIncrease() {
MutexLockGuard lock(mutex_);
int64_t ret = value_++;
return ret;
}

它的operator= 实现如下:

1
2
3
4
5
6
7
8
9
10
Counter& Counter::operator=(const Counter& rhs) {
if (this == &rhs) { return *this; }

MutexLockGuard myLock(mutex_); // 这里存在潜在死锁 线程A a = b; 线程B b = a;
MutexLockGuard itsLock(rhs.mutex_);

value_ = rhs.value_; // 这里如果改成 value_ = rhs.value() 会与上一句死锁

return *this;
}

线程安全的Observer

在面向对象设计中, 对象的关系主要有三种: composition, aggregation, association.

这三种关系中, composition(组合)在多线程下比较安全, 对象x的生命周期为拥有者owner控制, owner析构的时候, 会把x也析构掉, 所以一半不会造成内存泄漏活着重复释放的问题.

对于aggregation(聚合)和association(关联)来说会存在一定的问题. association表示一个对象持有另一个对象的引用(或指针), 但是另一个对象的生命周期不受该对象控制, 所以可能会出现上述说过的问题.

考虑使用对象池的方式解决: 只创建不销毁, 使用一个对象池来暂存使用过的对象, 申请新对象时, 如果对象池中有存货, 就重复利用现有的对象, 当对象使用完后放回池子而不释放.

这种方法可以避免失效对象的访问, 但还是带来一些问题:

看一个注册非静态成员函数回调成为association的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// Observer 模式
class Observer { // : boost::noncopyable
public:
virtual ~Observer();
virtual void update() = 0;
// ...
};

class Observable { // : boost::noncopyable
public:
void register_(Observer* x);
void unregister(Observer* x);

void notifyObservers() {
for (Observer* x : observers_) {
x->update();
}
}
private:
std::vector<Observer*> observers_;
};

这里存在的问题在于, 通知每一个Observer对象时, 需要知道该对象是否还存活, 这里可以考虑在Observer的析构函数中解除注册, 但有引来了新的问题.

如下代码:

1
2
3
4
5
6
7
8
9
10
11
12
class Observer {
// 同前
void observe(Observer* s) {
s->register_(this);
subject_ = s;
}
virtual ~Observer() {
subject_->unregister(this);
}

Observable* subject_;
};

这里有两个race conditions,(1)unregister如何知道subject_还活着 (2)当线程A执行到unregister之前, 线程B执行到update()之前, x正好就是线程A正在析构的对象.

这里存在的问题可以通过加锁来实现, 但是在哪加锁, 谁来持有锁又是难题, 所以需要引入一个活着的对象来帮助管理对象的生命周期, 可以查看是否存活.

解决方案 shared_ptr/weak_ptr

shared_ptr的几个要点:

孟岩《垃圾收集机制批判》中对智能指针的优势: “C++利用智能指针达成的效果是: 一旦对象不再被引用, 系统刻不容缓, 立刻回收内存. 这通常发生在关键任务完成后的清理(clean up)时期, 不会影响关键任务的实时性, 同时, 内存里所有的对象都是有用的, 绝对没有垃圾空占内存.”

应用到Observer上:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class Observable { // not 100% thread safe
public:
void register_(const weak_ptr<Observer>& x);
// void unregister(weak_ptr<Observer> x); // 不再需要
void notifyObservers();

private:
mutable MutexLock mutex_;
std::vector<weak_ptr<Observer>> observers_; // 这里如果改成shared_ptr, 就不会主动释放内存和从容器中取出
using Iterator = std::vector<weak_ptr<Observer>>::iterator;
};

void Observable::notifyObservers() {
MutexLockGuard lock(mutex_);
Iterator it = observers_.begin();

while (it != observers_.end()) {
shared_ptr<Observer> obj(it->lock());
if (obj) {
// 提升成功, 此时引用计数至少为2 lock构造一个临时对象,拷贝到obj
obj->update();
++it;
} else {
// 对象已销毁, 从 容器中拿掉weak_ptr
it = observers_.erase(it);
}
}
}

上述方案部分解决了Observer模式的线程安全, 但又引来了新的问题:

关于shared_ptr的线程安全

shared_ptr本身并不是100%线程安全的, 它的引用计数本身是安全且无锁的, 但对象的读写不是, 因为它有两个成员, 读写操作不能原子化. shared_ptr的线程安全和内建类型,标准库容器, std::string一样, 即:

这是shared_ptr本身的线程安全级别, 不是管理的对象的线程安全级别. 如果在多个线程中同时访问同一个shared_ptr, 正确做法是mutex保护:

1
2
3
4
5
MutexLock mutex; // No need for ReadWriterLock
shared_ptr<Foo> globalPtr;

// 把globalPtr安全的传给doit()
void doit(const shared_ptr<Foo>& pFoo);

globalPtr可以被多个线程看到, 所以读写需要加锁, 为了性能只需要使用互斥锁就可以. 因为临界区很小, 所以互斥锁也不会阻塞并发读.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void read() {
shared_ptr<Foo> localPtr;
{
MutexLockGuard lock(mutex);
locakPtr = globalPtr; // read globalPtr
}
// use localPtr since here, 读写localPtr无需加锁.
doit(localPtr);
}

void write() {
shared_ptr<Foo> newPtr(new Foo); // 在临界区外创建, 而不是临界区内globalPtr.reset(new Foo) 减少临界区长度
{
MutexLockGuard lock(mutex);
globalPtr = newPtr;
}
// use newPtr since here, 读写newPtr无需加锁.
doit(newPtr);
}

上面的read()和write()在临界区外都没有再访问globalPtr, 而是利用了拷贝一个新的栈上对象, 然后在访问这个栈上对象.

shared_ptr技术与陷阱

shared_ptr是强引用, 只要有一个指向x对象的shared_ptr存在, 该对象就确保不会被析构. 因为shared_ptr的拷贝构造和拷贝赋值, 如果不小心遗留了一份拷贝, 那么该对象就永世长存了. 另外std::bind,也会导致这个问题, std::bind会把实参拷贝一份, 如果参数是shared_ptr, 那么拷贝的对象的生命周期就不会短于std::function对象:

1
2
3
4
5
6
class Foo {
void doit();
};

shared_ptr<Foo> pFoo(new Foo);
std::function<void()> func = std::bind(&Foo::doit, pFoo); // long life foo

因为要修改引用计数(拷贝的时候通常要加锁), shared_ptr的拷贝开销要比拷贝原始指针要高, 但是多数情况都可以pass by const reference, 一个线程只要最外层函数有一个实体对象, 之后就可以pass by const reference来使用这个shared_ptr. 例如几个函数都用到Foo对象:

1
2
3
4
5
6
void save(const shared_ptr<Foo>& pFoo); // pass by const reference
void validateAccount(const Foo& foo);

bool validate(const shared_ptr<Foo>& pFoo) { // pass by const reference
validateAccount(*pFoo);
}

通常情况下, 我们可以pass by const reference:

1
2
3
4
5
6
void onMessage(const string& msg) {
shared_ptr<Foo> pFoo(new Foo(msg)); // 只要最外层有一个实体, 就是安全的
if (validate(pFoo)) { // 没有拷贝
save(pFoo); // 没有拷贝
}
}

对象x的析构是在最后一个指向x的shared_ptr离开其作用域(shared_ptr被析构)的同一个线程析构的, 这个线程不一定是对象诞生的线程. 这个特性是个双刃剑: 如果对象的析构比较耗时, 就会拖慢关键线程的速度; 可以用一个单独的线程专门做析构, 通过一个BlockingQueue<shared_ptr>把对象析构都转移到那个专用线程, 解放关键线程.

初学C++的教条是new和delete成对出现, 如果使用RAII, 要改成”每一个明确的资源配置动作都应该在单一语句中执行,并在该语句中立刻将配置获得的资源交给handle对象, 程序一般不出现delete”. shared_ptr是管理共享资源的利器, 需要注意避免循环引用, 通常的做法是owner持有指向child的shared_ptr, child持有指向owner的weak_ptr.

script>