Java中的多线程

进程和线程

操作系统有两个容易混淆的概念,进程和线程

进程:一个计算机程序的运行实例,包含了需要执行的指令;有自己的独立地址空间,包含程序内容和数据;不同进程的地址空间是互相隔离的;进程拥有各种资源和状态信息,包括打开的文件、子进程和信号处理。
线程:表示程序的执行流程,是CPU调度执行的基本单位;线程有自己的程序计数器、寄存器、堆栈和帧。同一进程中的线程共用相同的地址空间,同时共享进进程锁拥有的内存和其他资源。

进程是操作系统资源分配的最小单位(CPU时间, 内存空间),线程就是共享虚拟内存映射的进程,多个线程属于一个进程。

Java标准库提供了进程和线程相关的API,进程主要包括表示进程的java.lang.Process类和创建进程的java.lang.ProcessBuilder类

表示线程的是java.lang.Thread类,在虚拟机启动之后,通常只有Java类的main方法这个普通线程运行,运行时可以创建和启动新的线程;还有一类守护线程(damon thread),守护线程在后台运行,提供程序运行时所需的服务。当虚拟机中运行的所有线程都是守护线程时,虚拟机终止运行。

线程基础及源码剖析

线程基础回顾

创建线程的方式有两种:第一种是继承Thread类或者匿名内部类new Thread(){ },第二种是实现Runnable接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

/*
* 线程的第一种创建方式
*/
Thread thread1 = new Thread(){
@Override
public void run() {
try {
sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
while(true){
System.out.println(Thread.currentThread().getName());
}
}
};

thread1.start();

/*
*线程的第二种创建方式
*/
Thread thread2 = new Thread(new Runnable() {

@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
while (true) {
System.out.println(Thread.currentThread().getName());
}
}
});

thread2.start();

上面代码中是我们都很熟悉的线程的两种创建方式,我相信你对上面的创建方式并不陌生,因为这是Java的基础部分知识。

Thread类的构造方法(点击放大)

打开Thread类的源码可以看到Thread类有8个构造函数,我们先看看上面的两种构造函数的源码。

1
2
3
public Thread() {  
init(null, null, "Thread-" + nextThreadNum(), 0);
}

在构造的时候直接调用了init方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
private void init(ThreadGroup g, Runnable target, String name,  
long stackSize) {
if (name == null) {
throw new NullPointerException("name cannot be null");
}

Thread parent = currentThread();
SecurityManager security = System.getSecurityManager();
if (g == null) {
/* Determine if it's an applet or not */

/* If there is a security manager, ask the security manager
what to do. */
if (security != null) {
g = security.getThreadGroup();
}

/* If the security doesn't have a strong opinion of the matter
use the parent thread group. */
if (g == null) {
g = parent.getThreadGroup();
}
}

/* checkAccess regardless of whether or not threadgroup is
explicitly passed in. */
g.checkAccess();

/*
* Do we have the required permissions?
*/
if (security != null) {
if (isCCLOverridden(getClass())) {
security.checkPermission(SUBCLASS_IMPLEMENTATION_PERMISSION);
}
}

g.addUnstarted();

this.group = g;
this.daemon = parent.isDaemon();
this.priority = parent.getPriority();
this.name = name.toCharArray();
if (security == null || isCCLOverridden(parent.getClass()))
this.contextClassLoader = parent.getContextClassLoader();
else
this.contextClassLoader = parent.contextClassLoader;
this.inheritedAccessControlContext = AccessController.getContext();
this.target = target;
setPriority(priority);
if (parent.inheritableThreadLocals != null)
this.inheritableThreadLocals =
ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
/* Stash the specified stack size in case the VM cares */
this.stackSize = stackSize;

/* Set thread ID */
tid = nextThreadID();
}

里面的东西比较多,但是我们可以看到会初始化一个变量Runnable target;
下面我们再来看看run方法中是个什么东东?

1
2
3
4
5
6
@Override  
public void run() {
if (target != null) {
target.run();
}
}

原来run方法中会先判断是否初始化了Runnable target变量,如果没有则空实现,如果target不为空则先执行Runnable接口中的run方法。有的朋友可能会猜想下面的代码会先调用Runnable接口中的run方法,然后才调用Thread实现类中的run方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/* 
* 线程的调用优先级
*/
new Thread(new Runnable() {

@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
while(true){
System.out.println("Runnable");
}
}
}){
public void run() {
try {
sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
while(true){
System.out.println("Thread");
}
};
}.start();

其实事实并非如此,因为上面代码中是一个匿名内部类,实际上是一种从Thread的继承和实现,所以下面的run方法覆盖了Thread中的run方法,所以Runnable中的run方法根本不会执行。

下面再看看Runnable接口的源代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public  
interface Runnable {
/**
* When an object implementing interface <code>Runnable</code> is used
* to create a thread, starting the thread causes the object's
* <code>run</code> method to be called in that separately executing
* thread.
* <p>
* The general contract of the method <code>run</code> is that it may
* take any action whatsoever.
*
* @see java.lang.Thread#run()
*/
public abstract void run();
}

发现Runnable接口只有一个抽象的run方法。

为什么要搞一个Runnable接口来实现多线程呢?从Thread继承不是更方便吗?Runnable接口有如下优势,所以我们常常会选择实现Runnable接口:

特点一、Runnable接口适合多个程序代码的线程去处理同一个资源

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class ThreadTest1 extends Thread {  
private int count = 5;

public void run() {
for (int i = 0; i < 7; i++) {
if (count > 0) {
System.out.println("count= " + count--);
}
}
}

public static void main(String[] args) {
//这样实际上是创建了三个互不影响的线程实例
ThreadTest1 t1 = new ThreadTest1();
ThreadTest1 t2 = new ThreadTest1();
ThreadTest1 t3 = new ThreadTest1();
t1.start();
t2.start();
t3.start();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class ThreadTest1{  

public static void main(String [] args) {
MyThread my = new MyThread();
//开启了三个线程,但是操作的是同一个run方法
new Thread(my, "1号窗口").start();
new Thread(my, "2号窗口").start();
new Thread(my, "3号窗口").start();
}
}

class MyThread implements Runnable{

private int ticket = 5; //5张票

public void run() {
for (int i=0; i<=20; i++) {
if (this.ticket > 0) {
System.out.println(Thread.currentThread().getName()+ "正在卖票"+this.ticket--);
}
}
}
}

特点二、避免Java特性中的单根继承的限制(这个是显而易见的,因为Java只支持单继承)。
特点三、可以保持代码和数据的分离(创建线程数和数据无关)。
特点四、更能体现Java面向对象的设计特点。

定时器

先来看一段有关定时器的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public static void main(String[] args) {  
new Timer().schedule(new TimerTask() {

@Override
public void run() {
System.out.println("阳光小强");
}
}, 5000);

int i = 0;
while(true){
System.out.println(i++);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

线程执行结果

将上面代码修改如下:

1
2
3
4
5
6
7
new Timer().schedule(new TimerTask() {  

@Override
public void run() {
System.out.println("阳光小强");
}
}, 2000, 3000);

此时定时器Timer就会在2000ms后开始执行run方法,每隔3000ms重复执行。

如果有一种需求,隔1s、2s、1s、2s…分别执行定时器中的代码(偶数秒和奇数秒分别执行不同的定时器),我们如何实现(s代表秒)

实现方式一、定时器嵌套。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public static void main(String[] args) {  
new Timer().schedule(new TimerTask() {

@Override
public void run() {
System.out.println("阳光小强2秒");
new Timer().schedule(new TimerTask() {

@Override
public void run() {
System.out.println("阳光小强4秒");
}
}, 1000);
}
}, 1000, 2000);

int i = 0;
while(true){
System.out.println(i++);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

实现方式二、通过变量自身迭代:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
static int count  = 0;  
public static void main(String[] args) {

class MyTimerTask extends TimerTask{
@Override
public void run() {
count = (count + 1) % 2;
System.out.println("阳光小强" + (2 + 2 * count) + "秒");
new Timer().schedule(new MyTimerTask(), 2000 + 2000 * count);
}

}
new Timer().schedule(new MyTimerTask(), 2000);

int i = 0;
while(true){
System.out.println(i++);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

还有一种实际情况就是定时发邮件,我们可以用下面的方式

Timer类的shedule方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import java.util.Calendar;  
import java.util.Date;
import java.util.Timer;
import java.util.TimerTask;

public class TestTimer {
// 时间间隔
private static final long PERIOD_DAY = 24 * 60 * 60 * 1000;

public static void main(String[] args) {
Calendar calendar = Calendar.getInstance();
/*** 定制每日2:00执行方法 ***/
calendar.set(Calendar.HOUR_OF_DAY, 2);
calendar.set(Calendar.MINUTE, 0);
calendar.set(Calendar.SECOND, 0);

Date date = calendar.getTime(); // 第一次执行定时任务的时间

// 如果第一次执行定时任务的时间 小于 当前的时间
// 此时要在 第一次执行定时任务的时间 加一天,以便此任务在下个时间点执行。如果不加一天,任务会立即执行。
if (date.before(new Date())) {
date = addDay(date, 1);
}

new Timer().schedule(new TimerTask() {

@Override
public void run() {
//TODO 去发邮件,或者其他定时任务
}
}, date, PERIOD_DAY);
}

// 增加或减少天数
public static Date addDay(Date date, int num) {
Calendar startDT = Calendar.getInstance();
startDT.setTime(date);
startDT.add(Calendar.DAY_OF_MONTH, num);
return startDT.getTime();
}

}

线程同步和互斥以及死锁

为什么会有线程同步的概念呢?为什么要同步?什么是线程同步?先看一段代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package com.maso.test;  

public class ThreadTest2 implements Runnable{
private TestObj testObj = new TestObj();

public static void main(String[] args) {
ThreadTest2 tt = new ThreadTest2();
Thread t1 = new Thread(tt, "thread_1");
Thread t2 = new Thread(tt, "thread_2");
t1.start();
t2.start();
}

@Override
public void run() {

for(int j = 0; j < 10; j++){
int i = fix(1);
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println(Thread.currentThread().getName() + " : i = " + i);
}

}

public int fix(int y){
return testObj.fix(y);
}

public class TestObj{
int x = 10;

public int fix(int y){
return x = x - y;
}
}
}

输出结果后,就会发现变量x被两个线程同时操作,这样就很容易导致误操作。如何才能解决这个问题呢?用线程的同步技术,加上synchronized关键字

1
2
3
public synchronized int fix(int y){  
return testObj.fix(y);
}

加上同步后,就可以看到有序的从9输出到-10.
如果加到TestObj类的fix方法上能不能实现同步呢?

1
2
3
4
5
6
7
public class TestObj{  
int x = 10;

public synchronized int fix(int y){
return x = x - y;
}
}

可以判断出两个线程使用的TestObj类的同一个实例testOjb,所以可以实现同步,但是输出的结果却不是理想的结果。这是因为当A线程执行完x = x - y后还没有输出则B线程已经进入开始执行x = x - y.

所以像下面这样输出就不会有什么问题了:

1
2
3
4
5
6
7
8
9
10
11
12
13
public class TestObj{  
public TestObj(){
System.out.println("调用了构造函数");
}

int x = 10;

public synchronized int fix(int y){
x = x - y;
System.out.println(Thread.currentThread().getName() + " : x = " + x);
return x;
}
}

如果将外部的fix方法修改如下:

1
2
3
4
5
6
7
8
public int fix(int y){  
ax++ ;
if(ax%2 == 0){
return testObj.fix(y, testObj.str1);
}else{
return testObj.fix(y, testObj.str2);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class TestObj{  
String str1 = "a1";
String str2 = "a2";

public TestObj(){
System.out.println("调用了构造函数");
}

int x = 10;

public int fix(int y, String str){
synchronized (str) {
x = x - y;
System.out.println(Thread.currentThread().getName() + " : x = " + x);
}
return x;
}
}

此时synchronized中的str对象不是同一个对象,所以两个线程所持有的对象锁不是同一个,这样就不能实现同步。要实现线程之间的互斥就要使用同一个对象锁。

什么是死锁呢?举个例子就是比如你和同学租了个两室的房子,你拿着你房子的钥匙,你同学拿着他房子的钥匙,现在你在房子等你同学将他的钥匙给你然后你进他房子,你同学在他的房子等你将钥匙给他然后他进你的房子,这样就死锁了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package com.maso.test;  

public class ThreadDieSock implements Runnable {
private int flag = 1;
private Object obj1 = new Object(), obj2 = new Object();

public void run() {
System.out.println("flag=" + flag);
if (flag == 1) {
synchronized (obj1) {
System.out.println("我已经锁定obj1,休息0.5秒后锁定obj2去!");
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (obj2) {
System.out.println("1");
}
}
}
if (flag == 0) {
synchronized (obj2) {
System.out.println("我已经锁定obj2,休息0.5秒后锁定obj1去!");
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (obj1) {
System.out.println("0");
}
}
}
}

public static void main(String[] args) {
ThreadDieSock run01 = new ThreadDieSock();
ThreadDieSock run02 = new ThreadDieSock();
run01.flag = 1;
run02.flag = 0;
Thread thread01 = new Thread(run01);
Thread thread02 = new Thread(run02);
System.out.println("线程开始喽!");
thread01.start();
thread02.start();
}
}

从这道面试题说起

有一道这样的面试题:开启一个子线程和主线程同时运行,子线程输出10次后接着主线程输出100次,如此反复50次。先看下面代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package com.maso.test;  

/**
*
* @author Administrator
* 两个线程,其中是一个主线程,第一个线程先运行输出10次,主线程接着运行输出100次,如此反复50次
*/
public class ThreadTest3 implements Runnable{
private static Test test;
@Override
public void run() {
for(int i=0; i<50; i++){
test.f1(i);
}
}

public static void main(String[] args) {
test = new Test();
new Thread(new ThreadTest3()).start();
for(int i=0; i<50; i++){
test.f2(i);
}
}

/**
* 将控制和逻辑及数据分类(该类就是数据)
* @author Administrator
*
*/
static class Test{
private boolean isf1 = true;
/**
* 输出10次
*/
public synchronized void f1(int j){
if(!isf1){
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
for(int i=1; i<=10; i++){
System.out.println(Thread.currentThread().getName() + "第" + j + "次轮巡,输出" + i);
}
isf1 = false;
notify();
}

/**
* 输出100次
*/
public synchronized void f2(int j){
if(isf1){
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
for(int i=1; i<=100; i++){
System.out.println(Thread.currentThread().getName() + "第" + j + "次轮巡,输出" + i);
}
isf1 = true;
notify();
}
}
}

上面判断用的是if语句,这样做看似没有什么问题,实际上这样做是不安全的,因为线程在等待的过程中有可能被假唤醒,所以我们需要使用while语句。另外在使用wait和notify的时候需要注意一下几点:

  1. 调用object的wait方法和notity方法时,必须先获得object的对象锁(必须写在synchronized中)。
  2. 如果调用了object的wait方法,则该线程就放掉了对象锁。
  3. 如果A1、A2、A3都在object.wait(),则B调用object.notify()只能唤醒A1、A2、A3中的一个(具体哪一个由JVM决定)
  4. object.notifyAll()能够唤醒全部。
  5. B在唤醒A的时候,B如果还持有对象锁,则要等到B释放锁后,A才有机会执行。

Sleep和Wait有什么区别?

sleep()并不释放对象锁,wait()释放对象锁。但是wait()和sleep()都可以通过interrupt()方法打断线程的暂停状态,从而使线程立刻抛出InterruptedException。如果线程A希望立即结束线程B,则可以对线程B对应的Thread实例调用interrupt方法。如果此刻线程B正在wait/sleep/join,则线程B会立刻抛出InterruptedException,在catch() {} 中直接return即可安全地结束线程。需要注意的是,InterruptedException是线程自己从内部抛出的,并不是interrupt()方法抛出的。对某一线程调用interrupt()时,如果该线程正在执行普通的代码,那么该线程根本就不会抛出InterruptedException。但是,一旦该线程进入到wait()/sleep()/join()后,就会立刻抛出InterruptedException。

下面我们来看看线程的生命周期:

线程生命周期

实现线程调度的方法如下:

  • sleep():该线程是让线程休眠一定的时间,需要捕获InterruptedException
  • yield():暂停当前线程,让同等级优先权的线程运行,如果没有同等级优先权线程则不会起作用。起作用后会让出CPU运行时间,进入就绪状态。
  • join():让一个线程等待调用join方法的线程执行完毕后再继续执行。

线程生命周期

看一段代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class ThreadTest4 implements Runnable{  
private static int a = 0;
@Override
public void run() {
for(int i=0; i<10; i++){
a++;
}
}

public static void main(String[] args) {
new Thread(new ThreadTest4()).start();
System.out.println(a);
}
}

这段代码会输出10吗?答案是不会的,因为在启动子线程后,就立马输出了a的值,此时子线程对a还没有操作。修改如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class ThreadTest4 implements Runnable{  
private static int a = 0;
@Override
public void run() {
for(int i=0; i<10; i++){
a++;
}
}

public static void main(String[] args) throws InterruptedException {
Thread t = new Thread(new ThreadTest4());
t.start();
t.join();
System.out.println(a);
}
}

这回输出了10,join()方法的作用由此可见,它会让其他线程等待该线程执行完毕后再执行。

线程之间共享数据

如果是每个线程都执行相同的代码,则可以使用同一个Runnable来实现共享

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class MultiThreadShareData {  
public static void main(String[] args) {
new Thread(new ShareData()).start();
new Thread(new ShareData()).start();
}

static class ShareData implements Runnable{
private int j = 100;
@Override
public synchronized void run() {
j--;
}
}
}

上面代码中两个线程共享数据实现对j变量的递减操作,至于上面代码中为什么要使用一个静态内部类,该类static的作用是不依赖外部类的实例创建对象。

如果每个线程执行的代码不同,则需要使用不同的Runnable对象来实现共享

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class MultiThreadShareData {  
public static void main(String[] args) {
final ShareData data1 = new ShareData();
//启动第一个线程
new Thread(new Runnable() {

@Override
public void run() {
data1.increment(); //加
}
}).start();
//启动第二个线程
new Thread(new Runnable() {

@Override
public void run() {
data1.decrement(); //减
}
}).start();
}

static class ShareData{
private int j = 0;
public synchronized void increment(){
j++;
}

public synchronized void decrement(){
j--;
}
}
}

将上面代码进修改(将数据作为外部类的成员变量,让Runnable接口操作该成员变量)如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class MultiThreadShareData {  
public static void main(String[] args) {
ShareData data1 = new ShareData();
new Thread(new MyRunnable1(data1)).start();
new Thread(new MyRunnable2(data1)).start();
}

static class MyRunnable1 implements Runnable{
private ShareData data1;
public MyRunnable1(ShareData data1){
this.data1 = data1;
}
@Override
public void run() {
data1.increment();
}

}

static class MyRunnable2 implements Runnable{

private ShareData data1;

public MyRunnable2(ShareData data2){
this.data1 = data1;
}

@Override
public void run() {
data1.decrement();
}

}

static class ShareData{
private int j = 0;
public synchronized void increment(){
j++;
}

public synchronized void decrement(){
j--;
}
}
}

一道面试题(设计4个线程,其中两个线程每次对j增加1,另外两个线程对j每次减少1)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
/**  

* 设计 4 个 ( 可以是 n 个 ) 线程,其中两个线程每次对 j 增加 1 ,另外两个线程对 j 每次减少 1

*/

package com.jiaocaigen.test;


public class Test {

// 采用 Runnable 接口方式创建的多条线程可以共享实例属性
private int i ;

// 同步增加方法
private synchronized void inc(){
i ++;
System. out .println(Thread.currentThread().getName()+ "--inc--" + i );
}

// 同步减算方法
private synchronized void dec(){
i --;
System. out .println(Thread.currentThread().getName()+ "--dec--" + i );
}

// 增加线程
class Inc implements Runnable {
public void run() {
inc();
}
}

// 减算线程
class Dec implements Runnable{

public void run() {
dec();
}
}


public static void main(String[] args) {

Test t = new Test();

// 内部类的实例化
Inc inc = t. new Inc();
Dec dec = t. new Dec();

// 创建 2*n 个线程 此处 n=2
for ( int i = 0; i < 2; i++) {
new Thread(inc).start();
new Thread(dec).start();
}
}
}

多线程之间数据隔离

什么是数据隔离呢?比如说我们现在开启了两个线程,这两个线程都要同时给同一个全局变量data赋值,各个线程操作它赋值后的变量数据,这里就需要用到隔离。先看一段代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import java.util.Random;  


public class ThreadLocalTest {
private static int data = 0;
public static void main(String[] args) {
for(int i=0; i<2; i++){
new Thread(new Runnable() {

@Override
public void run() {
data = new Random().nextInt();
System.out.println(Thread.currentThread().getName() +
" has put data: " + data);
new A().get();
new B().get();
}
}).start();
}
}

static class A{
public int get(){
System.out.println("A from " + Thread.currentThread().getName() +
" has get data: " + data);
return data;
}
}

static class B{
public int get(){
System.out.println("B from " + Thread.currentThread().getName() +
" has get data: " + data);
return data;
}
}
}

线程执行结果

从上面我们可以看到Thread-0和Thread-1都在操作变量data,但是两个线程之间没有做到对数据操作的隔离,所以输出结果中两个线程共用了一个data变量。

我们将上面代码修改如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import java.util.HashMap;  
import java.util.Map;
import java.util.Random;


public class ThreadLocalTest {
//private static int data = 0;
private static Map<Thread, Integer> map = new HashMap<Thread, Integer>();
public static void main(String[] args) {
for(int i=0; i<2; i++){
new Thread(new Runnable() {

@Override
public void run() {
//data = new Random().nextInt();
int data = new Random().nextInt();
map.put(Thread.currentThread(), data);
System.out.println(Thread.currentThread().getName() +
" has put data: " + data);
new A().get();
new B().get();
}
}).start();
}
}

static class A{
public int get(){
System.out.println("A from " + Thread.currentThread().getName() +
" has get data: " + map.get(Thread.currentThread()));
return map.get(Thread.currentThread());
}
}

static class B{
public int get(){
System.out.println("B from " + Thread.currentThread().getName() +
" has get data: " + map.get(Thread.currentThread()));
return map.get(Thread.currentThread());
}
}
}

输出结果:

线程执行结果

上面代码中我们用一个Map集合隔离了线程对data数据的操作,其实相当于创建了一个data数据的备份(双份的data)实现了线程之间数据的隔离,其实早在Java 1.2就引入了一个用来支持线程数据隔离的类(java.lang.ThreadLocal),下面我们来看看如何使用ThreadLocal实现线程之间的数据隔离。

ThreadLocal中的三个方法:

  • get() :返回当前线程的线程局部变量副本
  • protected initialValue() :返回该线程局部变量的当前线程的初始值
  • void set(Object value) :设置当前线程的线程局部变量副本的值

其中initialValue方法是为子类写的方法,在一个线程第一次调用get()或者set()方法时执行,并且仅执行一次。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import java.util.Random;  


public class ThreadLocalTest {
//private static int data = 0;
//private static Map<Thread, Integer> map = new HashMap<Thread, Integer>();
private static ThreadLocal<Integer> tl = new ThreadLocal<Integer>();
public static void main(String[] args) {
for(int i=0; i<2; i++){
new Thread(new Runnable() {

@Override
public void run() {
//data = new Random().nextInt();
int data = new Random().nextInt();
//map.put(Thread.currentThread(), data);
tl.set(data);
System.out.println(Thread.currentThread().getName() +
" has put data: " + data);
new A().get();
new B().get();
}
}).start();
}
}

static class A{
public int get(){
System.out.println("A from " + Thread.currentThread().getName() +
" has get data: " + tl.get());
return tl.get();
}
}

static class B{
public int get(){
System.out.println("B from " + Thread.currentThread().getName() +
" has get data: " + tl.get());
return tl.get();
}
}
}

运行结果:

线程执行结果

上面代码明显少了很多,其实ThreadLocal中底层也是用Map来存储变量副本实现的。

Java5中的线程并发库

接下来我们将看看Java 5之后给我们添加的新的对线程操作的API,首先看看api文档:

线程并发库

java.util.concurrent包含许多线程安全、测试良好、高性能的并发构建块,我们先看看atomic包下的AtomicInteger.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import java.util.concurrent.atomic.AtomicInteger;  

public class AtomicIntegerTest {
private static AtomicInteger data = new AtomicInteger(0);
public static void main(String[] args) {
new Thread(new Runnable() {

@Override
public void run() {
data.incrementAndGet(); //加
}
}).start();

new Thread(new Runnable() {

@Override
public void run() {
data.decrementAndGet(); //减
}
}).start();
}
}

使用AtomicInteger可以很方便的实现线程之间的数据共享,如果某个成员变量要被多个线程操作则可以使用AtomicInteger来处理,其他数据类型也有对应的Atomic.

下面我们再来看一下线程并发池的使用,在java.util.concurrent包下有关于线程并发池的相关工具类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import java.util.concurrent.ExecutorService;  
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;


public class ThreadPoolTest {
public static void main(String[] args) {
//创建3个线程执行任务
ExecutorService threadPool = Executors.newFixedThreadPool(3);
//ExecutorService threadPool = Executors.newCachedThreadPool(); //动态添加线程
//创建单个线程(线程死后会重新启动)
//ExecutorService threadPool = Executors.newSingleThreadExecutor();
//池子中添加10个任务
for(int i=1; i<=10; i++){
final int task = i;
threadPool.execute(new Runnable() {

@Override
public void run() {
//每个任务是输出1到10
for(int j=1; j<=10; j++){
try {
Thread.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() +
" loop of " + j + " for task " + task);
}
}
});
}
System.out.println("10 task has commited");
threadPool.shutdown(); //线程池执行完后结束线程
//threadPool.shutdownNow(); 立即结束线程

//线程池启动定时器
Executors.newScheduledThreadPool(3).schedule(new Runnable() {

@Override
public void run() {
System.out.println("bombing!");
}
}, 10, TimeUnit.SECONDS);

/*Executors.newScheduledThreadPool(3).scheduleAtFixedRate(new Runnable() {

@Override
public void run() {
System.out.println("bombing!");
}
}, 10, 3, TimeUnit.SECONDS);*/
}
}

创建线程池的方式有三种:

  1. newFixedThreadPool(n) :创建n个线程
  2. newCachedThreadPool() :动态添加线程(依据任务池中的任务数量动态创建线程)
  3. newSingleThreadPool() :创建单一线程(线程死后会重新创建)

上面代码中创建了3个线程并分配了10个任务,3个线程会先执行任务池中的3个任务,当某个线程任务执行完后会从任务池中取没有被执行的任务继续执行,直到任务池中的所有任务执行完成后,线程会处于等待状态,最后使用shutdown()方法结束线程。

Java5中Futrue获取线程返回结果

我们先来看一下ExecutorService中的执行方法:

ExecutorService类

上面我们使用了execute方法启动线程池中的线程执行,这一篇我们来看看submit方法的使用:submit提交一个返回值的任务用于执行,返回一个表示任务的未决结果的 Future。该 Future 的 get 方法在成功完成时将会返回该任务的结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import java.util.concurrent.Callable;  
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CallableAndFutrue {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService threadPool = Executors.newSingleThreadExecutor();
Future<String> future = threadPool.submit(new Callable<String>() {

@Override
public String call() throws Exception {
Thread.sleep(2000);
return "阳光小强";
}

});

System.out.println(future.get());
}
}

在上面代码中我们启动了一个线程,休眠了2秒后返回结果“阳光小强”

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import java.util.concurrent.ExecutionException;  
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CallableAndFutrue {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService threadPool = Executors.newSingleThreadExecutor();
Future future = threadPool.submit(new Runnable(){

@Override
public void run() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});

System.out.println(future.get());
}
}

提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。该 Future 的 get 方法在成功 完成时将会返回 null。

再看看最后一个submit,可以添加一个result,当线程执行完成后会返回该result

1
2
3
4
5
6
7
8
9
10
11
Future future = threadPool.submit(new Runnable(){  

@Override
public void run() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "阳光小强");

如果我们提交多个任务(也就是开启多个线程)后如何有序的返回结果呢?

CompletionService

将生产新的异步任务与使用已完成任务的结果分离开来的服务。生产者 submit 执行的任务。使用者 take 已完成的任务,并按照完成这些任务的顺序处理它们的结果。例如,CompletionService 可以用来管理异步 IO ,执行读操作的任务作为程序或系统的一部分提交,然后,当完成读操作时,会在程序的不同部分执行其他操作,执行操作的顺序可能与所请求的顺序不同。

通常,CompletionService 依赖于一个单独的 Executor 来实际执行任务,在这种情况下,CompletionService 只管理一个内部完成队列。ExecutorCompletionService 类提供了此方法的一个实现。

CompletionService

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import java.util.Random;  
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CallableAndFutrue {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService threadPool = Executors.newFixedThreadPool(10);
ExecutorCompletionService<Integer> ecs = new ExecutorCompletionService<>(threadPool);
for(int i=0; i<10; i++){
final int seq = i;
ecs.submit(new Callable<Integer>() {

@Override
public Integer call() throws Exception {
Thread.sleep(new Random().nextInt(5000));
return seq;
}
});
}

for(int i=0; i<10; i++){
System.out.println(ecs.take().get());
}
}
}

CompletionService

线程锁Lock

在前面我们在解决线程同步问题的时候使用了synchronized关键字,今天我们来看看Java 5.0以后提供的线程锁Lock.

Lock类

Lock接口的实现类提供了比使用synchronized关键字更加灵活和广泛的锁定对象操作,而且是以面向对象的方式进行对象加锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Override  
public void run() {
while(true){
Lock lock = new ReentrantLock();
try {
lock.lock();
Thread.sleep(new Random().nextInt(3000));
String data = readData();
System.out.print("读取数据: " + data);
} catch (InterruptedException e) {
e.printStackTrace();
}finally{
lock.unlock();
}
}
}

读写锁:分为读锁和写锁,多个读锁不互斥,读锁与写锁互斥,写锁与写锁互斥,这是由JVM控制的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import java.util.Random;  
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;


public class ReadWriteLockTest {
static ReadWriteLock rwl = new ReentrantReadWriteLock();

private static String data = null;
public static void main(String[] args) {
Runnable runnable1 = new MyRunnable1();
Runnable runnable2 = new MyRunnable2();
for(int i=0; i<3; i++){
new Thread(runnable1).start();
new Thread(runnable2).start();
}
}

static class MyRunnable1 implements Runnable{

@Override
public void run() {
writeData("" + new Random().nextInt(100));
}
}

static class MyRunnable2 implements Runnable{

@Override
public void run() {
readData();
}
}

private static void writeData(String var){
rwl.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName() + " 准备写");
Thread.sleep(new Random().nextInt(3000));
data = var;
System.out.println(Thread.currentThread().getName() + " 写完毕");
} catch (InterruptedException e) {
e.printStackTrace();
}finally{
rwl.writeLock().unlock();
}

}

private static void readData(){
rwl.readLock().lock(); //用读锁锁住
try {
System.out.println(Thread.currentThread().getName() + " 准备读");
Thread.sleep(new Random().nextInt(3000));
System.out.println(Thread.currentThread().getName() + " 读完毕");
} catch (InterruptedException e) {
e.printStackTrace();
}finally{
rwl.readLock().unlock();
}
}
}

线程执行结果

用过Hibernate框架的朋友可能知道,Hibernate查询数据库有缓存机制,如果某个数据在内存中存在则可以并发的去读取,如果缓存中没有数据则需要互斥的去从数据库取数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import java.util.HashMap;  
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;


public class CacheDemo {
private Map<String, Object> cache = new HashMap<String, Object>();
public static void main(String[] args) {

}

private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
/**
* 实现多个并发读,互斥的写
* @param key
* @return
*/
public Object getData(String key){
rwl.readLock().lock();
Object value = null;
try{
value = cache.get(key);
if(value == null){
rwl.readLock().unlock(); //释放读锁
rwl.writeLock().lock(); //添加写锁
try{
if(value == null){ //放置其他线程加载数据
value = "去数据库查询"; //这里模拟从数据库查询
if(value == null){
//TODO 抛出异常
}
}
}finally{
rwl.writeLock().unlock();
}
rwl.readLock().lock(); //锁还给读线程
}
}finally{
rwl.readLock().unlock();
}
return value;
}
}

上面获取数据的大概过程如下:

第一步、获取读锁,读取数据
第二步、如果有数据则直接返回,并释放读锁让其他线程读。
第三步、如果内存中没有数据则从数据库写入内存,释放读锁并添加写锁(这样写入数据就可以达到可以互斥)
第四步、读入内存后释放写锁并还回读锁(和后面的unlock()对应)
第五步、如果在添加写锁的时候同时有多个线程,则只能有其中一个线程抢到锁,等拥有锁的线程释放写锁后,其他线程就会抢到写锁,但是此时数据已写入内存,则需要判断内存数据是否为null如果不为null则直接释放写锁。