【Java并发编程】启动和停止一个线程的最佳实践与源码分析

苡仁 2020年03月15日 75次浏览

1. 线程的启动

1.1 start()run()方法调用对比


/**
 * 对比start和run两种启动线程的方式
 * @author yiren
 */
public class StartAndRunThread {
    public static void main(String[] args) {

        // 直接使用run方法
        Runnable runnable = () -> System.out.println(Thread.currentThread().getName());
        runnable.run();

        Thread thread = new Thread(runnable);
        thread.run();
        
        // 使用start
        thread.start();
    }
}
main
main
Thread-0

Process finished with exit code 0
  • 由上可知, 无论是Runnable还是Thread的调用run()方法都是在当前线程直接运行,就是方法调用。
  • 而调用start()方法的时候,则是另起线程来运行run()方法中的内容。

1.2 关于start()方法

  • 启动新线程:

    1. 他会涉及到两个线程,要有一个当前线程调用start()方法,常见的为主线程main,也可以是其他线程;另外一个新线程在核实的时候执行run()方法

    2. Thread的对象通过start()方法告诉JVM,我有一个线程需要启动,你在合适的时候运行它。

  • 准备工作

    1. 首先它要让自己进入就绪状态,就绪状态是指我已经获取到除了CPU意外的其他资源(如上下文、栈、线程状态、PC程序计数器等)
  • 不能重复的start()

    /**
     * @author yiren
     */
    public class DoubleStartThread {
        public static void main(String[] args) {
            Thread thread = new Thread(() -> System.out.println(Thread.currentThread().getName()));
            thread.start();
            thread.start();
        }
    }
    
    Exception in thread "main" java.lang.IllegalThreadStateException
    	at java.lang.Thread.start(Thread.java:708)
    	at com.imyiren.concurrency.threadcore.startthread.DoubleStartThread.main(DoubleStartThread.java:10)
    Thread-0
    
    Process finished with exit code 1
    
    • IllegalThreadStateException 非法线程状态
    • 如果start()开始,正常线程线程就会按照 new->runnable->running->dead,如果一个线程执行完毕就会变成终止,就无法返回回去。所以才会抛出非法线程状态异常

1.3 start()方法源码分析

    /* Java thread status for tools,
     * initialized to indicate thread 'not yet started'
     */
    private volatile int threadStatus = 0;

		// Thread的start方法
    public synchronized void start() {
        /**
         * This method is not invoked for the main method thread or "system"
         * group threads created/set up by the VM. Any new functionality added
         * to this method in the future may have to also be added to the VM.
         *
         * A zero status value corresponds to state "NEW".
         */
        if (threadStatus != 0)
            throw new IllegalThreadStateException();

        /* Notify the group that this thread is about to be started
         * so that it can be added to the group's list of threads
         * and the group's unstarted count can be decremented. */
        group.add(this);

        boolean started = false;
        try {
            start0();
            started = true;
        } finally {
            try {
                if (!started) {
                    group.threadStartFailed(this);
                }
            } catch (Throwable ignore) {
                /* do nothing. If start0 threw a Throwable then
                  it will be passed up the call stack */
            }
        }
    }
  1. 启动新线程检查线程状态

    • 线程的状态threadState,还没有启动的时候,默认值就是0,也就是还没有启动

    • 代码中的状态判断,如果线程状态不为0,那就抛出IllegalThreadStateException异常。

    • 注释上A zero status value corresponds to state "NEW".可知,0就为NEW状态。

  2. 加入线程组

    • 通过状态检查后就把当前线程放入到属性ThreadGroup groupthreads数组中,
  3. 调用start()

    • 然后就去执行private native void start0()方法,注意,此方法是native方法(C++实现)

1.4 run()方法源码分析

	/**
     * If this thread was constructed using a separate
     * <code>Runnable</code> run object, then that
     * <code>Runnable</code> object's <code>run</code> method is called;
     * otherwise, this method does nothing and returns.
     * <p>
     * Subclasses of <code>Thread</code> should override this method.
     *
     * @see     #start()
     * @see     #stop()
     * @see     #Thread(ThreadGroup, Runnable, String)
     */
    @Override
    public void run() {
        if (target != null) {
            target.run();
        }
    }
  • 在多线程中Threadrun()方法会有两种情况,一种是如果重写了就调用重写Thread类的run()方法,一种是调用Runnable实现的run()方法
  • 如果直接调用run()方法的话,就只是调用一个普通的方法而已。要启动一个线程还是只能调用start方法去间接调用我们的run()方法。

1.5 常见面试题

  • 一个线程两次调用start方法会出现什么情况?为什么?
  • start方法会间接调用run方法,为什么我们不直接调用run方法?

2. 停止线程

2.1 使用Interrupt来停止线程

  • 使用interrupt来通知线程停止,而不是强制停止。

    1. 注意只是通知,并不是让线程立即停止。

    2. 只需要通知线程,你需要停止,线程通过响应interrupt来在合适的地方停止或者退出线程的执行。

  • 为什么要这样做呢?

    线程在停止时,所使用的资源没有释放造成资源浪费甚至BUG,数据处理没有完成造成数据不一致,这样的问题往往会令我们头疼。而如果使用interrupt来通知它,线程可以进行停止前的释放资源,完成必须要处理的数据任务,诸如此类的事情,就会令我们的程序的健壮性提升,也减少了系统出现问题的几率

  • 停止普通线程

    /**
     * run 方法内没有sleep或者wait方法时,停止线程。
     *
     * @author yiren
     */
    public class RightStopThreadWithoutSleep {
    
        public static void main(String[] args) throws InterruptedException {
            Thread thread = new Thread(() -> {
                int num = 0;
                long start = System.currentTimeMillis();
                while (num <= Integer.MAX_VALUE / 2) {
                    if (num % 1000 == 0) {
                        System.out.println(num + " 是10000的倍数!");
                    }
                    // 注意 如果不interrupted的响应处理,线程不会处理interrupt
                    if (Thread.currentThread().isInterrupted()) {
                        System.out.println(Thread.currentThread().getName() + " was interrupted");
                        break;
                    }
                    num++;
                }
                long end = System.currentTimeMillis();
                System.out.println("Task was finished! " + (end - start) / 1000.0 + "s");
            });
            thread.start();
            Thread.sleep(2000);
            thread.interrupt();
    
        }
    }
    
    ......
    401797000 是10000的倍数!
    Thread-0 was interrupted
    Task was finished! 2.004s
    
    Process finished with exit code 0
    
  • 停止阻塞线程

    • 如果线程在阻塞状态,比如调用sleep()方法时,响应interrupt的方式是抛出异常。

    • 所以停止阻塞线程使用try-catch来实现

    /**
     * run 方法内有sleep时,停止线程。
     * @author yiren
     */
    public class RightStopThreadWithSleep {
    
        public static void main(String[] args) throws InterruptedException {
            Thread thread = new Thread(() -> {
                int num = 0;
                long start = System.currentTimeMillis();
    
                while (num <= 300) {
                    if (num % 100 == 0) {
                        System.out.println(num + " 是100的倍数!");
                    }
                    num++;
                    // 注意 如果不interrupted的响应处理,线程不会处理interrupt
                    if (Thread.currentThread().isInterrupted()) {
                        System.out.println(Thread.currentThread().getName() + " was interrupted");
                        break;
                    }
                }
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    System.out.println(Thread.currentThread().getName() + " thread was interrupted by sleep!");
                }
                long end = System.currentTimeMillis();
                System.out.println("Task was finished! " + (end - start) / 1000.0 + "s");
            });
            thread.start();
            Thread.sleep(500);
            thread.interrupt();
        }
    }
    
    0 是100的倍数!
    100 是100的倍数!
    200 是100的倍数!
    300 是100的倍数!
    java.lang.InterruptedException: sleep interrupted
    	at java.lang.Thread.sleep(Native Method)
    	at com.imyiren.concurrency.threadcore.stopthread.RightStopThreadWithSleep.lambda$main$0(RightStopThreadWithSleep.java:26)
    	at java.lang.Thread.run(Thread.java:748)
    Thread-0 thread was interrupted by sleep!
    Task was finished! 0.505s
    
    Process finished with exit code 0
    
  • 每个循环中都有sleep

    • 如果每个循环都有阻塞, 我们就可以不用每个循环都判断一次interrupted了,只需要处理catch的异常即可。
    /**
     * 在执行过程中每次循环都会调用sleep获wait等方法
     *
     * @author yiren
     */
    public class RightStopThreadWithSleepInLoop {
        public static void main(String[] args) throws InterruptedException {
            Thread thread = new Thread(() -> {
                int num = 0;
                long start = System.currentTimeMillis();
    
                try {
                    while (num <= 10000) {
                        if (num % 100 == 0) {
                            System.out.println(num + " 是100的倍数!");
                        }
                        Thread.sleep(10);
                        num++;
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    System.out.println(Thread.currentThread().getName() + " thread was interrupted by sleep!");
                }
                long end = System.currentTimeMillis();
                System.out.println("Task was finished! " + (end - start) / 1000.0 + "s");
            });
            thread.start();
            Thread.sleep(5000);
            thread.interrupt();
        }
    }
    
    0 是100的倍数!
    100 是100的倍数!
    200 是100的倍数!
    300 是100的倍数!
    400 是100的倍数!
    java.lang.InterruptedException: sleep interrupted
    	at java.lang.Thread.sleep(Native Method)
    	at com.imyiren.concurrency.threadcore.stopthread.RightStopThreadWithSleepInLoop.lambda$main$0(RightStopThreadWithSleepInLoop.java:19)
    	at java.lang.Thread.run(Thread.java:748)
    Thread-0 thread was interrupted by sleep!
    Task was finished! 5.005s
    
    Process finished with exit code 0
    
    • 这个地方需要注意一个地方,try-catch的位置,这个不难看出,如果是下列代码,则不能interrupt,会死循环。。。
    
    /**
     * @author yiren
     */
    public class CantInterrupt {
        public static void main(String[] args) throws InterruptedException {
            Thread thread = new Thread(() -> {
                int num = 0;
                long start = System.currentTimeMillis();
    
                while (num <= 10000) {
                    if (num % 100 == 0) {
                        System.out.println(num + " 是100的倍数!");
                    }
                    try {
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        System.out.println(Thread.currentThread().getName() + " thread was interrupted by sleep!");
                    }
                    num++;
                }
    
                long end = System.currentTimeMillis();
                System.out.println("Task was finished! " + (end - start) / 1000.0 + "s");
            });
            thread.start();
            Thread.sleep(5000);
            thread.interrupt();
        }
    }
    
    0 是100的倍数!
    100 是100的倍数!
    200 是100的倍数!
    300 是100的倍数!
    400 是100的倍数!
    java.lang.InterruptedException: sleep interrupted
    	at java.lang.Thread.sleep(Native Method)
    	at com.imyiren.concurrency.threadcore.stopthread.CantInterrupt.lambda$main$0(CantInterrupt.java:17)
    	at java.lang.Thread.run(Thread.java:748)
    Thread-0 thread was interrupted by sleep!
    500 是100的倍数!
    600 是100的倍数!
    700 是100的倍数!
    800 是100的倍数!
    ......
    
  • InterruptedException处理最佳实践(业务中如何使用?)

    • 绝对不应屏蔽中断请求
  1. run()方法直接抛出interruptedException,不做处理
    • 首先我们不能在业务方法中直接处理掉异常,不能try-catch,需要直接抛出。
    • 那么我们在业务方法中处理了这个异常会怎么样呢?那么如果run()方法中有循环,则无法退出循环。。
    • 最佳实践:在业务代码中有InterruptedException 优先选择 在方法签名中抛出异常,不处理。那么就会使InterruptedExceptionrun()方法中强制try-catch。如下代码
/**
 * 生产中如何处理interrupted
 *
 * @author yiren
 */
public class RightStopThreadInProd implements Runnable {

    @Override
    public void run() {
        try {
            while (true) {
                System.out.println("business code...");
                // 假设调用其他方法
                throwInMethod();
                System.out.println("business code...");
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            System.out.println("catch interruptedException handle interrupted! ...");
        }
    }

    private void throwInMethod() throws InterruptedException {
        Thread.sleep(1000);
    }

    public static void main(String[] args) throws InterruptedException {
        Thread thread = new Thread(new RightStopThreadInProd());
        thread.start();
        Thread.sleep(500);
        thread.interrupt();
    }
}
business code...
java.lang.InterruptedException: sleep interrupted
	at java.lang.Thread.sleep(Native Method)
	at com.imyiren.concurrency.threadcore.stopthread.RightStopThreadInProd.throwInMethod(RightStopThreadInProd.java:28)
	at com.imyiren.concurrency.threadcore.stopthread.RightStopThreadInProd.run(RightStopThreadInProd.java:18)
	at java.lang.Thread.run(Thread.java:748)
catch interruptedException handle interrupted! ...

Process finished with exit code 0
  1. 直接在业务方法中恢复中断(当业务方法无法抛出或不想抛出时)

    • 就是利用中断机制,调用Thread.currentThread().interrupt() 来恢复中断
/**
 * 生产中如何处理interrupted 2
 * 最佳实践:在业务代码中有InterruptedException 在catch语句中调用Thread.currentThread().interrupt()
 * 以便于在后续的执行中,能够检测到发生了中断。
 * @author yiren
 */
public class RightStopThreadInProd2 implements Runnable {

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            System.out.println("business code...");
            // 假设调用其他方法
            reInterrupted();
            System.out.println("business code...");
        }
    }

    private void reInterrupted() {
        try {
            System.out.println("reInterrupted method business! ");
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            System.out.println(Thread.currentThread().getName() + " reInterrupted interrupt");
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread thread = new Thread(new RightStopThreadInProd2());
        thread.start();
        Thread.sleep(1500);
        thread.interrupt();
    }
}
business code...
reInterrupted method business! 
business code...
business code...
reInterrupted method business! 
Thread-0 reInterrupted interrupt
business code...

Process finished with exit code 0
  • 响应中断的一些方法
    1. Object.wait(...)
    2. Thraed.sleep(...)
    3. Thread.join(...)
    4. java.util.concurrent.BlockingQueue.take()/put(E)
    5. java.util.concurrent.locks.Lock.lockInterruptibly()
    6. java.util.concurrent.CountDownLatch.await()
    7. java.util.CyclicBarrier.await()
    8. java.util.concurrent.Exchanger.exchange(V)
    9. java.nio.channels.InterruptibleChannel的相关方法
    10. java.nio.channels.Selector的相关方法

2.2 错误停止线程的方式

  • 被弃用的方法:stop()suspend()resume()

    1. stop方法停止
      • 由下代码可看到,很有可能,代码在计算过程中,最后一部分数据没被计算进去。
      • 代码具有偶然性,可能出错,可能不会出错。
      • 可想如果发生在银行转账过程中,那么最终的金额对不上。。。这就是个大故障了。。
    /**
     * 错误的停止方法,用stop来停止线程,会导致线程运行一半突然停止
     * 没办法完成一个基本单位的操作。会造成脏数据等问题
     *
     * @author yiren
     */
    public class ThreadStop {
        public static void main(String[] args) throws InterruptedException {
            final Data data = new Data();
            Thread thread = new Thread(() -> {
                while (true) {
                    int randomInt = (int) (Math.random() * 11);
                    int sum = 0, temp;
                    for (int i = 1; i < data.nums.length + 1; i++) {
                        temp = randomInt * i;
                        sum += temp;
                        data.nums[i-1] += temp;
                        System.out.println("i=" + i + ", num=" + temp);
                        try {
                            Thread.sleep(10);
                        } catch (InterruptedException e) {
                            //...
                        }
                    }
                    data.total -= sum;
                }
            });
            thread.start();
            Thread.sleep(931);
            thread.stop();
            System.out.println(data);
    
        }
    }
    class Data{
        int total = Integer.MAX_VALUE;
        int[] nums = new int[5];
    
        @Override
        public String toString() {
            int sum = 0;
            for (int i = 0; i < nums.length; i++) {
                sum += nums[i];
            }
            return "Data{" +
                    "total=" + total +
                    ", nums=" + Arrays.toString(nums) +
                    ", sumNums=" + sum +
                    ", sum=" + (sum + total) +
                    ", Integer.MAX_VALUE=" + Integer.MAX_VALUE +
                    '}';
        }
    }
    
    i=5, num=40
    i=1, num=7
    i=2, num=14
    i=3, num=21
    i=4, num=28
    Data{total=2147482402, nums=[90, 180, 270, 360, 415], sumNums=1315, sum=-2147483579, Integer.MAX_VALUE=2147483647}
    
    Process finished with exit code 0
    
    1. suspend和resume
      • suspend()方法会使得目标线程停下来,但却仍然持有在这之前获得的锁定。这样一来很容造成死锁。
      • resume()方法则是用于 恢复通过调用suspend()方法而停止运行的线程
      • 这两个方法都已被废弃,所以不推荐使用。
  • 用volatile设置boolean标志位

    1. 案例一:可以停止
    /**
     * 看似可行的一个用volatile关键字案例
     * @author yiren
     */
    public class VolatileWrong implements Runnable{
        private volatile boolean canceled = false;
    
    
        @Override
        public void run() {
            int num = 0;
            while (!canceled) {
                num++;
                if (num % 100 == 0) {
                    System.out.println("num = " + num);
                }
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    //...
                }
            }
        }
    
        public static void main(String[] args) throws InterruptedException {
            VolatileWrong volatileWrong = new VolatileWrong();
            Thread thread = new Thread(volatileWrong);
            thread.start();
            Thread.sleep(2345);
            System.out.println("开始停止线程...");
            volatileWrong.canceled = true;
        }
    }
    
    num = 100
    num = 200
    开始停止线程...
    
    Process finished with exit code 0
    
    1. 不可以停止
    /**
     * 看似可行的一个用volatile关键字案例 二
     * 阻塞时,volatile时无法停止线程的
     * 实现一个生产者很快消费者很慢的案例
     *
     * @author yiren
     */
    public class VolatileWrongCantStop {
        public static void main(String[] args) throws InterruptedException {
            BlockingQueue storage = new ArrayBlockingQueue(10);
            Producer producer = new Producer(storage);
            Thread producerThread = new Thread(producer);
            producerThread.start();
    
            Thread.sleep(1000);
    
            Consumer consumer = new Consumer(storage);
            while (consumer.needMore()) {
                System.out.println(consumer.storage.take() + " 被消费了");
                Thread.sleep(200);
            }
            System.out.println("consumer 不需要数据了");
    
            producer.canceled = true;
        }
    
        static class Producer implements Runnable {
            BlockingQueue<Integer> storage;
            public volatile boolean canceled = false;
    
            public Producer(BlockingQueue storage) {
                this.storage = storage;
            }
    
            @Override
            public void run() {
                int num = 0;
                try {
                    while (!canceled) {
                        num++;
                        if (num % 100 == 0) {
                            System.out.println("num = " + num);
                            storage.put(num);
                        }
                        Thread.sleep(1);
    
                    }
                } catch (InterruptedException e) {
                    System.out.println("??");
                    //...
                } finally {
                    System.out.println("Provider end!");
                }
            }
        }
    
        static class Consumer {
            BlockingQueue<Integer> storage;
    
            public Consumer(BlockingQueue<Integer> storage) {
                this.storage = storage;
            }
    
            public boolean needMore() {
                return Math.random() < 0.9;
            }
        }
    }
    
    • volatile用于停止线程,如果遇到线程阻塞时,是无法停止线程的。如上案例二,运行过后Consumer已经发出信号停止线程,但是由于我们的BlockingQueue满了,停在了storage.put(num);方法上中,所以finally中的输出语句始终没有出现,程序也没有停止。
    • 我们可以看到上面的put方法是抛出了InterruptedException的,所以我们可以利用异常处理来实现。如下代码:
    /**
     * 看似可行的一个用volatile关键字案例 二 使用interrupted修复问题
     *
     * @author yiren
     */
    public class VolatileWrongCantStopFix {
        public static void main(String[] args) throws InterruptedException {
            BlockingQueue storage = new ArrayBlockingQueue(10);
            Producer producer = new Producer(storage);
            Thread producerThread = new Thread(producer);
            producerThread.start();
    
            Thread.sleep(1000);
    
            Consumer consumer = new Consumer(storage);
            while (consumer.needMore()) {
                System.out.println(consumer.storage.take() + " 被消费了");
                Thread.sleep(200);
            }
            System.out.println("consumer 不需要数据了");
            producerThread.interrupt();
        }
        static class Producer implements Runnable {
            BlockingQueue<Integer> storage;
    
            public Producer(BlockingQueue storage) {
                this.storage = storage;
            }
    
            @Override
            public void run() {
                int num = 0;
                try {
                    while (true) {
                        num++;
                        if (num % 100 == 0) {
                            System.out.println("num = " + num);
                            storage.put(num);
                        }
                        Thread.sleep(1);
    
                    }
                } catch (InterruptedException e) {
                    System.out.println("interrupt !!!");
                    //...
                } finally {
                    System.out.println("Provider end!");
                }
            }
        }
    
        static class Consumer {
            BlockingQueue<Integer> storage;
    
            public Consumer(BlockingQueue<Integer> storage) {
                this.storage = storage;
            }
    
            public boolean needMore() {
                return Math.random() < 0.9;
            }
        }
    }
    

2.3 关键方法源码

  • interrupted()方法

    • 该方法很简单,里面并没有直接处理中断的代码,而是调用了native方法interrupt0()
    • interrupt0()它在JVM中实际是调用系统的方法
        public void interrupt() {
            if (this != Thread.currentThread())
                checkAccess();
    
            synchronized (blockerLock) {
                Interruptible b = blocker;
                if (b != null) {
                    interrupt0();           // Just to set the interrupt flag
                    b.interrupt(this);
                    return;
                }
            }
            interrupt0();
        }
    
        private native void interrupt0();
    
    
  • isInterruped()方法

    • 该方法返回中断状态,并清除中断,设置为false
        public boolean isInterrupted() {
            return isInterrupted(false);
        }
    
        private native boolean isInterrupted(boolean ClearInterrupted);
    
    
  • Thread.interrupted()

    • 它是唯一一个清除中断的方法。
        public static boolean interrupted() {
            return currentThread().isInterrupted(true);
        }
    
    

  • 文章内容来源:
  • 《Java多线程核心》、《Java并发编程艺术》、JDK1.8版本源码、慕课网悟空JUC课程、不知道那些人的博客。。。