当前位置: 首页 > news >正文

Apache Kafka(七)- Kafka ElasticSearch Comsumer

Kafka ElasticSearch Consumer

对于Kafka Consumer,我们会写一个例子用于消费Kafka 数据传输到ElasticSearch。

 

1. 构造ElasticSearch 基本代码

我们使用如下代码构造一个 Elastic Search Client,并向 ES写入一个index:

import org.apache.http.HttpHost;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

public class ElasticSearchConsumer {


    public static void main(String[] args) throws IOException {
        Logger logger = LoggerFactory.getLogger(ElasticSearchConsumer.class.getName());
        RestHighLevelClient client = createClient();


        String jsonString = "{\"foo\": \"bar\"}";

        // create an index

        IndexRequest indexRequest = new IndexRequest (
                "kafkademo"
        ).source(jsonString, XContentType.JSON);

        IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
        String id = indexResponse.getId();

        logger.info(id);

        // close the client
        client.close();
    }

    public static RestHighLevelClient createClient(){
        String hostname = "xxxxx";

        RestClientBuilder builder = RestClient.builder(
                new HttpHost(hostname, 443, "https"))
                .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                    @Override
                    public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpAsyncClientBuilder) {
                        return httpAsyncClientBuilder;
                    }
                });

        RestHighLevelClient client = new RestHighLevelClient(builder);

        return client;
    }
}

 

在 ES 端查看index 以及条目信息:

> curl https://xxx/_cat/indices?v

health status index     uuid                   pri rep docs.count docs.deleted store.size pri.store.size

green  open   .kibana_1 tQuukokDTbWg9OyQI8Bh4A   1   1          0            0       566b           283b

green  open   .kibana_2 025DtfBLR3CUexrUkX9x9Q   1   1          0            0       566b           283b

green  open   kafkademo elXjncvwQPam7dqMd5gedg   5   1          1            0      9.3kb          4.6kb

green  open   .kibana   ZvzR21YqSOi-8nbjffSuTA   5   1          1            0     10.4kb          5.2kb

 

> curl https://xxx/kafkademo/

{"kafkademo":{"aliases":{},"mappings":{"properties":{"foo":{"type":"text","fields":{"keyword":{"type":"keyword","ignore_above":256}}}}},"settings":{"index":{"creation_date":"1566985949656","number_of_shards":"5","number_of_replicas":"1","uuid":"elXjncvwQPam7dqMd5gedg","version":{"created":"7010199"},"provided_name":"kafkademo"}}}}

 

2. 向Kafka 生产消息

为了模拟输入到 Kafka 的消息,我们使用一个开源的json-data-generator,github地址如下:

https://github.com/everwatchsolutions/json-data-generator

使用此工具可以很方便地向 Kafka 生产随机的 json数据。

下载此工具后,配置好Kafka broker list地址,启动向Kafka 生产消息:

> java -jar json-data-generator-1.4.0.jar jackieChanSimConfig.json

 

 

3. 将消息发往ElasticSearch

在原有Kafka Consumer 的基础上,我们增加以下代码:

// poll for new data
while(true){
    ConsumerRecords<String, String> records =
            consumer.poll(Duration.ofMinutes(100));

    for(ConsumerRecord record : records) {
        // where we insert data into ElasticSearch
        IndexRequest indexRequest = new IndexRequest(
                "kafkademo"
        ).source(record.value(), XContentType.JSON);

        IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
        String id = indexResponse.getId();

        logger.info(id);

        try {
            Thread.sleep(1000); // introduce a small delay
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    }

 

可以看到消息被正常发往ElasticSearch,其中随机字符串为插入ES后的 _id:

 

 

转载于:https://www.cnblogs.com/zackstang/p/11428045.html

相关文章:

  • Apache Kafka(八)- Kafka Delivery Semantics for Consumers
  • liquibase 注意事项
  • Red Team远程控制软件
  • upload-labs 上传漏洞靶场环境以及writeup
  • Hive on Tez 中 Map 任务的数量计算
  • countUp.js-数字滚动效果(简单基础使用)
  • Windows 搭建 nginx rtmp服务器
  • MySQL的sql_mode模式说明及设置
  • my read law / notarization / gongzheng
  • 我要成为怎么样的人
  • 虚拟机enp0s8网卡无法联网和开放linux端口
  • ANT下载和配置 IDEA
  • 目标检测: R-CNN原理
  • win10下cmake 编译tensorflow1.11.0
  • 目标检测: Fast R-CNN原理
  • [NodeJS] 关于Buffer
  • “Material Design”设计规范在 ComponentOne For WinForm 的全新尝试!
  • 77. Combinations
  • Java的Interrupt与线程中断
  • Java反射-动态类加载和重新加载
  • leetcode98. Validate Binary Search Tree
  • leetcode讲解--894. All Possible Full Binary Trees
  • oschina
  • SpiderData 2019年2月25日 DApp数据排行榜
  • spring + angular 实现导出excel
  • 百度地图API标注+时间轴组件
  • 机器学习学习笔记一
  • 简析gRPC client 连接管理
  • 力扣(LeetCode)22
  • 目录与文件属性:编写ls
  • 区块链共识机制优缺点对比都是什么
  • 移动端解决方案学习记录
  • 翻译 | The Principles of OOD 面向对象设计原则
  • 格斗健身潮牌24KiCK获近千万Pre-A轮融资,用户留存高达9个月 ...
  • 关于Android全面屏虚拟导航栏的适配总结
  • #define 用法
  • #调用传感器数据_Flink使用函数之监控传感器温度上升提醒
  • (pojstep1.3.1)1017(构造法模拟)
  • (pytorch进阶之路)CLIP模型 实现图像多模态检索任务
  • (Redis使用系列) SpringBoot 中对应2.0.x版本的Redis配置 一
  • (附源码)springboot“微印象”在线打印预约系统 毕业设计 061642
  • (每日持续更新)信息系统项目管理(第四版)(高级项目管理)考试重点整理第3章 信息系统治理(一)
  • (篇九)MySQL常用内置函数
  • (强烈推荐)移动端音视频从零到上手(上)
  • (五)MySQL的备份及恢复
  • (一)Thymeleaf用法——Thymeleaf简介
  • (原創) 如何安裝Linux版本的Quartus II? (SOC) (Quartus II) (Linux) (RedHat) (VirtualBox)
  • (转)http-server应用
  • .NET Core SkiaSharp 替代 System.Drawing.Common 的一些用法
  • .NET/C# 使用反射注册事件
  • .net获取当前url各种属性(文件名、参数、域名 等)的方法
  • .NET企业级应用架构设计系列之技术选型
  • .NET上SQLite的连接
  • [100天算法】-二叉树剪枝(day 48)
  • [Android Pro] Notification的使用