博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
ThreadPoolExecutor
阅读量:5952 次
发布时间:2019-06-19

本文共 7816 字,大约阅读时间需要 26 分钟。

简述:

  有的时候,系统处理很多任务,如何这些任务要是都是通过new Thread来做的话,系统就不得不常常的创建之后还要销毁Thread,这个是非常消耗时间而且还占用资源,所以我们通过创建线程池来管理我们的线程。

ThreadPoolExecutor:

  我们是首先来看一下他的构造函数,并且讲解一下里面参数的意思

public ThreadPoolExecutor(int corePoolSize,                              int maximumPoolSize,                              long keepAliveTime,                              TimeUnit unit,                              BlockingQueue
workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }复制代码
  • corePoolSize   核心线程数。当我们向线程池中提交新的任务的时候,线程池会创建一个新的线程来执行这个任务,直到线程数等于corePoolSize,默认情况下核心线程会一直存活下去的。
  • maximumPoolSize   最大线程数。要是阻塞队列满了的话,继续添加新的线程的话,并且当前线程小于maximumPoolSize,那么就会创建新的线程来执行这个任务
  • keepAliveTime   线程允许空闲状态存活的时间,前提是此线程不是核心线程。
  • unit   这个参数是keepAliveTime的时间的单位,例如MICROSECONDS,SECONDS,MINUTES等等
  • workQueue   阻塞队列。用来存储等待执行的任务的阻塞队列,系统给我们提供了一下几个阻塞队列:ArrayBlockingQueue,LinkedBlockingQuene,SynchronousQuene和priorityBlockingQuene(我们在后面会讲到)
  • threadFactory   线程工厂。默认的是DefaultThreadFactory,里面定义了线程计数,线程命名规则。当然这些我们都可以自定义。
  • handler   饱和策略。我们可以实现RejectedExecutionHandler重写rejectedExecution方法来自定义,当然系统也给我们提供了几个:AbortPolicy, DiscardPolicy,DiscardOldestPolicy,CallerRunsPolicy(我们在后面也会讲到)

  我们前面笼统的提到了很多概念我猜测很多人现在都不太懂,那我们一点一点的来深入吧。

BlockingQueue

  BlockingQueue接口继承了Queue接口,里面有比较重要的几个方法,分别是offer方法,poll方法,普通方法,take方法,我们看看他的区别:

---- 抛出异常 特殊值 阻塞 超时
插入 add(e) offer(e) put(e) offer(e.time,unit)
移除 remove() poll() take() poll(time,unit)
检查 element() peek() 不可用 不可用

add方法会在插入数据时,插入失败就会抛异常,offer会在插入失败时返回false,put方法会在当队列存储对象达到限定值阻塞线程,而在队列不为空的时唤醒被take方法所阻塞的线程。

常见BlockingQueue的实现

  • ArrayBlockingQueue

  他是一个底层是由数组实现的有界的FIFO(先进先出)的阻塞队列。我们看一下他的构造函数

public ArrayBlockingQueue(int capacity, boolean fair) {        if (capacity <= 0)            throw new IllegalArgumentException();        this.items = new Object[capacity];        lock = new ReentrantLock(fair);        notEmpty = lock.newCondition();        notFull =  lock.newCondition();    }复制代码

  参数1:指定queue的capacity(容量),并且一旦设定则无法更改.   参数2:一个默认是false的bool值,指的是创建的是不是公平锁,如果为true,则按照 FIFO 顺序访问插入或移除时受阻塞线程的队列;如果为 false,则访问顺序是不确定的

public ReentrantLock(boolean fair) {        sync = fair ? new FairSync() : new NonfairSync();    }复制代码
  • LinkedBlockingQueue

  一个基于链表的阻塞队列,他的内部维持着一个链表队列用于存储。他的容量也是有固定大小的,如果不设置的情况下默认值是Integer.MAX_VALUE,他之所以能够高效的处理高并发的数据。是因为他的采用了两个独立的锁来控制数据同步

/** Lock held by take, poll, etc */   private final ReentrantLock takeLock = new ReentrantLock();   /** Wait queue for waiting takes */   private final Condition notEmpty = takeLock.newCondition();   /** Lock held by put, offer, etc */   private final ReentrantLock putLock = new ReentrantLock();   /** Wait queue for waiting puts */   private final Condition notFull = putLock.newCondition();复制代码
  • PriorityBlockingQueue   一个基于优先级的队列。他的优先级通过构造函数传入的Compator对象来决定,当时这个队列并不会阻塞数据的生产者,当数据队列中没有可消费的数据的时候,会阻塞消费者。那么我们就需要注意生成者的生成数据的速率不能高于消费速率,不然会消耗可用的堆内存。

  • SynchronousQueue   这个队列的神奇之处在于它内部没有容器,什么意思呢,就是当你的生产线程(put)成产啦,如果没有消费者消费(take),那么此生产线程就会被阻塞,知道有消费者调用take的时候。反过来先take等待put也是同理

ThreadFactory

  ThreadFactory接口里面的内容很简单,我们看一下源码

public interface ThreadFactory {    /**     * Constructs a new {@code Thread}.  Implementations may also initialize     * priority, name, daemon status, {@code ThreadGroup}, etc.     *     * @param r a runnable to be executed by new thread instance     * @return constructed thread, or {@code null} if the request to     *         create a thread is rejected     */    Thread newThread(Runnable r);}复制代码

  他的实现也是非常简单,只需要我们重写里面的newThread方法返回一个Thread就可以啦,我们可以在里面设置我们自己的命名格式等等。那我们先看看JDK中的默认实现DefaultThreadFactory:

/**     * The default thread factory.     */    private static class DefaultThreadFactory implements ThreadFactory {        private static final AtomicInteger poolNumber = new AtomicInteger(1);        private final ThreadGroup group;        private final AtomicInteger threadNumber = new AtomicInteger(1);        private final String namePrefix;        DefaultThreadFactory() {            SecurityManager s = System.getSecurityManager();            group = (s != null) ? s.getThreadGroup() :                                  Thread.currentThread().getThreadGroup();            namePrefix = "pool-" +                          poolNumber.getAndIncrement() +                         "-thread-";        }        public Thread newThread(Runnable r) {            Thread t = new Thread(group, r,                                  namePrefix + threadNumber.getAndIncrement(),                                  0);//创建生成线程的名字            if (t.isDaemon())                t.setDaemon(false);            if (t.getPriority() != Thread.NORM_PRIORITY)                t.setPriority(Thread.NORM_PRIORITY);            return t;        }    }复制代码

handler

  既然要说这个饱和策略那就先说一个任务是怎么被添加的吧!.一个新的任务通过 execute(Runnable) 方法被添加到线程池,任务就是一个 Runnable 类型的对象,任务的执行方法就是 Runnable 类型对象的 run() 方法。那么当一个任务通过 execute(Runnable) 方法欲添加到线程池时,线程池采用的策略如下:

**1. 当线程池中的数量小于corePoolSize的时候,即使你的线程池中的线程都处于空闲状态,也要开启一个新的线程来处理新的任务。

  • 当你线程池的数量等于corePoolSize的时候,且你的BlockQueue队列没有满的时候,那么任务被加入缓冲队列。
  • 当你的线程池的数量大于corePoolSize的时候,并且你的BlockQueue队列满的时候,但是线程池的数量小于maximumPoolSize的时候,会创建新的线程处理新的任务。
  • 当你的线程池的数量大于corePoolSize的时候,并且你的BlockQueue队列满的时候,但是线程池的数量等于maximumPoolSize的时候,我们的handler出场啦,就有这个饱和策略来处理新的任务**

  我们有四种饱和模式

  • ThreadPoolExecutor.AbortPolicy()     对拒绝任务执行抛弃处理,并且抛出异常
  • ThreadPoolExecutor.CallerRunsPolicy      这个策略重试添加当前的任务,他会自动重复调用 execute() 方法,直到成功
  • ThreadPoolExecutor.DiscardOldestPolicy()     对拒绝任务不抛弃,而是抛弃队列里面等待最久的一个线程,然后把拒绝任务加到队列
  • ThreadPoolExecutor.DiscardPolicy     对拒绝任务直接无声抛弃,没有异常信息。

ThreadPoolExecutor的使用

  小伙伴在看完之后肯定觉得线程池使用起来有点麻烦,难道就没有现成的让我们使用吗?当然有啦,好东西都是留在最后的吗!

  • newFixedThreadPool   固定大小的线程池,根据参数来决定生成的核心线程数量和最大线程数量,内部使用的是LinkedBlockingQueue
public static ExecutorService newFixedThreadPool(int nThreads) {        return new ThreadPoolExecutor(nThreads, nThreads,                                      0L, TimeUnit.MILLISECONDS,                                      new LinkedBlockingQueue
()); }复制代码
  • newCachedThreadPool   可缓存的线程池,因为内部使用的是SynchronousQueue队列,由于核心线程数为0,最大线程是为Integer.MAX_VALUE,可以让他灵活回收空闲线程,若无可回收,则新建线程。
public static ExecutorService newCachedThreadPool() {        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,                                      60L, TimeUnit.SECONDS,                                      new SynchronousQueue
()); }复制代码
  • newScheduledThreadPool   根据参数来生成一个定长的线程池,由于使用的是DelayedWorkQueue,这个我们在前面没有介绍,在这里简单的说一下 他是一个无界阻塞队列,只有在延迟期满时才能从中提取元素,那么就可以用来做延迟操作或者周期性的操作啦
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {       return new ScheduledThreadPoolExecutor(corePoolSize);   }      public ScheduledThreadPoolExecutor(int corePoolSize) {       super(corePoolSize, Integer.MAX_VALUE,             DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,             new DelayedWorkQueue());   }复制代码
  • newSingleThreadExecutor   他的核心线程数还有最大线程数都是1,又由于用的是LinkedBlockingQueue无参数构造,那就是默认的Integer.MAX_VALUE的个数的阻塞队列,换句话说就是这里会一个一个安装指定顺序执行任务
public static ExecutorService newSingleThreadExecutor() {       return new FinalizableDelegatedExecutorService           (new ThreadPoolExecutor(1, 1,                                   0L, TimeUnit.MILLISECONDS,                                   new LinkedBlockingQueue
())); }复制代码

  当然除了这些还有好多自带呢不在一一说明,希望读者能够自己带着感悟去看源码,将会有跟深层次的理解!

转载于:https://juejin.im/post/5addbce2f265da0ba062c5bb

你可能感兴趣的文章
urllib模块使用笔记
查看>>
mysql 连接慢的问题(超过了1秒)
查看>>
1297. [SCOI2009]迷路【矩阵乘法】
查看>>
Linux嵌入式GDB调试环境搭建
查看>>
安全性测试要点转摘
查看>>
java分析jvm常用指令
查看>>
【Linux】Linux 在线安装yum
查看>>
oracle 管理操作 (转)
查看>>
POJ 1836, Alignment
查看>>
前端工程化
查看>>
Javascript模块化编程(三):require.js的用法 (转)
查看>>
html_01之基础标签
查看>>
DEV 等待窗口
查看>>
maven安装出错原因分析
查看>>
触发器及触发器的作用
查看>>
浅释丹道筑基功―—―混元桩【转载】
查看>>
django admin基础
查看>>
virtualenv使用
查看>>
手机页面点击不选中元素
查看>>
023_接口类,抽象类,多态,鸭子类型,封装
查看>>