自定义身份验证提供程序
本页内容
概述
您可以通过实现com.mongodb.kafka.connect.util.custom.credentials.CustomCredentialProvider
接口来添加自定义身份验证提供程序。您必须将自定义类 JAR 文件放在 Kafka Connect 部署中的 lib
文件夹中。
设置以下身份验证属性以配置身份验证提供程序
mongo.custom.auth.mechanism.enable
:设置为true
mongo.custom.auth.mechanism.providerClass
:设置为实现类的完全限定名称(可选)
mongodbaws.auth.mechanism.roleArn
:设置为 Amazon 资源名称 (ARN)
AWS IAM 身份验证示例
此示例提供了一个支持 AWS IAM 的自定义身份验证提供程序。以下代码显示了自定义身份验证提供程序 JAR 文件
package com.mongodb; import java.util.Map; import java.util.function.Supplier; import com.mongodb.kafka.connect.util.custom.credentials.CustomCredentialProvider; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; import com.amazonaws.services.securitytoken.AWSSecurityTokenService; import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceAsyncClientBuilder; import com.amazonaws.services.securitytoken.model.AssumeRoleRequest; import com.amazonaws.services.securitytoken.model.AssumeRoleResult; import com.amazonaws.services.securitytoken.model.Credentials; import com.amazonaws.util.StringUtils; public class SampleAssumeRoleCredential implements CustomCredentialProvider { public SampleAssumeRoleCredential() {} public MongoCredential getCustomCredential(Map<?, ?> map) { AWSCredentialsProvider provider = new DefaultAWSCredentialsProviderChain(); Supplier<AwsCredential> awsFreshCredentialSupplier = () -> { AWSSecurityTokenService stsClient = AWSSecurityTokenServiceAsyncClientBuilder.standard() .withCredentials(provider) .withRegion("us-east-1") .build(); AssumeRoleRequest assumeRoleRequest = new AssumeRoleRequest().withDurationSeconds(3600) .withRoleArn((String)map.get("mongodbaws.auth.mechanism.roleArn")) .withRoleSessionName("Test_Session"); AssumeRoleResult assumeRoleResult = stsClient.assumeRole(assumeRoleRequest); Credentials creds = assumeRoleResult.getCredentials(); // Add your code to fetch new credentials return new AwsCredential(creds.getAccessKeyId(), creds.getSecretAccessKey(), creds.getSessionToken()); }; return MongoCredential.createAwsCredential(null, null) .withMechanismProperty(MongoCredential.AWS_CREDENTIAL_PROVIDER_KEY, awsFreshCredentialSupplier); } // Validates presence of an ARN public void validate(Map<?, ?> map) { String roleArn = (String) map.get("mongodbaws.auth.mechanism.roleArn"); if (StringUtils.isNullOrEmpty(roleArn)) { throw new RuntimeException("Invalid value set for customProperty"); } } // Initializes the custom provider public void init(Map<?, ?> map) { } }
编译 JAR 文件并将其放置在部署中的 lib
文件夹中。
注意
要查看构建包含实现类的完整 JAR 的 pom.xml
文件示例,请参阅Kafka Connector GitHub 仓库的 README 文件.
接下来,配置您的源或接收连接器以包括自定义身份验证方法。以下配置属性定义了一个接收连接器,它通过 AWS IAM 身份验证连接 Kafka Connector 到 MongoDB Atlas
{ "name": "mongo-tutorial-sink", "config": { "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector", "topics": "<topic>", "connection.uri": "<connection string>?authSource=%24external&authMechanism=MONGODB-AWS&retryWrites=true&w=majority", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enable": false, "database": "<db>", "collection": "<collection>", "mongo.custom.auth.mechanism.enable": "true", "mongo.custom.auth.mechanism.providerClass": "com.mongodb.SampleAssumeRoleCredential", "mongodbaws.auth.mechanism.roleArn": "<AWS IAM ARN>" } }
在此示例中,roleArn
的值是具有访问 MongoDB Atlas 权限的用户组的 IAM 角色。在 AWS IAM 控制台中,运行 Kafka Connect 的 IAM 账户对 Atlas 用户组具有 AssumeRole
权限。