[color=#000080]前面谈了多线程应用程序能极大地改善用户相应。例如对于一个Web应用程序,每当一个用户请求服务器连接时,服务器就可以启动一个新线程为用户服务。 然而,创建和销毁线程本身就有一定的开销,如果频繁创建和销毁线程,CPU和内存开销就不可忽略,垃圾收集器还必须负担更多的工作。因此,线程池就是为了避免频繁创建和销毁线程。 每当服务器接受了一个新的请求后,服务器就从线程池中挑选一个等待的线程并执行请求处理。处理完毕后,线程并不结束,而是转为阻塞状态再次被放入线程池中。这样就避免了频繁创建和销毁线程。 Worker Pattern实现了类似线程池的功能。首先定义Task接口: package *域名隐藏* ; public interface Task { void execute(); } 线程将负责执行execute()方法。注意到任务是由子类通过实现execute()方法实现的,线程本身并不知道自己执行的任务。它只负责运行一个耗时的execute()方法。 具体任务由子类实现,我们定义了一个CalculateTask和一个TimerTask: // *域名隐藏* package *域名隐藏* ; public class CalculateTask implements Task { private static int count = 0; private int num = count; public CalculateTask() { count++; } public void execute() { *域名隐藏* n("[CalculateTask " + num + "] start..."); try { *域名隐藏* (3000); } catch(InterruptedException ie) {} *域名隐藏* n("[CalculateTask " + num + "] done."); } } // *域名隐藏* package *域名隐藏* ; public class TimerTask implements Task { private static int count = 0; private int num = count; public TimerTask() { count++; } public void execute() { *域名隐藏* n("[TimerTask " + num + "] start..."); try { *域名隐藏* (2000); } catch(InterruptedException ie) {} *域名隐藏* n("[TimerTask " + num + "] done."); } } 以上任务均简单的sleep若干秒。 TaskQueue实现了一个队列,客户端可以将请求放入队列,服务器线程可以从队列中取出任务: package *域名隐藏* ; import *域名隐藏* .*; public class TaskQueue { private List queue = new LinkedList(); public synchronized Task getTask() { while( *域名隐藏* ()==0) { try { *域名隐藏* (); } catch(InterruptedException ie) { return null; } } return (Task) *域名隐藏* (0); } public synchronized void putTask(Task task) { *域名隐藏* (task); *域名隐藏* All(); } } 终于到了真正的WorkerThread,这是真正执行任务的服务器线程: package *域名隐藏* ; public class WorkerThread extends Thread { private static int count = 0; private boolean busy = false; private boolean stop = false; private TaskQueue queue; public WorkerThread(ThreadGroup group, TaskQueue queue) { super(group, "worker-" + count); count++; *域名隐藏* = queue; } public void shutdown() { stop = true; *域名隐藏* upt(); try { *域名隐藏* (); } catch(InterruptedException ie) {} } public boolean isIdle() { return !busy; } public void run() { *域名隐藏* n(getName() + " start."); while(!stop) { Task task = *域名隐藏* k(); if(task!=null) { busy = true; *域名隐藏* e(); busy = false; } } *域名隐藏* n(getName() + " end."); } } 前面已经讲过, *域名隐藏* k()是一个阻塞方法,服务器线程可能在此wait()一段时间。此外,WorkerThread还有一个shutdown方法,用于安全结束线程。 ***后是ThreadPool,负责管理所有的服务器线程,还可以动态增加和减少线程数: package *域名隐藏* ; import *域名隐藏* .*; public class ThreadPool extends ThreadGroup { private List threads = new LinkedList(); private TaskQueue queue; public ThreadPool(TaskQueue queue) { super("Thread-Pool"); *域名隐藏* = queue; } public synchronized void addWorkerThread() { Thread t = new WorkerThread(this, queue); *域名隐藏* (t); *域名隐藏* (); } public synchronized void removeWorkerThread() { if( *域名隐藏* ()>0) { WorkerThread t = (WorkerThread) *域名隐藏* (0); *域名隐藏* wn(); } } public synchronized void currentStatus() { *域名隐藏* n("-----------------------------------------------"); *域名隐藏* n("Thread count = " + *域名隐藏* ()); Iterator it = *域名隐藏* or(); while( *域名隐藏* t()) { WorkerThread t = (WorkerThread) *域名隐藏* (); *域名隐藏* n( *域名隐藏* e() + ": " + ( *域名隐藏* () ? "idle" : "busy")); } *域名隐藏* n("-----------------------------------------------"); } } currentStatus()方法是为了方便调试,打印出所有线程的当前状态。 ***后,Main负责完成main()方法: package *域名隐藏* ; public class Main { public static void main(String[] args) { TaskQueue queue = new TaskQueue(); ThreadPool pool = new ThreadPool(queue); for(int i=0; i<10; i++) { *域名隐藏* k(new CalculateTask()); *域名隐藏* k(new TimerTask()); } *域名隐藏* kerThread(); *域名隐藏* kerThread(); doSleep(8000); *域名隐藏* tStatus(); *域名隐藏* kerThread(); *域名隐藏* kerThread(); *域名隐藏* kerThread(); *域名隐藏* kerThread(); *域名隐藏* kerThread(); doSleep(5000); *域名隐藏* tStatus(); } private static void doSleep(long ms) { try { *域名隐藏* (ms); } catch(InterruptedException ie) {} } } main()一开始放入了20个Task,然后动态添加了一些服务线程,并定期打印线程状态,运行结果如下: worker-0 start. [CalculateTask 0] start... worker-1 start. [TimerTask 0] start... [TimerTask 0] done. [CalculateTask 1] start... [CalculateTask 0] done. [TimerTask 1] start... [CalculateTask 1] done. [CalculateTask 2] start... [TimerTask 1] done. [TimerTask 2] start... [TimerTask 2] done. [CalculateTask 3] start... ----------------------------------------------- Thread count = 2 worker-0: busy worker-1: busy ----------------------------------------------- [CalculateTask 2] done. [TimerTask 3] start... worker-2 start. [CalculateTask 4] start... worker-3 start. [TimerTask 4] start... worker-4 start. [CalculateTask 5] start... worker-5 start. [TimerTask 5] start... worker-6 start. [CalculateTask 6] start... [CalculateTask 3] done. [TimerTask 6] start... [TimerTask 3] done. [CalculateTask 7] start... [TimerTask 4] done. [TimerTask 7] start... [TimerTask 5] done. [CalculateTask 8] start... [CalculateTask 4] done. [TimerTask 8] start... [CalculateTask 5] done. [CalculateTask 9] start... [CalculateTask 6] done. [TimerTask 9] start... [TimerTask 6] done. [TimerTask 7] done. ----------------------------------------------- Thread count = 7 worker-0: idle worker-1: busy worker-2: busy worker-3: idle worker-4: busy worker-5: busy worker-6: busy ----------------------------------------------- [CalculateTask 7] done. [CalculateTask 8] done. [TimerTask 8] done. [TimerTask 9] done. [CalculateTask 9] done. 仔细观察:一开始只有两个服务器线程,因此线程状态都是忙,后来线程数增多,6个线程中的两个状态变成idle,说明处于wait()状态。 思考:本例的线程调度算法其实根本没有,因为这个应用是围绕TaskQueue设计的,不是以Thread Pool为中心设计的。因此,Task调度取决于TaskQueue的getTask()方法,你可以改进这个方法,例如使用优先队列,使优先级高的任务先被执行。 如果所有的服务器线程都处于busy状态,则说明任务繁忙,TaskQueue的队列越来越长,***终会导致服务器内存耗尽。因此,可以限制TaskQueue的等待任务数,超过***长度就拒绝处理。许多Web服务器在用户请求繁忙时就会拒绝用户:HTTP 503 SERVICE UNAVAILABLE [/color][/color]
|