当前位置: 首页 > news >正文

Pyspark_结构化流3

Pyspark

注:大家觉得博客好的话,别忘了点赞收藏呀,本人每周都会更新关于人工智能和大数据相关的内容,内容多为原创,Python Java Scala SQL 代码,CV NLP 推荐系统等,Spark Flink Kafka Hbase Hive Flume等等~写的都是纯干货,各种顶会的论文解读,一起进步。
今天继续和大家分享一下Pyspark_结构化流3
#博学谷IT学习技术支持


文章目录

  • Pyspark
  • 前言
  • 一、CheckPoint检查点目录设置
  • 二、Spark 和 Kafka 整合
    • 1.从kafka中读取数据
    • 2.数据写入Kafka中
  • 总结


前言

接上次继续Pyspark_结构化流,今天主要是结合kafka操作,也是实际工作中用的最多的方式。


一、CheckPoint检查点目录设置

设置检查点, 目的是为了提供容错性, 当程序出现失败了, 可以从检查点的位置, 直接恢复处理即可, 避免出现重复处理的问题
方式一: 基于DataStreamWrite设置
streamDF.writeStream.option(“checkpointLocation”, “path”)
方式二: SparkConf设置
sparkConf.set(“spark.sql.streaming.checkpointLocation”, “path”)
在后续和Kafka对接后, 检查点尤其重要意义: 整合后, Spark不屑使用Kafka的偏移量维护, 将整个生产和消费的偏移量信息维护工作, 交给了自己的checkpoint来处理, 从而确定消息处理准确性

二、Spark 和 Kafka 整合

1.从kafka中读取数据

import os
from pyspark.sql import SparkSession

# 锁定远端环境, 确保环境统一
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    print("spark 消费kafka的数据")

    # 1- 创建SparkSession
    spark = SparkSession.builder.appName("spark streaming kafka").master("local[*]") \
        .config('spark.sql.shuffle.partitions', 4) \
        .getOrCreate()

    # 2- 对接Kafka, 从Kafka中获取消息数据
    # endingOffsets只支持批处理
    # .option("endingOffsets", """{"stream_topic":{"0":-1,"1":-1,"2":-1}}""")
    df = spark.readStream \
        .format('kafka') \
        .option('kafka.bootstrap.servers', 'node1:9092,node2:9092,node3:9092') \
        .option('subscribe', 'stream_topic') \
        .load()

    # 3- 处理数据
    df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", 'topic', 'partition', 'offset', 'timestamp',
                       'timestampType')

    # 4- 输出
    df.writeStream.format('console').outputMode('append').start().awaitTermination()

2.数据写入Kafka中

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import os

# 锁定远端环境, 确保环境统一
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    print("演示: 将数据写入到Kafka中")

    # 1- 创建SparkSession镀锡
    spark = SparkSession.builder \
        .appName('file_source') \
        .master('local[1]') \
        .config('spark.sql.shuffle.partitions', 4) \
        .getOrCreate()

    # 2- 对接数据源: 从端口号中获取消息数据
    df = spark.readStream \
        .format('socket') \
        .option('host', 'node1') \
        .option('port', 44444) \
        .load()

    # 3- 处理数据
    df.createTempView('t1')

    df = spark.sql("""
            select
                split(value,'-')[0] as key,
                split(value,'-')[1] as value
            from t1
    """)

    # 4- 输出数据
    df.writeStream.format('console').outputMode('append').start()

    # 对接kafka , 将数据输出到kafka上
    df.writeStream\
        .format('kafka')\
        .option('kafka.bootstrap.servers','node1:9092,node2:9092,node3:9092')\
        .option('topic','stream_topic01')\
        .option('checkpointLocation','/structured/chk')\
        .start()\
        .awaitTermination()


总结

今天主要和大家分享了如何用Pyspark_结构化流结合kafka操作。

相关文章:

  • 【云原生】Kubernetes(k8s)部署 MySQL+Dubbo+Nacos服务
  • Linux使用:环境变量指南和CPU和GPU利用情况查看
  • 自学大数据第13天~Hbase数据库操作
  • FPGA 20个例程篇:20.USB2.0/RS232/LAN控制并行DAC输出任意频率正弦波、梯形波、三角波、方波(三)
  • Java 基本数据类型
  • 做了个springboot接口参数解密的工具,我给它命名为万能钥匙(已上传maven中央仓库,附详细使用说明)
  • ChatGPT如何批量撰写最新的热点自媒体文章
  • 虚拟动力数字人技术亮相第六届文创产业大会,探见元宇宙 驱动新文创
  • 内存优化小结
  • Android复习总结
  • 回溯算法37:解数独
  • Web前端 jQuery
  • 银行数字化转型导师坚鹏:银行产品经理技能快速提升之道
  • aws beanstalk 实例日志和环境状态日志的轮换和流式传输配置
  • GameFramework框架详解之 Scene场景
  • 【译】理解JavaScript:new 关键字
  • 0x05 Python数据分析,Anaconda八斩刀
  • Docker 1.12实践:Docker Service、Stack与分布式应用捆绑包
  • javascript数组去重/查找/插入/删除
  • JWT究竟是什么呢?
  • Phpstorm怎样批量删除空行?
  • React Native移动开发实战-3-实现页面间的数据传递
  • ReactNativeweexDeviceOne对比
  • Spark VS Hadoop:两大大数据分析系统深度解读
  • webpack项目中使用grunt监听文件变动自动打包编译
  • 代理模式
  • 分享一个自己写的基于canvas的原生js图片爆炸插件
  • 基于webpack 的 vue 多页架构
  • 模仿 Go Sort 排序接口实现的自定义排序
  • 如何实现 font-size 的响应式
  • 使用putty远程连接linux
  • 译米田引理
  • 赢得Docker挑战最佳实践
  • 400多位云计算专家和开发者,加入了同一个组织 ...
  • NLPIR智能语义技术让大数据挖掘更简单
  • #android不同版本废弃api,新api。
  • #每日一题合集#牛客JZ23-JZ33
  • (1)(1.13) SiK无线电高级配置(五)
  • (10)工业界推荐系统-小红书推荐场景及内部实践【排序模型的特征】
  • (安全基本功)磁盘MBR,分区表,活动分区,引导扇区。。。详解与区别
  • (几何:六边形面积)编写程序,提示用户输入六边形的边长,然后显示它的面积。
  • (三十五)大数据实战——Superset可视化平台搭建
  • (学习日记)2024.04.04:UCOSIII第三十二节:计数信号量实验
  • (一)插入排序
  • (原)Matlab的svmtrain和svmclassify
  • (转)Android学习笔记 --- android任务栈和启动模式
  • (转)EXC_BREAKPOINT僵尸错误
  • (转)linux下的时间函数使用
  • (转)Sublime Text3配置Lua运行环境
  • ***通过什么方式***网吧
  • **Java有哪些悲观锁的实现_乐观锁、悲观锁、Redis分布式锁和Zookeeper分布式锁的实现以及流程原理...
  • .Net Remoting常用部署结构
  • .NET 跨平台图形库 SkiaSharp 基础应用
  • .net6解除文件上传限制。Multipart body length limit 16384 exceeded
  • .NET开源全面方便的第三方登录组件集合 - MrHuo.OAuth