当前位置: 首页 > news >正文

0基础学习PyFlink——使用DataStream进行字数统计

大纲

  • source
  • Map
    • Splitting
    • Mapping
  • Reduce
    • Keying
    • Reducing
  • 完整代码
  • 结构
  • 参考资料

在《0基础学习PyFlink——模拟Hadoop流程》一文中,我们看到Hadoop在处理大数据时的MapReduce过程。
在这里插入图片描述
本节介绍的DataStream API,则使用了类似的结构。

source

为了方便,我们依然使用from_collection从内存中读取数据。
和使用Table API类似,我们给from_collection传递的第二参数是每行数据类型。本例中是String,即“A C B”的类型。

from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionModeword_count_data = ["A C B","A E B","E C D"]def word_count():env = StreamExecutionEnvironment.get_execution_environment()env.set_runtime_mode(RuntimeExecutionMode.BATCH)# write all the data to one fileenv.set_parallelism(1)source_type_info = Types.STRING()# define the sourcesource = env.from_collection(word_count_data, source_type_info)

可以使用下面指令输出source内容

    source.print()
A C B
A E B
E C D

Map

和上图一样,Map由Splitting和Mapping组成。它们分别将数据切割成做小运算单元,和生成map结构。

Splitting

    def split(line):for s in line.split():yield ssplitted = source.flat_map(split) 

上述splitted的结构输出是

A
C
B
A
E
B
E
C
D

Mapping

Mapping的操作就是将之前的数组结构转换成map结构

mapped=splitted.map(lambda i: (i, 1), Types.TUPLE([Types.STRING(), Types.INT()]))

mapped的输出值如下,可以看到它还是按我们输入数据的顺序排列的。

(A,1)
(C,1)
(B,1)
(A,1)
(E,1)
(B,1)
(E,1)
(C,1)
(D,1)

Reduce

Keying

这一步对应于上图中的Shuffling&Sorting,它会将相同key的数据进行分区,以供后面reducing操作使用。

    keyed=mapped.key_by(lambda i: i[0]) 

可以看到keyed数据已经经过排序和聚合了。

(A,1)
(A,1)
(B,1)
(B,1)
(C,1)
(C,1)
(D,1)

Reducing

 reduced=keyed.reduce(lambda i, j: (i[0], i[1] + j[1]))

reduce的方法有如下注释

Applies a reduce transformation on the grouped data stream grouped on by the given
key position. The ReduceFunction will receive input values based on the key value.
Only input values with the same key will go to the same reducer.

特别是最后一句非常有用“Only input values with the same key will go to the same reducer”(只有相同Key的输入数据才会进入相同的Reducer中)。这句话意味着上述Keyed的数据会被分组执行,于是就不会出现计算错乱。

(A,2)
(B,2)
(C,2)
(D,1)
(E,2)

完整代码

from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionModeword_count_data = ["A C B","A E B","E C D"]def word_count():env = StreamExecutionEnvironment.get_execution_environment()env.set_runtime_mode(RuntimeExecutionMode.BATCH)# write all the data to one fileenv.set_parallelism(1)source_type_info = Types.STRING()# define the sourcesource = env.from_collection(word_count_data, source_type_info)# source.print()def split(line):for s in line.split():yield ssplitted = source.flat_map(split) # splitted.print()mapped=splitted.map(lambda i: (i, 1), Types.TUPLE([Types.STRING(), Types.INT()]))# mapped.print()keyed=mapped.key_by(lambda i: i[0]) # keyed.print()reduced=keyed.reduce(lambda i, j: (i[0], i[1] + j[1]))# define the sinkreduced.print()# submit for executionenv.execute()if __name__ == '__main__':word_count()

结构

在这里插入图片描述

参考资料

  • https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/python/datastream_tutorial/

相关文章:

  • Java操作word
  • 服务器遭受攻击如何处理(记录排查)
  • Redis入门02-基础概念
  • 分类预测 | Matlab实现KOA-CNN-BiLSTM-selfAttention多特征分类预测(自注意力机制)
  • 亲测解决Pytorch TypeError: object of type ‘numpy.int64‘ has no len()
  • 微服务框架SpringcloudAlibaba+Nacos集成RabbitMQ
  • C语言assert函数:什么是“assert”函数
  • 【Java 进阶篇】Java中的响应输出字节数据
  • MySQL - 覆盖索引、回表查询
  • Nacos | 使用 Nginx 转发 Nacos2.x 端口的注意事项
  • 多模态 多引擎 超融合 新生态!2023亚信科技AntDB数据库8.0产品发布
  • 【开发新的】apache common BeanUtils忽略null值
  • C# Onnx 用于边缘检测的轻量级密集卷积神经网络LDC
  • Kafka - 监控工具 Kafka Eagle:实时洞察Kafka集群的利器
  • P2107 小Z的AK计划
  • 深入了解以太坊
  • 【vuex入门系列02】mutation接收单个参数和多个参数
  • 【从零开始安装kubernetes-1.7.3】2.flannel、docker以及Harbor的配置以及作用
  • - C#编程大幅提高OUTLOOK的邮件搜索能力!
  • ES6核心特性
  • Github访问慢解决办法
  • isset在php5.6-和php7.0+的一些差异
  • JSONP原理
  • Lsb图片隐写
  • MaxCompute访问TableStore(OTS) 数据
  • Promise初体验
  • React as a UI Runtime(五、列表)
  • SpringBoot 实战 (三) | 配置文件详解
  • Vultr 教程目录
  • 给初学者:JavaScript 中数组操作注意点
  • 解决jsp引用其他项目时出现的 cannot be resolved to a type错误
  • - 语言经验 - 《c++的高性能内存管理库tcmalloc和jemalloc》
  • python最赚钱的4个方向,你最心动的是哪个?
  • 交换综合实验一
  • ​LeetCode解法汇总2583. 二叉树中的第 K 大层和
  • (10)Linux冯诺依曼结构操作系统的再次理解
  • (Redis使用系列) Springboot 实现Redis 同数据源动态切换db 八
  • (Redis使用系列) SpringBoot中Redis的RedisConfig 二
  • (附源码)spring boot公选课在线选课系统 毕业设计 142011
  • (附源码)springboot青少年公共卫生教育平台 毕业设计 643214
  • (附源码)ssm基于web技术的医务志愿者管理系统 毕业设计 100910
  • (力扣记录)235. 二叉搜索树的最近公共祖先
  • (牛客腾讯思维编程题)编码编码分组打印下标题目分析
  • (全注解开发)学习Spring-MVC的第三天
  • (使用vite搭建vue3项目(vite + vue3 + vue router + pinia + element plus))
  • .helper勒索病毒的最新威胁:如何恢复您的数据?
  • .net refrector
  • .NET 反射的使用
  • .NET 中 GetProcess 相关方法的性能
  • .NET/C# 使窗口永不获得焦点
  • .NET实现之(自动更新)
  • /3GB和/USERVA开关
  • @Conditional注解详解
  • @manytomany 保存后数据被删除_[Windows] 数据恢复软件RStudio v8.14.179675 便携特别版...
  • @SuppressWarnings(unchecked)代码的作用