Kafka 0.11.0.2 Installation Notes

This note covers installing Kafka 0.11.0.2 on CentOS 6 / CentOS 7.


Three servers are used to host both the Kafka cluster and the ZooKeeper cluster:

Hostname    IP              ZK myid    Kafka broker ID
zk1         10.20.32.113    1          1
zk2         10.20.32.114    2          2
zk3         10.20.32.126    3          3

ZooKeeper Installation

  1. On each of the three machines, add the host-name mappings.

Edit /etc/hosts:

10.20.32.113 zk1
10.20.32.114 zk2
10.20.32.126 zk3
  2. Download and extract ZooKeeper; I used zookeeper-3.4.11, placed under /usr/local/zookeeper-3.4.11/ on every machine.

  3. Create the data directory and write the ZooKeeper ID into the myid file. Note that the value differs per machine; see the table above for the IP-to-myid mapping. On zk1:

mkdir /usr/local/zookeeper-3.4.11/data
touch /usr/local/zookeeper-3.4.11/data/myid
echo "1" > /usr/local/zookeeper-3.4.11/data/myid

  4. In ZooKeeper's conf directory, copy zoo_sample.cfg to zoo.cfg and adjust the port (optional; I use 1119 here rather than the default 2181):
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
dataDir=/usr/local/zookeeper-3.4.11/data/
# the port at which the clients will connect
clientPort=1119
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1

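# Cluster members. Format: server.N=host:peer-port:leader-election-port
# (the sample file uses the defaults 2888:3888; custom ports here)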
server.1=zk1:5888:6888
server.2=zk2:5888:6888
server.3=zk3:5888:6888


To start ZooKeeper at boot (optional), edit /etc/rc.local. Note that JAVA_HOME must point to your machine's actual Java directory; here I use the OpenJDK installed via yum:

yum install java-1.8.0-openjdk java-1.8.0-openjdk-devel

Append to /etc/rc.local:

export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.151-1.b12.el6_9.x86_64/jre
/usr/local/zookeeper-3.4.11/bin/zkServer.sh start

Now ZooKeeper can be started on each of the three machines:

/usr/local/zookeeper-3.4.11/bin/zkServer.sh start
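
Once all three nodes are up, the standard zkServer.sh script can report each node's role; one node should show "Mode: leader" and the other two "Mode: follower":

/usr/local/zookeeper-3.4.11/bin/zkServer.sh status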

Kafka Installation

  1. Download and extract Kafka 0.11.0.2; here I install it to /usr/local/kafka_2.11-0.11.0.2/ on every machine.

  2. Edit the configuration file as follows (using the first Kafka machine as an example; only three settings in the file need to change):

vi /usr/local/kafka_2.11-0.11.0.2/config/server.properties

#Change this; the broker ID is 1 on this machine
broker.id=1

#The listener Kafka runs on; an IP address or a host name both work
listeners=PLAINTEXT://zk1:9092

#The ZooKeeper cluster connection string
zookeeper.connect=zk1:1119,zk2:1119,zk3:1119
  3. Repeat the changes on the other two machines (broker.id 2 and 3, each with its own listener host name), then start the Kafka cluster on each:
/usr/local/kafka_2.11-0.11.0.2/bin/kafka-server-start.sh -daemon /usr/local/kafka_2.11-0.11.0.2/config/server.properties

Optionally, this command can also be added to /etc/rc.local so the whole stack starts at boot:

export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.151-1.b12.el6_9.x86_64/jre
/usr/local/zookeeper-3.4.11/bin/zkServer.sh start
/usr/local/kafka_2.11-0.11.0.2/bin/kafka-server-start.sh -daemon /usr/local/kafka_2.11-0.11.0.2/config/server.properties
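
To confirm a broker actually came up, a quick check (assuming the JDK's jps tool is on the PATH and the default log directory under the install path) is:

jps | grep Kafka
tail /usr/local/kafka_2.11-0.11.0.2/logs/server.log

jps should list a Kafka process, and the end of server.log should contain a "started" message.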

Verifying and Testing Kafka

All of the following operations are performed in /usr/local/kafka_2.11-0.11.0.2/bin:

cd /usr/local/kafka_2.11-0.11.0.2/bin

First create a topic; mine is called Mark_test:

./kafka-topics.sh --create --zookeeper zk1:1119 --replication-factor 1 --partitions 1 --topic Mark_test

On this machine or either of the other two, check that the topic is visible:

./kafka-topics.sh --list --zookeeper zk1:1119

The output should include Mark_test.

On any one of the machines, produce messages; after running this command you can type arbitrary messages (this is the producer):

./kafka-console-producer.sh --broker-list zk1:9092 --topic Mark_test

On any machine, consume messages; this command receives messages from the given topic (this is the consumer, and it can also be run on the other two machines):

./kafka-console-consumer.sh --zookeeper localhost:1119 --topic Mark_test --from-beginning

Java Client Verification

Create a Maven project with the following pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>my.kafka.test</groupId>
    <artifactId>demo1</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.11.0.2</version>
        </dependency>
    </dependencies>
</project>

Java producer (using the old Scala producer API that ships with the kafka_2.11 artifact):

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * Simple producer built on the old (Scala) producer API.
 */
public class KafkaProducer {

    public final static String TOPIC = "Mark_test";

    private final Producer<String, String> producer;

    private KafkaProducer() {
        Properties props = new Properties();

        // Broker address and Kafka port (not the ZooKeeper port)
        props.put("metadata.broker.list", "10.20.32.126:9092");

        // Serializer class for message values
        props.put("serializer.class", "kafka.serializer.StringEncoder");

        // Serializer class for message keys
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");

        // request.required.acks:
        //  0 - the producer never waits for an acknowledgement from the
        //      broker. Lowest latency, weakest durability (some data is
        //      lost when a server fails).
        //  1 - the producer gets an acknowledgement after the leader
        //      replica has received the data. Better durability (only
        //      messages written to a now-dead leader but not yet
        //      replicated are lost).
        // -1 - the producer gets an acknowledgement after all in-sync
        //      replicas have received the data. Best durability: no
        //      messages are lost as long as at least one in-sync
        //      replica remains.
        props.put("request.required.acks", "-1");

        producer = new Producer<String, String>(new ProducerConfig(props));
    }

    public static void main(String[] args) {
        new KafkaProducer().produce();
    }

    void produce() {
        int messageNo = 1000;
        final int COUNT = 10000;

        while (messageNo < COUNT) {
            String key = String.valueOf(messageNo);
            String data = "Current time is:" + System.currentTimeMillis() + " msg key: " + key;
            producer.send(new KeyedMessage<String, String>(TOPIC, key, data));
            System.out.println(data);
            messageNo++;
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

Java consumer (using the old high-level consumer API):

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class KafkaConsumer {

    private final ConsumerConnector consumer;

    public KafkaConsumer() {
        Properties props = new Properties();

        // The consumer group this consumer belongs to
        props.put("group.id", "jd-group");

        // ZooKeeper connection string and timeouts (the old high-level
        // consumer coordinates through ZooKeeper, not the brokers)
        props.put("zookeeper.connect", "zk1:1119,zk2:1119,zk3:1119");
        props.put("zookeeper.session.timeout.ms", "4000");
        props.put("zookeeper.sync.time.ms", "200");

        // Offset auto-commit interval, and where to start when the
        // group has no committed offset yet
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "largest");

        ConsumerConfig config = new ConsumerConfig(props);
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);
    }

    public static void main(String[] args) {
        new KafkaConsumer().consume();
    }

    public void consume() {
        // One stream (thread) for the Mark_test topic
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put("Mark_test", Integer.valueOf(1));

        StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
        StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());

        Map<String, List<KafkaStream<String, String>>> consumerMap =
                consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);

        KafkaStream<String, String> stream = consumerMap.get("Mark_test").get(0);
        ConsumerIterator<String, String> it = stream.iterator();

        while (it.hasNext()) {
            System.out.println(it.next().message());
        }
    }
}

Finally, Kafka 0.11 also ships the newer Java clients API, which talks to the brokers directly instead of going through ZooKeeper.
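
Below is a minimal sketch of both new-API clients (the class name NewApiDemo and the ten-message loop are illustrative, not from the original; the org.apache.kafka.clients classes come from the kafka-clients jar, which the kafka_2.11 dependency above already pulls in transitively):

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class NewApiDemo {

    // Sends ten messages to Mark_test, then consumes them back.
    public static void main(String[] args) {
        Properties p = new Properties();
        // The new clients connect to the brokers, not to ZooKeeper
        p.put("bootstrap.servers", "zk1:9092,zk2:9092,zk3:9092");
        p.put("acks", "all"); // equivalent of request.required.acks=-1
        p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        p.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<String, String>(p);
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<String, String>("Mark_test", Integer.toString(i), "msg " + i));
        }
        producer.close();

        Properties c = new Properties();
        c.put("bootstrap.servers", "zk1:9092,zk2:9092,zk3:9092");
        c.put("group.id", "jd-group");
        // Note: the new consumer uses "earliest"/"latest", not the old
        // consumer's "smallest"/"largest"
        c.put("auto.offset.reset", "earliest");
        c.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        c.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(c);
        consumer.subscribe(Arrays.asList("Mark_test"));
        while (true) { // demo loop; runs until the process is killed
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset=%d key=%s value=%s%n",
                        record.offset(), record.key(), record.value());
            }
        }
    }
}

Unlike the old clients above, neither side needs the ZooKeeper addresses; group coordination and offset storage are handled by the brokers.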
