Kafka与FlumeNG整合

news/2024/7/24 7:08:48 标签: flume

1,作为Producer的Flume端配置,其中是以netcat为source数据源,sink是kafka

[html]  view plain copy 在CODE上查看代码片 派生到我的代码片
  1. hadoop@stormspark:~/bigdata/apache-flume-1.4.0-bin$ cat conf/producer1.properties   
  2. #agent section  
  3. producer.sources = s  
  4. producer.channels = c  
  5. producer.sinks = r  
  6.   
  7. #source section  
  8. #producer.sources.s.type = seq  
  9. producer.sources.s.type = netcat  
  10. producer.sources.s.bind = localhost  
  11. producer.sources.s.port = 44444  
  12. producer.sources.s.channels = c  
  13.   
  14. # Each sink's type must be defined  
  15. producer.sinks.r.type = org.apache.flume.plugins.KafkaSink  
  16. producer.sinks.r.metadata.broker.list=127.0.0.1:9092  
  17. producer.sinks.r.partition.key=0  
  18. producer.sinks.r.partitioner.class=org.apache.flume.plugins.SinglePartition  
  19. producer.sinks.r.serializer.class=kafka.serializer.StringEncoder  
  20. producer.sinks.r.request.required.acks=0  
  21. producer.sinks.r.max.message.size=1000000  
  22. producer.sinks.r.producer.type=sync  
  23. producer.sinks.r.custom.encoding=UTF-8  
  24. producer.sinks.r.custom.topic.name=test  
  25.   
  26. #Specify the channel the sink should use  
  27. producer.sinks.r.channel = c  
  28.   
  29. # Each channel's type is defined.  
  30. producer.channels.c.type = memory  
  31. producer.channels.c.capacity = 1000  
  32.   
  33. hadoop@stormspark:~/bigdata/apache-flume-1.4.0-bin$   
2,配置consumer,source是Kafka,sink是logger

[html]  view plain copy 在CODE上查看代码片 派生到我的代码片
  1. hadoop@stormspark:~/bigdata/apache-flume-1.4.0-bin$ cat conf/comsumer1.properties   
  2. umer config  
  3. ###########################################  
  4.   
  5. consumer.sources = s  
  6. consumer.channels = c  
  7. consumer.sinks = r  
  8.   
  9. consumer.sources.s.type = seq  
  10. consumer.sources.s.channels = c  
  11. consumer.sinks.r.type = logger  
  12.   
  13. consumer.sinks.r.channel = c  
  14. consumer.channels.c.type = memory  
  15. consumer.channels.c.capacity = 100  
  16.   
  17. consumer.sources.s.type = org.apache.flume.plugins.KafkaSource  
  18. consumer.sources.s.zookeeper.connect=127.0.0.1:2181  
  19. consumer.sources.s.group.id=testGroup  
  20. consumer.sources.s.zookeeper.session.timeout.ms=400  
  21. consumer.sources.s.zookeeper.sync.time.ms=200  
  22. consumer.sources.s.auto.commit.interval.ms=1000  
  23. consumer.sources.s.custom.topic.name=test  
  24. consumer.sources.s.custom.thread.per.consumer=4  
3,分别运行着两个agent

[html]  view plain copy 在CODE上查看代码片 派生到我的代码片
  1. bin/flume-ng agent --conf conf  --conf-file conf/producer1.properties --name producer -Dflume.root.logger=INFO,console  

[html]  view plain copy 在CODE上查看代码片 派生到我的代码片
  1. bin/flume-ng agent --conf conf  --conf-file conf/comsumer1.properties   --name consumer -Dflume.root.logger=INFO,console  

4,这时telnet上端口44444

[html]  view plain copy 在CODE上查看代码片 派生到我的代码片
  1. hadoop@stormspark:~/bigdata/apache-flume-1.4.0-bin$ telnet localhost 44444  
  2. Trying ::1...  
  3. Trying 127.0.0.1...  
  4. Connected to localhost.  
  5. Escape character is '^]'.  
  6. 1111111111111111  
  7. OK  
  8. kak^Hfkakakkakakakkakkakkaakaknnnm  
  9. OK  
  10. abcdefghijklmnopqrstuvwxyz  
  11. OK  
两个agent都有信息输出

[html]  view plain copy 在CODE上查看代码片 派生到我的代码片
  1. 2014-01-15 20:01:05,047 (lifecycleSupervisor-1-1) [INFO - kafka.utils.Logging$class.info(Logging.scala:67)] Property metadata.broker.list is overridden to 127.0.0.1:9092  
  2. ] -- [{ headers:{} body: 31 31 31 31 31 31 31 31 31 31 31 31 31 31 31 31 1111111111111111 }]lume.plugins.KafkaSink.process(KafkaSink.java:137)] Send Message to Kafka : [1111111111111111  
  3. 2014-01-15 20:01:35,702 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - kafka.utils.Logging$class.info(Logging.scala:67)] Fetching metadata from broker id:0,host:127.0.0.1,port:9092 with correlation id 0 for 1 topic(s) Set(test)  
  4. 2014-01-15 20:01:35,704 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - kafka.utils.Logging$class.info(Logging.scala:67)] Connected to 127.0.0.1:9092 for producing  
  5. 2014-01-15 20:01:35,727 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - kafka.utils.Logging$class.info(Logging.scala:67)] Disconnecting from 127.0.0.1:9092  
  6. 2014-01-15 20:01:35,767 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - kafka.utils.Logging$class.info(Logging.scala:67)] Connected to stormspark:9092 for producing  
  7. ] -- [{ headers:{} body: 6B 61 6B 08 66 6B 61 6B 61 6B 6B 61 6B 61 6B 61 kak.fkakakkakaka }]lume.plugins.KafkaSink.process(KafkaSink.java:137)] Send Message to Kafka : [kafkakakkakakakkakkakkaakaknnnm  
  8. ] -- [{ headers:{} body: 61 62 63 64 65 66 67 68 69 6A 6B 6C 6D 6E 6F 70 abcdefghijklmnop }]lume.plugins.KafkaSink.process(KafkaSink.java:137)] Send Message to Kafka : [abcdefghijklmnopqrstuvwxyz  
  9. ] -- [{ headers:{} body: 71 75 69 74 0D                                  quit. }]rg.apache.flume.plugins.KafkaSink.process(KafkaSink.java:137)] Send Message to Kafka : [quit  

另一个

[html]  view plain copy 在CODE上查看代码片 派生到我的代码片
  1. 2014-01-15 19:58:02,434 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 32 32 32 32 32 32 32 32 32 32 32 32 32 32 32 32 2222222222222222 }  
  2. ]014-01-15 20:01:35,771 (pool-4-thread-1) [INFO - org.apache.flume.plugins.KafkaSource$ConsumerWorker.run(KafkaSource.java:230)] Receive Message [Thread 0: 1111111111111111  
  3. 2014-01-15 20:01:36,487 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 31 31 31 31 31 31 31 31 31 31 31 31 31 31 31 31 1111111111111111 }  
  4. ]014-01-15 20:02:13,784 (pool-4-thread-1) [INFO - org.apache.flume.plugins.KafkaSource$ConsumerWorker.run(KafkaSource.java:230)] Receive Message [Thread 0: kafkakakkakakakkakkakkaakaknnnm  
  5. 2014-01-15 20:02:14,500 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 6B 61 6B 08 66 6B 61 6B 61 6B 6B 61 6B 61 6B 61 kak.fkakakkakaka }  
  6. ]014-01-15 20:02:28,960 (pool-4-thread-1) [INFO - org.apache.flume.plugins.KafkaSource$ConsumerWorker.run(KafkaSource.java:230)] Receive Message [Thread 0: abcdefghijklmnopqrstuvwxyz  
  7. 2014-01-15 20:02:29,506 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 61 62 63 64 65 66 67 68 69 6A 6B 6C 6D 6E 6F 70 abcdefghijklmnop }  
  8. ]014-01-15 20:03:54,986 (pool-4-thread-1) [INFO - org.apache.flume.plugins.KafkaSource$ConsumerWorker.run(KafkaSource.java:230)] Receive Message [Thread 0: quit  
  9. 2014-01-15 20:03:55,529 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 71 75 69 74 0D                                  quit. }  
  10. ^C2014-01-15 20:09:10,094 (agent-shutdown-hook) [INFO - org.apache.flume.lifecycle.LifecycleSup  

http://www.niftyadmin.cn/n/813674.html

相关文章

安装和测试Kafka

本文主要介绍如何在单节点上安装 Kafka 并测试 broker、producer 和 consumer 功能。 下载 进入下载页面:http://kafka.apache.org/downloads.html ,选择 Binary downloads下载 (Source download需要编译才能使用),这里…

iOS绘图教程

本文转载至 http://blog.csdn.net/pjk1129/article/details/12783677 原贴地址:http://www.cnblogs.com/xdream86/archive/2012/12/12/2814552.html 本文是《Programming iOS5》中Drawing一章的翻译,考虑到主题完整性,翻译版本中加入了一些书…

kafka集群搭建和使用Java写kafka生产者消费者

1 kafka集群搭建 Java代码 1.zookeeper集群 搭建在110, 111,112 2.kafka使用3个节点110, 111,112 修改配置文件config/server.properties broker.id110 host.name192.168.1.110 log.dirs/usr/local/kafka_2.10-0.8.2.0/logs 复制到其他…

IDEA热部署安装JRebel启动问题

关于IDEA安装Jrebel的write error问题# 1.首先先明确问题JRebel启动报这个错一般是出现中文路径,我就是因为电脑的用户名是中文才导致的## 2.解决方案方案一:在系统上新建一个用户,此用户名用英文,并把原来的用户删掉,…

Kafka安装配置测试

Kafka的整体架构: 本文中的配置: 在两台机器Node1和Node2上,分别部署了两个broker,Zookeeper使用的是单独的ZK集群。 在每个机器上下载并解压kafka_2.10-0.8.2.1 http://kafka.apache.org/downloads.html Kafka配置 Node1: ip为 1…

JDK 并发容器总结

一 JDK 提供的并发容器总结 《实战Java高并发程序设计》为我们总结了下面几种大家可能会在高并发程序设计中经常遇到和使用的 JDK 为我们提供的并发容器。先带大家概览一下,下面会一一介绍到。 JDK提供的这些容器大部分在 java.util.concurrent 包中。 Concurrent…

iOS 企业证书发布app 流程

本文转载至 http://blog.csdn.net/l_ch_g/article/details/17394931 企业发布app的 过程比app store 发布的简单多了,没那么多的要求,哈 但是整个工程的要求还是一样,比如各种像素的icon啊 命名规范啊等等。 下面是具体的流程 1、修改你的 bu…

Kafka安装及部署

阅读目录 一、环境配置二、操作过程Kafka介绍 安装及部署 回到顶部一、环境配置 操作系统:Cent OS 7 Kafka版本:0.9.0.0 Kafka官网下载:请点击 JDK版本:1.7.0_51 SSH Secure Shell版本:XShell 5 回到顶部二、操作…