ConcurrentMap
ConcurrentMap
是 Java 并发包中提供的一个接口,它继承了 java.util.Map 接口,专门用于支持高并发环境下的线程安全操作。ConcurrentMap
提供了一系列线程安全的方法,旨在解决在多线程环境下使用普通 Map
类型(如 HashMap
)时可能出现的竞态条件和数据不一致问题。
ConcurrentMap
具有以下特点:
-
线程安全性:
ConcurrentMap
中的方法都是线程安全的,可以在多线程环境中安全地使用,无需额外的同步措施。 -
高并发性能:
ConcurrentMap
的设计目标是在高并发环境下提供高性能的操作,尤其适用于读多写少的场景。 -
原子性操作:提供了一系列原子性操作,如
putIfAbsent
、remove
等,这些操作在执行时不会被其他线程干扰。
常见实现
ConcurrentMap
接口有多种实现,其中最常见的是 ConcurrentHashMap
。其他实现还包括 ConcurrentSkipListMap
等。
-
ConcurrentHashMap
:- 在 Java 8 之前,
ConcurrentHashMap
使用了锁分段技术(segment-based locking),将哈希表分割成多个段,每个段有自己的锁,从而允许多个写入操作并发进行。 - 在 Java 8 中,
ConcurrentHashMap
采用了基于 CAS(Compare and Swap)操作的新实现,提供了更高的并发性能。
- 在 Java 8 之前,
-
ConcurrentSkipListMap
:- 使用跳表(Skip List)作为底层数据结构,提供了有序的键值对存储,适合需要排序操作的场景。
方法
ConcurrentMap
接口提供了以下常用方法(除了一些 Map 基本方法):
-
V get(Object key)
:获取指定键对应的值,如果映射表中不存在该键,则返回 null。 -
V put(K key, V value)
:将指定键值对插入到映射表中,如果这个键已经存在,则用新值替换旧值,并返回旧值。如果插入一个 null 的键或值,则抛出NullPointerException
。 -
putIfAbsent(K key, V value)
:如果指定的键(key)不存在,则将键值对插入到映射表中;如果键已经存在,则不进行插入操作,并返回原先的值。 -
boolean remove(Object key, Object value)
:如果指定的键值对存在,则从映射表中删除该键值对;否则不进行任何操作。 -
boolean replace(K key, V value)
:替换指定键的值,无论该键原来的值是什么,都会执行替换操作。 -
boolean replace(K key, V oldValue, V newValue)
:以原子方式将键值对由旧值替换为新值,即只有在该键原来的值和提供的旧值(oldValue)相等时,才会执行替换操作。。 -
int size()
:返回映射表中键值对的数量。 -
V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction)
:如果指定键没有关联值,将通过运行映射函数来计算一个新的值,并将该键关联到该计算值。 -
V computeIfPresent(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction)
:如果指定键已经具有相关联的值,则通过运行重新映射函数来计算一个新值,并将该键关联到该值。在计算过程中,更新可能会顺便修改现有映射,从而避免了常见的条件费用。 -
V compute(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction)
:原子性的操作,它能够在并发环境中安全地更新或计算指定键的值。key
:要更新或计算值的键。remappingFunction
:用于更新或计算值的函数。remappingFunction
是一个在并发操作中被调用的函数,它接受两个参数:当前键的值(如果存在)和当前键。通过这个函数,可以基于当前的值来计算新的值。remappingFunction
的返回值将作为新的值存储在ConcurrentHashMap
中,若返回 null,则会删除当前键。 -
void forEach(BiConsumer<? super K, ? super V> action)
:对映射表中的每个键值对执行给定的动作。 -
V merge(K key, V value, BiFunction<? super V, ? super V, ? extends V> remappingFunction)
:如果指定的键已存在,将其关联的值与给定的合并函数合并,如果不存在,则将其关联到给定值。 -
default V getOrDefault(Object key, V defaultValue)
:获取指定键的值,如果键不存在,则返回默认值。可以用于避免空指针异常。
ConcurrentHashMap
ConcurrentHashMap
是 JUC 中提供的一个线程安全的哈希表实现,借助分段锁机制实现,它扩展了 AbstractMap
类,实现了 ConcurrentMap
接口。
HashTable 的性能问题
Hashtable
效率低下原因是其实现使用了 synchronized
关键字对put
等操作进行加锁,而 synchronized
关键字加锁是对整个对象进行加锁,也就是说在进行 put
等修改 HashTable
的操作时,锁住了整个 HashTable
,从而使得其表现的效率低下。
分段锁策略
JDK 1.7 之前,ConcurrentHashMap
使用 分段锁(Segment-level locking)策略,整个 ConcurrentHashMap
由一个个独立的 Segment
组成。每个分段都有自己的锁。不同的线程可以同时访问不同的分段。
数据结构
Segment
通过继承 ReentrantLock 来进行加锁。Segment
内部是由 数组+链表
组成,一个 Segment 里包含一个 HashEntry
数组,每个 HashEntry 是一个链表结构的元素。
每个段管理一部分键值对,当一个线程需要对某个键值对进行读写操作时,首先会根据键的哈希值来确定对应的段,每次需要加锁的操作锁住的是一个 segment,这样只要保证每个 Segment 是线程安全的,也就实现了全局的线程安全
在 ConcurrentHashMap
中,每个分段维护了自己的锁,并且只有在修改该分段中的数据时才需要获取对应的分段锁。
不同线程可以同时对不同的分段进行并发修改,从而减少了锁的竞争,提高了并发度。
在 ConcurrentHashMap
集合共 有 2 的 N 次方个 segment,共同保存在一个名为 segments 的数组当中
可以理解为 ConcurrentHashMap
是一个二级哈希表,包含着若干个子哈希表。
读取和写入操作
与传统的 HashTable
或 SynchronizedMap
相比,ConcurrentHashMap
的分段锁策略可以使得在读取操作和写入操作之间实现更好的并发性能。
在读取操作上,不需要获得锁,所以可以允许多个线程同时读取:
- 为输入的 Key 做 Hash 运算,得到 hash 值。
- 通过 hash 值,定位到对应的 Segment 对象
- 再次通过 hash 值,定位到 Segment 当中数组的具体位置。
在写入操作上,只需要获得特定分段的锁,而不是整个数据结构的锁,因此多个线程可以同时进行写入操作,只有修改同一个分段时才需要竞争锁:
- 为输入的 Key 做 Hash 运算,得到 hash 值。
- 通过 hash 值,定位到对应的 Segment 对象
- 获取 ReentrantLock
- 再次通过 hash 值,定位到 Segment 当中数组的具体位置。
- 插入或覆盖 HashEntry 对象。
- 释放锁。
由于当多个线程同时修改同一个分段中的元素时,仍然需要竞争分段锁,分段锁策略并不完全消除了锁的竞争。因此,在高并发环境下,还是需要谨慎设计数据访问逻辑,避免过多的竞争情况,以免影响性能。
数组 + 链表 + 红黑树 + CAS + Synchronized
JDK 1.8 ConcurrentHashMap
取消了 Segment 分段锁,使用数组 + 链表 + 红黑树数据结构和 CAS 原子操作、Synchronized 实现,整个容器只分为一个 Segment,即 table 数组。
-
ConcurrentHashMap
是一个哈希桶数组,如果不出现哈希冲突的时候,每个元素均匀的分布在哈希桶数组中。当出现哈希冲突的时候,采用拉链法的解决方案,将 hash 值相同的节点转换成链表的形式,同 Map 接口的 HashMap 一样,为了防止拉链过长,当链表的长度大于 8(默认值) 的时候会将链表转换成红黑树,这样可以提升大量冲突时候的查询效率;
-
以某个位置的头结点(链表的头结点或红黑树的 root 结点)为锁,配合自旋+ CAS 机制 避免不必要的锁开销,进一步提升并发性能。
特点
ConcurrentHashMap
具有以下特点:
-
线程安全:
ConcurrentHashMap
是线程安全的,多个线程可以同时读取并修改其中的数据,而不需要显式地进行同步操作。它使用了一种分段锁(Segment-Locking)的机制,可同时支持多个线程的读操作,提高了并发性能。 -
高性能:
ConcurrentHashMap
在多线程环境下具有很好的性能表现。通过将数据分割成多个段(Segment),每个段使用独立的锁来控制并发访问,使多个线程可以同时访问不同的数据段,从而提高了并发度。 -
可调整的并发级别:
ConcurrentHashMap
允许在创建实例时指定并发级别(concurrency level)。这个并发级别决定了内部分段锁的数量,从而可以根据应用程序的并发性选择最佳的配置。 -
支持高效的并发操作:
ConcurrentHashMap
提供了一系列高效的并发操作方法,如putIfAbsent(key, value)
、remove(key, value)
、replace(key, oldValue, newValue)
等。这些方法能够在多线程环境下安全地执行对映射表的修改操作。 -
不允许空键或值:
ConcurrentHashMap
不允许空键(null key)或空值(null value)的存在。如果尝试插入空键或空值,将会抛出NullPointerException
。 -
不保证遍历顺序:
ConcurrentHashMap
不保证迭代器的遍历顺序,它的遍历顺序可能受内部数据分布和并发修改操作的影响。
字段
-
volatile Node<K,V>[] table
:装载 Node 的数组,作为 ConcurrentHashMap 的底层容器,采用懒加载的方式,直到第一次插入数据的时候才会进行初始化操作,数组的大小总是为 2 的幂次方。 -
volatile Node<K,V>[] nextTable
:扩容时使用,平时为 null,只有在扩容的时候才为非 null -
volatile int sizeCtl
:该属性用来控制 table 数组的大小,即 ConcurrentHashMap 的大小,根据是否初始化和是否正在扩容有几种情况:
-
当值为负数时:如果为 -1 表示正在初始化,如果为 -N 则表示当前正有 N-1 个线程进行扩容操作;
-
当值为正数时:如果当前数组为 null 的话表示 table 在初始化过程中,
sizeCtl
表示为需要新建数组的长度若已经初始化了,表示当前数据容器(table 数组)可用容量,也可以理解成临界值(插入节点数超过了该临界值就需要扩容),具体值为 t a b l e S i z e ∗ l o a d F a c t o r tableSize * loadFactor tableSize∗loadFactor
-
当值为 0 时,即数组长度为默认初始值。
sun.misc.Unsafe U
:在 ConcurrentHashMap 的实现中,用了大量的U.compareAndSwapXXXX
方法去修改 ConcurrentHashMap 的一些属性。这些方法实际上是利用了 CAS 保证线程安全性
内部实现
Node类
JDK1.8 中的 ConcurrentHashMap
对节点 Node
类中的共享变量,和 JDK1.7 一样,使用 volatile 关键字,保证多线程操作时,变量的可见性。
Node
类实现了 Map.Entry
接口,主要存放 key-value
对,并且具有 next
域
TreeNode
TreeNode
树节点,继承于 Node 类。红黑树的操作是针对 TreeBin
类的,TreeBin
是对 TreeNode
的再一次封装
TreeBin
TreeBin
类并不负责用户的 key、value
信息,而是封装了很多 TreeNode 节点。实际的 ConcurrentHashMap
“数组”中,存放的都是 TreeBin
对象,而不是 TreeNode
对象。
ForwardingNode
ForwardingNode
是在进行扩容操作时会出现的特殊节点,其 key、value、hash 全部为 null。并拥有 nextTable
引用的新 table 数组。
ConcurrentHashMap 的 CAS 操作
ConcurrentHashMap
会大量使用 CAS 来修改它的属性和进行一些操作。
tabAt(Node<K,V>[] tab, int i)
:该方法用来获取 table 数组中索引为 i 的 Node 元素。`
casTabAt()
:利用 CAS 操作设置 table 数组中索引为 i 的元素
setTabAt(Node<K,V>[] tab, int i, Node<K,V> v)
:该方法用来设置 table 数组中索引为 i 的元素
构造方法
- 创建一个默认初始容量(16)和负载因子(0.75)的空映射表。
ConcurrentHashMap()
- 创建具有指定初始容量和默认负载因子(0.75)的空映射表。
ConcurrentHashMap(int initialCapacity)
initialCapacity
: 映射表初始容量。此参数为正数,且元素数量达到容量时,会自动扩容。
- 以指定的初始容量和负载因子创建一个空映射表。
ConcurrentHashMap(int initialCapacity, float loadFactor)
loadFactor
: 负载系数,0 到 1 的浮点数。当元素数量占容量的百分比达到负载系数时,会自动扩容。
- 创建一个映射表,根据指定映射表的键值关系创建映射表。
ConcurrentHashMap(Map<? extends K, ? extends V> map)
- 创建具有指定初始容量、负载因子和并发级别(预计同时操作数据的线程)的空映射表。
ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel)
concurrencyLevel
: 并发度,预期映射表被的并发数量。为了在并发访问时使性能达到最优,需要尽量将并发度设置为接近于并发访问的数量。
常用操作实现原理
put()
-
对每一个放入的值,先用
spread
方法对 key 的hashcode
进行 hash 计算,由此来确定这个值在 table 中的位置; -
如果当前 table 数组还未初始化,进行初始化操作;
-
如果这个位置是 null,那么使用 CAS 操作直接放入;
-
如果这个位置存在节点,说明发生了 hash 碰撞,先判断这个节点的类型,如果该节点
==MOVED
的话,说明正在进行扩容; -
如果是链表节点(
fh>0
),先获取头节点,再依次向后遍历确定这个新加入节点的位置。如果遇到 key 相同的节点,直接将值覆盖。否则在链表尾插入; -
如果这个节点的类型是
TreeBin
,直接调用红黑树的插入方法插入新的节点:如果在红黑树中存在 Key 相同(hash 值相等并且 equals 方法判断为 true)的节点,就覆盖旧值;否则向红黑树追加新节点; -
插入完节点之后再次检查链表的长度,如果长度大于 8,就把这个链表转换成红黑树;
-
调用
addcount()
方法,把当前 ConcurrentHashMap 的元素个数 +1:使用 CAS 更新baseCount
的值,对当前容量大小进行检查,如果超过了临界值(实际大小*加载因子)就需要扩容。
get()
-
哈希: 对传入的键的哈希值进行散列,这有助于减少哈希冲突的可能性。使用
spread
方法可以保证不同的键更均匀地分布在桶数组中。 -
直接查找: 查找的第一步是检查键的哈希值是否位于表的正确位置。如果在该桶的第一个元素中找到了键,则直接返回该元素的值。
使用
==
操作符和equals
方法来比较键,处理可能的 null 值并确保正确的相等性比较。 -
红黑树查找: 如果第一个节点的哈希值小于 0,那么这个桶的数据结构是红黑树,使用
find
方法在红黑树中查找键。 -
链表查找: 如果前两个条件都不满足,那么代码将遍历该桶中的链表。如果在链表中找到了具有相同哈希值和键的元素,则返回其值。如果遍历完整个链表都未找到,则返回 null。
transfer()
当 ConcurrentHashMap
容量不足的时候,需要对 table 进行扩容。这个方法的基本思想跟 HashMap 很像,但由于支持并发扩容,所以要复杂一些。
整个扩容操作分为两个部分:
-
构建一个 nextTable,它的容量是原来的两倍,这个操作是单线程完成的。
-
将原来 table 中的元素复制到 nextTable 中,主要是遍历复制的过程。得到当前遍历的数组位置 i,然后利用 tabAt 方法获得 i 位置的元素:
-
如果这个位置为空,就在原 table 中的 i 位置用 CAS 操作设置为
ForwardNode
节点(可以理解成占位符),这个也是触发并发扩容的关键; -
如果遍历到
ForwardingNode
节点 说明这个点已经被处理过了,这里是控制并发扩容的核心 -
如果这个位置是 Node 节点(
fh>=0
),并且是链表的头节点,根据最高位为1还是为0(最高位指数组长度位),把这个链表分裂成两个链表。把它们分别放在 nextTable 的 i 和 i+n 的位置上;通过一系列操作,原链表中的一部分能直接平移到新链表(即lastRun及其后面跟着的一串节点),剩下部分才需要通过 new方式克隆移动到新链表中(逆序插入,采用头插法)。
-
如果这个位置是
TreeBin
节点(fh<0
),也做一个反序处理,并且判断是否需要untreefi
,把处理的结果分别放在 nextTable 的 i 和 i+n 的位置上; -
遍历所有的节点,就完成复制工作,这时让 nextTable 作为新的 table,并且更新 sizeCtl 为新容量的 0.75 倍 ,完成扩容。
-
统计 size
对于 ConcurrentHashMap
来说,这个 table 里到底装了多少东西是不确定的,因为不可能在调用 size()
方法的时候 Stop The World 让其他线程都停下来去统计。为此,ConcurrentHashMap
定义了一些变量和一个内部类来统计元素的个数。
public int size() {long n = sumCount();return ((n < 0L) ? 0 :(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :(int)n);
}public long mappingCount() {long n = sumCount();// ignore transient negative valuesreturn (n < 0L) ? 0L : n;
}final long sumCount() {CounterCell[] as = counterCells; CounterCell a;long sum = baseCount;if (as != null) {for (int i = 0; i < as.length; ++i) {if ((a = as[i]) != null)sum += a.value; // 所有counter的值求和}}return sum;
}
-
size()
方法返回 Map 中的元素数量,但结果被限制在Integer.MAX_VALUE
内。如果计算的大小超过这个值,则返回Integer.MAX_VALUE
。如果计算的大小小于0,则返回0。 -
mappingCount()
方法也返回 Map 中的元素数量,但允许返回一个 long 值,因此可以表示大于Integer.MAX_VALUE
的数量。与size()
方法类似,该方法也会忽略负值,返回0。 -
sumCount()
方法计算 Map 的实际大小。ConcurrentHashMap
使用一个基础计数 baseCount 和一个 CounterCell 数组 counterCells 来跟踪大小。这种结构有助于减少多线程环境中的争用,因为不同的线程可能会更新不同的 CounterCell。在计算总和时,
sumCount()
方法将 baseCount 与 counterCells 数组中的所有非空单元的值相加。
使用示例
使用 ConcurrentHashMap
,构建一个线程安全的高并发统计用户访问次数的功能,确保在多线程环境下能够安全地更新和查询用户的访问次数。
import java.util.concurrent.ConcurrentHashMap;public class UserVisitCounter {private final ConcurrentHashMap<String, Integer> visitCounts;public UserVisitCounter() {this.visitCounts = new ConcurrentHashMap<>();}/*** 增加用户的访问次数。* * @param userId 用户ID*/public void incrementVisitCount(String userId) {visitCounts.merge(userId, 1, Integer::sum);}/*** 获取用户的访问次数。* * @param userId 用户ID* @return 用户的访问次数*/public int getVisitCount(String userId) {return visitCounts.getOrDefault(userId, 0);}/*** 检查用户是否已经访问过。* * @param userId 用户ID* @return 如果用户已经访问过,则返回true;否则返回false*/public boolean hasVisited(String userId) {return visitCounts.containsKey(userId);}/*** 重置用户的访问次数。* * @param userId 用户ID*/public void resetVisitCount(String userId) {visitCounts.remove(userId);}/*** 打印所有用户的访问次数。*/public void printAllVisits() {visitCounts.forEach((userId, count) -> System.out.println("User ID: " + userId + ", Visit Count: " + count));}public static void main(String[] args) {UserVisitCounter counter = new UserVisitCounter();ExecutorService executor = Executors.newFixedThreadPool(10); // 创建一个线程池// 模拟多个线程并发增加用户的访问次数for (int i = 0; i < 10; i++) {String userId = "user" + i;Runnable task = () -> {for (int j = 0; j < 100; j++) {counter.incrementVisitCount(userId);}};executor.submit(task);}// 关闭线程池executor.shutdown();executor.awaitTermination(1, java.util.concurrent.TimeUnit.MINUTES);// 打印所有用户的访问次数counter.printAllVisits();}
}
示例说明:
- 创建
UserVisitCounter
类:使用ConcurrentHashMap
创建一个线程安全的映射visitCounts
,用于存储用户的访问次数。 - 创建线程池:使用
Executors.newFixedThreadPool
创建一个固定大小的线程池,这里设置为 10 个线程。 - 提交任务:创建多个任务,每个任务模拟一个用户并发增加访问次数。使用
executor.submit
提交任务到线程池执行。 - 关闭线程池:使用
executor.shutdown
关闭线程池,并等待所有任务执行完毕。 - 打印结果:使用
counter.printAllVisits
打印所有用户的访问次数。
总结
ConcurrentHashMap
是线程安全的,支持完全并发的读取,并且有很多线程可以同时执行写入。在早期版本(例如 JDK 1.7)中,ConcurrentHashMap
使用分段锁技术。整个哈希表被分成一些段(Segment),每个段独立加锁。这样,在不同段上的操作可以并发进行。
从 JDK 1.8 开始,ConcurrentHashMap
的内部实现有了很大的变化。它放弃了分段锁技术,转而采用了更先进的并发控制策略,如 CAS 操作和红黑树等,进一步提高了并发性能。