当前位置: 首页 > news >正文

spark: 从pulsar中读取数据

一、依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>pulsar-demo2</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><pulsar.version>2.8.0</pulsar.version><jackson.version>2.10.5</jackson.version><!--<jackson.version>2.6.7</jackson.version>--></properties><dependencies><dependency><groupId>org.apache.pulsar</groupId><artifactId>pulsar-client-all</artifactId><version>${pulsar.version}</version></dependency><dependency><groupId>org.apache.pulsar</groupId><artifactId>pulsar-client-kafka</artifactId><version>${pulsar.version}</version></dependency><dependency><groupId>org.apache.pulsar</groupId><artifactId>pulsar-spark</artifactId><version>${pulsar.version}</version><exclusions><exclusion><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.10</artifactId></exclusion></exclusions></dependency><!--<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.11</artifactId><version>2.4.0</version></dependency>--><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-core</artifactId><version>${jackson.version}</version></dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-annotations</artifactId><version>${jackson.version}</version></dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId><version>${jackson.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>3.0.0</version></dependency></dependencies></project>

二、demo程序

package cn.edu.tju;import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.spark.SparkStreamingPulsarReceiver;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;import java.util.HashSet;
import java.util.Set;public class SparkTest {public static void main(String[] args) throws Exception{String serviceUrl = "pulsar://xx.xx.xx.xx:6650/";String topic = "persistent://public/default/my-topic11";String subs = "test_sub";SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(4));ConsumerConfigurationData<byte[]> pulsarConf = new ConsumerConfigurationData();Set<String> set = new HashSet<>();set.add(topic);pulsarConf.setTopicNames(set);pulsarConf.setSubscriptionName(subs);SparkStreamingPulsarReceiver pulsarReceiver = new SparkStreamingPulsarReceiver(serviceUrl,pulsarConf,new AuthenticationDisabled());JavaReceiverInputDStream<byte[]> lineDStream = jsc.receiverStream(pulsarReceiver);JavaDStream<String> resultStream = lineDStream.map(new Function<byte[], String>() {@Overridepublic String call(byte[] bytes) throws Exception {return new String(bytes);}});resultStream.print();jsc.start();jsc.awaitTermination();}
}

相关文章:

  • tcpdump 抓包
  • 基于STELLA系统动态模拟技术及在农业、生态及环境科学中的应用教程
  • WINDOWS设置代理链chain
  • 一文整合工厂模式、模板模式、策略模式
  • 什么是通配符SSL证书?
  • Webgl学习系列-认识Webgl
  • 一、TLE9471 - SBC Mode切换 + VCC2 开关
  • 百度谷歌301强引蜘蛛池效果怎么样
  • 项目中配置多个阿里巴巴矢量图库方案
  • SQL-CRUD-1
  • Available platform plugins are: linuxfb, minimal, offscreen, vnc.
  • Centos7.9备份mysql数据库
  • 【YOLOv5改进系列(5)】高效涨点----添加密集小目标检测NWD方法
  • 【Java程序设计】【C00376】基于(JavaWeb)Springboot的社区帮扶对象管理系统(有论文)
  • android各种软件下载
  • Android框架之Volley
  • FineReport中如何实现自动滚屏效果
  • github指令
  • mysql常用命令汇总
  • PHP那些事儿
  • Python连接Oracle
  • SAP云平台里Global Account和Sub Account的关系
  • 阿里云购买磁盘后挂载
  • 干货 | 以太坊Mist负责人教你建立无服务器应用
  • 构建二叉树进行数值数组的去重及优化
  • 解决jsp引用其他项目时出现的 cannot be resolved to a type错误
  • 什么软件可以剪辑音乐?
  • 宾利慕尚创始人典藏版国内首秀,2025年前实现全系车型电动化 | 2019上海车展 ...
  • ​Linux Ubuntu环境下使用docker构建spark运行环境(超级详细)
  • ​渐进式Web应用PWA的未来
  • #NOIP 2014#Day.2 T3 解方程
  • #多叉树深度遍历_结合深度学习的视频编码方法--帧内预测
  • (C语言)字符分类函数
  • (分布式缓存)Redis分片集群
  • (附源码)springboot 基于HTML5的个人网页的网站设计与实现 毕业设计 031623
  • (附源码)计算机毕业设计ssm基于B_S的汽车售后服务管理系统
  • (免费分享)基于springboot,vue疗养中心管理系统
  • (原創) 系統分析和系統設計有什麼差別? (OO)
  • (转)PlayerPrefs在Windows下存到哪里去了?
  • (转)微软牛津计划介绍——屌爆了的自然数据处理解决方案(人脸/语音识别,计算机视觉与语言理解)...
  • (转载)虚函数剖析
  • .Net8 Blazor 尝鲜
  • .net专家(高海东的专栏)
  • ??如何把JavaScript脚本中的参数传到java代码段中
  • @data注解_SpringBoot 使用WebSocket打造在线聊天室(基于注解)
  • @FeignClient注解,fallback和fallbackFactory
  • [ NOI 2001 ] 食物链
  • [1]-基于图搜索的路径规划基础
  • [AIR] NativeExtension在IOS下的开发实例 --- IOS项目的创建 (一)
  • [AX]AX2012 SSRS报表Drill through action
  • [caffe(二)]Python加载训练caffe模型并进行测试1
  • [CSS]CSS 的背景
  • [datastore@cyberfear.com].Elbie、[thekeyishere@cock.li].Elbie勒索病毒数据怎么处理|数据解密恢复
  • [docker]docker网络-直接路由模式
  • [Foreman]解决Unable to find internal system admin account