4. Executor Framework
Executor Framework trong Java
Phần này trình bày chi tiết về Executor Framework trong Java, một công cụ mạnh mẽ để quản lý và thực thi các tác vụ đa luồng. Nội dung bao gồm tổng quan về các giao diện Executor, các loại thread pool, xử lý bất đồng bộ với Future
và Callable
, tùy chỉnh ThreadPoolExecutor
, và cách đóng thread pool an toàn.
1. Executor Interface
1.1. Tổng quan về Executor
, ExecutorService
, và ScheduledExecutorService
Executor
, ExecutorService
, và ScheduledExecutorService
Executor Framework, nằm trong gói java.util.concurrent
, cung cấp cách quản lý thread hiệu quả, thay vì tạo và quản lý thread thủ công. Nó bao gồm các giao diện chính:
Executor
:Giao diện cơ bản nhất, định nghĩa phương thức
execute(Runnable task)
để thực thi một tác vụ bất đồng bộ.Tách biệt logic tác vụ (
Runnable
) khỏi cơ chế thực thi (thread pool hoặc thread đơn).
ExecutorService
:Mở rộng
Executor
, cung cấp các phương thức để quản lý vòng đời của executor và thực thi tác vụ có trả về (Callable
).Các phương thức chính:
submit(Runnable/Callable)
: Thực thi tác vụ và trả vềFuture
.invokeAll/invokeAny
: Thực thi danh sách tác vụ.shutdown()/shutdownNow()
: Đóng executor.
ScheduledExecutorService
:Mở rộng
ExecutorService
, hỗ trợ lập lịch thực thi tác vụ (chạy sau một khoảng thời gian hoặc định kỳ).Các phương thức chính:
schedule
: Chạy tác vụ sau một khoảng thời gian.scheduleAtFixedRate/scheduleWithFixedDelay
: Chạy tác vụ định kỳ.
Ví dụ cơ bản với Executor:
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
public class ExecutorExample {
public static void main(String[] args) {
Executor executor = Executors.newSingleThreadExecutor();
executor.execute(() -> System.out.println("Task executed by " + Thread.currentThread().getName()));
}
}
2. Thread Pools
2.1. Các loại Thread Pool
Thread pool là tập hợp các thread được tái sử dụng để thực thi nhiều tác vụ, giúp giảm chi phí tạo và hủy thread. Lớp Executors
cung cấp các phương thức factory để tạo các loại thread pool phổ biến:
FixedThreadPool
:Duy trì số lượng thread cố định (
nThreads
).Thích hợp cho các ứng dụng cần giới hạn số lượng thread đồng thời.
Tác vụ dư thừa được lưu trong hàng đợi (
LinkedBlockingQueue
).
CachedThreadPool
:Tạo thread mới khi cần, tái sử dụng thread nhàn rỗi.
Thích hợp cho các ứng dụng có nhiều tác vụ ngắn hạn.
Có thể tạo nhiều thread không giới hạn, cần cẩn thận với tài nguyên.
SingleThreadExecutor
:Chỉ sử dụng một thread duy nhất để thực thi tất cả tác vụ theo thứ tự.
Thích hợp cho các tác vụ cần tuần tự hoặc quản lý trạng thái.
ScheduledThreadPool
:Hỗ trợ lập lịch thực thi tác vụ định kỳ hoặc trì hoãn.
Thích hợp cho các tác vụ cần chạy theo lịch (ví dụ: kiểm tra trạng thái hệ thống).
Ví dụ các loại Thread Pool:
import java.util.concurrent.*;
public class ThreadPoolExample {
public static void main(String[] args) {
// FixedThreadPool
ExecutorService fixedPool = Executors.newFixedThreadPool(2);
for (int i = 0; i < 5; i++) {
fixedPool.submit(() -> System.out.println("Fixed: " + Thread.currentThread().getName()));
}
fixedPool.shutdown();
// CachedThreadPool
ExecutorService cachedPool = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
cachedPool.submit(() -> System.out.println("Cached: " + Thread.currentThread().getName()));
}
cachedPool.shutdown();
// SingleThreadExecutor
ExecutorService singlePool = Executors.newSingleThreadExecutor();
for (int i = 0; i < 5; i++) {
singlePool.submit(() -> System.out.println("Single: " + Thread.currentThread().getName()));
}
singlePool.shutdown();
// ScheduledThreadPool
ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(1);
scheduledPool.scheduleAtFixedRate(() -> System.out.println("Scheduled: " + Thread.currentThread().getName()),
0, 1, TimeUnit.SECONDS);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
scheduledPool.shutdown();
}
}
3. Future và Callable
3.1. Xử lý Kết quả Bất đồng bộ với Future
và Callable
Future
và Callable
Callable
:Giao diện tương tự
Runnable
, nhưng phương thứccall()
có thể trả về kết quả và ném ngoại lệ.Cú pháp:
V call() throws Exception
.
Future
:Đại diện cho kết quả của một tác vụ bất đồng bộ.
Các phương thức chính:
get()
: Chờ và lấy kết quả (chặn thread gọi).get(long timeout, TimeUnit unit)
: Chờ với thời gian giới hạn.isDone()
: Kiểm tra tác vụ đã hoàn thành chưa.cancel(boolean mayInterruptIfRunning)
: Hủy tác vụ.
Ví dụ Future và Callable:
import java.util.concurrent.*;
public class FutureCallableExample {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService executor = Executors.newFixedThreadPool(2);
// Tạo Callable trả về kết quả
Callable<Integer> task = () -> {
Thread.sleep(1000);
return 42;
};
// Submit Callable và lấy Future
Future<Integer> future = executor.submit(task);
System.out.println("Task submitted, waiting for result...");
// Lấy kết quả từ Future
Integer result = future.get(); // Chặn cho đến khi có kết quả
System.out.println("Result: " + result);
executor.shutdown();
}
}
Kết quả: Chương trình đợi 1 giây và in ra kết quả
42
.
4. ThreadPoolExecutor
4.1. Tùy chỉnh Thread Pool
ThreadPoolExecutor
là lớp cốt lõi của Executor Framework, cho phép tùy chỉnh các tham số của thread pool:
Core Pool Size: Số lượng thread tối thiểu luôn được duy trì.
Maximum Pool Size: Số lượng thread tối đa có thể tạo.
Keep-Alive Time: Thời gian thread nhàn rỗi được giữ trước khi bị hủy.
Work Queue: Hàng đợi lưu trữ các tác vụ khi số thread vượt quá core pool size (ví dụ:
LinkedBlockingQueue
,ArrayBlockingQueue
).
Ví dụ ThreadPoolExecutor:
import java.util.concurrent.*;
public class ThreadPoolExecutorExample {
public static void main(String[] args) {
// Tạo ThreadPoolExecutor
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, // corePoolSize
4, // maximumPoolSize
60, TimeUnit.SECONDS, // keepAliveTime
new LinkedBlockingQueue<>(2), // workQueue
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy() // rejectedExecutionHandler
);
// Submit tasks
for (int i = 0; i < 6; i++) {
final int taskId = i;
executor.submit(() -> {
System.out.println("Task " + taskId + " executed by " + Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
executor.shutdown();
}
}
Giải thích:
Thread pool bắt đầu với 2 thread (core).
Nếu có nhiều hơn 2 tác vụ, hàng đợi lưu trữ tối đa 2 tác vụ, và tối đa 2 thread bổ sung được tạo (tổng cộng 4).
Nếu vượt quá giới hạn (queue đầy và 4 thread đang chạy),
AbortPolicy
némRejectedExecutionException
.
4.2. Rejected Execution Handler
Khi thread pool không thể chấp nhận thêm tác vụ, RejectedExecutionHandler
được gọi. Các chính sách phổ biến:
AbortPolicy
: NémRejectedExecutionException
(mặc định).CallerRunsPolicy
: Tác vụ được thực thi bởi thread gọisubmit
.DiscardPolicy
: Loại bỏ tác vụ mà không báo lỗi.DiscardOldestPolicy
: Loại bỏ tác vụ cũ nhất trong hàng đợi.
5. Shutdown và Termination
5.1. Quản lý Việc Đóng Thread Pool An toàn
Để đóng một ExecutorService
an toàn, cần sử dụng các phương thức:
shutdown()
:Ngăn nhận thêm tác vụ mới, nhưng hoàn thành các tác vụ đang chạy và trong hàng đợi.
shutdownNow()
:Cố gắng hủy tất cả tác vụ (đang chạy và trong hàng đợi), trả về danh sách tác vụ chưa thực thi.
awaitTermination(long timeout, TimeUnit unit)
:Chờ tất cả tác vụ hoàn thành trong khoảng thời gian xác định.
Ví dụ Shutdown:
import java.util.concurrent.*;
public class ShutdownExample {
public static void main(String[] args) throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(2);
for (int i = 0; i < 5; i++) {
executor.submit(() -> {
System.out.println("Task executed by " + Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
System.out.println("Task interrupted");
}
});
}
// Đóng executor
executor.shutdown();
System.out.println("Shutdown initiated");
// Chờ tối đa 5 giây
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
System.out.println("Executor did not terminate, forcing shutdown");
executor.shutdownNow();
}
System.out.println("Executor terminated");
}
}
Giải thích:
shutdown()
cho phép các tác vụ hiện tại hoàn thành.awaitTermination
chờ tối đa 5 giây.Nếu chưa hoàn thành,
shutdownNow()
cố gắng hủy các tác vụ còn lại.
5.2. Lưu ý
Luôn gọi
shutdown()
hoặcshutdownNow()
để tránh rò rỉ tài nguyên.Kiểm tra trạng thái bằng
isShutdown()
vàisTerminated()
khi cần.Xử lý
InterruptedException
đúng cách trong các tác vụ để hỗ trợ hủy.
Kết luận
Executor Framework là công cụ mạnh mẽ để quản lý các tác vụ đa luồng trong Java:
Executor
,ExecutorService
,ScheduledExecutorService
cung cấp các giao diện linh hoạt để thực thi và lập lịch tác vụ.Thread Pools (
Fixed
,Cached
,Single
,Scheduled
) tối ưu hóa việc sử dụng thread.Future
vàCallable
hỗ trợ xử lý kết quả bất đồng bộ.ThreadPoolExecutor
cho phép tùy chỉnh chi tiết thread pool.Shutdown và Termination đảm bảo đóng thread pool an toàn.
Phần tiếp theo có thể tập trung vào Concurrent Collections để xử lý dữ liệu thread-safe. Nếu bạn muốn, tôi có thể tiếp tục với nội dung này hoặc cung cấp thêm ví dụ thực hành!
Last updated