Thursday, December 21, 2006

 

Some notes on ThreadPool - 1

We may use different words for the concepts discussed below: thread pool, waiting queue, and so on.

1.
From http://www.cnblogs.com/jobs/archive/2007/04/27/730255.html

The elegant, handy DelayQueue

Let's talk about real scenarios. In development we run into situations like these:

a) Closing idle connections. A server holds many client connections, and those idle for some time need to be closed.
b) Caching. Objects in a cache that exceed their idle time need to be evicted.
c) Task timeout handling. In a sliding-window request/response protocol, handling requests that time out without a response.

A naive approach is a background thread that iterates over all objects and checks them one by one. It is simple and works, but with many objects it can run into performance problems, and the check interval is hard to choose: too long hurts accuracy, too short hurts efficiency. Nor can it process items in order of their timeouts.

For these scenarios, DelayQueue is the best fit.

DelayQueue is a very interesting class in java.util.concurrent, clever and excellent! But neither the javadoc nor the Java SE 5.0 source provides a sample. I first discovered the clever use of DelayQueue while reading the ScheduledThreadPoolExecutor source, and later applied it in real work to session timeout management and to request timeouts in a request/response network protocol.

This article introduces DelayQueue, lists application scenarios, and provides an implementation of the Delayed interface along with sample code.

DelayQueue is a BlockingQueue whose type parameter is specialized to Delayed. (If you are not familiar with BlockingQueue, learn about it first, then come back to this article.)
Delayed extends the Comparable interface, with the delay time as the comparison key; the deadline from which getDelay is computed in a Delayed implementation should be a fixed (final) value. Internally, DelayQueue is implemented with a PriorityQueue.

DelayQueue = BlockingQueue + PriorityQueue + Delayed

The key elements of DelayQueue are BlockingQueue, PriorityQueue, and Delayed. Put another way, DelayQueue is a BlockingQueue implemented with a priority queue (PriorityQueue) whose comparison key is time.

Their basic definitions are as follows:
public interface Comparable<T> {
    public int compareTo(T o);
}

public interface Delayed extends Comparable<Delayed> {
    long getDelay(TimeUnit unit);
}

public class DelayQueue<E extends Delayed> implements BlockingQueue<E> {
    private final PriorityQueue<E> q = new PriorityQueue<E>();
}
Internally, DelayQueue uses a priority queue. When DelayQueue's offer method is called, the Delayed object is added to the priority queue q, as follows:
public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        E first = q.peek();
        q.offer(e);
        if (first == null || e.compareTo(first) < 0)
            available.signalAll();
        return true;
    } finally {
        lock.unlock();
    }
}
DelayQueue's take method peeks at the head of the priority queue q; if the delay threshold has not yet been reached, it awaits. As follows:
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek();
            if (first == null) {
                available.await();
            } else {
                long delay = first.getDelay(TimeUnit.NANOSECONDS);
                if (delay > 0) {
                    long tl = available.awaitNanos(delay);
                } else {
                    E x = q.poll();
                    assert x != null;
                    if (q.size() != 0)
                        available.signalAll(); // wake up other takers
                    return x;
                }
            }
        }
    } finally {
        lock.unlock();
    }
}
-------------------

Below is the Sample, a simple cache implementation. It consists of three classes: Pair, DelayItem, and Cache. As follows:

public class Pair<K, V> {
    public K first;

    public V second;

    public Pair() {}

    public Pair(K first, V second) {
        this.first = first;
        this.second = second;
    }
}
--------------
Below is the implementation of Delayed:
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class DelayItem<T> implements Delayed {
    /** Base of nanosecond timings, to avoid wrapping */
    private static final long NANO_ORIGIN = System.nanoTime();

    /**
     * Returns nanosecond time offset by origin
     */
    final static long now() {
        return System.nanoTime() - NANO_ORIGIN;
    }

    /**
     * Sequence number to break scheduling ties, and in turn to guarantee FIFO order among tied
     * entries.
     */
    private static final AtomicLong sequencer = new AtomicLong(0);

    /** Sequence number to break ties FIFO */
    private final long sequenceNumber;

    /** The time the task is enabled to execute in nanoTime units */
    private final long time;

    private final T item;

    public DelayItem(T submit, long timeout) {
        this.time = now() + timeout;
        this.item = submit;
        this.sequenceNumber = sequencer.getAndIncrement();
    }

    public T getItem() {
        return this.item;
    }

    public long getDelay(TimeUnit unit) {
        long d = unit.convert(time - now(), TimeUnit.NANOSECONDS);
        return d;
    }

    public int compareTo(Delayed other) {
        if (other == this) // compare zero ONLY if same object
            return 0;
        if (other instanceof DelayItem) {
            DelayItem<?> x = (DelayItem<?>) other;
            long diff = time - x.time;
            if (diff < 0)
                return -1;
            else if (diff > 0)
                return 1;
            else if (sequenceNumber < x.sequenceNumber)
                return -1;
            else
                return 1;
        }
        long d = (getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS));
        return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
    }
}
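As a quick standalone check of the ordering (my addition, not part of the original article), the item with the shorter delay should come out of a DelayQueue first:

import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;

// My addition: a tiny demo that shorter delays are taken first.
public class DelayItemDemo {
    public static void main(String[] args) throws InterruptedException {
        DelayQueue<DelayItem<String>> q = new DelayQueue<DelayItem<String>>();
        q.put(new DelayItem<String>("slow", TimeUnit.NANOSECONDS.convert(2, TimeUnit.SECONDS)));
        q.put(new DelayItem<String>("fast", TimeUnit.NANOSECONDS.convert(1, TimeUnit.SECONDS)));
        System.out.println(q.take().getItem()); // prints "fast" after about 1s
        System.out.println(q.take().getItem()); // prints "slow" after about 2s
    }
}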



Below is the Cache implementation, including the put and get methods as well as a runnable main function:
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

public class Cache<K, V> {
    private static final Logger LOG = Logger.getLogger(Cache.class.getName());

    private ConcurrentMap<K, V> cacheObjMap = new ConcurrentHashMap<K, V>();

    private DelayQueue<DelayItem<Pair<K, V>>> q = new DelayQueue<DelayItem<Pair<K, V>>>();

    private Thread daemonThread;

    public Cache() {

        Runnable daemonTask = new Runnable() {
            public void run() {
                daemonCheck();
            }
        };

        daemonThread = new Thread(daemonTask);
        daemonThread.setDaemon(true);
        daemonThread.setName("Cache Daemon");
        daemonThread.start();
    }

    private void daemonCheck() {

        if (LOG.isLoggable(Level.INFO))
            LOG.info("cache service started.");

        for (;;) {
            try {
                DelayItem<Pair<K, V>> delayItem = q.take();
                if (delayItem != null) {
                    // handle the expired entry
                    Pair<K, V> pair = delayItem.getItem();
                    cacheObjMap.remove(pair.first, pair.second); // compare and remove
                }
            } catch (InterruptedException e) {
                if (LOG.isLoggable(Level.SEVERE))
                    LOG.log(Level.SEVERE, e.getMessage(), e);
                break;
            }
        }

        if (LOG.isLoggable(Level.INFO))
            LOG.info("cache service stopped.");
    }

    // add an object to the cache
    public void put(K key, V value, long time, TimeUnit unit) {
        V oldValue = cacheObjMap.put(key, value);
        if (oldValue != null) {
            // q.remove(key) would never match a DelayItem, so remove the
            // queued item whose pair refers to this key instead.
            for (DelayItem<Pair<K, V>> di : q) {
                if (di.getItem().first.equals(key)) {
                    q.remove(di);
                    break;
                }
            }
        }

        long nanoTime = TimeUnit.NANOSECONDS.convert(time, unit);
        q.put(new DelayItem<Pair<K, V>>(new Pair<K, V>(key, value), nanoTime));
    }

    public V get(K key) {
        return cacheObjMap.get(key);
    }

    // test entry point
    public static void main(String[] args) throws Exception {
        Cache<Integer, String> cache = new Cache<Integer, String>();
        cache.put(1, "aaaa", 3, TimeUnit.SECONDS);

        Thread.sleep(1000 * 2);
        {
            String str = cache.get(1);
            System.out.println(str);
        }

        Thread.sleep(1000 * 2);
        {
            String str = cache.get(1);
            System.out.println(str);
        }
    }
}
Running the Sample, the main function prints two lines: the first is aaaa (after 2 seconds the entry, with its 3-second timeout, is still cached), the second is null (after 4 seconds it has expired and been evicted).

2.
For the source code, read Rotor directly:
http://dotnet.di.unipi.it/Content/sscli/docs/doxygen/clr/vm/win32threadpool_8h.html
http://dotnet.di.unipi.it/Content/sscli/docs/doxygen/clr/vm/win32threadpool_8h-source.html

There you see a blunt rule like const int MaxLimitCPThreadsPerCPU = 25. Personally I think deriving the limit from measured TLS timing would be more scientific. One step at a time.

I have not analyzed how Mono implements it.

3.
.NET's ThreadPool Class - Behind The Scenes
By Marc Clifton (note: excellent, him again)
http://www.codeproject.com/csharp/threadtests.asp

This write-up is really well done; the flow charts make everything clear at a glance.

4.
Threads
http://threads.sourceforge.net/
This library appears to be for Linux.

5.
ZThreads
http://zthread.sourceforge.net/
A cross-platform C++ threading and synchronization library. Some parts are written in a rather convoluted way for the sake of portability.

6.
Next up is studying the Lane concept in ACE+TAO+RT-CORBA; the documentation at its home site is required reading. The docs are generated by Doxygen and are very nice.
http://www.dre.vanderbilt.edu/Doxygen/Current/html/tao/rtcorba/hierarchy.html
http://www.dre.vanderbilt.edu/Doxygen/Current/html/tao/rtcorba/classTAO__Thread__Lane.html
http://www.dre.vanderbilt.edu/Doxygen/Current/html/tao/rtcorba/classTAO__Thread__Pool.html

7.
Here is a short write-up of a thread-pool-with-Lane design, plus some Java resources:
http://www.huihoo.org/rt/RTThreadPool.html

An analysis of the RTThreadPool design

(by huihoo.org Cocia Lin (cocia@163.com))

Overview
A thread pool for a real-time system mainly needs to consider the following properties:
1. An initial number of service threads, plus dynamically adding threads
2. Pre-allocating the memory the pool occupies
3. Buffering requests, with buffered requests queued by priority
4. Pre-estimating the commonly used thread priorities and providing a separate pool per priority, i.e. a thread pool with lanes (Lane)
5. Destroying the pool and releasing its resources

These real-time properties revolve around controlling thread resources and priorities.

For general-purpose, non-real-time systems, Doug Lea's util.concurrent toolkit defines an elegant structural model and provides a basic thread pool implementation; please look up util.concurrent for the details.

Viewed from a non-real-time angle, Doug Lea's thread pool is already quite complete, but for a real-time system it lacks the essential real-time properties. So my idea is to extend the non-real-time pool until it has them.

The features of the non-real-time pool that the real-time pool can reuse unchanged:
1. Initial number of service threads, plus dynamically adding threads
2. Buffering and queueing requests

The real-time properties the non-real-time pool lacks:
1. Queueing buffered requests by priority
2. Specifying the thread priority a task executes at
3. Configuring lanes (Lane) in the pool
4. Pre-allocating the pool's memory (not discussed for now)

Let's discuss these extensions of the non-real-time pool one by one.

Queueing buffered requests by priority
First look at how the non-real-time pool handles a request. If the pool has no ready thread available, the request must wait and is placed in the request queue. Meanwhile more requests may keep arriving, and the queued tasks are held in arrival order with no regard for priority. When a pool thread becomes free, the task at the head of the queue runs first rather than the highest-priority task in the queue. So a low-priority task can beat a high-priority task to execution, which is called priority inversion, something a real-time system must avoid as much as possible.

The solution is to make the queue of waiting requests a priority-ordered queue. When a task is enqueued, if lower-priority tasks sit ahead of it, the higher-priority task moves forward until nothing ahead of it has lower priority.

In the util.concurrent toolkit the queue abstraction is called Channel. Several queue implementations are provided, but none of them is sorted, so we have to implement PriorityChannel ourselves. The implementation code is in the references; a minimal sketch follows below.

With a priority-ordered queue in place of the non-real-time pool's request queue, buffered requests are queued by priority.
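As a minimal sketch (not the actual orbas PriorityChannel, whose code is in the references), a priority-ordered blocking queue can look like this, assuming tasks are Comparable and compareTo ranks higher-priority tasks earlier:

import java.util.PriorityQueue;

// A minimal sketch of a priority-ordered blocking queue in the spirit of
// util.concurrent's Channel; the name and details are assumptions, not
// the real orbas PriorityChannel code.
public class PriorityChannel<T extends Comparable<T>> {
    private final PriorityQueue<T> queue = new PriorityQueue<T>();

    // Insert a task; tasks whose compareTo ranks them earlier are taken first.
    public synchronized void put(T task) {
        queue.offer(task);
        notifyAll(); // wake any waiting taker
    }

    // Block until a task is available, then return the highest-priority one.
    public synchronized T take() throws InterruptedException {
        while (queue.isEmpty())
            wait();
        return queue.poll();
    }
}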

Specifying the thread priority a task executes at
The threads in the non-real-time pool all run at the default priority, all the same. So while a task executes, CPU time, memory, network and other resources are contended for at that default priority. Being unable to specify the priority a task runs at cannot satisfy real-time requirements. Look at the following code:

Client code:

PooledExecutor pool = new PooledExecutor(20);
pool.execute(new Runnable() {
    public void run() {
        process(connection);
    }
});

Inside the pool:

while ((task = getTaskFromQueue()) != null) {
    task.run();
    task = null;
}

When pool.execute() is called, the given task is put into the waiting queue, ready to run. The pool's watcher thread then notices a new task has arrived, takes it from the queue, and runs it; but the task runs at the worker thread's own priority, so no priority distinction shows up at all.

What we need is to record the priority when the task enters the request queue; when the task runs, switch the executing thread to that priority, and once it finishes, restore the thread's original priority.

while ((task = getTask()) != null) {
    RTRunnable rttask = (RTRunnable) task;
    Thread worker = Thread.currentThread();
    int oldprio = worker.getPriority();
    // Change the current thread's priority to the priority of the task.
    worker.setPriority(rttask.getPriority());
    try {
        // Run the task.
        task.run();
    } finally {
        // Restore the original priority even if the task throws.
        worker.setPriority(oldprio);
    }
}

With this, the pool executes each request task at the thread priority the request asks for.
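The write-up does not show the RTRunnable type used above; a minimal guess at its shape would be:

// A minimal guess (my addition) at the RTRunnable interface referenced
// above; the original write-up does not show its definition.
public interface RTRunnable extends Runnable {
    int getPriority(); // the priority this task should run at
}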

A thread pool with configured lanes (Lane)
The concept of thread pool lanes (Thread Lane) comes from the RT-CORBA specification, but this pool shape is useful in any real-time system.

The idea behind lanes is this: the pool is divided into many small pools, each assigned its own priority. When a request arrives, it is handed, according to its priority, directly to the matching small pool to run.

Consider an example: when a request task of priority 15 arrives, priority matching hands it to Lane2(15) to run. What if a request of priority 13 arrives? The pool applies a closest-priority rule, finds Lane2(15), takes a ready thread from it, adjusts that thread's priority to the one the request asks for, and runs the task.

To the user, only the big pool is visible; the internal lanes are not. When Lane2(15) has no threads left while other small pools still do, the user's request would block. To optimize the pool, thread borrowing is allowed: when the current lane has no spare thread, it may borrow one from a lower-priority lane, raise the borrowed thread's priority, run the task, then restore the priority and return the thread. Only when every lane below it also has no available thread does the user's request actually block. (Note: this is painful to implement.) A sketch of the matching and borrowing follows below.
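To make the matching and borrowing concrete, here is a minimal sketch; Lane, PooledThread, and their methods are hypothetical names invented for illustration, not the TAO or orbas API:

import java.util.List;

// Hypothetical helper types for the sketch; not a real API.
interface PooledThread {
    void setPriority(int newPriority);
}

interface Lane {
    int getPriority();
    PooledThread tryAcquireThread(); // null when the lane has no idle thread
}

public class LanePool {
    private final List<Lane> lanes; // sorted by lane priority, descending

    public LanePool(List<Lane> lanesSortedDescending) {
        this.lanes = lanesSortedDescending;
    }

    // Find the lane whose priority is closest to the request's priority,
    // then fall back to lower-priority lanes (borrowing) when it is empty.
    public PooledThread acquire(int requestPriority) {
        int best = 0;
        for (int i = 1; i < lanes.size(); i++) {
            int d = Math.abs(lanes.get(i).getPriority() - requestPriority);
            if (d < Math.abs(lanes.get(best).getPriority() - requestPriority))
                best = i;
        }
        // Scan from the matched lane down through lower-priority lanes.
        for (int i = best; i < lanes.size(); i++) {
            PooledThread t = lanes.get(i).tryAcquireThread();
            if (t != null) {
                t.setPriority(requestPriority); // adjust the (possibly borrowed) thread
                return t;
            }
        }
        return null; // nothing available at or below the match: caller must block
    }
}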

This describes how a pool with lanes behaves. If you degenerate the pool to a single Lane, it becomes equivalent to a pool without lanes. So by implementing only the laned pool, we get both the laned and the unlaned variants just by configuring the lanes; and that is in fact what we do.

Take the pool we modified earlier as the Lane inside our pool, match each incoming request task to a suitable Lane by priority, and hand the task to that Lane to run. That is all there is to it.

Conclusion
Above we discussed the analysis and implementation of a real-time thread pool; the concrete implementation code can be found in the references below. Memory allocation was not covered, since it touches on much more ground, so we set that question aside for now.

I hope the content above invites exchange and discussion, for mutual progress.

References
Huihoo Open Enterprise Foundation
http://www.huihoo.org

orbas is an open-source Java implementation of CORBA with RT-CORBA support. The real-time thread pool described in this article is included in the orbas source.
http://www.huihoo.org/orbas/index.html

Doug Lea's util.concurrent synchronization toolkit
http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html

Real-Time Java Expert Group
http://www.rtj.org

ZEN, an RTSJ RT-ORB based on jRate
http://www.zen.uci.edu/

Good. Next, let's figure out what we need to add. First we need a ThreadID. The .NET ThreadPool does not take thread IDs seriously; in Rotor you find

threadCB->threadId = threadId; // may be useful for debugging otherwise not used

But we do need an ID. The simple idea is an automatically incremented unsigned integer ID, recycled after wraparound. So we see people write

this->m_ID = InterlockedIncrement(&LastUsedID);

However, this is still not quite safe. See Raymond Chen's blog:
Interlocked operations don't solve everything
http://weblogs.asp.net/oldnewthing/archive/2004/09/15/229915.aspx

Interlocked operations are a high-performance way of updating DWORD-sized or pointer-sized values in an atomic manner. Note, however, that this doesn't mean that you can avoid the critical section.

For example, suppose you have a critical section that protects a variable, and in some other part of the code, you want to update the variable atomically. "Well," you say, "this is a simple increment, so I can skip the critical section and just do a direct InterlockedIncrement. Woo-hoo, I avoided the critical section bottleneck."

Well, except that the purpose of that critical section was to ensure that nobody changed the value of the variable while the protected section of code was running. You just ran in and changed the value behind that code's back.

Conversely, some people suggested emulating complex interlocked operations by having a critical section whose job it was to protect the variable. For example, you might have an InterlockedMultiply that goes like this:

// Wrong!
LONG InterlockedMultiply(volatile LONG *plMultiplicand, LONG lMultiplier)
{
    EnterCriticalSection(&SomeCriticalSection);
    LONG lResult = *plMultiplicand *= lMultiplier;
    LeaveCriticalSection(&SomeCriticalSection);
    return lResult;
}

While this code does protect against two threads performing an InterlockedMultiply against the same variable simultaneously, it fails to protect against other code performing a simple atomic write to the variable. Consider the following:

int x = 2;

Thread1()
{
    InterlockedIncrement(&x);
}

Thread2()
{
    InterlockedMultiply(&x, 5);
}

If the InterlockedMultiply were truly interlocked, the only valid results would be x=15 (if the interlocked increment beat the interlocked multiply) or x=11 (if the interlocked multiply beat the interlocked increment). But since it isn't truly interlocked, you can get other weird values:

Thread 1                        Thread 2
                                x = 2 at start
                                InterlockedMultiply(&x, 5)
                                  EnterCriticalSection
                                  load x (loads 2)
InterlockedIncrement(&x);
x is now 3
                                  multiply by 5 (result: 10)
                                  store x (stores 10)
                                  LeaveCriticalSection
                                x = 10 at end

Oh no, our interlocked multiply isn't very interlocked after all! How can we fix it?

If the operation you want to perform is a function solely of the starting numerical value and the other function parameters (with no dependencies on any other memory locations), you can write your own interlocked-style operation with the help of InterlockedCompareExchange.

LONG InterlockedMultiply(volatile LONG *plMultiplicand, LONG lMultiplier)
{
    LONG lOriginal, lResult;
    do {
        lOriginal = *plMultiplicand;
        lResult = lOriginal * lMultiplier;
    } while (InterlockedCompareExchange(plMultiplicand,
                                        lResult, lOriginal) != lOriginal);
    return lResult;
}


To perform a complicated function on the multiplicand, we perform three steps.

First, capture the value from memory: lOriginal = *plMultiplicand;

Second, compute the desired result from the captured value: lResult = lOriginal * lMultiplier;

Third, store the result provided the value in memory has not changed: InterlockedCompareExchange(plMultiplicand, lResult, lOriginal)

If the value did change, then this means that the interlocked operation was unsuccessful because somebody else changed the value while we were busy doing our computation. In that case, loop back and try again.

If you walk through the scenario above with this new InterlockedMultiply function, you will see that after the interloping InterlockedIncrement, the loop will detect that the value of "x" has changed and restart. Since the final update of "x" is performed by an InterlockedCompareExchange operation, the result of the computation is trusted only if "x" did not change value.

Note that this technique works only if the operation being performed is a pure function of the memory value and the function parameters. If you have to access other memory as part of the computation, then this technique will not work! That's because those other memory locations might have changed during the computation and you would have no way of knowing, since InterlockedCompareExchange checks only the memory value being updated.

Failure to heed the above note results in problems such as the so-called "ABA Problem". I'll leave you to google on that term and read about it. Fortunately, everybody who talks about it also talks about how to solve the ABA Problem, so I'll leave you to read that, too.

Once you've read about the ABA Problem and its solution, you should be aware that the solution has already been implemented for you, via the Interlocked SList functions.
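On the Java side the same retry pattern appears with java.util.concurrent.atomic; as a rough sketch (my addition, mirroring the corrected InterlockedMultiply above):

import java.util.concurrent.atomic.AtomicLong;

public class AtomicMultiply {
    // A rough Java mirror (my addition) of the corrected InterlockedMultiply:
    // capture, compute, then store only if the value has not changed.
    static long multiply(AtomicLong value, long multiplier) {
        long original, result;
        do {
            original = value.get();          // capture the value
            result = original * multiplier;  // compute from the captured value
        } while (!value.compareAndSet(original, result)); // retry on interference
        return result;
    }
}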

So in the end we still need the critical section. Of course, we should assign that CriticalSection to the Lane, or to the Pool one level above it; let's define it as a friend function, then.


