Distributed Top-K Based on Consistent Hashing
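In distributed systems, storing data efficiently and querying it quickly is a common challenge. Consistent hashing is a technique widely used for distributed storage and load balancing, and the Top-K query is a classic problem in data analysis. This article walks through a Java example that combines consistent hashing with multithreading to compute Top-K efficiently in a distributed setting.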
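Implementation Approach
- Consistent-hash sharding: distribute the data across nodes using a consistent hashing algorithm.
- Local Top-K computation: each node computes the Top-K of the data assigned to it.
- Result aggregation: the master node collects the local Top-K results from all nodes and computes the global Top-K.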
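The sharding step can be sketched roughly as follows. This is a minimal illustration, not the ConsistentHashingSharding class this article relies on: the class name `SimpleHashRing`, its methods, the `node#i` virtual-node naming, and the choice of a 32-bit FNV-1a hash are all assumptions made for the example.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical minimal hash ring, for illustration only. */
class SimpleHashRing {
    // Ring position -> virtual node name, kept sorted by position.
    private final TreeMap<Integer, String> ring = new TreeMap<>();
    private final int virtualNodesPerNode;

    SimpleHashRing(int virtualNodesPerNode) {
        this.virtualNodesPerNode = virtualNodesPerNode;
    }

    /** 32-bit FNV-1a hash (an assumed choice; any well-mixed hash would do). */
    static int hash(String key) {
        int h = 0x811C9DC5;
        for (byte b : key.getBytes(StandardCharsets.UTF_8)) {
            h ^= (b & 0xFF);
            h *= 0x01000193;
        }
        return h;
    }

    /** Places virtualNodesPerNode virtual nodes for this physical node on the ring. */
    void addNode(String node) {
        for (int i = 0; i < virtualNodesPerNode; i++) {
            String virtualNode = node + "#" + i;
            ring.put(hash(virtualNode), virtualNode);
        }
    }

    /** Returns the first virtual node at or after the key's position, wrapping around. */
    String locate(String key) {
        if (ring.isEmpty()) {
            throw new IllegalStateException("ring has no nodes");
        }
        Map.Entry<Integer, String> entry = ring.ceilingEntry(hash(key));
        return (entry != null ? entry : ring.firstEntry()).getValue();
    }
}
```

With this layout, adding a node only moves keys onto the new node's virtual nodes; every other key keeps its previous owner, which is the property that makes consistent hashing attractive for sharding.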
Code Implementation
The Java implementation is shown below:
import java.util.PriorityQueue;
import java.util.concurrent.CountDownLatch;

public class TopKConsistentHashingTest {

    /** Loads test data into the shards and counts down the latch when finished. */
    static class InitialDataTask implements Runnable {
        private final String dataPrefix;
        private final ConsistentHashingSharding sharding;
        private final CountDownLatch countDownLatch;

        public InitialDataTask(ConsistentHashingSharding sharding,
                               CountDownLatch countDownLatch,
                               String dataPrefix) {
            this.sharding = sharding;
            this.countDownLatch = countDownLatch;
            this.dataPrefix = dataPrefix;
        }

        @Override
        public void run() {
            try {
                // Add 100,000 test records: dataPrefix + "0" ... dataPrefix + "99999"
                for (int i = 0; i < 100_000; i++) {
                    sharding.addData(dataPrefix + i);
                }
            } finally {
                // Always release the latch, even if addData throws
                countDownLatch.countDown();
            }
        }
    }

    /**
     * Local Top-K computation.
     * Each node independently computes the Top-K of the data assigned to it.
     */
    static class LocalTopKCalculator {
        /** Returns a min-heap holding the k largest values; k must be at least 1. */
        public static PriorityQueue<Long> calculateTopK(Long[] data, int k) {
            PriorityQueue<Long> minHeap = new PriorityQueue<>(k);
            for (Long num : data) {
                if (minHeap.size() < k) {
                    minHeap.offer(num);
                } else if (num > minHeap.peek()) {
                    // num beats the smallest retained value, so replace it
                    minHeap.poll();
                    minHeap.offer(num);
                }
            }
            return minHeap;
        }
    }

    /**
     * Global Top-K aggregation on the master node.
     * The master collects the local Top-K results from all nodes and computes the global Top-K.
     */
    static class MasterTopKCalculator {
        public static PriorityQueue<Long> calculateGlobalTopK(PriorityQueue<Long>[] localTopKResults, int k) {
            PriorityQueue<Long> globalMinHeap = new PriorityQueue<>(k);
            for (PriorityQueue<Long> localTopK : localTopKResults) {
                // Iteration order over a heap is arbitrary, which is fine here
                for (Long num : localTopK) {
                    if (globalMinHeap.size() < k) {
                        globalMinHeap.offer(num);
                    } else if (num > globalMinHeap.peek()) {
                        globalMinHeap.poll();
                        globalMinHeap.offer(num);
                    }
                }
            }
            return globalMinHeap;
        }
    }
}
For the consistent hashing code (ConsistentHashingSharding), see: Consistent Hashing HashRing (一致性哈希HashRing)
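The code above does not show how the local computations would run concurrently. One way to picture it is the sketch below, which treats each shard as a plain list and runs one local Top-K per shard on a thread pool before merging. The class `ParallelTopKSketch` and its method names are hypothetical, and a fixed thread pool is only one of several reasonable choices.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical sketch: parallel local Top-K per shard, then a global merge. */
class ParallelTopKSketch {

    /** Min-heap Top-K over one collection of values; k must be at least 1. */
    static PriorityQueue<Long> topK(Iterable<Long> values, int k) {
        PriorityQueue<Long> minHeap = new PriorityQueue<>(k);
        for (Long value : values) {
            if (minHeap.size() < k) {
                minHeap.offer(value);
            } else if (value > minHeap.peek()) {
                minHeap.poll();
                minHeap.offer(value);
            }
        }
        return minHeap;
    }

    /** Computes each shard's Top-K on its own thread, then merges; result is in descending order. */
    static List<Long> globalTopK(List<List<Long>> shards, int k) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, shards.size()));
        try {
            List<Future<PriorityQueue<Long>>> futures = new ArrayList<>();
            for (List<Long> shard : shards) {
                Callable<PriorityQueue<Long>> task = () -> topK(shard, k);
                futures.add(pool.submit(task));
            }
            // Every global Top-K value is in some shard's local Top-K,
            // so the union of local results is a sufficient candidate set.
            List<Long> candidates = new ArrayList<>();
            for (Future<PriorityQueue<Long>> future : futures) {
                candidates.addAll(future.get());
            }
            List<Long> result = new ArrayList<>(topK(candidates, k));
            result.sort(Comparator.reverseOrder());
            return result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for shards", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a shard task failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```

The master only ever sees at most k values per shard, so the amount of data sent for aggregation grows with the number of shards rather than with the size of the data set.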
Test Cases
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.*;

import java.util.*;
import java.util.concurrent.CountDownLatch;

class TopKConsistentHashingTestTest {
    private static final String PREFIX = "data";

    private ConsistentHashingSharding sharding;

    @BeforeEach
    void setUp() {
        // Initialize consistent-hash sharding with 100 virtual nodes
        sharding = new ConsistentHashingSharding(100);
        // Add 3 physical nodes
        sharding.addNode("nodeA");
        sharding.addNode("nodeB");
        sharding.addNode("nodeC");
    }

    /** PriorityQueue does not override equals(), so compare sorted copies of the contents. */
    private static List<Long> sorted(PriorityQueue<Long> queue) {
        List<Long> list = new ArrayList<>(queue);
        Collections.sort(list);
        return list;
    }

    /**
     * Tests whether data initialization distributes records across the virtual nodes.
     */
    @Test
    void testInitialDataTask() throws InterruptedException {
        // Load the data on a single thread; InitialDataTask adds 100,000 records
        TopKConsistentHashingTest.InitialDataTask task =
                new TopKConsistentHashingTest.InitialDataTask(sharding, new CountDownLatch(1), PREFIX);
        task.run();

        // Verify the total record count
        int totalDataCount = sharding.getTotalDataCount();
        assertEquals(100_000, totalDataCount, "total data count should be 100000");

        // Verify that every virtual node received data
        // (this depends on how evenly the hash spreads keys over the ring)
        Set<String> allVirtualNodes = sharding.getAllVirtualNodes();
        for (String virtualNode : allVirtualNodes) {
            Set<String> data = sharding.findNodeData(virtualNode);
            assertFalse(data.isEmpty(), "virtual node " + virtualNode + " should have data");
        }
    }

    /**
     * Tests whether the local Top-K computation is correct.
     */
    @Test
    void testLocalTopKCalculator() {
        // Test data
        Long[] data = {1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L, 12L};
        int k = 5;
        PriorityQueue<Long> result = TopKConsistentHashingTest.LocalTopKCalculator.calculateTopK(data, k);

        // Verify that the result is the Top-K
        assertEquals(Arrays.asList(8L, 9L, 10L, 11L, 12L), sorted(result), "local Top-K result is wrong");
    }

    /**
     * Tests whether the global Top-K aggregation is correct.
     */
    @Test
    void testMasterTopKCalculator() {
        // Test data
        PriorityQueue<Long> queue1 = new PriorityQueue<>(Arrays.asList(1L, 2L, 3L, 4L, 5L));
        PriorityQueue<Long> queue2 = new PriorityQueue<>(Arrays.asList(6L, 7L, 8L, 9L, 10L));
        PriorityQueue<Long> queue3 = new PriorityQueue<>(Arrays.asList(11L, 12L, 13L, 14L, 15L));
        // A generic array cannot be created with a bare initializer, so use a raw array
        @SuppressWarnings("unchecked")
        PriorityQueue<Long>[] localTopKResults = new PriorityQueue[] {queue1, queue2, queue3};
        int k = 5;
        PriorityQueue<Long> result =
                TopKConsistentHashingTest.MasterTopKCalculator.calculateGlobalTopK(localTopKResults, k);

        // Verify that the result is the global Top-K
        assertEquals(Arrays.asList(11L, 12L, 13L, 14L, 15L), sorted(result), "global Top-K result is wrong");
    }

    /**
     * Tests the full flow: data initialization + local Top-K computation + global Top-K aggregation.
     */
    @Test
    void testCompleteTopKConsistentHashing() throws InterruptedException {
        // Load the data (100,000 records)
        TopKConsistentHashingTest.InitialDataTask task =
                new TopKConsistentHashingTest.InitialDataTask(sharding, new CountDownLatch(1), PREFIX);
        task.run();

        // Simulate the local Top-K computation on each virtual node
        List<PriorityQueue<Long>> localQueues = new ArrayList<>();
        Set<String> allVirtualNodes = sharding.getAllVirtualNodes();
        for (String virtualNode : allVirtualNodes) {
            Set<String> data = sharding.findNodeData(virtualNode);
            // Records look like "data123"; strip the prefix before parsing the number
            Long[] dataArray = data.stream()
                    .map(s -> Long.parseLong(s.substring(PREFIX.length())))
                    .toArray(Long[]::new);
            PriorityQueue<Long> localQueue =
                    TopKConsistentHashingTest.LocalTopKCalculator.calculateTopK(dataArray, 5);
            localQueues.add(localQueue);
        }

        // Simulate the global Top-K aggregation
        @SuppressWarnings("unchecked")
        PriorityQueue<Long>[] localTopKResults = localQueues.toArray(new PriorityQueue[0]);
        PriorityQueue<Long> globalTopK =
                TopKConsistentHashingTest.MasterTopKCalculator.calculateGlobalTopK(localTopKResults, 5);

        // Verify the global Top-K result
        assertFalse(globalTopK.isEmpty(), "global Top-K result should not be empty");
        assertEquals(5, globalTopK.size(), "global Top-K result has the wrong size");

        // Print the result (for testing only)
        System.out.println("Global Top-K result:");
        while (!globalTopK.isEmpty()) {
            System.out.println(globalTopK.poll());
        }
    }
}
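Test Case Descriptions
- testInitialDataTask
Tests whether data initialization distributes records across the virtual nodes.
Loads the data on a single thread (InitialDataTask adds 100,000 records), then verifies the total record count and that every virtual node received data.
- testLocalTopKCalculator
Tests whether the local Top-K computation for a single virtual node is correct.
Uses fixed test data and verifies that the result matches the expected values.
- testMasterTopKCalculator
Tests whether the master node's global Top-K aggregation is correct.
Uses several local Top-K queues and verifies that the final global Top-K is correct.
- testCompleteTopKConsistentHashing
Tests the full flow: data initialization, local Top-K computation, and global Top-K aggregation.
Checks the system end to end and prints the final global Top-K result.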
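One detail matters when asserting on Top-K results: java.util.PriorityQueue does not override equals(), so two queues holding the same values are equal only if they are the same object. A small helper that compares sorted copies avoids this. The sketch below is illustrative, and the `HeapAssertions` class and `sortedContents` method are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical helper for comparing heap contents in tests. */
class HeapAssertions {

    /** Copies the heap's elements into an ascending list; the heap itself is left unchanged. */
    static List<Long> sortedContents(PriorityQueue<Long> heap) {
        List<Long> copy = new ArrayList<>(heap);
        Collections.sort(copy);
        return copy;
    }

    public static void main(String[] args) {
        PriorityQueue<Long> a = new PriorityQueue<>(Arrays.asList(8L, 9L, 10L));
        PriorityQueue<Long> b = new PriorityQueue<>(Arrays.asList(10L, 9L, 8L));

        // Identity-based equals: false even though the contents match
        System.out.println(a.equals(b));
        // Content comparison on sorted copies: true
        System.out.println(sortedContents(a).equals(sortedContents(b)));
    }
}
```

In a JUnit test, the same idea reads as `assertEquals(expectedList, sortedContents(result))`, with the expected values written as an ascending list.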