当前位置: 首页 > news >正文

文本大数据挖掘项目(Go语言)

文本大数据挖掘项目:

       处理1G左右的网上获取的kf数据

 

完整代码见:https://github.com/skyerhxx/TextBigDataProcess

 

开发环境

  • 版本:Go SDK 1.13.5
  • IDE:GoLand/Vscode

 

项目结构

 

 

 

这么大的数据你用txt打开也是打不开的 (除非内存32G+?)

但可以通过这样进行缓冲查看

 

由于我没有搞到这个1G大小的数据(心塞),所以我通过OCR从视频中识别了一些自制了一个70K作用的文件,数据格式什么都是一样的,用于学习这个项目

 

整个项目要注意的一点是,文本中的逗号是汉语逗号,因为处理数据要将数据按照逗号切片,所以如果用了英语逗号是根本得不出结果来的

 

 

文本大数据读取

通过 ioutil.ReadFile(filename) API可以傻瓜式地一次性读入全部数据

然而考虑到数据总量在1G以上,一次性读入内存,不但增加内存压力,产生爆内存的风险,还会长时间阻塞,效率极低;

正确的做法是,使用缓冲区进行逐行读取

package main

import (
	"bufio"
	"fmt"
	"io"
	"io/ioutil"
	"os"
	"strings"
)

//一次性将全部数据读入内存
func main1() {

	contentBytes,err := ioutil.ReadFile("d:/golang/src/go_code/TextBigDataProcess/kaifangX.txt")
	if err != nil {
		fmt.Println("读入失败",err)
	}
	contentStr := string(contentBytes)

	//逐行打印
	lineStrs := strings.Split(contentStr,"\n\r")
	for _,lineStr := range lineStrs {
		fmt.Println(lineStr)

	}

}

//基于磁盘和缓存的读取
func main() {

	file,_ := os.Open("d:/golang/src/go_code/TextBigDataProcess/kaifangX.txt")
	defer file.Close()

	reader := bufio.NewReader(file)
	for {
		lineBytes, _, err := reader.ReadLine()
		if err == io.EOF {
			break
		}
	fmt.Println(string(lineBytes))
	}

}

视频中还有写将gbk编码处理成utf-8的代码

因为我的txt文件本身就是utf-8编码的,所以可以直接读入,不需要转换,节省了好一段代码

 

数据清洗

      一些数据条目的第二个字段并不为身份证号,而是一些邮政编码或者座机号,对我们用处不大,所以我们先来做一些清洗。

      把好的数据放在一块,坏的数据也不要轻易扔掉,也总结在一起,以后万一用其他的挖掘方式还可能用到

 

      没清洗前的数据叫脏数据

 

将有带身份证号的数据整理到一起

package main

import (
	"bufio"
	"fmt"
	"io"
	"os"
	"strings"
)

func main() {
	//基于磁盘和缓存的读取
	file,_ := os.Open("d:/golang/src/go_code/TextBigDataProcess/kaifangX.txt")
	defer file.Close()

	//准备一个优质文件
	goodFile,_ := os.OpenFile("d:/golang/src/go_code/TextBigDataProcess/kaifang_good.txt", os.O_WRONLY | os.O_CREATE|os.O_APPEND, 0644)
	defer goodFile.Close()

	//准备一个劣质文件
	badFile,_ := os.OpenFile("d:/golang/src/go_code/TextBigDataProcess/kaifang_bad.txt", os.O_WRONLY | os.O_CREATE|os.O_APPEND, 0644)
	defer badFile.Close()

	reader := bufio.NewReader(file)
	for {
		lineBytes, _, err := reader.ReadLine()
		if err == io.EOF {
			break
		}
		lineStr := string(lineBytes)

		fields := strings.Split(lineStr, ",")
		if len(fields) >1 && len(fields[1])==18{  //防止有空白行
		                                         //身份证号18位,这里偷懒了,仔细的话应该用正则判断
			//摘取到优质文件中
			goodFile.WriteString(lineStr+"\n")
			fmt.Println("Good: ",lineStr)
		}else{
			badFile.WriteString(lineStr+"\n")
			fmt.Println("Bad: ",lineStr)
		}
	}

}

视频中的处理1G的数据时间大概9分钟
主要时间还是花在了打印到屏幕上,不然快得多
 

 

省份划分

根据身份证中的省份信息,将不同省份的开房者存入不同的文件中

 

从身份证号可以看出某人的所在地
前两位代表省份,第一位代表一个大区

查看源图像

北京市 11
天津市 12
河北省 13
山西省 14
内蒙古自治区 15
辽宁省 21
吉林省 22
黑龙江省 23
上海市 31
江苏省 32
浙江省 33
安徽省 34
福建省 35
江西省 36
山东省 37
河南省 41
湖北省 42
湖南省 43
广东省 44
广西壮族自治区 45
海南省 46
重庆市 50
四川省 51
贵州省 52
云南省 53
西藏自治区 54
陕西省 61
甘肃省 62
青海省 63
宁夏回族自治区 64
新疆维吾尔自治区 65
台湾省 71
香港特别行政区 81
澳门特别行政区 91

 

用到了并发(协程)和管道,因为并发向34个省份文件中写入数据的效率显然是要高于一个线程的

因为要频繁的读取文件,所以是IO密集型,所以选择多协程实现并发更好

 

整体思路如下:

  • 主协程负责逐行读取文本大数据
  • 另外开设34条子协程,负责对不同省份文件进行写入,从34个不同的管道中扫描数据并写出文件
  • 主协程根据身份证号反映的不同省份,将读入的信息丢入不同的管道,由对应的子协程进行文件写出;
  • 当文件读取完毕时,关闭所有的数据管道(通知子协程停止数据扫描);
  • 主协程通过等待组等待所有子协程完成任务;


 

ProvinceDivison.go

package main

import (
	"bufio"
	"fmt"
	"io"
	"os"
	"strings"
	"sync"
)

type Province struct {
	Id string	//身份证号前两位
	Name string
	File *os.File  //黑龙江.txt
	               //指针
	chanData chan string //本省文件的数据管道
}

var wg sync.WaitGroup
func writeFile(province *Province){

	//死循环读取管道,管道关闭时循环结束
	for lineStr := range province.chanData{
		province.File.WriteString(lineStr)
		fmt.Print(province.Name,"写入",lineStr)
	}
	//标记协程结束
	wg.Done()
}

func main(){
	//创建34个省份的实例
	pMap := make(map[string]*Province)
	ps := []string{"北京市11","天津市12","河北省13","山西省14","内蒙古自治区15","辽宁省21","吉林省22","黑龙江省23","上海市31","江苏省32","浙江省33","安徽省34","福建省35","江西省36","山东省37","河南省41","湖北省42","湖南省43","广东省44","广西壮族自治区45","海南省46","重庆市50","四川省51","贵州省52","云南省53","西藏自治区54","陕西省61","甘肃省62","青海省63","宁夏回族自治区64","新疆维吾尔自治区65","台湾省71","香港特别行政区81","澳门特别行政区91"}
	for _,p := range ps{
		name := p[:len(p)-2]
		id := p[len(p)-2:]
		province := Province{Id: id, Name: name}
		pMap[id] = &province

		//为每个省份打开一个文件
		file, _ := os.OpenFile("d:/golang/src/go_code/TextBigDataProcess/text/"+province.Name+".txt", os.O_WRONLY|os.O_CREATE | os.O_TRUNC,0644)
		province.File = file
		defer file.Close()

		//创建每个省的数据管道
		province.chanData = make(chan string)
		fmt.Println("管道已经创建")
	}

	//每个省份各起一条协程
	for _,province := range pMap{
		wg.Add(1)
		go writeFile(province)
	}

	//读入优质数据
	file, _ := os.Open("d:/golang/src/go_code/TextBigDataProcess/text/kaifang_good.txt")
	defer file.Close()
	reader := bufio.NewReader(file)

	//逐行判断身份证号的前两位
	for{
		lineBytes, _, err := reader.ReadLine()
		//读取完毕时,关闭所有数据管道
		if err == io.EOF {

			for _,province := range pMap{
				close(province.chanData)
				fmt.Println("管道已关闭")
			}
			break
		}
		//拿出省份ID
		lineStr := string(lineBytes)
		fieldsSlice := strings.Split(lineStr, ",")
		id := fieldsSlice[1][0:2]

		//对号入座,写入响应的管道
		if province,ok := pMap[id];ok{
			province.chanData <- (lineStr + "\n")
		}else{
			fmt.Println("莫名其妙的省",id)
		}
	}

	//阻塞等待协程结束
	wg.Wait()
}

如果不用指针的话会导致死锁

文件经大小排序之后

可见如果但从这个数据来看,可能上海人最能开房?23333333

 

视频中的处理1G的数据时间大概5分钟

 

年龄划分

同省份划分一样,给每个年代开辟一个协程,并发写入

主程序往管道里丢数据,响应的子协程从管道中读出数据写入到每个年份的文件当中

 

package main

import (
	"bufio"
	"fmt"
	"io"
	"os"
	"strconv"
	"strings"
	"sync"
)

type Ager struct {

	decade string  //年代, 即 190x,...,197x,198x,199x,200x,201x
	file *os.File
	chanData chan string
}

var dWg sync.WaitGroup
func write2File(ager *Ager) {

	for contentStr := range ager.chanData{
		ager.file.WriteString(contentStr)
		fmt.Print(ager.decade,"x 写入 ",contentStr)
	}
	dWg.Done()
}


func main(){

	//创建一大堆年代对象
	agersMap := make(map[string]*Ager)
	for i := 190;i<202;i++{
		ager := Ager{decade: strconv.Itoa(i)}
		file,_ := os.OpenFile("D:/golang/src/go_code/TextBigDataProcess/text/age-text/"+ager.decade+"x.txt", os.O_CREATE | os.O_WRONLY|os.O_TRUNC,0644)
		ager.file = file
		defer ager.file.Close()
		ager.chanData = make(chan string,0)
		agersMap[ager.decade] = &ager
	}
	//为每一个年代开辟一个写入协程
	for _,ager :=range agersMap {
		dWg.Add(1)
		go write2File(ager)
	}

	//读入未分类数据
	file,_ :=os.Open("d:/golang/src/go_code/TextBigDataProcess/text/kaifang_good.txt")
	defer file.Close()
	reader := bufio.NewReader(file)

	for{
		//断行-判断年代-丢入响应的管道
		lineStr,err := reader.ReadString('\n')
		if err == io.EOF {
			for _,ager := range agersMap {
				close(ager.chanData)
			}
			break
		}
		decade := strings.Split(lineStr,",")[1][6:9]
		if ager:=agersMap[decade];ager!=nil{
			agersMap[decade].chanData <- lineStr + "\n"
		}else{
			fmt.Println("\n\n\n\n\n\n\n\n",lineStr,"\n\n\n\n\n\n\n\n")
		}
	}

	//阻塞等待结束
	dWg.Wait()
}

从这个数据来看,80后最能开房,然后是70后

毕竟那个年纪,有钱有地位2333333

 

视频中的处理1G的数据时间大概6分钟

 

信息入库

      将文本大数据以适当的结构存入MySQL数据库;

      在终端循环输入要查询的姓名,对开房记录进行查询;

      实现精确查询和模糊查询:

      实现内存-数据库的二级缓存策略,并显示每一次查询的时间消耗;

 

不可能每次都入库,数据太大了,所以只要入库成功我们就做个标记,以后就不入了

内存会逐渐增长起来

如果是单个程序还好一点,如果是服务器端程序,万人查,内存缓存会迅速涨起来。我们就需要想一个办法使其动态平衡,久远的没人查的数据就把它请出去,内存不能无限暴涨。例如我就给你分配500M用于内存缓存,那么超了500M我们就要请出一部分

逐条入库,我们读每条数据然后print到屏幕上时间不会很长,但是入库的话要很长时间,所以逐条入库还要使用并发

 

新建数据库kaifang

在数据库中新建表kfperson

 

单协程插入数据库

KaifangInformationSearch.go

package main

import (
	"bufio"
	"fmt"
	_ "github.com/go-sql-driver/mysql"
	"github.com/jmoiron/sqlx"
	"io"
	"os"
	"strings"
)

//错误处理函数
func HandleError(err error,why string){
	if err != nil {
		fmt.Println("ERROR OCCURED!!!",err,why)
	}
}


//将文本大数据入库
//入库成功后,做一个文件标记,下一次见到标记就不再执行入库操作
func init() {

	//打开数据库
	db,err := sqlx.Open("mysql","root:root@tcp(127.0.0.1:3306)/kaifang")
	HandleError(err,"sqlx.Open")
	defer db.Close()

	//打开大数据文件
	file,e := os.Open("d:/golang/src/go_code/TextBigDataProcess/text/kaifang_good.txt")
	HandleError(e,"os.Open")
	defer file.Close()
	reader := bufio.NewReader(file)


	//分批次读入大数据文本
	//还是要基于缓存的读取
	for{
		lineBytes, _, err := reader.ReadLine()

		//如果读到了文件尾
		if err == io.EOF {
			break
		}
		HandleError(err,"reader.ReadLine")
		//逐条入库(并发)
		lineStr := string(lineBytes)
		fields := strings.Split(lineStr, ",")
		name,idcard := fields[0],fields[1]

		result,err := db.Exec("insert into kfperson(name,idcard) values(?,?);",name,idcard)
		HandleError(err,"db.Exec insert")
		if n,e := result.RowsAffected(); e == nil && n > 0{
			fmt.Printf("插入 %s 成功!\n",name)

		}



	}

	fmt.Println("数据初始化成功!")


}

func main() {
	//接受用户想要查询的姓名
	//这就是二级缓存
	//先查看内存中是否有结果
	//如果内存中没有,查数据库——查到的结果丢入内存
	//如果内存中有,就不是二级了,就一级就行了
	fmt.Println("INSERT OVER!")

}

 

下面给程序加上对数据库是否已经初始化了的判断

通过创建一个标记文件来实现的

package main

import (
	"bufio"
	"fmt"
	_ "github.com/go-sql-driver/mysql"
	"github.com/jmoiron/sqlx"
	"io"
	"os"
	"strings"
)

//错误处理函数
func HandleError(err error,why string){
	if err != nil {
		fmt.Println("ERROR OCCURED!!!",err,why)
	}
}


//将文本大数据入库
//入库成功后,做一个文件标记,下一次见到标记就不再执行入库操作
func init() {
	//如果数据库已经初始化过了,就直接退出
	_, err := os.Stat("d:/golang/src/go_code/TextBigDataProcess/text/kaifanggood_dbok.mark")
	if err == nil {
		fmt.Println("数据库业已初始化")
		return
	}


	//打开数据库
	db,err := sqlx.Open("mysql","root:root@tcp(127.0.0.1:3306)/kaifang")
	HandleError(err,"sqlx.Open")
	defer db.Close()

	//打开大数据文件
	file,e := os.Open("d:/golang/src/go_code/TextBigDataProcess/text/kaifang_good.txt")
	HandleError(e,"os.Open")
	defer file.Close()
	reader := bufio.NewReader(file)


	//分批次读入大数据文本
	//还是要基于缓存的读取
	for{
		lineBytes, _, err := reader.ReadLine()

		//如果读到了文件尾
		if err == io.EOF {
			break
		}
		HandleError(err,"reader.ReadLine")
		//逐条入库(并发)
		lineStr := string(lineBytes)
		fields := strings.Split(lineStr, ",")
		name,idcard := fields[0],fields[1]

		result,err := db.Exec("insert into kfperson(name,idcard) values(?,?);",name,idcard)
		HandleError(err,"db.Exec insert")
		if n,e := result.RowsAffected(); e == nil && n > 0{
			fmt.Printf("插入 %s 成功!\n",name)
		}
	}

	fmt.Println("数据初始化成功!")

	//创建一个标记文件,标记数据库已经初始化成功
	_, err = os.Create("d:/golang/src/go_code/TextBigDataProcess/text/kaifanggood_dbok.mark")
	if err == nil {
		fmt.Println("初始化标记已经创建!")
	}
}

func main() {
	//接受用户想要查询的姓名
	//这就是二级缓存
	//先查看内存中是否有结果
	//如果内存中没有,查数据库——查到的结果丢入内存
	//如果内存中有,就不是二级了,就一级就行了
	fmt.Println("INSERT OVER!")

}

 

实现数据查询

二级缓存就是先查看内存中是否有结果,如果内存中没有,查数据库——查到的结果丢入内存
如果内存中有,就不是二级了,就一级就行了

 

精确查询 & 模糊查询

    select * from kfperson where name like '西门庆'————精确查询
    select * from kfperson where name like '西门%' ————模糊查询

KaifangInformationSearch.go

package main

import (
	"bufio"
	"fmt"
	_ "github.com/go-sql-driver/mysql"
	"github.com/jmoiron/sqlx"
	"io"
	"os"
	"strings"
)

//错误处理函数
func HandleError(err error,why string){
	if err != nil {
		fmt.Println("ERROR OCCURED!!!",err,why)
	}
}


//将文本大数据入库
//入库成功后,做一个文件标记,下一次见到标记就不再执行入库操作
func init() {
	//如果数据库已经初始化过了,就直接退出
	_, err := os.Stat("d:/golang/src/go_code/TextBigDataProcess/text/kaifanggood_dbok.mark")
	if err == nil {
		fmt.Println("数据库业已初始化")
		return
	}


	//打开数据库
	db,err := sqlx.Open("mysql","root:root@tcp(127.0.0.1:3306)/kaifang")
	HandleError(err,"sqlx.Open")
	defer db.Close()

	//打开大数据文件
	file,e := os.Open("d:/golang/src/go_code/TextBigDataProcess/text/kaifang_good.txt")
	HandleError(e,"os.Open")
	defer file.Close()
	reader := bufio.NewReader(file)


	//分批次读入大数据文本
	//还是要基于缓存的读取
	for{
		lineBytes, _, err := reader.ReadLine()

		//如果读到了文件尾
		if err == io.EOF {
			break
		}
		HandleError(err,"reader.ReadLine")
		//逐条入库(并发)
		lineStr := string(lineBytes)
		fields := strings.Split(lineStr, ",")
		name,idcard := fields[0],fields[1]

		result,err := db.Exec("insert into kfperson(name,idcard) values(?,?);",name,idcard)
		HandleError(err,"db.Exec insert")
		if n,e := result.RowsAffected(); e == nil && n > 0{
			fmt.Printf("插入 %s 成功!\n",name)
		}
	}

	fmt.Println("数据初始化成功!")

	//创建一个标记文件,标记数据库已经初始化成功
	_, err = os.Create("d:/golang/src/go_code/TextBigDataProcess/text/kaifanggood_dbok.mark")
	if err == nil {
		fmt.Println("初始化标记已经创建!")
	}
}

func main() {

	//打开数据库
	db,err := sqlx.Open("mysql","root:root@tcp(127.0.0.1:3306)/kaifang")
	HandleError(err,"sqlx.Open")
	defer db.Close()

	//循环接收用户想要查询的姓名
	var name string
	for{
		fmt.Print("请输入要查询的开房者姓名: ")
		fmt.Scanf("%s",&name)

		//用户想退出
		if name == "exit"{
			break
		}

		//开始内存中肯定是没有的
		//查数据库
		kfpeople := make([]KfPerson,0)
		e := db.Select(&kfpeople,"select id,name,idcard from kfperson where name like ?;", name)//在这里name是string,带入select语句的时候会自动加上引号,否则select语句 like '西门庆' 是应该加引号的
		HandleError(e,"db.Select")
		fmt.Println(kfpeople)
	}

	fmt.Println("INSERT OVER!")

}

model.go

package main

type KfPerson struct {
	Id       int    `db:"id"`
	Name     string `db:"name"`
	IdNumber string `db:"idcard"`
}

编译的时候这俩一起编译

 


我们1G的数据清洗出来的gooddata也要几千万条数据了,如果是两千万条数据,那么查询的速度会很慢,半天返回不来结果
想象一个场景,妻子查询丈夫的kf记录,她就肯定只想查知道她老公一个人的,而我们这样半天返回不来数据,肯定是不行的


所以我们应该去做缓存


 

内存缓存+动态清理

 

内存最好的结构理应就是map

 

KaifangInformationSearch.go

package main

import (
	"bufio"
	"fmt"
	_ "github.com/go-sql-driver/mysql"
	"github.com/jmoiron/sqlx"
	"io"
	"os"
	"strings"
	"time"
)

//错误处理函数
func HandleError(err error,why string){
	if err != nil {
		fmt.Println("ERROR OCCURED!!!",err,why)
	}
}


//将文本大数据入库
//入库成功后,做一个文件标记,下一次见到标记就不再执行入库操作
func init() {
	//如果数据库已经初始化过了,就直接退出
	_, err := os.Stat("d:/golang/src/go_code/TextBigDataProcess/text/kaifanggood_dbok.mark")
	if err == nil {
		fmt.Println("数据库业已初始化")
		return
	}


	//打开数据库
	db,err := sqlx.Open("mysql","root:root@tcp(127.0.0.1:3306)/kaifang")
	HandleError(err,"sqlx.Open")
	defer db.Close()

	//打开大数据文件
	file,e := os.Open("d:/golang/src/go_code/TextBigDataProcess/text/kaifang_good.txt")
	HandleError(e,"os.Open")
	defer file.Close()
	reader := bufio.NewReader(file)


	//分批次读入大数据文本
	//还是要基于缓存的读取
	for{
		lineBytes, _, err := reader.ReadLine()

		//如果读到了文件尾
		if err == io.EOF {
			break
		}
		HandleError(err,"reader.ReadLine")
		//逐条入库(并发)
		lineStr := string(lineBytes)
		fields := strings.Split(lineStr, ",")
		name,idcard := fields[0],fields[1]

		result,err := db.Exec("insert into kfperson(name,idcard) values(?,?);",name,idcard)
		HandleError(err,"db.Exec insert")
		if n,e := result.RowsAffected(); e == nil && n > 0{
			fmt.Printf("插入 %s 成功!\n",name)
		}
	}

	fmt.Println("数据初始化成功!")

	//创建一个标记文件,标记数据库已经初始化成功
	_, err = os.Create("d:/golang/src/go_code/TextBigDataProcess/text/kaifanggood_dbok.mark")
	if err == nil {
		fmt.Println("初始化标记已经创建!")
	}
}

const CACHE_LEN = 2

var(
	kfMap map[string]QueryResult

)

func main() {

	//打开数据库
	db,err := sqlx.Open("mysql","root:root@tcp(127.0.0.1:3306)/kaifang")
	HandleError(err,"sqlx.Open")
	defer db.Close()

	//初始化缓存
	kfMap = make(map[string]QueryResult,0)


	var name string
	//循环接收用户想要查询的姓名
	for{
		fmt.Print("请输入要查询的开房者姓名: ")
		fmt.Scanf("%s",&name)

		//如果用户想退出
		if name == "exit"{
			break
		}

		//查看所有缓存
		if name == "cache"{
			fmt.Println("共缓存了%d条结果: \n",len(kfMap))
			for key := range kfMap{
				fmt.Println(key)
			}
		}

		//先查看内存中是否有结果
		if qr,ok := kfMap[name];ok{
			fmt.Println(qr.value)
			fmt.Println("查询到%d条结果", len(qr.value))
			continue
		}

		//内存中没有,查数据库
		kfpeople := make([]KfPerson,0)
		e := db.Select(&kfpeople,"select id,name,idcard from kfperson where name like ?;", name)//在这里name是string,带入select语句的时候会自动加上引号,否则select语句 like '西门庆' 是应该加引号的
		HandleError(e,"db.Select")
		fmt.Println("查询到%d条结果", len(kfpeople))
		fmt.Println(kfpeople)

		//查到的结果丢入内存
		queryResult := QueryResult{value:kfpeople}
		queryResult.cacheTime = time.Now().UnixNano()
		queryResult.count = 1
		kfMap[name] = queryResult

		//有必要时淘汰一些缓存

		if len(kfMap) > CACHE_LEN{
			delKey := UpdateCache(&kfMap)
			fmt.Printf("%s已经被淘汰出缓存!\n",delKey)
		}
	}

	fmt.Println("ALL OVER!")

}

//整理缓存
//删除加入最早的缓存
func UpdateCache(cacheMap *map[string]QueryResult) (delKey string){
	//预定义一个假设的最早时间
	earliestTime := time.Now().UnixNano()
	for key,value := range *cacheMap{
		if value.cacheTime < earliestTime{
			earliestTime = value.cacheTime
			delKey = key
		}
	}
	delete(*cacheMap,delKey)
	return delKey
}

model.go

package main

type KfPerson struct {
	Id       int    `db:"id"`
	Name     string `db:"name"`
	IdNumber string `db:"idcard"`
}

//缓存结果
type QueryResult struct {
	//开房者数据切片
	value []KfPerson
	//加入缓存的时间
	cacheTime int64
	//被查询的次数
	count int
}


 

---------------------------按理说到这一步我们的活已经干完了------------------------------------

但是太慢

下面的是对程序进行优化了

 

 

 

KaifangInformationSearch.go 改名为 main.go

多协程并发写入

之前的操作中,入库1w条就要好几分钟。那么如果入库2000w条,好几分钟的2000倍,这个时间是不太能够接受的

所以入库的这个活,我们应该使用并发去做

 

将之前的插入数据库的代码抽出来写成一个函数

以后用协程去go它

之前

之后

现在将标记文件删掉,将数据库kfperson表delete from 掉

跑一下程序

结果它给报错,too many connections

所以我们应该控制协程数量,不应该不停地去开协程

 

并发控制

     并发控制我们之前划分数据就做了。比如有10个并发,我会搞一个管道,来的时候就往管道里塞一个,走的时候再读走

建立一个信号量管道,控制并发数

 

所以,方案一,开2000万协程,行不通,耗尽了资源,程序崩溃

这里只是,开了2000w,但只允许100条并发

 

方案二:开有限条协程,从管道中读取数据

 

 

mysql数据超过一定的数量,效率就会指数下降

大公司,百度阿里用的mysql都是自己优化过的,mysql是开源的

 

下面我们先把刚才的问题先放一下,先优化一下缓存的清理这部分

缓存清理框架

以前的updatecache还只能处理queryresult一个人
现在所有实现了TimedData接口的数据都能被处理

通过一个TimedData接口

 

main.go

package main

import (
	"bufio"
	"fmt"
	_ "github.com/go-sql-driver/mysql"
	"github.com/jmoiron/sqlx"
	"io"
	"os"
	"strings"
	"time"
)

const CACHE_LEN = 2

var(
	kfMap map[string]TimedData
	//chanSema chan int  //信号量管道
	chanData chan *KfPerson
	db *sqlx.DB
)


//错误处理函数
func HandleError(err error,why string){
	if err != nil {
		fmt.Println("ERROR OCCURED!!!",err,why)
	}
}


//将文本大数据入库
//入库成功后,做一个文件标记,下一次见到标记就不再执行入库操作
func init() {
	//如果数据库已经初始化过了,就直接退出
	_, err1 := os.Stat("d:/golang/src/go_code/TextBigDataProcess/text/kaifanggood_dbok.mark")
	if err1 == nil {
		fmt.Println("数据库业已初始化")
		return
	}


	//打开数据库
	var err error
	db,err = sqlx.Open("mysql","root:root@tcp(127.0.0.1:3306)/kaifang")
	HandleError(err,"sqlx.Open")
	defer db.Close()
	fmt.Println("数据库已打开")

	//必要时建表
	_,err = db.Exec("create table if not exists kfperson(id int primary key auto_increment,name varchar(20),idcard char(18),sex char(1));")
	HandleError(err,"db.Exec create table")
	fmt.Println("数据表已创建")


	//初始化信号量管道(控制并发数)
	//chanSema = make(chan int,100)
	chanData = make(chan *KfPerson, 10000000)  //相当于我现在管道的缓存能力是无限的
	fmt.Println("管道已初始化")

	//开辟协程,源源不断地从数据管道获取信息,插入数据库
	for i:= 0;i<100;i++{
		go insertKfPerson()
		fmt.Printf("协程%d已开辟\n",i)
	}

	//打开大数据文件
	file,e := os.Open("d:/golang/src/go_code/TextBigDataProcess/text/kaifang_good.txt")
	HandleError(e,"os.Open")
	defer file.Close()
	reader := bufio.NewReader(file)
	fmt.Println("大数据文本已打开")

	//分批次读入大数据文本
	//还是要基于缓存的读取
	for{
		lineBytes, _, err := reader.ReadLine()
		//如果读到了文件尾
		if err == io.EOF {
			//关闭数据管道
			close(chanData)
			break
		}

		HandleError(err,"reader.ReadLine")

		//逐条入库(并发)
		lineStr := string(lineBytes)
		fields := strings.Split(lineStr, ",")
		name,idcard := fields[0],fields[1]
		//抛弃过长的名字
		name = strings.TrimSpace(name)
		if len(strings.Split(name,"")) > 20{
			fmt.Println("%s 名字过长,已经抛弃了")
			continue
		}


		//方案一:开2000万协程,行不通,耗尽了资源,程序崩溃
		//go insertKfPerson(db,&kfPerson)

		//方案二:开有限条协程,从管道中读取数据
		kfPerson := KfPerson{Name:name, Idcard: idcard}
		chanData <- &kfPerson //主协程管着往里面送数据
		//fmt.Println(&kfPerson)
		//fmt.Println(<-chanData)

	}

	fmt.Println("数据初始化成功!")

	//创建一个标记文件,标记数据库已经初始化成功
	_, err = os.Create("d:/golang/src/go_code/TextBigDataProcess/text/kaifanggood_dbok.mark")
	if err == nil {
		fmt.Println("初始化标记已经创建!")
	}
}

//源源不断地从管道里拿数据,然后源源不断地往数据库里送
func insertKfPerson(){
	//fmt.Println("1111111111111111")
	for kfPerson := range chanData{
		//fmt.Println(kfPerson)
		result,err := db.Exec("insert into kfperson(name,idcard) values(?,?);",kfPerson.Name,kfPerson.Idcard)
		//fmt.Println("已执行")
		HandleError(err,"db.Exec insert")
		//fmt.Println(result)
		if n,e := result.RowsAffected(); e == nil && n > 0{
			fmt.Printf("插入 %s 成功!\n",kfPerson.Name)
		}
	}
	//fmt.Println("----------------")
}


func main() {

	//打开数据库
	db,err := sqlx.Open("mysql","root:root@tcp(127.0.0.1:3306)/kaifang")
	HandleError(err,"sqlx.Open")
	defer db.Close()

	//初始化缓存
	kfMap = make(map[string]TimedData,0)

	var name string
	//循环接收用户想要查询的姓名
	//循环查询
	for{
		fmt.Print("请输入要查询的开房者姓名: ")
		fmt.Scanf("%s",&name)

		//如果用户想退出
		if name == "exit"{
			break
		}

		//查看所有缓存
		if name == "cache"{
			fmt.Println("共缓存了%d条结果: \n",len(kfMap))
			for key := range kfMap{
				fmt.Println(key)
			}
			continue
		}

		//先查看内存中是否有结果
		//内存中有结果就直接使用内存的结果
		if td,ok := kfMap[name];ok{
			qr := td.(*QueryResult)
			qr.count += 1
			fmt.Printf("查询到%d条结果: \n", len(qr.value))
			fmt.Println(qr.value)
			continue
		}

		//内存中没有,查数据库
		kfpeople := make([]KfPerson,0)
		e := db.Select(&kfpeople,"select id,name,idcard from kfperson where name like ?;", name)//在这里name是string,带入select语句的时候会自动加上引号,否则select语句 like '西门庆' 是应该加引号的
		HandleError(e,"db.Select")
		fmt.Printf("查询到%d条结果: \n", len(kfpeople))
		fmt.Println(kfpeople)

		//查到的结果丢入内存
		queryResult := QueryResult{value:kfpeople}
		queryResult.cacheTime = time.Now().UnixNano()
		queryResult.count = 1
		kfMap[name] = &queryResult

		//有必要时淘汰一些缓存

		if len(kfMap) > CACHE_LEN{
			delKey := UpdateCache(&kfMap)
			fmt.Printf("%s已经被淘汰出缓存!\n",delKey)
		}
	}

	fmt.Println("ALL OVER!")

}

cache.go

package main

import "time"

type TimedData interface {
	//获得加入缓存的时间纳秒
	GetCacheTime() int64
}


//整理缓存
//删除加入最早的缓存
func UpdateCache(cacheMap *map[string]TimedData) (delKey string){
	//预定义一个假设的最早时间
	earliestTime := time.Now().UnixNano()
	for key,value := range *cacheMap{
		if value.GetCacheTime() < earliestTime{
			earliestTime = value.GetCacheTime()
			delKey = key
		}
	}
	delete(*cacheMap,delKey)
	return delKey
}

model.go

package main

type KfPerson struct {
	Id       int    `db:"id"`
	Name     string `db:"name"`
	Idcard   string `db:"idcard"`
}

//缓存结果
type QueryResult struct {
	//开房者数据切片
	value []KfPerson
	//加入缓存的时间
	cacheTime int64
	//被查询的次数
	count int
}

//实现接口
func (qr *QueryResult) GetCacheTime() int64 {
	return qr.cacheTime
}

 

 

展现出来的效果和以前一样,没什么变化

 

然后等整个项目做完后我们应该把CACHE_LEN调大一点,2个有点太少了,但是方便调试,看结果

 

 

 

下面来处理之前多协程并发写入时遇到的大问题

多协程优化

我们现在是一读百写

一条协程来读文本大数据,一百条协程往数据库里面写

 

问题出在哪呢?我们init里面只有1个db对象,也就是我们开的100个协程共用1个db对象

导致这个db对象的连接已经被耗尽了,它的连接能力已经被榨干了

可能我当前这个连接,插入操作还没执行完,下一个协程又在做db.Exec()

深层意思:上一次db.Exec("...")尚未结束,新来的db.Exec("...")就会报这个错误

 

修改insertPerson代码

现在就可以插入了,虽然插入过程中还伴随着error

在运行过程中查询看一下

亲测,将开的协程数扩大会更快,开到300时是100+,开到500时是146

 

但是现在一开始还能执行几百句插入,后来阻塞了就全是ERROR了,进行不下去了

不知道是哪里的原因

 

-----------------------------------------------------------------------------------

至此,项目完结。

 

还可以优化的地方

①优化做成3级缓存,在内存里程序一关闭就没有了,但是我们之前查过的数据应该想办法再做一级缓存,用文件或者是redis(redis是最多的)

②数据库从一读多写改为多读多写

连接不同的db,分多个库去搞

这就已经是分布式了

比如我们按省份和年龄段都划分过了,我们可以按照区域(华北,东北等等),划分成6个分布式的库。然后当需要搜索某人的时候,对6个db,同时查6个库的结果,汇总起来,再返回

 

参考:

https://www.bilibili.com/video/BV17E41147ox?p=1

https://blog.csdn.net/u010986776/article/details/87622436

https://blog.csdn.net/u010986776/article/details/87700408

相关文章:

  • Python网络编程
  • CUDA与cuDNN
  • AI中的搜索(一)——启发式搜索 ((贪婪)最佳优先搜索 (Greedy)Best-First Search、A* 、迭代加深搜索 和 IDA* )
  • AI中的搜索(二)——对抗搜索(最小最大搜索Minimax、Alpha-Beta剪枝搜索、蒙特卡洛树搜索MCTS)
  • Web1.0 与 Web2.0 时代
  • HTTP服务器开发项目(Python)
  • IO多路复用(Select,Poll,Epoll)
  • Reactjs jsx
  • HTTP服务器开发项目之基础知识——传输层TCP协议 应用层HTTP协议(http请求报文应答报文详解)
  • 自制简易浏览器(Python)
  • 编译安装linux内核
  • Python中的*args,**kwargs(可变参数)(传参)
  • Python元类 type动态创建类 类装饰器
  • Python内存管理(一)——对象池(小整数池、大整数池、intern机制) 内建属性 属性拦截器
  • Python内存管理(二) —— GC垃圾回收机制(引用计数 隔代回收)
  • ES6指北【2】—— 箭头函数
  • (ckeditor+ckfinder用法)Jquery,js获取ckeditor值
  • 【347天】每日项目总结系列085(2018.01.18)
  • 11111111
  • centos安装java运行环境jdk+tomcat
  • HTTP中GET与POST的区别 99%的错误认识
  • iOS动画编程-View动画[ 1 ] 基础View动画
  • session共享问题解决方案
  • win10下安装mysql5.7
  • 从重复到重用
  • 和 || 运算
  • 检测对象或数组
  • 批量截取pdf文件
  • 微信端页面使用-webkit-box和绝对定位时,元素上移的问题
  • 我是如何设计 Upload 上传组件的
  • 自制字幕遮挡器
  • scrapy中间件源码分析及常用中间件大全
  • Unity3D - 异步加载游戏场景与异步加载游戏资源进度条 ...
  • ​软考-高级-系统架构设计师教程(清华第2版)【第20章 系统架构设计师论文写作要点(P717~728)-思维导图】​
  • #[Composer学习笔记]Part1:安装composer并通过composer创建一个项目
  • #13 yum、编译安装与sed命令的使用
  • (南京观海微电子)——I3C协议介绍
  • (四)模仿学习-完成后台管理页面查询
  • (淘宝无限适配)手机端rem布局详解(转载非原创)
  • (转)socket Aio demo
  • ***通过什么方式***网吧
  • .net CHARTING图表控件下载地址
  • .NET Core6.0 MVC+layui+SqlSugar 简单增删改查
  • .Net MVC + EF搭建学生管理系统
  • .net 提取注释生成API文档 帮助文档
  • .NET/C# 避免调试器不小心提前计算本应延迟计算的值
  • .NetCore实践篇:分布式监控Zipkin持久化之殇
  • .NET开发者必备的11款免费工具
  • .NET正则基础之——正则委托
  • ::before和::after 常见的用法
  • @ModelAttribute使用详解
  • @transaction 提交事务_【读源码】剖析TCCTransaction事务提交实现细节
  • [20190113]四校联考
  • [Android]使用Retrofit进行网络请求
  • [bzoj1038][ZJOI2008]瞭望塔