当前位置: 首页 > news >正文

pyspark之Structured Streaming file文件案例1

# generate_file.py 
# 生成数据 生成500个文件,每个文件1000条数据
# 生成数据格式:eventtime name province action ()时间 用户名 省份 动作)
import os 
import time
import shutil
import time

FIRST_NAME = ['Zhao', 'Qian', 'Sun', 'Li', 'Zhou', 'Wu', 'Zheng', 'Wang']
SECOND_NAME = ['San', 'Si', 'Wu', 'Chen', 'Yang', 'Min', 'Jie', 'Qi']
PROVINCE = ['BeiJing', 'ShanDong', 'ShangHai', 'HeNan', 'HaErBin']
ACTION = ['login', 'logout', 'purchase']

PATH = "/opt/software/tmp/"
DATA_PATH = "/opt/software/tmp/data/"
    # 初始化环境
    def test_Setup():
        if os.path.exists(DATA_PATH):
            shutil.rmtree(DATA_PATH)
        os.mkdir(DATA_PATH)

    # 清理数据,恢复测试环境
    def test_TearDown():
        shutile.rmtree(DATA_PATH)

    # 数据保存文件
    def writeAndMove(filename,content):
        with open(PATH+filename,'wt',encoding='utf-8') as f:
            f.write(content)
        shutil.move(PATH+filename,DATA_PATH+filename)

if __name__ == '__main__':

    test_Setup()
    
    for i in range(500):
        filename = "user_action_{}.log".format(i)
        """
        验证spark输出模式,complete和update,增加代码,第一个文件i=0时,设置PROVINCE = "TAIWAN"
        """
        if i == 0:
            province= ['TaiWan']
        else:
            province = PROVINCE
        content = ""
        for _ in range(1000):
            content += "{} {} {} {}\n".format(str(int(time.time())),random.choice(FIRST_NAME)+random.choice(SECOND_NAME),random.choice(province),random.choice(ACTION))
        writeAndMove(filename,content)    
        time.sleep(10)

# spark_file_test.py  
# 读取DATA文件夹下面文件,按照省份统计数据,主要考虑window情况,按照window情况测试,同时针对    outputMode和输出console和mysql进行考虑,其中保存到mysql时添加batch字段

from pyspark.sql import SparkSession,DataFrame
from pyspark.sql.functions import split,lit,from_unixtime

DATA_PATH = "/opt/software/tmp/data/"

if __name__ == '__main__':
    spark = SparkSession.builder.getOrCreate()
    lines = spark.readStream.format("text").option("seq","\n").load(DATA_PATH)
    # 分隔符为空格
    userinfo = lines.select(split(lines.value," ").alias("info"))
    # 第一个为eventtime  第二个为name   第三个为province  第四个为action 
    # userinfo['info'][0]等同于userinfo['info'].getIterm(0)
    user = userinfo.select(from_unixtime(userinfo['info'][0]).alias('eventtime'),
        userinfo['info'][1].alias('name'),userinfo['info'][2].alias('province'),
        userinfo['info'][3].alias('action'))
    """
    测试1:数据直接输出到控制台,由于没有采用聚合,输出模式选择update
    user.writeStream.outputMode("update").format("console").trigger(processingTime="8 seconds").start().awaitTermination()
    """
    """
    测试2:数据存储到数据库,新建数据库表,可以通过printSchema()查看数据类型情况
    def insert_into_mysql_batch(df:DataFrame,batch):
        if df.count()>0:
        # 此处将batch添加到df中,采用lit函数
            data = df.withColumn("batch",lit(batch))
            data.write.format("jdbc"). \
        option("driver","com.mysql.jdbc.Driver"). \
        option("url","jdbc:mysql://localhost:3306/spark").option("user","root").\
        option("password","root").option("dbtable","user_log").\
        option("batchsize",1000).mode("append").save()    
        else:
            pass
user.writeStream.outputMode("update").foreachBatch((insert_into_mysql_batch)).trigger(processingTime="20 seconds").start().awaitTermination()
    """
    """
    测试3:数据按照省份统计后,输出到控制台,分析complete和update输出模式区别,针对该问题,调整输入,province="TaiWan"只会输入1次,即如果输出方式complete,则每batch都会输出,update的话,只会出现在一个batch
    userProvinceCounts = user.groupBy("province").count()
    userProvinceCounts = userProvinceCounts.select(userProvinceCounts['province'],userProvinceCounts["count"].alias('sl'))
# 测试输出模式complete:complete将总计算结果都进行输出
"""
batch 0
TaiWan 1000
batch 1
TaiWan 1000
其他省份  sl
batch 2
TaiWan 1000
其他省份  sl
"""    userProvinceCounts.writeStream.outputMode("complete").format("console").trigger(processingTime="20 seconds").start().awaitTermination() 
# 测试输出模式update:update只输出相比上个批次变动的内容(新增或修改)
batch 0
TaiWan 1000  
batch 1 中没有TaiWan输出
userProvinceCounts.writeStream.outputMode("complete").format("console").trigger(processingTime="20 seconds").start().awaitTermination() 
    """

相关文章:

  • 设备通过GB28181注册到EasyCVR,平台看不到设备信息的排查方法汇总
  • OpenCV第 2 课 OpenCV 环境搭建
  • LabVIEW高级CAN通信系统
  • (M)unity2D敌人的创建、人物属性设置,遇敌掉血
  • 深入Android S (12.0) 探索Framework之输入子系统InputReader的流程
  • notepad++: 插件fingertext 来创建代码块
  • 考研过后你如坐针毡,而有些人因选择中国人民大学与加拿大女王大学金融硕士而乐在其中
  • 网络中黑客攻击使用手段Top25漏洞常见参数,8个WAF绕过,一些用于查找敏感文件的语法
  • Windows ssh登录eNSP交换机
  • 直播录屏工具哪家强?让你的直播更精彩!
  • 【VRTK】【PICO】如何快速创建一个用VRTK开发的PICO项目
  • 【算法专题】动态规划之路径问题
  • alfred自定义谷歌翻译workflow
  • AcWing828.模拟栈
  • # 安徽锐锋科技IDMS系统简介
  • create-react-app做的留言板
  • css属性的继承、初识值、计算值、当前值、应用值
  • ES6系统学习----从Apollo Client看解构赋值
  • hadoop入门学习教程--DKHadoop完整安装步骤
  • HTML中设置input等文本框为不可操作
  • iOS | NSProxy
  • JavaScript 事件——“事件类型”中“HTML5事件”的注意要点
  • Python十分钟制作属于你自己的个性logo
  • vue中实现单选
  • 从tcpdump抓包看TCP/IP协议
  • 对象引论
  • 前端相关框架总和
  • 我从编程教室毕业
  • 我这样减少了26.5M Java内存!
  • 学习笔记DL002:AI、机器学习、表示学习、深度学习,第一次大衰退
  • ​iOS安全加固方法及实现
  • ​Spring Boot 分片上传文件
  • ​软考-高级-系统架构设计师教程(清华第2版)【第15章 面向服务架构设计理论与实践(P527~554)-思维导图】​
  • #includecmath
  • (8)STL算法之替换
  • (C++17) std算法之执行策略 execution
  • (day6) 319. 灯泡开关
  • (echarts)echarts使用时重新加载数据之前的数据存留在图上的问题
  • (LNMP) How To Install Linux, nginx, MySQL, PHP
  • (MIT博士)林达华老师-概率模型与计算机视觉”
  • (七)c52学习之旅-中断
  • (四)【Jmeter】 JMeter的界面布局与组件概述
  • (转)Linux整合apache和tomcat构建Web服务器
  • (转)创业的注意事项
  • (转载)OpenStack Hacker养成指南
  • .helper勒索病毒的最新威胁:如何恢复您的数据?
  • .NET core 自定义过滤器 Filter 实现webapi RestFul 统一接口数据返回格式
  • .NET Micro Framework初体验
  • .NET 中选择合适的文件打开模式(CreateNew, Create, Open, OpenOrCreate, Truncate, Append)
  • .NET/C# 如何获取当前进程的 CPU 和内存占用?如何获取全局 CPU 和内存占用?
  • .net反混淆脱壳工具de4dot的使用
  • .NET面试题(二)
  • .Net中ListT 泛型转成DataTable、DataSet
  • @Not - Empty-Null-Blank
  • @vue/cli 3.x+引入jQuery