Executor介绍

Posted by Jack on May 26, 2019

Executor介绍

1.简介

在JDK1.5之后,Java为我们提供了一个Java.util.concurrent包,这个包下有Executor接口,这就为我们提供了线程池的方法去开启多个线程。

2.Executor对比 new Thread的优点

new Thread()的缺点

  • 每次我们new Thread都会创建一个对象,不能被重用,而且对象的创建和销毁也是消耗资源的;
  • new Thread我们没法去设置定时、线程中断等功能;
  • new Thread缺乏统一的管理,我们new多了的时候,就会出现线程之间的竞争,或者占用过多的cpu资源,甚至可能导致死机;

采用线程池的优点

  • 重用存在的线程,减少对象创建、消亡的开销,性能佳 可有效控制最大并发线程数,提高系统资源的使用率,同时避免过多资源竞争,避免堵塞 提供定时执行、定期执行、单线程、并发数控制等功能

3.Executor方法介绍

查看源码我们可以看到Executor接口其实很简单,就一个方法:

public interface Executor {
	/**
 	* Executes the given command at some time in the future.  The command
 	* may execute in a new thread, in a pooled thread, or in the calling
 	* thread, at the discretion of the {@code Executor} implementation.
	 *
 	* @param command the runnable task
 	* @throws RejectedExecutionException if this task cannot be
 	* accepted for execution
 	* @throws NullPointerException if command is null
 	*/
	void execute(Runnable command);
}

4.ExecutorService

ExecutorService是一个接口,ExecutorService接口继承了Executor接口,定义了一些生命周期的方法。

public interface ExecutorService extends Executor {


    void shutdown();//顺次地关闭ExecutorService,停止接收新的任务,等待所有已经提交的任务执行完毕之后,关闭ExecutorService


    List<Runnable> shutdownNow();//阻止等待任务启动并试图停止当前正在执行的任务,停止接收新的任务,返回处于等待的任务列表


    boolean isShutdown();//判断线程池是否已经关闭

    boolean isTerminated();//如果关闭后所有任务都已完成,则返回 true。注意,除非首先调用 shutdown 或 shutdownNow,否则 isTerminated 永不为 true。


    boolean awaitTermination(long timeout, TimeUnit unit)//等待(阻塞)直到关闭或最长等待时间或发生中断,timeout - 最长等待时间 ,unit - timeout 参数的时间单位  如果此执行程序终止,则返回 true;如果终止前超时期满,则返回 false 


    <T> Future<T> submit(Callable<T> task);//提交一个返回值的任务用于执行,返回一个表示任务的未决结果的 Future。该 Future 的 get 方法在成功完成时将会返回该任务的结果。


    <T> Future<T> submit(Runnable task, T result);//提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。该 Future 的 get 方法在成功完成时将会返回给定的结果。


    Future<?> submit(Runnable task);//提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。该 Future 的 get 方法在成功 完成时将会返回 null


    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)//执行给定的任务,当所有任务完成时,返回保持任务状态和结果的 Future 列表。返回列表的所有元素的 Future.isDone() 为 true。
        throws InterruptedException;


    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)//执行给定的任务,当所有任务完成时,返回保持任务状态和结果的 Future 列表。返回列表的所有元素的 Future.isDone() 为 true。
        throws InterruptedException;


    <T> T invokeAny(Collection<? extends Callable<T>> tasks)//执行给定的任务,如果在给定的超时期满前某个任务已成功完成(也就是未抛出异常),则返回其结果。一旦正常或异常返回后,则取消尚未完成的任务。
        throws InterruptedException, ExecutionException;


    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

而ExecutorService接口继承了Executor接口,是Executor的扩展子接口;

这算他们之间的第一个区别,

而第二个区别是,Executor中的execute接口的是实现Runable接口的对象;而ExecutorService中的submit接收的实现Runable接口的对象或者callable接口的对象;

第三个区别是:Executor中的execute方法没有返回值,而submit方法有future返回值;

第四个区别是:ExecutorService还提供了控制线程池的方法;

这里说一下注意点:那就是ExecutorService的submit方法获取返回值get获取数据的时候,会导致get所在的线程发生堵塞,直到你返回值的线程执行完后,get线程才获取最终的结果;

ExecutorService为我们提供了四种线程池:

4.1 newCachedThreadPool线程池

创建一个可缓存的线程池,调用execute将重用以前构造的线程(如果线程可用)。如果没有空闲的线程,则创建一个新线 程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。

 public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

Example:

public class MyThreadNum implements Runnable{
	int i=0;
	public MyThreadNum(int i){
		this.i=i;
	}
	public void run() {
		System.out.println(Thread.currentThread().getName()+"====="+i);
	}
}

然后我们来执行这个线程:

public static void main(String[] args) throws InterruptedException {
		ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
		for(int i=0;i<10;i++){
			cachedThreadPool.execute(new MyThreadNum(i));
			Thread.sleep(1000);
		}
	}

主线程让其休息会,目的是为了让线程池里面的线程执行完处于空闲状态;然后我们看执行结果:

pool-1-thread-1=====0
pool-1-thread-1=====1
pool-1-thread-1=====2
pool-1-thread-1=====3
pool-1-thread-1=====4
pool-1-thread-1=====5
pool-1-thread-1=====6
pool-1-thread-1=====7
pool-1-thread-1=====8
pool-1-thread-1=====9

都是一个线程执行的啊,然后我们改一下测试方法,不让主线程休息了:

public static void main(String[] args) throws InterruptedException {
		ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
		for(int i=0;i<10;i++){
			cachedThreadPool.execute(new MyThreadNum(i));
			//Thread.sleep(1000);
		}
	}

这时候的结果呢?

pool-1-thread-2=====1
pool-1-thread-1=====0
pool-1-thread-10=====9
pool-1-thread-4=====3
pool-1-thread-6=====5
pool-1-thread-5=====4
pool-1-thread-7=====6
pool-1-thread-3=====2
pool-1-thread-9=====8
pool-1-thread-8=====7

现在是多个线程了,这就是newCachedThreadPool线程池的作用;

4.2 newFixedThreadPool线程池

newFixedThreadPool线程池是创建一个固定值得线程池,如果业务超出线程数量,那就排队;注意排队的时候然后谁的线程结束了直接拿过来使用,但是线程充足的情况下,它会直接去拿新的线程;

  public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

然后测试一下,线程类不需要修改的,只修改创建线程的测试方法就行我们来看一下,当我们创建10个线程的情况:

public static void main(String[] args) throws InterruptedException {
		ExecutorService cachedThreadPool = Executors.newFixedThreadPool(10);
		for(int i=0;i<10;i++){
			cachedThreadPool.execute(new MyThreadNum(i));
			Thread.sleep(1000);
		}
	}

还是让主线程休息休息,这时候的结果就跟上面不一样了,它线程充足的情况下是不会去回收线程的;

pool-1-thread-1=====0
pool-1-thread-2=====1
pool-1-thread-3=====2
pool-1-thread-4=====3
pool-1-thread-5=====4
pool-1-thread-6=====5
pool-1-thread-7=====6
pool-1-thread-8=====7
pool-1-thread-9=====8
pool-1-thread-10=====9

那么我们再看一下,线程数量不够业务处理的情况,我们来设置五个线程:

public static void main(String[] args) throws InterruptedException {
		ExecutorService cachedThreadPool = Executors.newFixedThreadPool(5);
		for(int i=0;i<10;i++){
			cachedThreadPool.execute(new MyThreadNum(i));
			//Thread.sleep(1000);
		}
	}

再来看一下结果:

pool-1-thread-5=====4
pool-1-thread-2=====1
pool-1-thread-3=====2
pool-1-thread-3=====7
pool-1-thread-3=====8
pool-1-thread-1=====0
pool-1-thread-4=====3
pool-1-thread-3=====9
pool-1-thread-2=====6
pool-1-thread-5=====5

可以看出,newFixedThreadPool线程池是在业务需要达到它的固定值得线程数的时候进行空闲线程回收的;

4.3 newScheduledThreadPool线程池

它也是一个创建固定长度的线程池,支持定时和周期性执行;

public static void main(String[] args) throws InterruptedException {
		ScheduledExecutorService cachedThreadPool = Executors.newScheduledThreadPool(10);
		for(int i=0;i<10;i++){
			cachedThreadPool.scheduleAtFixedRate(new MyThreadNum(i), 1,1000,TimeUnit.MILLISECONDS);
			//cachedThreadPool.schedule(new MyThreadNum(i), 1000,TimeUnit.MILLISECONDS);
			//Thread.sleep(1000);
		}
	}

这里面有两个方法,第一个没有注释的是:第二个参数表示线程第一次运行的初始时间,第三个参数是下一次间隔多长时间,最后一个参数是时间单位:分、秒、时等等;

注释的那个就说一下第二个参数就行就是定时,跟上面第二个参数含义一样,只是它只执行一次;

4.4 newSingleThreadExecutor线程池

创建一个单线程的线程池,他所有的任务都是用这个线程来执行的,保证所有任务按照指定顺序执行;他还能保证当一个线程发生异常时,他会继续往下执行。

public class MyThreadNum implements Runnable{
	int i=0;
	public MyThreadNum(int i){
		this.i=i;
	}
	public void run() {
		int num=100/i;
		System.out.println(Thread.currentThread().getName()+"====="+i);
	}
}
public static void main(String[] args) throws InterruptedException {
	ExecutorService cachedThreadPool = Executors.newSingleThreadExecutor();
	for(int i=0;i<10;i++){
		cachedThreadPool.execute(new MyThreadNum(i));
		//Thread.sleep(1000);
	}
}

我们能够看到有一个线程肯定会把0作为除数,那么这个线程就挂了;咱们看看当这个线程挂了之后,还能不能继续往下执行了?

Exception in thread "pool-1-thread-1" pool-1-thread-2=====1
java.lang.ArithmeticException: / by zero
	at http.MyThreadNum.run(MyThreadNum.java:14)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
pool-1-thread-2=====2
pool-1-thread-2=====3
pool-1-thread-2=====4
pool-1-thread-2=====5
pool-1-thread-2=====6
pool-1-thread-2=====7
pool-1-thread-2=====8
pool-1-thread-2=====9

我们看到打印结果,当线程1挂了之后,newSingleThreadExecutor还是一个线程但是是重新启动了一个线程;

参考链接

Java程序员必备知识-多线程框架Executor详解

Executor使用小结