我查看canal的connector.kafka源码发现没有开放对kafka sasl的支持,所以我自己为生产者和消费者添加了sasl功能。 生产者添加编译完是可以同步sasl认证正常连接到kafka的消费者添加时,我遇到了问题,错误信息如下
`2020-09-27 16:49:04.464 [Thread-4] ERROR c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - process error! org.apache.kafka.common.KafkaException: Failed to construct kafka consumer at org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:793) ~[na:na] at org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:644) ~[na:na] at org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:624) ~[na:na] at com.alibaba.otter.canal.connector.kafka.consumer.CanalKafkaConsumer.connect(CanalKafkaConsumer.java:77) ~[na:na] at com.alibaba.otter.canal.adapter.launcher.loader.AdapterProcessor.process(AdapterProcessor.java:184) ~[client-adapter.launcher-1.1.5-SNAPSHOT.jar:na] at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_141]
Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: unable to find LoginModule class: org.apache.kafka.common.security.plain.PlainLoginModule at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:125) ~[na:na] at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:140) ~[na:na] at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:65) ~[na:na] at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:88) ~[na:na] at org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:710) ~[na:na] ... 5 common frames omitted
Caused by: javax.security.auth.login.LoginException: unable to find LoginModule class: org.apache.kafka.common.security.plain.PlainLoginModule at javax.security.auth.login.LoginContext.invoke(LoginContext.java:794) ~[na:1.8.0_141] at javax.security.auth.login.LoginContext.access$000(LoginContext.java:195) ~[na:1.8.0_141] at javax.security.auth.login.LoginContext$4.run(LoginContext.java:682) ~[na:1.8.0_141] at javax.security.auth.login.LoginContext$4.run(LoginContext.java:680) ~[na:1.8.0_141] at java.security.AccessController.doPrivileged(Native Method) ~[na:1.8.0_141] at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:680) ~[na:1.8.0_141] at javax.security.auth.login.LoginContext.login(LoginContext.java:587) ~[na:1.8.0_141] at org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:52) ~[na:na] at org.apache.kafka.common.security.authenticator.LoginManager.(LoginManager.java:53) ~[na:na] at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:89) ~[na:na] at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:114) ~[na:na] ... 9 common frames omitted`
而我的sasl.jaas.config等参数配置已经在生产者中验证成功了,配置代码如下,添加于CanalKafkaConsumer的init方法尾部
if (Boolean.parseBoolean(properties.getProperty(KafkaConstants.CANAL_MQ_KAFKA_SASL_ENABLE))){ kafkaProperties.put("security.protocol", properties.getProperty(KafkaConstants.CANAL_MQ_KAFKA_SECURITY_PROTOCOL)); kafkaProperties.put("sasl.mechanism", properties.getProperty(KafkaConstants.CANAL_MQ_KAFKA_SASL_MECHANISM)); kafkaProperties.put("sasl.jaas.config", properties.getProperty(KafkaConstants.CANAL_MQ_KAFKA_SASL_JAAS_CONFIG)); }
我尝试了三种sasl配置添加方式:
1、sasl.jaas.config
2、应用启动脚本中-D追加系统参数
3、System.setProperty
我对java的JAAS身份验证原理了解还很浅显,观察canal的client-adapter的启动代码发现kafka consumer是线程启动的,是否可能于此有关。 恳请大佬解惑,感谢。
原提问者GitHub用户q2515045
我找到问题了 Adapter中实例化consumer使用的静态实例化,这个和生产端代码一样,。 但是,这个实例化调取的init没有new consumer对象,new操作在下面声明的子线程中做的,我修改connector代码将new操作移至init方法内,运行通过。 再跟进验证是否存在其它问题。。
原回答者GitHub用户q2515045
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。