zl程序教程

您现在的位置是:首页 >  Java

当前栏目

java中断机制深入分析之Thread源码跟踪

2023-03-15 22:00:57 时间

一、Thread.stop() VS Thread.interrupt()

在jdk1.0时代,要终止一个Java线程,可以使用Thread提供的stop()和destroy()等方法,但这些方法在jdk1.4之后就已经不推荐使用了,原因是这些方法会强行关闭当前线程,并解锁当前线程已经持有的所有监视器(互斥锁、共享锁),这会导致被这些监视器保护的数据对象处于不一致的状态,其它线程可以查看到这些不一致状态的数据对象,从而导致各种不可预知的错误。

在jdk1.4引入了另外一种结束线程的方式——中断。简单来说,就是每个线程有一个int类型的成员变量_interrupted(0表示没被中断,1表示被中断)。线程在执行过程中的适当位置,检查这个变量,来获知当前线程是否应该结束。

我们可以使用Thread.interrupt()方法将中断标记_interrupted设为1,也可以使用Thread.interrupted()方法将中断标记_interrupted重置为0。下面是从jdk的Thread.java类中截取的interrupt()源码:

// java.lang.Thread
public void interrupt() {
        if (this != Thread.currentThread())
            checkAccess();
        synchronized (blockerLock) {
            Interruptible b = blocker;
            if (b != null) {
                interrupt0();           // Just to set the interrupt flag
                b.interrupt(this);
                return;
            }
        }
        interrupt0();
    }
    
    private native void interrupt0();  //native方法

可以看到,Thread.interrupt()方法的核心方法是Thread.interrupt0()方法,从注释上看,这个方法只会设置中断标记位(_interrupted变量)。在调用Thread.interrupt()方法之前,如果当前线程已经处于阻塞状态(比如调用了Thread.sleep()方法),那么调用该阻塞线程的Thread.interrupt()方法将导致当前线程从Thread.sleep()函数中醒来,并抛出InterruptedException。下面是Thread.sleep()和Thread.interrupt()的示例:

public static void main(String[] args) {
        Thread thread = new Thread(() -> {
            try {
                TimeUnit.HOURS.sleep(1);//当前线程sleep一个小时
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        thread.start();
        try {
            TimeUnit.SECONDS.sleep(5);//主线程sleep5秒钟,保证thread线程能够执行sleep方法
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        thread.interrupt();//主线程中断thread线程,导致sleep方法抛出InterruptedException,结束阻塞
        try {
            thread.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("finish");
    }

在这个示例中,thread线程执行Thread.sleep()方法就标志着,该线程将在未来1个小时内不参与cpu的竞争,thread线程将处于阻塞状态。如果说Thread.interrupt()方法只是修改了中断标记_interrupted的值,那么已经放弃cpu、处于阻塞状态的thread线程如何能感知到这个变量已经被改变,从而立即抛出InterruptedException呢?为了回答这个问题,我们需要深入native方法Thread.interrupt0()。

二、Thread.interrupt0()源码跟踪

1、jdk源码(JNI注册)

// jdk/src/share/native/java/lang/Thread.c:43
static JNINativeMethod methods[] = {
    {"start0",           "()V",        (void *)&JVM_StartThread},
    {"stop0",            "(" OBJ ")V", (void *)&JVM_StopThread},
    {"isAlive",          "()Z",        (void *)&JVM_IsThreadAlive},
    {"suspend0",         "()V",        (void *)&JVM_SuspendThread},
    {"resume0",          "()V",        (void *)&JVM_ResumeThread},
    {"setPriority0",     "(I)V",       (void *)&JVM_SetThreadPriority},
    {"yield",            "()V",        (void *)&JVM_Yield},
    {"sleep",            "(J)V",       (void *)&JVM_Sleep},   //sleep方法名映射成的jni方法名
    {"currentThread",    "()" THD,     (void *)&JVM_CurrentThread},
    {"countStackFrames", "()I",        (void *)&JVM_CountStackFrames},
    {"interrupt0",       "()V",        (void *)&JVM_Interrupt},   //interrupt0方法名映射成jni方法名
    {"isInterrupted",    "(Z)Z",       (void *)&JVM_IsInterrupted},
    {"holdsLock",        "(" OBJ ")Z", (void *)&JVM_HoldsLock},
    {"getThreads",        "()[" THD,   (void *)&JVM_GetAllThreads},
    {"dumpThreads",      "([" THD ")[[" STE, (void *)&JVM_DumpThreads},
    {"setNativeName",    "(" STR ")V", (void *)&JVM_SetNativeThreadName},
};

// jdk/src/share/javavm/export/jvm.h:254
JNIEXPORT void JNICALL
JVM_Interrupt(JNIEnv *env, jobject thread);   //JVM_Interrupt接口定义

2、java虚拟机(HotSpot实现)

// hotspot/src/share/prims/jvm.cpp:3289
JVM_ENTRY(void, JVM_Interrupt(JNIEnv* env, jobject jthread))  //JVM_Interrupt接口实现
  JVMWrapper("JVM_Interrupt");

  // Ensure that the C++ Thread and OSThread structures aren't freed before we operate
  oop java_thread = JNIHandles::resolve_non_null(jthread);
  MutexLockerEx ml(thread->threadObj() == java_thread ? NULL : Threads_lock);
  // We need to re-resolve the java_thread, since a GC might have happened during the
  // acquire of the lock
  JavaThread* thr = java_lang_Thread::thread(JNIHandles::resolve_non_null(jthread));
  if (thr != NULL) {
    Thread::interrupt(thr);      //调用interrupt方法
  }
JVM_END

// hotspot/src/share/vm/runtime/thraed.cpp:634
//成员变量对应着不同的阻塞方法
ParkEvent * _ParkEvent ;   // for synchronized()
ParkEvent * _SleepEvent ; // for Thread.sleep
ParkEvent * _MutexEvent ; // for native internal Mutex/Monitor,互斥锁
ParkEvent * _MuxEvent ;   // for low-level muxAcquire-muxRelease,共享锁

// hotspot/src/share/vm/runtime/thraed.cpp:804
void Thread::interrupt(Thread* thread) {    //调用interrupt方法
  trace("interrupt", thread);
  debug_only(check_for_dangling_thread_pointer(thread);)
  os::interrupt(thread);      //调用os::interrupt方法
}

// hotspot/src/hotspot/os/linux/vm/os_linux.cpp:4192
void os::interrupt(Thread* thread) {     //调用os::interrupt方法
  assert(Thread::current() == thread || Threads_lock->owned_by_self(),
    "possibility of dangling Thread pointer");

  OSThread* osthread = thread->osthread();

  if (!osthread->interrupted()) {//如果当前线程没被中断
    osthread->set_interrupted(true);//设置中断标记
    // More than one thread can get here with the same value of osthread,
    // resulting in multiple notifications.  We do, however, want the store
    // to interrupted() to be visible to other threads before we execute unpark().
    OrderAccess::fence();
    ParkEvent * const slp = thread->_SleepEvent ;//获取sleep方法对应的ParkEvent,slp不为空则说明当前线程阻塞在sleep方法上
    // Thread.sleep方法继续运行
    if (slp != NULL) slp->unpark() ;//如果当前线程阻塞在sleep方法上,则调用unpark唤醒线程
  }

  // For JSR166. Unpark event if interrupt status already was set
  if (thread->is_Java_thread())
    ((JavaThread*)thread)->parker()->unpark();//如果当前线程是阻塞在互斥锁或者共享锁上,则唤醒线程

  ParkEvent * ev = thread->_ParkEvent ;//获取synchronized方法对应的ParkEvent
  if (ev != NULL) ev->unpark() ;//如果当前线程是阻塞在synchronized上,则唤醒线程
}

// hotspot/src/share/vm/runtime/osThread.hpp:
volatile jint _interrupted; // 中断标记是存储在osThread中,而不是java的Thread中    

volatile bool interrupted() const{ 
    return _interrupted != 0;//查看中断标记
}

void set_interrupted(bool z){ 
    _interrupted = z ? 1 : 0; //设置中断标记
}

由源码可知,Thread.interrupt0()方法实际上并不是像jdk注释中说的那样,只设置中断标记位,而是在设置完中断标记后通过调用类型为ParkEvent的成员变量的unpark()方法,将被阻塞的线程唤醒。由上述源码可知,当前线程可以阻塞在sleep方法、synchronized修饰的方法、获取共享锁/互斥锁的等方法上,不同的类型对应了不同的ParkEvent类型的成员变量。以Thread.sleep()方法为例,该方法在hotspot的实现中最终是调用ParkEvent的park()方法将自己阻塞住,当该线程因为中断被唤醒之后,会立即检查当前线程的中断标记,如果中断标记为1,则会抛出InterruptedException:

JVM_ENTRY(void, JVM_Sleep(JNIEnv* env, jclass threadClass, jlong millis))    //sleep方法名映射成的jni方法名
  JVMWrapper("JVM_Sleep");
 
  if (millis < 0) {
    //参数异常
    THROW_MSG(vmSymbols::java_lang_IllegalArgumentException(), "timeout value is negative");
  }
 
  if (Thread::is_interrupted (THREAD, true) && !HAS_PENDING_EXCEPTION) {
    //如果已经被中断则抛出异常
    THROW_MSG(vmSymbols::java_lang_InterruptedException(), "sleep interrupted");
  }
 
  //增加线程sleep的次数,并启动计入sleep耗时的计时器
  JavaThreadSleepState jtss(thread);
 
  EventThreadSleep event;
 
  if (millis == 0) {//相当于调用sleep(0),因而本质上当前线程不会被中断
    //x86下ConvertSleepToYield为true
    if (ConvertSleepToYield) {
      os::yield();
    } else {
      ThreadState old_state = thread->osthread()->get_state();
      thread->osthread()->set_state(SLEEPING);
      os::sleep(thread, MinSleepInterval, false);
      thread->osthread()->set_state(old_state);
    }
  } else {//类似于调用sleep(1000),参数值大于0,会被阻塞直到超时或者发生中断
    ThreadState old_state = thread->osthread()->get_state();
    //将osthread的状态置为SLEEPING
    thread->osthread()->set_state(SLEEPING);
    if (os::sleep(thread, millis, true) == OS_INTRPT) {//调用阻塞方法os::sleep方法被中断了!!!
      if (!HAS_PENDING_EXCEPTION) {//如果没有待处理异常
        if (event.should_commit()) {
          //发布事件
          event.set_time(millis);
          event.commit();
        }
        //抛出异常
        THROW_MSG(vmSymbols::java_lang_InterruptedException(), "sleep interrupted");//抛出InterruptedException异常
      }
    }
    //恢复原来的状态
    thread->osthread()->set_state(old_state);
  }
  if (event.should_commit()) {
    event.set_time(millis);
    event.commit();
  }
JVM_END
 
int os::sleep(Thread* thread, jlong millis, bool interruptible) {   //阻塞方法
  assert(thread == Thread::current(),  "thread consistency check");
 
  ParkEvent * const slp = thread->_SleepEvent ;//类型为ParkEvent的SleepEvent
  slp->reset() ;
  OrderAccess::fence() ;
 
  if (interruptible) {//默认是可中断的
    jlong prevtime = javaTimeNanos();
 
    for (;;) {
      if (os::is_interrupted(thread, true)) {
        //如果线程已被中断
        return OS_INTRPT;
      }
 
      jlong newtime = javaTimeNanos();
 
      if (newtime - prevtime < 0) {
        //linux不支持monotonic_clock
        assert(!Linux::supports_monotonic_clock(), "time moving backwards");
      } else {
        millis -= (newtime - prevtime) / NANOSECS_PER_MILLISEC;
      }
 
      if(millis <= 0) {
        //休眠的时间已过
        return OS_OK;
      }
 
      prevtime = newtime;
 
      {
        assert(thread->is_Java_thread(), "sanity check");
        JavaThread *jt = (JavaThread *) thread;
        //修改线程状态为_thread_blocked,等代码块退出将其恢复成_thread_in_vm,转换过程中会检查安全点
        ThreadBlockInVM tbivm(jt);
        //将OSThread的状态修改为CONDVAR_WAIT,代码块退出时恢复原来的状态
        OSThreadWaitState osts(jt->osthread(), false /* not Object.wait() */);
 
        jt->set_suspend_equivalent();
        //让当前线程休眠,如果线程被唤醒了则继续for循环
        slp->park(millis);        //调用ParkEvent类的park方法阻塞当前线程,park方法的阻塞原理见后文
 
        jt->check_and_wait_while_suspended();
      }
    }
  } else {
    OSThreadWaitState osts(thread->osthread(), false /* not Object.wait() */);
    jlong prevtime = javaTimeNanos();
    
    //逻辑同上只是不需要检查目标线程是否中断,使用与sleep时间特别短的情形,如sleep(0)
    for (;;) {
      
      jlong newtime = javaTimeNanos();
 
      if (newtime - prevtime < 0) {
        assert(!Linux::supports_monotonic_clock(), "time moving backwards");
      } else {
        millis -= (newtime - prevtime) / NANOSECS_PER_MILLISEC;
      }
 
      if(millis <= 0) break ;
 
      prevtime = newtime;
      slp->park(millis);
    }
    return OS_OK ;
  }
}

//返回当前线程是否已经被中断
bool Thread::is_interrupted(Thread* thread, bool clear_interrupted) {
  trace("is_interrupted", thread);
  debug_only(check_for_dangling_thread_pointer(thread);)
  return os::is_interrupted(thread, clear_interrupted);
}
 
bool os::is_interrupted(Thread* thread, bool clear_interrupted) {
  assert(Thread::current() == thread || Threads_lock->owned_by_self(),
    "possibility of dangling Thread pointer");
 
  OSThread* osthread = thread->osthread();
  bool interrupted = osthread->interrupted();
  if (interrupted && clear_interrupted) {
    //如果需要清除interrupted标识
    osthread->set_interrupted(false);
  }
 
  return interrupted;
}

Thread、JavaThread、OSThread、ParkEvent和Parker之间的关系

上述源码跟踪过程中,涉及到几个类:Thread、JavaThread、OSThread、ParkEvent和Parker等对象。这里介绍下他们之间的关系:

|                  |              |           |
|      java        |     vm       |     os    |
|                  |              |           |
 java.lang.Thread <-> JavaThread ->  OSThread
                          |--ParkEvent * _ParkEvent ;   // 用于 synchronized()方法的阻塞和唤醒
                          |--ParkEvent * _SleepEvent ; // 用于 Thread.sleep方法的阻塞和唤醒
                          |--ParkEvent * _MutexEvent ; // for native internal Mutex/Monitor,互斥锁
                          |--ParkEvent * _MuxEvent ;   // for low-level muxAcquire-muxRelease,共享锁
                          |--Parker*    _parker ;      // for Unsafe.park(),Unsafe.unpark(Thread thread)
  • java.lang.Thread 实例表示一个Java层的线程,虚拟机内部则用JavaThread类的对象来表示一个java.lang.Thread实例。它包含了一些附加信息来追踪线程的状态。JavaThread持有一个与之相关联的java.lang.Thread对象(oop表示)的引用,java.lang.Thread对象也保存了对应的JavaThread(原生int类型表示)的引用。JavaThread同时也持有了相关联的OSThread实例的引用。
  • OSThread实例表示了一个操作系统内核线程,它包含了一些操作系统级别的附加信息,用于追踪线程状态,“_interrupted”变量就存储在该实例中。
  • ParkEvent和Parker则是JavaThread的成员变量,每个线程都拥有这些成员变量的实例,不同线程之间不共享。JavaThread使用这两种类对应的成员变量来实现Java线程的阻塞和唤醒。

当我们执行Thread thread = new Thread()方法实际上只创建了Thread实例;当我们执行thread.start()方法时,虚拟机内部会先创建JavaThread实例,然后Thread实例和JavaThread实例相互引用,最后会调用pthread_create方法创建OSThread,并让JavaThread持有该内核线程的引用。当我们执行thread的各种方法时,实际上会通过JavaThread最终影响到OSThread。

ParkEvent和Parker阻塞原理源码跟踪

ParkEvent和Parker底层实现阻塞的机制非常相似,本质上都是利用了操作系统提供的原子操作Atomic::xchg(cas命令就是基于此)、互斥锁mutex和条件变量condition等机制来实现的,阻塞和唤醒原理可以表示为下图:

具体源码的跟踪和分析可以参见参考博客3参考博客4

不支持中断的BIO和支持中断的NIO

早期基于BIO模型的Socket和ServerSocket的阻塞方法accept、readLine等方法不支持中断,因而当线程因等待服务端返回而阻塞时,即使该线程被标记为中断,也不会对当前线程产生任何影响。下面给出了一个客户端通过Socket请求服务端并输出服务端响应的示例:

public class client {
    //客户端 
    public static void main(String[] args) {
        try {
            //1.创建客户端Socket,指定服务器地址和端口号 
            Socket socket=new Socket("127.0.0.1", 8888);
            //2.获取输出流,用来向服务器发送信息 
            OutputStream os=socket.getOutputStream();
            //字节输出流 
            //转换为打印流 
            PrintWriter pw=new PrintWriter(os);
            pw.write("用户名:admin;密码:admin");
            pw.flush();
            //刷新缓存,向服务器端输出信息 
            //关闭输出流 
            socket.shutdownOutput();
            //3.获取输入流,用来读取服务器端的响应信息 
            InputStream is=socket.getInputStream();
            BufferedReader br=new BufferedReader(new InputStreamReader(is));
            String info=null;
            while((info=br.readLine())!=null){//br.readLine方法为阻塞方法,且不支持中断,所以调用interrupt()方法并不会使得readLine方法从阻塞状态醒来
                System.out.println("我是客户端,服务器端返回的信息是:"+info);
            }
            //4.关闭资源 
            br.close();
            is.close();
            pw.close();
            os.close();
            socket.close();
        }
        catch (IOException ex) {
            Logger.getLogger(client.class.getName()).log(Level.SEVERE, null, ex);
        }
    }
}

所以,在微服务架构中,使用Hystrix等熔断降级组件时,需要确保阻塞方法要能响应中断。因为Hystrix的超时中断机制使用的就是线程中断。PS:基于NIO的netty框架的阻塞方法支持中断。

参考博客:

1、https://www.jianshu.com/p/f75b77bdf389 Java 并发之线程中断

2、https://www.dazhuanlan.com/2020/01/02/5e0d9b848a3b7/  hotspot Thread JavaThread OSThread

3、https://blog.csdn.net/qq_31865983/article/details/105184585 Hotspot Parker和ParkEvent 源码解析

4、https://blog.csdn.net/Saintyyu/article/details/107426428 Unsafe类park和unpark方法源码深入分析(mutex+cond)

5、https://blog.csdn.net/a7980718/article/details/83661613 jdk1.8 Unsafe类 park和unpark方法解析

6、https://www.cnblogs.com/linzhanfly/p/11258496.html Thread.interrupt()源码跟踪

7、https://blog.csdn.net/qq_31865983/article/details/105174567 Hotspot Thread本地方法实现 源码解析

8、https://www.jb51.net/article/131547.htm Java Socket编程服务器响应客户端实例代码

9、https://blog.csdn.net/iter_zc/article/details/41843595 聊聊JVM(五)从JVM角度理解线程

10、https://www.cnblogs.com/xy-nb/p/6769586.html   参照Openjdk源码分析Object.getClass()方法

11、https://blog.csdn.net/summer_fish/article/details/108263390 java线程和os线程