自定义线程池的使用


在 Java 中,创建自定义线程池时,线程工厂(ThreadFactory) 用于控制线程的创建过程,包括线程命名、优先级、守护状态等。

下面是一个详细的实现指南和并发场景示例:


一、线程工厂的创建与使用

1. 实现自定义 ThreadFactory

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class CustomThreadFactory implements ThreadFactory {
    // 线程组
    private final ThreadGroup group;
    // 线程计数(线程安全)
    private final AtomicInteger threadCounter = new AtomicInteger(1);
    // 线程名前缀
    private final String namePrefix;
    // 是否守护线程
    private final boolean daemon;

    public CustomThreadFactory(String poolName, boolean daemon) {
        SecurityManager s = System.getSecurityManager();
        this.group = (s != null) ? s.getThreadGroup() : 
                      Thread.currentThread().getThreadGroup();
        this.namePrefix = poolName + "-thread-";
        this.daemon = daemon;
    }

    @Override
    public Thread newThread(Runnable r) {
        // 创建线程
        Thread t = new Thread(group, r, 
                              namePrefix + threadCounter.getAndIncrement());
        // 设置守护状态
        t.setDaemon(daemon);
        // 设置优先级(非最低)
        if (t.getPriority() != Thread.NORM_PRIORITY) {
            t.setPriority(Thread.NORM_PRIORITY);
        }
        // 设置未捕获异常处理器
        t.setUncaughtExceptionHandler((thread, ex) -> 
            System.err.println("线程异常: " + thread.getName() + ", 错误: " + ex));
        return t;
    }
}

2. 关键配置项

配置项说明
线程名称前缀便于日志跟踪和问题排查(如 order-pool-thread-1
守护线程设置 daemon=true 可使线程随 JVM 退出终止
线程优先级通常设为 NORM_PRIORITY(避免饥饿)
异常处理器捕获线程内未处理的异常,防止静默失败

二、并发场景示例:订单处理系统

场景描述

  • 需要处理大量订单(1000个)
  • 每个订单处理耗时 50-200ms(模拟网络IO)
  • 使用线程池提高吞吐量
  • 限制最大并发数防止资源耗尽
import java.util.concurrent.*;

public class OrderProcessingSystem {
    // 订单处理任务
    static class OrderTask implements Runnable {
        private final int orderId;

        public OrderTask(int orderId) {
            this.orderId = orderId;
        }

        @Override
        public void run() {
            try {
                // 模拟订单处理(随机耗时50-200ms)
                int processTime = 50 + (int)(Math.random() * 150);
                Thread.sleep(processTime);
                
                System.out.println(Thread.currentThread().getName() 
                                 + " 完成订单: " + orderId 
                                 + " | 耗时: " + processTime + "ms");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        // 1. 创建线程工厂
        ThreadFactory factory = new CustomThreadFactory("OrderPool", false);
        
        // 2. 创建线程池(自定义参数)
        int corePoolSize = 5;      // 核心线程数
        int maxPoolSize = 20;      // 最大线程数
        long keepAliveTime = 30;   // 空闲线程存活时间(秒)
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(100); // 任务队列
        
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            corePoolSize, 
            maxPoolSize,
            keepAliveTime,
            TimeUnit.SECONDS,
            queue,
            factory,  // 使用自定义线程工厂
            new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
        );

        // 3. 提交1000个订单任务
        for (int i = 1; i <= 1000; i++) {
            executor.execute(new OrderTask(i));
            
            // 每100个任务打印队列情况
            if (i % 100 == 0) {
                System.out.println("已提交: " + i + " 订单 | " +
                                   "队列大小: " + queue.size());
            }
        }

        // 4. 优雅关闭
        executor.shutdown(); // 停止接收新任务
        try {
            // 等待所有任务完成(最多30分钟)
            if (!executor.awaitTermination(30, TimeUnit.MINUTES)) {
                executor.shutdownNow(); // 强制终止
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
        System.out.println("所有订单处理完成");
    }
}

三、线程池配置详解

1. 核心参数说明

参数作用
corePoolSize5核心线程数(长期保留)
maxPoolSize20最大线程数(临时线程)
keepAliveTime30秒非核心线程空闲存活时间
workQueue容量100任务队列(缓冲来不及处理的任务)
handlerCallerRunsPolicy拒绝策略(队列满时由提交线程执行)

2. 拒绝策略选择

策略行为适用场景
AbortPolicy(默认)抛出 RejectedExecutionException需严格监控的敏感系统
CallerRunsPolicy由提交任务的线程执行推荐 避免任务丢失
DiscardPolicy静默丢弃新任务可容忍丢失的监控任务
DiscardOldestPolicy丢弃队列最老任务并重试提交时效性优先场景

四、执行流程与线程生命周期

graph TD
    A[提交任务] --> B{核心线程<br>是否空闲?}
    B -->|是| C[使用核心线程执行]
    B -->|否| D{任务队列<br>是否已满?}
    D -->|否| E[任务入队等待]
    D -->|是| F{线程数<最大值?}
    F -->|是| G[创建临时线程执行]
    F -->|否| H[执行拒绝策略]
    C --> I[任务完成]
    E -->|队列任务被取出| C
    G --> I

五、最佳实践与注意事项

  1. 线程命名规范 使用有意义的名称(如 OrderPool-thread-1),便于日志分析和问题排查

  2. 守护线程选择

    • 后台任务:daemon=true(随JVM退出终止)
    • 关键业务:daemon=false(必须完成)
  3. 队列容量设置

    • CPU密集型:小队列(如 ArrayBlockingQueue
    • IO密集型:大队列(如 LinkedBlockingQueue
  4. 异常处理 必须设置 UncaughtExceptionHandler 防止任务异常导致线程退出

  5. 资源释放 使用 shutdown() + awaitTermination() 组合确保线程池优雅关闭

  6. 监控指标 定期记录关键指标:

    // 监控示例
    System.out.println(
        "活跃线程: " + executor.getActiveCount() +
        " | 完成任务: " + executor.getCompletedTaskCount() +
        " | 队列大小: " + executor.getQueue().size()
    );
    

六、不同场景的线程池配置建议

场景类型核心线程数最大线程数队列类型适用案例
CPU密集型CPU核数CPU核数+1有界队列(容量小)数据计算
IO密集型CPU核数×2CPU核数×4无界/大容量有界队列网络请求处理
混合型CPU核数×1.5CPU核数×3有界队列(容量中等)Web服务后端
突发流量较低(如5)较高(如100)同步移交队列秒杀系统

通过合理配置线程工厂和线程池参数,可以构建出高效稳定的并发处理系统,有效应对高负载场景。

知识点
多线程