JAVA 執行緒池 分批處理 大量資料方案

一、問題分析

程式碼開發中,我們經常會遇到這種情況:需要從儲存庫中查詢出大批次的資料,然後對這些大批次的資料進行相應的處理,此時會產生以下的問題:

一次查詢可能會撐爆記憶體

多批次資料處理,啟動執行緒的數目需要控制, 否則會導致OOM

可能需要對處理結果進行後續的處理

基於以上的問題,透過開發中的實踐和測試,我們總結出了相應的解決方案,下面就以上的問題點分別給出解決方案

對於一次查詢無法塞入記憶體的問題,我們可以首先分批次多次查詢

引入執行緒池對啟動的執行緒數目進行控制,外圍可以透過自定的執行緒控制器,防止執行緒池的OOM問題

對於需要返回值的執行緒,我們可以透過Future類進行接收

二、程式碼思路

1。自定義執行緒控制類

public class ThreadController {

/**最大執行緒數目*/

private int threadMaxNum;

private Map threadCountMap = null;

private static final String THREAD_COUNT = “THREAD_COUNT”;

public ThreadController(int threadMaxNum) {

this。threadMaxNum = threadMaxNum;

this。threadCountMap = new HashMap();

}

/**

* 啟動新執行緒時,呼叫此方法告訴控制器一個新執行緒開始執行

* @return

*/

public void addThreadCount() {

synchronized (this) {

if(threadCountMap。get(THREAD_COUNT)==null) {

threadCountMap。put(THREAD_COUNT, new Integer(1));

}else {

threadCountMap。put(THREAD_COUNT, threadCountMap。get(THREAD_COUNT)+1);

}

}

}

/**

* 執行緒結束時呼叫此方法告訴控制器該執行緒執行完畢

*/

public void decreaseThreadCount() {

synchronized (this) {

threadCountMap。put(THREAD_COUNT, threadCountMap。get(THREAD_COUNT)-1);

}

}

/**

* 是否可以啟動新執行緒

* @return

*/

public boolean canStartNewThread() {

if(threadCountMap。get(THREAD_COUNT)==null||threadCountMap。get(THREAD_COUNT)

return true;

}

return false;

}

}

2。執行分組、啟動執行緒池、獲取返回值,以下是對應的執行虛擬碼

int allCount=queryDBDataCount();

int baseGroupSize=1000;

int cycleNumber=(allCount%baseGroupSize) == ?(allCount/baseGroupSize):(allCount/baseGroupSize+1);

//引入自定義的執行緒控制器

ThreadController tc = new ThreadController(5);

ExecutorService executorService = Executors。newFixedThreadPool(cycleNumber);

List futureList=new ArrayList;

int i=;

int startIndex=;

int endIndex=;

List businessDataList=null;

while(true){

if(i==cycleNum){

break;

}

startIndex=i*baseGroupSize;

endIndex=(i+1)*baseGroupSize;

//分組查詢需要的業務資料

businessDataList=queryBusinessDataByIndex(startIndex,endIndex);

while(tc。canStartNewThread()&&i

//呼叫自定義的執行緒處理類,傳入自己的業務資料List

Callable customizedCallable=new CustomizedCallable(businessDataList);

//加入執行緒池處理業務邏輯

futureList。add(executorService。submit(customizedCallable));

tc。addThreadCount();

i++;

}

//可以讓執行緒稍微等一下

Thread。sleep(100);

}

if(futureList。size()>){

//進行後續的業務處理

dealWithOtherLogic(futureList)

}

三、總結

透過以上的處理方案,基本可以做到業務快速處理,不佔用大量應用記憶體,合理利用CPU核心,做到高效能的要求。

TAG: 執行緒threadCountMapThreadCountint