CRO-成都学习天地

运营支撑团队

Fork-join

概述

fork/join框架是ExecutorService接口的一种具体实现,目的是为了帮助更好地利用多处理器带来的好处。它是为那些能够被递归地拆解成子任务的工作类型量身设计的。其目的在于能够使用所有可用的运算能力来提升你的应用的性能。 Fork/Join 模式有自己的适用范围。如果一个应用能被分解成多个子任务,并且组合多个子任务的结果就能够获得最终的答案,那么这个应用就适合用 Fork/Join 模式来解决。 Fork就是把一个大任务切分为若干子任务并行的执行,Join就是合并这些子任务的执行结果,最后得到这个大任务的结果。比如计算1+2+……+10000,可以分割成10个子任务,每个子任务分别对1000个数进行求和,最终汇总这10个子任务的结果。Fork/Join的运行流程图如下:

fork/join框架的核心是ForkJoinPool类,它是对AbstractExecutorService类的扩展。ForkJoinPool实现了工作偷取算法,并可以执行ForkJoinTask任务。

ExecutorService扩展了Executor并添加了一些生命周期管理的方法。一个Executor的生命周期有三种状态,运行 ,关闭 ,终止。Executor创建时处于运行状态。当调用ExecutorService.shutdown()后,处于关闭状态,isShutdown()方法返回true。这时,不应该再想Executor中添加任务,所有已添加的任务执行完毕后,Executor处于终止状态,isTerminated()返回true 并发问题可以很容易地通过Callable线程的Executor接口来解决,通过为每个任务实例化一 个Callable实例,并在ExecutorService类中汇总计算结果来得出最终结果可以实现这一目的。 使 用ExecutorService和Callable的主要问题是,Callable实例在本质上是阻塞的。一旦一个Callable实例开始执行,其他所有Callable都会被阻塞。由于队列后面的Callable实例在前一实例未执行完成的时候不会被执行,因此许多资源无法得到利用。

Fork/Join框架被引入解决并行问题,而Executor解决的是并发问题。 ForkJoinPool和ThreadPoolExecutor共同继承与AbstractExecutorService;ForkjoinPool使用到了RecursiveAction和RecursiveTask。他们两个中RecursiveAction应用于执行的任务不需要返回结果的场景,而RecursiveTask应用于需要返回执行结果的场景。这点类似于ThreadPoolExecutor使用Runnable和Callable的参数来分别表示不需要返回值和需要返回值的线程执行对象。 ForkJoinPool由ForkJoinTask数组 和ForkJoinWorkerThread数组组成,ForkJoinTask数组负责存放程序提交给ForkJoinPool的任务,而ForkJoinWorkerThread数组负责执行这些任务。 其中ForkJoinPool有三个用来提交或开始任务的方法: submit:异步的执行Task,并且在任务结束后,可以使用getRawResult()方法获取返回值;在获取返回值之前,可能需要使用如 isQuiescent () 方法判断当前任务的执行结果。 execute:同submit类似,但是不带返回值 invoke:同submit类似,但是是同步的。 其余重要方法: shutdown():执行此方法之后,ForkJoinPool 不再接受新的任务,但是已经提交的任务可以继续执行。如果希望立刻停止所有的任务,可以尝试 shutdownNow() 方法 awaitTermination():阻塞当前线程直到 ForkJoinPool 中所有的任务都执行结束

而ForkJoinTask也有两个用来执行的方法: fork:类似于上面的submi invoke:类似于上面的invoke方法

ForkJoinTask的fork方法实现原理。当我们调用ForkJoinTask的fork方法时,程序会调用ForkJoinWorkerThread的pushTask方法异步的执行这个任务,然后立即返回结果。代码如下:

1
2
3
4
5
Publicfinal ForkJoinTask<V> fork() {
        ((ForkJoinWorkerThread) Thread.currentThread())
            .pushTask(this);
       Return this;
}

pushTask方法把当前任务存放在ForkJoinTask数组queue里。然后再调用ForkJoinPool的signalWork()方法唤醒或创建一个工作线程来执行任务。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
final void pushTask(ForkJoinTask<?> t) {
        ForkJoinTask<?>[] q;ints, m;
       if((q =queue) !=null) {   // ignore if queue removed
           longu = (((s =queueTop) & (m = q.length- 1)) <<ASHIFT) +ABASE;
           UNSAFE.putOrderedObject(q, u, t);
           queueTop= s + 1;        // or use putOrderedInt
           if((s -=queueBase) <= 2)
               pool.signalWork();
           elseif(s == m)
                growQueue();
        }
    }

ForkJoinTask的join方法实现原理。Join方法的主要作用是阻塞当前线程并等待获取结果。让我们一起看看ForkJoinTask的join方法的实现,代码如下:

1
2
3
4
5
6
Public final V join() {
       if(doJoin() !=NORMAL)
           return reportResult();
       else
           return getRawResult();
}

首先,它调用了doJoin()方法,通过doJoin()方法得到当前任务的状态来判断返回什么结果,任务状态有四种:已完成(NORMAL),被取消(CANCELLED),信号(SIGNAL)和出现异常(EXCEPTIONAL)。 q 如果任务状态是已完成,则直接返回任务结果。 q 如果任务状态是被取消,则直接抛出CancellationException。 q 如果任务状态是抛出异常,则直接抛出对应的异常。

在doJoin()方法里,首先通过查看任务的状态,看任务是否已经执行完了,如果执行完了,则直接返回任务状态,如果没有执行完,则从任务数组里取出任务并执行。如果任务顺利执行完成了,则设置任务状态为NORMAL,如果出现异常,则纪录异常,并将任务状态设置为EXCEPTIONAL。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Privateint doJoin() {
        Thread t; ForkJoinWorkerThread w;int s;Boolean completed;
       if((t = Thread.currentThread())instanceofForkJoinWorkerThread) {
           if((s =status) < 0)
               return s;
            if((w = (ForkJoinWorkerThread)t).unpushTask(this)) {
               try{
                    completed = exec();
                }catch(Throwable rex) {
                   returnsetExceptionalCompletion(rex);
                }
               if(completed)
                   returnsetCompletion(NORMAL);
            }
           Return w.joinTask(this);
        }
       else
           return externalAwaitDone();
    }

关于工作窃取

fork/join框架会将任务分发给线程池中的工作线程。fork/join框架的独特之处在与它使用工作窃取(work-stealing)算法。完成自己的工作而处于空闲的工作线程能够从其他仍然处于忙碌(busy)状态的工作线程处窃取等待执行的任务。 每个工作线程都有自己的工作队列,这是使用双端队列(或者叫做 deque)来实现的。当一个任务划分一个新线程时,它将自己推到 deque 的头部。 当线程的任务队列为空,它将尝试从另一个线程的 deque 的尾部 窃取另一个任务。 与标准队列相比,deque 具有两方面的优势:减少争用和窃取。 因为只有工作线程会访问自身的 deque 的头部,deque 头部永远不会发生争用; 因为只有当一个线程空闲时才会访问 deque 的尾部,所以也很少存在线程的 deque 尾部的争用。 跟传统的基于线程池的方法相比,减少争用会大大降低同步成本。此外,这种方法暗含的后进先出(last-in-first-out,LIFO)任务排队机制意味着最大的任务排在队列的尾部,当另一个线程需要窃取任务时,它将得到一个能够分解成多个小任务的任务,从而避免了在未来窃取任务。因此,工作窃取实现了合理的负载平衡,无需进行协调并且将同步成本降到了最小。

实例一:斐波那契数列 公式定义: f(n) = f(n-1) + f(n-2); n-2 >0; f(1)=1; f(2)=2; 使用Fork/Join框架实现多任务的并发执行计算,以n=25为例,数据结构分析:

详见代码FibonnaciTask.7z; 讲解点: a.fork和join的先进后出关系;b.递归调用和join的先后对性能的影响

实例二:合并排序实例(对比单线程、ExecutorService实现、Fork/Join实现的性能) 实例流程说明:

Partition方法算法(子任务拆分算法)说明:

步骤一:生成随机数填充的数组(取正负100以内的值) 23 -24 65 53 90 87 …… 66 步骤二:以数据最后一个值,做为分拣标准,小于的向左,大于的向右: 23 -24 65 53 66 87 …… 90 步骤三:以标准值做为分界点,任务一拆为二 Task1: 23 -24 65 53 66 Task2: 87 …… 90 步骤四:重复步骤一,直到子任务小于阀值

详见代码TestForkJoinSimple.7z; 讲解点:a. ExecutorService的代码实现;b.好的性能对比例子; c.不好的性能对比例子

JAVA AQS 解密

JAVA AQS 实现细节

AQS(AbstractQueuedSynchronizer)是java lock的实现基础,如下类图描述,juc的很多实现都依赖AQS。AQS确实比较复杂, 所有不同锁的每个细节难以描述清楚,所以本文的重点是使用ReentrantLock作为例子只分析NonfairSync和condition的实现原理, 其它锁的实现原理都是以此作为基础进行扩展和优化。

从Lock的一段简单代码开始

就以下面一段我们经常使用的简单代码作为例子,例子中就是使用Lock和Condition实现线程安全的生产者-消费者模型,借助这个简单的例子, 我们一步步分析一下Lock内部是如何实现线程同步的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
 
  import java.util.concurrent.locks.Condition;
  import java.util.concurrent.locks.Lock;
  import java.util.concurrent.locks.ReentrantLock;
  
public class ProductQueue<T> {

private T[]       items;
private Lock      lock         = new ReentrantLock();
final static int  DEAFULT_SIZE = 10;

//满队列条件
private Condition notFull      = lock.newCondition();

//空队列条件
private Condition notEmpty     = lock.newCondition();

private int       head, tail, count;

@SuppressWarnings("unchecked")
public ProductQueue(int size) {
    items = (T[]) new Object[size];
}

public ProductQueue() {
    this(DEAFULT_SIZE);
}

/**
 * 生产产品
 * 
 * @param t
 * @throws InterruptedException
 */
public void put(T t) throws InterruptedException {
    lock.lock();//获取独占锁
    try {
        Thread.sleep(2 * 1000);
        while (count == getCapacity()) {//队列已满,挂起当前 线程,将当前线程加入条件锁队列中,直到收到队列不为满的信号,从队列中按照FIFO唤醒一个线程
            notFull.await(); //释放独占锁 ,让其他线程(消费者线程)获取,然后挂起当前线程,一旦后续条件满足,再次获取锁
        }
        items[tail] = t;
        if (++tail == getCapacity()) {
            tail = 0;
        }
        ++count;
        notEmpty.signalAll();//队列已不为空,可以唤醒消费者 消费产品
    } finally {
        lock.unlock();
    }

}

/**
 * 消费产品
 * 
 * @return
 * @throws InterruptedException
 */
public T get() throws InterruptedException {
    lock.lock();
    try {
        while (count == 0) {//队列已空,挂起空队列锁条件 线程,直到收到不为空的信号,被唤醒
            notEmpty.await();
        }
        T ret = items[head];
        items[head] = null;
        if (++head == getCapacity()) {
            head = 0;
        }
        --count;
        notFull.signalAll();//队列已不为满,可以唤醒生产者  生产产品
        return ret;

    } finally {
        lock.unlock();
    }
}

/**
 * 产品队列容量
 * 
 * @return
 */
public int getCapacity() {
    return items.length;
}

/**
 * 当前产品数量
 * 
 * @return
 */
public int size() {
    lock.lock();
    try {
        return count;
    } finally {
        lock.unlock();
    }
}

}

AQS基础数据结构

在开始分析之前,先讲一下AQS实现中的基础数据结构(如下图所示) AQS基础数据结构.png

  • state:32位整数描述的状态位来支持原子操作(volatile+CAS)
  • head:有序队列的队头结点,tail:有序队列的队尾结点
  • pre:当前节点的前一个结点引用 next:当前节点的下一个节点引用主要用于构建有序队列的双向链表
  • thread:当前节点绑定的线程
  • waitStatus:当前节点的等待状态
  • nextWaiter:当前节点的后继节点,主要结合Condition使用
  • firstWaiter:Condition队列的第一个Node
  • lastWaiter:Condition队列的最后一个Node,主要用于表示Condition队列的头和尾节点,需要区分AQS队列的head和tail

Lock.lock()

从Lock的lock方法作为入口讲起,参考下面的处理流程图:

整个加锁执行逻辑:

  • 1.如果tryAcquire(arg)成功,返回true,说明当前线程已经拿到锁,执行当前线程操作,整个lock()过程就结束了。如果失败进行操作2。
  • 2.创建一个独占节点(同时会新生成一个傀儡头结点)并且将此节点加入CHL队列末尾。进行操作3。
  • 3.acquireQueued(node,arg),自旋尝试获取锁,如果获取失败,则根据前一个节点来决定是否挂起(park()),不管是否挂起,都会自旋,直到成功获取到锁。如果在自旋的过程中线程被中断过,那么会置线程中断标志, 进行操作4。
  • 4.如果当前线程已经中断过,那么就中断当前线程(清除中断位)。

Lock.unlock()

整个解锁执行逻辑:

  • 1.tryRelease(arg)执行解锁操作,如果锁释放失败或者当前线程没有持有锁,则抛出异常(IllegalMonitorStateException),如果成功,则进行操作2。
  • 2.获取CHL队列头结点,如果头结点为空或者状态为0,说明CHL队列为空,结束锁释放过程,否则进行操作3。
  • 3.找到需要唤醒的继任节点,并进行线程唤醒操作(unpark()),同时会维护CHL队列,去除已中断或者超时的节点,结束锁释放过程。

AQS队列

AQS队列

  • 整个AQS队列为带头结点和尾节点的双向链表,其中的节点主要以waitStatus来表示其状态,并绑定对应的线程。
  • AQS队列的头节点为傀儡节点,唤醒的节点为傀儡节点的继任节点,移除节点从队列头节点开始,添加节点从队列尾节点开始,添加时主要以CAS操作保证线程安全。
  • AQS队列主要按FIFO来保证节点的执行顺序,以此来达到公平性。

Condition.await()

执行逻辑如下:

  • 1.将当前线程加入Condition锁队列。特别说明的是,这里不同于AQS的队列,这里进入的是Condition的FIFO队列。
  • 2.释放锁,这里可以看到将锁释放了,否则别的线程就无法拿到锁而发生死锁。进行3。
  • 3.自旋(while)挂起,直到被唤醒或者超时或者CACELLED。进行4。
  • 4.获取锁(acquireQueued),并将自己从Condition的FIFO队列中释放,表明自己不再需要锁(我已经拿到锁了)。

Condition.signal()

signal就是唤醒Condition队列中的第一个非CANCELLED节点线程, 而signalAll就是唤醒所有非CANCELLED节点线程。当然了遇到CANCELLED线程就需要将其从FIFO队列中剔除。

aqs和condition2个队列

AQS和Condition队列

  • 以生产者-消费者模型来说,在产品的生成和消费过程中,会维护3个队列,一个AQS队列,一个生产者条件队列,一个消费者条件队列。
  • AQS和Condition队列有一定的不同,条件队列为典型的FIFO队列,而AQS带有一定的特性(头结点和继任节点关系)。
  • 当执行await()会向相应的条件队列中加入条件节点,并进行相应的维护,当执行signal()或者signalAll()时,会将条件队列中的节点维护到AQS中,进行队列间的交互。

JUC基础及进阶

这是介绍Java Util Concurrent一系列文章的第一篇,会给大家带来的是一些并发的基础知识,同时我们会介绍一个无锁的高并发框架LMAX Disruptor。

概念及应用场景

并发与并行

  • 并发 (Concurrency) 一个处理器“同时”处理多个任务,时间片划分,一个时刻只处理一个任务
  • 并行 (Parallelism) 多个处理器 “同时”处理多个任务

并发编程的应用场景:

通常在如下两种场景中我们会使用到并发编程。

  • 高并发请求的online应用
  • 大数据量和计算量(CPU型)的offline应用

几个知识点

在并发编程中,我们需要处理两个关键问题:线程之间如何通信及线程之间如何同步。首先我们先了解java并发的几个知识点。

内存模型

按照编译原理的观点, 程序运行时的内存分配有三种策略, 分别是静态的,栈式的,和堆式的,jvm也是这样的。如下图 如何保证多个线程操作主内存的数据完整性是一个难题,Java内存模型也规定了工作内存与主内存之间交互的协议,首先是定义了8种原子操作:

  • lock:将主内存中的变量锁定,为一个线程所独占
  • unclock:将lock加的锁定解除,此时其它的线程可以有机会访问此变量
  • read:将主内存中的变量值读到工作内存当中
  • load:将read读取的值保存到工作内存中的变量副本中。
  • use:将值传递给线程的代码执行引擎
  • assign:将执行引擎处理返回的值重新赋值给变量副本
  • store:将变量副本的值存储到主内存中。
  • write:将store存储的值写入到主内存的共享变量当中。

重排

指令重排,就是指指令执行顺序可能会与代码的顺序不一致,为什么需要指令重排,指令重排意义在于: JVM能够根据处理器的特征(CPU的多级缓存系统) 适当的重新排列机器指令,使得机器指令更符合CPU的执行特点,最大限度的发挥机器的性能。

  1. 编译器优化的重排序。编译器在不改变单线程程序语义的前提下,可以重新安排语句的执行顺序。
  2. 指令级并行的重排序。现代处理器采用了指令级并行技术(Instruction-Level Parallelism, ILP)来将多条指令重叠执行。如果不存在数据依赖性,处理器可以改变语句对应机器指令的执行顺序。
  3. 内存系统的重排序。由于处理器使用缓存和读/写缓冲区,这使得加载和存储操作看上去可能是在乱序执行。

上述的1属于编译器重排序,2和3属于处理器重排序。这些重排序都可能会导致多线程程序出现内存可见性问题。 就算指令重排了,我们也要利用一些原则来保证某些事件之间的关系,这样才让我们实现线程安全。这就是Happen-before原则 happens-before规则如下:

  • 程序顺序规则:一个线程中的每个操作,happens- before 于该线程中的任意后续操作。
  • 监视器锁规则:对一个监视器锁的解锁,happens- before 于随后对这个监视器锁的加锁。
  • volatile变量规则:对一个volatile域的写,happens- before 于任意后续对这个volatile域的读。
  • 传递性:如果A happens- before B,且B happens- before C,那么A happens- before C。  Java通过提供以下工具来实现Happens-before原则  synchronized, volatile, final, java.util.concurrent

内存屏蔽

JMM的编译器重排序规则会禁止特定类型的编译器重排序(不是所有的编译器重排序都要禁止)。对于处理器重排序,JMM的处理器重排序规则会要求java编译器在生成指令序列时,插入特定类型的内存屏障(memory barriers,intel称之为memory fence)指令,通过内存屏障指令来禁止特定类型的处理器重排序(不是所有的处理器重排序都要禁止)。 处理器为什么会重排?Cpu指令执行效率,CPU访问主存的效率比访问cpu缓存的效率低太多。如下java参数可限制CPU指令重排。 Java -XX:+UnlockDiagnosticVMOptions -XX:PrintAssemblyOptions=hsdis-print-bytes -XX:CompileCommand=print,WriterReader.write WriterReader

锁概念

1)悲观锁

只要线程2一获得Entry 的互斥锁,它就会阻击其它线程去改变它,然后它就可以随意做它要做的事情,设置值,然后做其它事情。

2)乐观锁 在这种情况,当线程2需要去写Entry时才会去锁定它.它需要检查Entry自从上次读过后是否已经被改过了。如果线程1在线程2读完后到达并把值改为”blah”,线程2读到了这个新值,线程2不会把"fluffy"写到Entry里并把线程1所写的数据覆盖.线程2会重试(重新读新的值,与旧值比较,如果相等则在变量的值后面附上’y’)

JUC介绍

volatile实现

volatile较轻量的同步,锁提供了两种主要特性:互斥(mutual exclusion) 和可见性(visibility),volatile只保证变量可见性,但无法保证原子性

  • [可见性]:对一个volatile变量的读,总是能看到(任意线程)对这个volatile变量最后的写入。

  • [原子性]:对任意单个volatile变量的读/写具有原子性,但类似于volatile++这种复合操作不具有原子性。 volatile的内存语义

  • volatile写:当写一个volatile变量时,JMM会把该线程对应的本地内存中的共享变量刷新到主内存。

  • volatile读:当读一个volatile变量时,JMM会把该线程对应的本地内存置为无效。线程接下来将从主内存中读取共享变量。

问题:volatile变量在各个线程中是一致的,所以基于volatile变量的运算在并发下是安全的。这句话是否正确?为什么?

使用volatile的场景必须符合以下两个原则: 1)运算结果并不依赖变量的当前值,或者能够确保只有单一的线程修改变量的值。 2)变量不需要与其他的状态变量共同参与不变的约束。

CAS(Compare and Swap)

无锁的数据结构的基础:Compare and Swap现在所有的CPU指令都支持CAS的原子操作,X86对应的指令是CMPXCHG汇编指令。CAS是一组原子指令来达到同步的目的,它去拿一个值跟内存中一个值进行比较,只有相等的情况,才会给这个内存的值赋予新的值。整个操作都在一个原子操作内完成的。CAS操作比锁消耗资源少的多,因为它们不牵涉操作系统,它们直接在CPU上操作。

CAS虽然很高效的解决原子操作,但是CAS仍然存在三大问题。ABA问题,循环时间长开销大和只能保证一个共享变量的原子操作

  1. ABA问题。从Java1.5开始JDK的atomic包里提供了一个类AtomicStampedReference来解决ABA问题。这个类的compareAndSet方法作用是首先检查当前引用是否等于预期引用,并且当前标志是否等于预期标志,如果全部相等,则以原子方式将该引用和该标志的值设置为给定的更新值。

  2. 循环时间长开销大。自旋CAS如果长时间不成功,会给CPU带来非常大的执行开销。如果JVM能支持处理器提供的pause指令那么效率会有一定的提升,pause指令有两个作用,第一它可以延迟流水线执行指令(de-pipeline),使CPU不会消耗过多的执行资源,延迟的时间取决于具体实现的版本,在一些处理器上延迟时间是零。第二它可以避免在退出循环的时候因内存顺序冲突(memory order violation)而引起CPU流水线被清空(CPU pipeline flush),从而提高CPU的执行效率。

  3. 只能保证一个共享变量的原子操作。当对一个共享变量执行操作时,我们可以使用循环CAS的方式来保证原子操作,但是对多个共享变量操作时,循环CAS就无法保证操作的原子性,这个时候就可以用锁,或者有一个取巧的办法,就是把多个共享变量合并成一个共享变量来操作。比如有两个共享变量i=2,j=a,合并一下ij=2a,然后用CAS来操作ij。从Java1.5开始JDK提供了AtomicReference类来保证引用对象之间的原子性,你可以把多个变量放在一个对象里来进行CAS操作。

线程池(ThreadPoolExecutor)

ThreadPoolExecutor是JDK中提供的线程池,其声明如下:

1
2
3
4
5
6
7
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)
  • corePoolSize:线程池里应该保留的线程数
  • maximumPoolSize:线程池里允许的最大线程数 如下线程池增加线程的实现逻辑:如下图:

  • worker线程小于corePoolSize, 新增一个worker线程,新task在新线程里运行

  • worker线程大于等于corePoolSize, 新task放入workQueue。同时检测worker线程为0时,增加一个worker线程检测executor状态,STOP时,移除新增任务,调用检测饱和策略处理。

  • 上面都失败的情况下,worker线程小于maximumPoolSize时,直接新增一个worker线程运行task,失败后调用饱和策略处理。 饱和策略是上面ThreadPoolExecutor声明中的RejectedExecutionHandler对象,如下是JDK提供的饱和策略。

  • AbortPolicy(默认):中止,executor抛出未检查RejectedExecutionException,调用者捕获这个异常,然后自己编写能满足自己需求的处理代码。

  • DiscardOldestPolicy:遗弃最旧的,选择丢弃的任务,是本应接下来就执行的任务。
  • DiscardPolicy:遗弃会默认放弃最新提交的任务(这个任务不能进入队列等待执行时)
  • CallerRunsPolicy:调用者运行,既不会丢弃哪个任务,也不会抛出任何异常,把一些任务推回到调用者那里,以此减缓新任务流。它不会在池线程中执行最新提交的任务,但它会在一个调用了execute的线程中执行。

CyclicBarrier

CyclicBarrier是juc中应用比较多的一个工具类。所谓工具类,是对外屏蔽了一些内部实现细节,可以被用户直接使用达到线程同步的目的。我们可以通过分析源码看看cyclicbarrier是怎么工作来让多线程协调工作。

1) 类的成员变量以及各自作用

2) 核心函数await如何工作。第一段:

lock.lock()到底干了什么事情?

lock是一个reetrantlock,是独占锁。独占锁的特征是,lock被thread0锁定之后,其他线程,包括同为“beginning”的线程,都无法获取到锁。Reetrantlock实际上是委托类中的非公平锁。非公平锁首先通过CAS设置AQS独占线程而持有锁: % img center /images/juc-images/java-cyclicBarrier-4.png %}

acquire:

如果tryAcquire(arg)成功,那就没有问题,已经拿到锁,整个lock()过程就结束了。前面已经强调Lock是独占的,那么如果此时thread1-beginning这个线程进来之后,发现无法tryacquire,那如何处理?

如果无法获取到锁,则创建一个独占节点Node,加入CHL队列。CHL是一个非常重要的概念。

那么这个CHL队列是如何起作用?看unlock的过程就明白了。核心函数await核心函数await如何工作,第二段

看看nextGeneration在做什么:

需要是仍然只有一个线程能够拿到锁,其它没有拿到锁的线程仍然需要自旋等待,就上上面提到的第4步(acquireQueued) 1) 核心函数await如何工作,第三段

这个trip.await()到底是要干神马事情?我们注意第一段,thread1-beginning还在那里自旋等待锁的获取,那么这里await的实现逻辑,就是让当前获取锁的线程park住并把锁释放出来,让在那里自旋的兄弟们解解渴。 那么什么时候会受到signal的信号呢?

2) 核心函数await如何工作,第三段

Signal只是唤醒当前线程往下走,那么其他在那里自旋的兄弟们怎么办呢?这里我们又要看到AQS的那个队列,此时,unlock将从唤醒下一个节点从Condition的await中出来,然后往下走。

总结一下,3点控制线程同步: 1) 原子性操作同步器的状态位

2) 阻塞和唤醒线程一个有序的队列

3) 一个有序的队列

LMX Disruptor

锁技术是慢的

关于锁就是它们需要操作系统去做裁定。线程就像两姐妹在为一个玩具在争吵,然后操作系统就是能决定他们谁能拿到玩具的父母,就像当你跑向你父亲告诉他你的姐姐在你玩着的时候抢走了你的变形金刚-他还有比你们争吵更大的事情去担心,他或许在解决你们争吵之前要启动洗碗机并把它摆在洗衣房里。Disruptor论文中讲述了我们所做的一个实验。这个测试程序调用了一个函数,该函数会对一个64位的计数器循环自增5亿次。当单线程无锁时,程序耗时300ms。如果增加一个锁(仍是单线程、没有竞争、仅仅增加锁),程序需要耗时10000ms,慢了两个数量级。更令人吃惊的是,如果增加一个线程(简单从逻辑上想,应该比单线程加锁快一倍),耗时224000ms。使用两个线程对计数器自增5亿次比使用无锁单线程慢1000倍。并发很难而锁的性能糟糕。

LMX Disruptor介绍

LMX Diruptor是一个性能极其良好的一个高并发的框架。他如何提高高并发情况下的性能呢?

无锁

首先,Disruptor根本就不用锁。取而代之的是,在需要确保操作是线程安全的(特别是,在多生产者的环境下,更新下一个可用的序列号)地方,我们使用CAS(Compare And Swap/Set)操作

去掉伪共享

1) CPU及缓存介绍

CPU是你机器的心脏,最终由它来执行所有运算和程序。主内存(RAM)是你的数据(包括代码行)存放的地方。CPU和主内存之间有好几层缓存,因为即使直接访问主内存也是非常慢的。如果你正在多次对一块数据做相同的运算,那么在执行运算的时候把它加载到离CPU很近的地方就有意义了(比如一个循环计数-你不想每次循环都跑到主内存去取这个数据来增长它吧)。 越靠近CPU的缓存越快也越小。所以L1缓存很小但很快(译注:L1表示一级缓存),并且紧靠着在使用它的CPU内核。L2大一些,也慢一些,并且仍然只能被一个单独的 CPU 核使用。L3在现代多核机器中更普遍,仍然更大,更慢,并且被单个插槽上的所有 CPU 核共享。最后,你拥有一块主存,由全部插槽上的所有 CPU 核共享。 当CPU执行运算的时候,它先去L1查找所需的数据,再去L2,然后是L3,最后如果这些缓存中都没有,所需的数据就要去主内存拿。走得越远,运算耗费的时间就越长。

2) 缓存行

现在需要注意一件有趣的事情,数据在缓存中不是以独立的项来存储的,如不是一个单独的变量,也不是一个单独的指针。缓存是由缓存行组成的,通常是64字节(译注:这篇文章发表时常用处理器的缓存行是64字节的,比较旧的处理器缓存行是32字节),并且它有效地引用主内存中的一块地址。一个Java的long类型是8字节,因此在一个缓存行中可以存8个long类型的变量。 (为了简化,我将忽略多级缓存)

非常奇妙的是如果你访问一个long数组,当数组中的一个值被加载到缓存中,它会额外加载另外7个。因此你能非常快地遍历这个数组。事实上,你可以非常快速的遍历在连续的内存块中分配的任意数据结构。我在第一篇关于ring buffer的文章中顺便提到过这个,它解释了我们的ring buffer使用数组的原因。 因此如果你数据结构中的项在内存中不是彼此相邻的(链表,我正在关注你呢),你将得不到免费缓存加载所带来的优势。并且在这些数据结构中的每一个项都可能会出现缓存未命中。 不过,所有这种免费加载有一个弊端。设想你的long类型的数据不是数组的一部分。设想它只是一个单独的变量。让我们称它为head,这么称呼它其实没有什么原因。然后再设想在你的类中有另一个变量紧挨着它。让我们直接称它为tail。现在,当你加载head到缓存的时候,你也免费加载了tail。

听想来不错。直到你意识到tail正在被你的生产者写入,而head正在被你的消费者写入。这两个变量实际上并不是密切相关的,而事实上却要被两个不同内核中运行的线程所使用。

设想你的消费者更新了head的值。缓存中的值和内存中的值都被更新了,而其他所有存储head的缓存行都会都会失效,因为其它缓存中head不是最新值了。请记住我们必须以整个缓存行作为单位来处理(译注:这是CPU的实现所规定的,详细可参见深入分析Volatile的实现原理),不能只把head标记为无效。

现在如果一些正在其他内核中运行的进程只是想读tail的值,整个缓存行需要从主内存重新读取。那么一个和你的消费者无关的线程读一个和head无关的值,它被缓存未命中给拖慢了。 当然如果两个独立的线程同时写两个不同的值会更糟。因为每次线程对缓存行进行写操作时,每个内核都要把另一个内核上的缓存块无效掉并重新读取里面的数据。你基本上是遇到两个线程之间的写冲突了,尽管它们写入的是不同的变量。 这叫作“伪共享”(译注:可以理解为错误的共享),因为每次你访问head你也会得到tail,而且每次你访问tail,你也会得到head。这一切都在后台发生,并且没有任何编译警告会告诉你,你正在写一个并发访问效率很低的代码。

Summary

这篇blog主要是为大家带来concurrency的含义,以及它的好处,和相应的问题,同时介绍了LMAX Disruptor相关知识,后面我们会对其做进一步介绍。

关于Java异常的那些事儿

一、 Exception

1. Exception定义

在有充足理由将某情况视为该方法的典型功能(typical functioning )部分时,避免使用异常。 因此,意外情况就是指方法的“正常功能”(normal functioning)之外的情况。

2. Exception分类

Checked Exception: Java.lang.Exception的子类,方法签名上需要显示的声明throws,编译器迫使调用者处理这类异常或者声明throws继续往上抛。主要是对付那些可以恢复过来的场景。

Unchecked Exception: RuntimeException的子类,方法签名不需要声明throws,编译器也不会强制调用者处理该类异常。主要用于那些无法恢复的场景。

当要决定是采用checked exception还是Unchecked exception的时候,你要问自己一个问题,”如果这种异常一旦抛出,客户端会做怎样的补救?” 如果客户端可以通过其他的方法恢复异常,那么这种异常就是checked exception; 如果客户端对出现的这种异常无能为力,那么这种异常就是Unchecked exception;从使用上讲,当异常出现的时候要做一些试图恢复它的动作而不要仅仅的打印它的信息。

3. 异常的优缺点

优点:

    1). 分离错误代码和正常代码,代码更简洁。 
    2). 保护数据的正确性和完整性,程序更严谨及更健壮。 
    3). 便于调试和排错,软件更好维护。

缺点: 1). 过度或者错误使用异常会导致消耗过多的资源。 2). 有些被定义为Checked Exception的,在某些情况下并不被视为异常情况。

4. 异常处理原则:

第1条: 只针对不正常的情况才使用异常 1) 异常应该只用于异常的情况下,不该用于正常的控制流 2) 设计良好的API不应该强迫它的客户端为了正常的控制流而使用异常,可以提供状态测试(hashNext())或者可识别的返回值(return null)。

第2条: 对于可恢复的条件使用被检查的异常,对于程序错误使用运行时异常 java有三种可抛出的结构(throwable),受检的异常(checked exception)、运行时异常(run-time exception)和错误(error)。

第3条: 避免不必要的使用被检查的异常 如果正确地使用API并不能阻止这种异常条件的产生,并且一旦产生异常,使用API的程序员可以立即采取有用的动作,这种负担就被认为是正当的。除非这两个条件都成立,否则更适合于使用未受检的异常 第4条: 尽量使用标准的异常 第5条: 抛出的异常要适合于相应的抽象 1) 更高层的实现应该捕获低层的异常,同时抛出可以按照高层抽象进行解释的异常,即异常转译(Exception translation) 2) 异常链,低层的异常对于调试导致高层异常的问题有帮助的时候使用 3) 尽管异常转译与不加选择第从底层传递异常的做法相比有所改进,但是它也不能被滥用。如果可能,处理来自低层异常的最好做法是,在调用低层方法之前确保它们会成功执行,从而避免它们抛出异常。如在给低层传递参数之前,检查高层参数有效性,从而避免低层抛出异常 4) 如果无法避免低层异常,次选是,让更高层来悄悄绕开这些异常,从而将高层方法的调用者与低层的问题隔离开来。如使用java.util.logging将异常记录下来。

第6条: 每个方法抛出的 异常都要有文档 第7条: 在细节消息中包含失败 — 捕获消息 第8条: 努力使失败保持原子性,一般而言,失败的方法调用应该使对象保持在被调用之前的状态。具有这种熟悉的方法被成为具有失败原子性(failure atomic)。 1) 最简单的办法莫过于设计一个不可变的对象,如果对象不可变,失败原子性就是显然的了 2) 对于在可变对象上执行操作的方法,获得原子性最常见的办法是,在执行操作之前检查参数的有效性。 3) 做恢复代码,回滚 4) 在对象的临时拷贝上进行操作,完成之后在用临时拷贝中的结构代替对象的内容。 5) 一般而言,作为方法规范的一部分,产生任何异常都应该让对象保持在该方法调用之前的状态。如果违反这条规则,API文档就应该清楚地指明对象将会处于什么样的状态。

第9条: 不要忽略异常 1) 当api的设计者声明一个方法将抛出某个异常的时候,他们等于正在试图说明某些事情,因此不要忽略它。 2) 空的catch使异常达不到应有的目的,至少,catch块也应该保护一条说明,解释为什么可以忽略这个异常

5. Exception实例

5.1 Exception顺序

1
2
3
4
5
6
7
try {
      throw new FileNotFoundException();
  }  catch (FileNotFoundException file) {
      System.out.println("File Not Found Exception");
  } catch (IOException io) {
      System.out.println("IO Exception");
  }

IOException 只会catch住非FileNotFoundException以外的exception, 交换2个位置,是会编译出错的,会提示FileNotFoundException已经被catch住了。

5.2 Exception Wrapper

Exception Wrapper, 聚合这些exception, 在调用栈的顶部,封装下exception, 让上次不要知道底层这么多细节。

如果在catch或者finally的时候,抛出一个exception, 那么这个exception hide住了本来catch住的exception.

不好的实例:

1
2
3
4
5
6
7
8
9
10
11
12
13
InputStream input = null;
  try{
    input = new FileInputStream("myFile.txt");
    //do something with the stream
  } catch(IOException e){
    throw new WrapperException(e);
  } finally {
    try{
     input.close();
    } catch(IOException e){
       throw new WrapperException(e);
    }
  }

正确的写法:

1
2
3
4
5
6
7
8
9
10
11
12
try{
    input = new FileInputStream("myFile.txt");
    //do something with the stream
  } catch(IOException e){ //first catch block
    throw new WrapperException(e);
  } finally {
    try{
     if(input != null) input.close();
    } catch(IOException e){  //second catch block
       throw new WrapperException(e);
    }
  }

我们必须保证的是,最后一个exception必须throw 所有exceptions情况,其实这还不是最复杂的情况。在jdbc transations里面,情况更为复杂。

5.3 UndeclaredThrowableException

JDK的java doc是这么解释UndeclaredThrowableException的: http://download.oracle.com/technetwork/java/javase/6/docs/zh/api/java/lang/reflect/UndeclaredThrowableException.html 英文版的: http://pic.dhe.ibm.com/infocenter/adiehelp/v5r1m1/index.jsp?topic=%2Fcom.sun.api.doc%2Fjava%2Flang%2Freflect%2FUndeclaredThrowableException.html

明白了什么是UndeclaredThrowableException后,那我就查我的代码中时什么原因导致了这个exception: 这是因为通过反射调用抛出的异常被代理类包装为UndeclaredThrowableException 这个异常总共途径了一下几个系统: RPC client(源头)—–>我的应用——>Web容器(容器层) 那么我们来分析下,分别在那一层处理这个异常比较合适。 1、 源头RPC client层:我认为,判断一处代码是应该处理异常还是将异常抛出,取决于其是否获取了异常信息后,能如何应对?如果其能针对捕捉到的异常做出符合业务逻辑需要的处理,那么就不应该将异常继续抛出,反之,则抛出异常。比如上面的TimeOutException,不同应用的业务逻辑有不同的处理办法,而RPC Client端并不知道这个具体的情况,所以RPC Client不需要处理这类异常,将其继续抛出即可。越接近异常的源头,处理异常最方便(此处的方便,指的是,在接近于源头的底层框架处理异常,那么对调用这些框架的应用代码来说,就不需要关注异常处理了,这就方便了 2、 应用层:那么是不是需要在应用层做处理呢?这里需要具体情况具体分析。拿我这个应用来说,这是一个web应用,就算我这个应用对runtimeException不做处理,web容器也会对这个异常进行处理,并不会造成程序的崩溃。但是如果应用层不是web项目,或者应用不是依托在某个容器内,那如果应用层不对这个异常进行处理的话,就没人处理这个异常了,就会直接抛出一个RuntimeException,导致程序挂掉。因此我认为,如果我们在应用中调用RPC等之类的服务,要注意一下几点: 此处的调用抛了UndeclaredThrowableException,需要额外的处理不? 如果需要,就要在此处捕捉到这个异常; 如果不需要,那么就要想想,我们的应用把这个异常抛出去,接下来会不会有地方来处理这个异常,如果有,则可以继续抛出? 如果我们的应用已经是异常处理链中的最后一环,那么还是要把这个异常捕捉住。 如果有Web容器之类的罩着,我们可以把这个异常继续传递下去。 如果在应用中要处理异常,应该考虑的全面细致。是否需要降级?该流程是否是必须流程,如果是必须流程是否还会涉及到业务回滚,如果是非必须流程如何在产生异常时不影响当前业务。是否需要容灾? 3、 Web容器:上面说了,如果是Web项目,容器会处理应用抛出的RuntimeException,就像上面图展示的 Webx3容器(jboss)的默认Error页面。 另外, 对于RPC之类等涉及网络的服务调用,需要养成一个多思考一点的异常处理的编程习惯。

5.4 异常的性能问题

1) 问题存在 是否使用异常来控制业务流程,是个比较争议的话题。支持的人认为,使用业务异常来控制业务流程,代码逻辑清晰,反对的人认为异常消耗太多资源,对系统的性能有影响。那我们做个测试,看看异常是否会影响性能。 首先定义一个普通的业务异常,代码如下:

1
2
3
4
5
6
7
8
9
public class BusinessException extends Exception {
    private static final long serialVersionUID = 8186919518812720632L;
    public BusinessException(String message, Throwable cause) {
        super(message, cause);
    }
    public BusinessException(String message) {
        super(message);
    }
}

然后测试对比,生成业务异常对象和生成普通对象的耗时分别是多少,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
long startTime = System.currentTimeMillis();
for (int i = 0; i < 2000000; i++) {
    new HashMap<String, String>();
}
long endTime = System.currentTimeMillis();
System.out.println("HashMap:" + (endTime - startTime));

startTime = System.currentTimeMillis();
for (int i = 0; i < 2000000; i++) {
    new BusinessException("exception test");
}
endTime = System.currentTimeMillis();
System.out.println("BusinessException:" + (endTime - startTime));

最后的执行结果如下:

1
2
HashMap:24
BusinessException:916

2) 原因分析 可见生成普通HashMap对象和普通业务异常对象的耗时不在一个数量级,那我们看看生成业务异常对象,做了哪些事情,查看Exception的父类Throwable的构造函数,发现构造普通业务异常对象的时候,会调用synchronized声明的方法fillInStackTrace,代码如下:

1
2
3
4
public Throwable(String message) {
        fillInStackTrace();
        detailMessage = message;
    }

性能的主要开销就在这个方法,作用是填充线程运行堆栈信息,其实定义业务异常是为了用于控制业务逻辑,为可检查异常,其堆栈信息没有多大用处,生成业务异常的时候,没有必要填充堆栈信息。 3) 解决方法 对BusinessException做了改进,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
public class SterilizedBusinessException extends Exception {
    private static final long serialVersionUID = -722749368211888466L;
    public SterilizedBusinessException(String message, Throwable cause) {
        super(message, cause);
    }
    public SterilizedBusinessException(String message) {
        super(message);
    }
    @Override
    public Throwable fillInStackTrace() {
        return null;
    }
}

对方法fillInStackTrace进行了重写,此时我们再测试一次,结果如下:

1
2
HashMap:25
BusinessException:23

此时就不会因为业务异常而影响系统性能。

5.5 运行时异常和可检查异常的差别

这个差别主要表现在编程时和运行时,编程时差别大家都比较清楚, checked异常总是需要显式处理, 而unchecked异常可以选择性处理,那么在运行时呢,这两个异常JVM处理上面是否存在很大的差异? 1) Java文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package com.test.jingchu;

public class ExceptionTest2 {
    public static void main(String[] args) {
        try {
            getResult1();
        } catch (MyException e) {
            System.out.println("MyException");
        }
        try {
            getResult2();
        } catch (IllegalArgumentException e) {
            System.out.println("IllegalArgumentException");
        }
    }

    public static String getResult1() throws MyException {
        throw new MyException();
    }

    public static String getResult2() {
        throw new IllegalArgumentException();
    }
}

2) class阉割 将编译好的class 文件使用二进制方式打开,找到如下位置:

我们 将绿色部分修改成0001 , 将红色部分删除:

3) 反编译 将该class 反编译成java : 我们会发现 getResult1() 的方法的throws 信息被我们阉割掉了。 4) 运行 继续运行该class 打印结果: 5) 结论 checked 异常的throws 语句不影响class 文件的加载,且在运行时JVM没有区分这两类异常。也就是说异常机制其实在运行设计上面是没有任何差别的, 但是在使用设计上面却存在很大的差别。

6. Java-7中新增exception特性

6.1 特性一 Java 7 Try-with-resourcce

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private static void printFile() throws IOException {
    InputStream input = null;
    try {
        input = new FileInputStream("file.txt");
        int data = input.read();
        while(data != -1){
            System.out.print((char) data);
            data = input.read();
        }
    } finally {
        }
    }
}
        if(input != null){
            input.close();
        }
    }
}

在JSR 334, Project Coin

1
2
3
4
5
6
7
8
9
10
11
private static void printFileJava7() throws IOException {

    try(FileInputStream input = new FileInputStream("file.txt")) {

        int data = input.read();
        while(data != -1){
            System.out.print((char) data);
            data = input.read();
        }
    }
}

这个FileInputStream会在try block完成时,被自动关掉,因为FileInputStream实现了Java.lang.AutoCloseable接口,所有class实现了这个接口,都可以用在try-with-resources结果里面。

当多个Resouces申明了以后,

try( FileInputStream   input    = new FileInputStream("file.txt");
        BufferedInputStream bufferedInput = new BufferedInputStream(input)
   ) {

input, bufferedInput都会自动关闭,但是他们关闭的顺序正好和他们申明顺序相反。

6.2 特性二 Java7 之前我们catch住多个Exception

1
2
3
4
5
6
7
8
9
try {
    // execute code that may throw 1 of the 3 exceptions below.
} catch(SQLException) {
    logger.log(e);
} catch(IOException e) {
    logger.severe(e);
}catch(Exception e) {
    logger.severe(e);
}

Java 7之后 我们可以用

1
2
3
4
5
6
7
try {
    // execute code that may throw 1 of the 3 exceptions below.
} catch(SQLException | IOException e) {
    logger.log(e);
} catch(Exception e) {
    logger.severe(e);
}

如果用一个比较大的父类的Exception, 这会造成上层用户,很难知道到底发生了什么。 7. 异常实现原理(JVM) JVM对异常的处理主要是基于异常表(Exception Table),每个包含了try的方法在编译后除字节码外,都会产生一个附加的数据结构—异常表,异常表结构如下:

{
    {PC:BEGIN, PC:END, PC:HANDLER, EXCEPTION-TYPE}
}
PC:BEGIN-PC:END 代表异常发生的PC值的范围
EXCEPTION-TYPE   代表异常捕获的类型
PC:HANDLER          代表异常处理的指针

当JVM的方法产生异常后,按照以下步骤执行异常处理: 1. JVM会在当前Frame的Exception Table里面逐条查找PC范围和异常类型匹配的记录。 并将PC跳转到对应的异常处理的位置上。 注意:类型符合不是完全匹配,而是符合向上原则,即父类的catch可以捕获子类的对象,即 e instance of CLASS。 2. 如果当前Frame不存在Exception Table, 或者在Exception Table里找不到匹配的记录,则当前Frame出栈,把当前Frame设置成上一个Frame,获得新的当前pc和当前异常表。 3. 继续执行1,直到JAVA STACK到达栈底还不能匹配,则异常未处理,抛出到JRE。

个人理解:JVM是基于所谓的栈帧的(stack frame)的,一个函数调用链就是一个个栈帧组成,当在一个栈里用athrow抛出异常时,JVM会搜索当前函数的异常处理表(参考下面的Class文件分析),如果有找到对应的异常处理的handler,则由这个handler来处理。如果没有,则清理当前栈,再回到上一层栈帧中处理。如果一层层栈帧回退,最 athrow指令: 在JVM里实现异常的指令是athrow,指令的参考在这里:http://docs.oracle.com/javase/specs/jvms/se7/html/jvms-6.html#jvms-6.5.athrow 终都没有找到Exception Handler,则线程终止。

8. 带finally的异常处理

如果异常处理带有finally块,字节码的处理同上面的异常处理流程一致, JVM并不需要为此机制作出改动。编译器通过内联函数和扩展异常表来实现的这个过程。 可以这么理解finally,有异常发生的时候,可以理解为一个可以捕获方法内任何异常(包含try块中的异常和catch块中的异常)并自动重新抛出的catch段。 在没有异常,或者异常被捕获的过程,以字节码内联的方式将finally段内容自动添加到对应的段后,然后自动执行。 注意:如果finally段内如果有return关键字的话,异常已经被捕获,在再次抛出之前,return导致了Frame的出栈,从而导致后面的(athrow)没有执行,JVM的异常状态神奇的消失了。所以在代码实现上面,一定要避免这种有违异常约定的写法(eclipse也会提醒)。

关于Java集合的那些事儿

一、集合的体系结构

Collection是集合的基类,List和Set均继承自该类。在使用上,List:有序(元素存入集合的顺序和取出的顺序一致),元素都有索引,元素可以重复,而Set中元素要保证不能重复。Map集合存储和Collection有着很大不同: (1)Collection一次存一个元素;Map一次存一对元素。

(2)Collection是单列集合;Map是双列集合。

(3)Map中的存储的一对元素:一个是键,一个是值,键与值之间有对应(映射)关系。

二、Set和Map的关系,以及Map和List的关系

以HashMap为例,k-v键值对以Entry的方式存入了它的数据成员table,这个table我们可以认为它就是一个Set,因为这个table中的元素是彼此不相等; 反过来说,Set中的元素有一个索引与之对应,所以从这个角度看Set也是Map,而且我们可以从HashSet的实现看到,HashSet其实是通过HashMap来实现的; Map和List的关系,与Map和Set的关系类似。

三、ArrayList

(1)使用场景

继承自List,内用Object[]数组存放数据,封装对数组的操作,动态扩容,频繁的增(超出capacity则需要System.arraycopy,指定插入位置必执行System.arraycopy)删(每次删除都要System.arraycopy)影响性能,查询优势。当你需要一个数据结构存储数据,少增删多查询,可以使用ArrayList。

(2)源码解读

ArrayList的add/remove/subList代码解读:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public boolean add(E e) {
/*内部方法,计数并判断是否需要扩容
如果elementData已满,增加其大小至elementData.length + DEFAULT_CAPACITY
     */
        ensureCapacityInternal(size + 1);
elementData[size++] = e;
  //永远返回true,对于错误情况总是抛出异常
        return true;
    }

删除:
public E remove(int index) {
  //如果传入的index大于elementData存储总数,抛IndexOutOfBoundsException
        rangeCheck(index);
  //修改记录数,为做同步检查
        modCount++;
        E oldValue = elementData(index);
  //需要移动的数量。影响的是index之后的所有数据
        int numMoved = size - index - 1;
  //如果移除的是最后一位,就没有必要重新拷贝。
        if (numMoved > 0)
            System.arraycopy(elementData, index+1, elementData, index,numMoved);
  //把多余的最后一位清空,GC会回收
        elementData[--size] = null;
        return oldValue;
}

SubList(代码比较长,摘取部分)
  //constructor
SubList(AbstractList<E> parent,
                int offset, int fromIndex, int toIndex) {
      this.parent = parent; //将当前arraylist以参数形式传给subList
      this.parentOffset = fromIndex;
      this.offset = offset + fromIndex;
      this.size = toIndex - fromIndex; //记录sublist的size,与parent分开处理
      this.modCount = ArrayList.this.modCount; //防控并发修改
}
public E set(int index, E e) {
      rangeCheck(index); //sublist自身的方法,将index与sublist的size做比较
      checkForComodification(); //只检查了ModCount,没有变更。Set不是结构性变更,所以不用更改modCount。
      E oldValue = ArrayList.this.elementData(offset + index);
      ArrayList.this.elementData[offset + index] = e; //直接修改父类的数据
      return oldValue;
}
//对于sublist的add方法,必须指定index
  public void add(int index, E e) {
        rangeCheckForAdd(index);
        checkForComodification();
        parent.add(parentOffset + index, e);//调用parent对象的add方法完成此操作,因此父类数据也会被修改。
        this.modCount = parent.modCount;//add是结构性的更变,需要修改modCount。
        this.size++;
    }

初始capacity为10, 视应用情况使用ensureCapacity为数组扩容,减少System.arraycopy的次数。

四、HashMap

HashMap是线程不安全的 HashTable是线程安全的,但是粒度很大 ConcurrentHashMap, 线程安全,粒度比较小 利用segment缩小锁的范围

(1)、从HashMap的结构可以看出,HashMap的设计初衷是想结合链表和线性表的优势而达到较平衡的查询、添加和删除性能。

(2)、源码分析

Hash方法中,hashcode 的高位参加 hash计算,这样计算出来的hash值可以最大限度的体现对象之间的差异,目的是将对象放在不同的数组位置上。

Index方法中,对hash取模操作,这里的magic是length的长度是2的幂数,length-1后二进制都是01111…1, 从而可以快速取模。

1
2
3
4
5
6
7
8
static int hash(int h) {
    h ^= (h >>> 20) ^ (h >>> 12);
    return h ^ (h >>> 7) ^ (h >>> 4);
}

static int indexFor(int h, int length) {
    return h & (length-1);
}

GET取出的过程 1 判断key是不是为空,为空的话,默认在table[0]这个链表里找; 2 key非空的话,通过对hashcode进行hash,算出最终的hash值; 3 通过indexfor方法找到对应的链表table[i]; 4 在链表里,通过比较对象的hash值和key值。如果完全相同的话,返回对象。如果没有找到,返回null。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public V get(Object key) {
    if (key == null)
        return getForNullKey();
    int hash = hash(key.hashCode());
    for (Entry<K,V> e = table[indexFor(hash, table.length)];
        e != null;
        e = e.next) {
        Object k;
        if (e.hash == hash && ((k = e.key) == key || key.equals(k)))
            return e.value;
    }
    return null;
}

static int indexFor(int h, int length) {
    return h & (length-1);
}

(3)、Fail-first机制

HashMap不是线程安全。HashMap中有个成员变量modCount用来计算保障当前的map数据没有其它线程在同时修改。 如果有多个线程同时修改,会抛出ConcurrentModificationException看如下代码,用hashiterator方法来遍历map的时候, expectedModCount会被赋予最新的modCount值,调用nextEntry时,检查modCount值相等则不会报错。 调用iterator的remove方法,会让modCount加1,同时expectedModCount也被更新到最新。 所以下次在调用nextEntry时,连个值还是相等的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
     HashIterator() {
        expectedModCount = modCount;
        if (size > 0) { // advance to first entry
            Entry[] t = table;
            while (index < t.length && (next = t[index++]) == null)
                ;
        }
    }

      final Entry<K,V> nextEntry() {
        if (modCount != expectedModCount)
            throw new ConcurrentModificationException();
        Entry<K,V> e = next;
        if (e == null)
            throw new NoSuchElementException();

        if ((next = e.next) == null) {
            Entry[] t = table;
            while (index < t.length && (next = t[index++]) == null)
                ;
        }
        current = e;
        return e;
    }

    public void remove() {
        if (current == null)
            throw new IllegalStateException();
        if (modCount != expectedModCount)
            throw new ConcurrentModificationException();
        Object k = current.key;
        current = null;
        HashMap.this.removeEntryForKey(k);
        expectedModCount = modCount;
    }

五、HashSet

HashSet 不会保证iterable的顺序,LinkedHashSet会保证插入的顺序跟你的iterable的顺序一样 两条规则实现hashCode()在classes: 1.如果object1 和 object2 equals(),那么他们2个应该有相同的hashcode 2.如果object1 和 object2 有一样的hashcode,那么他们不一定会equal.

HashSet源码解读

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
public class HashSet<E>
    extends AbstractSet<E>
    implements Set<E>, Cloneable, java.io.Serializable
{
    static final long serialVersionUID = -5024744406713321676L;

    //使用HashMap的key保存HashSet中的所有元素
    private transient HashMap<E,Object> map;

    // Dummy value to associate with an Object in the backing Map
    //定义一个虚拟的Object对象作为HashMap的value
    private static final Object PRESENT = new Object();

    /**
     * Constructs a new, empty set; the backing <tt>HashMap</tt> instance has
     * default initial capacity (16) and load factor (0.75).
     */
     //初始化HashSet,底层会初始化一个HashMap
    public HashSet() {
    map = new HashMap<E,Object>();
    }
//指定初始容量和loadfactor来初始化HashSet,其实就是初始化HashMap
     //loadfactor = 散列表的实际元素数目(n)/ 散列表的容量(m)
    public HashSet(int initialCapacity, float loadFactor) {
    map = new HashMap<E,Object>(initialCapacity, loadFactor);
    }

    /**
     * Constructs a new, empty set; the backing <tt>HashMap</tt> instance has
     * the specified initial capacity and default load factor (0.75).
     *
     * @param      initialCapacity   the initial capacity of the hash table
     * @throws     IllegalArgumentException if the initial capacity is less
     *             than zero
     */
    public HashSet(int initialCapacity) {
    map = new HashMap<E,Object>(initialCapacity);
    }

    /**
     * Constructs a new, empty linked hash set.  (This package private
     * constructor is only used by LinkedHashSet.) The backing
     * HashMap instance is a LinkedHashMap with the specified initial
     * capacity and the specified load factor.
     *
     * @param      initialCapacity   the initial capacity of the hash map
     * @param      loadFactor        the load factor of the hash map
     * @param      dummy             ignored (distinguishes this
     *             constructor from other int, float constructor.)
     * @throws     IllegalArgumentException if the initial capacity is less
     *             than zero, or if the load factor is nonpositive
     */
    HashSet(int initialCapacity, float loadFactor, boolean dummy) {
    map = new LinkedHashMap<E,Object>(initialCapacity, loadFactor);
    }

    /**
     * Returns an iterator over the elements in this set.  The elements
     * are returned in no particular order.
     *
     * @return an Iterator over the elements in this set
     * @see ConcurrentModificationException
     */
     //调用Map的keyset来返回迭代器
    public Iterator<E> iterator() {
    return map.keySet().iterator();
    }
/**
     * Adds the specified element to this set if it is not already present.
     * More formally, adds the specified element <tt>e</tt> to this set if
     * this set contains no element <tt>e2</tt> such that
     * <tt>(e==null&nbsp;?&nbsp;e2==null&nbsp;:&nbsp;e.equals(e2))</tt>.
     * If this set already contains the element, the call leaves the set
     * unchanged and returns <tt>false</tt>.
     *
     * @param e element to be added to this set
     * @return <tt>true</tt> if this set did not already contain the specified
     * element
     */
     //将e作为key插入了Hashmap
    public boolean add(E e) {
    return map.put(e, PRESENT)==null;
    }
/**
     * Returns a shallow copy of this <tt>HashSet</tt> instance: the elements
     * themselves are not cloned.
     *
     * @return a shallow copy of this set
     */
     //拷贝通过两步完成:1、通过调用super.clone完成父类的拷贝;2、通过调用map.clone()完成对成员变量map的拷贝。注意map的拷贝也是调用super.clone完成,所以拷贝都是浅拷贝。另外需要注意拷贝过程会抛出CloneNotSupportedException异常,这是一个检查异常。
    public Object clone() {
    try {
        HashSet<E> newSet = (HashSet<E>) super.clone();
        newSet.map = (HashMap<E, Object>) map.clone();
        return newSet;
    } catch (CloneNotSupportedException e) {
        throw new InternalError();
    }
    }

六、ArrayList vs LinkedList

ArrayList,实现是一个数组,动态扩容,每次当超出capacity, 就把数组的size扩大,然后把以前的内容copy到新的数组里面去。

ArrayList 如何实现的fail-fast策略,是在每个对于ArrayList结构发生变化的方法里面(比如:add, remove),都有个modCount这样的变量来记录,然后在iterator里面,构造iterator时候,把modcount 赋值给 exceptCount, 然后check是否相等这2个变量。

LinkedList, 实现是一个双向链表。

            ArrayList,      LinkedList

Get(index) O(1) O(n) Add O(1) O(1) Remove O(n – index) O(1) Fail-fast的实现,modifyCount != exceptiedCount. 它们2个都是线程不安全的。

七、好的实践

使用LinkedHashMap构建LRU缓存

(1) LRU缓存 我们用缓存来存放以前读取的数据,而不是直接丢掉,这样,再次读取的时候,可以直接在缓存里面取,而不用再重新查找一遍,这样系统的反应能力会有很大提高。但是,当我们读取的个数特别大的时候,我们不可能把所有已经读取的数据都放在缓存里,毕竟内存大小是一定的,我们一般把最近常读取的放在缓存里。 LRU缓存利用了这样的一种思想。LRU是Least Recently Used 的缩写,翻译过来就是“最近最少使用”,也就是说,LRU缓存把最近最少使用的数据移除,让给最新读取的数据。而往往最常读取的,也是读取次数最多的,所以,利用LRU缓存,我们能够提高系统的性能。 (2) 为什么是LinkedHashMap 第一,它的数据结构包含一个双向链表,其源代码片段如下:

1
2
3
4
5
6
7
8
9
10
11
12
public class LinkedHashMap<K,V>  extends HashMap<K,V>  implements Map<K,V> {
    /**
     * The head of the doubly linked list.
     */
    private transient Entry<K,V> header;
其中Entry的定义的代码片段如下:
/**                                                                       
 * LinkedHashMap entry.                                                   
 */
private static class Entry<K,V> extends HashMap.Entry<K,V> {
    // These fields comprise the doubly linked list used for iteration.   
    Entry<K,V> before, after;

可见这个双向链表就具备了实现LRU的基础,每次访问LinkedHashMap里的元素或者向LinkedHashMap里添加元素时,都会更新这个双向链表,让其header永远指向最少使用的元素,其代码实现可以参考LinkedHashMap的源代码。 第二,LinkedHashMap本身有一个方法用于判断是否需要移除最少使用的元素,但是,原始方法默认不需要移除,该方法实现的源代码如下:

1
2
3
protected boolean removeEldestEntry(Map.Entry<K,V> eldest) {
        return false;
}


实际使用中,可以将header指向的最少使用的元素作为该方法的入参,然后重写方法 removeEldestEntry,可以根据自己的条件来决定是否删除该元素。 (3) 实现样例 基于以上的分析,下面给出一个简单的LUR缓存的实现样例,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public class LRULinkedHashMap<K, V> extends LinkedHashMap<K, V> {
  /** serialVersionUID */
  private static final long serialVersionUID = -5933045562735378538L;
  /** 最大数据存储容量 */
  private static final int LRU_MAX_CAPACITY = 1024;
  /** 剩余内存空间的阈值,设置的为50M */
  private static final int SPACE_THRESHOLD = 50 * 1024 * 1024;
/** 存储数据容量 */
  private int capacity;
  public LRULinkedHashMap() {
      super();
  }

  /**
     * 带参数构造方法
     * 
     * @param initialCapacity 容量
     * @param loadFactor 装载因子
     * @param isLRU 是否使用lru算法,true:使用(按方案顺序排序);false:不使用(按存储顺序排序)
     */
  public LRULinkedHashMap(int initialCapacity, float loadFactor, boolean isLRU) {
      super(initialCapacity, loadFactor, true);
      capacity = LRU_MAX_CAPACITY;
  }

  /**
     * 带参数构造方法
     * 
     * @param initialCapacity 容量
     * @param loadFactor 装载因子
     * @param isLRU 是否使用lru算法,true:使用(按方案顺序排序);false:不使用(按存储顺序排序)
     * @param lruCapacity  lru存储数据容量
     */
  public LRULinkedHashMap(int initialCapacity, float loadFactor,
          boolean isLRU, int lruCapacity) {
      super(initialCapacity, loadFactor, true);
      this.capacity = lruCapacity;
  }

  @Override
  protected boolean removeEldestEntry(Entry<K, V> eldest) {
      if (capacity > 0 && size() > capacity) {
          return true;
      } else if (Runtime.getRuntime().freeMemory() < SPACE_THRESHOLD) {
          return true;
      }
      return false;
  }
  }