“在代码的世界里,每一行都是进步的足迹,每一次挑战都是成长的机遇。”

多线程中线程池选择,线程参数如何界定

时势

在Java多线程编程中,合理选择和配置线程池确实是非常重要的,因为它直接关系到程序的性能和资源利用效率。

固定大小的线程池 (FixedThreadPool):

适用于任务执行时间相对均衡,且任务数量适中的场景。 由于线程数固定,不会因任务数量激增而创建大量线程,避免了资源过度占用。 例如,Dubbo线程池通常使用 FixedThreadPool,默认配置是200个线程

缓存线程池 (CachedThreadPool):

适用于任务执行时间短且数量变化大的场景。 能够快速响应任务需求,但需注意在高负载情况下可能因线程数量过多而导致资源竞争加剧。 不适用于需要限制并发线程数量的场景。

单一线程线程池 (SingleThreadExecutor):

适用于需要保证任务执行顺序的场景。 如数据库操作、文件写入等需要按顺序进行的任务。 在一个JVM中可以保证顺序执行避免了并发带来的数据一致性问题。

定时任务线程池 (ScheduledThreadPoolExecutor):

适用于需要定时或周期性执行任务的场景。 如一些中间件和框架的心跳检测、定时清理任务等。 可以灵活设置任务的执行时间和周期。

ForkJoinPool:

适用于可以递归分解为多个子任务并行处理的场景,默认线程数是 CPU 核心数减一。 如大数据处理、并行处理。 通过合理的任务分解和合并,可以显著提高程序的并行性能。

参数配置建议

corePoolSize 和 maximumPoolSize:

CPU密集型任务:核心线程数不宜设置过多。 IO密集型任务:由于IO操作通常耗时较长,且等待时间较多,因此可以设置较多的线程数来充分利用CPU资源,可以压测到利用率到70%以上为止。 高并发应用:在高并发场景下,可以根据实际负载情况动态调整线程池大小,或采用动态线程池(如基于任务队列长度动态调整线程数的线程池)来应对。

workQueue:= 选择合适的任务队列类型(如直接提交队列、有界队列、无界队列等),并根据实际情况设置队列大小。 对于有界队列,需要合理设置队列长度以避免任务积压或线程饥饿,比如Tomcat,如果队列很长但消费能力低,响应时间会很长,可能导致用户体验不佳,可以根据每秒并发数和等待时间来设置队列长度。比如1秒只能处理100个请求,队列200个就差不多了。 对于无界队列,虽然可以避免任务拒绝,但在高负载情况下可能导致内存溢出。

keepAliveTime 和 unit:

控制非核心线程的空闲存活时间,有助于在负载降低时释放资源。 对于固定大小的线程池,此参数可能不太重要,因为线程数量是固定的。 对于缓存线程池,此参数可以帮助控制线程池的大小,避免资源浪费。

threadFactory:

自定义线程工厂可以为线程设置名称、优先级等属性,有助于在问题排查时进行线程追踪和定位。

handler: 合理的拒绝策略可以避免在高负载情况下因任务被拒绝而导致的服务不可用问题。 常见的拒绝策略包括直接抛出异常(AbortPolicy)、在调用者线程中执行任务(CallerRunsPolicy)、丢弃队列中最老的任务(DiscardOldestPolicy)等。 可以根据实际需求选择合适的拒绝策略或自定义拒绝策略。

以下是一个示例模拟调用接口的代码,展示了如何使用ThreadPoolExecutor创建一个FIX线程池,并执行一些异步任务,利用异步编排实现2S内(16核16G内存的配置电脑)返回所有的结果。

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class ThreadPoolExample {

private static final int INITIAL_CORE_POOL_SIZE = 600;
private static final int INITIAL_MAX_POOL_SIZE = 600;
private static final long KEEP_ALIVE_TIME = 0L;
private static final TimeUnit TIME_UNIT = TimeUnit.SECONDS;
private static final BlockingQueue<Runnable> WORK_QUEUE = new LinkedBlockingQueue<>(1200);
private static final OkHttpClient client;
private static final ForkJoinPool customPool = new ForkJoinPool(64); // 使用ForkJoinPool
private static final ThreadPoolExecutor executor = new ThreadPoolExecutor(
        INITIAL_CORE_POOL_SIZE,
        INITIAL_MAX_POOL_SIZE,
        KEEP_ALIVE_TIME,
        TIME_UNIT,
        WORK_QUEUE,
        new MyNamedThreadFactory()
);
private static final AtomicInteger activeThreads = new AtomicInteger(0);

static {
    OkHttpClient.Builder builder = new OkHttpClient.Builder();

    // 设置连接超时时间
    builder.connectTimeout(1, java.util.concurrent.TimeUnit.SECONDS);

    // 设置读取超时时间
    builder.readTimeout(1, java.util.concurrent.TimeUnit.SECONDS);

    // 设置写入超时时间
    builder.writeTimeout(1, java.util.concurrent.TimeUnit.SECONDS);

    // 构建 OkHttpClient 实例
    client = builder.build();
}

public static void main(String[] args) throws InterruptedException {
    simulateLoadVariation();
}

private static void simulateLoadVariation() throws InterruptedException {
    List<String> list = new ArrayList<>();
    //模拟500个并发
    IntStream.range(0, 500).forEach(item -> list.add("AAAAAAAA"));

    List<CompletableFuture<Number>> completableFuture =
            list.stream().map(item -> CompletableFuture.supplyAsync(() -> {
                try {
                    // 构建完整的Request对象
                    Request request = new Request.Builder()
                            .url("https://xxx.com")
                            .get()
                            .build();

                    // 随机添加一些耗时
                    Random random = new Random();
                    int sleepTime = random.nextInt(500) + 1500; // 500ms 到 1500ms 之间的随机数
                    Thread.sleep(sleepTime);

                    // 记录请求开始的时间
                    Number startTime = System.currentTimeMillis();

                    // 使用client执行请求
                    try (Response response = client.newCall(request).execute()) {
                        if (!response.isSuccessful()) {
                            throw new RuntimeException("Unexpected code " + response);
                        }
                        System.out.println(response.headers());
                    }

                    Number endTime = System.currentTimeMillis();
                    Number elapsedTime = (endTime.longValue() - startTime.longValue());
                    return elapsedTime;
                } catch (Exception e) {
                    e.printStackTrace();
                }
                return 1;
            }, executor)).collect(Collectors.toList());

    List<Number> stringList = completableFuture.stream().map(CompletableFuture::join).collect(Collectors.toList());
    System.out.println(stringList);
}

static class MyNamedThreadFactory implements ThreadFactory {
    private final ThreadFactory defaultFactory = Executors.defaultThreadFactory();

    @Override
    public Thread newThread(Runnable r) {
        Thread thread = defaultFactory.newThread(r);
        thread.setName("MyNamedThread-" + thread.getId());
        return thread;
    }
}

}

最后:线程池使用不当会有很多坑的,如滥用线程池、父子线程都用一个线程池,不同业务使用不同的线程池避免被某个业务影响对堆积导致影响了全局。

2 Comments

  1. Aes128 256 在线加密
    8 11 月, 2024

    Aes128 256 在线加密

Write your comment Here