并发编程最全总结
线程和线程池线程线程和进程的区别进程是一个可执行的程序,是系统分配资源的基本单位。线程是进程内部相对独立的可执行单元,是任务调度的基本单位。多线程的优缺点优点:充分利用多核CPU的优势,提高CPU的利用率和程序运行效率缺点:1、线程过多影响性能,CPU切换增加内存开销。2、存在线程同步和线程安全问题。3、可能会发生死锁。4、增加了开发人员的技术难度线程有几种状态5种状态:新建,就绪,运行,阻塞和死
线程和线程池
线程
线程和进程的区别
进程是一个可执行的程序,是系统分配资源的基本单位。线程是进程内部相对独立的可执行单元,是任务调度的基本单位。
多线程的优缺点
优点:充分利用多核CPU的优势,提高CPU的利用率和程序运行效率
缺点:1、线程过多影响性能,CPU切换增加内存开销。2、存在线程同步和线程安全问题。3、可能会发生死锁。4、增加了开发人员的技术难度
线程有几种状态
5种状态:新建,就绪,运行,阻塞和死亡。
- 新建状态:new创建一个线程时,还没开始运行,就是新建状态。
- 就绪状态:新建后,调用start()方法,线程就处于就绪态,等待CPU调度。
- 运行状态:当线程获得了CPU时间后,进入运行状态,执行run()里的内容
- 阻塞状态:线程运行中随时可能被阻塞:比如调用sleep()方法;等待获取锁被阻塞;线程在等待其他触发条件。暂时让出CPU资源。
- 死亡状态:有两个原因导致线程死亡:run()方法正常结束;一个未捕获的异常终止了run()方法
一个线程OOM了,其他线程是否还能运行
答案是还能运行,虽然堆是共享的,一个线程OOM了,可能说明其他线程也会OOM。但是当一个线程OOM了,它所占据的空间会立刻释放掉,从而不会影响其他线程的运行。另外,如果主线程异常了,子线程也可以运行。线程不像进程,一个进程之间的线程没有父子之分,都是平级关系。
创建线程的几种方法
- 继承Thread类,重写run()方法,利用Thread.start()启动线程
- 实现Runnable接口,重写run()方法,利用new Thread(Runnable a)创建线程,调用start()方法启动线程。
- 通过Callable和futureTask创建线程,实现Callable接口,重写call方法,利用future对象包装callable实例,通过Thread方法创建线程。
- 通过线程池创建线程
sleep和wait
- wait只能在synchronized中调用,属于对象级别的方法,sleep不需要,属于Thread的方法。
- 调用wait方法会释放锁,sleep不会释放锁
- wait超时之后线程进入就绪状态,等待获取cpu继续执行。
yield和join
- yield会释放cpu资源,不会释放锁,让当前线程进入就绪状态,只能使同优先级或更高优先级的线程有执行的机会。
- join会释放cpu资源和锁,底层是wait()方法实现的,join会等待调用join()方法的线程执行完成之后再继续执行。
死锁
死锁定义
死锁是指两个及以上的线程在执行过程中,因争夺资源而造成的一种互相等待(饥饿)的现象。若无外力作用,他们都将无法运行下去
死锁原因
- 系统资源的争夺:系统中拥有不可剥夺的资源,其数量不足以满足多个线程运行的需要,使得线程在运行过程因为争夺资源而陷入僵局。
- 线程推进顺序非法:线程在获得一个锁L1的情况下再去申请另一个锁L2,也就是在没有释放锁L1的情况下又去申请锁L2,这个是产生死锁最根本的原因。
死锁的必要条件
- 互斥条件:进程要求对所分配的资源在一段时间内只能由一个进程拥有。
- 不可剥夺条件:资源在进程未使用完成之前,不能被其他进程夺走,除非是主动释放
- 请求和保持条件:进程已经保持了一个资源,又去申请另一个资源,但是该资源已经被其他进程占有。
- 循环等待条件:进程资源循环等待,A拥有资源1,申请资源2,B拥有资源2,申请资源1
如何避免死锁
- 加锁顺序要合理
- 加锁时限要适当:线程尝试获取锁要加上一定时限,超时就要放弃请求,同时释放自己的锁。
- 死锁检测
线程池
概念
就是个管理线程的池子。帮助我们管理线程,减少创建线程和销毁线程的资源消耗。线程池的好处很多:提高响应速度,直接从线程池中拿线程比创建线程快,而且线程可以重复利用。
ThreadPoolExecutor
线程池可以通过ThreadPoolExecutor创建,看下构造函数,总共7个参数
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
- corePoolSize:核心线程数
- maximumPoolSize:最大线程数
- keepAliveTime:线程池中非核心线程的空闲的存活时间
- TimeUnit:keepAliveTime的时间单位
- workQueue:存放任务的阻塞队列
- threadFactory:用于创建核心线程的线程工厂,可以给创建的线程定义名称
- handler:线程池的饱和策略(拒绝策略),有四种
线程池的执行流程
四种拒绝策略
- AbortPolicy:直接拒绝,抛出一个异常,默认的拒绝策略
- DiscardPolicy:直接丢弃任务
- DiscardOldestPolicy:丢弃任务里最老的任务,将当前这个任务继续提交给线程池
- CallerRunsPolicy:交给线程池调用所在的线程进行处理
阻塞队列
- ArrayBlockingQueue:有界队列,是一个数组实现的有界阻塞队列,按照FIFO排序。
- LindkedBlockingQueue:基于链表实现的阻塞队列,按照FIFO排序,容量可以设置,不设置的话就是一个无边界的阻塞队列(最大长度是Integer.MAX_VALUE),吞吐量要高于ArrayBlockingQueue。newFixedThreadPool就是使用的这个队列。
- DelayQueue:一个任务定时周期延迟执行的队列,根据指定的执行从小到大排序,否则根据插入到队列的先后顺序,newScheduledThreadPool使用的这个队列。
- PriorityBlockingQueue:优先级队列是具有优先级的无界阻塞队列。
- SynchronousQueue:同步队列,一个不存储元素的阻塞队列,每个插入操作必须等待另一个线程调用移除操作,否则插入操作一直阻塞。newCachedThreadPool使用了这个队列。
常用线程池
newFixedThreadPool
固定线程数目的线程池,内部使用LinkedBlockingQueue
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
- 核心线程数=最大线程数
- 没有非空闲时间,即keepAliveTime=0
- 阻塞队列是无界队列LinkedBlockingQueue。
- 使用场景:适用于处理CPU密集型的任务,确保CPU在长期工作线程使用的情况下,尽可能少的分配线程。
newCachedThreadPool
可缓存线程的线程池,内部使用SynchronousBlockingQueue
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
- 核心线程数=0,最大线程数=Integer.Max_VALUE
- 非核心线程存活时间为60s
- 这个池也有一个问题:当提交的任务数量大于处理任务的数量时,每次提交一个任务必然会创建一个非核心线程,极端情况下会创建过多的线程,耗尽CPU和内存
- 使用场景:用于并发量大执行大量短期的小任务
newSingleThreadPool
单线程的线程池,内部使用linkedBlockingQueue
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
- 核心线程数=最大线程数=1,也就是这个池子里始终都只有一个活着的线程。
- keepAliveTime=0,这个参数无效
- 阻塞队列是无界的LinkedBlockingQueue
- 使用场景:适用于串行执行任务的场景,一个任务接一个任务执行。
newScheduledThreadPool
定时及周期性执行的线程池,内部使用DelayQueue
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
- 核心线程自定义,最大线程数为Integer.Max_Value
- keepAliveTime=0
- 使用场景:周期性执行任务的场景。
这里scheduleAtFixedRate()和scheduleWithFixedDelay()两个方法区别如下:
- scheduleAtFixedRate:按某种速率周期性执行,不管上一个任务是否结束。
- scheduleWithFixedDelay:在某个延迟之后执行,是要等上一个任务执行结束开始算起的。
线程池状态
5个状态:RUNNING,SHUTDOWN,STOP,TIDYING,TERMINATED
//线程池状态
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
- Running:该状态的线程池会接收新任务,并处理阻塞队列中的任务。调用shudown()方法可切换到SHUTDOWN状态。调用shutdownNow()方法变为STOP状态。
- Shutdown:该状态线程池不会接收新任务,但会处理阻塞队列中的任务。队列为空,并且线程池中执行的任务也为空,进入TIDYING状态。
- Stop:该状态的线程池不会接收新任务,也不会处理队列中的任务,而且会中断正在执行的任务。
- Tidying:该状态表明所有任务运行终止,记录的任务数量为0.terminated()执行完毕进入TERMINATED状态。
- Terminated:该状态表明线程池终止或死亡。
ThreadLocal
ThreadLocal是什么
表面看ThreadLocal是和多线程和线程同步有关的一个工具类,但它与线程同步机制无关。线程同步机制是多个线程共享一个变量,而ThreadLocal是为每个线程创建一个单独的变量副本,每个线程都可以改变自己的变量副本而不影响其他线程对应的副本。
官方API:该类提供了线程局部变量。这些变量不同于他们的普通对应物,因为访问某个变量的每个线程都有自己的局部变量,他独立于变量的初始化副本。ThreadLocal实例通常是private static字段,他们希望将状态与某一个线程(例如用户ID或事物ID)相关联。
API
四个方法
- get():返回此线程局部变量当前副本的值
- set(T value):将线程局部变量当前副本的值设置为指定值。
- initialValue():返回此线程局部变量当前副本的初始值。
- remove():移除此线程局部变量当前副本的值
ThreadLocal有一个重要的静态内部类ThreadLocalMap,该类是实现线程隔离机制的关键。get(),set(),remove()等都是基于该map操作,ThreadLocalMap的key为当前ThreadLocal对象,value为对应线程的变量副本,每个线程都有自己的ThreadLocal对象,也就是都有自己的ThreadLocalMap,对自己的ThreadLocalMap操作当然是互不影响的,这就不存在线程安全问题了。
使用示例
假设每个线程都需要计数,且在运行时都要修改计数器的值,那么ThreadLocal就是很好的选择了。
public class SeqCount {
private static ThreadLocal<Integer> threadLocal = new ThreadLocal<Integer>() {
@Override
protected Integer initialValue() {
return 0;
}
};
public int nextSeq() {
threadLocal.set(threadLocal.get()+1);
return threadLocal.get();
}
public static void main(String [] args) {
SeqCount seqCount = new SeqCount();
SeqThread seqThread1 = new SeqThread(seqCount);
SeqThread seqThread2 = new SeqThread(seqCount);
SeqThread seqThread3 = new SeqThread(seqCount);
Thread thread1 = new Thread(seqThread1);
Thread thread2 = new Thread(seqThread2);
Thread thread3 = new Thread(seqThread3);
thread1.start();
thread2.start();
thread3.start();
}
public static class SeqThread implements Runnable {
private SeqCount seqCount;
public SeqThread(SeqCount seqCount) {
this.seqCount = seqCount;
}
@Override
public void run() {
for (int i=0; i<5; i++) {
System.out.println(Thread.currentThread().getName()+",seqCount="+seqCount.nextSeq());
}
}
}
}
/*
一个可能的输出结果
Thread-0,seqCount=1
Thread-0,seqCount=2
Thread-0,seqCount=3
Thread-0,seqCount=4
Thread-0,seqCount=5
Thread-1,seqCount=1
Thread-1,seqCount=2
Thread-1,seqCount=3
Thread-1,seqCount=4
Thread-1,seqCount=5
Thread-2,seqCount=1
Thread-2,seqCount=2
Thread-2,seqCount=3
Thread-2,seqCount=4
Thread-2,seqCount=5
*/
内存泄露问题
ThreadLocalMap的key为ThreadLocal实例,它是一个弱引用,我们知道弱引用有利于GC回收,当key==null时,GC就会回收这部分空间,但value不一定能被回收,因为它和Current Thread之间还存在一个强引用的关系,由于这个强引用的关系会导致value无法被回收。如果线程对象不消除这个强引用关系,就可能出现OOM。
Java内存模型(JMM)
硬件内存结构
处理器和计算机存储设备之间运算速度相差几个数量级,为了达到“高并发”的效果,在处理器和内存之间加了高速缓存“Cache”。
将运算需要使用的数据复制到缓存中,让运算能够快速进行,当运算完成后,再将缓存中的结果写入主内存中,这样运算器就不用等待主内存的读写操作。 每个处理器都有自己的高速缓存,同时又共同操作同一块主存,当多个处理器同时操作主存时,可能导致数据不一致,因此需要“缓存一致性协议”保障。
Java内存模型
Java内存模型简称JMM(Java Memory Model),用来屏蔽各种硬件和操作系统的内存访问差异,实现让Java程序在各平台下都能够达到一致的内存访问效果。 JMM定义了线程和主内存之间的抽象关系:线程之间的共享变量存储在主内存,每个线程都有一个私有的本地内存,本地内存存储了该线程读写变量的副本。本地内存是JMM的一个抽象概念,不是真实存在。
主内存:主要存储Java实例对象,所有线程创建的实例对象都存放在主存中,不管该实例对象是成员变量还是方法中的局部变量,当然也包括共享的类信息、常量、静态变量。共享数据区域,多个线程对同一个变量访问可能会出现线程安全问题。
工作内存:主要存储当前方法的所有本地变量信息,每个线程只能访问自己的工作内存,即线程中的本地变量对其他线程是不可见的,由于工作内存是每个线程的私有数据,线程间无法相互访问工作内存,因此存储在工作内存的数据不存在线程安全问题,例如ThreadLocal。
JMM的8种内存交互
内存交互有8种,虚拟机必须保证每个操作都是原子的,不可再分的
- lock(锁定):作用于主内存的变量,把一个变量标识为线程独占状态。
- read(读取):作用于主内存变量,把一个变量的值从主内存传输到线程的工作内存,以便随后的load动作使用。
- load(载入):作用于工作内存的变量,它把read操作从主内存中的得到的变量放入工作内存的变量副本中。
- use(使用):作用于工作内存的变量,它把工作内存的变量传输到执行引擎,每当虚拟机遇到一个需要使用到变量的值的字节码指令时将会执行这个操作。
- assign(赋值):作用工作内存的变量,它把一个从执行引擎接收到的值赋值给工作内存的变量副本。
- store(存储):作用于工作内存的变量,它把一个从工作内存的一个变量的值传送到主内存中,以便后续write使用。
- write(写入):作用主内存的变量,它把store操作从工作内存得到的变量的值放入主内存的变量中。
- unlock(解锁):作用于主内存的变量,它把一个处于锁定状态的变量释放出来,释放后的变量才可以被其他线程锁定。
如果是将变量从主内存复制到工作内存,就是read->load->use,如果是将变量从工作内存同步到主内存,就是assign->store->write,JMM要求按顺序执行,但不是必须连续执行,中间可以插入其他操作。
JMM的三个特性
原子性
JMM提供了read、load、use、assign、store和write六个指令直接提供原子操作,我们可以认为java的基本变量的读写操作是原子的(除了double和long,因为有些虚拟机可以将64位分为高32位和低32位分开运算)。对于lock、unlock,虚拟机没有将操作直接给用户使用,而是提供了更高层次的字节码指令monitorenter和monitorexit来隐式使用这两个操作,对应java中的synchronized关键字。
可见性
线程之间是隔离的,线程拿到的是主存变量的副本,更改变量需要刷新回主存,其他线程需要从主存重新获取才能拿到变更的值。所有变量都需要经过这个过程,但是volatile修饰的变量可以在修改后强制刷回到主存,并在使用时从主存获取。
synchronized在执行完毕之后,进行Unlock之前,必须将共享变量同步回主内存中(执行store和write操作)
final修饰的字段,只要在构造函数中一旦初始化完成,且没有对象逃逸,那么在其他线程中就可以看到final字段的值。
有序性
有序性:在单线程观察到的结果是有序的。但在多线程下,就会出现乱序的情况。
Java提供了volatile和synchronized来保证线程的有序性。volatile使用内存屏障,而synchronized基于lock之后必须unlock,其他线程才能重新lock的规则,让同步块串行执行。
Happens-Before
先行发生是java内存模型中定义的两个操作的顺序,如果说操作A先行发生于操作B,操作A产生的影响能被B观察到,“影响”包括修改了内存中共享变量的值,发送了消息,调用了方法等。
在JMM下具有一些天然的先行发生关系:
- 程序次序原则:在一个线程内,程序的执行规则跟程序的书写规则是一致的,从上往下执行。
- 锁定规则:一个Unlock操作肯定先于下一次Lock操作。这里必须是同一个锁。
- volatile变量规则:对同一个volatile变量,先行发生的写操作,肯定早于后续发生的读操作。
- 线程启动规则:Thread对象的start()操作先行发生于此线程的每一个动作。
- 线程终止规则:Thread对象的终止检测操作肯定晚于线程中的所有操作。
- 对象终止规则:一个对象的初始化方法肯定早于它的finalize()方法
- 传递性:如果操作A先于操作B,操作B先于操作C,则操作A先于操作C。
CAS
CAS定义
CAS(Compare And Swap),比较并交换。Doug Lea大神在同步组件中大量使用了CAS技术实现了Java多线程的并发操作。整个AQS组件、Atomic原子类操作都是基于CAS实现的,甚至ConcurrentHashMap在JDK1.8中也将ReentrantLock改为CAS+Synchronized。
CAS原理
在CAS中有三个参数:内存值V、旧的预期值A、要更新的值B,当且仅当内存值V的值等于旧的预期值A时才会将内存值V的值修改为B,否则什么也不干。可以想象为什么?当一个线程向更新这个变量的值,内存值是V,如果没有其他线程修改这个变量,那么理所当然认为它的预期值A=V,这个时候更新才不会发生问题。 伪代码如下:
if (this.value==A){
this.value=B;
return true;
} else{
return false;
}
下面以AtomicInteger为例阐述CAS的实现
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
private volatile int value;
CAS可以保证一次读写操作是原子操作,在单处理器上容易实现,但是在多处理器上实现有点复杂,CPU提供了两种方法实现多处理器的原子操作:
- 总线加锁:总线加锁就是使用处理器提供一个LOCK#信号,当一个处理器在总线上输出此信号时,其他处理器的请求将被阻塞住,那么该处理器可以独占使用共享内存。这种处理方式一刀切,在锁定期间,其他处理器都不能访问内存地址的数据,开销大。
- 缓存加锁:针对上面的情况我们只需要保证在同一时刻对某个内存地址的操作是原子性即可。缓存加锁就是缓存在内存区域的数据如果在加锁期间,当它执行锁操作写回内存时,处理器不再输出LOCK#信号,而是修改内部的内存地址,利用缓存一致性协议保证原子性。缓存一致性机制可以保证同一个内存区域的数据仅能被一个处理器修改,也就是说当CPU1修改缓存行中的i时使用缓存锁定,那么CPU2就不能同时缓存了i的缓存行。
CAS的缺陷
循环时间太长
如果自旋CAS长时间不成功,则会给CPU带来非常大的开销,在JUC有些地方就限制了CAS自旋的次数。
只能保证一个共享变量原子操作
CAS只针对一个共享变量,如果是多个共享变量就只能使用锁了。
ABA
CAS需要检查操作值有没有改变,如果没有改变就更新,但是存在一种情况,一个变量原来值=A,后来变成了B,然后又变成了A,那么在CAS检查的时候会发现没改变。这就是所谓的ABA问题。对于这种问题的解决方法就是加上版本号,即在每个变量加上版本号,每次改变都加1,则A->B->A变成1A->2B->3A。
AQS
AQS介绍
AQS(AbstractQueuedSynchronizer)核心思想是:如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时所分配的机制,这个机制AQS是用CLH队列锁实现的,即把暂时获取不到锁的线程加入到队列中。
CLH是一个虚拟机的双向队列(虚拟的双向队列不存在队列实例,仅存在结点之间的关联关系)。AQS是将每条请求共享资源的线程封装成一个CLH锁队列的一个结点Node来实现锁的分配。
队列同步器AQS(以下简称同步器)是用来构建锁或者其他同步组件的基础框架,它使用一个int类型的成员变量state表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作。
同步器的主要使用方式是继承,子类继承同步器并实现它的抽象方法来管理同步状态,在抽象方法的实现过程中免不了要对同步状态进行更改,这时就需要使用同步器提供的三个方法getState()、setState(int newState)和campareAndSeState(int expect, int update)来进行操作。
同步器既可以支持独占式获取同步状态、也支持共享式,这样就可以实现不同类型的同步组件(ReentrantLock、ReenTrantReadWriteLock以及CountdownLock等)。
同步器和锁的关系:同步器是实现锁的关键,在锁的实现中聚合同步器,利用同步器实现锁的语义。锁是面向使用者,定义好了接口,屏蔽了实现细节。同步器面向的是锁的实现者,它简化了锁的实现方式。
同步器的设计是基于模板方法模式,也就是说使用者需要继承同步器并重写指定的方法,随后将同步器组合在自定义同步组件的实现中,并调用同步器提供的模板方法,而这些模板方法将会调用重写的方法
同步器可重写的方法:
方法名称 | 描述 |
---|---|
protected boolean tryAcquire(int arg) | 独占式获取同步状态,实现该方法需要查询当前状态并判断同步状态是否符合预期,然后再进行CAS设置同步状态 |
protected boolean tryRelease(int arg) | 独占式释放同步状态,等待获取同步状态的线程将有机会获取同步状态 |
protected int tryAcquireShared(int arg) | 共享式获取同步状态,返回大于等于0的值表示获取成功,否则获取失败 |
protected boolean tryReleaseShared(int arg) | 共享式释放同步状态 |
protected boolean isHeldExclusively() | 当前同步器是否在独占模式下被线程占用,一般该方法表示是否被当前线程所独占 |
同步器提供的模板方法分为三类:独占式获取与释放同步状态、共享式获取与释放同步状态、查询同步队列中的等待线程情况。自定义同步组件将使用同步器提供的模板方法实现自己的同步语义。 |
Mutex独占锁的实现
独占锁就是在同一时刻只能有一个线程获取到锁,其他未获取到锁的线程只能在同步队列中等待,只有等获取锁的线程释放了锁,后续线程才能继续获取锁。
public class Mutex implements Lock{
private static class Sync extends AbstractQueuedSynchronizer {
/**
* 是否处于占用状态
* @return
*/
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
/**
* 当状态为0时获取锁
* @param acquires
* @return
*/
@Override
protected boolean tryAcquire(int acquires) {
assert acquires == 1;
if (compareAndSetState(0,1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
/**
* 释放锁,将状态设置为0
* @param acquires
* @return
*/
@Override
protected boolean tryRelease(int acquires) {
assert acquires == 1;
if (getState() == 0) {
throw new IllegalMonitorStateException();
}
setExclusiveOwnerThread(null);
setState(0);
return true;
}
/**
* 返回一个Condition,每个COndition都包含了一个condition队列
* @return
*/
Condition newConditon() {
return new ConditionObject();
}
}
private final Sync sync = new Sync();
@Override
public void lock() {
sync.acquire(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
public void unlock() {
sync.release(1);
}
@Override
public Condition newCondition() {
return sync.newConditon();
}
public boolean isLocked() {
return sync.isHeldExclusively();
}
public boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
@Override
public void lockInterruptibly() throws InterruptedException{
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException{
return sync.tryAcquireNanos(1, unit.toNanos(time));
}
}
上面的代码中,独占锁Mutex是一个自定义同步组件,它定义了一个静态内部类,该内部类Sync继承了同步器并实现了独占式获取和释放同步状态。在tryAcquire(int acquires)方法中,如果经过CAS设置成功,则表示获取到了同步状态。在tryRelease(int releases)方法中只是将同步状态置为0。
源码解读
同步队列
同步器依赖内部的同步队列来完成同步状态的管理,当前线程获取同步状态失败时,同步器会将当前线程以及等待状态信息构造成一个Node结点放入同步队列,同时会阻塞当前线程,当同步状态释放时,会把头结点中的线程唤醒,使其再次尝试获取同步状态。
同步队列中的Node结点保存了线程引用、等待状态以及前驱和后继结点,结点的属性类型与名称如下:
//因为超时或者中断,节点会被设置成取消状态,被取消的节点不会参与到竞争中,保持取消状态不会改变
static final int CANCELLED = 1;
//后继节点的线程处于等待状态,而当前节点的线程如果释放了同步状态或者被取消了,将会通知后继节点,使后继节点的线程得以运行
static final int SIGNAL = -1;
//表示节点在等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了signal方法后,该节点将会从等待队列中转移到同步队列中,加入到对同步状态的获取中
static final int CONDITION = -2;
//表示下一次共享式同步状态获取将会无条件被传播下去
static final int PROPAGATE = -3;
Node中还定义了不同的节点
//后继节点
volatile Node next;
//前驱节点,当节点加入同步队列时被设置
volatile Node prev;
//获取同步状态的线程
volatile Thread thread;
//等待队列中的后继节点,如果当前节点是共享的,那么该字段是一个SHARED常量,也就是说节点类型(独占和共享)和等待队列中的后继节点共有一个字段。
Node nextWaiter;
节点是构成同步队列的基础,同步器拥有头结点和尾结点,没有成功获得同步状态的线程会加入队列尾部
同步器遵循FIFO,头结点是获取同步状态成功的节点,头结点的线程在释放同步状态时,将会唤醒后继节点,而后继节点在获取同步状态成功时会将自己设置成头结点。
独占式同步状态的获取与释放
acquire方法
调用同步器的acquire(int arg)方法可以获取同步状态,该方法对中断不敏感
public final void acquire(int arg) {
//尝试获取锁
if (!tryAcquire(arg) &&
//获取不到,则进入等待队列,返回是否中断
//acquireQueued返回true表示中断
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
//如果返回中断,则调用当前线程的interrupt方法
selfInterrupt();
}
首先调用自定义同步器实现的tryAcquire()方法,如果失败则构造节点,并通过addWaiter()将该节点加入同步队列的尾部,最后调用acquireQueued(Node node, int arg)方法,使得该节点以自旋方式获取同步状态,如果获取不到则调用LockSupport.park(this)阻塞节点中的线程。
private Node addWaiter(Node mode) {
//将当前线程封装成Node,并且mode为独占锁
Node node = new Node(Thread.currentThread(), mode);
//tail是AQS中同步队列的队尾
Node pred = tail;
if (pred != null) {
//将当前节点node的前驱节点设置为尾结点
node.prev = pred;
//CAS设置尾结点
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//第一次添加tail=null,将node添加到同步队列中
enq(node);
return node;
}
private Node enq(final Node node) {
//循环+CAS
for (;;) {
Node t = tail;
//如果是第一次添加到队列,那么tail=null
if (t == null) { // Must initialize
//CAS设置头结点
if (compareAndSetHead(new Node()))
tail = head;
}
//否则添加逻辑和addWaiter相似
else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
在enq(final Node node)中,同步器通过死循环保证节点的正确添加,在死循环中只有通过CAS将节点设置成尾结点后,当前线程才返回。 节点进入同步队列后就进入自旋过程,每个节点(或线程)都在观察自己的条件是否满足,一旦获取到同步状态就从自旋过程中退出。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
//其前驱是头结点,并且再次调用tryAcquire成功获得锁
if (p == head && tryAcquire(arg)) {
//将自己设为头结点
setHead(node);
p.next = null; // help GC
failed = false;
//成功获得锁,返回
return interrupted;
}
//没有得到锁时,shouldParkAfterFailedAcquire方法:返回是否需要阻塞当前线程
//parkAndCheckInterrupt方法:阻塞当前线程,当线程再次唤醒时,返回是否中断
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//修改中断标志位
interrupted = true;
}
} finally {
if (failed)
//获取锁失败,则将此线程对应的node的waitStatus改为CANCEL
cancelAcquire(node);
}
}
/**
获取锁失败时,检查并更新Node的waitStatus,如果线程需要阻塞,返回true
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
//前驱节点的waitStatus是SIGNAL
if (ws == Node.SIGNAL)
/*
* SIGNAL状态的节点,释放锁后,会唤醒其后继节点,因为此线程可以安全阻塞
*/
return true;
if (ws > 0) {
/*
* 前驱节点对应的线程被取消,CANCELLED=1.需要将取消状态的节点从队列中移除知道找到一个不是取消的节点为止
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
*除了以上情况,通过CAS将前驱结点的状态设置为SIGNAL
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
/**
当shouldParkAfterFailedAcquire返回true,则调用此方法阻塞当前线程
*/
private final boolean parkAndCheckInterrupt() {
//阻塞当前线程
LockSupport.park(this);
//判断当前线程是否被中断,如果中断了返回true。
return Thread.interrupted();
}
独占式获取同步状态的可以概括为:对于每个得不到锁的线程,会被包装成Node,加入同步队列尾部,然后自旋判断自己的前驱节点是否是头结点,如果是就去尝试获得锁,获取成功就把自己置为头结点。如果获取失败则继续检查前驱节点的状态:如果状态为SIGNAL(-1),则当前节点进入阻塞状态(前驱节点可以唤醒自己,降低自旋开销)。如果是除了CANCELLED(1)之外的状态,则修改为SIGNAL(-1)。当又新增一个节点时,首先自旋判断当前节点的前驱节点是否为头结点,如果不是则直接判断状态是否为SIGNAL(-1),是的话就阻塞,如果是除了CANCELLED(1)之外的其他状态,则修改为SIGNAL(-1)。
还有release方法以及共享式获取与释放同步状态留待大家摸索。
Synchronized
概述
synchronized可以保证方法或代码执行时,同一时刻只有一个线程可以进入临界区,同时可以保证共享变量的内存可见性。 Java中每个对象都可以作为锁,这是Synchronize实现同步的基础:
- 普通同步方法:锁是当前实例对象
- 静态同步方法:锁是当前类的class对象
- 静态代码块:锁是括号内的对象
原理
代码使用synchronized加锁后,编译后的字节码出现monitorenter和monitorexit两个指令,可以理解为代码块执行前的加锁和退出同步时的解锁。 执行monitorenter指令时,线程会为锁对象关联一个ObjectMonitor对象。 每个线程都有两个ObjectMonitor对象列表,分别为free和used列表,如果当前free列表为空,线程将向全局global list请求分配ObjectMonitor。 ObjectMonitor的owner、WaitSet、Cxq、EntryList这几个属性比较关键。WaitSet、Cxq、EntryList的队列元素是包装线程后的对象-ObjectWaiter。获取owner的线程即为获得锁的线程。
总之:
- 线程遇到synchronized同步时,先会进入EntryList队列中,然后尝试把owner变量设置为当前线程,同时monitor中计数器的count加1,即获得对象锁。否则通过一定次数的自旋加锁,失败则进入Cxq队列阻塞等待。
- 线程执行完毕将释放持有的owner,owner变量恢复为null,count-1,以便其他线程进入获取锁。
synchronize修饰方法的原理类似,只不过不是用monitor指令,而是使用ACC_SYNCHRONIZED标识方法的同步。
锁优化
适应性自旋
线程阻塞的时候,让等待的线程不放弃cpu资源,而是执行一个自旋(一般是空循环),这就叫做自旋锁。
自旋等待虽然避免了线程切换的开销,但要占用CPU时间,因此如果锁被占用的时间很短,效果很好,但如果锁被占用的时间很长,那么自旋的线程只会白白消耗资源。
因此自旋等待的时间必须有一定限制,如果自旋超过一定次数还是没有获取锁,就应该挂起了。
锁消除
虚拟机运行时,对一些代码上要求同步但是检测到不会存在共享数据竞争的锁进行消除。一般根据逃逸分析的数据支持来作为判定依据。
锁粗化
原则上我们在编写代码时会将同步块的作用范围缩小,这是为了减少锁等待的时间。但如果一系列操作频繁对同一个对象加锁解锁,或者加锁解锁在循环体内,就会扩大加锁范围。
轻量级锁
轻量级锁是JDK1.6引入的,作用是在没有多线程竞争下,减少传统的重量级锁使用操作系统互斥量产生的性能消耗。HotSpot虚拟机的对象头分为两部分信息:第一部分用于存储对象自身的运行时数据,这部分称为Mark Word,还有一部分存储指向方法区对象类型数据的指针。
加锁
在代码进入同步块时,如果此同步对象没有被锁定(锁标志位为01),虚拟机首先将在当前线程的栈帧中建立一个名为锁记录(Lock Record)的空间,用于存储对象目前的Mark Word拷贝。然后虚拟机使用CAS操作将对象的Mark Word更新为指向Lock Record的指针,如果更新成功,那么这个线程就拥有了该对象的锁,并且对象Mark Word的锁标志位转变为“00”,表示此对象处于轻量级锁锁定状态。如果失败,虚拟机首先会检查对象的Mark Word是否指向当前线程的栈帧,如果是说明当前线程已经拥有了这个对象的锁,那就可以直接进入同步块,否则说明这个锁对象已经被其他线程抢占了。如果有两条以上线程争用同一个锁,那轻量级锁就不再有效,膨胀为重量级锁,锁标志位变为“10”。
解锁
解锁也是CAS操作。如果对象的Mark Word仍指向线程的锁记录,那就用CAS把对象当前的Mark Word和线程中复制的Displaces Mark Word替换回来,如果替换成功,这个同步过程就完成了。如果替换失败,说明有其他线程尝试获取该锁,那就要在释放锁同时,唤醒被挂起的线程。
偏向锁
偏向锁也是JDK1.6引入的优化,目的是消除数据在无竞争情况下的同步原语。如果说轻量级锁是在无竞争下使用CAS消除同步使用的互斥量,那偏向锁就是无竞争下把整个同步都消除。 当锁对象第一次被线程获取时,虚拟机将对象头的标志位设为“01”,即偏向模式,同时使用CAS把获取到这个锁的线程ID记录在对象的Mark Word中,如果CAS成功,持有偏向锁的线程以后每次进入这个同步块,可以不进行任何同步操作。当有另外一个线程尝试获取这个锁时,偏向模式结束。
HotSpot实现的JVM在64位机器的Mark Word信息:
锁升级
锁升级:new->偏向锁->轻量级锁(自旋锁)->重量级锁。
Volatile
原理
- 保证可见性,不保证原子性
- 禁止指令重排序
- 底层使用内存屏障实现
内存语义
- 当写一个volatile变量时,JMM会把该线程对应的本地内存中的共享变量值立即刷新到主内存中。
- 当读一个volatile变量时,JMM会把线程本地变量的值置为无效,从主内存中读取。
重排序规则
- 如果第一个操作是volatile读,不管第二个操作是什么,都不允许重排序。这个操作确保volatile读之后的操作不会被编译器重排序到volatile读之前。
- 如果第二个操作是volatile写,不管第一个操作是什么,都不允许重排序。这个操作确保volatile写之前的操作不会被编译器重排序到volaile写之后。
volatile底层是通过插入内存屏障实现的:
- 在每一个volatile写操作前插入一个StoreStore屏障。
- 在每一个volatile写操作后插入一个StoreLoad屏障。
- 在每一个volatile读操作后插入一个LoadLoad屏障。
- 在每一个volatile读操作后插入一个LoadStore屏障。
Lock
ReentrantLock
ReentrantLock重入锁,是实现Lock接口的一个类,支持重入性,表示能够对共享资源重复加锁,即当前线程再次获取该锁不会被阻塞。还支持公平锁和非公平锁两种方式。
重入性的实现原理
要想支持重入性,就要解决两个问题:
- 在线程获取锁时,如果已经获取锁的线程是当前线程则直接获取成功。
- 由于锁会被获取n次,那么只有当锁被释放同样的n次之后,该锁才算完全释放成功。
对于第一个问题,我们来看看ReentrantLock的实现
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//如果该锁未被任何线层占有,该锁能被当前线程获取
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//如果被占有,检查占有线程是否为当前线程
else if (current == getExclusiveOwnerThread()) {
//再次获取,计数加一
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
这段代码逻辑很简单,如果该锁已经被线程占有了继续检查占有线程的是否为当前线程,如果是,同步状态+1,表示可以再次获取成功。再来看下释放
protected final boolean tryRelease(int releases) {
//同步状态减1
int c = getState() - releases;
//如果不是当前线程,抛异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
//只有当同步状态为0时,锁成功被释放,返回true
free = true;
setExclusiveOwnerThread(null);
}
//锁未被完全释放,返回false
setState(c);
return free;
}
重入锁的释放必须等同步状态为0时才算成功释放,否则锁仍未释放。
公平锁与非公平锁
ReentrantLock支持非公平锁和公平锁,公平性是针对获取锁而言的,如果一个锁是公平的,那么锁的获取顺序就应该符合请求上的绝对时间顺序,满足FIFO。默认是非公平锁。
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
我们看看公平锁的处理:
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
这段代码和非公平锁唯一的区别就是增加了hasQueuedProdecessors()的判断,该方法是用来判断当前节点在同步队列中是否有前驱节点的判断,如果有前驱节点说明有线程比当前线程更早请求资源,根据公平性,当前线程请求资源失败。
ReentrantReadWriteLock
在一些读多写少的业务场景,如果仅仅是读数据的话并不会影响数据正确性,如果依然使用独占锁的话,就会出现性能瓶颈。针对这种读多写少的情况,java提供了读写锁ReentrantReadWriteLock。读写锁允许同一时刻被多个线程访问,但是在写线程访问时,所有的读线程和其他写线程必须阻塞。
- 公平性选择:支持非公平性和公平性。
- 重入性:支持重入,读锁获取后能再次获取,写锁获取后能再次获取写锁,同时也能获取读锁。
- 锁降级:遵循获取写锁,获取读锁再释放写锁的次序,写锁能降级成为读锁。
锁降级
锁降级指的写锁降级为读锁,即先获取写锁、获取读锁再释放写锁的过程,目的是保证数据的可见性。假设有两个线程A和B,若线程A获取到写锁,不获取读锁而是直接释放写锁,这时线程B获取了写锁并修改了数据,那么线程A无法知道线程B的数据更新。如果线程A获取读锁,即遵循锁降级的步骤,则线程B将会被阻塞,直到线程A使用数据并释放读锁之后,线程B才能获取写锁进行数据更新。
- 当有线程获取读锁后,不允许其他线程获取写锁
- 当有线程获取写锁后,不允许其他线程获取读锁和写锁。
- 写锁能降级成为读锁,读锁无法升级为写锁。
Condition
跟Object的监视器方法一样,Condition接口也提供了类似方法,配合Lock可以实现等待/通知模式。
对比项 | Object监视器方法 | Condition |
---|---|---|
前置条件 | 获取对象的锁 | 调用Lock.lock()获取锁,然后调用lock.newCondition()获取Condition对象 |
调用方式 | 直接调用Object.wait() | 直接调用condition.wait() |
等待队列个数 | 一个 | 多个 |
当前线程释放锁并进入等待状态 | 支持 | 支持 |
当前线程释放锁并进入等待状态,在等待状态不响应中断 | 不支持 | 支持 |
当前线程释放锁并进入超时等待状态 | 支持 | 支持 |
当前线程释放锁并进入等待状态到将来某个时间 | 不支持 | 支持 |
唤醒等待队列中的一个线程 | 支持 | 支持 |
唤醒等待队列中的全部线程 | 支持 | 支持 |
并发工具类
CountdownLatch
CountdownLatch是通过一个计数器实现的,当我们在new一个CountdownLatch对象的时候,需要传入计数器的值,该值表示线程的数量,每当一个线程完成任务后,计数器的值就会减1,当计数器的值变为0时,就表示所有线程均已完成任务,然后就可以恢复等待的线程继续执行了。
和CyclicBarrier的区别:
- CountdownLatch的作用是允许一个或多个线程等待其他线程完成后继续执行。而CyclicBarrier则是允许多个线程相互等待。
- CountdownLatch的计数器无法被重置,CyclicBarrier的计数器可以被重置后使用。
实现原理
CountdownLatch仅提供了一个构造方法
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
再来看看Sync,是CountDownLatch的一个内部类
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
//获取同步状态
int getCount() {
return getState();
}
//尝试获取同步状态
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
//尝试释放同步状态
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
CountDownLatch内部通过共享锁实现:
- 在创建CountDownLatch实例时,传递一个int参数:count,该参数为计数器的初始值,也可以理解为共享锁可以获取的总次数。
- 当某个线程调用await()方法,首先判断count的值是否为0,如果不为0则会一直等待直到为0为止。
- 当其他线程调用countdown()方法时,则执行释放共享锁状态,count-1。
- 注意CountDownLatch不能回滚重置。
CyclicBarrier
CyclicBarrier是一个同步辅助类,它允许一组线程相互等待,直到达到某个公共屏障点。在涉及一组大小固定的线程的程序里,这些线程必须不时的相互等待。因为CyclicBarrier在释放等待线程后可以重用,因此成为循环屏障。
实现原理
看下CyclicBarrier的定义
private final ReentrantLock lock = new ReentrantLock();
private final Condition trip = lock.newCondition();
//parties变量表示拦截线程的总数量
private final int parties;
//barrierCommand变量为CyclicBarrier接收的Runnable命令,用于在线程到达屏障时,优先执行barrierCommand,用于处理更加复杂的业务场景。
private final Runnable barrierCommand;
//generation变量表示CyclicBarrier的更新换代
private Generation generation = new Generation();
可以看出CyclicBarrier内部使用可重入锁ReentrantLock和Condition。它有两个构造函数
/**
创建一个新的CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动barrier时执行给定的屏障操作,该操作由最后一个进入barrier的线程执行。
*/
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
/**
创建一个新的CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,但它不会在启动barrier时执行预定义的操作。
*/
public CyclicBarrier(int parties) {
this(parties, null);
}
CyclicBarrier是怎么让线程达到屏障后处于等待状态的呢?使用await()方法,每个线程调用await()方法后告诉CyclicBarrier自己到达了屏障,然后当前线程被阻塞。当所有线程都到达了屏障,阻塞结束,所有线程都可继续执行后续逻辑。
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
//获取锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
//分代
final Generation g = generation;
//当前generation已损坏,抛出BrokenBarrierException异常
if (g.broken)
throw new BrokenBarrierException();
//如果线程中断,终止CyclicBarrier
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
//进来一个线程,count-1
int index = --count;
//如果count==0表示所有线程均已到达屏障,可以触发barrierCommand任务
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
//唤醒所有等待线程,并更新generation
nextGeneration();
return 0;
} finally {
//如果barrierCommand执行失败,终止CyclicBarrier
if (!ranAction)
breakBarrier();
}
}
for (;;) {
try {
//如果不是超时等待,则调用Condition.await()方法等待
if (!timed)
trip.await();
else if (nanos > 0L)
//如果是超时等待,则调用Condition.awaitNanos()等待
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
//generation已经更新,返回Index
if (g != generation)
return index;
//超时等待并且时间已经到了,终止CyclicBarrier,并抛出超时异常
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
//释放锁
lock.unlock();
}
应用
CyclicBarrier适用于多线程合并的场景,用于多线程计算数据,最后合并计算结果的应用场景
public class CyclicBarrierTest {
private static CyclicBarrier cyclicBarrier;
private static final Integer THREAD_COUNT = 10;
static class CyclicBarrierThread implements Runnable {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"到教室了");
try {
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void main(String [] args) {
cyclicBarrier = new CyclicBarrier(THREAD_COUNT, new Runnable() {
@Override
public void run() {
System.out.println("同学们都到齐了,开始上课吧...");
}
});
for (int i=0; i< THREAD_COUNT; i++) {
Thread thread = new Thread(new CyclicBarrierThread());
thread.start();
}
}
}
Semophore
Semophore是一个控制访问多个共享资源的计数器,和CountdownLatch一样,本质上是一个共享锁,维护了一个许可集。以停车场为例:假设停车场有5个停车位,一开始车位都空着,先后来了3辆车,车位够,安排进去停车。然后又来了3辆车,这时由于只有两个车位,所以只能停两辆,有1辆需要在外面等候,直到停车场有空位。从程序来看,停车场就是信号量Semophore,许可集为5,车辆为线程,每来一辆车,许可数-1,但必须>=0,否则线程就要阻塞(车辆等待)。如果有一辆车开出,则许可数+1,然后唤醒一个线程(可以放进一辆车)。
public class SemaphoreTest {
static class Parking {
private Semaphore semaphore;
Parking(int count) {
semaphore = new Semaphore(count);
}
public void park() {
try {
//获取信号量
semaphore.acquire();
long time = (long) (Math.random()*10+1);
System.out.println(Thread.currentThread().getName()+"进入停车场停车,停车时间:"+time+"秒");
//模拟停车时间
Thread.sleep(time);
System.out.println(Thread.currentThread().getName()+"开出停车场...");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//释放信号量(跟lock的用法差不多)
semaphore.release();
}
}
}
static class Car implements Runnable{
private Parking parking;
Car(Parking parking) {
this.parking = parking;
}
/**
* 每辆车相当于一个线程,线程的任务就是停车
*/
@Override
public void run() {
parking.park();
}
}
public static void main(String [] args) {
//假设有3个停车位
Parking parking = new Parking(3);
//这时候同时来了5辆车,只有3辆车可以进去停车,其余2辆车需要等待有空余车位之后才能进去停车。
for (int i=0; i<5; i++) {
Thread thread = new Thread(new Car(parking));
thread.start();
}
}
}
Exchanger
Exchanger是一个同步器,这个类的主要作用是交换数据。Exchanger类似CyclicBarrier,CyclicBarrier是一个单向栅栏,而Exchanger是一个双向栅栏,线程1到达栅栏后,首先观察有没有其他线程已经到达栅栏,如果没有就会等待。如果已经有其他线程(比如线程2)到达了,就会以成对方式交换信息,因此Exchanger适合两个线程之间的数据交换。
并发容器
CopyOnWriteArrayList
COW的设计思想
如果简单使用读写锁ReentrantReadWriteLock,当写锁被获取后,读写线程被阻塞,只有当写锁被释放后读线程才有机会获取到锁从而读取到最新的数据。但是读线程想任何时候都可以获取到最新的数据。COW就是通过Copy-On-Write,即写时复制的思想来通过延时更新的策略实现数据的最终一致性,并且保证读线程之间不阻塞。
COW就是当我们往一个容器添加元素时,不直接往当前容器添加,而是先将当前容器copy,复制出一个新容器,然后往新容器添加元素,添加完元素后,再将原容器的引用指向新的容器。所以CopyOnWrite是一种读写分离的思想,延时更新的策略是通过在写的时候针对的是不同的数据容器来实现的,放弃数据实时性,实现最终一致性。
源码解读
CopyOnWriteArrayList内部维护的就是一个数组
/** The array, accessed only via getArray/setArray. */
private transient volatile Object[] array;
且该数组用volatile修饰,保证可见性,看下add()方法
public boolean add(E e) {
final ReentrantLock lock = this.lock;
//1. 使用Lock,保证写线程在同一时刻只有一个
lock.lock();
try {
//2. 获取旧数组引用
Object[] elements = getArray();
int len = elements.length;
//3. 创建新的数组,并将旧数组的数据复制到新数组中
Object[] newElements = Arrays.copyOf(elements, len + 1);
//4. 往新数组中添加新的数据
newElements[len] = e;
//5. 将旧数组引用指向新的数组
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
add方法需要注意以下几点:
- 采用ReentrantLock,保证同一时刻只有一个写线程进行数组的复制,否则的话内存中会有多个被复制的数据。
- 数组引用是volatile修饰的,因此将旧数组引用指向新数组,根据volatile的happens-before原则,写线程对数组引用的修改对读线程是不可见的。
- 由于写数据的时候是在新的数组中插入数据,从而保证读写是在两个不同的数据容器中进行。
ConcurrentSkipListMap
ConcurrentSkipListMap内部使用跳表的数据结构实现,跳表就是一个多层链表,底层是一个普通链表,然后逐层减少,通常通过一个简单的算法实现每一层的元素是下一层元素的二分之一,这样当搜索元素时从最顶层开始搜索,可以说是另一种形式的二分查找。理论上它的查找、插入和删除的时间复杂度都是O(logN)。
插入值为15的元素
插入后
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)