当前位置: 首页 > news >正文

Spark中利用Scala进行数据清洗(代码)

2019-05-07 18:56:18
1

  package com.amoscloud.log.analyze 2 3 import java.text.SimpleDateFormat 4 import java.util.Date 5 6 import org.apache.spark.rdd.RDD 7 import org.apache.spark.{SparkConf, SparkContext} 8 9 object LogAnalyze1 { 10 def main(args: Array[String]): Unit = { 11 12 13 val conf = new SparkConf().setMaster("local[2]").setAppName("LogAnalyze2") 14 val sc = new SparkContext(conf) 15 16 val data = sc.textFile("C:\\Users\\Administrator\\Desktop\\HTTP.txt") 17 data.cache() 18 // 1.(手机号,归属地,设备品牌,设备型号,连接时长) 19 // analyze1(data) 20 // 2.(时间段秒,访问流量) 21 analyze2(data) 22 // 3.(品牌,Array[(String,Int)]((型号1,个数1),(型号2,个数2))) 23 // analyze(data) 24 } 25 26 private def analyze(data: RDD[String]) = { 27 data.filter(_.split(",").length >= 72) 28 .map(x => { 29 val arr = x.split(",") 30 val brand = arr(70) 31 val model = arr(71) 32 ((brand, model), 1) 33 }) 34 .reduceByKey(_ + _) 35 .map(t => { 36 val k = t._1 37 (k._1, (k._2, t._2)) 38 }) 39 .groupByKey() 40 .collect() 41 .foreach(println) 42 } 43 44 private def analyze2(data: RDD[String]) = { 45 data.map(x => { 46 val arr = x.split(",") 47 val time = arr(16).take(arr(16).length - 4) 48 val flow = arr(7).toLong 49 (time, flow) 50 }) 51 .reduceByKey(_ + _) 52 // .map(x => (x._1, (x._2 / 1024.0).formatted("%.3f") + "KB")) 53 .map(x => (x._1, x._2)) 54 .collect() 55 .foreach(println) 56 } 57 58 private def analyze1(data: RDD[String]) = { 59 data 60 .filter(_.split(",").length >= 72) 61 .map(x => { 62 val arr = x.split(",") 63 val phoneNum = arr(3).takeRight(11) 64 val local = arr(61) + arr(62) + arr(63) 65 val brand = arr(70) 66 val model = arr(71) 67 val connectTime = timeDiff(arr(16), arr(17)) 68 (phoneNum + "|" + local + "|" + brand + "|" + model, connectTime) 69 // 1.(手机号,归属地,设备品牌,设备型号,连接时长) 70 }) 71 .reduceByKey(_ + _) 72 .map(t => (t._1, formatTime(t._2))) 73 .collect() 74 .foreach(println) 75 } 76 77 def timeDiff(time1: String, time2: String): Long = { 78 val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") 79 val timeStamp2 = sdf.parse(time2.take(time2.length - 4)).getTime + time2.takeRight(3).toLong 80 val timeStamp1 = sdf.parse(time1.take(time1.length - 4)).getTime + time1.takeRight(3).toLong 81 timeStamp2 - timeStamp1 82 } 83 84 85 def formatTime(time: Long): String = { 86 val timeS = time / 1000 87 val s = timeS % 60 88 val m = timeS / 60 % 60 89 val h = timeS / 60 / 60 % 24 90 h + ":" + m + ":" + s 91 } 92 93 }

 

2:写spark程序统计iis网站请求日志中 每天每个小时段成功访问ip的数量

package com.amoscloud.log.analyze

import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

import scala.collection.mutable

object LogAnalyze {
  def main(args: Array[String]): Unit = {
    //    写spark程序统计iis网站请求日志中 每天每个小时段成功访问ip的数量

    //获取sc
    val conf = new SparkConf().setAppName("LogAnalyze").setMaster("local[2]")
    val sc = new SparkContext(conf)

    //读取数据
    val log: RDD[String] = sc.textFile("C:\\Users\\Administrator\\Desktop\\iis网站请求日志")


    //将日志中,日期,时间,IP和响应码 保留
    log
      .filter(_.split("\\s").length > 10)

      .map(line => {
        val strings = line.split("\\s+")
        //RDD[(String,String,String,String)]
        (strings(0), strings(1).split(":")(0), strings(8), strings(10))
      })
      //RDD[(String,String,String,String)]
      .filter(_._4 == "200")
      //RDD[(日期|时间,IP)]
      .map(t => (t._1 + "|" + t._2, t._3))
      //RDD[(日期|时间,Iterable[IP])]

      .groupByKey()
      .map(t => (t._1, t._2.toList.size, t._2.toList.distinct.size))

      .collect()
      .foreach(t => {
        val spl = t._1.split("\\|")
        printf("%s\t%s\t%d\t%d\n", spl(0), spl(1), t._2, t._3)
      })


    //数据按照 日期和时间进行分区  相同key的数据都在同一个分区中
    //      .partitionBy(new HashPartitioner(48))
    //      .mapPartitions((iter: Iterator[(String, String)]) => {
    //        val set = mutable.HashSet[String]()
    //        var count = 0
    //        var next = ("", "")
    //        while (iter.hasNext) {
    //          next = iter.next()
    //          count += 1
    //          set.add(next._2)
    //        }
    //        ((next._1, count, set.size) :: Nil).iterator
    //      })
    //      .filter(_._1.nonEmpty)
  }
}

                                                    更灵活的运用spark算子,意味着写更少的代码

                                                                2019-05-07 19:06:57

转载于:https://www.cnblogs.com/Vowzhou/p/10827349.html

相关文章:

  • Flask搭建简单的get请求
  • linux 源码安装JAVA jdk
  • Spring事务管理之几种方式实现事务(转)
  • 【转】np.random.random()函数 参数用法以及numpy.random系列函数大全
  • 从零开始手写一个优化版React脚手架
  • JavaWeb学习笔记(十二)--Session案例
  • 实验:basic验证,组验证
  • 携程、阿里、京东、腾讯iOS春招面试过程以及面试题总结!
  • 003-软件质量模型的6大特性27个子特性(转)
  • 使用SpringSession管理分布式会话时遇到的反序列化问题
  • c语言程序设计第1章
  • 计算机的三大原则
  • Java并发之AQS详解
  • htaccess隐藏index.php,301重定向等等..
  • CF241B Friends
  • Android开源项目规范总结
  • express + mock 让前后台并行开发
  • happypack两次报错的问题
  • IOS评论框不贴底(ios12新bug)
  • isset在php5.6-和php7.0+的一些差异
  • LintCode 31. partitionArray 数组划分
  • python docx文档转html页面
  • ⭐ Unity 开发bug —— 打包后shader失效或者bug (我这里用Shader做两张图片的合并发现了问题)
  • Vue小说阅读器(仿追书神器)
  • Zepto.js源码学习之二
  • 分布式任务队列Celery
  • 简单易用的leetcode开发测试工具(npm)
  • 精益 React 学习指南 (Lean React)- 1.5 React 与 DOM
  • 为什么要用IPython/Jupyter?
  • 想晋级高级工程师只知道表面是不够的!Git内部原理介绍
  • 小试R空间处理新库sf
  • 运行时添加log4j2的appender
  • Android开发者必备:推荐一款助力开发的开源APP
  • #android不同版本废弃api,新api。
  • #鸿蒙生态创新中心#揭幕仪式在深圳湾科技生态园举行
  • #我与Java虚拟机的故事#连载10: 如何在阿里、腾讯、百度、及字节跳动等公司面试中脱颖而出...
  • (C语言)输入一个序列,判断是否为奇偶交叉数
  • (done) NLP “bag-of-words“ 方法 (带有二元分类和多元分类两个例子)词袋模型、BoW
  • (MonoGame从入门到放弃-1) MonoGame环境搭建
  • (vue)页面文件上传获取:action地址
  • (读书笔记)Javascript高级程序设计---ECMAScript基础
  • (二)Pytorch快速搭建神经网络模型实现气温预测回归(代码+详细注解)
  • (附源码)spring boot建达集团公司平台 毕业设计 141538
  • (力扣)循环队列的实现与详解(C语言)
  • (牛客腾讯思维编程题)编码编码分组打印下标(java 版本+ C版本)
  • (推荐)叮当——中文语音对话机器人
  • (一)appium-desktop定位元素原理
  • .a文件和.so文件
  • .bat批处理(四):路径相关%cd%和%~dp0的区别
  • .NET Core 通过 Ef Core 操作 Mysql
  • .net core 源码_ASP.NET Core之Identity源码学习
  • .NET 常见的偏门问题
  • .NET 将多个程序集合并成单一程序集的 4+3 种方法
  • .NetCore项目nginx发布
  • @ComponentScan比较