当前位置: 首页 > news >正文

Kafka部署与代码实例(转)

来自:http://doc.okbase.net/QING____/archive/19447.html

也可参考:

http://blog.csdn.net/21aspnet/article/details/19325373

http://blog.csdn.net/unix21/article/details/18990123

 

kafka作为分布式日志收集或系统监控服务,我们有必要在合适的场合使用它。kafka的部署包括zookeeper环境/kafka环境,同时还需要进行一些配置操作.接下来介绍如何使用kafka.

    我们使用3个zookeeper实例构建zk集群,使用2个kafka broker构建kafka集群.

    其中kafka为0.8V,zookeeper为3.4.5V

 

一.Zookeeper集群构建

    我们有3个zk实例,分别为zk-0,zk-1,zk-2;如果你仅仅是测试使用,可以使用1个zk实例.

    1) zk-0

    调整配置文件:

clientPort=2181
server.0=127.0.0.1:2888:3888
server.1=127.0.0.1:2889:3889
server.2=127.0.0.1:2890:3890
##只需要修改上述配置,其他配置保留默认值

    启动zookeeper

./zkServer.sh start

    2) zk-1

    调整配置文件(其他配置和zk-0一只):

clientPort=2182
##只需要修改上述配置,其他配置保留默认值

    启动zookeeper

 

./zkServer.sh start

    3) zk-2

    调整配置文件(其他配置和zk-0一只):

clientPort=2183
##只需要修改上述配置,其他配置保留默认值

    启动zookeeper

 

./zkServer.sh start

  

二. Kafka集群构建

    因为Broker配置文件涉及到zookeeper的相关约定,因此我们先展示broker配置文件.我们使用2个kafka broker来构建这个集群环境,分别为kafka-0,kafka-1.

    1) kafka-0

    在config目录下修改配置文件为:

复制代码
broker.id=0
port=9092
num.network.threads=2
num.io.threads=2
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dir=./logs
num.partitions=2
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=168
#log.retention.bytes=1073741824
log.segment.bytes=536870912

log.cleanup.interval.mins=10
zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
zookeeper.connection.timeout.ms=1000000
kafka.metrics.polling.interval.secs=5
kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter
kafka.csv.metrics.dir=/tmp/kafka_metrics
kafka.csv.metrics.reporter.enabled=false
复制代码

 

    因为kafka用scala语言编写,因此运行kafka需要首先准备scala相关环境。

> cd kafka-0
> ./sbt update
> ./sbt package
> ./sbt assembly-package-dependency 

    其中最后一条指令执行有可能出现异常,暂且不管。 启动kafka broker:

> JMS_PORT=9997 bin/kafka-server-start.sh config/server.properties &

    因为zookeeper环境已经正常运行了,我们无需通过kafka来挂载启动zookeeper.如果你的一台机器上部署了多个kafka broker,你需要声明JMS_PORT.

    2) kafka-1

broker.id=1
port=9093
##其他配置和kafka-0保持一致

    然后和kafka-0一样执行打包命令,然后启动此broker.

> JMS_PORT=9998 bin/kafka-server-start.sh config/server.properties &

    到目前为止环境已经OK了,那我们就开始展示编程实例吧。

 

三.项目准备

    项目基于maven构建,不得不说kafka java客户端实在是太糟糕了;构建环境会遇到很多麻烦。建议参考如下pom.xml;其中各个依赖包必须版本协调一致。

复制代码
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.test</groupId> <artifactId>test-kafka</artifactId> <packaging>jar</packaging> <name>test-kafka</name> <url>http://maven.apache.org</url> <version>1.0.0</version> <dependencies> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.14</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.8.0</artifactId> <version>0.8.0-beta1</version> <exclusions> <exclusion> <groupId>log4j</groupId> <artifactId>log4j</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>2.8.1</version> </dependency> <dependency> <groupId>com.yammer.metrics</groupId> <artifactId>metrics-core</artifactId> <version>2.2.0</version> </dependency> <dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.3</version> </dependency> </dependencies> <build> <finalName>test-kafka-1.0</finalName> <resources> <resource> <directory>src/main/resources</directory> <filtering>true</filtering> </resource> </resources> <plugins> <plugin> <artifactId>maven-compiler-plugin</artifactId> <version>2.3.2</version> <configuration> <source>1.5</source> <target>1.5</target> <encoding>gb2312</encoding> </configuration> </plugin> <plugin> <artifactId>maven-resources-plugin</artifactId> <version>2.2</version> <configuration> <encoding>gbk</encoding> </configuration> </plugin> </plugins> </build> </project>
复制代码

 

 

四.Producer端代码

    1) producer.properties文件:此文件放在/resources目录下

复制代码
#partitioner.class=
metadata.broker.list=127.0.0.1:9092,127.0.0.1:9093
##,127.0.0.1:9093
producer.type=sync
compression.codec=0 serializer.class=kafka.serializer.StringEncoder ##在producer.type=async时有效 #batch.num.messages=100
复制代码

 

    2) LogProducer.java代码样例

复制代码
package com.test.kafka;

import java.util.ArrayList;
import java.util.Collection; import java.util.List; import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; public class LogProducer { private Producer<String,String> inner; public LogProducer() throws Exception{ Properties properties = new Properties(); properties.load(ClassLoader.getSystemResourceAsStream("producer.properties")); ProducerConfig config = new ProducerConfig(properties); inner = new Producer<String, String>(config); } public void send(String topicName,String message) { if(topicName == null || message == null){ return; } KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,message); inner.send(km); } public void send(String topicName,Collection<String> messages) { if(topicName == null || messages == null){ return; } if(messages.isEmpty()){ return; } List<KeyedMessage<String, String>> kms = new ArrayList<KeyedMessage<String, String>>(); for(String entry : messages){ KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,entry); kms.add(km); } inner.send(kms); } public void close(){ inner.close(); } /** * @param args */ public static void main(String[] args) { LogProducer producer = null; try{ producer = new LogProducer(); int i=0; while(true){ producer.send("test-topic", "this is a sample" + i); i++; Thread.sleep(2000); } }catch(Exception e){ e.printStackTrace(); }finally{ if(producer != null){ producer.close(); } } } }
复制代码

 

 

五.Consumer端

     1) consumer.properties:文件位于/resources目录下

复制代码
zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
##,127.0.0.1:2182,127.0.0.1:2183
# timeout in ms for connecting to zookeeper
zookeeper.connectiontimeout.ms=1000000
#consumer group id
group.id=test-group #consumer timeout #consumer.timeout.ms=5000
复制代码

 

    2) LogConsumer.java代码样例

复制代码
package com.test.kafka;

import java.util.HashMap;
import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import kafka.message.MessageAndMetadata; public class LogConsumer { private ConsumerConfig config; private String topic; private int partitionsNum; private MessageExecutor executor; private ConsumerConnector connector; private ExecutorService threadPool; public LogConsumer(String topic,int partitionsNum,MessageExecutor executor) throws Exception{ Properties properties = new Properties(); properties.load(ClassLoader.getSystemResourceAsStream("consumer.properties")); config = new ConsumerConfig(properties); this.topic = topic; this.partitionsNum = partitionsNum; this.executor = executor; } public void start() throws Exception{ connector = Consumer.createJavaConsumerConnector(config); Map<String,Integer> topics = new HashMap<String,Integer>(); topics.put(topic, partitionsNum); Map<String, List<KafkaStream<byte[], byte[]>>> streams = connector.createMessageStreams(topics); List<KafkaStream<byte[], byte[]>> partitions = streams.get(topic); threadPool = Executors.newFixedThreadPool(partitionsNum); for(KafkaStream<byte[], byte[]> partition : partitions){ threadPool.execute(new MessageRunner(partition)); } } public void close(){ try{ threadPool.shutdownNow(); }catch(Exception e){ // }finally{ connector.shutdown(); } } class MessageRunner implements Runnable{ private KafkaStream<byte[], byte[]> partition; MessageRunner(KafkaStream<byte[], byte[]> partition) { this.partition = partition; } public void run(){ ConsumerIterator<byte[], byte[]> it = partition.iterator(); while(it.hasNext()){ MessageAndMetadata<byte[],byte[]> item = it.next(); System.out.println("partiton:" + item.partition()); System.out.println("offset:" + item.offset()); executor.execute(new String(item.message()));//UTF-8  } } } interface MessageExecutor { public void execute(String message); } /** * @param args */ public static void main(String[] args) { LogConsumer consumer = null; try{ MessageExecutor executor = new MessageExecutor() { public void execute(String message) { System.out.println(message); } }; consumer = new LogConsumer("test-topic", 2, executor); consumer.start(); }catch(Exception e){ e.printStackTrace(); }finally{ // if(consumer != null){ // consumer.close(); // }  } } }
复制代码

 

    在测试时,建议优先启动consumer,然后再启动producer,这样可以实时的观测到最新的消息。

转载于:https://www.cnblogs.com/lyl693/p/6649870.html

相关文章:

  • 0-1岁宝宝的游戏和活动指南
  • Oracle性能优化之表压缩及并行提高效率的测试
  • Excel数组排序+图片统一大小
  • composer
  • 不求完美但求易用 报价软件适时出笼(温州传奇4)
  • 微信开源mars源码分析1—上层samples分析
  • 如何让普通域用户可以登录域控
  • jQuery实现AJAX定时局部页面刷新
  • Centos文件查看命令字符
  • ospf实例分析 (子网掩码实战)
  • 欢迎访问我的个人网站
  • 通过串口收发短消息(上)
  • [LeetCode]Reverse Linked List II
  • Vijos 1067Warcraft III 守望者的烦恼
  • 清除连接(其他电脑的)记录
  • android百种动画侧滑库、步骤视图、TextView效果、社交、搜房、K线图等源码
  • Angular2开发踩坑系列-生产环境编译
  • canvas 五子棋游戏
  • create-react-app做的留言板
  • JS数组方法汇总
  • Redux系列x:源码分析
  • Terraform入门 - 3. 变更基础设施
  • 翻译 | 老司机带你秒懂内存管理 - 第一部(共三部)
  • 基于web的全景—— Pannellum小试
  • 简单易用的leetcode开发测试工具(npm)
  • 看域名解析域名安全对SEO的影响
  • 聊聊redis的数据结构的应用
  • 数据可视化之 Sankey 桑基图的实现
  • 我与Jetbrains的这些年
  • 400多位云计算专家和开发者,加入了同一个组织 ...
  • #Linux(Source Insight安装及工程建立)
  • #NOIP 2014#day.2 T1 无限网络发射器选址
  • %3cli%3e连接html页面,html+canvas实现屏幕截取
  • (ZT) 理解系统底层的概念是多么重要(by趋势科技邹飞)
  • (非本人原创)史记·柴静列传(r4笔记第65天)
  • (附源码)node.js知识分享网站 毕业设计 202038
  • (附源码)基于SSM多源异构数据关联技术构建智能校园-计算机毕设 64366
  • (附源码)计算机毕业设计SSM教师教学质量评价系统
  • (十七)devops持续集成开发——使用jenkins流水线pipeline方式发布一个微服务项目
  • (数位dp) 算法竞赛入门到进阶 书本题集
  • (学习日记)2024.01.09
  • (转)PlayerPrefs在Windows下存到哪里去了?
  • ***php进行支付宝开发中return_url和notify_url的区别分析
  • .gitignore文件---让git自动忽略指定文件
  • .helper勒索病毒的最新威胁:如何恢复您的数据?
  • .NET Core中的去虚
  • .NET delegate 委托 、 Event 事件
  • .NET 分布式技术比较
  • .NET 简介:跨平台、开源、高性能的开发平台
  • .NET开发不可不知、不可不用的辅助类(三)(报表导出---终结版)
  • .net下的富文本编辑器FCKeditor的配置方法
  • ;号自动换行
  • @GlobalLock注解作用与原理解析
  • [2024最新教程]地表最强AGI:Claude 3注册账号/登录账号/访问方法,小白教程包教包会
  • [AIGC] Java 和 Kotlin 的区别