当前位置: 首页 > news >正文

Hadoop的streamingAPI与MapReduce[Python]

文章目录

  • 1.创建模拟文本
  • 2. 使用mapperduce统计标签分布和抽取指定标签
  • 3. 运行Map函数并排序结果以模拟Reduce任务:
  • 4.运行在无网络开发机上

1.创建模拟文本

1.1 机器模拟生成

from collections import namedtuple
from faker import Faker# 初始化Faker
fake = Faker()# 定义一个namedtuple类型,包含id, subject, text字段
GaokaoQuestion = namedtuple('GaokaoQuestion', 'id subject text')# 定义生成模拟数据的函数
def generate_faker_data(num_samples):data = []for _ in range(num_samples):# 使用faker生成数据subject = fake.word()text = fake.sentence()# 使用md5生成idid_value = f"{text} {subject}"id_hash = hashlib.md5(id_value.encode('utf-8')).hexdigest()# 创建namedtuple实例question = GaokaoQuestion(id=id_hash, subject=subject, text=text)data.append(question)return data# 生成3条模拟数据
samples = generate_faker_data(3)# 打印生成的数据
for sample in samples:print(sample)

2.手动生成

cat > test_data.jsonl << EOF
{"id":"1", "subject":"Math", "text":"Math question"}
{"id":"2", "subject":"Science", "text":"Science question"}
{"id":"3", "subject":"Math", "text":"Another Math question"}
EOF

2. 使用mapperduce统计标签分布和抽取指定标签

#!/usr/bin/env python3
import sys
import json
from collections import defaultdict# 指定需要抽取的subject标签列表
TARGET_SUBJECTS = ["数学", "物理"]def mapper():for line in sys.stdin:data = json.loads(line)if data['subject'] in TARGET_SUBJECTS:print(json.dumps(data))def reducer():counts = defaultdict(int)for line in sys.stdin:subject, count = line.strip().split('\t')counts[subject] += int(count)for subject, count in counts.items():print(f"{subject}\t{count}")if __name__ == "__main__":if len(sys.argv) > 1 and sys.argv[1] == 'reduce':reducer()else:mapper()

3. 运行Map函数并排序结果以模拟Reduce任务:

cat test_data.jsonl | python3 mapper_reducer.py | sort -k1,1 | python3 mapper_reducer.py reduce

4.运行在无网络开发机上

# 假设input_data.jsonl是HDFS上的输入文件路径
# 假设output是HDFS上输出结果的路径# 运行Map任务
hadoop fs -get /path/to/input_data.jsonl input_data.jsonl
python \Auser\tmp\mapper_reducer_script\mapper_reducer.py | sort -k1,1 > mapped_output.txt# 运行Reduce任务
python \Auser\tmp\mapper_reducer_script\mapper_reducer.py reduce < mapped_output.txt > reduced_output.txt# 将结果上传到HDFS
hadoop fs -put reduced_output.txt /path/to/output/

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 使用 NumPy 生成随机数:一个全面的指南
  • ASC格式的协议数据解析
  • 使用Anaconda安装多个版本的Python并与Pycharm进行对接
  • 为企业创建智能支持 AI 代理
  • RTOS(8)信号量和互斥量
  • “Apple Intelligence”的“系统提示词”被曝光了
  • 【环绕字符串中唯一的子字符串】python刷题记录
  • 数据结构(java实现)——优先级队列,堆
  • NSSCTF练习记录:[SWPUCTF 2021 新生赛]include
  • actual combat 45 分布式事务seata,若依cloud项目Test,xid为null
  • 编译和汇编的区别
  • C++ 异常处理:深入解析与实践应用
  • 第100+20步 ChatGPT学习:R实现Lasso回归
  • LabVIEW远程开发
  • 为什么要推荐R语言?欢迎订阅专栏《R 探索临床数据科学》
  • angular组件开发
  • CSS 三角实现
  • ES6 学习笔记(一)let,const和解构赋值
  • hadoop集群管理系统搭建规划说明
  • If…else
  • JavaScript设计模式之工厂模式
  • k8s如何管理Pod
  • node和express搭建代理服务器(源码)
  • React-Native - 收藏集 - 掘金
  • vue数据传递--我有特殊的实现技巧
  • 编写高质量JavaScript代码之并发
  • 彻底搞懂浏览器Event-loop
  • 工作踩坑系列——https访问遇到“已阻止载入混合活动内容”
  • 海量大数据大屏分析展示一步到位:DataWorks数据服务+MaxCompute Lightning对接DataV最佳实践...
  • 如何学习JavaEE,项目又该如何做?
  • 我的业余项目总结
  • 学习Vue.js的五个小例子
  • 正则与JS中的正则
  • 字符串匹配基础上
  • 第二十章:异步和文件I/O.(二十三)
  • ​LeetCode解法汇总2182. 构造限制重复的字符串
  • # MySQL server 层和存储引擎层是怎么交互数据的?
  • #if 1...#endif
  • #数据结构 笔记一
  • $var=htmlencode(“‘);alert(‘2“); 的个人理解
  • %@ page import=%的用法
  • (1)(1.13) SiK无线电高级配置(五)
  • (1)安装hadoop之虚拟机准备(配置IP与主机名)
  • (14)学习笔记:动手深度学习(Pytorch神经网络基础)
  • (2)从源码角度聊聊Jetpack Navigator的工作流程
  • (42)STM32——LCD显示屏实验笔记
  • (java版)排序算法----【冒泡,选择,插入,希尔,快速排序,归并排序,基数排序】超详细~~
  • (Redis使用系列) Springboot 使用Redis+Session实现Session共享 ,简单的单点登录 五
  • (仿QQ聊天消息列表加载)wp7 listbox 列表项逐一加载的一种实现方式,以及加入渐显动画...
  • (附源码)ssm高校升本考试管理系统 毕业设计 201631
  • (附源码)计算机毕业设计ssm基于Internet快递柜管理系统
  • (论文阅读32/100)Flowing convnets for human pose estimation in videos
  • (亲测)设​置​m​y​e​c​l​i​p​s​e​打​开​默​认​工​作​空​间...
  • (十七)devops持续集成开发——使用jenkins流水线pipeline方式发布一个微服务项目
  • (原創) 如何讓IE7按第二次Ctrl + Tab時,回到原來的索引標籤? (Web) (IE) (OS) (Windows)...