面试高频——JUC并发工具包快速上手(超详细总结)
目录一、什么是JUC二、基本知识2.1、进程和线程2.2、Java默认有两个进程2.3、Java能够开启线程吗?2.4、并发和并行2.5、线程的状态2.6、wait和sleep的区别2.7、什么是可重入锁2.8、synchronized买票案例回顾三、Lock锁3.1、简介3.2、买票问题重现3.3、和Synchronized的区别四、生产者消费者问题(Lock版)4.1、Synchronized
目录
参考:
一、什么是JUC
JUC
就是java.util.concurrent
工具包的简称。这是一个处理线程的工具包,JDK 1.5开始出现的。
二、基本知识
2.1、进程和线程
1️⃣ 动态性
- 动态性是进程最基本的特征,表现为:由创建而产生,由调度而执行,得不到资源而暂停执行,由撤销而消亡;有一定的生命周期
- 程序只是一组有序的指令集合
2️⃣ 并发性
- 引入进程的目的就是和其他进程能并发执行
- 程序不能并发执行
3️⃣ 独立性
- 进程实体是一个能独立运行的基本单位,是系统中独立获得资源和独立调度的基本单位
- 程序不能作为一个独立的单位进行运行
2.2、Java默认有两个进程
Main
和GC
2.3、Java能够开启线程吗?
不行,Java是通过native本地方法调底层C++写的方法,Java无法直接操作硬件
2.4、并发和并行
- 并发:多个事件在同一时间间隔内发生(cpu一核,模拟出来多条线程快速交替运行)
- 并行:多个事件在同一时刻发生(cpu多核,多个线程可以同时执行)
查看cpu的核数
2.5、线程的状态
NEW | 尚未启动的线程处于此状态 |
---|---|
RUNNABLE | 在Java虚拟机中执行的线程处于此状态 |
BLOCKED | 被阻塞等待监视器锁定的线程处于此状态(IO操作,wait,juc锁定) |
WAITING | 正在等待另一个线程执行特定动作的线程处于此状态(sleep,join) |
TIMED_WAITING | 正在等待另一个线程执行动作达到指定等待时间的线程处于此状态(sleep,join) |
TERMINATED | 已退出的线程处于此状态。 |
2.6、wait和sleep的区别
- sleep不释放锁,wait释放锁
- 来自不同的类:sleep()函数在Thread类中,wait()函数属于Object类
- 使用范围不同:sleep可以在任何地方使用,wait只能使用在同步代码块中
2.7、什么是可重入锁
可重入,就是可以重复获取相同的锁而不会出现死锁;synchronized和ReentrantLock都是可重入的
// 演示可重入锁是什么意思,可重入,就是可以重复获取相同的锁
// synchronized和ReentrantLock都是可重入的
// 可重入降低了编程复杂性
public class WhatReentrant {
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
synchronized (this) {
System.out.println("第1次获取锁,这个锁是:" + this);
int index = 1;
while (true) {
synchronized (this) {
System.out.println("第" + (++index) + "次获取锁,这个锁是:" + this);
}
if (index == 10) {
break;
}
}
}
}
}).start();
}
}
import java.util.Random;
import java.util.concurrent.locks.ReentrantLock;
// 演示可重入锁是什么意思
public class WhatReentrant2 {
public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock();
new Thread(new Runnable() {
@Override
public void run() {
try {
lock.lock();
System.out.println("第1次获取锁,这个锁是:" + lock);
int index = 1;
while (true) {
try {
lock.lock();
System.out.println("第" + (++index) + "次获取锁,这个锁是:" + lock);
try {
Thread.sleep(new Random().nextInt(200));
} catch (InterruptedException e) {
e.printStackTrace();
}
if (index == 10) {
break;
}
} finally {
lock.unlock();
}
}
} finally {
lock.unlock();
}
}
}).start();
}
}
2.8、synchronized买票案例回顾
真正的开发中,线程只是一个资源类(包含属性),没有任何附属的操作
模拟卖票:
- 以前我们会将
Ticket
类继承Runnable接口 - 现在会将
Ticket
作为一个资源类,里面不添加关于线程的操作
package com.zsr;
public class saleTicket {
public static void main(String[] args) {
//一份资源
Ticket ticket = new Ticket();
//多个线程
new Thread(() -> {
for (int i = 0; i < 30; i++) {
ticket.sale();
}
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 20; i++) {
ticket.sale();
}
}, "B").start();
new Thread(() -> {
for (int i = 0; i < 80; i++) {
ticket.sale();
}
}, "C").start();
}
}
//资源类
class Ticket {
//票数
private int number = 100;
public synchronized void sale() {
if (number > 0)
System.out.println(Thread.currentThread().getName() + "卖出了第" + (number--) + "张票,剩余" + number);
}
}
三、Lock锁
3.1、简介
官方文档地址:https://tool.oschina.net/apidocs/apidoc?api=jdk-zh
使用方法:创建锁、加锁、业务代码、解锁
3.2、买票问题重现
我们接下来用Lock锁
的方式来解决上述买票问题,Lock接口
最常用的实现类就是ReentrantLock
可重入锁
- 可以看到看到
ReentrantLock
有两个构造方法,可以指定使用 公平锁/非公平锁 - 公平锁:十分公平,先来后到
- 非公平锁:不公平,可以插队
修改Ticket
类
//资源类
class Ticket2 {
//票数
private int number = 100;
//Lock锁
Lock lock = new ReentrantLock();
public void sale() {
lock.lock();//加锁
try {
if (number > 0)
System.out.println(Thread.currentThread().getName() + "卖出了第" + (number--) + "张票,剩余" + number);
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();//解锁
}
}
}
3.3、和Synchronized的区别
-
Synchronized
是内置的关键字,Lock
是一个Java类 -
Synchronized
无法判断锁的状态,Lock
可以判断是否获取到了锁 -
Synchronized
会自动释放锁,Lock
需要手动释放锁(如果不释放锁,会造成死锁) -
假如有两个线程:线程1、线程2;线程1获得了锁
Synchronized
:如果线程1阻塞了,线程2就会一直等待,造成死锁Lock
:如果线程1阻塞了,线程2不会一直等待,可以通过trylock()
方法尝试获取锁 -
两者都是可重入锁,但是
Synchronized
不可中断,为非公平锁;而Lock
锁可判断锁状态,并且可以设置为公平锁/非公平锁 -
Synchronized
适合锁少量代码同步代码,Lock
适合锁大量代码同步代码
四、生产者消费者问题(Lock版)
生产者消费者——线程之间的通信问题
- 通过
Synchronized
实现,我们常用object.wait()
+Object.notify()
- 通过
Lock
怎么实现呢,用condition.await()
+condition.signal()
4.1、Synchronized实现
两个线程A、B实现:
如果number不等于0,则number-1;如果number等于0,则number+1
package com.zsr;
public class communicate {
public static void main(String[] args) {
Data data = new Data();
new Thread(() -> {
for (int i = 0; i < 5; i++) {
try {
data.plus();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 5; i++) {
try {
data.minor();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "B").start();
}
}
class Data {
private int number = 0;
//如果number=0,则number+1
public synchronized void plus() throws InterruptedException {
if (number != 0)
this.wait();//等待
number++;
System.out.println(Thread.currentThread().getName() + "=>" + number);
this.notifyAll();//唤醒其他线程
}
//如果number!=0,则number-1
public synchronized void minor() throws InterruptedException {
if (number == 0)
this.wait();
number--;
System.out.println(Thread.currentThread().getName() + "=>" + number);
this.notifyAll();//唤醒其他线程
}
}
结果:
虚假唤醒问题
如果再加两个线程呢?
public static void main(String[] args) {
Data data = new Data();
new Thread(() -> {
for (int i = 0; i < 5; i++) {
try {
data.plus();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 5; i++) {
try {
data.minor();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "B").start();
new Thread(() -> {
for (int i = 0; i < 5; i++) {
try {
data.plus();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "C").start();
new Thread(() -> {
for (int i = 0; i < 5; i++) {
try {
data.minor();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "D").start();
}
}
看结果,会出现2、3的情况;这就是因为if判断只判断一次,两个线程可能同时+1;造成了虚假唤醒的问题
修改代码:if
判断改成while
判断
class Data {
private int number = 0;
//如果number=0,则number+1
public synchronized void plus() throws InterruptedException {
while (number != 0)
this.wait();//等待
number++;
System.out.println(Thread.currentThread().getName() + "=>" + number);
this.notifyAll();//唤醒其他线程
}
//如果number!=0,则number-1
public synchronized void minor() throws InterruptedException {
while (number == 0)
this.wait();
number--;
System.out.println(Thread.currentThread().getName() + "=>" + number);
this.notifyAll();//唤醒其他线程
}
}
4.2、Lock实现
修改代码:
package com.zsr;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class communicate2 {
public static void main(String[] args) {
Data2 data = new Data2();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.plus();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.minor();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "B").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.plus();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "C").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.minor();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "D").start();
}
}
class Data2 {
private int number = 0;
//创建Lock锁
private Lock lock = new ReentrantLock();
//获得condition
Condition condition = lock.newCondition();
//如果number=0,则number+1
public void plus() throws InterruptedException {
lock.lock();//加锁
try {
while (number != 0)
condition.await();//等待
number++;
System.out.println(Thread.currentThread().getName() + "=>" + number);
condition.signalAll();//唤醒其他线程
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();//解锁
}
}
//如果number!=0,则number-1
public void minor() throws InterruptedException {
lock.lock();
try {
while (number == 0)
condition.await();
number--;
System.out.println(Thread.currentThread().getName() + "=>" + number);
condition.signalAll();//唤醒其他线程
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
根据结果,成功实现!但是发现一个问题,线程的执行都是随机的,怎么进行有序的实现呢?A=>B=>C=>D
Condition精准通知唤醒
可以设置多个condition监视器,每个监视器监视一个线程,精确等待唤醒某个线程
package com.zsr;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class communicate3 {
public static void main(String[] args) {
Data3 data = new Data3();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.plus();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.minor();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "B").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.plus();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "C").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.minor();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "D").start();
}
}
class Data3 {
private int number = 0;
//创建Lock锁
private Lock lock = new ReentrantLock();
//获得condition
Condition conditionA = lock.newCondition();
Condition conditionB = lock.newCondition();
Condition conditionC = lock.newCondition();
Condition conditionD = lock.newCondition();
//如果number=0,则number+1
public void plus() throws InterruptedException {
lock.lock();//加锁
try {
if (Thread.currentThread().getName()=="A") {
while (number != 0)
conditionA.await();//A等待
number++;
System.out.println(Thread.currentThread().getName() + "=>" + number);
conditionB.signal();//唤醒B线程
}
if (Thread.currentThread().getName()=="C") {
while (number != 0)
conditionC.await();//C等待
number++;
System.out.println(Thread.currentThread().getName() + "=>" + number);
conditionD.signal();//唤醒D线程
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();//解锁
}
}
//如果number!=0,则number-1
public void minor() throws InterruptedException {
lock.lock();
try {
if (Thread.currentThread().getName()=="B") {
while (number == 0)
conditionB.await();//B等待
number--;
System.out.println(Thread.currentThread().getName() + "=>" + number);
conditionC.signal();//唤醒C线程
}
if (Thread.currentThread().getName()=="D") {
while (number == 0)
conditionD.await();//D等待
number--;
System.out.println(Thread.currentThread().getName() + "=>" + number);
conditionA.signal();//唤醒A线程
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
结果:
五、8个案例彻底理解锁的对象
如何判断锁的是谁!永远的知道什么锁,锁到底锁的是谁(对象、 Class)
创建两个线程A、B,A线程执行发短信方法,B线程执行打电话方法,谁先执行?
package com.zsr.lock8;
import java.util.concurrent.TimeUnit;
public class Test {
public static void main(String[] args) throws InterruptedException {
Phone phone = new Phone();
//线程A发消息
new Thread(() -> {
phone.send_message();
}, "A").start();
//休眠1s
TimeUnit.SECONDS.sleep(1);
//线程B打电话
new Thread(() -> {
phone.call();
}, "B").start();
}
}
class Phone {
public synchronized void send_message() {
System.out.println("发短信");
}
public synchronized void call() {
System.out.println("打电话");
}
}
结果:先发短信再打电话
那如果让发短信休眠2s呢?
package com.zsr.lock8;
import java.util.concurrent.TimeUnit;
public class Test {
public static void main(String[] args) throws InterruptedException {
Phone phone = new Phone();
new Thread(() -> {
try {
phone.send_message();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "A").start();
//休眠1s
TimeUnit.SECONDS.sleep(1);
new Thread(() -> {
phone.call();
}, "B").start();
}
}
class Phone {
public synchronized void send_message() throws InterruptedException {
//休眠2s
TimeUnit.SECONDS.sleep(2);
System.out.println("发短信");
}
public synchronized void call() {
System.out.println("打电话");
}
}
结果:还是先发短信再打电话
这是为什么呢?并不是因为A先执行,而是有锁的存在
synchronized
锁的对象是方法的调用者,也就是phone
对象,因此打电话和发短信方法锁的是同一个对象,谁先拿到锁谁就先执行
如果新增一个普通方法hello,那先是hello还是发短信呢?
package com.zsr.lock8;
import java.util.concurrent.TimeUnit;
public class Test {
public static void main(String[] args) throws InterruptedException {
Phone phone = new Phone();
new Thread(() -> {
try {
phone.send_message();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "A").start();
//休眠1s
TimeUnit.SECONDS.sleep(1);
new Thread(() -> {
phone.hello();
}, "B").start();
}
}
class Phone {
public synchronized void send_message() throws InterruptedException {
//休眠2s
TimeUnit.SECONDS.sleep(2);
System.out.println("发短信");
}
public synchronized void call() {
System.out.println("打电话");
}
public void hello() {
System.out.println("hello");
}
}
根据结果,先执行hello,这是因为hello
是一个普通方法,没有锁,不受锁的影响
如果有两个
phone
对象,是先发短信还是先打电话
package com.zsr.lock8;
import java.util.concurrent.TimeUnit;
public class Test {
public static void main(String[] args) throws InterruptedException {
Phone phone1 = new Phone();
Phone phone2 = new Phone();
new Thread(() -> {
try {
phone1.send_message();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "A").start();
//休眠1s
TimeUnit.SECONDS.sleep(1);
new Thread(() -> {
phone2.call();
}, "B").start();
}
}
class Phone {
public synchronized void send_message() throws InterruptedException {
//休眠2s
TimeUnit.SECONDS.sleep(2);
System.out.println("发短信");
}
public synchronized void call() {
System.out.println("打电话");
}
}
两个对象,两个调用者,所以有两把锁,所以按时间顺序执行
修改两个方法为静态方法,只有一个对象,是先打电话还是发短信
package com.zsr.lock8;
import java.util.concurrent.TimeUnit;
public class Test {
public static void main(String[] args) throws InterruptedException {
Phone phone = new Phone();
new Thread(() -> {
try {
phone.send_message();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "A").start();
//休眠1s
TimeUnit.SECONDS.sleep(1);
new Thread(() -> {
phone.call();
}, "B").start();
}
}
class Phone {
public static synchronized void send_message() throws InterruptedException {
//休眠2s
TimeUnit.SECONDS.sleep(2);
System.out.println("发短信");
}
public static synchronized void call() {
System.out.println("打电话");
}
}
根据结果,先发短信
因为static静态方法在类加载的时候就有了,因此这里synchronized
锁的是Class模板,也就是Phone.class
,全局唯一;也就是两个方法拿的仍是同一把锁
那如果两个方法为静态方法,有两个对象,是先打电话还是发短信?
package com.zsr.lock8;
import java.util.concurrent.TimeUnit;
public class Test {
public static void main(String[] args) throws InterruptedException {
Phone phone1 = new Phone();
Phone phone2 = new Phone();
new Thread(() -> {
try {
phone1.send_message();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "A").start();
//休眠1s
TimeUnit.SECONDS.sleep(1);
new Thread(() -> {
phone2.call();
}, "B").start();
}
}
class Phone {
public static synchronized void send_message() throws InterruptedException {
//休眠2s
TimeUnit.SECONDS.sleep(2);
System.out.println("发短信");
}
public static synchronized void call() {
System.out.println("打电话");
}
}
根据结果,仍是先发短信
因为锁的是Phone.class
,也就是说两个方法仍是同一把锁
如果是一个普通的同步方法和一个静态的同步方法,只有一个对象
package com.zsr.lock8;
import java.util.concurrent.TimeUnit;
public class Test {
public static void main(String[] args) throws InterruptedException {
Phone phone = new Phone();
new Thread(() -> {
try {
phone.send_message();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "A").start();
//休眠1s
TimeUnit.SECONDS.sleep(1);
new Thread(() -> {
phone.call();
}, "B").start();
}
}
class Phone {
public static synchronized void send_message() throws InterruptedException {
//休眠2s
TimeUnit.SECONDS.sleep(2);
System.out.println("发短信");
}
public synchronized void call() {
System.out.println("打电话");
}
}
结果:
一个锁的是Phone.class
模板,一个锁的是phone
对象,因此两个方法不是一把锁,因此按时间顺序运行
如果是一个普通的同步方法和一个静态的同步方法,有两个对象
package com.zsr.lock8;
import java.util.concurrent.TimeUnit;
public class Test {
public static void main(String[] args) throws InterruptedException {
Phone phone1 = new Phone();
Phone phone2 = new Phone();
new Thread(() -> {
try {
phone1.send_message();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "A").start();
//休眠1s
TimeUnit.SECONDS.sleep(1);
new Thread(() -> {
phone2.call();
}, "B").start();
}
}
class Phone {
public static synchronized void send_message() throws InterruptedException {
//休眠2s
TimeUnit.SECONDS.sleep(2);
System.out.println("发短信");
}
public synchronized void call() {
System.out.println("打电话");
}
}
同样两个方法不是一把锁,因此按时间顺序运行
六、安全集合类
6.1、CopyOnWriteArrayList
ArrayList不安全
并发情况下,ArrayList
不安全,我们通过一个简单的案例来测试:
package com.zsr.collection;
import java.util.ArrayList;
import java.util.UUID;
public class ListTest {
public static void main(String[] args) {
//并发情况下
ArrayList<String> list = new ArrayList<>();
for (int i = 0; i < 10; i++) {
new Thread(() -> {
list.add(UUID.randomUUID().toString().substring(0, 8));
System.out.println(list);
}, String.valueOf(i)).start();
}
}
}
根据结果,发现报错了ConcurrentModificationException
(并发修改异常)
那么怎么解决呢?
-
方案一:换成线程安全的
vector
Vector<String> lists = new Vector<>();
-
方案二:利用Collections工具类
List<String> list = Collections.synchronizedList(new ArrayList<>());
-
方案三:利用JUC包中的类
CopyOnWriteArrayList
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
引入CopyOnWriteArrayList
CopyOnWrite
简称COW
,是计算机程序设计领域的一种优化策略- 多个线程调用的时候,读取的时候固定,但写入时会复制,避免写入造成的数据覆盖问题
其效率比Vector
更高,因为Vector在方法上都用了Synchronized
关键字,会降低效率
而CopyOnWriteArratList
是用Lock
锁实现的,底层也是数组实现,不过添加的时候会先拷贝一份新的数组,最后再拷贝回去
6.2、CopyOnWriteArraySet
HashSet不安全
并发情况下,HashSet
不安全,我们通过一个简单的案例来测试:
package com.zsr.collection;
import java.util.HashSet;
import java.util.UUID;
public class SetTest {
public static void main(String[] args) {
//并发情况下
HashSet<String> set = new HashSet<>();
for (int i = 0; i < 30; i++) {
new Thread(() -> {
set.add(UUID.randomUUID().toString().substring(0, 8));
System.out.println(set);
}, String.valueOf(i)).start();
}
}
}
同样,ConcurrentModificationException
:并发修改异常
那么怎么解决呢?
-
方案一:利用Collections工具类
Set<String> set = Collections.synchronizedSet(new HashSet<String>());
-
方案三:利用JUC包中的类
CopyOnWriteArrayList
CopyOnWriteArraySet<String> set = new CopyOnWriteArraySet<>();
引入CopyOnWriteArraySet
CopyOnWrite
简称COW
,是计算机程序设计领域的一种优化策略- 多个线程调用的时候,读取的时候固定,但写入时会复制,避免写入造成的数据覆盖问题
CopyOnWriteArratSet
同样是用Lock
锁实现的,底层也是数组实现,不过添加的时候会先拷贝一份新的数组,最后再拷贝回去
6.3、ConcurrentHashMap
HashMap不安全
并发情况下,HashMap
不安全,我们通过一个简单的案例来测试:
package com.zsr.collection;
import java.util.HashMap;
import java.util.UUID;
public class MapTest {
public static void main(String[] args) {
//并发情况下
HashMap<String, String> map = new HashMap<>();
for (int i = 0; i < 30; i++) {
new Thread(() -> {
map.put(Thread.currentThread().getName(), UUID.randomUUID().toString().substring(0, 8));
System.out.println(map);
}, String.valueOf(i)).start();
}
}
}
同样,ConcurrentModificationException
:并发修改异常
那么怎么解决呢?
-
方案一:利用Collections工具类
Map<String, String> map = Collections.synchronizedMap(new HashMap<String, String>());
-
方案三:利用JUC包中的类
ConcurrentHashMap
ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
引入ConcurrentHashMap
ConcurrentHashMap
融合了hashtable
和hashmap
二者的优势
但是hashtable每次同步执行的时候都要锁住整个结构。看下图:
ConcurrentHashMap正是为了解决这个问题而诞生的,其锁的方式是稍微细粒度的,引入了分段锁
的概念;
-
可以理解为把一个大的Map拆分成N(默认为16)个小的HashTable,根据key.hashCode()来决定把key放到哪个HashTable中。
-
在ConcurrentHashMap中,就是把Map分成了N个Segment,put和get的时候,都是现根据key.hashCode()算出放到哪个Segment中:
通过把整个Map分为N个Segment(类似HashTable),可以提供相同的线程安全;原来只能一个线程进入,现在却能同时16个写线程进入(写线程才需要锁定,而读线程几乎不受限制),并发性的提升是显而易见的
七、Callable
7.1、同Runnable的区别
- 可以有返回值
- 可以抛出异常
- 重写call()方法而不是run()
7.2、怎么实现
实现Runnable接口时,我们通过Thread.start()
进行启动,因为Thread
的构造方法可以传入Runnable
对象
那么怎么实现Callable呢?我们无法直接通过Thread
进行启动,但是我们可以通过Runnable
间接的启动
查看帮助文档,可以看到Runnable
接口有一个FutureTask
启动类,我们点进去看看
可以看到,它有两个构造方法,分别可以传入Callable
和Runnable
对象,这就将两者联系了起来
于事我们就可以通过Thread
来启动Callable
接口的实现类
package com.zsr;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
public class dome1 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
myThread myThread = new myThread();
FutureTask<String> futureTask = new FutureTask<String>(myThread);
new Thread(futureTask).start();
System.out.println(futureTask.get());//获取call方法返回值
}
}
class myThread implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println("调用call方法");
return "call方法执行完成";
}
}
注意:
-
通过
FutureTask
的get()获取call方法的返回值,该方法可能会产生阻塞(可能返回结果需要大量的计算,很耗时),一般情况下将其放在最后一行或者使用异步通信来处理 -
FutureTask
任务多线程并发访问时为啥只会被执行一次
八、三大常用辅助类
8.1、CountDownLatch
就是一个减法计数器
package com.zsr.countDown;
import java.util.concurrent.CountDownLatch;
public class Test {
public static void main(String[] args) throws InterruptedException {
//创建一个计数器,初始化为6
CountDownLatch countDownLatch = new CountDownLatch(6);
for (int i = 1; i <= 6; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "线程执行结束");
countDownLatch.countDown();//计数器-1
}, String.valueOf(i)).start();
}
countDownLatch.await();//等待计数器归0再往下执行
System.out.println("所有线程执行完毕");
}
}
如果不加countDownLatch.await()
加了之后
8.2、CyclicBarrier
相当于加法计数器
package com.zsr.Cyclic;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class Test {
//模拟集齐7课龙珠召唤神龙
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> {
System.out.println("召唤神龙!");
});
for (int i = 1; i <= 7; i++) {
final int temp = i;
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "收集了第" + temp + "颗龙珠");
try {
cyclicBarrier.await();//计数不断+1,直到为7
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
}
}
8.3、Semaphore
计数信号量
semaphore.acquire()
,获得许可正,如果许可证已经满了,等待其他线程释放许可证semaphore.release()
,释放许可证作用:多个共享资源互斥使用,并发限流,控制最大的线程数
package com.zsr.Sema;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class Test {
public static void main(String[] args) {
//模拟3个车位,6辆车要停车
Semaphore semaphore = new Semaphore(3);
for (int i = 1; i <= 6; i++) {
new Thread(() -> {
try {
semaphore.acquire();//获取许可
System.out.println(Thread.currentThread().getName() + "抢到车位");
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName() + "离开车位");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();//释放许可
}
}, String.valueOf(i)).start();
}
}
}
九、读写锁
代码测试:定义一个缓存区用于读写操作,然后启动5个线程分别进行读和写,测试
-
首先测试不加锁的情况下
import java.util.HashMap; import java.util.Map; import java.util.concurrent.locks.ReentrantReadWriteLock; public class ReadWriteLockDemo { public static void main(String[] args) { UnlockCache unlockCache = new UnlockCache(); //5个线程写入 for (int i = 1; i <= 5; i++) { final int temp = i; new Thread(() -> { unlockCache.put(temp, temp); }, String.valueOf(i)).start(); } //5个线程读出 for (int i = 1; i <= 5; i++) { final int temp = i; new Thread(() -> { unlockCache.get(temp); }, String.valueOf(i)).start(); } } } //不加锁的 class UnlockCache { private volatile Map<Integer, Object> map = new HashMap<>(); //写入 public void put(int key, Object value) { System.out.println("开始写入" + key); map.put(key, value); System.out.println(key + "写入完成"); } //读出 public void get(int key) { System.out.println("开始读取" + key); map.get(key); System.out.println(key + "读取完毕"); } }
根据结果,可以看到写入时被插队,这是不允许的! -
加读写锁,实现只能同时有一个线程写,多个线程读
也就是写锁为
独占锁
(一次只能被一个线程占有),读锁为共享锁
(多个线程可以同时占有)- 读-读:可共存
- 读-写:不可共存
- 写-写:不可共存
import java.util.HashMap; import java.util.Map; import java.util.concurrent.locks.ReentrantReadWriteLock; public class ReadWriteLockDemo { public static void main(String[] args) { LockCache lockCache = new LockCache(); //5个线程写入 for (int i = 1; i <= 5; i++) { final int temp = i; new Thread(() -> { lockCache.put(temp, temp); }, String.valueOf(i)).start(); } //5个线程读出 for (int i = 1; i <= 5; i++) { final int temp = i; new Thread(() -> { lockCache.get(temp); }, String.valueOf(i)).start(); } } } //加锁:实现同时只能有一个线程写,多个线程读 class LockCache { private volatile Map<Integer, Object> map = new HashMap<>(); //读写锁:实现只能有一个线程写,多个线程写,更细粒度的控制 ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); //写入:只能有一个线程写 public void put(int key, Object value) { readWriteLock.writeLock().lock(); try { System.out.println("开始写入" + key); map.put(key, value); System.out.println(key + "写入完成"); } catch (Exception e) { e.printStackTrace(); } finally { readWriteLock.writeLock().unlock(); } } //读出:可以多个线程读 public void get(int key) { readWriteLock.readLock().lock(); try { System.out.println("开始读取" + key); map.get(key); System.out.println(key + "读取完毕"); } catch (Exception e) { e.printStackTrace(); } finally { readWriteLock.readLock().unlock(); } } }
根据结果,我们实现了写入时不能被插队,但是读取可以多个线程读取
十、阻塞队列
10.1、关系图
BlockingQueue
关系图:
队列阻塞:
10.2、ArrayBlockingQueue四组API
方式 | 抛出异常 | 有返回值、不抛出异常 | 阻塞等待 | 限时等待 |
---|---|---|---|---|
添加 | add() | offer() | put() | offer( , ) |
移除 | remove() | poll() | take() | poll( , ) |
判断队列首 | element() | peek() | \ | \ |
抛出异常(add、remove、element)
//抛出异常
public static void test1() {
//初始化阻塞队列大小为3
ArrayBlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.add(1));
System.out.println(blockingQueue.add(2));
System.out.println(blockingQueue.add(3));
//如果添加第四个元素,则发生异常java.lang.IllegalStateException: Queue full
System.out.println(blockingQueue.add(4));
}
//抛出异常
public static void test1() {
//初始化阻塞队列大小为3
ArrayBlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.add(1));
System.out.println(blockingQueue.add(2));
System.out.println(blockingQueue.add(3));
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
//如果取出第四个元素,则发生异常java.util.NoSuchElementException
System.out.println(blockingQueue.remove());
}
//抛出异常
public static void test1() {
//初始化阻塞队列大小为3
ArrayBlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.add(1));
System.out.println(blockingQueue.add(2));
System.out.println(blockingQueue.add(3));
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
//如果取出队首元素,则发生异常java.util.NoSuchElementException
System.out.println(blockingQueue.element());
}
不跑出异常(offer、poll、pick)
//不抛出异常
public static void test1() {
//初始化阻塞队列大小为3
ArrayBlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.offer(1));
System.out.println(blockingQueue.offer(2));
System.out.println(blockingQueue.offer(3));
//如果添加第四个元素,返回false
System.out.println(blockingQueue.offer(4));
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
//如果取出第四个元素,返回null
System.out.println(blockingQueue.poll());
//如果取出队首元素,返回null
System.out.println(blockingQueue.peek());
}
等待阻塞(put、take)
//等待阻塞
public static void test1() throws InterruptedException {
//初始化阻塞队列大小为3
ArrayBlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<>(3);
blockingQueue.put(1);
blockingQueue.put(2);
blockingQueue.put(3);
//如果添加第四个元素,程序阻塞
blockingQueue.put(4);
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
//如果取出第四个元素,程序阻塞
blockingQueue.take();
}
限时等待()
//限时等待
public static void test1() throws InterruptedException {
//初始化阻塞队列大小为3
ArrayBlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.offer(1));
System.out.println(blockingQueue.offer(2));
System.out.println(blockingQueue.offer(3));
//如果添加第四个元素,等待2s后程序结束
System.out.println(blockingQueue.offer(4, 2, TimeUnit.SECONDS));
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
//如果取出第四个元素,等待2s后程序结束
System.out.println(blockingQueue.poll(2, TimeUnit.SECONDS));
}
10.3、SynchronousQueue
一种阻塞队列,其中每个插入操作必须等待另一个线程的对应移除操作 ,反之亦然。同步队列没有任何内部容量,甚至连一个队列的容量都没有。不能在同步队列上进行 peek
,因为仅在试图要移除元素时,该元素才存在;除非另一个线程试图移除某个元素,否则也不能(使用任何方法)插入元素;也不能迭代队列,因为其中没有元素可用于迭代。队列的头 是尝试添加到队列中的首个已排队插入线程的元素;如果没有这样的已排队线程,则没有可用于移除的元素并且 poll()
将会返回 null
。对于其他 Collection
方法(例如 contains
),SynchronousQueue
作为一个空 collection。此队列不允许 null
元素。
public static void main(String[] args) throws InterruptedException {
SynchronousQueue<String> synchronousQueue = new SynchronousQueue<>();
//线程T1,存元素
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + " put 1");
synchronousQueue.put("1");
System.out.println(Thread.currentThread().getName() + " put 2");
synchronousQueue.put("2");
System.out.println(Thread.currentThread().getName() + " put 3");
synchronousQueue.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "T1").start();
//线程T2,取元素
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName() + " take " + synchronousQueue.take());
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName() + " take " + synchronousQueue.take());
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName() + " take " + synchronousQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "T2").start();
}
十一、线程池
池化技术:程序的运行就会占用系统资源就会占用系统资源,为了优化资源的使用,就引入了池化技术,事先准备好一些资源,需要使用就来取,用完即放回;例如 线程池、连接池、内存池、对象池
线程池的好处:线程复用、可以控制最大并发数、管理线程
- 降低资源消耗
- 提高响应速度
- 方便管理
11.1、创建线程池三大方法
如何创建线程池呢?java.util.concurrent
中提供了Executors
类
其中有一些静态方法用于创建线程池
newSingleThreadExecutor
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPool {
public static void main(String[] args) {
//创建单一线程池,只有一个线程执行
ExecutorService singleThreadPool = Executors.newSingleThreadExecutor();
//创建线程池,可指定固定线程数量同时执行
for (int i = 0; i < 20; i++) {
singleThreadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + " ok");
});
}
//线程池用完,关闭线程池
singleThreadPool.shutdown();
}
}
newFixedThreadPool
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPool {
public static void main(String[] args) {
//创建线程池,可指定固定线程数量同时执行
ExecutorService fixThreadPool = Executors.newFixedThreadPool(5);
//使用线程池创建线程
for (int i = 0; i < 20; i++) {
fixThreadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + " ok");
});
}
//线程池用完,关闭线程池
fixThreadPool.shutdown();
}
}
newCachedThreadPool
创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程, 那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。 此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPool {
public static void main(String[] args) {
//创建可伸缩的线程池
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
//使用线程池创建线程
for (int i = 0; i < 20; i++) {
cachedThreadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + " ok");
});
}
//线程池用完,关闭线程池
cachedThreadPool.shutdown();
}
}
11.2、七大参数
查看newSingleThreadExecutor
、newFixedThreadPool
、newCachedThreadPool
的源码,可以发现本质上就是创建了一个ThreadPoolExcutor对象
再查看ThreadPoolExecutor
的源码,可以看到七大参数
public ThreadPoolExecutor(int corePoolSize, //核心线程池大小
int maximumPoolSize, //最大线程池大小
long keepAliveTime, //存活时间(超时未调用则释放)
TimeUnit unit, //超时单位
BlockingQueue<Runnable> workQueue, //阻塞队列
ThreadFactory threadFactory, //线程工程:创建线程,一般默认不改动
RejectedExecutionHandler handler) //拒绝策略
{
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
各种参数的含义好比银行办理业务,corePoolSize
是已经开放的服务窗口,BlockingQueue
是候客区,假设人流量非常大,就需要多开放几个服务窗口,maximumPoolSize
就是最大开放的服务窗口数;再假如很少有人办理业务,过了一定的时间就关闭窗口,keepAliveTime
就是要关闭窗口的事件;RejectedExecutionHandler
拒绝策略就好比银行满了,再来人就不让进了
可以看到newCachedThreadPool
的核心线程池大小设置未0,最大线程池大小设置为Integer.MAX_VALUE
,约等于21亿;也就是说通过Executors.newCachedThreadPool()创建的线程池可以支持并发的线程数介于0~21亿之间,这是十分耗费资源的
因此,阿里巴巴官方手册有以下规定:
11.3、四种拒绝策略
我们来自定义一个线程池,拒绝策略为AbortPolicy
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPool {
public static void main(String[] args) {
//自定义线程池
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
2,
5,
3,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);
//最大承载=最大线程池大小+阻塞队列长度=5+3=8 因此如果i最大值设为9则会抛出异常
for (int i = 1; i <= 8; i++) {
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + " ok");
}
);
}
}
}
如果i<=9,超过了最大承载,则会抛出异常
修改拒绝策略为CallerRunsPolicy
:可以看到没有抛出异常,而是由main线程处理
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPool {
public static void main(String[] args) {
//自定义线程池
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
2,
5,
3,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy()
);
//最大承载=最大线程池大小+阻塞队列长度=5+3=8 因此如果i最大值设为9则会抛出异常
for (int i = 1; i <= 9; i++) {
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + " ok");
}
);
}
}
}
修改拒绝策略为DiscardPolicy
:可以看到不抛出异常不执行
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPool {
public static void main(String[] args) {
//自定义线程池
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
2,
5,
3,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardPolicy()
);
//最大承载=最大线程池大小+阻塞队列长度=5+3=8 因此如果i最大值设为9则会抛出异常
for (int i = 1; i <= 9; i++) {
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + " ok");
}
);
}
}
}
策略DiscardOldestPolicy
同DiscardPolicy
类似:不抛出异常不执行,但是会尝试竞争
11.4、最大线程数怎么定义?
CPU密集型
电脑的cpu是几核就定义为几,定义为常数换台电脑就不行了
//自定义线程池:CPU密集型
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
2,
Runtime.getRuntime().availableProcessors(),
3,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardOldestPolicy()
);
IO密集型
大于 判断程序中耗费IO的线程 即可
十二、四大函数式接口
新时代的程序员:lambda表达式、链式编程、函数式接口、Stream流式计算
什么是函数式接口
函数式接口(Functional Interface)是jdk8引入的,有且仅有一个抽象方法,但是可以有多个非抽象方法的接口。并且这类接口使用了
@FunctionalInterface
进行注解,函数式接口可以被隐式转换为 lambda 表达式JDK 1.8 之前已有的函数式接口:
java.lang.Runnable
@FunctionalInterface public interface Runnable { public abstract void run(); }
java.util.concurrent.Callable
java.security.PrivilegedAction
java.util.Comparator
java.io.FileFilter
java.nio.file.PathMatcher
java.lang.reflect.InvocationHandler
java.beans.PropertyChangeListener
java.awt.event.ActionListener
javax.swing.event.ChangeListener
JDK 1.8 新增加的函数接口:
java.util.function
这个package中的接口大致分为了以下四类:
Function:接收参数,并返回结果,主要方法
R apply(T t)
Consumer:接收参数,无返回结果, 主要方法为
void accept(T t)
// forEach的参数就是消费者类型函数式接口Consumer @Override public void forEach(Consumer<? super E> action) { Objects.requireNonNull(action); final int expectedModCount = modCount; final Object[] es = elementData; final int size = this.size; for (int i = 0; modCount == expectedModCount && i < size; i++) action.accept(elementAt(es, i)); if (modCount != expectedModCount) throw new ConcurrentModificationException(); }
Supplier:不接收参数,但返回结构,主要方法为
T get()
Predicate:接收参数,x返回boolean值,主要方法为
boolean test(T t)
12.1、Function
函数式接口:有参数且需要返回值
@FunctionalInterface
public interface Function<T, R> {
//有参数且有返回值
R apply(T t);
default <V> Function<V, R> compose(Function<? super V, ? extends T> before) {
Objects.requireNonNull(before);
return (V v) -> apply(before.apply(v));
}
default <V> Function<T, V> andThen(Function<? super R, ? extends V> after) {
Objects.requireNonNull(after);
return (T t) -> after.apply(apply(t));
}
static <T> Function<T, T> identity() {
return t -> t;
}
}
实现一个Function接口
package 函数式接口;
import java.util.function.Function;
public class Demo {
public static void main(String[] args) {
//匿名内部类,没有类的名称
Function function1 = new Function<String, String>() {
@Override
public String apply(String str) {
return str;
}
};
//修改为lambda表达式
Function function2 = (str) -> {
return str;
};
System.out.println(function1.apply("hello"));
System.out.println(function2.apply("hello lambda"));
}
}
12.2、Predicate
判断型接口:有参数,返回值为布尔型
@FunctionalInterface
public interface Predicate<T> {
//有参数,返回值为布尔型
boolean test(T t);
default Predicate<T> and(Predicate<? super T> other) {
Objects.requireNonNull(other);
return (t) -> test(t) && other.test(t);
}
default Predicate<T> negate() {
return (t) -> !test(t);
}
default Predicate<T> or(Predicate<? super T> other) {
Objects.requireNonNull(other);
return (t) -> test(t) || other.test(t);
}
static <T> Predicate<T> isEqual(Object targetRef) {
return (null == targetRef)
? Objects::isNull
: object -> targetRef.equals(object);
}
@SuppressWarnings("unchecked")
static <T> Predicate<T> not(Predicate<? super T> target) {
Objects.requireNonNull(target);
return (Predicate<T>)target.negate();
}
}
实现一个Predicate接口
package 函数式接口;
import java.util.function.Predicate;
public class Demo1 {
public static void main(String[] args) {
//匿名内部类,没有类的名称
Predicate<String> predicate1 = new Predicate<>() {
@Override
public boolean test(String s) {
return s.isEmpty();//判断字符串是否为空
}
};
//修改为lambda
Predicate<String> predicate2 = (s) -> {
return s.isEmpty();
};
System.out.println(predicate1.test("hello"));
System.out.println(predicate2.test(""));
}
}
12.3、Consumer
消费性接口:没有返回值,有参数
@FunctionalInterface
public interface Consumer<T> {
//没有返回值,有参数
void accept(T t);
default Consumer<T> andThen(Consumer<? super T> after) {
Objects.requireNonNull(after);
return (T t) -> { accept(t); after.accept(t); };
}
}
实现Consumer接口
package 函数式接口;
import java.util.function.Consumer;
public class Demo2 {
public static void main(String[] args) {
//匿名内部类,没有类的名称
Consumer<String> consumer1 = new Consumer<>() {
@Override
public void accept(String s) {
System.out.println(s);
}
};
//修改为lambda
Consumer consumer2 = (s) -> {
System.out.println(s);
};
consumer1.accept("hello1");
consumer1.accept("hello2");
}
}
12.4、Supplier
供给型接口:无参数,指定返回值类型
@FunctionalInterface
public interface Supplier<T> {
//无参数,指定返回值类型
T get();
}
实现Supplier接口
package 函数式接口;
import java.util.function.Supplier;
public class Demo3 {
public static void main(String[] args) {
//匿名内部类,没有类的名称
Supplier<String> supplier1 = new Supplier<>() {
@Override
public String get() {
return "hello1";
}
};
//修改为lambda
Supplier supplier2 = () -> {
return "hello2";
};
System.out.println(supplier1.get());
System.out.println(supplier2.get());
}
}
十三、Stream流式计算
13.1、什么是Stream流式计算
大数据时代分为存储+计算,存储交给数据库、集合等来处理,计算就交给流来做
Java中就提供了java.util.stream
用于流式计算
13.2、案例测试
package 流式计算;
import java.util.Arrays;
import java.util.List;
public class Test {
public static void main(String[] args) {
User user1 = new User("a", 1, 18);
User user2 = new User("b", 2, 19);
User user3 = new User("c", 3, 21);
User user4 = new User("d", 4, 30);
User user5 = new User("e", 5, 28);
User user6 = new User("f", 6, 27);
//list来存储数据
List<User> users = Arrays.asList(user1, user2, user3, user4, user5, user6);
//stream来计算
//筛选出以下条件的用户:
//1.id为偶数
//2.age>23
//3.name转换为大写字母
//4.name字母倒序排序
//5.只输出一个用户
users.stream()
.filter(user -> {//判断型接口Predicate
return user.getId() % 2 == 0;
})
.filter(user -> {//判断型接口Predicate
return user.getAge() > 23;
})
.map(user -> {//函数式接口Function
return user.getName().toUpperCase();
})
.sorted((u1, u2) -> {//Comparator函数式接口
return u2.compareTo(u1);
})
.limit(1)
.forEach(System.out::println);
}
}
十四、ForkJoin
14.1、什么是ForkJoin
ForkJoin出现于jdk1.7,用于并行执行任务 ,提高效率,用于大数据量的情况(分支合并)
大数据:Map Reduce——将大任务拆分成小任务
14.2、ForkJoin的特点
工作窃取:里面维护的都是双端队列
14.3、案例:计算求和任务
我们可以在JUC中找到ForkJoinPool
类
其中有一个方法,可用于执行一个ForkJoinTask
我们需要返回值,查看RecursiveTask
,可以找到compute
方法进行计算
1️⃣ 编写任务
package forkjoin;
import java.util.concurrent.RecursiveTask;
//计算任务
public class ForkJoinTask extends RecursiveTask<Long> {
private long begin;
private long end;
public ForkJoinTask(long begin, long end) {
this.begin = begin;
this.end = end;
}
//计算方法:计算1加到1_0000_0000
@Override
protected Long compute() {
long sum = 0;
if ((end - begin) < 10000) {//如果差值小于10000则暴力加
for (long i = begin; i <= end; i++)
sum += i;
return sum;
} else {//数据量大于10000采用forkjoin来计算
long mid = (begin + end) / 2;
//任务一
ForkJoinTask task1 = new ForkJoinTask(begin, mid);
task1.fork();//拆分任务,将线程压入队列
//任务二
ForkJoinTask task2 = new ForkJoinTask(mid + 1, end);
task2.fork();//拆分任务,将线程压入队列
return task1.join() + task2.join();
}
}
}
2️⃣ 测试比较
package forkjoin;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.LongStream;
public class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
test1();
test2();
test3();
}
//普通遍历方式
public static void test1() {
long sum = 0;
long start = System.currentTimeMillis();
for (long i = 0; i <= 10_0000_0000; i++) {
sum += i;
}
long end = System.currentTimeMillis();
System.out.println("普通遍历法耗时" + (end - start) + "结果为:" + sum);
}
//ForkJoin方式
public static void test2() throws ExecutionException, InterruptedException {
long start = System.currentTimeMillis();
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask task = new ForkJoinTask(0, 10_0000_0000);
java.util.concurrent.ForkJoinTask<Long> submit = forkJoinPool.submit(task);
Long sum = submit.get();
long end = System.currentTimeMillis();
System.out.println("通过ForkJoin方式耗时" + (end - start) + "结果为:" + sum);
}
//Stream并行流方式
public static void test3() {
long start = System.currentTimeMillis();
long sum = LongStream.rangeClosed(0, 10_0000_0000).parallel().reduce(0, Long::sum);//rangeClosed(]
long end = System.currentTimeMillis();
System.out.println("通过Stream并行流方式耗时" + (end - start) + "结果为:" + sum);
}
}
十五、异步回调
异步回调通常用CompletableFuture
发起两个异步请求,一个有返回结果,一个没有返回结果
package future;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
//异步调用异步执行:成功回调/失败回调
public class Demo1 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//发起一个请求(没有返回值的异步回调)
CompletableFuture<Void> completableFuture1 = CompletableFuture.runAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "runAsync=>Void");
});
completableFuture1.get();//阻塞获取执行结果
//有返回值的异步回调
CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "supplyAsync=>Integer");
return 1024;
});
System.out.println(completableFuture2
.whenComplete((t, u) -> {//成功回调:正确的返回结果
System.out.println("t=" + t);
System.out.println("u=" + u);
}).exceptionally((e) -> {//失败回调:错误的返回结果
System.out.println(e.getMessage());
return 233;
}).get());
}
}
如果有返回结果的异步回调报错,就会走失败回调的方法,返回233
十六、理解JMM&Volatile
16.1、请你谈谈对Volatile的理解
Volatile
是 JVM 提供的轻量级的同步机制
- 保证可见性
- 不保证原子性
- 禁止指令重排
怎么保证可见性?就需要和JMM挂钩
16.2、什么是JMM
Java内存模型,不存在的东西,是一种概念一种约定
关于JMM同步的约定:
- 线程解锁前,必须立刻把自己的共享变量刷回主存
- 线程加锁前,必须读取主存中的最新值到工作内存中
- 加锁和解锁是同一把锁
线程分为:工作内存、主内存
16.3、Volatile特点
Volatile
可以保证可见性,不能保证原子性,可以避免指令重排的现象
1. 保证可见性
package jmm;
import java.util.concurrent.TimeUnit;
public class Demo {
private static int num;
public static void main(String[] args) throws InterruptedException {
new Thread(() -> {
while (num == 0) ;
}, "t").start();
//main线程休眠1s
TimeUnit.SECONDS.sleep(1);
num = 1;
System.out.println(num);
}
}
开启一个线程,当num=0时不停的死循环;然后让主线程休眠1s后修改num=1,也就是将主内存中的num修改为1;看到结果程序并没有停止,这是因为t线程并没有拿到主内存中num最新的值,不知套其发生了变化,也就是t线程对main线程的变化不可见
如何解决呢?只需要通过volatile
关键字修饰num即可保证其的可见性
可以看到,程序立马停止了
2. 不保证原子性
什么是原子性?也就是一个线程执行的时候不能被打扰分割,要么同时成功要么同时失败
package jmm;
public class Demo2 {
//通过volatile不能保证原子性
private volatile static int num;
public static void add() {
num++;
}
public static void main(String[] args) throws InterruptedException {
//理论上num=20000
for (int i = 0; i < 20; i++) {
new Thread(() -> {
for (int j = 0; j < 1000; j++) {
add();
}
}).start();
}
//为了让20000条线程跑完,让main线程进行礼让(这里的2表示java固有的两个线程main和gc)
while (Thread.activeCount() > 2) {
Thread.yield();
}
System.out.println(Thread.currentThread().getName() + ":" + num);
}
}
根据结果,发现volatile
并不保证原子性的操作,为什么不安全呢?
我们反编译看看,可以看到num++
一行代码在底层是多行操作,因此不能保证原子性,所以是不安全的
D:\学习\IDEA project\Test\out\production\Test\jmm>javap -c Demo2.class
Compiled from "Demo2.java"
public class jmm.Demo2 {
public jmm.Demo2();
Code:
0: aload_0
1: invokespecial #1 // Method java/lang/Object."<init>":()V
4: return
public static synchronized void add();
Code:
0: getstatic #7 // Field num:I
3: iconst_1
4: iadd
5: putstatic #7 // Field num:I
8: return
public static void main(java.lang.String[]) throws java.lang.InterruptedException;
Code:
0: iconst_0
1: istore_1
2: iload_1
3: bipush 20
5: if_icmpge 29
8: new #13 // class java/lang/Thread
11: dup
12: invokedynamic #15, 0 // InvokeDynamic #0:run:()Ljava/lang/Runnable;
17: invokespecial #19 // Method java/lang/Thread."<init>":(Ljava/lang/Runnable;)V
20: invokevirtual #22 // Method java/lang/Thread.start:()V
23: iinc 1, 1
26: goto 2
29: invokestatic #25 // Method java/lang/Thread.activeCount:()I
32: iconst_2
33: if_icmple 42
36: invokestatic #29 // Method java/lang/Thread.yield:()V
那如果不通过Lock和Synchronized 怎么保证原子性呢?
可以通过java.util.concurrent.atomic
包中的原子类解决原子问题 ,这些类的底层都直接和操作系统挂钩,在内存中修改值,这些类是特殊的存在
package jmm;
import java.util.concurrent.atomic.AtomicInteger;
public class Demo {
//通过volatile不能保证原子性
private volatile static AtomicInteger num = new AtomicInteger();
public static void add() {
num.getAndIncrement();//并不是简单的+1,而是利用了底层的CAS
}
public static void main(String[] args) throws InterruptedException {
//理论上num=20000
for (int i = 0; i < 20; i++) {
new Thread(() -> {
for (int j = 0; j < 1000; j++) {
add();
}
}).start();
}
//为了让20000条线程跑完,让main线程进行礼让(这里的2表示java固有的两个线程main和gc)
while (Thread.activeCount() > 2) {
Thread.yield();
}
System.out.println(Thread.currentThread().getName() + ":" + num);
}
}
3. 禁止指令重排
指令重排:计算机并不是按照我们编写的程序去执行
源代码–》编译器优化代码重排–》指令并行重排–》内存系统重排–》执行
处理器在指令重排的时候,会考虑数据之间的依赖性
int x = 1; //1
int y = 2; //2
x = x + 5; //3
y = x * x; //4
按我们所期望的执行顺序是1->2->3->4
但实际上可能是2143或者1324,但不可能是4123
指令重排可能会导致一些错误的结果,如下图所示:
使用volatile
可以避免指令重排,底层实现是通过 内存屏障 实现的,可以保证特定的操作执行顺序,也可以保证某些变量的内存可见性
十七、彻底玩转单例模式
volatile
在单例模式中使用的最多
17.1、饿汉式
package 单例模式;
//饿汉式
public class Hungry {
private static Hungry hungry = new Hungry();
//构造器私有
private Hungry() {
}
public static Hungry getInstance() {
return hungry;
}
}
优点: static变量会在类装载时初始化,不涉及多个线程访问该对象的问题,可以省略synchronized关键字
缺点:类初始化时就创建了对象,如果只是加载本类,而不是要调用 getinstance(),甚至永远没有调用,则会造成资源浪费!
17.2、懒汉式
package 单例模式;
//懒汉式
public class Lazy {
private static Lazy lazy;
private Lazy() {
}
public static Lazy getInstance() {
if (lazy == null)
lazy = new Lazy();
return lazy;
}
}
优点: 延迟加载,真正用的时候才实例化对象,提高了资源的利用率
缺点:存在并发访问的问题,以下测试并发访问情况
package 单例模式;
//懒汉式
public class Lazy {
private static Lazy lazy;
private Lazy() {
System.out.println("创建示例");
}
public static Lazy getInstance() {
if (lazy == null)
lazy = new Lazy();
return lazy;
}
public static void main(String[] args) {
//10条线程并发访问下
for (int i = 0; i < 10; i++) {
new Thread(() -> {
Lazy.getInstance();
}).start();
}
}
}
根据结果,可以看到有5个线程打印了结果,也就说进行了5次初始化,这是非常大的漏洞,出现了并发访问的问题
17.3、双重检测锁式
为了解决懒汉式并发访问的问题,加入了sychronized
关键字
package 单例模式;
//双重检测锁式
public class DoubleLock {
private static DoubleLock doubleLock;
private DoubleLock() {
System.out.println("创建示例");
}
public static DoubleLock getInstance() {
if (doubleLock == null) {
synchronized (Lazy.class) {
if (doubleLock == null)
doubleLock = new DoubleLock();
}
}
return doubleLock;
}
public static void main(String[] args) {
//10条线程并发访问下
for (int i = 0; i < 10; i++) {
new Thread(() -> {
DoubleLock.getInstance();
}).start();
}
}
}
根据打印结果,解决了并发访问的问题;但是这样仍然会存在问题,因为我们new
对象时并不是一个完整的原子性操作,而是分为以下三部:
- 分配内存空间
- 执行构造方法,初始化对象
- 把这个对象指向这个空间
单个线程A执行的情况下可以123按顺序执行,也可能由于指令重排按132执行;但是如果线程A按132顺序执行到3时来了一个线程B,此时该对象已经指向了分配的空间,因此B判断对象不是null,就会直接返回对象,但其实对象并没有进行初始化,就造成了错误
因此指令重排也会导致错误,因此完整的双重检测锁式
还加入了Volatile
关键字来避免指令重排,完整代码如下:
package 单例模式;
//双重检测锁式
public class DoubleLock {
private volatile static DoubleLock doubleLock;
private DoubleLock() {
System.out.println("创建示例");
}
public static DoubleLock getInstance() {
if (doubleLock == null) {
synchronized (Lazy.class) {
if (doubleLock == null)
doubleLock = new DoubleLock();
}
}
return doubleLock;
}
}
17.4、静态内部类式
package 单例模式;
public class InnerClass {
private InnerClass() {
}
//静态内部类里面创建对象
public static class inner {
private static final InnerClass innerClass = new InnerClass();
}
public static InnerClass getInstance() {
return inner.innerClass;
}
}
反射破坏单例模式
package 单例模式;
import java.lang.reflect.Constructor;
//双重检测锁式
public class DoubleLock {
private volatile static DoubleLock doubleLock;
private DoubleLock() {
System.out.println("创建示例");
}
public static DoubleLock getInstance() {
if (doubleLock == null) {
synchronized (Lazy.class) {
if (doubleLock == null)
doubleLock = new DoubleLock();
}
}
return doubleLock;
}
public static void main(String[] args) throws Exception {
DoubleLock instance1 = doubleLock.getInstance();
Constructor<DoubleLock> constructor = DoubleLock.class.getDeclaredConstructor(null);
constructor.setAccessible(true);
DoubleLock instance2 = constructor.newInstance();
System.out.println(instance1);
System.out.println(instance2);
}
}
根据结果,看到创建了两个实例,也就是单例模式被破坏,那么怎么解决呢?
可以在私有构造中加锁
package 单例模式;
import java.lang.reflect.Constructor;
//双重检测锁式
public class DoubleLock {
private volatile static DoubleLock doubleLock;
private DoubleLock() {
synchronized (DoubleLock.class){
if(doubleLock!=null){
throw new RuntimeException("不要试图使用反射破坏异常");
}
}
System.out.println("创建示例");
}
public static DoubleLock getInstance() {
if (doubleLock == null) {
synchronized (Lazy.class) {
if (doubleLock == null)
doubleLock = new DoubleLock();
}
}
return doubleLock;
}
public static void main(String[] args) throws Exception {
DoubleLock instance1 = doubleLock.getInstance();
Constructor<DoubleLock> constructor = DoubleLock.class.getDeclaredConstructor(null);
constructor.setAccessible(true);
DoubleLock instance2 = constructor.newInstance();
System.out.println(instance1);
System.out.println(instance2);
}
}
根据结果,可以看到避免了单例模式的破坏?可是上述两个对象一个是通过单例获取,一个通过反射获取;
那如果两个对象都是通过反射获取呢?
public static void main(String[] args) throws Exception {
Constructor<DoubleLock> constructor = DoubleLock.class.getDeclaredConstructor(null);
constructor.setAccessible(true);
DoubleLock instance1= constructor.newInstance();
DoubleLock instance2 = constructor.newInstance();
System.out.println(instance1);
System.out.println(instance2);
}
根据结果,可以看到单例模式又被破坏了,创建了两个对象!这种情况如何解决呢?
可以通过红绿灯方法实现,定义一个标志位记录对象是否创建
package 单例模式;
import java.lang.reflect.Constructor;
//双重检测锁式
public class DoubleLock {
private volatile static DoubleLock doubleLock;
//标志位
private static boolean flag = false;
private DoubleLock() {
synchronized (DoubleLock.class) {
if (flag == false)
flag = true;
else
throw new RuntimeException("不要试图使用反射破坏异常");
}
System.out.println("创建示例");
}
public static DoubleLock getInstance() {
if (doubleLock == null) {
synchronized (Lazy.class) {
if (doubleLock == null)
doubleLock = new DoubleLock();
}
}
return doubleLock;
}
public static void main(String[] args) throws Exception {
Constructor<DoubleLock> constructor = DoubleLock.class.getDeclaredConstructor(null);
constructor.setAccessible(true);
DoubleLock instance1 = constructor.newInstance();
DoubleLock instance2 = constructor.newInstance();
System.out.println(instance1);
System.out.println(instance2);
}
}
可以看到我们通过设置标志位flag
再次解决了这个问题,但是一旦被获取了这个关键字,单例模式仍然可以通过反射被破解,如下所示
public static void main(String[] args) throws Exception {
Constructor<DoubleLock> constructor = DoubleLock.class.getDeclaredConstructor(null);
Field declaredField = DoubleLock.class.getDeclaredField("flag");
constructor.setAccessible(true);
declaredField.setAccessible(true);
DoubleLock instance1 = constructor.newInstance();
declaredField.set(instance1, false);//第一个对象创建完毕后将flag改为false
DoubleLock instance2 = constructor.newInstance();
System.out.println(instance1);
System.out.println(instance2);
}
可以看到单例模式再次被破坏;因此为了让程序更加安全,通常对flag
关键字进行加密处理
那么到底如何完全的避免反射破坏单例模式呢?我们查看newInstance
的源码
可以看到,如果是枚举类型的话,就不能通过反射获取枚举;
因此引入了第5种单例模式
17.5、枚举单例
package 单例模式;
import java.lang.reflect.Constructor;
//enum本质上就是一个Class类
public enum EnumSingle {
INSTANCE;
public static void main(String[] args) throws Exception {
EnumSingle instance1 = EnumSingle.INSTANCE;
Constructor<EnumSingle> declaredConstructor = EnumSingle.class.getDeclaredConstructor(null);
declaredConstructor.setAccessible(true);
EnumSingle instance2 = declaredConstructor.newInstance();
System.out.println(instance1);
System.out.println(instance2);
}
}
我们再次通过反射创建对象,根据结果报错没有EnumSingle的空构造方法,这不是我们希望看到的
我们对EnumSingle
的class文件进行反编译,可以看到明明有空构造方法
但是执行明明报错没有无参构造,我们使用更专业的反编译工具jad
对class文件再进行反编译
可以看到枚举类本质上就是继承了Enum
类,本身就是一个Class,而且没有无参构造,而是含两个参数的有参构造,我们修改代码在测试
public static void main(String[] args) throws Exception {
EnumSingle instance1 = EnumSingle.INSTANCE;
Constructor<EnumSingle> declaredConstructor = EnumSingle.class.getDeclaredConstructor(String.class, int.class);
declaredConstructor.setAccessible(true);
EnumSingle instance2 = declaredConstructor.newInstance();
System.out.println(instance1);
System.out.println(instance2);
}
这才正确显示了报错的信息:无法反射地创建枚举对象
十八、深入理解CAS
18.1、什么是CAS
CAS
是 compareAndSet 的缩写:比较并交换,是CPU的并发原语
package CAS探究;
import java.util.concurrent.atomic.AtomicInteger;
public class CASDemo {
public static void main(String[] args) {
AtomicInteger atomicInteger = new AtomicInteger(2020);//底层是CAS
/**
* public final boolean compareAndSet(int expectedValue, int newValue)
* 期望、更新
* 如果期望的值达到了就更新,否则不更新
*/
System.out.println(atomicInteger.compareAndSet(2020, 2021));//返回是否修改成功
System.out.println(atomicInteger.get());
System.out.println(atomicInteger.compareAndSet(2020, 2022));
System.out.println(atomicInteger.get());
}
}
我们再来看看 atomicInteger.getAndIncrement() 方法是怎么实现的?我们该方法的源码
可以看到是由U
调用了getAndAddInt()
方法,而U
就是Unsafe
类的一个实例
什么是
Unsafe
类
- Java无法操作内存,只能通过调用C++来操作内存,
Unsafe
就是Java通过C++操作内存的接口- 就类似于Java通过native关键字来调用C++本地方法来和操作系统交互
可以看到,底层是一个do while循环,也就是一个自旋锁
因此:CAS
就是比较当前工作内存中的值和主内存中的值,如果这个值是期望的,就执行操作;如果不是,就一直循环,因为底层是一个do while循环(自旋锁)
CAS有三个操作数:
- 期望的值
- 比较的值
- 更新的值
缺点:
- 底层是自旋锁,循环耗时
- 一次性只能保证一个共享变量的原子性
- 会存在ABA问题
18.2、ABA问题
比如有两个线程A,B同时向修改A的内容,但是B线程执行速度快,首先cas(1,3)将A修改为3,然后又执行cas(3,1)将A修改为1,这之后线程A再cas(1,2)将A修改为2,但此时A=1已经不是原来的1了;
这就是ABA问题
我们来个代码模拟以下
package CAS探究;
import java.util.concurrent.atomic.AtomicInteger;
public class ABADemo {
//CAS是compareAndSet的缩写:比较并交换,是CPU的并发原语
public static void main(String[] args) {
AtomicInteger atomicInteger = new AtomicInteger(1);//底层是CAS
//线程B
System.out.println(atomicInteger.compareAndSet(1, 3));
System.out.println(atomicInteger.get());
System.out.println(atomicInteger.compareAndSet(3, 1));
System.out.println(atomicInteger.get());
//线程A
System.out.println(atomicInteger.compareAndSet(1, 2));
System.out.println(atomicInteger.get());
}
}
可以看到三个结果都为true,但不是我们期望的,我们希望知道谁动过A的值
可以通过类似乐观锁的方案来解决,使用 原子引用类AtomicReference
/AtomicStampedReference
(带时间)
我们使用AtomicStampedReference
测试以下
package CAS探究;
import java.util.concurrent.atomic.AtomicStampedReference;
public class ABADemo {
//CAS是compareAndSet的缩写:比较并交换,是CPU的并发原语
public static void main(String[] args) {
//public AtomicStampedReference(V initialRef, int initialStamp):这里的第二个参数等同于乐观锁的version,初始值设为1
AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<>(1, 1);
//线程B,多了连个参数:期望的版本号,更新的版本号
System.out.println(atomicStampedReference.compareAndSet(1, 3,
atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1));
System.out.println(atomicStampedReference.compareAndSet(3, 1,
atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1));
//线程A
System.out.println(atomicStampedReference.compareAndSet(1, 2, 1, 2));
}
}
可以看到A成功察觉到了B修改过数据,所以执行失败;和乐观锁原理相同
注意:如果泛型是包装类,注意对象引用问题(正常业务都是对象,这里是使用包装类Integer进行测试)
如果我们的范围不再-128~127,则会失败
十九、各种锁的理解
19.1、乐观锁/悲观锁
悲观锁(Pessimistic Lock)
1️⃣ 简介
当要对数据库中的一条数据进行修改的时候,为了避免同时被其他人修改,最好的办法就是直接对该数据进行加锁以防止并发。这种借助数据库锁机制,在修改数据之前先锁定,再修改的方式被称之为悲观并发控制【Pessimistic Concurrency Control,缩写“PCC”,又名“悲观锁”】
悲观锁,正如其名,具有强烈的独占和排他特性。它指的是对数据被外界(包括本系统当前的其他事务,以及来自外部系统的事务处理)修改持保守态度。因此,在整个数据处理过程中,将数据处于锁定状态。悲观锁的实现,往往依靠数据库提供的锁机制(也只有数据库层提供的锁机制才能真正保证数据访问的排他性,否则,即使在本系统中实现了加锁机制,也无法保证外部系统不会修改数据)。
之所以叫做悲观锁,是因为这是一种对数据的修改持有悲观态度的并发控制方式。总是假设最坏的情况,每次读取数据的时候都默认其他线程会更改数据,因此需要进行加锁操作,当其他线程想要访问数据时,都需要阻塞挂起。悲观锁的实现:
- 传统的关系型数据库使用这种锁机制,比如行锁,表锁等,读锁,写锁等,都是在做操作之前先上锁。
- Java 里面的同步 synchronized 关键字的实现。
2️⃣ 分类
悲观锁主要分为 共享锁
和 排他锁
共享锁
【shared locks】又称为读锁,简称S锁。顾名思义,共享锁就是多个事务对于同一数据可以共享一把锁,都能访问到数据,但是只能读不能修改。排他锁
【exclusive locks】又称为写锁,简称X锁。顾名思义,排他锁就是不能与其他锁并存,如果一个事务获取了一个数据行的排他锁,其他事务就不能再获取该行的其他锁,包括共享锁和排他锁,但是获取排他锁的事务是可以对数据行读取和修改。
3️⃣ 说明
悲观并发控制实际上是“先取锁再访问”的保守策略,为数据处理的安全提供了保证。但是在效率方面,处理加锁的机制会让数据库产生额外的开销,还有增加产生死锁的机会。另外还会降低并行性,一个事务如果锁定了某行数据,其他事务就必须等待该事务处理完才可以处理那行数据。
乐观锁(Optimistic Locking)
1️⃣ 简介
乐观锁是相对悲观锁而言的,乐观锁假设数据一般情况下不会造成冲突,所以在数据进行提交更新的时候,才会正式对数据的冲突与否进行检测,如果发现冲突了,则返回给用户错误的信息,让用户决定如何去做。乐观锁适用于读操作多的场景,这样可以提高程序的吞吐量。
乐观锁机制采取了更加宽松的加锁机制。乐观锁是相对悲观锁而言,也是为了避免数据库幻读、业务处理时间过长等原因引起数据处理错误的一种机制,但乐观锁不会刻意使用数据库本身的锁机制,而是依据数据本身来保证数据的正确性。
2️⃣ 实现
- CAS实现:Java中java.util.concurrent.atomic包下面的原子变量使用了乐观锁的一种 CAS 实现方式
- 版本号控制:一般是在数据表中加上一个数据版本号 version 字段,表示数据被修改的次数。当数据被修改时,version 值会+1。当线程A要更新数据值时,在读取数据的同时也会读取 version 值,在提交更新时,若刚才读取到的 version 值与当前数据库中的 version 值相等时才更新,否则重试更新操作,直到更新成功
3️⃣ 说明
乐观并发控制相信事务之间的数据竞争(data race)的概率是比较小的,因此尽可能直接做下去,直到提交的时候才去锁定,所以不会产生任何锁和死锁
19.2、公平/非公平锁
- 公平锁:非常公平,不能插队,线程的执行必须先来后到
- 非公平锁:非常不公平,可以插队,默认都为非公平锁!(比如一个线程3s执行完,一个线程1min执行完,如果使用公平锁严重影响某个线程的效率)
19.3、可重入锁
可重入锁(递归锁)
代码示例:synchronized版
执行结果:
代码示例:Lock版
19.4、自旋锁
不断的尝试,直到成功为止!
我们来编写一个自旋锁
package 自旋锁;
import java.util.concurrent.atomic.AtomicReference;
//自定义自旋锁
public class SpinLock {
//锁线程
AtomicReference<Thread> atomicReference = new AtomicReference<>();
//加锁
public void myLock() {
Thread thread = Thread.currentThread();
System.out.println(thread.getName() + "==>myLock");
//自旋锁
while (!atomicReference.compareAndSet(null, thread)) ;
}
//解锁
public void myUnlock() {
Thread thread = Thread.currentThread();
System.out.println(thread.getName() + "==>myUnLock");
atomicReference.compareAndSet(thread, null);
}
}
然后编写一段测试代码
package 自旋锁;
import java.util.concurrent.TimeUnit;
public class Test {
public static void main(String[] args) throws InterruptedException {
SpinLock spinLock = new SpinLock();
//线程T1
new Thread(() -> {
spinLock.myLock();
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
spinLock.myUnlock();
}
}, "T1").start();
TimeUnit.SECONDS.sleep(1);
//线程T2
new Thread(() -> {
spinLock.myLock();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
spinLock.myUnlock();
}
}, "T2").start();
}
}
根据结果,总是T1
线程解锁后,T2
线程才能解锁;因为如果T1
线程不解锁,T2
就会卡住在while循环不停的尝试cas直到thread=null为止
19.5、死锁
什么是死锁?
是指两个或两个以上的进程在执行过程中,因争夺资源而造成的一种互相等待的现象
简单的死锁案例
package 死锁;
import java.util.concurrent.TimeUnit;
public class DeadLockDemo {
public static void main(String[] args) {
String lockA = "lockA";
String lockB = "lockB";
new Thread(new MyThread(lockA, lockB), "T1").start();
new Thread(new MyThread(lockB, lockA), "T2").start();
}
}
class MyThread implements Runnable {
private String lockA;
private String lockB;
public MyThread(String lockA, String lockB) {
this.lockA = lockA;
this.lockB = lockB;
}
@Override
public void run() {
synchronized (lockA) {
System.out.println(Thread.currentThread().getName() + "持有锁" + lockA + "尝试获取" + lockB);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (lockB) {
System.out.println(Thread.currentThread().getName() + "持有锁" + lockB + "尝试获取" + lockA);
}
}
}
}
根据运行结果,可以看到程序卡死,因为发生了死锁,因为T1和T2分别持有lockA和lockB,但又都试图获取对方的锁!
死锁问题排查
-
使用
jps -l
命令定位进程号 -
使用
jstack 进程号
查看指定进程的堆栈信息,找到死锁问题可以看到,控制台清晰的打印了找到死锁,并可以看到产生的原因就是
T1
和T2
互相尝试获取对方的锁
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)