雪过无痕
 域名:http://snowody.blog.globalimporter.net/
 
  ·java 多线程设计模式

[color=#000080]前面谈了多线程应用程序能极大地改善用户相应。例如对于一个Web应用程序,每当一个用户请求服务器连接时,服务器就可以启动一个新线程为用户服务。

然而,创建和销毁线程本身就有一定的开销,如果频繁创建和销毁线程,CPU和内存开销就不可忽略,垃圾收集器还必须负担更多的工作。因此,线程池就是为了避免频繁创建和销毁线程。

每当服务器接受了一个新的请求后,服务器就从线程池中挑选一个等待的线程并执行请求处理。处理完毕后,线程并不结束,而是转为阻塞状态再次被放入线程池中。这样就避免了频繁创建和销毁线程。

Worker Pattern实现了类似线程池的功能。首先定义Task接口:

package  *域名隐藏* ;
public interface Task {
    void execute();
}

线程将负责执行execute()方法。注意到任务是由子类通过实现execute()方法实现的,线程本身并不知道自己执行的任务。它只负责运行一个耗时的execute()方法。

具体任务由子类实现,我们定义了一个CalculateTask和一个TimerTask:

//  *域名隐藏*
package  *域名隐藏* ;
public class CalculateTask implements Task {
    private static int count = 0;
    private int num = count;
    public CalculateTask() {
        count++;
    }
    public void execute() {
         *域名隐藏* n("[CalculateTask " + num + "] start...");
        try {
             *域名隐藏* (3000);
        }
        catch(InterruptedException ie) {}
         *域名隐藏* n("[CalculateTask " + num + "] done.");
    }
}

//  *域名隐藏*
package  *域名隐藏* ;
public class TimerTask implements Task {
    private static int count = 0;
    private int num = count;
    public TimerTask() {
        count++;
    }
    public void execute() {
         *域名隐藏* n("[TimerTask " + num + "] start...");
        try {
             *域名隐藏* (2000);
        }
        catch(InterruptedException ie) {}
         *域名隐藏* n("[TimerTask " + num + "] done.");
    }
}

以上任务均简单的sleep若干秒。

TaskQueue实现了一个队列,客户端可以将请求放入队列,服务器线程可以从队列中取出任务:

package  *域名隐藏* ;
import  *域名隐藏* .*;
public class TaskQueue {
    private List queue = new LinkedList();
    public synchronized Task getTask() {
        while( *域名隐藏* ()==0) {
            try {
                 *域名隐藏* ();
            }
            catch(InterruptedException ie) {
                return null;
            }
        }
        return (Task) *域名隐藏* (0);
    }
    public synchronized void putTask(Task task) {
         *域名隐藏* (task);
         *域名隐藏* All();
    }
}

终于到了真正的WorkerThread,这是真正执行任务的服务器线程:

package  *域名隐藏* ;
public class WorkerThread extends Thread {
    private static int count = 0;
    private boolean busy = false;
    private boolean stop = false;
    private TaskQueue queue;
    public WorkerThread(ThreadGroup group, TaskQueue queue) {
        super(group, "worker-" + count);
        count++;
         *域名隐藏*  = queue;
    }
    public void shutdown() {
        stop = true;
         *域名隐藏* upt();
        try {
             *域名隐藏* ();
        }
        catch(InterruptedException ie) {}
    }
    public boolean isIdle() {
        return !busy;
    }
    public void run() {
         *域名隐藏* n(getName() + " start.");        
        while(!stop) {
            Task task =  *域名隐藏* k();
            if(task!=null) {
                busy = true;
                 *域名隐藏* e();
                busy = false;
            }
        }
         *域名隐藏* n(getName() + " end.");
    }
}

前面已经讲过, *域名隐藏* k()是一个阻塞方法,服务器线程可能在此wait()一段时间。此外,WorkerThread还有一个shutdown方法,用于安全结束线程。

***后是ThreadPool,负责管理所有的服务器线程,还可以动态增加和减少线程数:

package  *域名隐藏* ;
import  *域名隐藏* .*;
public class ThreadPool extends ThreadGroup {
    private List threads = new LinkedList();
    private TaskQueue queue;
    public ThreadPool(TaskQueue queue) {
        super("Thread-Pool");
         *域名隐藏*  = queue;
    }
    public synchronized void addWorkerThread() {
        Thread t = new WorkerThread(this, queue);
         *域名隐藏* (t);
         *域名隐藏* ();
    }
    public synchronized void removeWorkerThread() {
        if( *域名隐藏* ()>0) {
            WorkerThread t = (WorkerThread) *域名隐藏* (0);
             *域名隐藏* wn();
        }
    }
    public synchronized void currentStatus() {
         *域名隐藏* n("-----------------------------------------------");
         *域名隐藏* n("Thread count = " +  *域名隐藏* ());
        Iterator it =  *域名隐藏* or();
        while( *域名隐藏* t()) {
            WorkerThread t = (WorkerThread) *域名隐藏* ();
             *域名隐藏* n( *域名隐藏* e() + ": " + ( *域名隐藏* () ? "idle" : "busy"));
        }
         *域名隐藏* n("-----------------------------------------------");
    }
}

currentStatus()方法是为了方便调试,打印出所有线程的当前状态。

***后,Main负责完成main()方法:

package  *域名隐藏* ;
public class Main {
    public static void main(String[] args) {
        TaskQueue queue = new TaskQueue();
        ThreadPool pool = new ThreadPool(queue);
        for(int i=0; i<10; i++) {
             *域名隐藏* k(new CalculateTask());
             *域名隐藏* k(new TimerTask());
        }
         *域名隐藏* kerThread();
         *域名隐藏* kerThread();
        doSleep(8000);
         *域名隐藏* tStatus();
         *域名隐藏* kerThread();
         *域名隐藏* kerThread();
         *域名隐藏* kerThread();
         *域名隐藏* kerThread();
         *域名隐藏* kerThread();
        doSleep(5000);
         *域名隐藏* tStatus();
    }
    private static void doSleep(long ms) {
        try {
             *域名隐藏* (ms);
        }
        catch(InterruptedException ie) {}
    }
}

main()一开始放入了20个Task,然后动态添加了一些服务线程,并定期打印线程状态,运行结果如下:

worker-0 start.
[CalculateTask 0] start...
worker-1 start.
[TimerTask 0] start...
[TimerTask 0] done.
[CalculateTask 1] start...
[CalculateTask 0] done.
[TimerTask 1] start...
[CalculateTask 1] done.
[CalculateTask 2] start...
[TimerTask 1] done.
[TimerTask 2] start...
[TimerTask 2] done.
[CalculateTask 3] start...
-----------------------------------------------
Thread count = 2
worker-0: busy
worker-1: busy
-----------------------------------------------
[CalculateTask 2] done.
[TimerTask 3] start...
worker-2 start.
[CalculateTask 4] start...
worker-3 start.
[TimerTask 4] start...
worker-4 start.
[CalculateTask 5] start...
worker-5 start.
[TimerTask 5] start...
worker-6 start.
[CalculateTask 6] start...
[CalculateTask 3] done.
[TimerTask 6] start...
[TimerTask 3] done.
[CalculateTask 7] start...
[TimerTask 4] done.
[TimerTask 7] start...
[TimerTask 5] done.
[CalculateTask 8] start...
[CalculateTask 4] done.
[TimerTask 8] start...
[CalculateTask 5] done.
[CalculateTask 9] start...
[CalculateTask 6] done.
[TimerTask 9] start...
[TimerTask 6] done.
[TimerTask 7] done.
-----------------------------------------------
Thread count = 7
worker-0: idle
worker-1: busy
worker-2: busy
worker-3: idle
worker-4: busy
worker-5: busy
worker-6: busy
-----------------------------------------------
[CalculateTask 7] done.
[CalculateTask 8] done.
[TimerTask 8] done.
[TimerTask 9] done.
[CalculateTask 9] done.

仔细观察:一开始只有两个服务器线程,因此线程状态都是忙,后来线程数增多,6个线程中的两个状态变成idle,说明处于wait()状态。

思考:本例的线程调度算法其实根本没有,因为这个应用是围绕TaskQueue设计的,不是以Thread Pool为中心设计的。因此,Task调度取决于TaskQueue的getTask()方法,你可以改进这个方法,例如使用优先队列,使优先级高的任务先被执行。

如果所有的服务器线程都处于busy状态,则说明任务繁忙,TaskQueue的队列越来越长,***终会导致服务器内存耗尽。因此,可以限制TaskQueue的等待任务数,超过***长度就拒绝处理。许多Web服务器在用户请求繁忙时就会拒绝用户:HTTP 503 SERVICE UNAVAILABLE
[/color][/color]

[2005年 10月20日 17 : 0]      评论:[0] | 浏览:[2024]
  日 历 calendar
« 9月 2025 »
1 2 3 4 5 6 7
8 9 10 11 12 13 14
15 16 17 18 19 20 21
22 23 24 25 26 27 28
29 30            
  全 站 搜 索

  博 客 介 绍 
     心地常飞六月雪
火内方开五色莲
  文 章 分 类 
· 凌乱的轨迹 [9]
· Like,No Reason [1]
· 民以食为天 [2]
  最 新 发 表 
    什么是E-gold?(推荐)
    trip
    **林肯的辩护词**
    家常菜的做法
    java 多线程设计模式
    芹菜的烹调方法
  文 章 归 档 
  最新回复(已关闭) 
  我的连 接(已关闭) 
  博 客 统 计 
·
文章总数:11
·
评论总数:14
·
访问总数:98137
   管理入口