当前位置: 首页> 健康> 美食 > 蜜糖直播_厦门网站快速排名优化_长沙网络推广外包费用_成人短期就业培训班

蜜糖直播_厦门网站快速排名优化_长沙网络推广外包费用_成人短期就业培训班

时间:2025/8/27 6:00:34来源:https://blog.csdn.net/ShrCheng/article/details/142744960 浏览次数:0次
蜜糖直播_厦门网站快速排名优化_长沙网络推广外包费用_成人短期就业培训班

先决条件


在我们深入为 Pulsar 创建自定义身份验证机制之前,请确保您具有以下设置:

  • Java 17: 确保您的环境中已安装并设置 Java 17。
  • Spring Boot Version 3.3.2: 我们将使用 Spring Boot 创建自定义 Pulsar 客户端。
  • Docker & Docker Compose: 在容器化环境中运行 Pulsar 代理所必需的。
  • Maven: 用于构建和管理 Java 项目中的依赖关系。

Overview


在本指南中,我们将为 Apache Pulsar 创建一个自定义身份验证提供程序。我们的自定义提供程序将通过验证请求标头中发送到 REST API 的用户名和密码组合来处理身份验证。如果身份验证成功,将被授予访问 Pulsar 的权限;否则将被拒绝。

我们将按照官方 Pulsar 文档来扩展 Pulsar 的安全部分。

github repo: https://github.com/urdogan0000/PulsarCustomAuthSpringboot

Linked-in: https://www.linkedin.com/in/haydarurdogan/

第 1 步:设置环境

首先,让我们为自定义 Pulsar 代理身份验证设置一个新的 Maven 项目:

项目结构:

pulsar-custom-auth
│
├── src
│   ├── main
│   │   ├── java
│   │   │   └── com
│   │   │       └── liderahenkpulsar
│   │   │           └── auth
│   │   │               └── BasicAuthProvider.java
|   |   |               |_  AuthenticationBasicAuth
|   |   |               |__ CustomDataBasic
│   │   └── resources
│   │       └── application.properties
│   └── test
└── pom.xml

pom.xml:添加以下依赖项:

<dependencies><dependency><groupId>org.apache.pulsar</groupId><artifactId>pulsar-broker-common</artifactId><version>3.2.3</version></dependency><dependency><groupId>com.squareup.okhttp3</groupId><artifactId>okhttp</artifactId><version>4.12.0</version></dependency>
</dependencies>

第 2 步:创建自定义代理身份验证

现在,让我们通过实现 Pulsar 的 AuthenticationProvider 接口来创建 BasicAuthProvider 类:

BasicAuthProvider.java:

package com.liderahenkpulsar.auth;import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import javax.naming.AuthenticationException;
import java.io.IOException;public class BasicAuthProvider implements AuthenticationProvider {static final String HTTP_HEADER_NAME = "Authorization";static final String AUTH_METHOD_NAME = "customAuth";static final String HTTP_HEADER_VALUE_PREFIX = "Basic";static final String REST_API_URL_NAME = "authRestApiEndpoint";private static final Logger log = LoggerFactory.getLogger(BasicAuthProvider.class);private String apiEndpoint;private OkHttpClient httpClient;@Overridepublic void initialize(ServiceConfiguration config) throws PulsarServerException {httpClient = new OkHttpClient();this.apiEndpoint = (String) config.getProperties().getOrDefault(REST_API_URL_NAME, "http://localhost:8081/pulsar/send");log.info("BasicAuthProvider initialized with endpoint: {}", apiEndpoint);}@Overridepublic String getAuthMethodName() {return AUTH_METHOD_NAME;}@Overridepublic String authenticate(AuthenticationDataSource authData) throws AuthenticationException {String credentials = getUserCredentials(authData);log.info("Authentication request to endpoint: {}", apiEndpoint);log.info("Authorization header: {}", credentials);Request request = new Request.Builder().url(apiEndpoint).addHeader(HTTP_HEADER_NAME, credentials).build();try (Response response = httpClient.newCall(request).execute()) {if (response.isSuccessful()) {assert response.body() != null;String responseBody = response.body().string();log.info("Authentication successful: {}", responseBody);return responseBody;} else {log.warn("Authentication failed. HTTP status code: {}, Response: {}",response.code(),response.body().string());throw new AuthenticationException("Authentication failed. Invalid username or password.");}} catch (IOException e) {log.error("Error during authentication: ", e);throw new AuthenticationException("Authentication process encountered an error.");}}private static String getUserCredentials(AuthenticationDataSource authData) throws AuthenticationException {if (authData.hasDataFromCommand()) {String commandData = authData.getCommandData();log.info("Extracted command data: {}", commandData);return validateUserCredentials(commandData);} else if (authData.hasDataFromHttp()) {String httpHeaderValue = authData.getHttpHeader(HTTP_HEADER_NAME);if (httpHeaderValue == null) {throw new AuthenticationException("Invalid HTTP Authorization header");}return validateUserCredentials(httpHeaderValue);} else {throw new AuthenticationException("No user credentials passed");}}private static String validateUserCredentials(final String userCredentials) throws AuthenticationException {if (StringUtils.isNotBlank(userCredentials)) {return userCredentials;} else {throw new AuthenticationException("Invalid or blank user credentials found");}}@Overridepublic void close() {if (httpClient != null) {httpClient.connectionPool().evictAll();}}
}

第 3 步:实施自定义客户端身份验证

为了实现自定义客户端,我们将创建两个类:

  1. AuthenticationBasicAuth:此类实现主要身份验证逻辑并向 Pulsar 代理提供凭证。

  2. CustomDataBasic:此类提供向 Pulsar 发出请求时进行身份验证所需的数据(标头)。

1. AuthenticationBasicAuth Class

此类负责定义身份验证方法并管理用户凭据。它实现了 AuthenticationEncodedAuthenticationParameterSupport 接口。

AuthenticationBasicAuth.java:

package com.liderahenkpulsar.auth;import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.api.EncodedAuthenticationParameterSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.IOException;
import java.util.Map;public class AuthenticationBasicAuth implements Authentication, EncodedAuthenticationParameterSupport {private static final Logger log = LoggerFactory.getLogger(AuthenticationBasicAuth.class);private static final String AUTH_NAME = "customAuth";  // Ensure this matches your Pulsar broker's expectationprivate String userId;private String password;// Default constructor for reflection or configuration usagepublic AuthenticationBasicAuth() {log.info("AuthenticationBasicAuth instantiated without parameters. Awaiting configuration.");}// Constructor to directly accept userId and passwordpublic AuthenticationBasicAuth(String userId, String password) {if (userId == null || userId.isEmpty() || password == null || password.isEmpty()) {throw new IllegalArgumentException("User ID and password must not be null or empty");}this.userId = userId;this.password = password;log.info("AuthenticationBasicAuth instantiated with userId: {} and password: [PROTECTED]", userId);}@Overridepublic void close() throws IOException {// No operation needed on close}@Overridepublic String getAuthMethodName() {return AUTH_NAME;}@Overridepublic AuthenticationDataProvider getAuthData() {return new CustomDataBasic(userId, password);}@Overridepublic void configure(Map<String, String> authParams) {// No-op for map configurationlog.info("Configured with authParams: {}", authParams);}@Overridepublic void configure(String encodedAuthParamString) {// No-op for encoded string configurationlog.info("Configured with encodedAuthParamString: {}", encodedAuthParamString);}@Overridepublic void start() {log.info("Starting AuthenticationBasicAuth for userId: {}", userId);}
}

方法说明:

  • 构造函数:有两个构造函数 - 一个默认(无参数),当没有直接传递配置时,另一个接受“userId”和“password”。后者确保在继续之前正确设置这些参数。
  • **getAuthMethodName**:返回身份验证方法名称,该名称应与自定义代理身份验证 (customAuth) 中定义的方法名称匹配。
  • **getAuthData**:提供 CustomDataBasic 的实例,其中包含身份验证标头。
  • **configure(Map<String, String> authParams)** **configure(StringencodedAuthParamString)**:这些方法允许您使用参数配置身份验证实例。它们在这里是无操作的,因为我们通过构造函数处理配置,但为了可见性而包含日志记录。
  • **start**:初始化或启动身份验证过程,主要用于日志记录目的。

2. CustomDataBasic Class

此类提供身份验证所需的实际数据,特别是处理 HTTP 请求的标头和命令数据。

CustomDataBasic.java:

package com.liderahenkpulsar.auth;import org.apache.pulsar.client.api.AuthenticationDataProvider;import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;public class CustomDataBasic implements AuthenticationDataProvider {private static final String HTTP_HEADER_NAME = "Authorization";private final String commandAuthToken;private Map<String, String> headers = new HashMap<>();public CustomDataBasic(String userId, String password) {// Create the basic auth tokenthis.commandAuthToken = "Basic " + userId + ":" + password; // Ideally, base64 encode this string// Initialize headersheaders.put(HTTP_HEADER_NAME, this.commandAuthToken);this.headers = Collections.unmodifiableMap(this.headers);}@Overridepublic boolean hasDataForHttp() {return true; // Indicate that HTTP headers are available}@Overridepublic Set<Map.Entry<String, String>> getHttpHeaders() {return this.headers.entrySet(); // Return the HTTP headers for authentication}@Overridepublic boolean hasDataFromCommand() {return true; // Indicate that command data is available}@Overridepublic String getCommandData() {return this.commandAuthToken; // Return the command data for authentication}
}

方法说明:

  • 构造函数:通过连接前缀为 BasicuserIdpassword 来初始化 commandAuthToken。在现实场景中,应该使用 Base64 进行编码。
  • **hasDataForHttp**: 返回 true,表示有数据(标头)可用于 HTTP 身份验证。
  • **getHttpHeaders**:提供 Pulsar 身份验证所需的 HTTP 标头,包括 Authorization 标头。
  • **hasDataFromCommand**:返回true,表示命令数据(凭证)可用。
  • **getCommandData**:返回身份验证令牌 (commandAuthToken) 作为命令数据。

第 1 步:构建自定义身份验证 JAR 文件

  1. 创建 JAR 文件:确保您的 Java 项目已正确设置所有依赖项。使用 Maven 或 Gradle 编译项目并将其打包成 JAR 文件。

  2. 对于Maven:

    mvn clean package
    
  • 构建后,您应该有一个 JAR 文件,通常位于 target 目录中,例如 target/liderahenkpulsar.auth-1.0.jar

修改代理配置文件

下载原始 Broker 配置:从 Pulsar GitHub 存储库下载原始 broker.conf

wget https://raw.githubusercontent.com/apache/pulsar/branch-3.3/conf/broker.conf -O broker.conf

更新代理配置:编辑 broker.conf 以包含您的自定义身份验证设置。以下是需要修改的关键行:

### --- Authentication --- ###
authenticationEnabled=true# Specify the authentication providers to use
authenticationProviders=com.liderahenkpulsar.auth.BasicAuthProvider# Define the authentication method name that matches your AuthenticationBasicAuth class
authenticationMethods=customAuth# (Optional) Specify the endpoint for your authentication service
authRestApiEndpoint=http://localhost:8083/pulsar/send # Authentication settings of the broker itself. Used when the broker connects to other brokers,
# either in the same or other clusters
brokerClientAuthenticationPlugin=com.liderahenkpulsar.auth.AuthenticationBasicAuth
brokerClientAuthenticationParameters=
# (Optional) Specify the endpoint for your authentication service
authRestApiEndpoint=http://localhost:8083/pulsar/send //this part is for your login rest api servic

将更新的配置文件保存custom-auth-broker.conf

为 Pulsar 设置 Docker Compose

  1. 创建 Docker Compose 文件
  2. 下面是完整的 Docker Compose 文件。将其保存为 docker-compose.yml,位于 JAR 文件和更新的代理配置所在的同一目录中。
version: '3.8'
networks:pulsar:driver: bridgeservices:# Start Zookeeperzookeeper:image: apachepulsar/pulsar:3.3.1container_name: zookeeperrestart: on-failurenetworks:- pulsarvolumes:- ./data/zookeeper:/pulsar/data/zookeeperenvironment:- PULSAR_MEM=-Xms512m -Xmx512m -XX:MaxDirectMemorySize=256mcommand: >bash -c "bin/apply-config-from-env.py conf/zookeeper.conf && \bin/generate-zookeeper-config.sh conf/zookeeper.conf && \exec bin/pulsar zookeeper"healthcheck:test: ["CMD", "bin/pulsar-zookeeper-ruok.sh"]interval: 10stimeout: 5sretries: 30# Init cluster metadatapulsar-init:image: apachepulsar/pulsar:3.3.1container_name: pulsar-inithostname: pulsar-initnetworks:- pulsarcommand: >bin/pulsar initialize-cluster-metadata \--cluster cluster-a \--zookeeper zookeeper:2181 \--configuration-store zookeeper:2181 \--web-service-url http://broker:8080 \--broker-service-url pulsar://broker:6650depends_on:- zookeeper# Start Bookiebookie:image: apachepulsar/pulsar:3.3.1container_name: bookierestart: on-failurenetworks:- pulsarenvironment:- clusterName=cluster-a- zkServers=zookeeper:2181- metadataServiceUri=metadata-store:zk:zookeeper:2181- advertisedAddress=bookie- BOOKIE_MEM=-Xms512m -Xmx512m -XX:MaxDirectMemorySize=256mdepends_on:- zookeeper- pulsar-initvolumes:- ./data/bookkeeper:/pulsar/data/bookkeepercommand: bash -c "bin/apply-config-from-env.py conf/bookkeeper.conf && exec bin/pulsar bookie"# Start Brokerbroker:image: apachepulsar/pulsar:3.3.1container_name: brokerhostname: brokerrestart: on-failurenetworks:- pulsarenvironment:- metadataStoreUrl=zk:zookeeper:2181- zookeeperServers=zookeeper:2181- clusterName=cluster-a- managedLedgerDefaultEnsembleSize=1- managedLedgerDefaultWriteQuorum=1- managedLedgerDefaultAckQuorum=1- advertisedAddress=broker- advertisedListeners=external:pulsar://172.16.102.12:6650- PULSAR_MEM=-Xms1g -Xmx1g -XX:MaxDirectMemorySize=512mdepends_on:- zookeeper- bookieports:- "6650:6650"- "8080:8080"volumes:- ./custom-auth-broker.conf:/pulsar/conf/broker.conf- ./liderahenkpulsar.auth-1.0.jar:/pulsar/lib/liderahenkpulsar.auth-1.0.jarcommand: bash -c "bin/pulsar broker"

运行 Docker Compose 环境

运行 Docker Compose:使用 Docker Compose 启动具有自定义身份验证设置的 Pulsar 集群。

docker-compose up -d

验证设置

  • 检查日志以确保所有服务(Zookeeper、Bookie、Broker)正确启动。
  • 确保代理加载自定义身份验证提供程序时不会出现错误。

设置 Spring Boot 项目

您可以使用 Spring Initializr 或具有以下依赖项的首选 IDE 创建新的 Spring Boot 项目:

  • Spring Web:适用于 RESTful API(可选,根据您的用例)
  • Spring Boot Starter:核心 Spring Boot 依赖项
  • Pulsar 客户端:Java 版 Pulsar 客户端

添加依赖项

将 Pulsar 客户端和您的自定义身份验证 JAR 添加到 pom.xml

<dependencies><!-- Spring Boot Starter Dependencies --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><!-- Pulsar Client Dependency --><dependency><groupId>org.apache.pulsar</groupId><artifactId>pulsar-client</artifactId><version>3.3.1</version></dependency><!-- Custom Authentication Dependency --><dependency><groupId>com.liderahenkpulsar</groupId><artifactId>liderahenkpulsar.auth</artifactId><version>1.0</version><scope>system</scope><systemPath>${project.basedir}/lib/liderahenkpulsar.auth-1.0.jar</systemPath></dependency>
</dependencies>

确保自定义 JAR 位于项目目录中的 lib 文件夹中。

使用自定义身份验证配置 Pulsar 客户端

为 Pulsar 客户端设置创建一个配置类:

package com.example.pulsarclient.config;import com.liderahenkpulsar.auth.AuthenticationBasicAuth;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class PulsarConfig {@Beanpublic PulsarClient pulsarClient() throws Exception {// Replace with the correct broker service URLString serviceUrl = "pulsar://localhost:6650";String userId = "your-username"; // Replace with your actual usernameString password = "your-password"; // Replace with your actual password// Create a Pulsar client with custom authenticationreturn PulsarClient.builder().serviceUrl(serviceUrl).authentication(new AuthenticationBasicAuth(userId, password)).build();}
}

当您运行应用程序时,您会看到 broker 的日志

20240911T06:38:20,515+0000 [pulsar-io-37] INFO com.liderahenkpulsar.auth.BasicAuthProviderAuthentication request to endpoint: [http://172.16.102.215:8083/pulsar/send](http://172.16.102.215:8083/pulsar/send)  
20240911T06:38:20,515+0000 [pulsar-io-37] INFO com.liderahenkpulsar.auth.BasicAuthProviderAuthorization header: Basic testUser:testPass  
20240911T06:38:20,519+0000 [pulsar-io-37] INFO com.liderahenkpulsar.auth.BasicAuthProviderAuthentication successful: test

恭喜您可以通过自定义身份验证访问 pulsar。

原文地址

关键字:蜜糖直播_厦门网站快速排名优化_长沙网络推广外包费用_成人短期就业培训班

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

责任编辑: