Tuesday, March 22, 2005

 

Some notes on Thread - 1

1.
JTC线程库阅读分析

By linweixuan
From http://blog.csdn.net/linweixuan/archive/2004/10/19/142473.aspx

在Java中,线程运行着一个可运行对象,该对象的概念在JTC C++ 中的对象即是JTCRunnable. 即一个线程运行的对象实体.线程本身是一个对象,线程中要运行的实体也是一个对象. JTCRunnable



=====================================================

主对象:

class Clock : public JTCRunnable

{

void run(); //当线程启动是调用

void start(); //启动线程

void stop(); //停止线程

};



=====================================================

其实JTCRunable很简单,只有一个虚方法,只有run才是我们要完全实现的方法.

class JTCRunnable : public virtual JTCRefCount

{

public:

virtual void run() = 0;

};



=====================================================

我们可以看到JTCRunable的继承类是个引用计数类.

class JTCRefCount

{

public:

long m_refcount;



在Win32下通过interlocked增加和减少引用.

在Unix下就需要一个Mutex来保护引用.



#ifndef WIN32

JTCMutex m_ref_mutex;

#endif



JTCRefCount()

: m_refcount(0)

{

}



virtual

~JTCRefCount()

{

}

};



=====================================================

线程将要运行的实体

run()

{

while(timer_)

try{

timer_ -> sleep(100);

}catch(const JTCInterruptedException&)

{}

}



=====================================================

函数实现线程运行的实体代码.需要一个线程的载体来运行他

start()

{

timer_ = new JTCThread(JTCRunnableHandle(this));

timer_ -> start(); //线程启动,

//具体JTCThread的函数start调用了:

posix-> pthread_create

windows -> _beginthreadex

来创建一个挂起的线程.run()函数与线程就扯上关系了.

}



=====================================================

停止线程后,将主对象中的线程句柄置为空

void stop()

{

timer_ = JTCThreadHandle();

}



=====================================================

主对象Clock调用:

Main()

{

try

{

JTCInitialize bootJTC(argc, argv);

JTCHandleT c = new Clock();

c -> start();

JTCThread::sleep(1000*5);

c -> stop();

}

catch(const JTCException& e){

cerr << "JTCException: " << m_name =" 0;" m_target =" target;//初始运行的目标主体" m_state =" new_thread;//初始线程状态" m_thread_id =" JTCThreadId();//初始线程id" m_detached =" false;" m_thread_number =" get_next_thread_number();" m_is_adopted =" false;" m_handle =" 0;//win32线程的句柄" m_group =" group;"> JTCThreadHandle;



class JTCThreadGroup;

typedef JTCHandleT JTCThreadGroupHandle;



class JTCRunnable;

typedef JTCHandleT JTCRunnableHandle;



=====================================================

这里使用到了模板类JTCHandleT,提供赋值和比较操作

template

class JTCHandleT{

JTCHandleT(T* tg = 0);

JTCHandleT(const JTCHandleT& rhs );

operator=

operator==

operator!=

operator!

operator bool

operator->()

operator*

T* m_object;

};

类通过m_object指针指向对象.

模板类封装了JTCThread,JTCThreadGroup,JTCRunable类指针操作,

如:在Clock类的start函数中,创建线程语句:

timer_ = new JTCThread(JTCRunnableHandle(this));

即传入了一个JTCRunable的类的实例指针.





JTCThread

=====================================================

class JTCThread : public virtual JTCRefCount

{

}











=====================================================

JTCThread中的run其实调用了构造传入初始化m_target对象的run方法

Void JTCThread::run()

{

if (m_target)

{

m_target -> run();

}

}





=====================================================

启动线程,即创建线程执行.

void JTCThread::start()

{

//声明

JTCSyncT guard(m_mutex);



//判断当前线程是否未新线程.

if (m_state != JTCThread::new_thread)

{

throw JTCIllegalThreadStateException("state is not new_thread");

}



分POSIX和WIN32实现

#if defined(HAVE_POSIX_THREADS)

…..

#ifdef HAVE_WIN32_THREADS

……

}





=====================================================

UNIX下的线程创建实现

#if defined(HAVE_POSIX_THREADS)

pthread_attr_t attr;

try{

JTC_SYSCALL_1(PTHREAD_ATTR_INIT, &attr, != 0)

}catch(const JTCSystemCallException& e) {

if (e.getError() == ENOMEM) {

throw JTCOutOfMemoryError(e.getMessage());

}

throw;

}



# if defined(HAVE_POSIX_THREADS) && !defined(__hpux)

// All threads set their priority explicitely. Under HPUX 11.x this

// requires root privilege so it's disabled.

pthread_attr_setinheritsched(&attr, PTHREAD_EXPLICIT_SCHED);

# endif



# ifdef HAVE_PTHREAD_ATTR_SETSTACKSIZE

if (lsd_initial_stack_size != 0) {

pthread_attr_setstacksize(&attr, lsd_initial_stack_size);

}

# endif



// Call the custom pthread attribute hook, if defined.

if (sm_attr_hook != 0) {

(*sm_attr_hook)(&attr);

}



pthread_t id;

try{

PTHREAD_CREATE(id, attr, lsf_thread_adapter, this)

}catch(const JTCSystemCallException& e) {

PTHREAD_ATTR_DESTROY(&attr);

if (e.getError() == EAGAIN e.getError() == ENOMEM) {

throw JTCOutOfMemoryError(e.getMessage());

}

throw;

}

m_thread_id = JTCThreadId(id);

PTHREAD_ATTR_DESTROY(&attr);

#endif



=====================================================

WIN32下的线程创建实现

#ifdef HAVE_WIN32_THREADS

DWORD id;

try

{

JTC_SYSCALL_6(

m_handle = (HANDLE)::_beginthreadex,

NULL, lsd_initial_stack_size,

(unsigned (__stdcall*)(void*))lsf_thread_adapter, (LPVOID)this,

0, (unsigned int*)&id, == NULL)

}

catch(const JTCSystemCallException& e)

{

if (e.getError() == ERROR_NOT_ENOUGH_MEMORY

e.getError() == ERROR_OUTOFMEMORY)

{

throw JTCOutOfMemoryError(e.getMessage());

}

throw;

}

m_thread_id = JTCThreadId(id);

#endif





==========================================================

sleep()函数是很独立的函数,与本身线程没有多大关系

void JTCThread::sleep(long millis, int nano)

{

if (millis <> 999999)

{

throw JTCIllegalArgumentException

("nanosecond timeout out of range");

}



//m_state = JTCThread::not_runnable;



#if defined(HAVE_PTHREAD_DELAY_NP)

struct timespec tv;

tv.tv_sec = millis/1000;

tv.tv_nsec = (millis% 1000)*1000000 + nano;

JTC_SYSCALL_1(pthread_delay_np, &tv, != 0);



#elif defined(HAVE_POSIX_THREADS)



struct timespec tv;

tv.tv_sec = millis/1000;

tv.tv_nsec = (millis% 1000)*1000000 + nano;

if (nanosleep(&tv, 0) < errno ="="" m_state =" JTCThread::runnable;">;



//最后一个是互斥成员

JTCRecursiveMutex m_mon_mutex;

}





==========================================================

JTCMonitorBase的成员也比较简单

class JTCMonitorBase

{

//检查Mornitor是否被锁

validate_owner(const JTCThreadId, const char* caller)



//Mornitor的条件变量

JTCCond m_mon_cond;

//记录有多少挂起的通知

int m_nnotify

}





==========================================================

JTCMonitor 和JTCSynchronized究竟如何关连起来?

JTCSynchronized的定义也简单,主要是定义了几个成员变量.竟然定义了三个互斥变量.



class JTCSynchronized

{

JTCMonitor* m_monitor

JTCMutex* m_mutex

JTCRecursiveMutex* m_rmutex

JTCRWMutex* m_rwmutex

}



m_monitor指向要监视器实体;

m_mutex指向关连的互斥实体

m_rmutex指向关连recursive的互斥实体

m_rwmutex指向读写互斥



类构造函数

JTCSynchronized(const JTCMonitor& mon);

m_monitor(&mon),

m_lock_type(monitor)

{

//事实上同步类利用了构造进来的监视器来加锁

m_monitor -> lock();

}

-------------------------------------------

其他构造也类似,利用构造进来的实体来加锁

JTCSynchronized::JTCSynchronized(const JTCMutex& m)

: m_mutex(&m),

m_lock_type(mutex)

{

m_mutex -> lock();

}



--------------------------------------------

JTCSynchronized::JTCSynchronized(

const JTCRecursiveMutex& m

)

: m_rmutex(&m),

m_lock_type(recursive_mutex)

{

m_rmutex -> lock();

}



-----------------------------------------------

JTCSynchronized::JTCSynchronized(

const JTCRWMutex& m,

ReadWriteLockType type

)

: m_rwmutex(&m),

m_lock_type(read_write_mutex)

{

if (type == read_lock)

m_rwmutex -> read_lock();

else

m_rwmutex -> write_lock();

}



==========================================================

举个应用例子:

例子TestThread继承了JTCMornitor,在线程实体run()函数,中使用同步sync,将TestThread的实例

this作构造参数传入.这样线程同步就起作用了.



class TestThread : public JTCThread, public JTCMonitor

{

run()

{

JTCSynchronized sync(*this);

//这里其实构造中调用了继承过来的JTCMonitor lock()函数,锁是保存在

//JTCMonitor中的定义的JTCRecursiveMutex 成员变量m_mon_mutex..

//下一步是到看看互斥定义的时候了.

//do your code here

……

}

}



==========================================================

先回过头来看lock()函数的实现

void JTCMonitor::lock() const

{

//JTCMonitor本身的互斥变量来枷锁

if (m_mon_mutex.lock()){

// 第一次置m_nnotify为0

JTCMonitor* This = (JTCMonitor*)this;

This -> m_nnotify = 0;

}

}





==========================================================

class JTCRecursiveMutex

{

//对外的锁函数

lock();

unlock();

trylock();

get_owner();

count();



//内部实现的锁函数

lock_internal();

trylock_internal();

unlock_internal();



reset_for_condvar();



//到了与平台相关的定义了

#if defined(HAVE_POSIX_THREADS)

pthread_mutex_t m_lock; // Pthreads mutex.

#endif



#if defined(HAVE_WIN32_THREADS)

CRITICAL_SECTION m_lock; // WIN32 critical section.

#endif



//成员定义

JTCMutex m_internal; //内部锁

unsigned int m_count; //加锁的次数

JTCThreadId m_owner; //加锁线程ID



friend class JTCCondHelper;

friend class JTCCond;

}



==========================================================

在lock()调用地时候,JTCRecursiveMutex类通过JTCMutex联系起来.



看看调用lock()函数,发生了什么

bool

JTCRecursiveMutex::lock() const

{

//

// Work around lack of mutable.

//

return ((JTCRecursiveMutex*)this) -> lock_internal(1);

}





==========================================================

lock_internal()函数地实现

bool JTCRecursiveMutex::lock_internal(int count)

{

bool rc = false;

bool obtained = false;

while (!obtained)

{

---------------------------------------------------------------------------

m_internal.lock();

if (m_count == 0)

{

m_count = count;

m_owner = JTCThreadId::self();

obtained = true;

rc = true;

try

{

#if defined(HAVE_POSIX_THREADS)

JTC_SYSCALL_1(pthread_mutex_lock, &m_lock, != 0)

#endif



#if defined(HAVE_WIN32_THREADS)

EnterCriticalSection(&m_lock);

#endif

}

catch(...)

{

try{

m_internal.unlock();

}catch(...)

{

}

throw;

}

}

else if (m_owner == JTCThreadId::self())

{

m_count += count;

obtained = true;

}

m_internal.unlock();

---------------------------------------------------------------------------



if (!obtained)

{

#if defined(HAVE_POSIX_THREADS)

JTC_SYSCALL_1(pthread_mutex_lock, &m_lock, != 0)

pthread_mutex_unlock(&m_lock);

#endif



#if defined(HAVE_WIN32_THREADS)

EnterCriticalSection(&m_lock);

LeaveCriticalSection(&m_lock);

#endif



}

}

return rc;

}





==========================================================

函数内部使用了JTCMutex作为内部的锁,是看JTCMutex类的时候了.

class JTCMutex

{

--------------------------------------------------------------------

lock();

unlock();

trylock();

get_owner();

count();

--------------------------------------------------------------------

//平台相关成员定义

#if defined(HAVE_POSIX_THREADS)

pthread_mutex_t m_lock; // Pthread mutex.

#endif



#if defined(HAVE_WIN32_THREADS)

CRITICAL_SECTION m_lock; // WIN32 critical section.

#endif

--------------------------------------------------------------------



友元定义

friend class JTCCond;

}





==========================================================

lock()函数的UNIX实现

{

#if defined(HAVE_POSIX_THREADS)

#if defined(__GNUC__) && defined(__OPTIMIZE__)

//

// The optimizer for GCC 2.95.1 is broken. The following

// three lines of code "fix" the problem.

//

volatile int i = 1;

if (i == 0)

++i;

#endif

pthread_mutex_t* lock = &((JTCMutex*)this) -> m_lock;

JTC_SYSCALL_1(pthread_mutex_lock, lock, != 0)

#endif

}



lock()函数的WIN32实现

{

#if defined(HAVE_WIN32_THREADS)

CRITICAL_SECTION* crit = &((JTCMutex*)this) -> m_lock;

EnterCriticalSection(crit);

#endif

}





==========================================================

回到以前的JTCMornitorBase的类定义中,有一个是条件变量成员

JTCCond m_mon_cond;,在JTCMornitor的类的wait()函数使用到他



Void JTCMonitor::wait()

{

validate_owner(m_mon_mutex.get_owner(), "wait()");

notify_internal(m_nnotify); // int m_nnotify挂起的通知数

try

{

// JTCRecursiveMutex m_mon_mutex;等待监视器的互斥变量

m_mon_cond.wait(m_mon_mutex);

}

catch(...)

{

m_nnotify = 0;

throw;

}

m_nnotify = 0;

}



==========================================================

条件变量类,他通过构造函数传入的等待的条件对象,实现对对象的改变的通知.

例如:多个线程等待某个线程的互斥条件有没有释放,某个时刻释放后通知各个线程

class JTCCond

{

wait(JTCRecursiveMutex& mutex);

wait(JTCMutex& mutex);

signal();

broadcast();

wait_internal( JTCRecursiveMutex&,long timeout);

calc_timeout(long timeout);//posix



//平台相关成员变量定义

#if defined(HAVE_POSIX_THREADS)

pthread_cond_t m_cond; // Pthread condition variable.

#endif



#if defined(HAVE_WIN32_THREADS)

CondImpl* m_impl;

#endif

}





wait()函数简单调用了内部wait_internal()函数,等待入参的互斥条件

JTCCond::wait(JTCRecursiveMutex& mutex)

{

wait_internal(mutex, -1);

}



先看win32的实现

#if defined(HAVE_WIN32_THREADS)

wait_internal(JTCRecursiveMutex& mutex,)

{

m_impl -> pre_wait();

unsigned int count = mutex.reset_for_condvar();

try

{

//调用内部条件类的的wait函数,内部利用信号量来实现

bool rc = m_impl -> wait(timeout);

mutex.lock(count);

return rc;

}

catch(...)

{

mutex.lock(count);

throw;

}

#endif

}



主要焦点在成员变量的函数m_impl->wait()上

CondImpl* m_impl



bool CondImpl::wait(long timeout)

{

// Wait for the queue semaphore to be signaled.

try{

bool rc = m_queue.wait(timeout);

postwait(!rc);

return rc;

}catch(...)

{

postwait(false);

throw;

}

}



发现有一个队列成员变量和发送等待的postwait()函数.先看一下CondImpl的定义



==========================================================

class CondImpl

{

signal();

pre_wait();

wait();

postwait();



//关键的成员定义

JTCSemaphore m_gate;

JTCSemaphore m_queue;//可以说是信号队列

JTCMutex m_internal; //本身内部用来加锁

long m_blocked;

long m_unblocked;

long m_to_unblock;

}



==========================================================

成员m_queue虽然叫队列,但他是通过信号量来实现的.

class JTCSemaphore

{

JTCSemaphore(long initial_count = 0);

~JTCSemaphore();

//wait和post实现了信号量等待和投递

bool wait(long timeout = -1);

void post(int count = 1);



#if defined(HAVE_WIN32_THREADS)

HANDLE m_sem; // The semaphore handle

#endif

};



构造函数调用了CreateSemaphore();函数,创建的时候信号量最大为0x7fffffff,

开始的信号量为构造参数初始化.

JTCSemaphore::JTCSemaphore(long initial_count)

{

#if defined(HAVE_WIN32_THREADS)

JTC_SYSCALL_4(m_sem = CreateSemaphore, 0, initial_count, 0x7fffffff, 0,

== INVALID_HANDLE_VALUE)

#endif

}



当信号量为大于0时,友信号,等于0时无信号.

win32的信号量实现调用了WaitForSingleObject和ReleaseSemaphore函数.

当信号量为0,Wait()一直在等待,如果信号量大于0时函数返回,信号量减一,

而post()是将信号量加一.

bool JTCSemaphore::wait(long timeout)

{

#if defined(HAVE_WIN32_THREADS)

if (timeout < 0) {

timeout = INFINITE;

}

int rc;

JTC_SYSCALL_2(rc = WaitForSingleObject, m_sem, timeout, == WAIT_ABANDONED);

return rc != WAIT_TIMEOUT;

#endif

}



void JTCSemaphore::post(int count)

{

#if defined(HAVE_WIN32_THREADS)

JTC_SYSCALL_3(ReleaseSemaphore, m_sem, count, 0, == 0)

#endif

}



==========================================================

psotwait()的实现

void CondImpl::postwait(bool timeout)

{

m_internal.lock();

m_unblocked++;



if (m_to_unblock != 0)

{

bool last = (--m_to_unblock == 0);

m_internal.unlock();



if (timeout) {

m_queue.wait();

}



if (last) {

m_gate.post();

}else{

m_queue.post();

}

}else{

m_internal.unlock();

}

}







关于SYSCALL





提供对重要函数调用出现失败时异常错误信息和定位发生异常的代码行和文件名输出和继续抛出异常的宏包装。FN是函数名,其他是参数,最后是函数返回值。这种做法值得学习。

#define JTC_SYSCALL_6(FN,A1,A2,A3,A4,A5,A6,COND)
do {
DECLARE_ERRNO
if ((ASSIGN_ERRNO (FN (A1,A2,A3,A4,A5,A6))) COND)
JTC_THROW_EXCEPTION( ERRNO, #FN << JTC_FMT_ARG_6(A1,A2,A3,A4,A5,A6) )
} while (0);





# define JTC_THROW_EXCEPTION(CODE,MSG)
{
long error = CODE;
char msg[ 512 ];
msg[sizeof(msg)-1] = '\0';
JTC_STD(ostrstream) stream(msg, sizeof(msg)-1);
stream << MSG << " == " << error << " [" << __FILE__
<< ':' << __LINE__ << ']' << JTC_STD(ends);
throw JTCSystemCallException(msg, error);
}



JTC_STD是关于标准流的宏定义

#ifndef HAVE_JTC_NO_IOSTREAM

# ifdef HAVE_STD_IOSTREAM

# define JTC_STD(x) std::x

# else

# define JTC_STD(x) x

# endif

#endif // !HAVE_JTC_NO_IOSTREAM



函数返回值的宏定义

#if defined(WIN32)

# define ERRNO GetLastError()

# define DECLARE_ERRNO

# define ASSIGN_ERRNO

#else

# define ERRNO _jtc_syscallError

# define DECLARE_ERRNO int _jtc_syscallError;

# define ASSIGN_ERRNO _jtc_syscallError =

#endif

2.
boost::thread简要分析

By billdavid
From http://blog.vckbase.com/billdavid/archive/2005/05/19/5566.html
http://blog.vckbase.com/billdavid/archive/2005/05/24/5734.html
http://blog.vckbase.com/billdavid/archive/2005/05/24/5736.html

注:以下讨论基于boost1.32.0。

boost::thread库跟boost::function等很多其它boost组成库不同,它只是一个跨平台封装库(简单的说,就是根据不同的宏调用不同的API),里面没有太多的GP编程技巧,因此,研究起来比较简单。

boost::thread库主要由以下部分组成:
thread
mutex
scoped_lock
condition
xtime
barrier
下面依次解析如下:

thread
thread自然是boost::thread库的主角,但thread类的实现总体上是比较简单的,前面已经说过,thread只是一个跨平台的线程封装库,其中按照所使用的编译选项的不同,分别决定使用Windows线程API还是pthread,或者MAC的thread实现。以下只讨论Windows,即使用BOOST_HAS_WINTHREADS的情况。
thread类提供了两种构造函数:
thread::thread()
thread::thread(const function0& threadfunc)
第一种构造函数用于调用GetCurrentThread构造一个当前线程的thread对象,第二种则通过传入一个函数或者一个functor来创建一个新的线程。第二种情况下,thread类在其构造函数中间接调用CreateThread来创建线程,并将线程句柄保存到成员变量m_thread中,并执行传入的函数,或执行functor的operator ()方法来启动工作线程。
此外,thread类有一个Windows下的程序员可能不大熟悉的成员函数join,线程(通常是主线程)可以通过调用join函数来等待另一线程(通常是工作线程)退出,join的实现也十分简单,是调用WaitForSingleObject来实现的:
WaitForSingleObject(reinterpret_cast(m_thread), INFINITE);
我们可以用以下三种方式启动一个新线程:
1、传递一个工作函数来构造一个工作线程
#include
#include
#include

boost::mutex io_mutex;

void count() // worker function
{
for (int i = 0; i < 10; ++i)
{
boost::mutex::scoped_lock lock(io_mutex);
std::cout << i << std::endl;
}
}

int main(int argc, char* argv[])
{
boost::thread thrd1(&count);
boost::thread thrd2(&count);
thrd1.join();
thrd2.join();

return 0;
}
2、传递一个functor对象来构造一个工作线程
#include
#include
#include

boost::mutex io_mutex;

struct count
{
count(int id) : id(id) { }

void operator()()
{
for (int i = 0; i < 10; ++i)
{
boost::mutex::scoped_lock lock(io_mutex); // lock io, will be explained soon.
std::cout << id << ": " << i << std::endl;
}
}

int id;
};

int main(int argc, char* argv[])
{
boost::thread thrd1(count(1));
boost::thread thrd2(count(2));
thrd1.join();
thrd2.join();
return 0;
}
3、无需将类设计成一个functor,借助bind来构造functor对象以创建工作线程
#include
#include
#include
#include

boost::mutex io_mutex;

struct count
{
static int num;
int id;

count() : id(num++) {}

int do_count(int n)
{
for (int i = 0; i < n; ++i)
{
boost::mutex::scoped_lock lock(io_mutex);
std::cout << id << ": " << i << std::endl;
}
return id;
}
};

int count::num = 1;

int main(int argc, char* argv[])
{
count c1;
boost::thread thrd1(boost::bind(&count::do_count, &c1, 10));
thrd1.join();
return 0;
}
其中bind是一个函数模板,它可以根据后面的实例化参数构造出一个functor来,上面的boost::bind(&count::do_count, &c1, 10)其实等价于返回了一个functor:
struct countFunctor
{
int operator() ()
{
(&c1)->do_count(10); // just a hint, not actual code
}
};
因此,以后就跟2中是一样的了。

除了thread,boost::thread库另一个重要组成部分是mutex,以及工作在mutex上的boost::mutex::scoped_lock、condition和barrier,这些都是为实现线程同步提供的。

mutex
boost提供的mutex有6种:
boost::mutex
boost::try_mutex
boost::timed_mutex
boost::recursive_mutex
boost::recursive_try_mutex
boost::recursive_timed_mutex
下面仅对boost::mutex进行分析。
mutex类是一个CriticalSection(临界区)封装类,它在构造函数中新建一个临界区并InitializeCriticalSection,然后用一个成员变量
void* m_mutex;
来保存该临界区结构。
除此之外,mutex还提供了do_lock、do_unlock等方法,这些方法分别调用EnterCriticalSection、LeaveCriticalSection来修改成员变量m_mutex(CRITICAL_SECTION结构指针)的状态,但这些方法都是private的,以防止我们直接对mutex进行锁操作,所有的锁操作都必须通过mutex的友元类detail::thread::lock_ops来完成,比较有意思的是,lock_ops的所有方法:lock、unlock、trylock等都是static的,如lock_ops::lock的实现:
template
class lock_ops : private noncopyable
{
...
public:
static void lock(Mutex& m)
{
m.do_lock();
}
...
}
boost::thread的设计者为什么会这么设计呢?我想大概是:
1、boost::thread的设计者不希望被我们直接操作mutex,改变其状态,所以mutex的所有方法都是private的(除了构造函数,析构函数)。
2、虽然我们可以通过lock_ops来修改mutex的状态,如:
#include
#include
#include

int main()
{
boost::mutex mt;
//mt.do_lock(); // Error! Can not access private member!

boost::detail::thread::lock_ops::lock(mt);

return 0;
}
但是,这是不推荐的,因为mutex、scoped_lock、condition、barrier是一套完整的类系,它们是相互协同工作的,像上面这么操作没有办法与后面的几个类协同工作。

scoped_lock
上面说过,不应该直接用lock_ops来操作mutex对象,那么,应该用什么呢?答案就是scoped_lock。与存在多种mutex一样,存在多种与mutex对应的scoped_lock:
scoped_lock
scoped_try_lock
scoped_timed_lock
这里我们只讨论scoped_lock。
scoped_lock是定义在namespace boost::detail::thread下的,为了方便我们使用(也为了方便设计者),mutex使用了下面的typedef:
typedef detail::thread::scoped_lock scoped_lock;
这样我们就可以通过:
boost::mutex::scoped_lock
来使用scoped_lock类模板了。
由于scoped_lock的作用仅在于对mutex加锁/解锁(即使mutex EnterCriticalSection/LeaveCriticalSection),因此,它的接口也很简单,除了构造函数外,仅有lock/unlock/locked(判断是否已加锁),及类型转换操作符void*,一般我们不需要显式调用这些方法,因为scoped_lock的构造函数是这样定义的:
explicit scoped_lock(Mutex& mx, bool initially_locked=true)
: m_mutex(mx), m_locked(false)
{
if (initially_locked) lock();
}
注:m_mutex是一个mutex的引用。
因此,当我们不指定initially_locked参数构造一个scoped_lock对象时,scoped_lock会自动对所绑定的mutex加锁,而析构函数会检查是否加锁,若已加锁,则解锁;当然,有些情况下,我们可能不需要构造时自动加锁,这样就需要自己调用lock方法。后面的condition、barrier也会调用scoped_lock的lock、unlock方法来实现部分方法。
正因为scoped_lock具有可在构造时加锁,析构时解锁的特性,我们经常会使用局部变量来实现对mutex的独占访问。如thread部分独占访问cout的例子:
#include
#include
#include

boost::mutex io_mutex;

void count() // worker function
{
for (int i = 0; i < 10; ++i)
{
boost::mutex::scoped_lock lock(io_mutex);
std::cout << i << std::endl;
}
}

int main(int argc, char* argv[])
{
boost::thread thrd1(&count);
boost::thread thrd2(&count);
thrd1.join();
thrd2.join();

return 0;
}
在每次输出信息时,为了防止整个输出过程被其它线程打乱,通过对io_mutex加锁(进入临界区),从而保证了输出的正确性。
在使用scoped_lock时,我们有时候需要使用全局锁(定义一个全局mutex,当需要独占访问全局资源时,以该全局mutex为参数构造一个scoped_lock对象即可。全局mutex可以是全局变量,也可以是类的静态方法等),有时候则需要使用对象锁(将mutex定义成类的成员变量),应该根据需要进行合理选择。
Java的synchronized可用于对方法加锁,对代码段加锁,对对象加锁,对类加锁(仍然是对象级的),这几种加锁方式都可以通过上面讲的对象锁来模拟;相反,在Java中实现全局锁好像有点麻烦,必须将请求封装到类中,以转换成上面的四种synchronized形式之一。

condition
condition的接口如下:
class condition : private boost::noncopyable // Exposition only
{
public:
// construct/copy/destruct
condition();
~condition();

// notification
void notify_one();
void notify_all();

// waiting
template void wait(ScopedLock&);
template void wait(ScopedLock&, Pred);
template
bool timed_wait(ScopedLock&, const boost::xtime&);
template
bool timed_wait(ScopedLock&, Pred);
};
其中wait用于等待某个condition的发生,而timed_wait则提供具有超时的wait功能,notify_one用于唤醒一个等待该condition发生的线程,notify_all则用于唤醒所有等待该condition发生的线程。

由于condition的语义相对较为复杂,它的实现也是整个boost::thread库中最复杂的(对Windows版本而言,对支持pthread的版本而言,由于pthread已经提供了pthread_cond_t,使得condition实现起来也十分简单),下面对wait和notify_one进行简要分析。
condition内部包含了一个condition_impl对象,由该对象执行来处理实际的wait、notify_one...等操作。

下面先对condition_impl进行简要分析。
condition_impl在其构造函数中会创建两个Semaphore(信号量):m_gate、m_queue,及一个Mutex(互斥体,跟boost::mutex类似,但boost::mutex是基于CriticalSection<临界区>的):m_mutex,其中:
m_queue
相当于当前所有等待线程的等待队列,构造函数中调用CreateSemaphore来创建Semaphore时,lMaximumCount参数被指定为(std::numeric_limits::max)(),即便如此,condition的实现者为了防止出现大量等待线程的情况(以至于超过了long的最大值),在线程因执行condition::wait进入等待状态时会先:
WaitForSingleObject(reinterpret_cast(m_queue), INFINITE);
以等待被唤醒,但很难想象什么样的应用需要处理这么多线程。
m_mutex
用于内部同步的控制。
但对于m_gate我很奇怪,我仔细研究了一下condition_imp的实现,还是不明白作者引入m_gate这个变量的用意何在,既然已经有了用于同步控制的m_mutex,再引入一个m_gate实在让我有点不解。

以下是condition::wait调用的do_wait方法简化后的代码:
template
void do_wait(M& mutex)
{
m_impl.enter_wait();
lock_ops::unlock(mutex, state); //对传入的scoped_lock对象解锁,以便别的线程可以对其进行加锁,并执行某些处理,否则,本线程等待的condition永远不会发生(因为没有线程可以获得访问资源的权利以使condition发生)
m_impl.do_wait(); //执行等待操作,等待其它线程执行notify_one或notify_all操作以获得
lock_ops::lock(mutex, state); //重新对scoped_lock对象加锁,获得独占访问资源的权利
}
condition::timed_wait的实现方法与此类似,而notify_one、notify_all仅将调用请求转发给m_impl,就不多讲了。

虽然condition的内部实现比较复杂,但使用起来还是比较方便的。下面是一个使用condition的多Producer-多Consumer同步的例子(这是本人为即将推出的“大卫的Design Patterns学习笔记”编写的Mediator模式的示例):
#include
#include
#include
#include

#include
#include // for time()

#include // for Sleep, change it for other platform, we can use
// boost::thread::sleep, but it's too inconvenient.

typedef boost::mutex::scoped_lock scoped_lock;
boost::mutex io_mutex;

class Product
{
int num;
public:
Product(int num) : num(num) {}

friend std::ostream& operator<< (std::ostream& os, Product& product)
{
return os << product.num;
}
};

class Mediator
{
private:
boost::condition cond;
boost::mutex mutex;

Product** pSlot; // product buffer/slot
unsigned int slotCount, // buffer size
productCount; // current product count
bool stopFlag; // should all thread stop or not

public:
Mediator(const int slotCount) : slotCount(slotCount), stopFlag(false), productCount(0)
{
pSlot = new Product*[slotCount];
}

virtual ~Mediator()
{
for (int i = 0; i < static_cast(productCount); i++)
{
delete pSlot[i];
}
delete [] pSlot;
}

bool Stop() const { return stopFlag; }
void Stop(bool) { stopFlag = true; }

void NotifyAll() // notify all blocked thread to exit
{
cond.notify_all();
}

bool Put( Product* pProduct)
{
scoped_lock lock(mutex);
if (productCount == slotCount)
{
{
scoped_lock lock(io_mutex);
std::cout << "Buffer is full. Waiting..." << std::endl;
}
while (!stopFlag && (productCount == slotCount))
cond.wait(lock);
}
if (stopFlag) // it may be notified by main thread to quit.
return false;

pSlot[ productCount++ ] = pProduct;
cond.notify_one(); // this call may cause *pProduct to be changed if it wakes up a consumer

return true;
}

bool Get(Product** ppProduct)
{
scoped_lock lock(mutex);
if (productCount == 0)
{
{
scoped_lock lock(io_mutex);
std::cout << "Buffer is empty. Waiting..." << std::endl;
}
while (!stopFlag && (productCount == 0))
cond.wait(lock);
}
if (stopFlag) // it may be notified by main thread to quit.
{
*ppProduct = NULL;
return false;
}

*ppProduct = pSlot[--productCount];
cond.notify_one();

return true;
}
};

class Producer
{
private:
Mediator* pMediator;
static unsigned int num;
unsigned int id; // Producer id

public:
Producer(Mediator* pMediator) : pMediator(pMediator) { id = num++; }

void operator() ()
{
Product* pProduct;
srand( (unsigned)time( NULL ) + id ); // each thread need to srand differently
while (!pMediator->Stop())
{
pProduct = new Product( rand() % 100 );
// must print product info before call Put, as Put may wake up a consumer
// and cause *pProuct to be changed
{
scoped_lock lock(io_mutex);
std::cout << "Producer[" << id << "] produces Product["
<< *pProduct << "]" << std::endl;
}
if (!pMediator->Put(pProduct)) // this function only fails when it is notified by main thread to exit
delete pProduct;

Sleep(100);
}
}
};

unsigned int Producer::num = 1;

class Consumer
{
private:
Mediator* pMediator;
static unsigned int num;
unsigned int id; // Consumer id

public:
Consumer(Mediator* pMediator) : pMediator(pMediator) { id = num++; }

void operator() ()
{
Product* pProduct = NULL;
while (!pMediator->Stop())
{
if (pMediator->Get(&pProduct))
{
scoped_lock lock(io_mutex);
std::cout << "Consumer[" << id << "] is consuming Product["
<< *pProduct << "]" << std::endl;
delete pProduct;
}

Sleep(100);
}
}
};

unsigned int Consumer::num = 1;

int main()
{
Mediator mediator(2); // we have only 2 slot to put products

// we have 2 producers
Producer producer1(&mediator);
boost::thread thrd1(producer1);
Producer producer2(&mediator);
boost::thread thrd2(producer2);
// and we have 3 consumers
Consumer consumer1(&mediator);
boost::thread thrd3(consumer1);
Consumer consumer2(&mediator);
boost::thread thrd4(consumer2);
Consumer consumer3(&mediator);
boost::thread thrd5(consumer3);

// wait 1 second
Sleep(1000);
// and then try to stop all threads
mediator.Stop(true);
mediator.NotifyAll();

// wait for all threads to exit
thrd1.join();
thrd2.join();
thrd3.join();
thrd4.join();
thrd5.join();

return 0;
}

barrier
barrier类的接口定义如下:
class barrier : private boost::noncopyable // Exposition only
{
public:
// construct/copy/destruct
barrier(size_t n);
~barrier();

// waiting
bool wait();
};
barrier类为我们提供了这样一种控制线程同步的机制:
前n - 1次调用wait函数将被阻塞,直到第n次调用wait函数,而此后第n + 1次到第2n - 1次调用wait也会被阻塞,直到第2n次调用,依次类推。
barrier::wait的实现十分简单:
barrier::barrier(unsigned int count)
: m_threshold(count), m_count(count), m_generation(0)
{
if (count == 0)
throw std::invalid_argument("count cannot be zero.");
}

bool barrier::wait()
{
boost::mutex::scoped_lock lock(m_mutex); // m_mutex is the base of barrier and is initilized by it's default constructor.
unsigned int gen = m_generation; // m_generation will be 0 for call 1~n-1, and 1 for n~2n - 1, and so on...

if (--m_count == 0)
{
m_generation++; // cause m_generation to be changed in call n/2n/...
m_count = m_threshold; // reset count
m_cond.notify_all(); // wake up all thread waiting here
return true;
}

while (gen == m_generation) // if m_generation is not changed, lock current thread.
m_cond.wait(lock);
return false;
}
因此,说白了也不过是mutex的一个简单应用。
以下是一个使用barrier的例子:
#include
#include

int i = 0;
boost::barrier barr(3); // call barr.wait 3 * n times will release all threads in waiting

void thread()
{
++i;
barr.wait();
}

int main()
{
boost::thread thrd1(&thread);
boost::thread thrd2(&thread);
boost::thread thrd3(&thread);

thrd1.join();
thrd2.join();
thrd3.join();

return 0;
}
如果去掉其中thrd3相关的代码,将使得线程1、2一直处于wait状态,进而使得主线程无法退出。

xtime
xtime是boost::thread中用来表示时间的一个辅助类,它是一个仅包含两个成员变量的结构体:
struct xtime
{
//...
xtime_sec_t sec;
xtime_nsec_t nsec;
};
condition::timed_wait、thread::sleep等涉及超时的函数需要用到xtime。
需要注意的是,xtime表示的不是一个时间间隔,而是一个时间点,因此使用起来很不方便。为了方便使用xtime,boost提供了一些辅助的xtime操作函数,如xtime_get、xtime_cmp等。
以下是一个使用xtime来执行sleep的例子(跟简单的一句Sleep比起来,实在是太复杂了),其中用到了xtime初始化函数xtime_get:
#include
#include
#include

int main()
{
boost::xtime xt;
boost::xtime_get(&xt, boost::TIME_UTC); // initialize xt with current time
xt.sec += 1; // change xt to next second
boost::thread::sleep(xt); // do sleep

std::cout << "1 second sleep over." << std::endl;

return 0;
}

多线程编程中还有一个重要的概念:Thread Local Store(TLS,线程局部存储),在boost中,TLS也被称作TSS,Thread Specific Storage。
boost::thread库为我们提供了一个接口简单的TLS的面向对象的封装,以下是tss类的接口定义:
class tss
{
public:
tss(boost::function1* pcleanup);
void* get() const;
void set(void* value);
void cleanup(void* p);
};
分别用于获取、设置、清除线程局部存储变量,这些函数在内部封装了TlsAlloc、TlsGetValue、TlsSetValue等API操作,将它们封装成了OO的形式。
但boost将该类信息封装在detail名字空间内,即不推荐我们使用,当需要使用tss时,我们应该使用另一个使用更加方便的类:thread_specific_ptr,这是一个智能指针类,该类的接口如下:
class thread_specific_ptr : private boost::noncopyable // Exposition only
{
public:
// construct/copy/destruct
thread_specific_ptr();
thread_specific_ptr(void (*cleanup)(void*));
~thread_specific_ptr();

// modifier functions
T* release();
void reset(T* = 0);

// observer functions
T* get() const;
T* operator->() const;
T& operator*()() const;
};
即可支持get、reset、release等操作。
thread_specific_ptr类的实现十分简单,仅仅为了将tss类“改装”成智能指针的样子,该类在其构造函数中会自动创建一个tss对象,而在其析构函数中会调用默认参数的reset函数,从而引起内部被封装的tss对象被析构,达到“自动”管理内存分配释放的目的。

以下是一个运用thread_specific_ptr实现TSS的例子:
#include
#include
#include
#include

boost::mutex io_mutex;
boost::thread_specific_ptr ptr; // use this method to tell that this member will not shared by all threads

struct count
{
count(int id) : id(id) { }

void operator()()
{
if (ptr.get() == 0) // if ptr is not initialized, initialize it
ptr.reset(new int(0)); // Attention, we pass a pointer to reset (actually set ptr)

for (int i = 0; i < 10; ++i)
{
(*ptr)++;
boost::mutex::scoped_lock lock(io_mutex);
std::cout << id << ": " << *ptr << std::endl;
}
}

int id;
};

int main(int argc, char* argv[])
{
boost::thread thrd1(count(1));
boost::thread thrd2(count(2));
thrd1.join();
thrd2.join();

return 0;
}

此外,thread库还提供了一个很有趣的函数,call_once,在tss::init的实现中就用到了该函数。
该函数的声明如下:
void call_once(void (*func)(), once_flag& flag);
该函数的Windows实现通过创建一个Mutex使所有的线程在尝试执行该函数时处于等待状态,直到有一个线程执行完了func函数,该函数的第二个参数表示函数func是否已被执行,该参数往往被初始化成BOOST_ONCE_INIT(即0),如果你将该参数初始化成1,则函数func将不被调用,此时call_once相当于什么也没干,这在有时候可能是需要的,比如,根据程序处理的结果决定是否需要call_once某函数func。
call_once在执行完函数func后,会将flag修改为1,这样会导致以后执行call_once的线程(包括等待在Mutex处的线程和刚刚进入call_once的线程)都会跳过执行func的代码。

需要注意的是,该函数不是一个模板函数,而是一个普通函数,它的第一个参数1是一个函数指针,其类型为void (*)(),而不是跟boost库的很多其它地方一样用的是function模板,不过这样也没有关系,有了boost::bind这个超级武器,想怎么绑定参数就随你的便了,根据boost的文档,要求传入的函数不能抛出异常,但从实现代码中好像不是这样。

以下是一个典型的运用call_once实现一次初始化的例子:
#include
#include
#include

int i = 0;
int j = 0;
boost::once_flag flag = BOOST_ONCE_INIT;

void init()
{
++i;
}

void thread()
{
boost::call_once(&init, flag);
++j;
}

int main(int argc, char* argv[])
{
boost::thread thrd1(&thread);
boost::thread thrd2(&thread);
thrd1.join();
thrd2.join();

std::cout << i << std::endl;
std::cout << j << std::endl;

return 0;
}
结果显示,全局变量i仅被执行了一次++操作,而变量j则在两个线程中均执行了++操作。

其它
boost::thread目前还不十分完善,最主要的问题包括:没有线程优先级支持,或支持线程的取消操作等,而且,目前的实现机制似乎不大容易通过简单修改达到这一要求,也许将来的某个版本会在实现上出现较大调整,但目前支持的接口应该会相对保持稳定,目前支持的特性也还会继续有效。



<< Home

This page is powered by Blogger. Isn't yours?