ScheduledThreadPoolExecutor是JDK5提供的可執(zhí)行定時(shí)任務(wù)的一個(gè)工具類,可以在多線程環(huán)境下延遲執(zhí)行任務(wù)或者定期執(zhí)行任務(wù);和Timer類似,它也提供了三種定時(shí)模式:
- 延遲執(zhí)行任務(wù)
- 固定延遲的定期執(zhí)行(fixed delay)
- 按照固定的周期執(zhí)行(fixed rate)
延遲執(zhí)行任務(wù)
任務(wù)將按照給定的時(shí)間延遲delay后開始執(zhí)行;對(duì)應(yīng)的方法如下:
schedule(Runnable command, long delay, TimeUnit unit) schedule(Callable<V> callable, long delay, TimeUnit unit)
下面我們通過一個(gè)例子檢驗(yàn)下結(jié)果是否正確:
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); PrintUtil.print("start schedule a task."); executor.schedule(new Runnable() { @Override public void run() { PrintUtil.print("task is running."); }}, 5, TimeUnit.SECONDS);
我們計(jì)劃了一個(gè)5秒鐘后執(zhí)行的任務(wù),通過打印結(jié)果可以看到確實(shí)按照給定時(shí)間執(zhí)行了:
15:44:07--start schedule a task.15:44:12--task is running.
固定延遲的定期執(zhí)行
任務(wù)第一次按照給定的初始延遲initialDelay執(zhí)行,后續(xù)每一次執(zhí)行的時(shí)間為上一次任務(wù)的結(jié)束時(shí)間加上給定的period后執(zhí)行;對(duì)應(yīng)的方法如下:
scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
同樣我們通過一個(gè)例子檢驗(yàn)下結(jié)果是否正確:
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); PrintUtil.print("start schedule a task."); executor.scheduleWithFixedDelay(new Runnable() { @Override public void run() { PrintUtil.print("task is running."); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } }}, 0, 5, TimeUnit.SECONDS);
我們計(jì)劃了一個(gè)定期(每5秒鐘)延遲執(zhí)行的任務(wù),第一次任務(wù)立即執(zhí)行,每次任務(wù)執(zhí)行時(shí)長2秒鐘,通過打印的日志我們可以看到每次任務(wù)開始執(zhí)行的時(shí)間為:上次任務(wù)結(jié)束時(shí)間 5秒鐘:
15:55:16--start schedule a task.15:55:16--task is running.15:55:18--task is finished.15:55:23--task is running.15:55:25--task is finished.15:55:30--task is running.15:55:32--task is finished.
按照固定的周期執(zhí)行
任務(wù)第一次按照給定的初始延遲initialDelay執(zhí)行,后續(xù)每一次執(zhí)行的時(shí)間為固定的時(shí)間間隔period,如果線程池中工作線程不夠則任務(wù)順延執(zhí)行;對(duì)應(yīng)的方法如下:
scheduleWithFixedDelay(Runnable command, long initialDelay, long period, TimeUnit unit)
同樣我們通過一個(gè)例子檢驗(yàn)下結(jié)果是否正確:
ScheduledExecutorService executor = Executors.newScheduledThreadPool(10); PrintUtil.print("start schedule a task."); executor.scheduleAtFixedRate(new Runnable() { @Override public void run() { PrintUtil.print("task is running."); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } PrintUtil.print("task is finished."); }}, 0, 5, TimeUnit.SECONDS);
我們創(chuàng)建了一個(gè)核心線程池為10的ScheduledThreadPoolExecutor,并計(jì)劃了一個(gè)定期(每5秒鐘)執(zhí)行一次的任務(wù),過打印的日志我們可以看到每次任務(wù)開始執(zhí)行的時(shí)間為:上次任務(wù)開始時(shí)間 5秒鐘:
16:02:43--start schedule a task.16:02:43--task is running.16:02:45--task is finished.16:02:48--task is running.16:02:50--task is finished.
上面的例子都是計(jì)劃了一個(gè)任務(wù),如果是有多個(gè)定時(shí)任務(wù)同時(shí)執(zhí)行會(huì)怎么樣呢?
如果線程足夠并且CPU資源足夠,那就會(huì)同時(shí)執(zhí)行,如果線程或者CPU資源不夠那只能排隊(duì)執(zhí)行了。有興趣的話可以克隆文末的測(cè)試代碼,里面提供了一些測(cè)試的例子。
底層原理
ScheduledThreadPoolExecutor是如何保證我們計(jì)劃的任務(wù)都是按照正確的時(shí)間點(diǎn)執(zhí)行的呢?
其內(nèi)部實(shí)現(xiàn)了一個(gè)阻塞隊(duì)列DelayedWorkQueue,所有的任務(wù)都會(huì)放到這個(gè)隊(duì)列里。這個(gè)阻塞隊(duì)列內(nèi)部通過一個(gè)數(shù)組來保存這些任務(wù),并且基于最小堆排序,按照每個(gè)任務(wù)的下次執(zhí)行時(shí)間進(jìn)行排序,這樣就保證了執(zhí)行線程拿到的這個(gè)隊(duì)列中的第一個(gè)元素就是最接近當(dāng)前時(shí)間執(zhí)行的任務(wù)了。
相關(guān)的源碼如下:
// 保存定時(shí)任務(wù)的隊(duì)列private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY];// 最小堆排序相關(guān)的方法private void siftUp(int k, RunnableScheduledFuture<?> key)private void siftDown(int k, RunnableScheduledFuture<?> key)
那時(shí)間上是如何保證的呢?
DelayedWorkQueue重寫了take和poll方法,利用了AQS的ConditionObject機(jī)制使當(dāng)前線程休眠,等時(shí)間到了再喚醒線程去拿第一個(gè)任務(wù)。
關(guān)于AQS和ConditonObject的介紹,可以參考下文末的鏈接。
public RunnableScheduledFuture<?> take() throws InterruptedException { final Reentrantlock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { RunnableScheduledFuture<?> first = queue[0]; if (first == null) available.await(); else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0) return finishPoll(first); first = null; // don't retain ref while waiting if (leader != null) available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && queue[0] != null) available.signal(); lock.unlock(); }}
優(yōu)點(diǎn)
作為對(duì)JDK1.3推出的Timer的替代,ScheduledThreadPoolExecutor有如下優(yōu)點(diǎn):
- 它支持多個(gè)定時(shí)任務(wù)同時(shí)執(zhí)行,而Timer是單線程執(zhí)行的
- 它通過System.nanoTime()保證了任務(wù)執(zhí)行時(shí)間不受操作系統(tǒng)時(shí)間變化的影響
- 一個(gè)定時(shí)任務(wù)拋出異常,其他定時(shí)任務(wù)不受影響,而Timer卻不支持這一點(diǎn)
Demo代碼
參考文章
版權(quán)聲明:本文內(nèi)容由互聯(lián)網(wǎng)用戶自發(fā)貢獻(xiàn),該文觀點(diǎn)僅代表作者本人。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如發(fā)現(xiàn)本站有涉嫌抄襲侵權(quán)/違法違規(guī)的內(nèi)容, 請(qǐng)發(fā)送郵件至 舉報(bào),一經(jīng)查實(shí),本站將立刻刪除。