MAF快速入门(5)开发自定义Executor

📅 2026/7/1 16:23:31
MAF快速入门(5)开发自定义Executor
目录简介什么是Executor准备工作定义数据传输模型定义自定义事件开发文案生成Executor开发质量审核Executor构建工作流小结示例源码参考资料简介大家好我是Edison。上一篇我们学习了MAF中进行多Agent智能体的顺序和移交编排。但是很多时候我们想要嵌入一些业务逻辑和结构化输出亦或者是需要保持历史对话这时我们就可以开发一些自定义Executor来组成工作流。什么是ExecutorExecutor又称为执行器它是MAF中处理工作流消息的基本构建模块是接受结构化消息、执行并生成输出结果消息或事件的独立单元。很多时候我们想要封装一些复杂的业务流程到Agent的工作流中又或者想要完全控制Agent的生命周期和对话历史这时候我们就需要开发一些自定义的Executor。例如我们想要做一些严格的评分判断、循环控制、条件终止的业务逻辑就需要自定义Executor了。这里以一个智能营销文案的场景为例假设一个企业有两个专家一个专门负责编写文案另一个则负责对文案进行审核评分。只有当撰写的文案通过审核负责人的审核假设量化指标评分8才能进行发布否则文案编写者需要根据审核人提供的反馈改进建议进行反复修改。案例来自圣杰《.NET AI 智能体开发进阶》由上图可知这里不仅需要文案专家 和 审核专家 嵌入一些评分和反馈的逻辑还要设置循环和终止的条件才能让这个工作流能够比较准确的满足企业的需求。因此我们就需要开发两个自定义的Executor来封装文案撰写 和 文案审核的Agent。那么哪些场景不需要自定义Executor呢比如就只需要一次性Agent的调用输出回答又或者不需要嵌入严格的业务逻辑的场景。下面就让我们来一一实现这个案例。准备工作在今天的这个案例中我们创建了一个.NET控制台应用程序安装了以下NuGet包Microsoft.Agents.AI.OpenAIMicrosoft.Agents.AI.WorkflowsMicrosoft.Extensions.AI.OpenAI我们的配置文件中定义了LLM API的信息{ OpenAI: { EndPoint: https://api.siliconflow.cn, ApiKey: ******************************, ModelId: Qwen/Qwen3-30B-A3B-Instruct-2507 } }这里我们使用 SiliconCloud 提供的 Qwen/Qwen3-30B-A3B-Instruct-2507 模型之前的 Qwen2.5 模型在这个案例中不适用。你可以通过这个URL注册账号https://cloud.siliconflow.cn/i/DomqCefW 获取大量免费的Token来进行本次实验。然后我们将配置文件中的API信息读取出来var config new ConfigurationBuilder() .AddJsonFile($appsettings.json, optional: false, reloadOnChange: true) .Build(); var openAIProvider config.GetSection(OpenAI).GetOpenAIProvider();定义数据传输模型首先我们定义一下在这个工作流中需要生成传递的数据模型1SloganResult 文案生成结果​​​​​​​/// summary /// 文案生成结果 /// /summary public sealed class SloganResult { /// summary /// 产品任务描述 /// /summary [JsonPropertyName(task)] public required string Task { get; set; } /// summary /// 生成的标语 /// /summary [JsonPropertyName(slogan)] public required string Slogan { get; set; } }2FeedbackResult审核反馈结果​​​​​​​/// summary /// 审核反馈结果 /// /summary public sealed class FeedbackResult { /// summary /// 审核评论 /// /summary [JsonPropertyName(comments)] public string Comments { get; set; } string.Empty; /// summary /// 质量评分1-10分 /// /summary [JsonPropertyName(rating)] public int Rating { get; set; } /// summary /// 改进建议 /// /summary [JsonPropertyName(actions)] public string Actions { get; set; } string.Empty; }定义自定义事件MAF中定义了一个WorkflowEvent的基类所有自定义Event都需要继承于它。1SloganGeneratedEvent 文案已生成事件​​​​​​​public sealed class SloganGeneratedEvent : WorkflowEvent { private readonly SloganResult _sloganResult; public SloganGeneratedEvent(SloganResult sloganResult) : base(sloganResult) { this._sloganResult sloganResult; } public override string ToString() $ [标语生成] {_sloganResult.Slogan}; }2FeedbackFinishedEvent : 反馈已完成事件​​​​​​​/// summary /// 自定义事件:审核反馈完成 /// /summary public sealed class FeedbackFinishedEvent : WorkflowEvent { private readonly FeedbackResult _feedbackResult; public FeedbackFinishedEvent(FeedbackResult feedbackResult) : base(feedbackResult) { this._feedbackResult feedbackResult; } public override string ToString() $ [审核反馈] 评分: {_feedbackResult.Rating}/10 评论: {_feedbackResult.Comments} 建议: {_feedbackResult.Actions} ; }开发文案生成ExecutorMAF中定义了一个Executor的基类所有自定义Exectuor都需要继承于它。​​​​​​​/// summary /// 文案生成 Executor - 根据任务或反馈生成标语 /// /summary public class SloganWriterExecutor : Executor { private readonly AIAgent _agent; private readonly AgentThread _thread; /// summary /// 初始化文案生成 Executor /// /summary /// param nameidExecutor 唯一标识/param /// param namechatClientAI 聊天客户端/param public SloganWriterExecutor(string id, IChatClient chatClient) : base(id) { // 配置 Agent 选项 ChatClientAgentOptions agentOptions new( instructions: 你是一名专业的文案撰写专家。你将根据产品特性创作简洁有力的宣传标语。 ) { ChatOptions new() { // 配置结构化输出要求返回 SloganResult JSON 格式 ResponseFormat ChatResponseFormat.ForJsonSchemaSloganResult() } }; // 创建 Agent 和对话线程 this._agent new ChatClientAgent(chatClient, agentOptions); this._thread this._agent.GetNewThread(); } /// summary /// 配置消息路由支持两种输入类型 /// /summary protected override RouteBuilder ConfigureRoutes(RouteBuilder routeBuilder) routeBuilder .AddHandlerstring, SloganResult(this.HandleInitialTaskAsync) // 处理初始任务 .AddHandlerFeedbackResult, SloganResult(this.HandleFeedbackAsync); // 处理反馈 /// summary /// 处理初始任务首次生成 /// /summary private async ValueTaskSloganResult HandleInitialTaskAsync( string message, IWorkflowContext context, CancellationToken cancellationToken default) { Console.WriteLine($✍️ [文案生成] 接收到任务: {message}); // 调用 Agent 生成标语 var result await this._agent.RunAsync(message, this._thread, cancellationToken: cancellationToken); // 反序列化结构化输出 var sloganResult JsonSerializer.DeserializeSloganResult(result.Text) ?? throw new InvalidOperationException(❌ 反序列化标语结果失败); Console.WriteLine($ [文案生成] 生成标语: {sloganResult.Slogan}); // 发布自定义事件将在后续定义 await context.AddEventAsync(new SloganGeneratedEvent(sloganResult), cancellationToken); return sloganResult; } /// summary /// 处理审核反馈改进优化 /// /summary private async ValueTaskSloganResult HandleFeedbackAsync( FeedbackResult feedback, IWorkflowContext context, CancellationToken cancellationToken default) { // 构造反馈消息 var feedbackMessage $ 以下是对你之前标语的审核反馈 评论: {feedback.Comments} 评分: {feedback.Rating} / 10 改进建议: {feedback.Actions} 请根据反馈改进你的标语使其更加精准有力。 ; Console.WriteLine($ [文案生成] 接收到反馈评分: {feedback.Rating}/10); // 调用 Agent 改进标语保持对话上下文 var result await this._agent.RunAsync(feedbackMessage, this._thread, cancellationToken: cancellationToken); var sloganResult JsonSerializer.DeserializeSloganResult(result.Text) ?? throw new InvalidOperationException(❌ 反序列化标语结果失败); Console.WriteLine($ [文案生成] 改进后标语: {sloganResult.Slogan}); // 发布事件 await context.AddEventAsync(new SloganGeneratedEvent(sloganResult), cancellationToken); return sloganResult; } }在这个Executor中需要注意以下几点1在实例化Agent时配置结构化输出严格输出强类型的JSON格式数据2需要重写ConfigureRoutes方法配置消息路由即从处理初始任务开始并设置处理反馈闭环其工作机制如下当接收到string消息即用户的任务信息时调用 HandleInitialTaskAsync 方法进行首次生成当接收到FeedbackResult类型消息即质量审核反馈的消息时调用 HandleFeedbackAsync 方法进行改进生成。3在调用完Agent获取响应之后需要将其进行强类型的反序列化输出4最后通过发布自定义事件进行工作流传递这里是 SloganGeneratedEvent5通过AgentThread实现对话历史保持而不是每次从头开始。开发质量审核Executor在质量审核中假设我们有如下的审核逻辑评分8代表通过审核可以发布评分8则发送反馈继续循环如果编辑次数3次则需要终止循环输出当前版本文案。​​​​​​​/// summary /// 审核反馈 Executor - 评估标语质量并提供反馈 /// /summary public sealed class FeedbackExecutor : ExecutorSloganResult { private readonly AIAgent _agent; private readonly AgentThread _thread; private int _attempts 0; /// summary /// 最低评分要求1-10分 /// /summary public int MinimumRating { get; init; } 8; /// summary /// 最大尝试次数 /// /summary public int MaxAttempts { get; init; } 3; /// summary /// 初始化审核反馈 Executor /// /summary /// param nameidExecutor 唯一标识/param /// param namechatClientAI 聊天客户端/param public FeedbackExecutor(string id, IChatClient chatClient) : base(id) { // 配置 Agent 选项 ChatClientAgentOptions agentOptions new( instructions: 你是一名专业的文案审核专家。你将评估标语的质量并提供改进建议。 ) { ChatOptions new() { // 配置结构化输出要求返回 FeedbackResult JSON 格式 ResponseFormat ChatResponseFormat.ForJsonSchemaFeedbackResult() } }; this._agent new ChatClientAgent(chatClient, agentOptions); this._thread this._agent.GetNewThread(); } /// summary /// 处理标语审核 /// /summary public override async ValueTask HandleAsync( SloganResult slogan, IWorkflowContext context, CancellationToken cancellationToken default) { // 构造审核消息 var reviewMessage $ 请审核以下标语 任务: {slogan.Task} 标语: {slogan.Slogan} 请提供 1. 详细的评论comments 2. 质量评分rating1-10分 3. 改进建议actions ; Console.WriteLine($ [质量审核] 开始审核标语: {slogan.Slogan}); // 调用 Agent 进行审核 var response await this._agent.RunAsync(reviewMessage, this._thread, cancellationToken: cancellationToken); // 反序列化反馈结果 var feedback JsonSerializer.DeserializeFeedbackResult(response.Text) ?? throw new InvalidOperationException(❌ 反序列化反馈结果失败); Console.WriteLine($ [质量审核] 评分: {feedback.Rating}/10); // 发布自定义事件将在后续定义 await context.AddEventAsync(new FeedbackFinishedEvent(feedback), cancellationToken); // 业务逻辑判断是否通过审核 if (feedback.Rating this.MinimumRating) { // ✅ 通过审核 await context.YieldOutputAsync( $ ✅ 标语已通过审核 任务: {slogan.Task} 标语: {slogan.Slogan} 评分: {feedback.Rating}/10 评论: {feedback.Comments} , cancellationToken ); Console.WriteLine($✅ [质量审核] 标语通过审核); return; } // ❌ 未通过审核检查尝试次数 if (this._attempts this.MaxAttempts) { // 达到最大尝试次数输出最终版本 await context.YieldOutputAsync( $ ⚠️ 标语在 {this.MaxAttempts} 次尝试后未达到最低评分要求。 最终标语: {slogan.Slogan} 最终评分: {feedback.Rating}/10 评论: {feedback.Comments} , cancellationToken ); Console.WriteLine($⚠️ [质量审核] 达到最大尝试次数终止流程); return; } // 继续循环发送反馈消息回到 SloganWriterExecutor await context.SendMessageAsync(feedback, cancellationToken: cancellationToken); this._attempts; Console.WriteLine($ [质量审核] 发送反馈第 {this._attempts} 次尝试); } }在这个Executor中除了之前提到的几点之外我们还需要注意在反序列化反馈结果及发布自定义事件之后需要嵌入评分逻辑即判断是否通过审核如果通过审核就及时结束循环如果不通过则发送反馈消息继续循环记录审核次数如果达到设定的最大值也及时结束循环不恋战通过 IWorkflowContext 的 SendMessageAsync 方法将反馈消息传递给其他参与者这里是 SloganWriterExecutor。构建工作流现在万事俱备只欠一个Workflow现在Lets do it!Step1: 获取ChatClient​​​​​​​var chatClient new OpenAIClient( new ApiKeyCredential(openAIProvider.ApiKey), new OpenAIClientOptions { Endpoint new Uri(openAIProvider.Endpoint) }) .GetChatClient(openAIProvider.ModelId) .AsIChatClient();Step2: 实例化自定义Executors​​​​​​​var solganWriter new SloganWriterExecutor(id: SloganWriter, chatClient); var feebackHandler new FeedbackExecutor(id: FeedbackHandler, chatClient); Console.WriteLine(✅ Executor 实例创建完成);Step3: 创建工作流​​​​​​​var workflow new WorkflowBuilder(solganWriter) .AddEdge(source: solganWriter, target: feebackHandler) // 生成 → 审核 .AddEdge(source: feebackHandler, target: solganWriter) // 审核不通过 → 重新生成 .WithOutputFrom(feebackHandler) // 指定输出来源 .Build(); Console.WriteLine(✅ 工作流构建完成);Step4: 测试工作流​​​​​​​// 定义产品任务 var productTask 请为马自达一款经济实惠且驾驶乐趣十足的电动SUV创作标语要求结合马自达电车的特性来创作; Console.WriteLine($ 产品需求: {productTask}\n); Console.WriteLine($ 审核标准: 评分 8分); Console.WriteLine($ 最大尝试: 3次\n); Console.WriteLine(━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━); Console.WriteLine(⏱️ 开始执行工作流...); Console.WriteLine(━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n); // 执行工作流 await using (var run await InProcessExecution.StreamAsync(workflow, input: productTask)) { // 监听工作流事件 await foreach (WorkflowEvent evt in run.WatchStreamAsync()) { // 使用模式匹配识别不同类型的事件 switch (evt) { case SloganGeneratedEvent sloganEvent: // 处理标语生成事件 Console.WriteLine($✨ {sloganEvent}); Console.WriteLine(); break; case FeedbackFinishedEvent feedbackEvent: // 处理审核反馈事件 Console.WriteLine(${feedbackEvent}); Console.WriteLine(); break; case WorkflowOutputEvent outputEvent: // 处理最终输出事件 Console.WriteLine(━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━); Console.WriteLine( 工作流执行完成); Console.WriteLine(━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n); Console.WriteLine(${outputEvent.Data}); break; } } Console.WriteLine(\n✅ 所有流程已完成); }测试结果如下图所示首先获得了首次的文案撰写内容然后第一轮审核评分6分并给出反馈然后文案撰写开始修改形成第二版文案然后再次评分为6分又给出反馈然后终于获得审核通过本次评分9分8分可以发布至此工作流已经结束可以看见第三次生成的文案内容比前两次要好一些。小结本文介绍了Executor的基本概念 以及 如何开发自定义Executor然后给出了一个营销文案生成审核的工作流案例详细介绍了自定义Executor的应用。下一篇​​​​​​​我们将继续学习MAF中如何进行混合编排 Agent 和 Executor覆盖实际场景中 确定性的业务逻辑 和 AI智能决策 的结合应用。示例源码Github: https://github.com/EdisonTalk/MAFD参考资料Microsoft Learn《Agent Framework Tutorials》https://learn.microsoft.com/en-us/agent-framework/overview/agent-framework-overview?wt.mc_idMVP_397012引入地址