Hi everyone! How do I configure a Flume (1.9) cluster to write MySQL data into a Kafka (2.12-2.5) cluster (MySQL --> Flume cluster (not yet configured) --> Kafka cluster (already configured))? Below is the configuration material I have found so far; it is incomplete. Please help me finish it.
Flume cluster layout: agent1/agent2/agent3 run on HOST1/HOST2/HOST3,
and collector1/collector2 run on HOST1/HOST2.
agent1.sources = r1
agent1.channels = c1
agent1.sinks = k1 k2
a1.sources.r1.type = org.keedio.flume.source.SQLSource
a1.sources.r1.hibernate.connection.url = jdbc:mysql://192.168.1.4:3306/index?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC&useUnicode=true&characterEncoding=UTF-8
a1.sources.r1.hibernate.connection.user = root
a1.sources.r1.hibernate.connection.password = 07710714
a1.sources.r1.table = flink_kafka_table
a1.sources.r1.hibernate.connection.autocommit = true
a1.sources.r1.hibernate.connection.driver_class = com.mysql.cj.jdbc.Driver
a1.sources.r1.status.file.path = /home/bigdata/apache-flume-1.9.0-bin/status
a1.sources.r1.status.file.name = sqlSource.status
How should this part be configured???
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin
a1.sinkgroups.g1.processor.selector.maxTimeOut=10000
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 10
a1.sinkgroups.g1.processor.priority.k2 = 1
a1.sinkgroups.g1.processor.maxpenalty = 10000
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
collector1.sources = r1
collector1.sinks = k1
collector1.channels = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = flumekafkacluster
a1.sinks.k1.brokerList = 192.168.15.112:9092,192.168.15.113:9092,192.168.15.114:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Here client.conf goes on the three servers and collector.conf on the two servers.
The examples I found online use an avro source/sink (for Hadoop) between the client.conf sink and the collector.conf source, which does not seem to apply to my scenario. How should this be configured?
Hello! Based on the configuration information you provided, here is a complete example of the Flume cluster configuration for writing MySQL data into Kafka.
First, the client.conf configuration:
# FLUME CLUSTER CONFIG
# Name the components on this agent
# List agent1's components; there are two sinks, going to collector1 and collector2 respectively
agent1.sources = r1
agent1.channels = c1
agent1.sinks = k1 k2
# Describe/configure the source MYSQL
agent1.sources.r1.type = org.keedio.flume.source.SQLSource
agent1.sources.r1.hibernate.connection.url = jdbc:mysql://192.168.1.4:3306/index?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC&useUnicode=true&characterEncoding=UTF-8
agent1.sources.r1.hibernate.connection.user = root
agent1.sources.r1.hibernate.connection.password = 07710714
agent1.sources.r1.table = flink_kafka_table
agent1.sources.r1.hibernate.connection.autocommit = true
agent1.sources.r1.hibernate.connection.driver_class = com.mysql.cj.jdbc.Driver
agent1.sources.r1.status.file.path = /home/bigdata/apache-flume-1.9.0-bin/status
agent1.sources.r1.status.file.name = sqlSource.status
# Describe the sink1
agent1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.k1.topic = flumekafkacluster
agent1.sinks.k1.brokerList = 192.168.15.112:9092,192.168.15.113:9092,192.168.15.114:9092
agent1.sinks.k1.requiredAcks = 1
agent1.sinks.k1.batchSize = 20
# Describe the sink2
agent1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.k2.topic = flumekafkacluster
agent1.sinks.k2.brokerList = 192.168.15.112:9092,192.168.15.113:9092,192.168.15.114:9092
agent1.sinks.k2.requiredAcks = 1
agent1.sinks.k2.batchSize = 20
# Create a sink group that binds multiple sinks together
agent1.sinkgroups = g1
agent1.sinkgroups.g1.sinks = k1 k2
# Load-balancing mode: rotate requests across the sinks (round-robin)
agent1.sinkgroups.g1.processor.type = load_balance
agent1.sinkgroups.g1.processor.backoff = true
agent1.sinkgroups.g1.processor.selector = round_robin
agent1.sinkgroups.g1.processor.selector.maxTimeOut = 10000
# Failover mode: only k1 serves; k2 takes over only after k1 fails.
# Note: a sink group has exactly one processor.type, so keep either the
# load_balance block above or this failover block, not both (if both are
# present, the later setting, failover, wins).
agent1.sinkgroups.g1.processor.type = failover
# The higher the value, the higher the priority; k1 has the highest priority
agent1.sinkgroups.g1.processor.priority.k1 = 10
agent1.sinkgroups.g1.processor.priority.k2 = 1
# Maximum backoff time (ms) for a failed sink before it is retried; 10 seconds here
agent1.sinkgroups.g1.processor.maxpenalty = 10000
# Use a channel which buffers events in memory
agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 1000
agent1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
agent1.sources.r1.channels = c1
agent1.sinks.k1.channel = c1
agent1.sinks.k2.channel = c1
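The difference between the two sink-group processor types configured above can be sketched in a few lines of Python. This is a toy model for illustration only, not Flume's actual implementation; the sink names and priorities mirror the config above:

```python
# Toy model of Flume's two sink-group processors (illustration only).

def failover_pick(sinks_by_priority, healthy):
    """Failover: always pick the highest-priority healthy sink."""
    for name, _prio in sorted(sinks_by_priority.items(),
                              key=lambda kv: -kv[1]):
        if healthy.get(name):
            return name
    return None  # no healthy sink: the transaction would be rolled back

def round_robin_pick(sinks, counter):
    """load_balance + round_robin: rotate through the sinks."""
    return sinks[counter % len(sinks)]

priorities = {"k1": 10, "k2": 1}          # mirrors processor.priority.*
print(failover_pick(priorities, {"k1": True, "k2": True}))   # k1
print(failover_pick(priorities, {"k1": False, "k2": True}))  # k2
print([round_robin_pick(["k1", "k2"], i) for i in range(4)])
```

In failover mode k2 only ever receives events while k1 is down; in load-balance mode both receive events continuously.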
Next, the collector.conf configuration:
# Name the components on this agent; collector1 is the agent name
# An agent can have multiple sources, sinks, and channels
collector1.sources = r1
collector1.sinks = k1
collector1.channels = c1
# Describe/configure the source MYSQL
collector1.sources.r1.type = org.keedio.flume.source.SQLSource
collector1.sources.r1.hibernate.connection.url = jdbc:mysql://192.168.1.4:3306/index?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC&useUnicode=true&characterEncoding=UTF-8
collector1.sources.r1.hibernate.connection.user = root
collector1.sources.r1.hibernate.connection.password = 07710714
collector1.sources.r1.table = flink_kafka_table
collector1.sources.r1.hibernate.connection.autocommit = true
collector1.sources.r1.hibernate.connection.driver_class = com.mysql.cj.jdbc.Driver
collector1.sources.r1.status.file.path = /home/bigdata/apache-flume-1.9.0-bin/status
collector1.sources.r1.status.file.name = sqlSource.status
# Describe the sink
collector1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
collector1.sinks.k1.topic = flumekafkacluster
collector1.sinks.k1.brokerList = 192.168.15.112:9092,192.168.15.113:9092,192.168.15.114:9092
collector1.sinks.k1.requiredAcks = 1
collector1.sinks.k1.batchSize = 20
# Use a channel which buffers events in memory
collector1.channels.c1.type = memory
collector1.channels.c1.capacity = 1000
collector1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
collector1.sources.r1.channels = c1
collector1.sinks.k1.channel = c1
The configuration above writes the MySQL data into Kafka; client.conf is deployed on the three servers and collector.conf on the two servers. One caveat: if all the agents poll the same table with the same query, every row will be delivered once per agent, so either give each agent a distinct query range or run the SQL source on a single agent. Adjust the rest to your actual environment as needed. Hope this helps; feel free to ask if you have further questions.
Create a table in the MySQL database and make sure you have permission to read it.
Install Flume on each host that will run a Flume agent and create a Flume configuration file. The file should contain the following:
Source: a SQL source (for example the keedio SQLSource used above; Flume has no built-in JDBC source), configured with the JDBC connection information, query statement, data types, and field mappings.
Channel: a Memory Channel, configured with its capacity and transaction settings.
Sink: a Kafka Sink, configured with the Kafka connection information, topic, and serialization settings. The sink uses Kafka's Producer API to write the data into the Kafka cluster.
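The source → channel → sink flow described above can be sketched with in-memory stand-ins. This is a simplified illustration only; the table rows are made up, and a real deployment uses the Flume components configured above:

```python
from collections import deque

# Stand-ins: rows "polled" from MySQL, a memory channel, a Kafka "topic".
mysql_rows = [{"id": 1, "v": "a"}, {"id": 2, "v": "b"}, {"id": 3, "v": "c"}]
channel = deque(maxlen=1000)      # like a memory channel with capacity 1000
kafka_topic = []                  # stands in for the Kafka topic

# Source: poll rows and put them on the channel as CSV-style events.
for row in mysql_rows:
    channel.append(f'{row["id"]},{row["v"]}')

# Sink: take events off the channel in batches and "produce" them.
BATCH_SIZE = 20                   # mirrors the sink batchSize above
while channel:
    batch = [channel.popleft() for _ in range(min(BATCH_SIZE, len(channel)))]
    kafka_topic.extend(batch)

print(kafka_topic)                # ['1,a', '2,b', '3,c']
```

The channel decouples the polling rate of the source from the produce rate of the sink, which is exactly why Flume sits between MySQL and Kafka here.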
Start the Flume agent on each host, specifying its configuration file. For example, to start agent1 on HOST1:
$ flume-ng agent -n agent1 -c conf -f /path/to/flume.conf -Dflume.root.logger=INFO,console
Create a topic in the Kafka cluster and make sure you have permission to write to it.
Install Kafka (or at least its client tools) on each collector host and, if needed, create a Kafka configuration file containing:
Producer: a Kafka producer configured with the Kafka connection information and topic.
To verify broker connectivity you can start a console producer on a collector host (the actual writing is done by the Flume Kafka sink). For example, on HOST1:
$ kafka-console-producer.sh --broker-list kafka1:9092,kafka2:9092,kafka3:9092 --topic mytopic
Start the MySQL database and insert some rows into the table.
Check that the topic in the Kafka cluster has received the corresponding data.
In the configuration you provided, the concrete settings for the source and the second sink are missing. Here is a complete example configuration:
# FLUME CLUSTER CONFIG
agent1.sources = r1
agent1.channels = c1
agent1.sinks = k1 k2
# Describe/configure the source MYSQL
agent1.sources.r1.type = org.keedio.flume.source.SQLSource
agent1.sources.r1.hibernate.connection.url = jdbc:mysql://192.168.1.4:3306/index?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC&useUnicode=true&characterEncoding=UTF-8
agent1.sources.r1.hibernate.connection.user = root
agent1.sources.r1.hibernate.connection.password = 07710714
agent1.sources.r1.table = flink_kafka_table
agent1.sources.r1.hibernate.connection.autocommit = true
agent1.sources.r1.hibernate.connection.driver_class = com.mysql.cj.jdbc.Driver
agent1.sources.r1.status.file.path = /home/bigdata/apache-flume-1.9.0-bin/status
agent1.sources.r1.status.file.name = sqlSource.status
# Describe the sink1
agent1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.k1.topic = flumekafkacluster
agent1.sinks.k1.brokerList = 192.168.15.112:9092,192.168.15.113:9092,192.168.15.114:9092
agent1.sinks.k1.requiredAcks = 1
agent1.sinks.k1.batchSize = 20
# Describe the sink2
agent1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.k2.topic = flumekafkacluster
agent1.sinks.k2.brokerList = 192.168.15.112:9092,192.168.15.113:9092,192.168.15.114:9092
agent1.sinks.k2.requiredAcks = 1
agent1.sinks.k2.batchSize = 20
# Create a sink group that binds multiple sinks together
agent1.sinkgroups = g1
agent1.sinkgroups.g1.sinks = k1 k2
# Load-balancing mode: rotate requests across the sinks (round-robin)
agent1.sinkgroups.g1.processor.type = load_balance
agent1.sinkgroups.g1.processor.backoff = true
agent1.sinkgroups.g1.processor.selector = round_robin
agent1.sinkgroups.g1.processor.selector.maxTimeOut = 10000
# Failover mode: only k1 serves; k2 takes over only after k1 fails.
# A sink group has exactly one processor.type, so keep either the
# load_balance block above or this failover block, not both.
agent1.sinkgroups.g1.processor.type = failover
# The higher the value, the higher the priority; k1 has the highest priority
agent1.sinkgroups.g1.processor.priority.k1 = 10
agent1.sinkgroups.g1.processor.priority.k2 = 1
# Maximum backoff time (ms) for a failed sink before it is retried; 10 seconds here
agent1.sinkgroups.g1.processor.maxpenalty = 10000
# Use a channel which buffers events in memory
agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 1000
agent1.channels.c1.transactionCapacity = 100
# The three kinds of components are configured above; now wire them together
# Bind the source and sinks to the channel
agent1.sources.r1.channels = c1
agent1.sinks.k1.channel = c1
agent1.sinks.k2.channel = c1
# Name the components on this agent; collector1 is the agent name
# An agent can have multiple sources, sinks, and channels
collector1.sources = r1
collector1.sinks = k1
collector1.channels = c1
# Describe/configure the source MYSQL
collector1.sources.r1.type = org.keedio.flume.source.SQLSource
collector1.sources.r1.hibernate.connection.url = jdbc:mysql://192.168.1.4:3306/index?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC&useUnicode=true&characterEncoding=UTF-8
collector1.sources.r1.hibernate.connection.user = root
collector1.sources.r1.hibernate.connection.password = 07710714
collector1.sources.r1.table = flink_kafka_table
collector1.sources.r1.hibernate.connection.autocommit = true
collector1.sources.r1.hibernate.connection.driver_class = com.mysql.cj.jdbc.Driver
collector1.sources.r1.status.file.path = /home/bigdata/apache-flume-1.9.0-bin/status
collector1.sources.r1.status.file.name = sqlSource.status
# Describe the sink
collector1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
collector1.sinks.k1.topic = flumekafkacluster
collector1.sinks.k1.brokerList = 192.168.15.112:9092,192.168.15.113:9092,192.168.15.114:9092
collector1.sinks.k1.requiredAcks = 1
collector1.sinks.k1.batchSize = 20
# Use a channel which buffers events in memory
collector1.channels.c1.type = memory
collector1.channels.c1.capacity = 1000
collector1.channels.c1.transactionCapacity = 100
# The components are configured above; now wire them together
# Bind the source and sink to the channel
collector1.sources.r1.channels = c1
collector1.sinks.k1.channel = c1
Use the configuration above to complete your Flume cluster and write the MySQL data into the Kafka cluster. Remember to adapt the configuration on each server to its own host.
To configure a Flume cluster that writes MySQL data into a Kafka cluster, you can follow these steps:
Install and configure the Kafka cluster: first, make sure you have a working Kafka cluster. Refer to Kafka's official documentation or a related tutorial for this step.
Install and configure Flume: install it on every node that will run a Flume agent. You can download Flume from the Apache Flume website and follow the documentation for installation and basic configuration.
Create the Flume configuration file: create a Flume configuration file (for example flume.conf) and define the source, channel, and sink. In this file you specify the MySQL connection information for the source (host, port, database name, etc.) and the Kafka connection information for the sink (bootstrap servers, topic, etc.).
Here is part of an example configuration file:
# source: MySQL (Flume ships no built-in JDBC source; this uses the
# third-party keedio SQLSource, as in the configurations above)
agent.sources = jdbc_source
agent.sources.jdbc_source.type = org.keedio.flume.source.SQLSource
agent.sources.jdbc_source.hibernate.connection.driver_class = com.mysql.cj.jdbc.Driver
agent.sources.jdbc_source.hibernate.connection.url = jdbc:mysql://mysql_host:3306/db_name
agent.sources.jdbc_source.hibernate.connection.user = mysql_user
agent.sources.jdbc_source.hibernate.connection.password = mysql_password
agent.sources.jdbc_source.custom.query = SELECT * FROM table_name
# channel: memory
agent.channels = memory_channel
agent.channels.memory_channel.type = memory
# sink: Kafka
agent.sinks = kafka_sink
agent.sinks.kafka_sink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafka_sink.kafka.bootstrap.servers = kafka_host1:9092,kafka_host2:9092
agent.sinks.kafka_sink.kafka.topic = kafka_topic
# bind source, channel, and sink together
agent.sources.jdbc_source.channels = memory_channel
agent.sinks.kafka_sink.channel = memory_channel
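One detail worth understanding about the SQL source above: it keeps a status file (the sqlSource.status configured earlier) recording how far it has already read, so each poll emits only new rows. A toy Python version of that bookkeeping, a hypothetical simplification rather than the real implementation:

```python
# Toy model of the SQLSource status-file bookkeeping (illustration only).
table = [("r1",), ("r2",), ("r3",)]    # pretend MySQL table
status = {"index": 0}                  # what the status file persists

def poll(table, status):
    """Return only the rows added since the last poll."""
    new_rows = table[status["index"]:]
    status["index"] = len(table)       # written back to the status file
    return new_rows

print(poll(table, status))             # first poll: all three rows
table.append(("r4",))                  # a new row is inserted
print(poll(table, status))             # second poll: only the new row
```

This is why deleting the status file makes the source re-read the whole table from the beginning.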
Start the Flume agent: run one on each node. You can use the following command:
flume-ng agent --conf /path/to/flume/conf --conf-file /path/to/flume.conf --name agent_name -Dflume.root.logger=INFO,console
Replace /path/to/flume/conf with Flume's configuration directory, /path/to/flume.conf with the path of the configuration file you created, and agent_name with the name of the agent (it must match the component prefix used in the configuration file, agent in the example above).
Monitor and manage the Flume cluster: once the agents are running, monitor and manage them to make sure data is written correctly from MySQL into Kafka. This may involve checking logs, metrics, and performance statistics, adjusting the configuration, or adding more Flume agent nodes.
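For the monitoring step, Flume agents can expose their counters as JSON over HTTP when started with -Dflume.monitoring.type=http -Dflume.monitoring.port=34545, and a small script can then watch the channel fill level. The payload below is a made-up sample shaped like Flume's channel metrics (the field names are assumptions based on Flume's JSON monitoring output):

```python
import json

# Made-up sample shaped like Flume's HTTP JSON metrics for one channel.
payload = json.dumps({
    "CHANNEL.c1": {
        "ChannelCapacity": "1000",
        "ChannelSize": "120",
        "EventPutSuccessCount": "5120",
        "EventTakeSuccessCount": "5000",
    }
})

def channel_fill_percent(metrics_json, channel="c1"):
    """Compute how full a Flume channel is from a metrics payload."""
    m = json.loads(metrics_json)[f"CHANNEL.{channel}"]
    return 100.0 * int(m["ChannelSize"]) / int(m["ChannelCapacity"])

print(channel_fill_percent(payload))   # 12.0
```

A channel that stays near 100% full means the Kafka sink cannot keep up with the SQL source and the capacity or batch settings need tuning.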
Copyright notice: this content was contributed voluntarily by real-name-registered users of the Alibaba Cloud developer community. Copyright belongs to the original authors; the community does not hold the copyright and assumes no corresponding legal liability. See the community's User Service Agreement and Intellectual Property Protection Guidelines for the detailed rules; suspected plagiarism can be reported via the infringement complaint form and will be removed once verified.