2015年4月6日 星期一

EXECUTOR 與 THREAD POOL 筆記

原文: https://crluo0929.wordpress.com/2014/02/09/executor-%E8%88%87-thread-pool-%E7%AD%86%E8%A8%98/

類別架構:

threadpools
java.util.concurrent 提供一方便的介面 Executor 來處理 multithread 任務
Executor 為主要介面,ExecutorService 延伸其功能,如果不須回傳值,則可使用 Runnable 介面,若有回傳值則可使用 Callable 介面
Future 則為執行完的回傳值,若是 Runnable的話,Future 回傳為 null
要建立 ThreadPool,則可透過 Executors 類別,提供多種 static function 建立:
Executor pool = Executors.newSingleThreadExecutor() ; //建立單一執行緒
Executor pool = Executors.newFixedThreadPool(10) ; //建立10個固定大小的thread pool
Executor pool = Executors.newCachedThreadPool() ; //建立一個快取的thread pool
Executor pool = Executors.newScheduledThreadPool(10) ; //建立一個排程的thread pool,大小為10
Executor pool = Executors.newSingleScheduleThreadExecutor() ; //建立一個排程的單一執行緒
使用 FixedThreadPool(10) ,若有100個任務併行,則會由10個thread去處理100個任務,
若使用 CachedThreadPool(),沒有 thread 則建立,預設超過60秒則thread會被移除掉,適合生存期短的工作
ScheduledThreadPool 則須透過 ScheduledExecutorService scheduleService = (ExecutorService) pool ; 由延伸介面來設定排程:
scheduleService.scheduleAtFixedRate( ... ) ; //設定排程
在執行完後必須要透過 ExecutorService 介面來關閉 ThreadPool ,否則不會結束:
ExecutorService service = (ExecutorService) exe ;
service.shutdown() ; //關閉執行緒池
透過 ExecutorService 提供 submit() 方法延伸提交有回傳值的任務 ( Executor 的 execute() 沒有回傳值)
submit() 接受兩種介面,一種是 Runnable 也是傳統無回傳值的介面,另外一種是 Callable 介面,可定義回傳值:
public class MyTask implements Callable<String>{
    int i ;
    public MyTask(int i){ this.i = i ; }
    public String call() throws Exception{
        System.out.println("Threadread.currentThread().getId()+"] working:"+i) ;
        return "Thread["+Thread.currentThread().getId()+"] complete:"+i ;
    }
}
可以取得回傳的結果:
ExecutorService pool = Executors.newFixedThreadPool(10);
for(int i=0;i<100;++i){
    Future f = pool.submit(new MyTask()) ;
    System.out.println(f.get()) ; //取得回傳值
}
但是 Future.get() 這種方式會造成阻塞,當呼叫 get() 時,若結果還沒回來則其它人全部都要等他,
以上述例子來說,則不會100個任務同時被執行,執行到 f.get() 時,其它 thread 會等待其對應的 MyTask 完成後才能繼續
不過沒關係,Java 提供另一個類別來處理這樣的問題: ExecutorCompletionService
可以透過 CompletionService completionPool= new ExecutorCompletionService ( pool ) ; 方式將 pool 再包一層:
ExecutorService pool = Executors.newFixedThreadPool(10);
CompletionService completionPool= new ExecutorCompletionService ( pool ) ; 
for(int i=0;i<100;++i){
   completionPool.submit(new MyTask()) ;
}
for(int i=0;i<100;++i){
   String ret = completionPool.take().get() ;
   //計算 ret 
}
透過 CompletionService 可以依照完成任務的順序獲得結果並馬上處理,而不用一個一個任務等待