当前位置: 首页 > news >正文

c语言fread函数的用法_Sparklyr 1.2支持foreach函数了

刚才在R-blogger上才看到这篇文章,垂死病中惊坐起,马上来试试。


众所周知,foreach常见于R里头的并行程序,我之前也写过相关的文章。

微笑牛油果:R语言多文件合并统计:foreach的高级用法​zhuanlan.zhihu.com

这次就按照上面文章中的用法来测试spark集群的foreach。

p. s. 虽然说是集群,但我只有一个服务器,当做单机测试看看就行。

首先在R里头读入要合并的文件位置,然后看下内容:

> a<-list.files('/home/testfiles',
+               recursive = T, full.names = T, pattern = '.csv')[1:5]
> fread(a[1], nrows = 5)
         V1 V2     V3     V4 V5   V6        V7         V8       V9
1: 12123.31  9 350000 350700 NA 1132 1.6174643 0.53193917 5.893926
2: 12172.31 19 310000 310115  6 1141 1.3583580 0.18948141 4.310760
3: 12173.31 19 310000 310115  6 1141 0.5696340 0.07945995 1.807738
4: 12173.31 19 310000 310115  6 1141 0.3696276 0.06254695 1.253260
5: 12173.31 19 310000 310115  6 1141 1.1088829 0.18764085 3.759780
           V10       V11
1: 0.056843234  222.0000
2: 0.029994972 1151.2273
3: 0.012578537  482.7727
4: 0.008010572  318.2500
5: 0.024031717  954.7500 
> nrow(fread(a[1]))
[1] 8741292

这次只抓5个文件来测试,对这些文件合并统计,以V3来分组统计V9及V10的总和,写一个自定义的合并函数:

> bind_fun<-function(...){
+   data<-rbind(...)[,.(V9 = sum(V9, na.rm = T),
+                       V10 = sum(V10, na.rm = T)),
+                    by = c('V3')]
+ }

按照blog的方法,注册spark app,注册并行后端,开始跑foreach:

> conf <- spark_config()
> conf$spark.executor.memory <- "28GB"
> conf$spark.memory.fraction <- 0.9
> conf$spark.executor.cores <- 4
> conf$spark.cores.max<-8  
> conf$spark.dynamicAllocation.enabled <- "false"
> 
> sc <- spark_connect(master="spark://127.0.1.7:7077", 
+                     version = "2.3.2",
+                     config = conf,
+                     spark_home = "/home/spark-2.3.2-bin-hadoop2.7/")
> 
> registerDoSpark(sc, nocompile = F)
> 
> raw<-foreach(i = a,
+              .packages = c('data.table','dplyr','stringr'),
+              .inorder = F,
+              .combine = 'bind_fun',
+              .multicombine = T,
+              .maxcombine = 3) %dopar% {
+                data<-fread(i)[,.(V9 = sum(V9, na.rm = T),
+                                 V10 = sum(V10, na.rm = T)),
+                              by = c('V3')]
+                return(data)
+              }

注册后端的函数目前只有nocompile ,就这个案例来看,设置为T或F,运行时间差不多都是1.2m。

再来看看FORK的并行:

> cl<-makeCluster(8, type = 'FORK')
> registerDoParallel(cl)
> 
> system.time({
+   raw<-foreach(i = a,
+                .packages = c('data.table','dplyr','stringr'),
+                .inorder = F,
+                .combine = 'bind_fun',
+                .multicombine = T,
+                .maxcombine = 3) %dopar% {
+                  data<-fread(i)[,.(V9 = sum(V9, na.rm = T),
+                                    V10 = sum(V10, na.rm = T)),
+                                 by = c('V3')]
+                  return(data)
+                }
+ })
   user  system elapsed 
  0.013   0.024  11.699 
> 
> stopCluster(cl)
> stopImplicitCluster()

用时12秒,这就很尴尬了。

其实从job的log不难看出这个案例里spark跑foreach慢的原因:

7b518eede0f6c7a8b7a24d45a0bb76c8.png

出现了好几次collect。看起来spark跑foreach程序不适合用来读写数据。


最后分享一下单机standalone模式下一个有效利用CPU资源跑循环的方法。一般来说,Standalone模式可以在一台机子上配置多个worker。Standalone的设置可以参考我之前的文章。比方说这是我机子上worker的配置:

aabf97e0dbf88e2fc8007ca3c6596672.png
手动码一下ip

但实际运行中,在程序各阶段不是所有worker都在工作,这就造成CPU资源闲置。因此我们可以通过后台运行方式来申请多个app,最大限度的压榨每个work的资源。

我们先准备nohup要调用的脚本:

library(sparklyr)
library(data.table)

conf <- spark_config()
conf$spark.executor.memory <- "28GB"
conf$spark.memory.fraction <- 0.9
conf$spark.executor.cores <- 4
conf$spark.cores.max<-8  
conf$spark.dynamicAllocation.enabled <- "false"
#conf$spark.shuffle.service.enabled<-'true'

sc <- spark_connect(master="spark://127.0.1.7:7077", 
                    version = "2.3.2",
                    config = conf,
                    spark_home = "/home/spark-2.3.2-bin-hadoop2.7/")

<你的代码>

将上面的代码存成脚本,放在工作目录下,比方说/home/script.R

解释一下,spark.executor.cores设置为4,因为我的worker的cores也是4spark.cores.max设置为8,则这个app(就是下面的sc)最多用到8个cores。

因此,我们nohup执行script.R之后,worker的状态会显示:

b6a2c005117c3b2d962a465daad5f77f.png

App栏则会出现:

767d332845a199721a0844b3b240cca1.png

可以看到,只有2个worker的cores被调用。然后我们按照同样的方法再nohup一次刚才的脚本,新的app就注册上了:

dfa49644a71ff210c51f66f6a18215a2.png

查看app工作的端口是按照注册顺序来编号的,第一个app就是localip:4040,第二个就是localip:4041,以此类推。

相关文章:

  • 一段对话,解决一个Exchange问题
  • ucosiii源码分析笔记 pdf下载_方舟编译器学习笔记14 DriverRunner源码分析
  • wxWidgets在windows VC++下的安装
  • python serial_浅谈python中的多线程和多进程
  • StatCVS 对使用CVS的项目进行深入统计的开源工具
  • 2020无人用的邮箱和密码大全_2020年之微波炉怎么用 微波炉使用注意事项大全
  • 用C原生API写Symbian日志文件
  • go make function_Go中复制文件的3种技巧
  • 猪猪宝贝
  • 简述python文件操作_Python文件操作的几个要点与示例
  • 读书小结
  • tomcat线程被打满怎么排查_Tomcat面试题(2020最新版)
  • oc引导开机直接进_超级详细的oc引导制作过程(二)——config.plist的制作,从入门到入土...
  • 2007年你需要知道的五大技术
  • python中reader_关于Python 的这几个技巧,你应该知道
  • 分享的文章《人生如棋》
  • [case10]使用RSQL实现端到端的动态查询
  • [NodeJS] 关于Buffer
  • 30秒的PHP代码片段(1)数组 - Array
  • AHK 中 = 和 == 等比较运算符的用法
  • codis proxy处理流程
  • css选择器
  • iOS帅气加载动画、通知视图、红包助手、引导页、导航栏、朋友圈、小游戏等效果源码...
  • MySQL常见的两种存储引擎:MyISAM与InnoDB的爱恨情仇
  • opencv python Meanshift 和 Camshift
  • Promise初体验
  • Quartz实现数据同步 | 从0开始构建SpringCloud微服务(3)
  • WebSocket使用
  • 测试开发系类之接口自动化测试
  • 构造函数(constructor)与原型链(prototype)关系
  • 基于 Babel 的 npm 包最小化设置
  • 我的zsh配置, 2019最新方案
  • 详解NodeJs流之一
  • 用 Swift 编写面向协议的视图
  • 运行时添加log4j2的appender
  • 正则表达式小结
  • 2017年360最后一道编程题
  • ​LeetCode解法汇总2304. 网格中的最小路径代价
  • ​一些不规范的GTID使用场景
  • #gStore-weekly | gStore最新版本1.0之三角形计数函数的使用
  • $$$$GB2312-80区位编码表$$$$
  • (+4)2.2UML建模图
  • (Demo分享)利用原生JavaScript-随机数-实现做一个烟花案例
  • (力扣)循环队列的实现与详解(C语言)
  • (力扣记录)1448. 统计二叉树中好节点的数目
  • (四)鸿鹄云架构一服务注册中心
  • (四)模仿学习-完成后台管理页面查询
  • (一)硬件制作--从零开始自制linux掌上电脑(F1C200S) <嵌入式项目>
  • (转)真正的中国天气api接口xml,json(求加精) ...
  • ./indexer: error while loading shared libraries: libmysqlclient.so.18: cannot open shared object fil
  • .cfg\.dat\.mak(持续补充)
  • .NET 使用 JustAssembly 比较两个不同版本程序集的 API 变化
  • .NET 中使用 TaskCompletionSource 作为线程同步互斥或异步操作的事件
  • .Net6支持的操作系统版本(.net8已来,你还在用.netframework4.5吗)
  • .Net转Java自学之路—基础巩固篇十三(集合)