当前位置: 首页 > news >正文

创建Kafka0.8.2生产者与消费者

一、下载安装Kafka0.8.2

二、vi config/server.properties

三、修改为advertised.host.name=192.168.1.76

四、rm -rf  /tmp *移除临时目录下的文件

五、修改vi /etc/hosts中的127.0.0.1为192.168.1.76

六、开启zookeeper 

[html]  view plain  copy
 
  1. bin/zookeeper-server-start.sh config/zookeeper.properties  


七、开启kafka

bin/kafka-server-start.sh config/server.properties


八、创建主题

bin/kafka-topics.sh --create --zookeeper 192.168.1.76:2181 --replication-factor 1 --partitions 1 --topic mytesttopic


九、开启消费者

bin/kafka-console-consumer.sh --zookeeper 192.168.1.76:2181 --topic mytesttopic --from-beginning 回车


十、生产者代码(0.8.2.1的jar包)

[java]  view plain  copy
 
  1. import java.util.*;  
  2.   
  3. import org.apache.kafka.clients.producer.KafkaProducer;  
  4. import org.apache.kafka.clients.producer.ProducerRecord;  
  5.   
  6. public class SimpleProducer {  
  7.     public static void main(String[] args) {  
  8.         Properties properties = new Properties();  
  9.         properties.put("bootstrap.servers", "192.168.1.76:9092");  
  10.         properties.put("metadata.broker.list", "192.168.1.76:9092");  
  11.         properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");  
  12.         properties.put("serializer.class", "kafka.serializer.StringEncoder");  
  13.         properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");  
  14.         properties.put("request.required.acks", "1");  
  15.   
  16.         KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(properties);  
  17.         for (int iCount = 0; iCount < 100; iCount++) {  
  18.             String message = "My Test Message No " + iCount;  
  19.             ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>("mytesttopic", message);  
  20.             producer.send(record);  
  21.         }  
  22.         producer.close();  
  23.     }  
  24. }  

十一、查看结果

 

 

[html]  view plain  copy
 
  1. My Test Message No 0  
  2. My Test Message No 1  
  3. My Test Message No 2  
  4. My Test Message No 3  
  5. My Test Message No 4  
  6. My Test Message No 5  
  7. My Test Message No 6  
  8. My Test Message No 7  
  9. My Test Message No 8  
  10. My Test Message No 9  
  11. My Test Message No 10  
[html]  view plain  copy
 
  1. ...................  
[html]  view plain  copy
 
  1. ..................  

十、消费者代码(0.8.2.1的jar包)

 

[java]  view plain  copy
 
  1. import kafka.consumer.ConsumerConfig;  
  2. import kafka.consumer.ConsumerIterator;  
  3. import kafka.consumer.KafkaStream;  
  4. import kafka.serializer.StringDecoder;  
  5. import kafka.utils.VerifiableProperties;  
  6. import java.util.*;  
  7. public class SimpleConsumerExample {  
  8.   
  9.     private static kafka.javaapi.consumer.ConsumerConnector consumer;  
  10.   
  11.     public static void consume() {  
  12.   
  13.         Properties props = new Properties();  
  14.         // zookeeper 配置  
  15.         props.put("zookeeper.connect", "192.168.1.76:2181");  
  16.   
  17.         // group 代表一个消费组  
  18.         props.put("group.id", "jd-group");  
  19.   
  20.         // zk连接超时  
  21.         props.put("zookeeper.session.timeout.ms", "4000");  
  22.         props.put("zookeeper.sync.time.ms", "200");  
  23.         props.put("auto.commit.interval.ms", "1000");  
  24.         props.put("auto.offset.reset", "smallest");  
  25.         // 序列化类  
  26.         props.put("serializer.class", "kafka.serializer.StringEncoder");  
  27.   
  28.         ConsumerConfig config = new ConsumerConfig(props);  
  29.   
  30.         consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);  
  31.   
  32.         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();  
  33.         topicCountMap.put("mytesttopic", new Integer(1));  
  34.   
  35.         StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());  
  36.         StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());  
  37.   
  38.         Map<String, List<KafkaStream<String, String>>> consumerMap = consumer.createMessageStreams(topicCountMap,  
  39.                 keyDecoder, valueDecoder);  
  40.         KafkaStream<String, String> stream = consumerMap.get("mytesttopic").get(0);  
  41.         ConsumerIterator<String, String> it = stream.iterator();  
  42.         while (it.hasNext())  
  43.             System.out.println(it.next().message());  
  44.     }  
  45.   
  46.     public static void main(String[] args) {  
  47.         consume();  
  48.     }  
  49. }  

十一、提供下C#版的代码

 

[csharp]  view plain  copy
 
    1. static void Main(string[] args)  
    2. {  
    3.     //https://github.com/Jroland/kafka-net  
    4.     //生产者  
    5.     //var options = new KafkaOptions(new Uri("http://192.168.1.76:9092"), new Uri("http://192.168.1.76:9092"));  
    6.     //var router = new BrokerRouter(options);  
    7.     //var client = new Producer(router);  
    8.   
    9.     //client.SendMessageAsync("mytesttopic", new[] { new Message("hello world") }).Wait();  
    10.   
    11.     //using (client) { }  
    12.   
    13.     //消费者  
    14.     var options = new KafkaOptions(new Uri("http://192.168.1.76:9092"), new Uri("http://192.168.1.76:9092"));  
    15.     var router = new BrokerRouter(options);  
    16.     var consumer = new Consumer(new ConsumerOptions("mytesttopic", router));  
    17.   
    18.     //Consume returns a blocking IEnumerable (ie: never ending stream)  
    19.     foreach (var message in consumer.Consume())  
    20.     {  
    21.         Console.WriteLine("Response: P{0},O{1} : {2}",  
    22.             message.Meta.PartitionId, message.Meta.Offset,System.Text.Encoding.ASCII.GetString(message.Value));  
    23.     }  
    24.     Console.ReadLine();  
    25. }  

转载于:https://www.cnblogs.com/heidsoft/p/7697979.html

相关文章:

  • IDEA在编辑时提示could not autowire
  • linux 使用记录
  • 关于ashx不可重定向问题
  • 有关“树上剩余几只鸟”的问题的思考及解答
  • 可以放在页面任何地方de 天气插件
  • yum安装指定版本的软件包的方法
  • 探秘Spring AOP (三) Spring AOP 使用讲解 2
  • 基于java config的springSecurity(四)--启用全局方法安全
  • 黑客预警:搞瘫北美互联网?规模更大的僵尸网络现身
  • 一个关于ConfigurationManager.GetSecion方法的小问题
  • 基础大概回顾
  • 重新学习Mysql数据库3:Mysql存储引擎与数据存储原理
  • P1679 神奇的四次方数
  • nginx服务企业应用
  • Hadoop起源
  • download使用浅析
  • Java 11 发布计划来了,已确定 3个 新特性!!
  • Javascript编码规范
  • javascript从右向左截取指定位数字符的3种方法
  • Java基本数据类型之Number
  • js如何打印object对象
  • leetcode46 Permutation 排列组合
  • Markdown 语法简单说明
  • PV统计优化设计
  • SegmentFault 2015 Top Rank
  • TypeScript实现数据结构(一)栈,队列,链表
  • VirtualBox 安装过程中出现 Running VMs found 错误的解决过程
  • vue和cordova项目整合打包,并实现vue调用android的相机的demo
  • 悄悄地说一个bug
  • 三分钟教你同步 Visual Studio Code 设置
  • 深度学习中的信息论知识详解
  • 首页查询功能的一次实现过程
  • 想晋级高级工程师只知道表面是不够的!Git内部原理介绍
  • ​低代码平台的核心价值与优势
  • # 学号 2017-2018-20172309 《程序设计与数据结构》实验三报告
  • #WEB前端(HTML属性)
  • #微信小程序:微信小程序常见的配置传旨
  • (第27天)Oracle 数据泵转换分区表
  • (十)c52学习之旅-定时器实验
  • (一)Linux+Windows下安装ffmpeg
  • (转)总结使用Unity 3D优化游戏运行性能的经验
  • .【机器学习】隐马尔可夫模型(Hidden Markov Model,HMM)
  • .NET/C# 反射的的性能数据,以及高性能开发建议(反射获取 Attribute 和反射调用方法)
  • .NET/C# 获取一个正在运行的进程的命令行参数
  • .NET序列化 serializable,反序列化
  • [20171113]修改表结构删除列相关问题4.txt
  • [BeginCTF]真龙之力
  • [BJDCTF 2020]easy_md5
  • [C语言]——内存函数
  • [ffmpeg] x264 配置参数解析
  • [Google Guava] 1.1-使用和避免null
  • [IDF]啥?
  • [JS设计模式]Prototype Pattern
  • [LeetCode]剑指 Offer 40. 最小的k个数
  • [moka同学笔记]yii表单dropdownlist样式