拨开荷叶行,寻梦已然成。仙女莲花里,翩翩白鹭情。
IMG-LOGO
主页 文章列表 执行绪池使用

执行绪池使用

白鹭 - 2022-02-13 2173 0 0

执行绪池

1.工具类实作

/**
* @title: ThreadUtil
* @Author bao
* @description
* @Date: 2021/12/2216:07
*/
public class ThreadUtil {
   private static volatile ExecutorService executorService;
?
   //获取单例物件
   public static ExecutorService getInstance(){
       if (executorService == null) {
           synchronized (ThreadPoolManager.class) {
               if (executorService == null) {
                   int cpuNum = Runtime.getRuntime().availableProcessors();// 获取处理器数量
                   int threadNum = cpuNum * 2;// 根据cpu数量,计算出合理的执行绪并发数
                   executorService = Executors.newFixedThreadPool(threadNum);
              }
          }
      }
       return executorService;
  }
?
?
//ceshi
   public static void main(String[] args) {
       ExecutorService instance = ThreadUtil.getInstance();
       long taskCount = 0;
       for (int i = 0; i < 100; i++) {
           int j = i;
           ThreadUtil.getInstance().execute(()->{
               System.out.println(j);
          });
?
           taskCount = ((ThreadPoolExecutor)instance).getActiveCount();
           System.out.println("taskCount:"+taskCount);
      }
  }
?
}

执行绪池监控:

long activeCount = ((ThreadPoolExecutor)instance).getActiveCount();

  • taskCount 任务的数量

  • completedTaskCount 运行的程序中完成的任务数量

  • largestPoolSize 曾经创建过的最大的执行绪数量

  • getPoolSize 执行绪数量

  • getActiveCount 获取活动的执行绪数

  • 扩展执行绪池:beforeExecuteafterExecute 在执行绪执行前,执行后做点什么

自定义执行绪池

工具类

/**
* @title: ThreadUtil
* @Author bao
* @description
* @Date: 2021/12/2215:39
*/
public class ThreadManager {
?
   public static volatile ThreadPool instance;
?
?
   // 获取单例的执行绪池物件
   public static ThreadPool getInstance() {
       if (instance == null) {
           synchronized (ThreadPoolManager.class) {
               if (instance == null) {
                   int cpuNum = Runtime.getRuntime().availableProcessors();// 获取处理器数量
                   int threadNum = cpuNum * 2;// 根据cpu数量,计算出合理的执行绪并发数
                   instance = new ThreadPool(threadNum, threadNum+1, Integer.MAX_VALUE);
              }
          }
      }
       return instance;
  }
?
   public static class ThreadPool {
       private ThreadPoolExecutor mExecutor;
       private int corePoolSize;
       private int maximumPoolSize;
       private long keepAliveTime;
?
       private ThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime) {
           this.corePoolSize = corePoolSize;
           this.maximumPoolSize = maximumPoolSize;
           this.keepAliveTime = keepAliveTime;
      }
?
       public void execute(Runnable runnable) {
           if (runnable == null) {
               return;
          }
           if (mExecutor == null) {
               mExecutor = new ThreadPoolExecutor(corePoolSize,// 核心执行绪数
                       maximumPoolSize, // 最大执行绪数
                       keepAliveTime, // 闲置执行绪存活时间
                       TimeUnit.MILLISECONDS,// 时间单位
                       new LinkedBlockingDeque<Runnable>(Integer.MAX_VALUE),// 执行绪队列
                       Executors.defaultThreadFactory(),// 执行绪工厂
                       new ThreadPoolExecutor.AbortPolicy() {// 队列已满,而且当前执行绪数已经超过最大执行绪数时的例外处理策略
                           @Override
                           public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                               super.rejectedExecution(r, e);
                          }
                      }
              );
          }
           mExecutor.execute(runnable);
      }
  }
?
?
   public static void main(String[] args) {
       for (int i = 0; i < 100; i++) {
           int j = i;
           ThreadManager.getInstance().execute(()->{
               System.out.println(j);
          });
      }
  }
?
}

自变量

new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);

  • corePoolSize:执行绪池的基本大小, 提前呼叫prestartAllCoreThreads(),会把所有的基本执行绪启动 ,

  • workQueue: ?于保存等待执?的任务的阻塞队列,

    • ArrayBlockingQueue 基于阵列实作的(先进先出),

    • LinkedBlockingQueue 吞吐量要高于ArrayBlockingQueue,

    • SynchronousQueue 吞吐量要高于LinkedBlockingQueue 不存盘元素的阻塞队列,得等一个执行绪做移除操作才能继续进行,要不会一直阻塞,

    • PriorityBlockingQueue 具有优先级的无限阻塞队列,

  • maximumPoolSize: 执行绪池允许创建的最?执行绪数,

  • threadFactory: ?于设定创建执行绪的工厂可以使用谷歌的开源方法,

  • handler: 饱和策略,阻塞队列和我们的执行绪的创建数都满了的时候就会饱和选择一个策略对新提交的策略进行处理,

    • AbortPolicy 直接抛出例外,

    • CallerRunsPolicy 只用呼叫者所在的执行绪来处理任务,

    • DiscardOldestPolicy 丢弃队列里最近的一个任务,

    • DiscardPolicy 直接丢弃,

    • ?定义 自己定义一个处理方式,

  • keepAliveTime:执行绪池的?作执行绪空闲后,保持存活的时间,

  • unit:执行绪活动保持时间的单位,

执行绪数选择

  • CPU密集型 : N cpu + 1 配置尽可能小的执行绪,执行绪数要少一点,减少cpu频繁的背景关系切换,提高cpu的利用率

  • IO 密集型 :2 * N cpu 需要配置尽可能多的执行绪,这样才能保证cpu能被充分的利用

  • 混合型 :拆分成CPU密集型和IO密集型

  • N = Runtime.getRuntime().availableProcessors()

标签:

0 评论

发表评论

您的电子邮件地址不会被公开。 必填的字段已做标记 *