目录


参考

一、什么是JUC

JUC就是java.util.concurrent工具包的简称。这是一个处理线程的工具包,JDK 1.5开始出现的。

image-20210321110312209

image-20210321115140571



二、基本知识

2.1、进程和线程

进程是程序的一次执行过程,包含进程控制块程序段、数据三部分

1️⃣ 动态性

  • 动态性是进程最基本的特征,表现为:由创建而产生,由调度而执行,得不到资源而暂停执行,由撤销而消亡;有一定的生命周期
  • 程序只是一组有序的指令集合

2️⃣ 并发性

  • 引入进程的目的就是和其他进程能并发执行
  • 程序不能并发执行

3️⃣ 独立性

  • 进程实体是一个能独立运行的基本单位,是系统中独立获得资源和独立调度的基本单位
  • 程序不能作为一个独立的单位进行运行

2.2、Java默认有两个进程

MainGC

2.3、Java能够开启线程吗?

不行,Java是通过native本地方法调底层C++写的方法,Java无法直接操作硬件

2.4、并发和并行

  • 并发:多个事件在同一时间间隔内发生(cpu一核,模拟出来多条线程快速交替运行)
  • 并行:多个事件在同一时刻发生(cpu多核,多个线程可以同时执行)

查看cpu的核数

image-20210321111429010

image-20210321111527079

image-20210321111556525

2.5、线程的状态

NEW尚未启动的线程处于此状态
RUNNABLE在Java虚拟机中执行的线程处于此状态
BLOCKED被阻塞等待监视器锁定的线程处于此状态(IO操作,wait,juc锁定)
WAITING正在等待另一个线程执行特定动作的线程处于此状态(sleep,join)
TIMED_WAITING正在等待另一个线程执行动作达到指定等待时间的线程处于此状态(sleep,join)
TERMINATED已退出的线程处于此状态。

2.6、wait和sleep的区别

  • sleep不释放锁,wait释放锁
  • 来自不同的类:sleep()函数在Thread类中,wait()函数属于Object类
  • 使用范围不同:sleep可以在任何地方使用,wait只能使用在同步代码块中

2.7、什么是可重入锁

可重入,就是可以重复获取相同的锁而不会出现死锁;synchronized和ReentrantLock都是可重入的

// 演示可重入锁是什么意思,可重入,就是可以重复获取相同的锁
// synchronized和ReentrantLock都是可重入的
// 可重入降低了编程复杂性
public class WhatReentrant {
    public static void main(String[] args) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                synchronized (this) {
                    System.out.println("第1次获取锁,这个锁是:" + this);
                    int index = 1;
                    while (true) {
                        synchronized (this) {
                            System.out.println("第" + (++index) + "次获取锁,这个锁是:" + this);
                        }
                        if (index == 10) {
                            break;
                        }
                    }
                }
            }
        }).start();
    }
}

image-20210321153436838

import java.util.Random;
import java.util.concurrent.locks.ReentrantLock;

// 演示可重入锁是什么意思
public class WhatReentrant2 {
    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    lock.lock();
                    System.out.println("第1次获取锁,这个锁是:" + lock);

                    int index = 1;
                    while (true) {
                        try {
                            lock.lock();
                            System.out.println("第" + (++index) + "次获取锁,这个锁是:" + lock);

                            try {
                                Thread.sleep(new Random().nextInt(200));
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }

                            if (index == 10) {
                                break;
                            }
                        } finally {
                            lock.unlock();
                        }

                    }

                } finally {
                    lock.unlock();
                }
            }
        }).start();
    }
}

image-20210321153415399

2.8、synchronized买票案例回顾

真正的开发中,线程只是一个资源类(包含属性),没有任何附属的操作

模拟卖票

  • 以前我们会将Ticket类继承Runnable接口
  • 现在会将Ticket作为一个资源类,里面不添加关于线程的操作
package com.zsr;

public class saleTicket {
    public static void main(String[] args) {
        //一份资源
        Ticket ticket = new Ticket();
        //多个线程
        new Thread(() -> {
            for (int i = 0; i < 30; i++) {
                ticket.sale();
            }
        }, "A").start();
        new Thread(() -> {
            for (int i = 0; i < 20; i++) {
                ticket.sale();
            }
        }, "B").start();
        new Thread(() -> {
            for (int i = 0; i < 80; i++) {
                ticket.sale();
            }
        }, "C").start();
    }
}

//资源类
class Ticket {
    //票数
    private int number = 100;

    public synchronized void sale() {
        if (number > 0)
            System.out.println(Thread.currentThread().getName() + "卖出了第" + (number--) + "张票,剩余" + number);
    }
}


三、Lock锁

3.1、简介

官方文档地址https://tool.oschina.net/apidocs/apidoc?api=jdk-zh

image-20210321133621186
使用方法:创建锁、加锁、业务代码、解锁
image-20210321134513971

3.2、买票问题重现

我们接下来用Lock锁的方式来解决上述买票问题,Lock接口最常用的实现类就是ReentrantLock可重入锁

  • 可以看到看到ReentrantLock有两个构造方法,可以指定使用 公平锁/非公平锁
  • 公平锁:十分公平,先来后到
  • 非公平锁:不公平,可以插队

image-20210321133926406
修改Ticket

//资源类
class Ticket2 {
    //票数
    private int number = 100;

    //Lock锁
    Lock lock = new ReentrantLock();

    public void sale() {
        lock.lock();//加锁
        try {
            if (number > 0)
                System.out.println(Thread.currentThread().getName() + "卖出了第" + (number--) + "张票,剩余" + number);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();//解锁
        }
    }
}

3.3、和Synchronized的区别

  • Synchronized是内置的关键字,Lock是一个Java类

  • Synchronized无法判断锁的状态,Lock可以判断是否获取到了锁

  • Synchronized会自动释放锁,Lock需要手动释放锁(如果不释放锁,会造成死锁)

  • 假如有两个线程:线程1、线程2;线程1获得了锁

    Synchronized:如果线程1阻塞了,线程2就会一直等待,造成死锁

    Lock:如果线程1阻塞了,线程2不会一直等待,可以通过trylock()方法尝试获取锁

  • 两者都是可重入锁,但是Synchronized不可中断,为非公平锁;而Lock锁可判断锁状态,并且可以设置为公平锁/非公平锁

  • Synchronized适合锁少量代码同步代码,Lock适合锁大量代码同步代码



四、生产者消费者问题(Lock版)

生产者消费者——线程之间的通信问题

  • 通过Synchronized实现,我们常用 object.wait() + Object.notify()
  • 通过Lock怎么实现呢,用condition.await() + condition.signal()

image-20210321143716982

4.1、Synchronized实现

两个线程A、B实现:

如果number不等于0,则number-1;如果number等于0,则number+1

package com.zsr;

public class communicate {
    public static void main(String[] args) {
        Data data = new Data();
        new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                try {
                    data.plus();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "A").start();
        new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                try {
                    data.minor();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "B").start();
    }
}

class Data {
    private int number = 0;

    //如果number=0,则number+1
    public synchronized void plus() throws InterruptedException {
        if (number != 0)
            this.wait();//等待
        number++;
        System.out.println(Thread.currentThread().getName() + "=>" + number);
        this.notifyAll();//唤醒其他线程
    }

    //如果number!=0,则number-1
    public synchronized void minor() throws InterruptedException {
        if (number == 0)
            this.wait();
        number--;
        System.out.println(Thread.currentThread().getName() + "=>" + number);
        this.notifyAll();//唤醒其他线程
    }
}

结果:
image-20210321141752317

虚假唤醒问题

如果再加两个线程呢?

public static void main(String[] args) {
    Data data = new Data();
    new Thread(() -> {
        for (int i = 0; i < 5; i++) {
            try {
                data.plus();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }, "A").start();
    new Thread(() -> {
        for (int i = 0; i < 5; i++) {
            try {
                data.minor();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }, "B").start();
    new Thread(() -> {
        for (int i = 0; i < 5; i++) {
            try {
                data.plus();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }, "C").start();
    new Thread(() -> {
        for (int i = 0; i < 5; i++) {
            try {
                data.minor();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }, "D").start();
}
}

看结果,会出现2、3的情况;这就是因为if判断只判断一次,两个线程可能同时+1;造成了虚假唤醒的问题
image-20210321142530487
image-20210321142942399
修改代码:if判断改成while判断

class Data {
    private int number = 0;

    //如果number=0,则number+1
    public synchronized void plus() throws InterruptedException {
        while (number != 0)
            this.wait();//等待
        number++;
        System.out.println(Thread.currentThread().getName() + "=>" + number);
        this.notifyAll();//唤醒其他线程
    }

    //如果number!=0,则number-1
    public synchronized void minor() throws InterruptedException {
        while (number == 0)
            this.wait();
        number--;
        System.out.println(Thread.currentThread().getName() + "=>" + number);
        this.notifyAll();//唤醒其他线程
    }
}

4.2、Lock实现

image-20210321143908744
修改代码:

package com.zsr;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class communicate2 {
    public static void main(String[] args) {
        Data2 data = new Data2();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    data.plus();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "A").start();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    data.minor();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "B").start();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    data.plus();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "C").start();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    data.minor();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "D").start();
    }
}

class Data2 {
    private int number = 0;
    //创建Lock锁
    private Lock lock = new ReentrantLock();
    //获得condition
    Condition condition = lock.newCondition();

    //如果number=0,则number+1
    public void plus() throws InterruptedException {
        lock.lock();//加锁
        try {
            while (number != 0)
                condition.await();//等待
            number++;
            System.out.println(Thread.currentThread().getName() + "=>" + number);
            condition.signalAll();//唤醒其他线程
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();//解锁
        }
    }

    //如果number!=0,则number-1
    public void minor() throws InterruptedException {
        lock.lock();
        try {
            while (number == 0)
                condition.await();
            number--;
            System.out.println(Thread.currentThread().getName() + "=>" + number);
            condition.signalAll();//唤醒其他线程
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}

image-20210321145522108
根据结果,成功实现!但是发现一个问题,线程的执行都是随机的,怎么进行有序的实现呢?A=>B=>C=>D

Condition精准通知唤醒

可以设置多个condition监视器,每个监视器监视一个线程,精确等待唤醒某个线程

package com.zsr;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class communicate3 {
    public static void main(String[] args) {
        Data3 data = new Data3();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    data.plus();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "A").start();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    data.minor();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "B").start();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    data.plus();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "C").start();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    data.minor();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "D").start();
    }
}

class Data3 {
    private int number = 0;
    //创建Lock锁
    private Lock lock = new ReentrantLock();
    //获得condition
    Condition conditionA = lock.newCondition();
    Condition conditionB = lock.newCondition();
    Condition conditionC = lock.newCondition();
    Condition conditionD = lock.newCondition();

    //如果number=0,则number+1
    public void plus() throws InterruptedException {
        lock.lock();//加锁
        try {
            if (Thread.currentThread().getName()=="A") {
                while (number != 0)
                    conditionA.await();//A等待
                number++;
                System.out.println(Thread.currentThread().getName() + "=>" + number);
                conditionB.signal();//唤醒B线程
            }
            if (Thread.currentThread().getName()=="C") {
                while (number != 0)
                    conditionC.await();//C等待
                number++;
                System.out.println(Thread.currentThread().getName() + "=>" + number);
                conditionD.signal();//唤醒D线程
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();//解锁
        }
    }

    //如果number!=0,则number-1
    public void minor() throws InterruptedException {
        lock.lock();
        try {
            if (Thread.currentThread().getName()=="B") {
                while (number == 0)
                    conditionB.await();//B等待
                number--;
                System.out.println(Thread.currentThread().getName() + "=>" + number);
                conditionC.signal();//唤醒C线程
            }
            if (Thread.currentThread().getName()=="D") {
                while (number == 0)
                    conditionD.await();//D等待
                number--;
                System.out.println(Thread.currentThread().getName() + "=>" + number);
                conditionA.signal();//唤醒A线程
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}

结果:
image-20210321151156051



五、8个案例彻底理解锁的对象

如何判断锁的是谁!永远的知道什么锁,锁到底锁的是谁(对象、 Class)

创建两个线程A、B,A线程执行发短信方法,B线程执行打电话方法,谁先执行?

package com.zsr.lock8;

import java.util.concurrent.TimeUnit;

public class Test {
    public static void main(String[] args) throws InterruptedException {
        Phone phone = new Phone();
        //线程A发消息
        new Thread(() -> {
            phone.send_message();
        }, "A").start();
        
        //休眠1s
        TimeUnit.SECONDS.sleep(1);
        
        //线程B打电话
        new Thread(() -> {
            phone.call();
        }, "B").start();
    }
}

class Phone {
    public synchronized void send_message() {
        System.out.println("发短信");
    }

    public synchronized void call() {
        System.out.println("打电话");
    }
}

结果:先发短信再打电话
image-20210321190231935

那如果让发短信休眠2s呢?

package com.zsr.lock8;

import java.util.concurrent.TimeUnit;

public class Test {
    public static void main(String[] args) throws InterruptedException {
        Phone phone = new Phone();
        new Thread(() -> {
            try {
                phone.send_message();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "A").start();
        //休眠1s
        TimeUnit.SECONDS.sleep(1);
        new Thread(() -> {
            phone.call();
        }, "B").start();
    }
}

class Phone {
    public synchronized void send_message() throws InterruptedException {
        //休眠2s
        TimeUnit.SECONDS.sleep(2);
        System.out.println("发短信");
    }

    public synchronized void call() {
        System.out.println("打电话");
    }
}

结果:还是先发短信再打电话
image-20210321190405716
这是为什么呢?并不是因为A先执行,而是有锁的存在

  • synchronized锁的对象是方法的调用者,也就是phone对象,因此打电话和发短信方法锁的是同一个对象,谁先拿到锁谁就先执行

如果新增一个普通方法hello,那先是hello还是发短信呢?

package com.zsr.lock8;

import java.util.concurrent.TimeUnit;

public class Test {
    public static void main(String[] args) throws InterruptedException {
        Phone phone = new Phone();
        new Thread(() -> {
            try {
                phone.send_message();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "A").start();
        //休眠1s
        TimeUnit.SECONDS.sleep(1);
        new Thread(() -> {
            phone.hello();
        }, "B").start();
    }
}

class Phone {
    public synchronized void send_message() throws InterruptedException {
        //休眠2s
        TimeUnit.SECONDS.sleep(2);
        System.out.println("发短信");
    }

    public synchronized void call() {
        System.out.println("打电话");
    }

    public void hello() {
        System.out.println("hello");
    }
}

image-20210321193511631
根据结果,先执行hello,这是因为hello是一个普通方法,没有锁,不受锁的影响

如果有两个phone对象,是先发短信还是先打电话

package com.zsr.lock8;

import java.util.concurrent.TimeUnit;

public class Test {
    public static void main(String[] args) throws InterruptedException {
        Phone phone1 = new Phone();
        Phone phone2 = new Phone();
        new Thread(() -> {
            try {
                phone1.send_message();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "A").start();
        //休眠1s
        TimeUnit.SECONDS.sleep(1);
        new Thread(() -> {
            phone2.call();
        }, "B").start();
    }
}

class Phone {
    public synchronized void send_message() throws InterruptedException {
        //休眠2s
        TimeUnit.SECONDS.sleep(2);
        System.out.println("发短信");
    }

    public synchronized void call() {
        System.out.println("打电话");
    }
}

两个对象,两个调用者,所以有两把锁,所以按时间顺序执行

image-20210321194003558

修改两个方法为静态方法,只有一个对象,是先打电话还是发短信

package com.zsr.lock8;

import java.util.concurrent.TimeUnit;

public class Test {
    public static void main(String[] args) throws InterruptedException {
        Phone phone = new Phone();
        new Thread(() -> {
            try {
                phone.send_message();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "A").start();
        //休眠1s
        TimeUnit.SECONDS.sleep(1);
        new Thread(() -> {
            phone.call();
        }, "B").start();
    }
}

class Phone {
    public static synchronized void send_message() throws InterruptedException {
        //休眠2s
        TimeUnit.SECONDS.sleep(2);
        System.out.println("发短信");
    }

    public static synchronized void call() {
        System.out.println("打电话");
    }
}

根据结果,先发短信
image-20210321194256128
因为static静态方法在类加载的时候就有了,因此这里synchronized锁的是Class模板,也就是Phone.class,全局唯一;也就是两个方法拿的仍是同一把锁

那如果两个方法为静态方法,有两个对象,是先打电话还是发短信?

package com.zsr.lock8;

import java.util.concurrent.TimeUnit;

public class Test {
    public static void main(String[] args) throws InterruptedException {
        Phone phone1 = new Phone();
        Phone phone2 = new Phone();
        new Thread(() -> {
            try {
                phone1.send_message();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "A").start();
        //休眠1s
        TimeUnit.SECONDS.sleep(1);
        new Thread(() -> {
            phone2.call();
        }, "B").start();
    }
}

class Phone {
    public static synchronized void send_message() throws InterruptedException {
        //休眠2s
        TimeUnit.SECONDS.sleep(2);
        System.out.println("发短信");
    }

    public static synchronized void call() {
        System.out.println("打电话");
    }
}

根据结果,仍是先发短信
image-20210321194617283
因为锁的是Phone.class,也就是说两个方法仍是同一把锁

如果是一个普通的同步方法和一个静态的同步方法,只有一个对象

package com.zsr.lock8;

import java.util.concurrent.TimeUnit;

public class Test {
    public static void main(String[] args) throws InterruptedException {
        Phone phone = new Phone();
        new Thread(() -> {
            try {
                phone.send_message();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "A").start();
        //休眠1s
        TimeUnit.SECONDS.sleep(1);
        new Thread(() -> {
            phone.call();
        }, "B").start();
    }
}

class Phone {
    public static synchronized void send_message() throws InterruptedException {
        //休眠2s
        TimeUnit.SECONDS.sleep(2);
        System.out.println("发短信");
    }

    public synchronized void call() {
        System.out.println("打电话");
    }
}

结果:
image-20210322095928191
一个锁的是Phone.class模板,一个锁的是phone对象,因此两个方法不是一把锁,因此按时间顺序运行

如果是一个普通的同步方法和一个静态的同步方法,有两个对象

package com.zsr.lock8;

import java.util.concurrent.TimeUnit;

public class Test {
    public static void main(String[] args) throws InterruptedException {
        Phone phone1 = new Phone();
        Phone phone2 = new Phone();
        new Thread(() -> {
            try {
                phone1.send_message();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "A").start();
        //休眠1s
        TimeUnit.SECONDS.sleep(1);
        new Thread(() -> {
            phone2.call();
        }, "B").start();
    }
}

class Phone {
    public static synchronized void send_message() throws InterruptedException {
        //休眠2s
        TimeUnit.SECONDS.sleep(2);
        System.out.println("发短信");
    }

    public synchronized void call() {
        System.out.println("打电话");
    }
}

同样两个方法不是一把锁,因此按时间顺序运行
image-20210322100003462



六、安全集合类

image-20210322100452113

6.1、CopyOnWriteArrayList

ArrayList不安全

并发情况下,ArrayList不安全,我们通过一个简单的案例来测试:

package com.zsr.collection;

import java.util.ArrayList;
import java.util.UUID;

public class ListTest {
    public static void main(String[] args) {
        //并发情况下
        ArrayList<String> list = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                list.add(UUID.randomUUID().toString().substring(0, 8));
                System.out.println(list);
            }, String.valueOf(i)).start();
        }
    }
}


根据结果,发现报错了ConcurrentModificationException(并发修改异常)

那么怎么解决呢?

  • 方案一:换成线程安全的vector

    Vector<String> lists = new Vector<>();
    
  • 方案二:利用Collections工具类

    List<String> list = Collections.synchronizedList(new ArrayList<>());
    
  • 方案三:利用JUC包中的类CopyOnWriteArrayList

    CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
    

引入CopyOnWriteArrayList

image-20210322141229600

  • CopyOnWrite简称COW,是计算机程序设计领域的一种优化策略
  • 多个线程调用的时候,读取的时候固定,但写入时会复制,避免写入造成的数据覆盖问题

其效率比Vector更高,因为Vector在方法上都用了Synchronized关键字,会降低效率

CopyOnWriteArratList是用Lock锁实现的,底层也是数组实现,不过添加的时候会先拷贝一份新的数组,最后再拷贝回去
image-20210322134138150
image-20210322134213815


6.2、CopyOnWriteArraySet

HashSet不安全

并发情况下,HashSet不安全,我们通过一个简单的案例来测试:

package com.zsr.collection;

import java.util.HashSet;
import java.util.UUID;

public class SetTest {
    public static void main(String[] args) {
        //并发情况下
        HashSet<String> set = new HashSet<>();
        for (int i = 0; i < 30; i++) {
            new Thread(() -> {
                set.add(UUID.randomUUID().toString().substring(0, 8));
                System.out.println(set);
            }, String.valueOf(i)).start();
        }
    }
}

同样,ConcurrentModificationException:并发修改异常
image-20210322134948437
那么怎么解决呢?

  • 方案一:利用Collections工具类

    Set<String> set = Collections.synchronizedSet(new HashSet<String>());
    
  • 方案三:利用JUC包中的类CopyOnWriteArrayList

    CopyOnWriteArraySet<String> set = new CopyOnWriteArraySet<>();
    

引入CopyOnWriteArraySet

image-20210322141215322

  • CopyOnWrite简称COW,是计算机程序设计领域的一种优化策略
  • 多个线程调用的时候,读取的时候固定,但写入时会复制,避免写入造成的数据覆盖问题

CopyOnWriteArratSet同样是用Lock锁实现的,底层也是数组实现,不过添加的时候会先拷贝一份新的数组,最后再拷贝回去
image-20210322135648975
image-20210322135621456


6.3、ConcurrentHashMap

HashMap不安全

并发情况下,HashMap不安全,我们通过一个简单的案例来测试:

package com.zsr.collection;

import java.util.HashMap;
import java.util.UUID;

public class MapTest {
    public static void main(String[] args) {
        //并发情况下
        HashMap<String, String> map = new HashMap<>();
        for (int i = 0; i < 30; i++) {
            new Thread(() -> {
                map.put(Thread.currentThread().getName(), UUID.randomUUID().toString().substring(0, 8));
                System.out.println(map);
            }, String.valueOf(i)).start();
        }
    }
}

同样,ConcurrentModificationException:并发修改异常
image-20210322140807339
那么怎么解决呢?

  • 方案一:利用Collections工具类

    Map<String, String> map = Collections.synchronizedMap(new HashMap<String, String>());
    
  • 方案三:利用JUC包中的类ConcurrentHashMap

    ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
    

引入ConcurrentHashMap

image-20210322141145100
ConcurrentHashMap融合了hashtablehashmap二者的优势

但是hashtable每次同步执行的时候都要锁住整个结构。看下图:

image-20210414145603675
ConcurrentHashMap正是为了解决这个问题而诞生的,其锁的方式是稍微细粒度的,引入了分段锁的概念;

  • 可以理解为把一个大的Map拆分成N(默认为16)个小的HashTable,根据key.hashCode()来决定把key放到哪个HashTable中。

  • 在ConcurrentHashMap中,就是把Map分成了N个Segment,put和get的时候,都是现根据key.hashCode()算出放到哪个Segment中:

通过把整个Map分为N个Segment(类似HashTable),可以提供相同的线程安全;原来只能一个线程进入,现在却能同时16个写线程进入(写线程才需要锁定,而读线程几乎不受限制),并发性的提升是显而易见的



七、Callable

image-20210322144924183

image-20210323075320880

7.1、同Runnable的区别

image-20210323075243997

  • 可以有返回值
  • 可以抛出异常
  • 重写call()方法而不是run()

7.2、怎么实现

实现Runnable接口时,我们通过Thread.start()进行启动,因为Thread的构造方法可以传入Runnable对象
image-20210323080136708
那么怎么实现Callable呢?我们无法直接通过Thread进行启动,但是我们可以通过Runnable间接的启动
image-20210323080306487
查看帮助文档,可以看到Runnable接口有一个FutureTask启动类,我们点进去看看
image-20210323080439782
可以看到,它有两个构造方法,分别可以传入CallableRunnable对象,这就将两者联系了起来
image-20210323081206052
于事我们就可以通过Thread来启动Callable接口的实现类

package com.zsr;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class dome1 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        myThread myThread = new myThread();
        FutureTask<String> futureTask = new FutureTask<String>(myThread);
        new Thread(futureTask).start();
        System.out.println(futureTask.get());//获取call方法返回值
    }
}

class myThread implements Callable<String> {

    @Override
    public String call() throws Exception {
        System.out.println("调用call方法");
        return "call方法执行完成";
    }
}

image-20210323080953589
注意

  • 通过FutureTask的get()获取call方法的返回值,该方法可能会产生阻塞(可能返回结果需要大量的计算,很耗时),一般情况下将其放在最后一行或者使用异步通信来处理

  • FutureTask任务多线程并发访问时为啥只会被执行一次
    image-20210323081830058



八、三大常用辅助类

8.1、CountDownLatch

就是一个减法计数器

image-20210323082713513

image-20210323083302699

package com.zsr.countDown;

import java.util.concurrent.CountDownLatch;

public class Test {
    public static void main(String[] args) throws InterruptedException {
        //创建一个计数器,初始化为6
        CountDownLatch countDownLatch = new CountDownLatch(6);
        for (int i = 1; i <= 6; i++) {
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + "线程执行结束");
                countDownLatch.countDown();//计数器-1
            }, String.valueOf(i)).start();
        }
        countDownLatch.await();//等待计数器归0再往下执行
        System.out.println("所有线程执行完毕");
    }
}

如果不加countDownLatch.await()
image-20210323083117066
加了之后
image-20210323083141410

8.2、CyclicBarrier

相当于加法计数器

image-20210323083357327

image-20210323083929351

package com.zsr.Cyclic;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class Test {
    //模拟集齐7课龙珠召唤神龙
    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> {
            System.out.println("召唤神龙!");
        });
        for (int i = 1; i <= 7; i++) {
            final int temp = i;
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + "收集了第" + temp + "颗龙珠");
                try {
                    cyclicBarrier.await();//计数不断+1,直到为7
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}

8.3、Semaphore

计数信号量

image-20210323084317870

  • semaphore.acquire(),获得许可正,如果许可证已经满了,等待其他线程释放许可证
  • semaphore.release(),释放许可证

作用:多个共享资源互斥使用,并发限流,控制最大的线程数

package com.zsr.Sema;

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class Test {
    public static void main(String[] args) {
        //模拟3个车位,6辆车要停车
        Semaphore semaphore = new Semaphore(3);
        for (int i = 1; i <= 6; i++) {
            new Thread(() -> {
                try {
                    semaphore.acquire();//获取许可
                    System.out.println(Thread.currentThread().getName() + "抢到车位");
                    TimeUnit.SECONDS.sleep(2);
                    System.out.println(Thread.currentThread().getName() + "离开车位");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    semaphore.release();//释放许可
                }
            }, String.valueOf(i)).start();
        }
    }
}

image-20210323084704901



九、读写锁

image-20210325230129179
代码测试:定义一个缓存区用于读写操作,然后启动5个线程分别进行读和写,测试

  • 首先测试不加锁的情况下

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.locks.ReentrantReadWriteLock;
    
    public class ReadWriteLockDemo {
        public static void main(String[] args) {
            UnlockCache unlockCache = new UnlockCache();
            //5个线程写入
            for (int i = 1; i <= 5; i++) {
                final int temp = i;
                new Thread(() -> {
                    unlockCache.put(temp, temp);
                }, String.valueOf(i)).start();
            }
            //5个线程读出
            for (int i = 1; i <= 5; i++) {
                final int temp = i;
                new Thread(() -> {
                    unlockCache.get(temp);
                }, String.valueOf(i)).start();
            }
        }
    }
    
    //不加锁的
    class UnlockCache {
        private volatile Map<Integer, Object> map = new HashMap<>();
    
        //写入
        public void put(int key, Object value) {
            System.out.println("开始写入" + key);
            map.put(key, value);
            System.out.println(key + "写入完成");
        }
    
        //读出
        public void get(int key) {
            System.out.println("开始读取" + key);
            map.get(key);
            System.out.println(key + "读取完毕");
        }
    }
    

    image-20210325233019529
    根据结果,可以看到写入时被插队,这是不允许的!

  • 加读写锁,实现只能同时有一个线程写,多个线程读

    也就是写锁为独占锁(一次只能被一个线程占有),读锁为共享锁(多个线程可以同时占有)

    • 读-读:可共存
    • 读-写:不可共存
    • 写-写:不可共存
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.locks.ReentrantReadWriteLock;
    
    public class ReadWriteLockDemo {
        public static void main(String[] args) {
            LockCache lockCache = new LockCache();
            //5个线程写入
            for (int i = 1; i <= 5; i++) {
                final int temp = i;
                new Thread(() -> {
                    lockCache.put(temp, temp);
                }, String.valueOf(i)).start();
            }
            //5个线程读出
            for (int i = 1; i <= 5; i++) {
                final int temp = i;
                new Thread(() -> {
                    lockCache.get(temp);
                }, String.valueOf(i)).start();
            }
        }
    }
    
    //加锁:实现同时只能有一个线程写,多个线程读
    class LockCache {
        private volatile Map<Integer, Object> map = new HashMap<>();
        //读写锁:实现只能有一个线程写,多个线程写,更细粒度的控制
        ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    
        //写入:只能有一个线程写
        public void put(int key, Object value) {
            readWriteLock.writeLock().lock();
            try {
                System.out.println("开始写入" + key);
                map.put(key, value);
                System.out.println(key + "写入完成");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                readWriteLock.writeLock().unlock();
            }
        }
    
        //读出:可以多个线程读
        public void get(int key) {
            readWriteLock.readLock().lock();
            try {
                System.out.println("开始读取" + key);
                map.get(key);
                System.out.println(key + "读取完毕");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                readWriteLock.readLock().unlock();
            }
        }
    }
    

    image-20210325233255584
    根据结果,我们实现了写入时不能被插队,但是读取可以多个线程读取



十、阻塞队列

image-20210325234511948

10.1、关系图

BlockingQueue关系图:

image-20210325235643917

队列阻塞

image-20210325235738373

10.2、ArrayBlockingQueue四组API

方式抛出异常有返回值、不抛出异常阻塞等待限时等待
添加add()offer()put()offer( , )
移除remove()poll()take()poll( , )
判断队列首element()peek()\\

抛出异常(add、remove、element)

//抛出异常
public static void test1() {
    //初始化阻塞队列大小为3
    ArrayBlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<>(3);
    System.out.println(blockingQueue.add(1));
    System.out.println(blockingQueue.add(2));
    System.out.println(blockingQueue.add(3));

    //如果添加第四个元素,则发生异常java.lang.IllegalStateException: Queue full
    System.out.println(blockingQueue.add(4));
}

image-20210409190803875

//抛出异常
public static void test1() {
    //初始化阻塞队列大小为3
    ArrayBlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<>(3);
    System.out.println(blockingQueue.add(1));
    System.out.println(blockingQueue.add(2));
    System.out.println(blockingQueue.add(3));
    System.out.println(blockingQueue.remove());
    System.out.println(blockingQueue.remove());
    System.out.println(blockingQueue.remove());

    //如果取出第四个元素,则发生异常java.util.NoSuchElementException
    System.out.println(blockingQueue.remove());
}

image-20210409190850740

//抛出异常
public static void test1() {
    //初始化阻塞队列大小为3
    ArrayBlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<>(3);
    System.out.println(blockingQueue.add(1));
    System.out.println(blockingQueue.add(2));
    System.out.println(blockingQueue.add(3));

    System.out.println(blockingQueue.remove());
    System.out.println(blockingQueue.remove());
    System.out.println(blockingQueue.remove());

    //如果取出队首元素,则发生异常java.util.NoSuchElementException
    System.out.println(blockingQueue.element());
}

image-20210409191247343

不跑出异常(offer、poll、pick)

//不抛出异常
public static void test1() {
    //初始化阻塞队列大小为3
    ArrayBlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<>(3);
    System.out.println(blockingQueue.offer(1));
    System.out.println(blockingQueue.offer(2));
    System.out.println(blockingQueue.offer(3));

    //如果添加第四个元素,返回false
    System.out.println(blockingQueue.offer(4));

    System.out.println(blockingQueue.poll());
    System.out.println(blockingQueue.poll());
    System.out.println(blockingQueue.poll());

    //如果取出第四个元素,返回null
    System.out.println(blockingQueue.poll());

    //如果取出队首元素,返回null
    System.out.println(blockingQueue.peek());
}

等待阻塞(put、take)

//等待阻塞
public static void test1() throws InterruptedException {
    //初始化阻塞队列大小为3
    ArrayBlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<>(3);
    blockingQueue.put(1);
    blockingQueue.put(2);
    blockingQueue.put(3);

    //如果添加第四个元素,程序阻塞
    blockingQueue.put(4);

    System.out.println(blockingQueue.take());
    System.out.println(blockingQueue.take());
    System.out.println(blockingQueue.take());

    //如果取出第四个元素,程序阻塞
    blockingQueue.take();
}

限时等待()

//限时等待
public static void test1() throws InterruptedException {
    //初始化阻塞队列大小为3
    ArrayBlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<>(3);
    System.out.println(blockingQueue.offer(1));
    System.out.println(blockingQueue.offer(2));
    System.out.println(blockingQueue.offer(3));

    //如果添加第四个元素,等待2s后程序结束
    System.out.println(blockingQueue.offer(4, 2, TimeUnit.SECONDS));

    System.out.println(blockingQueue.poll());
    System.out.println(blockingQueue.poll());
    System.out.println(blockingQueue.poll());

    //如果取出第四个元素,等待2s后程序结束
    System.out.println(blockingQueue.poll(2, TimeUnit.SECONDS));
}

10.3、SynchronousQueue

image-20210409200508319

一种阻塞队列,其中每个插入操作必须等待另一个线程的对应移除操作 ,反之亦然。同步队列没有任何内部容量,甚至连一个队列的容量都没有。不能在同步队列上进行 peek,因为仅在试图要移除元素时,该元素才存在;除非另一个线程试图移除某个元素,否则也不能(使用任何方法)插入元素;也不能迭代队列,因为其中没有元素可用于迭代。队列的 是尝试添加到队列中的首个已排队插入线程的元素;如果没有这样的已排队线程,则没有可用于移除的元素并且 poll() 将会返回 null。对于其他 Collection 方法(例如 contains),SynchronousQueue 作为一个空 collection。此队列不允许 null 元素。

public static void main(String[] args) throws InterruptedException {
    SynchronousQueue<String> synchronousQueue = new SynchronousQueue<>();
    //线程T1,存元素
    new Thread(() -> {
        try {
            System.out.println(Thread.currentThread().getName() + " put 1");
            synchronousQueue.put("1");
            System.out.println(Thread.currentThread().getName() + " put 2");
            synchronousQueue.put("2");
            System.out.println(Thread.currentThread().getName() + " put 3");
            synchronousQueue.put("3");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }, "T1").start();
    //线程T2,取元素
    new Thread(() -> {
        try {
            TimeUnit.SECONDS.sleep(3);
            System.out.println(Thread.currentThread().getName() + " take " + synchronousQueue.take());
            TimeUnit.SECONDS.sleep(3);
            System.out.println(Thread.currentThread().getName() + " take " + synchronousQueue.take());
            TimeUnit.SECONDS.sleep(3);
            System.out.println(Thread.currentThread().getName() + " take " + synchronousQueue.take());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }, "T2").start();
}


十一、线程池

池化技术:程序的运行就会占用系统资源就会占用系统资源,为了优化资源的使用,就引入了池化技术,事先准备好一些资源,需要使用就来取,用完即放回;例如 线程池、连接池、内存池、对象池

线程池的好处:线程复用、可以控制最大并发数、管理线程

  • 降低资源消耗
  • 提高响应速度
  • 方便管理

11.1、创建线程池三大方法

如何创建线程池呢?java.util.concurrent中提供了Executors

image-20210412140530673
其中有一些静态方法用于创建线程池
image-20210412140902077

newSingleThreadExecutor

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadPool {
    public static void main(String[] args) {
        //创建单一线程池,只有一个线程执行
        ExecutorService singleThreadPool = Executors.newSingleThreadExecutor();
        //创建线程池,可指定固定线程数量同时执行
        for (int i = 0; i < 20; i++) {
            singleThreadPool.execute(() -> {
                System.out.println(Thread.currentThread().getName() + " ok");

            });
        }
        //线程池用完,关闭线程池
        singleThreadPool.shutdown();
    }
}

image-20210409214330526

newFixedThreadPool

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadPool {
    public static void main(String[] args) {
        //创建线程池,可指定固定线程数量同时执行
        ExecutorService fixThreadPool = Executors.newFixedThreadPool(5);
        //使用线程池创建线程
        for (int i = 0; i < 20; i++) {
            fixThreadPool.execute(() -> {
                System.out.println(Thread.currentThread().getName() + " ok");

            });
        }
        //线程池用完,关闭线程池  
        fixThreadPool.shutdown();
    }
}

image-20210409220213831

newCachedThreadPool

创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程, 那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。 此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadPool {
    public static void main(String[] args) {
        //创建可伸缩的线程池
        ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
        //使用线程池创建线程
        for (int i = 0; i < 20; i++) {
            cachedThreadPool.execute(() -> {
                System.out.println(Thread.currentThread().getName() + " ok");

            });
        }
        //线程池用完,关闭线程池
        cachedThreadPool.shutdown();
    }
}

image-20210409220319946

11.2、七大参数

查看newSingleThreadExecutornewFixedThreadPoolnewCachedThreadPool的源码,可以发现本质上就是创建了一个ThreadPoolExcutor对象

image-20210409232955804

image-20210409233027397
image-20210409233052778
再查看ThreadPoolExecutor的源码,可以看到七大参数

public ThreadPoolExecutor(int corePoolSize,	//核心线程池大小
                          int maximumPoolSize,	//最大线程池大小
                          long keepAliveTime,	//存活时间(超时未调用则释放)
                          TimeUnit unit,	//超时单位
                          BlockingQueue<Runnable> workQueue,	//阻塞队列
                          ThreadFactory threadFactory,	//线程工程:创建线程,一般默认不改动
                          RejectedExecutionHandler handler)	//拒绝策略
{
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

各种参数的含义好比银行办理业务,corePoolSize是已经开放的服务窗口,BlockingQueue是候客区,假设人流量非常大,就需要多开放几个服务窗口,maximumPoolSize就是最大开放的服务窗口数;再假如很少有人办理业务,过了一定的时间就关闭窗口,keepAliveTime就是要关闭窗口的事件;RejectedExecutionHandler拒绝策略就好比银行满了,再来人就不让进了

可以看到newCachedThreadPool的核心线程池大小设置未0,最大线程池大小设置为Integer.MAX_VALUE,约等于21亿;也就是说通过Executors.newCachedThreadPool()创建的线程池可以支持并发的线程数介于0~21亿之间,这是十分耗费资源的

因此,阿里巴巴官方手册有以下规定:
image-20210409233957716

11.3、四种拒绝策略

image-20210410002637368
image-20210410002432207
我们来自定义一个线程池,拒绝策略为AbortPolicy

import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPool {
    public static void main(String[] args) {
        //自定义线程池
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
                2,
                5,
                3,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(3),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy()
        );
        //最大承载=最大线程池大小+阻塞队列长度=5+3=8 因此如果i最大值设为9则会抛出异常
        for (int i = 1; i <= 8; i++) {
            threadPool.execute(() -> {
                        System.out.println(Thread.currentThread().getName() + " ok");
                    }
            );
        }
    }
}

image-20210410001714211
如果i<=9,超过了最大承载,则会抛出异常
image-20210410001755571
修改拒绝策略为CallerRunsPolicy:可以看到没有抛出异常,而是由main线程处理

import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPool {
    public static void main(String[] args) {
        //自定义线程池
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
                2,
                5,
                3,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(3),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.CallerRunsPolicy()
        );
        //最大承载=最大线程池大小+阻塞队列长度=5+3=8 因此如果i最大值设为9则会抛出异常
        for (int i = 1; i <= 9; i++) {
            threadPool.execute(() -> {
                        System.out.println(Thread.currentThread().getName() + " ok");
                    }
            );
        }
    }
}

image-20210410002758331
修改拒绝策略为DiscardPolicy:可以看到不抛出异常不执行

import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPool {
    public static void main(String[] args) {
        //自定义线程池
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
                2,
                5,
                3,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(3),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.DiscardPolicy()
        );
        //最大承载=最大线程池大小+阻塞队列长度=5+3=8 因此如果i最大值设为9则会抛出异常
        for (int i = 1; i <= 9; i++) {
            threadPool.execute(() -> {
                        System.out.println(Thread.currentThread().getName() + " ok");
                    }
            );
        }
    }
}

image-20210410002912020
策略DiscardOldestPolicyDiscardPolicy类似:不抛出异常不执行,但是会尝试竞争

11.4、最大线程数怎么定义?

CPU密集型

电脑的cpu是几核就定义为几,定义为常数换台电脑就不行了

//自定义线程池:CPU密集型
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
    2,
    Runtime.getRuntime().availableProcessors(),
    3,
    TimeUnit.SECONDS,
    new LinkedBlockingDeque<>(3),
    Executors.defaultThreadFactory(),
    new ThreadPoolExecutor.DiscardOldestPolicy()
);

IO密集型

大于 判断程序中耗费IO的线程 即可



十二、四大函数式接口

新时代的程序员:lambda表达式、链式编程、函数式接口、Stream流式计算

什么是函数式接口

函数式接口(Functional Interface)是jdk8引入的,有且仅有一个抽象方法,但是可以有多个非抽象方法的接口。并且这类接口使用了@FunctionalInterface进行注解,函数式接口可以被隐式转换为 lambda 表达式

JDK 1.8 之前已有的函数式接口:

  • java.lang.Runnable

    @FunctionalInterface
    public interface Runnable {
        public abstract void run();
    }
    
  • java.util.concurrent.Callable

  • java.security.PrivilegedAction

  • java.util.Comparator

  • java.io.FileFilter

  • java.nio.file.PathMatcher

  • java.lang.reflect.InvocationHandler

  • java.beans.PropertyChangeListener

  • java.awt.event.ActionListener

  • javax.swing.event.ChangeListener

JDK 1.8 新增加的函数接口:

  • java.util.function

    image-20210410111741105

这个package中的接口大致分为了以下四类:

  • Function:接收参数,并返回结果,主要方法 R apply(T t)

  • Consumer:接收参数,无返回结果, 主要方法为 void accept(T t)

    // forEach的参数就是消费者类型函数式接口Consumer
    @Override
    public void forEach(Consumer<? super E> action) {
        Objects.requireNonNull(action);
        final int expectedModCount = modCount;
        final Object[] es = elementData;
        final int size = this.size;
        for (int i = 0; modCount == expectedModCount && i < size; i++)
            action.accept(elementAt(es, i));
        if (modCount != expectedModCount)
            throw new ConcurrentModificationException();
    }
    
  • Supplier:不接收参数,但返回结构,主要方法为 T get()

  • Predicate:接收参数,x返回boolean值,主要方法为 boolean test(T t)

12.1、Function

函数式接口:有参数且需要返回值

@FunctionalInterface
public interface Function<T, R> {
    //有参数且有返回值
    R apply(T t);

    default <V> Function<V, R> compose(Function<? super V, ? extends T> before) {
        Objects.requireNonNull(before);
        return (V v) -> apply(before.apply(v));
    }

    default <V> Function<T, V> andThen(Function<? super R, ? extends V> after) {
        Objects.requireNonNull(after);
        return (T t) -> after.apply(apply(t));
    }

    static <T> Function<T, T> identity() {
        return t -> t;
    }
}

实现一个Function接口

package 函数式接口;

import java.util.function.Function;

public class Demo {
    public static void main(String[] args) {
        //匿名内部类,没有类的名称
        Function function1 = new Function<String, String>() {
            @Override
            public String apply(String str) {
                return str;
            }
        };
        //修改为lambda表达式
        Function function2 = (str) -> {
            return str;
        };
        System.out.println(function1.apply("hello"));
        System.out.println(function2.apply("hello lambda"));
    }
}

image-20210410113447834

12.2、Predicate

判断型接口:有参数,返回值为布尔型

@FunctionalInterface
public interface Predicate<T> {
    //有参数,返回值为布尔型
    boolean test(T t);
 	 
    default Predicate<T> and(Predicate<? super T> other) {
        Objects.requireNonNull(other);
        return (t) -> test(t) && other.test(t);
    }

    default Predicate<T> negate() {
        return (t) -> !test(t);
    }

    default Predicate<T> or(Predicate<? super T> other) {
        Objects.requireNonNull(other);
        return (t) -> test(t) || other.test(t);
    }

    static <T> Predicate<T> isEqual(Object targetRef) {
        return (null == targetRef)
                ? Objects::isNull
                : object -> targetRef.equals(object);
    }

    @SuppressWarnings("unchecked")
    static <T> Predicate<T> not(Predicate<? super T> target) {
        Objects.requireNonNull(target);
        return (Predicate<T>)target.negate();
    }
}

实现一个Predicate接口

package 函数式接口;

import java.util.function.Predicate;

public class Demo1 {
    public static void main(String[] args) {
        //匿名内部类,没有类的名称
        Predicate<String> predicate1 = new Predicate<>() {
            @Override
            public boolean test(String s) {
                return s.isEmpty();//判断字符串是否为空
            }
        };
        //修改为lambda
        Predicate<String> predicate2 = (s) -> {
            return s.isEmpty();
        };
        System.out.println(predicate1.test("hello"));
        System.out.println(predicate2.test(""));
    }
}

image-20210410125206650

12.3、Consumer

消费性接口:没有返回值,有参数

@FunctionalInterface
public interface Consumer<T> {
	//没有返回值,有参数
    void accept(T t);

    default Consumer<T> andThen(Consumer<? super T> after) {
        Objects.requireNonNull(after);
        return (T t) -> { accept(t); after.accept(t); };
    }
}

实现Consumer接口

package 函数式接口;

import java.util.function.Consumer;

public class Demo2 {
    public static void main(String[] args) {
        //匿名内部类,没有类的名称
        Consumer<String> consumer1 = new Consumer<>() {
            @Override
            public void accept(String s) {
                System.out.println(s);
            }
        };
        //修改为lambda
        Consumer consumer2 = (s) -> {
            System.out.println(s);
        };
        consumer1.accept("hello1");
        consumer1.accept("hello2");
    }
}

image-20210410125950617

12.4、Supplier

供给型接口:无参数,指定返回值类型

@FunctionalInterface
public interface Supplier<T> {
    //无参数,指定返回值类型
    T get();
}

实现Supplier接口

package 函数式接口;

import java.util.function.Supplier;

public class Demo3 {
    public static void main(String[] args) {
        //匿名内部类,没有类的名称
        Supplier<String> supplier1 = new Supplier<>() {
            @Override
            public String get() {
                return "hello1";
            }
        };
        //修改为lambda
        Supplier supplier2 = () -> {
            return "hello2";
        };
        System.out.println(supplier1.get());
        System.out.println(supplier2.get());
    }
}

image-20210410130339781



十三、Stream流式计算

13.1、什么是Stream流式计算

大数据时代分为存储+计算,存储交给数据库、集合等来处理,计算就交给流来做

Java中就提供了java.util.stream用于流式计算
image-20210410133603700

13.2、案例测试

package 流式计算;

import java.util.Arrays;
import java.util.List;

public class Test {
    public static void main(String[] args) {
        User user1 = new User("a", 1, 18);
        User user2 = new User("b", 2, 19);
        User user3 = new User("c", 3, 21);
        User user4 = new User("d", 4, 30);
        User user5 = new User("e", 5, 28);
        User user6 = new User("f", 6, 27);
        //list来存储数据
        List<User> users = Arrays.asList(user1, user2, user3, user4, user5, user6);
        //stream来计算
        //筛选出以下条件的用户:
        //1.id为偶数
        //2.age>23
        //3.name转换为大写字母
        //4.name字母倒序排序
        //5.只输出一个用户
        users.stream()
                .filter(user -> {//判断型接口Predicate
                    return user.getId() % 2 == 0;
                })
                .filter(user -> {//判断型接口Predicate
                    return user.getAge() > 23;
                })
                .map(user -> {//函数式接口Function
                    return user.getName().toUpperCase();
                })
                .sorted((u1, u2) -> {//Comparator函数式接口
                    return u2.compareTo(u1);
                })
                .limit(1)
                .forEach(System.out::println);
    }
}


十四、ForkJoin

14.1、什么是ForkJoin

ForkJoin出现于jdk1.7,用于并行执行任务 ,提高效率,用于大数据量的情况(分支合并)

大数据:Map Reduce——将大任务拆分成小任务

image-20210410194839704

14.2、ForkJoin的特点

工作窃取:里面维护的都是双端队列
image-20210410195206428

14.3、案例:计算求和任务

我们可以在JUC中找到ForkJoinPool
image-20210410204610944
其中有一个方法,可用于执行一个ForkJoinTask
image-20210410204632095
image-20210410204736063
我们需要返回值,查看RecursiveTask,可以找到compute方法进行计算
image-20210410205524323

1️⃣ 编写任务

package forkjoin;

import java.util.concurrent.RecursiveTask;

//计算任务
public class ForkJoinTask extends RecursiveTask<Long> {
    private long begin;
    private long end;

    public ForkJoinTask(long begin, long end) {
        this.begin = begin;
        this.end = end;
    }

    //计算方法:计算1加到1_0000_0000
    @Override
    protected Long compute() {
        long sum = 0;
        if ((end - begin) < 10000) {//如果差值小于10000则暴力加
            for (long i = begin; i <= end; i++)
                sum += i;
            return sum;
        } else {//数据量大于10000采用forkjoin来计算
            long mid = (begin + end) / 2;
            //任务一
            ForkJoinTask task1 = new ForkJoinTask(begin, mid);
            task1.fork();//拆分任务,将线程压入队列
            //任务二
            ForkJoinTask task2 = new ForkJoinTask(mid + 1, end);
            task2.fork();//拆分任务,将线程压入队列
            return task1.join() + task2.join();
        }
    }
}

2️⃣ 测试比较

package forkjoin;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.LongStream;

public class Test {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        test1();
        test2();
        test3();
    }

    //普通遍历方式
    public static void test1() {
        long sum = 0;
        long start = System.currentTimeMillis();
        for (long i = 0; i <= 10_0000_0000; i++) {
            sum += i;
        }
        long end = System.currentTimeMillis();
        System.out.println("普通遍历法耗时" + (end - start) + "结果为:" + sum);
    }

    //ForkJoin方式
    public static void test2() throws ExecutionException, InterruptedException {
        long start = System.currentTimeMillis();
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        ForkJoinTask task = new ForkJoinTask(0, 10_0000_0000);
        java.util.concurrent.ForkJoinTask<Long> submit = forkJoinPool.submit(task);
        Long sum = submit.get();
        long end = System.currentTimeMillis();
        System.out.println("通过ForkJoin方式耗时" + (end - start) + "结果为:" + sum);
    }

    //Stream并行流方式
    public static void test3() {
        long start = System.currentTimeMillis();
        long sum = LongStream.rangeClosed(0, 10_0000_0000).parallel().reduce(0, Long::sum);//rangeClosed(]
        long end = System.currentTimeMillis();
        System.out.println("通过Stream并行流方式耗时" + (end - start) + "结果为:" + sum);
    }
}

image-20210410214523018



十五、异步回调

异步回调通常用CompletableFuture
image-20210411132654111
发起两个异步请求,一个有返回结果,一个没有返回结果

package future;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

//异步调用异步执行:成功回调/失败回调
public class Demo1 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //发起一个请求(没有返回值的异步回调)
        CompletableFuture<Void> completableFuture1 = CompletableFuture.runAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + "runAsync=>Void");
        });
        completableFuture1.get();//阻塞获取执行结果

        //有返回值的异步回调
        CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "supplyAsync=>Integer");
            return 1024;
        });
        System.out.println(completableFuture2
                .whenComplete((t, u) -> {//成功回调:正确的返回结果
                    System.out.println("t=" + t);
                    System.out.println("u=" + u);
                }).exceptionally((e) -> {//失败回调:错误的返回结果
                    System.out.println(e.getMessage());
                    return 233;
                }).get());
    }
}

image-20210411134850651
如果有返回结果的异步回调报错,就会走失败回调的方法,返回233
image-20210411135011585



十六、理解JMM&Volatile

16.1、请你谈谈对Volatile的理解

Volatile是 JVM 提供的轻量级的同步机制

  1. 保证可见性
  2. 不保证原子性
  3. 禁止指令重排

怎么保证可见性?就需要和JMM挂钩

16.2、什么是JMM

Java内存模型,不存在的东西,是一种概念一种约定

关于JMM同步的约定

  • 线程解锁前,必须立刻把自己的共享变量刷回主存
  • 线程加锁前,必须读取主存中的最新值到工作内存中
  • 加锁和解锁是同一把锁

线程分为:工作内存、主内存

image-20210411135950221

16.3、Volatile特点

Volatile可以保证可见性,不能保证原子性,可以避免指令重排的现象

1. 保证可见性

package jmm;

import java.util.concurrent.TimeUnit;

public class Demo {
    private static int num;

    public static void main(String[] args) throws InterruptedException {
        new Thread(() -> {
            while (num == 0) ;
        }, "t").start();
        //main线程休眠1s
        TimeUnit.SECONDS.sleep(1);
        num = 1;
        System.out.println(num);
    }
}

image-20210411141806904

开启一个线程,当num=0时不停的死循环;然后让主线程休眠1s后修改num=1,也就是将主内存中的num修改为1;看到结果程序并没有停止,这是因为t线程并没有拿到主内存中num最新的值,不知套其发生了变化,也就是t线程对main线程的变化不可见

如何解决呢?只需要通过volatile关键字修饰num即可保证其的可见性

image-20210411142303690

可以看到,程序立马停止了

2. 不保证原子性

什么是原子性?也就是一个线程执行的时候不能被打扰分割,要么同时成功要么同时失败

package jmm;

public class Demo2 {
    //通过volatile不能保证原子性
    private volatile static int num;

    public static void add() {
        num++;
    }

    public static void main(String[] args) throws InterruptedException {
        //理论上num=20000
        for (int i = 0; i < 20; i++) {
            new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    add();
                }
            }).start();
        }
        //为了让20000条线程跑完,让main线程进行礼让(这里的2表示java固有的两个线程main和gc)
        while (Thread.activeCount() > 2) {
            Thread.yield();
        }
        System.out.println(Thread.currentThread().getName() + ":" + num);
    }
}

image-20210411144731153
根据结果,发现volatile并不保证原子性的操作,为什么不安全呢?

我们反编译看看,可以看到num++一行代码在底层是多行操作,因此不能保证原子性,所以是不安全的

D:\学习\IDEA project\Test\out\production\Test\jmm>javap -c Demo2.class
Compiled from "Demo2.java"
public class jmm.Demo2 {
  public jmm.Demo2();
    Code:
       0: aload_0
       1: invokespecial #1                  // Method java/lang/Object."<init>":()V
       4: return

  public static synchronized void add();
    Code:
       0: getstatic     #7                  // Field num:I
       3: iconst_1
       4: iadd
       5: putstatic     #7                  // Field num:I
       8: return

  public static void main(java.lang.String[]) throws java.lang.InterruptedException;
    Code:
       0: iconst_0
       1: istore_1
       2: iload_1
       3: bipush        20
       5: if_icmpge     29
       8: new           #13                 // class java/lang/Thread
      11: dup
      12: invokedynamic #15,  0             // InvokeDynamic #0:run:()Ljava/lang/Runnable;
      17: invokespecial #19                 // Method java/lang/Thread."<init>":(Ljava/lang/Runnable;)V
      20: invokevirtual #22                 // Method java/lang/Thread.start:()V
      23: iinc          1, 1
      26: goto          2
      29: invokestatic  #25                 // Method java/lang/Thread.activeCount:()I
      32: iconst_2
      33: if_icmple     42
      36: invokestatic  #29                 // Method java/lang/Thread.yield:()V

那如果不通过Lock和Synchronized 怎么保证原子性呢?

可以通过java.util.concurrent.atomic包中的原子类解决原子问题 ,这些类的底层都直接和操作系统挂钩,在内存中修改值,这些类是特殊的存在
image-20210411150212298

package jmm;

import java.util.concurrent.atomic.AtomicInteger;

public class Demo {
    //通过volatile不能保证原子性
    private volatile static AtomicInteger num = new AtomicInteger();

    public static void add() {
        num.getAndIncrement();//并不是简单的+1,而是利用了底层的CAS
    }

    public static void main(String[] args) throws InterruptedException {
        //理论上num=20000
        for (int i = 0; i < 20; i++) {
            new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    add();
                }
            }).start();
        }
        //为了让20000条线程跑完,让main线程进行礼让(这里的2表示java固有的两个线程main和gc)
        while (Thread.activeCount() > 2) {
            Thread.yield();
        }
        System.out.println(Thread.currentThread().getName() + ":" + num);
    }
}

image-20210411154037289

3. 禁止指令重排

指令重排:计算机并不是按照我们编写的程序去执行

源代码–》编译器优化代码重排–》指令并行重排–》内存系统重排–》执行

处理器在指令重排的时候,会考虑数据之间的依赖性

int x = 1;	//1
int y = 2;	//2
x = x + 5;	//3
y = x * x;	//4

按我们所期望的执行顺序是1->2->3->4
但实际上可能是2143或者1324,但不可能是4123

指令重排可能会导致一些错误的结果,如下图所示:

image-20210411155952539

使用volatile可以避免指令重排,底层实现是通过 内存屏障 实现的,可以保证特定的操作执行顺序,也可以保证某些变量的内存可见性

image-20210411160615817

十七、彻底玩转单例模式

volatile在单例模式中使用的最多

17.1、饿汉式

package 单例模式;

//饿汉式
public class Hungry {
    private static Hungry hungry = new Hungry();

    //构造器私有
    private Hungry() {
    }

    public static Hungry getInstance() {
        return hungry;
    }
}

优点: static变量会在类装载时初始化,不涉及多个线程访问该对象的问题,可以省略synchronized关键字

缺点:类初始化时就创建了对象,如果只是加载本类,而不是要调用 getinstance(),甚至永远没有调用,则会造成资源浪费!

17.2、懒汉式

package 单例模式;

//懒汉式
public class Lazy {
    private static Lazy lazy;

    private Lazy() {

    }

    public static Lazy getInstance() {
        if (lazy == null)
            lazy = new Lazy();
        return lazy;
    }
}

优点: 延迟加载,真正用的时候才实例化对象,提高了资源的利用率

缺点:存在并发访问的问题,以下测试并发访问情况

package 单例模式;

//懒汉式
public class Lazy {
    private static Lazy lazy;

    private Lazy() {
        System.out.println("创建示例");
    }

    public static Lazy getInstance() {
        if (lazy == null)
            lazy = new Lazy();
        return lazy;
    }

    public static void main(String[] args) {
        //10条线程并发访问下
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                Lazy.getInstance();
            }).start();
        }
    }
}

image-20210411162651485
根据结果,可以看到有5个线程打印了结果,也就说进行了5次初始化,这是非常大的漏洞,出现了并发访问的问题

17.3、双重检测锁式

为了解决懒汉式并发访问的问题,加入了sychronized关键字

package 单例模式;

//双重检测锁式
public class DoubleLock {
    private static DoubleLock doubleLock;

    private DoubleLock() {
        System.out.println("创建示例");
    }

    public static DoubleLock getInstance() {
        if (doubleLock == null) {
            synchronized (Lazy.class) {
                if (doubleLock == null)
                    doubleLock = new DoubleLock();
            }
        }
        return doubleLock;
    }

    public static void main(String[] args) {
        //10条线程并发访问下
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                DoubleLock.getInstance();
            }).start();
        }
    }
}

image-20210411164551307
根据打印结果,解决了并发访问的问题;但是这样仍然会存在问题,因为我们new对象时并不是一个完整的原子性操作,而是分为以下三部:

  1. 分配内存空间
  2. 执行构造方法,初始化对象
  3. 把这个对象指向这个空间

单个线程A执行的情况下可以123按顺序执行,也可能由于指令重排按132执行;但是如果线程A按132顺序执行到3时来了一个线程B,此时该对象已经指向了分配的空间,因此B判断对象不是null,就会直接返回对象,但其实对象并没有进行初始化,就造成了错误

因此指令重排也会导致错误,因此完整的双重检测锁式还加入了Volatile关键字来避免指令重排,完整代码如下:

package 单例模式;

//双重检测锁式
public class DoubleLock {
    private volatile static DoubleLock doubleLock;

    private DoubleLock() {
        System.out.println("创建示例");
    }

    public static DoubleLock getInstance() {
        if (doubleLock == null) {
            synchronized (Lazy.class) {
                if (doubleLock == null)
                    doubleLock = new DoubleLock();
            }
        }
        return doubleLock;
    }
}

17.4、静态内部类式

package 单例模式;

public class InnerClass {
    private InnerClass() {

    }

    //静态内部类里面创建对象
    public static class inner {
        private static final InnerClass innerClass = new InnerClass();
    }

    public static InnerClass getInstance() {
        return inner.innerClass;
    }
}

反射破坏单例模式

package 单例模式;

import java.lang.reflect.Constructor;

//双重检测锁式
public class DoubleLock {
    private volatile static DoubleLock doubleLock;

    private DoubleLock() {
        System.out.println("创建示例");
    }

    public static DoubleLock getInstance() {
        if (doubleLock == null) {
            synchronized (Lazy.class) {
                if (doubleLock == null)
                    doubleLock = new DoubleLock();
            }
        }
        return doubleLock;
    }

    public static void main(String[] args) throws Exception {
        DoubleLock instance1 = doubleLock.getInstance();
        Constructor<DoubleLock> constructor = DoubleLock.class.getDeclaredConstructor(null);
        constructor.setAccessible(true);
        DoubleLock instance2 = constructor.newInstance();
        System.out.println(instance1);
        System.out.println(instance2);
    }
}

image-20210411233747573
根据结果,看到创建了两个实例,也就是单例模式被破坏,那么怎么解决呢?

可以在私有构造中加锁

package 单例模式;

import java.lang.reflect.Constructor;

//双重检测锁式
public class DoubleLock {
    private volatile static DoubleLock doubleLock;

    private DoubleLock() {
        synchronized (DoubleLock.class){
            if(doubleLock!=null){
                throw new RuntimeException("不要试图使用反射破坏异常");
            }
        }
        System.out.println("创建示例");
    }

    public static DoubleLock getInstance() {
        if (doubleLock == null) {
            synchronized (Lazy.class) {
                if (doubleLock == null)
                    doubleLock = new DoubleLock();
            }
        }
        return doubleLock;
    }

    public static void main(String[] args) throws Exception {
        DoubleLock instance1 = doubleLock.getInstance();
        Constructor<DoubleLock> constructor = DoubleLock.class.getDeclaredConstructor(null);
        constructor.setAccessible(true);
        DoubleLock instance2 = constructor.newInstance();
        System.out.println(instance1);
        System.out.println(instance2);
    }
}

image-20210411234013067
根据结果,可以看到避免了单例模式的破坏?可是上述两个对象一个是通过单例获取,一个通过反射获取;

那如果两个对象都是通过反射获取呢?

public static void main(String[] args) throws Exception {
    Constructor<DoubleLock> constructor = DoubleLock.class.getDeclaredConstructor(null);
    constructor.setAccessible(true);
    DoubleLock instance1= constructor.newInstance();
    DoubleLock instance2 = constructor.newInstance();
    System.out.println(instance1);
    System.out.println(instance2);
}

image-20210411234133836
根据结果,可以看到单例模式又被破坏了,创建了两个对象!这种情况如何解决呢?

可以通过红绿灯方法实现,定义一个标志位记录对象是否创建

package 单例模式;

import java.lang.reflect.Constructor;

//双重检测锁式
public class DoubleLock {
    private volatile static DoubleLock doubleLock;

    //标志位
    private static boolean flag = false;

    private DoubleLock() {
        synchronized (DoubleLock.class) {
            if (flag == false)
                flag = true;
            else
                throw new RuntimeException("不要试图使用反射破坏异常");
        }
        System.out.println("创建示例");
    }

    public static DoubleLock getInstance() {
        if (doubleLock == null) {
            synchronized (Lazy.class) {
                if (doubleLock == null)
                    doubleLock = new DoubleLock();
            }
        }
        return doubleLock;
    }

    public static void main(String[] args) throws Exception {
        Constructor<DoubleLock> constructor = DoubleLock.class.getDeclaredConstructor(null);
        constructor.setAccessible(true);
        DoubleLock instance1 = constructor.newInstance();
        DoubleLock instance2 = constructor.newInstance();
        System.out.println(instance1);
        System.out.println(instance2);
    }
}

image-20210411234412838
可以看到我们通过设置标志位flag再次解决了这个问题,但是一旦被获取了这个关键字,单例模式仍然可以通过反射被破解,如下所示

public static void main(String[] args) throws Exception {
    Constructor<DoubleLock> constructor = DoubleLock.class.getDeclaredConstructor(null);
    Field declaredField = DoubleLock.class.getDeclaredField("flag");
    constructor.setAccessible(true);
    declaredField.setAccessible(true);
    DoubleLock instance1 = constructor.newInstance();
    declaredField.set(instance1, false);//第一个对象创建完毕后将flag改为false
    DoubleLock instance2 = constructor.newInstance();
    System.out.println(instance1);
    System.out.println(instance2);
}

image-20210411234824886
可以看到单例模式再次被破坏;因此为了让程序更加安全,通常对flag关键字进行加密处理

那么到底如何完全的避免反射破坏单例模式呢?我们查看newInstance的源码
image-20210411235142204
可以看到,如果是枚举类型的话,就不能通过反射获取枚举;

因此引入了第5种单例模式

17.5、枚举单例

package 单例模式;

import java.lang.reflect.Constructor;

//enum本质上就是一个Class类
public enum EnumSingle {
    INSTANCE;

    public static void main(String[] args) throws Exception {
        EnumSingle instance1 = EnumSingle.INSTANCE;
        Constructor<EnumSingle> declaredConstructor = EnumSingle.class.getDeclaredConstructor(null);
        declaredConstructor.setAccessible(true);
        EnumSingle instance2 = declaredConstructor.newInstance();
        System.out.println(instance1);
        System.out.println(instance2);
    }
}

我们再次通过反射创建对象,根据结果报错没有EnumSingle的空构造方法,这不是我们希望看到的
image-20210412000059691
我们对EnumSingle的class文件进行反编译,可以看到明明有空构造方法
image-20210412000353611
但是执行明明报错没有无参构造,我们使用更专业的反编译工具jad对class文件再进行反编译
image-20210412000629008
可以看到枚举类本质上就是继承了Enum类,本身就是一个Class,而且没有无参构造,而是含两个参数的有参构造,我们修改代码在测试

public static void main(String[] args) throws Exception {
    EnumSingle instance1 = EnumSingle.INSTANCE;
    Constructor<EnumSingle> declaredConstructor = EnumSingle.class.getDeclaredConstructor(String.class, int.class);
    declaredConstructor.setAccessible(true);
    EnumSingle instance2 = declaredConstructor.newInstance();
    System.out.println(instance1);
    System.out.println(instance2);
}

image-20210412000837225
这才正确显示了报错的信息:无法反射地创建枚举对象



十八、深入理解CAS

18.1、什么是CAS

CAS 是 compareAndSet 的缩写:比较并交换,是CPU的并发原语

package CAS探究;

import java.util.concurrent.atomic.AtomicInteger;

public class CASDemo {
    public static void main(String[] args) {
        AtomicInteger atomicInteger = new AtomicInteger(2020);//底层是CAS
        /**
         * public final boolean compareAndSet(int expectedValue, int newValue)
         * 期望、更新
         * 如果期望的值达到了就更新,否则不更新
         */
        System.out.println(atomicInteger.compareAndSet(2020, 2021));//返回是否修改成功
        System.out.println(atomicInteger.get());
        System.out.println(atomicInteger.compareAndSet(2020, 2022));
        System.out.println(atomicInteger.get());
    }
}

image-20210412002500909
我们再来看看 atomicInteger.getAndIncrement() 方法是怎么实现的?我们该方法的源码
image-20210412002756242
可以看到是由U调用了getAndAddInt()方法,而U就是Unsafe类的一个实例

什么是Unsafe

  • Java无法操作内存,只能通过调用C++来操作内存,Unsafe就是Java通过C++操作内存的接口
  • 就类似于Java通过native关键字来调用C++本地方法来和操作系统交互

image-20210412004045789
可以看到,底层是一个do while循环,也就是一个自旋锁

因此:CAS就是比较当前工作内存中的值和主内存中的值,如果这个值是期望的,就执行操作;如果不是,就一直循环,因为底层是一个do while循环(自旋锁)

CAS有三个操作数:

  • 期望的值
  • 比较的值
  • 更新的值

缺点

  • 底层是自旋锁,循环耗时
  • 一次性只能保证一个共享变量的原子性
  • 会存在ABA问题

18.2、ABA问题

比如有两个线程A,B同时向修改A的内容,但是B线程执行速度快,首先cas(1,3)将A修改为3,然后又执行cas(3,1)将A修改为1,这之后线程A再cas(1,2)将A修改为2,但此时A=1已经不是原来的1了;

这就是ABA问题

image-20210412005138299

我们来个代码模拟以下

package CAS探究;

import java.util.concurrent.atomic.AtomicInteger;

public class ABADemo {
    //CAS是compareAndSet的缩写:比较并交换,是CPU的并发原语
    public static void main(String[] args) {
        AtomicInteger atomicInteger = new AtomicInteger(1);//底层是CAS
        //线程B
        System.out.println(atomicInteger.compareAndSet(1, 3));
        System.out.println(atomicInteger.get());
        System.out.println(atomicInteger.compareAndSet(3, 1));
        System.out.println(atomicInteger.get());
        //线程A
        System.out.println(atomicInteger.compareAndSet(1, 2));
        System.out.println(atomicInteger.get());
    }
}

image-20210412005623931
可以看到三个结果都为true,但不是我们期望的,我们希望知道谁动过A的值

可以通过类似乐观锁的方案来解决,使用 原子引用类AtomicReference/AtomicStampedReference(带时间)
image-20210412010508978
我们使用AtomicStampedReference测试以下

package CAS探究;

import java.util.concurrent.atomic.AtomicStampedReference;

public class ABADemo {
    //CAS是compareAndSet的缩写:比较并交换,是CPU的并发原语
    public static void main(String[] args) {
        //public AtomicStampedReference(V initialRef, int initialStamp):这里的第二个参数等同于乐观锁的version,初始值设为1
        AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<>(1, 1);
        //线程B,多了连个参数:期望的版本号,更新的版本号
        System.out.println(atomicStampedReference.compareAndSet(1, 3,
                atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1));
        System.out.println(atomicStampedReference.compareAndSet(3, 1,
                atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1));
        //线程A
        System.out.println(atomicStampedReference.compareAndSet(1, 2, 1, 2));
    }
}

image-20210412011555412
可以看到A成功察觉到了B修改过数据,所以执行失败;和乐观锁原理相同

注意:如果泛型是包装类,注意对象引用问题(正常业务都是对象,这里是使用包装类Integer进行测试)

image-20210412011922276
如果我们的范围不再-128~127,则会失败
image-20210412012249646



十九、各种锁的理解

19.1、乐观锁/悲观锁

悲观锁(Pessimistic Lock)

1️⃣ 简介

​ 当要对数据库中的一条数据进行修改的时候,为了避免同时被其他人修改,最好的办法就是直接对该数据进行加锁以防止并发。这种借助数据库锁机制,在修改数据之前先锁定,再修改的方式被称之为悲观并发控制【Pessimistic Concurrency Control,缩写“PCC”,又名“悲观锁”】

​ 悲观锁,正如其名,具有强烈的独占和排他特性。它指的是对数据被外界(包括本系统当前的其他事务,以及来自外部系统的事务处理)修改持保守态度。因此,在整个数据处理过程中,将数据处于锁定状态。悲观锁的实现,往往依靠数据库提供的锁机制(也只有数据库层提供的锁机制才能真正保证数据访问的排他性,否则,即使在本系统中实现了加锁机制,也无法保证外部系统不会修改数据)。

img

之所以叫做悲观锁,是因为这是一种对数据的修改持有悲观态度的并发控制方式。总是假设最坏的情况,每次读取数据的时候都默认其他线程会更改数据,因此需要进行加锁操作,当其他线程想要访问数据时,都需要阻塞挂起。悲观锁的实现:

  1. 传统的关系型数据库使用这种锁机制,比如行锁,表锁等,读锁,写锁等,都是在做操作之前先上锁。
  2. Java 里面的同步 synchronized 关键字的实现。

2️⃣ 分类

悲观锁主要分为 共享锁排他锁

  • 共享锁【shared locks】又称为读锁,简称S锁。顾名思义,共享锁就是多个事务对于同一数据可以共享一把锁,都能访问到数据,但是只能读不能修改。
  • 排他锁【exclusive locks】又称为写锁,简称X锁。顾名思义,排他锁就是不能与其他锁并存,如果一个事务获取了一个数据行的排他锁,其他事务就不能再获取该行的其他锁,包括共享锁和排他锁,但是获取排他锁的事务是可以对数据行读取和修改。

3️⃣ 说明

​ 悲观并发控制实际上是“先取锁再访问”的保守策略,为数据处理的安全提供了保证。但是在效率方面,处理加锁的机制会让数据库产生额外的开销,还有增加产生死锁的机会。另外还会降低并行性,一个事务如果锁定了某行数据,其他事务就必须等待该事务处理完才可以处理那行数据。

乐观锁(Optimistic Locking)

1️⃣ 简介

​ 乐观锁是相对悲观锁而言的,乐观锁假设数据一般情况下不会造成冲突,所以在数据进行提交更新的时候,才会正式对数据的冲突与否进行检测,如果发现冲突了,则返回给用户错误的信息,让用户决定如何去做。乐观锁适用于读操作多的场景,这样可以提高程序的吞吐量。
img
​ 乐观锁机制采取了更加宽松的加锁机制。乐观锁是相对悲观锁而言,也是为了避免数据库幻读、业务处理时间过长等原因引起数据处理错误的一种机制,但乐观锁不会刻意使用数据库本身的锁机制,而是依据数据本身来保证数据的正确性。

2️⃣ 实现

  1. CAS实现:Java中java.util.concurrent.atomic包下面的原子变量使用了乐观锁的一种 CAS 实现方式
  2. 版本号控制:一般是在数据表中加上一个数据版本号 version 字段,表示数据被修改的次数。当数据被修改时,version 值会+1。当线程A要更新数据值时,在读取数据的同时也会读取 version 值,在提交更新时,若刚才读取到的 version 值与当前数据库中的 version 值相等时才更新,否则重试更新操作,直到更新成功

3️⃣ 说明

​ 乐观并发控制相信事务之间的数据竞争(data race)的概率是比较小的,因此尽可能直接做下去,直到提交的时候才去锁定,所以不会产生任何锁和死锁

19.2、公平/非公平锁

  • 公平锁:非常公平,不能插队,线程的执行必须先来后到
  • 非公平锁:非常不公平,可以插队,默认都为非公平锁!(比如一个线程3s执行完,一个线程1min执行完,如果使用公平锁严重影响某个线程的效率)
    image-20210321133926406

19.3、可重入锁

可重入锁(递归锁)
image-20210412012855770

代码示例:synchronized版

image-20210412013050474

执行结果:

image-20210412013256865

代码示例:Lock版

image-20210412013522089

19.4、自旋锁

不断的尝试,直到成功为止!

image-20210412014256436

我们来编写一个自旋锁

package 自旋锁;

import java.util.concurrent.atomic.AtomicReference;

//自定义自旋锁
public class SpinLock {
    //锁线程
    AtomicReference<Thread> atomicReference = new AtomicReference<>();

    //加锁
    public void myLock() {
        Thread thread = Thread.currentThread();
        System.out.println(thread.getName() + "==>myLock");
        //自旋锁
        while (!atomicReference.compareAndSet(null, thread)) ;
    }

    //解锁
    public void myUnlock() {
        Thread thread = Thread.currentThread();
        System.out.println(thread.getName() + "==>myUnLock");
        atomicReference.compareAndSet(thread, null);
    }
}

然后编写一段测试代码

package 自旋锁;

import java.util.concurrent.TimeUnit;

public class Test {
    public static void main(String[] args) throws InterruptedException {
        SpinLock spinLock = new SpinLock();
        //线程T1
        new Thread(() -> {
            spinLock.myLock();
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                spinLock.myUnlock();
            }
        }, "T1").start();

        TimeUnit.SECONDS.sleep(1);

        //线程T2
        new Thread(() -> {
            spinLock.myLock();
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                spinLock.myUnlock();
            }
        }, "T2").start();
    }
}

image-20210412082802954
根据结果,总是T1线程解锁后,T2线程才能解锁;因为如果T1线程不解锁,T2就会卡住在while循环不停的尝试cas直到thread=null为止

19.5、死锁

什么是死锁?

是指两个或两个以上的进程在执行过程中,因争夺资源而造成的一种互相等待的现象

image-20210412083125750

简单的死锁案例

package 死锁;

import java.util.concurrent.TimeUnit;

public class DeadLockDemo {
    public static void main(String[] args) {
        String lockA = "lockA";
        String lockB = "lockB";
        new Thread(new MyThread(lockA, lockB), "T1").start();
        new Thread(new MyThread(lockB, lockA), "T2").start();
    }
}

class MyThread implements Runnable {
    private String lockA;
    private String lockB;

    public MyThread(String lockA, String lockB) {
        this.lockA = lockA;
        this.lockB = lockB;
    }

    @Override
    public void run() {
        synchronized (lockA) {
            System.out.println(Thread.currentThread().getName() + "持有锁" + lockA + "尝试获取" + lockB);
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            synchronized (lockB) {
                System.out.println(Thread.currentThread().getName() + "持有锁" + lockB + "尝试获取" + lockA);
            }
        }
    }
}

image-20210412084034340
根据运行结果,可以看到程序卡死,因为发生了死锁,因为T1和T2分别持有lockA和lockB,但又都试图获取对方的锁!

死锁问题排查

  1. 使用jps -l命令定位进程号

    image-20210412084332447

  2. 使用jstack 进程号查看指定进程的堆栈信息,找到死锁问题

    image-20210412084622766

    可以看到,控制台清晰的打印了找到死锁,并可以看到产生的原因就是T1T2互相尝试获取对方的锁

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐