PDF-OCR文件识别篇(六):AI 客户端封装与结构化抽取

📅 2026/6/30 11:26:59
PDF-OCR文件识别篇(六):AI 客户端封装与结构化抽取
本章是整条流水线的「大脑」分两层讲客户端层AiClient把大模型的鉴权、调用、回调接收、异步、文件抽取、JSON 清洗全部收口对上只暴露几个干净方法。编排层AiPdfExtractionServiceImpl把「切分第3章 OCR识别JSON第4章 提示词第5章 调用」编排成「PDF → 结构化 JSON」并解决并行、重试、合并、失败隔离。下文实现以 GLMzai-sdk为例换其它模型只需替换客户端实现编排层不动。6.1 AiClient统一「JSON 输出 关闭思考」6.1.1 懒加载客户端SDK 客户端用double-checked locking懒加载保证全局单例、线程安全且只有真正调用时才初始化启动期不依赖密钥就绪private volatile ai.z.openapi.ZhipuAiClient sdkClient; // volatile 保证可见性 private ai.z.openapi.ZhipuAiClient client() { if (sdkClient null) { synchronized (this) { if (sdkClient null) { if (StringUtils.isEmpty(properties.getApiKey())) { throw new ServiceException(智谱AI调用失败未配置 apiKey); } int sec Math.max(60, properties.getTimeout() / 1000); // 下限 60s // 请求/连接/读/写 四个超时统一用大值大表输出长读超时最容易踩 sdkClient ai.z.openapi.ZhipuAiClient.builder() .ofZHIPU() .apiKey(properties.getApiKey()) .networkConfig(sec, sec, sec, sec, TimeUnit.SECONDS) .build(); log.info(智谱AI SDK 客户端已初始化model{}, timeout{}s, properties.getModel(), sec); } } } return sdkClient; }细节volatile不能省没有它另一个线程可能读到「构造一半」的对象引用。超时下限 60stimeout配小了也至少给 60 秒避免大表一调就超时。四个超时全放大连接快但读慢是大模型调用的典型特征要等它生成完四项统一放大最省心。6.1.2 请求参数buildParams每次调用都构造「系统 用户」两条消息并固定几个关键开关private ChatCompletionCreateParams buildParams(String systemPrompt, String userContent, boolean stream) { ChatMessage system ChatMessage.builder() .role(ChatMessageRole.SYSTEM.value()).content(systemPrompt).build(); ChatMessage user ChatMessage.builder() .role(ChatMessageRole.USER.value()).content(userContent).build(); return ChatCompletionCreateParams.builder() .model(properties.getModel()) .messages(Arrays.asList(system, user)) .stream(stream) .temperature((float) properties.getTemperature()) // 默认 0.1 .maxTokens(properties.getMaxTokens() 0 ? properties.getMaxTokens() : null) .responseFormat(new ResponseFormat(json_object)) // 强制 JSON .thinking(new ChatThinking(ChatThinkingType.DISABLED.value())) // 关闭思考 .build(); }参数取值为什么temperature0.1低温结构化抽取要稳定可复现不要创造性maxTokens16384大表 JSON 很长给小了会被截断finish_reasonlengthresponseFormatjson_object让模型直接产出 JSON少一层文本清洗thinkingDISABLED抽取是「照搬」不是「推理」关掉思考省 token、更快modelglm-5.1要更准切glm-5.1新模型理解能力强6.1.3 二种调用形态AiClient对外提供二个入口对应不同使用场景chatForJson/chatForJsonResult同步 回调接收批量抽取编排器runChunk用带重试submitAsync异步提交单表「调AI」按钮立即返回taskId不阻塞页面queryAsync异步查询定时任务轮询识别PROCESSING/SUCCESS/FAIL6.1.4 回调接口接收输出替代旧的阻塞流式早期用 RxJavaFlowable.blockingForEach阻塞式流式读取缺点是独占当前线程、与抽取线程池并行时不友好。现已改为注册回调接收模型输出SDK 收到内容增量片段或一次性结果时回调由回调累积内容、记录finish_reason与 token 用量doChat在回调结束后拿到完整 JSON。// 回调处理器累积内容 记录用量/截断接口名以实际 SDK 回调为准 StringBuilder sb new StringBuilder(); Usage[] usageHolder {null}; String[] finish {null}; client().chat().createChatCompletion(request, new ChatStreamCallback() { Override public void onMessage(ModelData md) { // 收到内容片段/结果 if (md.getUsage() ! null) usageHolder[0] md.getUsage(); // 用量通常在末尾给 if (md.getChoices() null || md.getChoices().isEmpty()) return; Choice c md.getChoices().get(0); if (c.getDelta() ! null c.getDelta().getContent() ! null) { sb.append(c.getDelta().getContent()); // 累积不再 System.out.print } if (c.getFinishReason() ! null) finish[0] c.getFinishReason(); } Override public void onError(Throwable t) { // 出错统一转业务异常 throw new ServiceException(智谱AI回调异常 t.getMessage()); } Override public void onComplete() { /* 收尾可选 */ } }); // 截断检测JSON 不完整时尽早失败触发上层重试或提示调大 max-tokens if (length.equalsIgnoreCase(finish[0])) throw new ServiceException(AI输出超长被截断(finish_reasonlength)请调大 max-tokens、或对该长表启用分块); String content stripJsonFence(sb.toString()); // 完整内容交给下游回调式相比阻塞流式的好处不占用调用线程回调由 SDK 的 IO 线程驱动抽取线程池6.3 节能跑满并发。结构一致无论增量片段还是一次性结果都在onMessage里统一累积doChat逻辑不变。错误集中onError统一转ServiceException被 6.1.3 的重试循环接住。ChatStreamCallback/onMessage / onError / onComplete为示意名请替换为你实际使用的回调接口与方法签名。finish_reason length的截断判断务必保留——它是「JSON 被切一半」最常见的根因。6.1.5 异步提交与查询给前端用页面点「调AI」不能干等于是走异步任务提交后立即拿taskId返回结果由定时任务轮询详见第 7 章。public String submitAsync(String systemPrompt, String userContent, String label) { ChatCompletionCreateParams request buildParams(systemPrompt, userContent, false); ChatCompletionResponse response client().chat().asyncChatCompletion(request); if (response null) throw new ServiceException(智谱AI异步提交返回为空); if (!response.isSuccess()) throw new ServiceException(智谱AI异步提交失败code response.getCode() , msg response.getMsg()); String taskId response.getData() null ? null : response.getData().getId(); if (StringUtils.isEmpty(taskId)) throw new ServiceException(智谱AI异步提交未返回任务ID); return taskId; }查询queryAsync把 SDK 的任务状态映射成自己的AsyncChatResultPROCESSING/SUCCESS/FAIL并在「成功」分支里做完整校验public AsyncChatResult queryAsync(String taskId) { QueryModelResultResponse response client().chat().retrieveAsyncResult( AsyncResultRetrieveParams.builder().taskId(taskId).build()); // …response 空/失败校验略… ModelData data response.getData(); TaskStatus status data null ? null : data.getTaskStatus(); AsyncChatResult result new AsyncChatResult(); if (status null) { result.setStatus(PROCESSING); return result; } // 状态未知按处理中 result.setStatus(status.name()); if (status TaskStatus.PROCESSING) return result; // 等下次轮询 if (status TaskStatus.FAIL) { result.setError(智谱返回任务失败(FAIL)); return result; } // SUCCESS记 token 用量 if (data.getUsage() ! null) { /* set prompt/completion/total tokens */ } if (data.getChoices() null || data.getChoices().isEmpty()) { // 成功却无内容 → 判失败 result.setStatus(FAIL); result.setError(任务成功但无 choices); return result; } Choice choice data.getChoices().get(0); if (length.equalsIgnoreCase(choice.getFinishReason())) { // 截断 → 判失败 result.setStatus(FAIL); result.setError(AI输出超长被截断(finish_reasonlength)…); return result; } String text choice.getMessage() null ? null : String.valueOf(choice.getMessage().getContent()); if (StringUtils.isEmpty(text)) { result.setStatus(FAIL); result.setError(content 为空); return result; } result.setContent(stripJsonFence(text)); return result; }关键点「成功」不等于「拿到可用 JSON」。无 choices、被截断、content 为空都要在查询时降级为FAIL否则脏数据会流到入库环节。6.1.7 去代码块包裹stripJsonFence即使要求了json_object模型偶尔仍会用json … 把结果包起来。统一剥掉首尾围栏再交给JSON.parse否则会解析失败private String stripJsonFence(String content) { if (StringUtils.isEmpty(content)) return content; String text content.trim(); if (text.startsWith()) { int nl text.indexOf(\n); if (nl 0) text text.substring(nl 1); // 去掉首行 json if (text.endsWith()) text text.substring(0, text.length() - 3); } return text.trim(); }6.2 抽取总编排AiPdfExtractionServiceImpl这是「PDF → 结构化 JSON」的纯抽取编排器不落库。主流程doExtractslice 切表 → 对每段调百度OCR识别成 JSON → 构建任务(buildChunkTasks) → 线程池并行调 AI(runChunk) → 按表合并(mergeSection) → AiExtractionResultprivate AiExtractionResult doExtract(byte[] pdfBytes, String fileName, String[] titleKeywords) { long start System.currentTimeMillis(); ListPdfSection sections pdfTableSlicer.slice(pdfBytes); // ① 按表切分第3章 if (titleKeywords ! null titleKeywords.length 0) // 仅抽指定标题 sections sections.stream().filter(s - matchAnyKeyword(s.getTitle(), titleKeywords)).toList(); if (sections.isEmpty()) throw new ServiceException(未在 PDF 中识别到任何「表N」表格); ListChunkTask tasks properties.isUseFile() ? buildFileTasks(sections, pdfBytes) // ②a 文件抽取可选 : buildChunkTasks(sections, ocrJsonOf(sections)); // ②b 主线OCR JSON 投喂 ListChunkResult results runInParallel(tasks); // ③ 并行6.3 ListTableExtraction tables new ArrayList(); // ④ 按表合并6.4 for (int si 0; si sections.size(); si) tables.add(mergeSection(sections.get(si), si, results)); AiExtractionResult result new AiExtractionResult(); result.setFileName(fileName); result.setTables(tables); result.setTableCount(tables.size()); result.setSuccessCount((int) tables.stream().filter(t - SUCCESS.equals(t.getStatus())).count()); result.setElapsedMs(System.currentTimeMillis() - start); return result; }analyze()只切分、不调 AI用于前端快速预览有哪些表、extract()全量、extractByTitles()按标题都收敛到doExtract。6.2.1 系统提示词SYSTEM_PROMPT所有分块共用同一套铁律7 条详见第 5 章核心是「逐字保留、字段对齐、合并单元格向下填充、跨页折行合并、只输出 JSON」。它定义为常量与按表的字段定义buildSchemaBlock拼在一起构成完整提示。6.2.2 构建任务buildChunkTasks主线输入是该表的百度 OCR 识别结果 JSON按表大小分两种整表一次投喂默认把整表 OCR JSON 字段定义拼进提示词一次调用保留完整表头上下文准确率最高。超大表分块兜底仅当整表文本超过chunkCharThreshold时才按chunkPageSize把内容拆块并行、再合并。每个非首块附带「表头参考」避免分块后丢失列名if (begin 0) { text 【表头参考仅用于确定列名不要把本段当作数据行】\n headerContext \n【待抽取数据】\n text; }默认chunkCharThreshold Integer.MAX_VALUE即「整表一次请求、不分块」分块只是超大表的安全阀。每个任务封装成内部类ChunkTask(sectionIndex, title, text)——sectionIndex记录它属于哪张表合并时按它归位。6.2.3 文件抽取任务buildFileTasksuseFile路径每段先用PdfSplitter导出独立 PDF第3章上传给模型解析6.1.6再让模型基于解析内容输出 JSON。临时 PDF 用完即删Files.deleteIfExists某段准备失败也会生成一个「空内容任务」让runChunk统一记为Failed不中断其它表。6.3 并行调度与失败隔离runChunk所有表的所有分块汇总成一个ListChunkTask用固定线程池concurrency默认 4统一并行int poolSize Math.max(1, Math.min(properties.getConcurrency(), tasks.size())); ExecutorService pool Executors.newFixedThreadPool(poolSize); try { ListCompletableFutureChunkResult futures tasks.stream() .map(t - CompletableFuture.supplyAsync(() - runChunk(t), pool)) .toList(); return futures.stream().map(CompletableFuture::join).toList(); } finally { shutdown(pool); // shutdown awaitTermination(timeout×35s)超时则 shutdownNow }runChunk内部再套一层重试且失败不抛出而是返回带error的ChunkResult保证单块失败不拖垮整批private ChunkResult runChunk(ChunkTask task) { int attempts Math.max(1, properties.getMaxRetries() 1); Exception last null; for (int i 0; i attempts; i) { try { String json aiClient.chatForJson(SYSTEM_PROMPT, task.text, task.title); if (StringUtils.isBlank(json)) throw new ServiceException(AI 返回内容为空); Object data JSON.parse(json); // 解析校验 if (data null) throw new ServiceException(AI 返回无法解析为 JSON abbreviate(json)); return new ChunkResult(task.sectionIndex, data, null);// 成功 } catch (Exception e) { last e; log.warn(表格[{}]第{}/{}次抽取失败, task.title, i 1, attempts, e); } } return new ChunkResult(task.sectionIndex, null, describe(last)); // 失败也返回不抛 }注意这里有两层重试chatForJson内部对「网络/接口错误」重试6.1.3runChunk对「内容不合法空、非JSON」再重试一次。前者管「没调通」后者管「调通了但产出不可用」。6.4 结果合并mergeData一张表可能被拆成多块多块结果要按类型智能合并。mergeSection先把同一sectionIndex的结果归拢再交mergeDataprivate TableExtraction mergeSection(PdfSection section, int sectionIndex, ListChunkResult results) { ListObject datas new ArrayList(); ListString errors new ArrayList(); for (ChunkResult r : results) { if (r.sectionIndex ! sectionIndex) continue; // 只收本表的块 if (r.data ! null) datas.add(r.data); else if (r.error ! null) errors.add(r.error); } TableExtraction te new TableExtraction(); te.setTitle(section.getTitle()); te.setStartPage(section.getStartPage()); te.setEndPage(section.getEndPage()); if (datas.isEmpty()) { // 全失败 te.setStatus(FAILED); te.setError(String.join(; , errors)); return te; } te.setData(mergeData(datas)); // 有成功 → 合并 te.setStatus(SUCCESS); if (!errors.isEmpty()) te.setError(部分分块失败 String.join(; , errors)); // 部分失败也保留 return te; }mergeData按数据形态分四类处理private Object mergeData(ListObject datas) { if (datas.size() 1) return datas.get(0); // 不分块的常见情况直接返回 if (datas.stream().allMatch(this::isRowsObject)) { // ① 明细表 {columns, rows} JSONObject merged new JSONObject(); JSONArray rows new JSONArray(); JSONArray columns null; for (Object d : datas) { JSONObject o (JSONObject) d; if (columns null o.getJSONArray(columns) ! null) columns o.getJSONArray(columns); if (o.getJSONArray(rows) ! null) rows.addAll(o.getJSONArray(rows)); // 拼接所有行 } if (columns ! null) merged.put(columns, columns); merged.put(rows, rows); return merged; } if (datas.stream().allMatch(d - d instanceof JSONArray)) { // ② 纯数组直接拼 JSONArray merged new JSONArray(); datas.forEach(d - merged.addAll((JSONArray) d)); return merged; } if (datas.stream().allMatch(d - d instanceof JSONObject)) { // ③ 键值对象逐键合并 JSONObject merged new JSONObject(); for (Object d : datas) for (String k : ((JSONObject) d).keySet()) { Object exist merged.get(k); if (exist null || (exist instanceof String ((String) exist).isEmpty())) merged.put(k, ((JSONObject) d).get(k)); // 已有非空值不覆盖 } return merged; } return datas; // ④ 混合类型原样返回 }形态判定合并策略明细表每块都是{rows:[...]}拼接所有rowscolumns取首个非空数组每块都是JSONArray直接addAll键值对象每块都是JSONObject逐键合并已有非空值不覆盖防分块重复字段互相冲掉混合以上都不满足原样返回各分块交由人工/装配器处理6.5 小结客户端层AiClient鉴权、回调接收、异步、文件抽取、JSON 清洗全收口上层只调方法名模型可替换。关键防线是两处截断检测同步回调 异步查询和双层重试。编排层AiPdfExtractionServiceImpl切表 → OCR JSON → 并行抽 → 合并产出纯粹的AiExtractionResult不碰数据库。失败隔离贯穿始终单块失败返回错误而非抛出、单表全失败只标Failed、部分失败保留成功数据便于精准重试。下一章讲怎么把这套抽取能力接进真实业务、异步化、并最终落库。