当前位置: 首页 > news >正文

[平台运维、Hadoop]kafka streams概述

目录

一、 kafka streams概述

二、kafka streams开发单词计数应用


 

一、 kafka streams概述

Kafka Streams是Apache Kafka开源项目的一个流处理框架,它是基于Kafka的生产者和消费者,为开发者提供了流式处理的能力,具有低延迟性.高扩展性、高弹性、高容错性的特点,易于集成到现有的应用程序中。

KafkaStreams是一套处理分析Kafka中存储数据的客户端类库,处理完的数据可以重新写回Kafka,也可以发送给外部存储系统。作为类库,可以非常方便地嵌人到应用程序中,直接提供具体的类供开发者调用,而且在打包和部署的过程中基本没有任何要求,整个应用的运行方式主要由开发者控制,方便使用和调试。

二、kafka streams开发单词计数应用

(步骤一)编写代码

①创建名为LogProcessor的Java class

 

② 编写LogProcessor.java代码

package cn.itcast;

import org.apache.kafka.streams.processor.Processor;

import org.apache.kafka.streams.processor.ProcessorContext;

import java.util.HashMap;



public class LogProcessor implements Processor<byte[],byte[]> {

    //上下文对象

    private ProcessorContext processorContext;

    @Override

    public void init(ProcessorContext processorContext) {

        //初始化方法

        this.processorContext=processorContext;

    }

    @Override

    public void process(byte[] key, byte[] value) {

        //处理一条消息

        String inputOri = new String(value);

        HashMap <String,Integer> map = new HashMap<String,Integer>();

        int times = 1;

        if(inputOri.contains(" ")){

            //截取字段

            String [] words = inputOri.split(" ");

            for (String word : words){

                if(map.containsKey(word)){

                    map.put(word,map.get(word)+1);

                }else{

                    map.put(word,times);

                }

            }

        }

        inputOri = map.toString();

        processorContext.forward(key,inputOri.getBytes());

    }

    @Override

    public void close() {}

}

③ 创建名为App的Java class

 编写App.java代码

import org.apache.kafka.streams.KafkaStreams;

import org.apache.kafka.streams.StreamsConfig;

import org.apache.kafka.streams.Topology;

import org.apache.kafka.streams.processor.Processor;

import org.apache.kafka.streams.processor.ProcessorSupplier;



import java.util.Properties;



public class App {

    public static void main(String[] args) {

        //声明来源主题

        String fromTopic = "testStreams1";

        //声明目标主题

        String toTopic = "testStreams2";

        //设置参数

        Properties props = new Properties();

        props.put(StreamsConfig.APPLICATION_ID_CONFIG,"logProcessor");

        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"master:9092,slave1:9092,slave2:9092");

        //实例化StreamsConfig

        StreamsConfig config = new StreamsConfig(props);

        //构建拓扑结构

        Topology topology = new Topology();

        //添加源处理节点,为源处理节点指定名称和它订阅的主题

        topology.addSource("SOURCE",fromTopic)

                //添加自定义处理节点,指定名称,处理器类和上一个节点的名称

                .addProcessor("PROCESSOR", new ProcessorSupplier() {

                    @Override

                    public Processor get() {//调用这个方法,就知道这条数据用哪个process处理,

                        return new LogProcessor();

                    }

                },"SOURCE")

                //添加目标处理节点,需要指定目标处理节点的名称,和上一个节点名称。

                .addSink("SINK",toTopic,"PROCESSOR");//最后给SINK

        //实例化KafkaStreams

        KafkaStreams streams = new KafkaStreams(topology,config);

        streams.start();

    }

}

(步骤二)执行测试

① 在master节点创建testStreams1和testStreams2主题

$ bin/kafka-topics.sh --create --topic testStreams1 --partitions 3 --replication-factor 1 --zookeeper master:2181,slave1:2181,slave2:2181
$ bin/kafka-topics.sh --create --topic testStreams2 --partitions 3 --replication-factor 1 --zookeeper master:2181,slave1:2181,slave2:2181

 

 启动生产者服务命令

bin/kafka-console-producer.sh --broker-list master:9092,slave1:9092,slave2:9092 --topic testStreams1

 

 启动消费者服务命令

bin/kafka-console-consumer.sh --from-beginning --topic testStreams2 --bootstrap-server master:9092,slave1:9092,slave2:9092

 

 再运行App.java程序

 

 在master节点输入任意数据,按enter键发送,在slave1节点上可以查看到消息

输入内容如下:Hello itcast hello spark hello kafka,结果如下图

 

 

 

相关文章:

  • 【祝福伟大的祖国】Java Web 9.2 Request 对象 9.2.5 请求参数中文乱码问题
  • 《When you are old》一如苇中的风,轻柔却难忘
  • JavaFX实战:模拟电子琴弹奏效果,鼠标弹奏一曲piano送给大家
  • 基于VC++和AT89C52单片机的数字存储示波器设计
  • labview与stm32通信
  • OpenHarmony适配移植:X86、ARM、RISC-V、MIPS、LoongArch芯片架构简析
  • DBeaver manual
  • 图解redis(三)——功能篇
  • 数据库的约束和设计
  • Lesson 1 A private conversation
  • 第一季:9SpringMVC中如何解决POST请求中文乱码问题,GET的又如何处理呢【Java面试题】
  • 看了羊了个羊的源码后我发现这个游戏花了最少钱,赚了最多的钱
  • 『 云原生·分布式』分布式基础——Docker容器详解与Docker的安装(Linux与Windows)
  • Web会话跟踪:Cookie与Session
  • 【大数据】Hadoop在呼唤Hive(附一键部署Hive脚本)
  • 【159天】尚学堂高琪Java300集视频精华笔记(128)
  • 【跃迁之路】【519天】程序员高效学习方法论探索系列(实验阶段276-2018.07.09)...
  • Github访问慢解决办法
  • HTTP 简介
  • JavaSE小实践1:Java爬取斗图网站的所有表情包
  • Python 使用 Tornado 框架实现 WebHook 自动部署 Git 项目
  • Python3爬取英雄联盟英雄皮肤大图
  • 第十八天-企业应用架构模式-基本模式
  • 技术:超级实用的电脑小技巧
  • 免费小说阅读小程序
  • 适配mpvue平台的的微信小程序日历组件mpvue-calendar
  • 数据库写操作弃用“SELECT ... FOR UPDATE”解决方案
  • 说说我为什么看好Spring Cloud Alibaba
  • ​批处理文件中的errorlevel用法
  • !!java web学习笔记(一到五)
  • (3)llvm ir转换过程
  • (delphi11最新学习资料) Object Pascal 学习笔记---第8章第5节(封闭类和Final方法)
  • (pojstep1.1.1)poj 1298(直叙式模拟)
  • (附源码)spring boot基于小程序酒店疫情系统 毕业设计 091931
  • (附源码)ssm教师工作量核算统计系统 毕业设计 162307
  • (三) prometheus + grafana + alertmanager 配置Redis监控
  • (原+转)Ubuntu16.04软件中心闪退及wifi消失
  • (转)机器学习的数学基础(1)--Dirichlet分布
  • .NET 动态调用WebService + WSE + UsernameToken
  • @ 代码随想录算法训练营第8周(C语言)|Day53(动态规划)
  • @property @synthesize @dynamic 及相关属性作用探究
  • @requestBody写与不写的情况
  • [⑧ADRV902x]: Digital Pre-Distortion (DPD)学习笔记
  • [Android Studio] 开发Java 程序
  • [C/C++随笔] char与unsigned char区别
  • [c]扫雷
  • [C++]运行时,如何确保一个对象是只读的
  • [Docker]十二.Docker consul集群搭建、微服务部署,Consul集群+Swarm集群部署微服务实战
  • [GXYCTF2019]BabySQli1
  • [Java] 什么是IoC?什么是DI?它们的区别是什么?
  • [Latex学习笔记]数学公式基本命令
  • [leetcode 数位计算]2520. 统计能整除数字的位数
  • [LeetCode] Sort List
  • [linux c]linux do_div() 函数用法
  • [Loadrunner参数化]一个文件输两列参数的取值