文档菜单
文档首页
/
MongoDB Kafka 连接器
/

自定义身份验证提供程序

本页内容

  • 概述
  • AWS IAM 身份验证示例

您可以通过实现com.mongodb.kafka.connect.util.custom.credentials.CustomCredentialProvider 接口来添加自定义身份验证提供程序。您必须将自定义类 JAR 文件放在 Kafka Connect 部署中的 lib 文件夹中。

设置以下身份验证属性以配置身份验证提供程序

  • mongo.custom.auth.mechanism.enable:设置为 true

  • mongo.custom.auth.mechanism.providerClass:设置为实现类的完全限定名称

  • (可选) mongodbaws.auth.mechanism.roleArn:设置为 Amazon 资源名称 (ARN)

此示例提供了一个支持 AWS IAM 的自定义身份验证提供程序。以下代码显示了自定义身份验证提供程序 JAR 文件

package com.mongodb;
import java.util.Map;
import java.util.function.Supplier;
import com.mongodb.kafka.connect.util.custom.credentials.CustomCredentialProvider;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.securitytoken.AWSSecurityTokenService;
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceAsyncClientBuilder;
import com.amazonaws.services.securitytoken.model.AssumeRoleRequest;
import com.amazonaws.services.securitytoken.model.AssumeRoleResult;
import com.amazonaws.services.securitytoken.model.Credentials;
import com.amazonaws.util.StringUtils;
public class SampleAssumeRoleCredential implements CustomCredentialProvider {
public SampleAssumeRoleCredential() {}
@Override
public MongoCredential getCustomCredential(Map<?, ?> map) {
AWSCredentialsProvider provider = new DefaultAWSCredentialsProviderChain();
Supplier<AwsCredential> awsFreshCredentialSupplier = () -> {
AWSSecurityTokenService stsClient = AWSSecurityTokenServiceAsyncClientBuilder.standard()
.withCredentials(provider)
.withRegion("us-east-1")
.build();
AssumeRoleRequest assumeRoleRequest = new AssumeRoleRequest().withDurationSeconds(3600)
.withRoleArn((String)map.get("mongodbaws.auth.mechanism.roleArn"))
.withRoleSessionName("Test_Session");
AssumeRoleResult assumeRoleResult = stsClient.assumeRole(assumeRoleRequest);
Credentials creds = assumeRoleResult.getCredentials();
// Add your code to fetch new credentials
return new AwsCredential(creds.getAccessKeyId(), creds.getSecretAccessKey(), creds.getSessionToken());
};
return MongoCredential.createAwsCredential(null, null)
.withMechanismProperty(MongoCredential.AWS_CREDENTIAL_PROVIDER_KEY, awsFreshCredentialSupplier);
}
// Validates presence of an ARN
@Override
public void validate(Map<?, ?> map) {
String roleArn = (String) map.get("mongodbaws.auth.mechanism.roleArn");
if (StringUtils.isNullOrEmpty(roleArn)) {
throw new RuntimeException("Invalid value set for customProperty");
}
}
// Initializes the custom provider
@Override
public void init(Map<?, ?> map) {
}
}

编译 JAR 文件并将其放置在部署中的 lib 文件夹中。

注意

要查看构建包含实现类的完整 JAR 的 pom.xml 文件示例,请参阅Kafka Connector GitHub 仓库的 README 文件.

接下来,配置您的源或接收连接器以包括自定义身份验证方法。以下配置属性定义了一个接收连接器,它通过 AWS IAM 身份验证连接 Kafka Connector 到 MongoDB Atlas

{
"name": "mongo-tutorial-sink",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
"topics": "<topic>",
"connection.uri": "<connection string>?authSource=%24external&authMechanism=MONGODB-AWS&retryWrites=true&w=majority",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": false,
"database": "<db>",
"collection": "<collection>",
"mongo.custom.auth.mechanism.enable": "true",
"mongo.custom.auth.mechanism.providerClass": "com.mongodb.SampleAssumeRoleCredential",
"mongodbaws.auth.mechanism.roleArn": "<AWS IAM ARN>"
}
}

在此示例中,roleArn 的值是具有访问 MongoDB Atlas 权限的用户组的 IAM 角色。在 AWS IAM 控制台中,运行 Kafka Connect 的 IAM 账户对 Atlas 用户组具有 AssumeRole 权限。

返回

MongoDB 基于 AWS 的身份验证