
Spark broadcast variables

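Straight to the code. The example uses mapPartitions, filter, repartition and persist, plus a broadcast variable: a read-only value that Spark sends to each executor once and caches there, instead of shipping a copy with every task.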

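        // Assumed imports (Spark 2.x Java API, Jackson 2.x):
        //   java.util.*, java.util.Map.Entry, com.fasterxml.jackson.databind.ObjectMapper,
        //   org.apache.spark.api.java.JavaRDD, org.apache.spark.api.java.function.{FlatMapFunction, Function},
        //   org.apache.spark.broadcast.Broadcast, org.apache.spark.sql.{Row, SQLContext},
        //   org.apache.spark.sql.types.StructType, org.apache.spark.storage.StorageLevel
        // getContext, createStructType and createOrdPart3Row are the author's own helpers (not shown).
        // Standalone master URL: 279 is not a valid IPv4 octet, so substitute your own master's address.
        String master = "spark://192.168.2.279:7077";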
//         jsc = getContext("local[2]");
        jsc = getContext(master);
        sqlContext = new SQLContext(jsc);
        connectionProperties = new Properties();
        connectionProperties.put("user", user);
        connectionProperties.put("password", password);
        connectionProperties.put("driver", "com.mysql.jdbc.Driver");
//        emrdataIndedx(filePath, jsc, sqlContext);// load the emrdataindex data into MySQL
        JavaRDD<String> javaRDD = jsc.textFile(filePath);
        String[] fields = {"pk_dcpv", "code_pvtype", "name_pvtype", "code_ord", "empi", "code_sex"
                , "name_sex", "birthday", "age", "code_dept", "name_dept", "bed", "pk_dcordrisreq"
                , "code_req", "code_rep", "code_rep_type", "name_rep_type", "code_state", "name_state"
                , "code_eu_type", "name_eu_type", "code_eu_item", "name_eu_item", "code_part"
                , "name_part", "create_time", "code_pres", "parent_code"};
        String[] old_type = {"D", "GYN", "X ", "MR ", "L05", "L04",
                "L12", "B ", "OTHC", "DOS", "ECG", "CT ", "UIS", "L02",
                "RIS", "SY ", "CB ", "L01", "ENT", "L03", "EYE", "NSC",
                "L07", "EMG", "NEU", "PTH", "DC", "INF", "GC", "L08",
                "L09", "BD", "L26", "ECT", "GM", "GP", "L10", "EDO",
                "L11", "DER", "EEG", "URO", "PFT", "L25", "RF", "OTH",
                "PIS", "PMR", "PSY", "MPL", "BM", "Z", "EIS", "BED", "BLD",
                "L27", "FOD", "R", "GYP", "CTD", "BDT", "L99", "EUS", "HNS",
                "L91", "SED", "L28", "F", "IED", "FOW", "L31", "OO", "P01", "L13"};
        // Broadcast variable holding the allowed code_rep_type values
        final Broadcast<String[]> broadcast = jsc.broadcast(old_type);
        StructType schema = createStructType(fields);
        JavaRDD<Row> mapPartitions1 = javaRDD.mapPartitions(new FlatMapFunction<Iterator<String>, Row>() {
            private static final long serialVersionUID = 1L;
            ObjectMapper mapper = new ObjectMapper();

            @SuppressWarnings("unchecked")
            public Iterator<Row> call(Iterator<String> iterator)
                    throws Exception {
                ArrayList<Row> arrayList = new ArrayList<Row>();
                while (iterator.hasNext()) {
                    String next = iterator.next();
                    // Fresh map per record, so fields from a previous line cannot leak into this row
                    Map<String, String> map_s = new HashMap<String, String>();
                    try {
                        Map<String, Object> map_t = mapper.readValue(next, Map.class);
                        for (Entry<String, Object> entry : map_t.entrySet()) {
                            map_s.put(entry.getKey(), String.valueOf(entry.getValue()));
                        }
                    } catch (Exception e) {
                        // Skip a malformed JSON line; returning null here would fail the whole task
                        continue;
                    }
                    Row createOrdPart3Row = createOrdPart3Row(map_s);
                    arrayList.add(createOrdPart3Row);
                }
                return arrayList.iterator();
            }
        });
        JavaRDD<Row> mapPartitions2 = mapPartitions1.filter(new Function<Row, Boolean>() {
            private static final long serialVersionUID = 1L;

            public Boolean call(Row row) throws Exception {
                String pk_dcpv1 = row.getString(0);
                String code_pvtype1 = row.getString(1);
                String code_rep_type1 = row.getString(15);
                // Keep rows whose pk_dcpv has exactly two "_"-separated parts, whose code_pvtype is
                // non-null, non-empty and not "P" (joining these checks with || made the clause always true),
                // and whose code_rep_type is in the broadcast list, read on the executor via value()
                return pk_dcpv1 != null && pk_dcpv1.split("_").length == 2
                        && (null != code_pvtype1 && !"".equals(code_pvtype1) && !"P".equals(code_pvtype1))
                        && Arrays.asList(broadcast.value()).contains(code_rep_type1);
            }
        });
        JavaRDD<Row> mapPartitions = mapPartitions2.repartition(100);
        JavaRDD<Row> persist = mapPartitions.persist(StorageLevel.MEMORY_AND_DISK_SER());
        JavaRDD<Row> filter1 = persist.filter(new Function<Row, Boolean>() {
            private static final long serialVersionUID = 1L;

            public Boolean call(Row row) throws Exception {
                return row.getString(0).startsWith("1");
            }
        });
        // ... run the actions that consume filter1 / persist here ...
        // Destroy the broadcast once it is no longer needed, but only after the last action that may
        // (re)compute these RDDs: transformations are lazy, so destroy() before an action fails the job.
        broadcast.destroy();
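
The code_pvtype clause in the second filter, as first written (`!"".equals(code_pvtype1) || null != code_pvtype1 || !"P".equals(code_pvtype1)`), is true for every input, so it filters nothing. The sketch below is plain JDK Java with no Spark; the class and method names are hypothetical. It compares that clause with the presumed intent (non-null, non-empty, not "P") and shows a `HashSet` lookup for the report-type codes. One option, assumed rather than taken from the post, is to broadcast the set itself, e.g. `jsc.broadcast(new HashSet<String>(Arrays.asList(old_type)))`, so each row does a hash lookup instead of wrapping the array in a new list.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

class RowFilterDemo {
    // A few entries copied from old_type; note the trailing spaces in "X " and "CT ".
    static final Set<String> ALLOWED_REP_TYPES =
            new HashSet<>(Arrays.asList("X ", "CT ", "ECG", "L01"));

    // The code_pvtype clause as first written: joined with ||, it is true for every input.
    static boolean originalPvTypeCheck(String codePvtype) {
        return !"".equals(codePvtype) || null != codePvtype || !"P".equals(codePvtype);
    }

    // Presumed intent (an assumption): non-null, non-empty, and not "P".
    static boolean fixedPvTypeCheck(String codePvtype) {
        return codePvtype != null && !codePvtype.isEmpty() && !"P".equals(codePvtype);
    }

    // Hash lookup on a set built once; contains() uses equals(), so spaces matter.
    static boolean repTypeAllowed(String codeRepType) {
        return ALLOWED_REP_TYPES.contains(codeRepType);
    }

    public static void main(String[] args) {
        for (String s : new String[] {null, "", "P", "O"}) {
            System.out.println(s + ": original=" + originalPvTypeCheck(s)
                    + ", fixed=" + fixedPvTypeCheck(s));
        }
        System.out.println("\"X \" allowed: " + repTypeAllowed("X "));
        System.out.println("\"X\" allowed: " + repTypeAllowed("X"));
    }
}
```

Because the lookup is an exact string match, a code such as "X" in the data would not match the "X " entry in old_type; if the source pads codes inconsistently, trimming both sides before comparing may be needed.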
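
The mapPartitions step follows a common pattern: set up the parser once per partition, build a fresh map for every record, and skip malformed lines instead of returning null from call(), which would fail the task. The JDK-only sketch below mirrors that loop outside Spark. `parseRecord` is a hypothetical stand-in for `mapper.readValue(next, Map.class)` and uses a made-up `key=value,key=value` format so no JSON library is needed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

class PartitionParseDemo {
    // Hypothetical stand-in for the JSON parse: "k=v,k=v" -> map, throws on a malformed pair.
    static Map<String, String> parseRecord(String line) {
        Map<String, String> record = new HashMap<>(); // fresh map per record
        for (String pair : line.split(",")) {
            int eq = pair.indexOf('=');
            if (eq <= 0) {
                throw new IllegalArgumentException("malformed pair: " + pair);
            }
            record.put(pair.substring(0, eq), pair.substring(eq + 1));
        }
        return record;
    }

    // Same shape as the mapPartitions call(): consume the partition's iterator,
    // skip bad lines with continue, and always return a non-null iterator.
    static Iterator<Map<String, String>> parsePartition(Iterator<String> lines) {
        List<Map<String, String>> out = new ArrayList<>();
        while (lines.hasNext()) {
            String line = lines.next();
            Map<String, String> record;
            try {
                record = parseRecord(line);
            } catch (IllegalArgumentException e) {
                continue; // drop the malformed line, keep processing the partition
            }
            out.add(record);
        }
        return out.iterator();
    }
}
```

Feeding it `"pk_dcpv=1_2,code_sex=M"`, `"garbage"` and `"pk_dcpv=3_4"` yields two records; the second has no `code_sex` key, which is what a per-record map guarantees and a single map shared across records would not.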

 

Reposted from: https://www.cnblogs.com/kwzblog/p/10180281.html
