spark: 从pulsar中读取数据
一、依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>pulsar-demo2</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><pulsar.version>2.8.0</pulsar.version><jackson.version>2.10.5</jackson.version><!--<jackson.version>2.6.7</jackson.version>--></properties><dependencies><dependency><groupId>org.apache.pulsar</groupId><artifactId>pulsar-client-all</artifactId><version>${pulsar.version}</version></dependency><dependency><groupId>org.apache.pulsar</groupId><artifactId>pulsar-client-kafka</artifactId><version>${pulsar.version}</version></dependency><dependency><groupId>org.apache.pulsar</groupId><artifactId>pulsar-spark</artifactId><version>${pulsar.version}</version><exclusions><exclusion><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.10</artifactId></exclusion></exclusions></dependency><!--<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.11</artifactId><version>2.4.0</version></dependency>--><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-core</artifactId><version>${jackson.version}</version></dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-annotations</artifactId><version>${jackson.version}</version></dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId><version>${jackson.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>3.0.0</version></dependency></dependencies></project>
二、demo程序
package cn.edu.tju;import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.spark.SparkStreamingPulsarReceiver;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;import java.util.HashSet;
import java.util.Set;public class SparkTest {public static void main(String[] args) throws Exception{String serviceUrl = "pulsar://xx.xx.xx.xx:6650/";String topic = "persistent://public/default/my-topic11";String subs = "test_sub";SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(4));ConsumerConfigurationData<byte[]> pulsarConf = new ConsumerConfigurationData();Set<String> set = new HashSet<>();set.add(topic);pulsarConf.setTopicNames(set);pulsarConf.setSubscriptionName(subs);SparkStreamingPulsarReceiver pulsarReceiver = new SparkStreamingPulsarReceiver(serviceUrl,pulsarConf,new AuthenticationDisabled());JavaReceiverInputDStream<byte[]> lineDStream = jsc.receiverStream(pulsarReceiver);JavaDStream<String> resultStream = lineDStream.map(new Function<byte[], String>() {@Overridepublic String call(byte[] bytes) throws Exception {return new String(bytes);}});resultStream.print();jsc.start();jsc.awaitTermination();}
}