使用Go语言,25秒读取16GB文件
当今世界的任何计算机系统每天都会生成大量的日志或数据。随着系统的发展,将调试数据存储到数据库中是不可行的,因为它们是不可变的,并且只能用于分析和解决故障。所以大部分公司倾向于将日志存储在文件中,而这些文件通常位于本地磁盘中。
我们将使用Go语言,从一个大小为16GB的.txt或.log文件中提取日志。
让我们开始编码……
首先,我们打开文件。对于任何文件的IO,我们都将使用标准的Go os.File。
f, err := os.Open(fileName) if err != nil { fmt.Println("cannot able to read the file", err) return // UPDATE: close after checking error defer file.Close() //Do not forget to close the file
打开文件后,我们有以下两个选项可以选择:
逐行读取文件,这有助于减少内存紧张,但需要更多的时间。
一次将整个文件读入内存并处理该文件,这将消耗更多内存,但会显著减少时间。
由于文件太大,即16 GB,因此无法将整个文件加载到内存中。但是第一种选择对我们来说也是不可行的,因为我们希望在几秒钟内处理文件。
但你猜怎么着,还有第三种选择。瞧……相比于将整个文件加载到内存中,在Go语言中,我们还可以使用bufio.NewReader()将文件分块加载。
r := bufio.NewReader(f) for { buf := make([]byte,4*1024) //the chunk size n, err := r.Read(buf) //loading chunk into buffer buf = buf[:n] if n == 0 { if err != nil { fmt.Println(err) break if err == io.EOF { break return err
一旦我们将文件分块,我们就可以分叉一个线程,即Go routine,同时处理多个文件区块。上述代码将修改为:
//sync pools to reuse the memory and decrease the preassure on Garbage Collector linesPool := sync.Pool{New: func() interface{} { lines := make([]byte, 500*1024) return lines stringPool := sync.Pool{New: func() interface{} { lines := "" return lines slicePool := sync.Pool{New: func() interface{} { lines := make([]string, 100) return lines r := bufio.NewReader(f) var wg sync.WaitGroup //wait group to keep track off all threads for { buf := linesPool.Get().([]byte) n, err := r.Read(buf) buf = buf[:n] if n == 0 { if err != nil { fmt.Println(err) break if err == io.EOF { break return err nextUntillNewline, err := r.ReadBytes(/n)//read entire line if err != io.EOF { buf = append(buf, nextUntillNewline...) wg.Add(1) go func() { //process each chunk concurrently //start - log start time, end - log end time ProcessChunk(buf, linesPool, stringPool, slicePool, start, end) wg.Done() wg.Wait()
上面的代码,引入了两个优化点:
sync.Pool是一个强大的对象池,可以重用对象来减轻垃圾收集器的压力。我们将重用各个分片的内存,以减少内存消耗,大大加快我们的工作。
Go Routines帮助我们同时处理缓冲区块,这大大提高了处理速度。
现在让我们实现ProcessChunk函数,它将处理以下格式的日志行。
2020-01-31T20:12:38.1234Z, Some Field, Other Field, And so on, Till new line,.../n
我们将根据命令行提供的时间戳提取日志。
func ProcessChunk(chunk []byte, linesPool *sync.Pool, stringPool *sync.Pool, slicePool *sync.Pool, start time.Time, end time.Time) { //another wait group to process every chunk further var wg2 sync.WaitGroup logs := stringPool.Get().(string) logs = string(chunk) linesPool.Put(chunk) //put back the chunk in pool //split the string by "/n", so that we have slice of logs logsSlice := strings.Split(logs, "/n") stringPool.Put(logs) //put back the string pool chunkSize := 100 //process the bunch of 100 logs in thread n := len(logsSlice) noOfThread := n / chunkSize if n%chunkSize != 0 { //check for overflow noOfThread++ length := len(logsSlice) //traverse the chunk for i := 0; i length; i += chunkSize { wg2.Add(1) //process each chunk in saperate chunk go func(s int, e int) { for i:= s; i i++{ text := logsSlice[i] if len(text) == 0 { continue logParts := strings.SplitN(text, ",", 2) logCreationTimeString := logParts[0] logCreationTime, err := time.Parse("2006-01- 02T15:04:05.0000Z", logCreationTimeString) if err != nil { fmt.Printf("/n Could not able to parse the time :%s for log : %v", logCreationTimeString, text) return // check if logs timestamp is inbetween our desired period if logCreationTime.After(start) logCreationTime.Before(end) { fmt.Println(text) textSlice = nil wg2.Done() }(i*chunkSize, int(math.Min(float64((i+1)*chunkSize), float64(len(logsSlice))))) //passing the indexes for processing wg2.Wait() //wait for a chunk to finish logsSlice = nil
对上面的代码进行基准测试。以16 GB的日志文件为例,提取日志所需的时间约为25秒。
完整的代码示例如下:
func main() { s := time.Now() args := os.Args[1:] if len(args) != 6 { // for format LogExtractor.exe -f "From Time" -t "To Time" -i "Log file directory location" fmt.Println("Please give proper command line arguments") return startTimeArg := args[1] finishTimeArg := args[3] fileName := args[5] file, err := os.Open(fileName) if err != nil { fmt.Println("cannot able to read the file", err) return defer file.Close() //close after checking err queryStartTime, err := time.Parse("2006-01-02T15:04:05.0000Z", startTimeArg) if err != nil { fmt.Println("Could not able to parse the start time", startTimeArg) return queryFinishTime, err := time.Parse("2006-01-02T15:04:05.0000Z", finishTimeArg) if err != nil { fmt.Println("Could not able to parse the finish time", finishTimeArg) return filestat, err := file.Stat() if err != nil { fmt.Println("Could not able to get the file stat") return fileSize := filestat.Size() offset := fileSize - 1 lastLineSize := 0 for { b := make([]byte, 1) n, err := file.ReadAt(b, offset) if err != nil { fmt.Println("Error reading file ", err) break char := string(b[0]) if char == "/n" { break offset-- lastLineSize += n lastLine := make([]byte, lastLineSize) _, err = file.ReadAt(lastLine, offset+1) if err != nil { fmt.Println("Could not able to read last line with offset", offset, "and lastline size", lastLineSize) return logSlice := strings.SplitN(string(lastLine), ",", 2) logCreationTimeString := logSlice[0] lastLogCreationTime, err := time.Parse("2006-01-02T15:04:05.0000Z", logCreationTimeString) if err != nil { fmt.Println("can not able to parse time : ", err) if lastLogCreationTime.After(queryStartTime) lastLogCreationTime.Before(queryFinishTime) { Process(file, queryStartTime, queryFinishTime) fmt.Println("/nTime taken - ", time.Since(s)) func Process(f *os.File, start time.Time, end time.Time) error { linesPool := sync.Pool{New: func() interface{} { lines := make([]byte, 250*1024) return lines stringPool := sync.Pool{New: func() interface{} { lines := "" return lines r := bufio.NewReader(f) var wg sync.WaitGroup for { buf := linesPool.Get().([]byte) n, err := r.Read(buf) buf = buf[:n] if n == 0 { if err != nil { fmt.Println(err) break if err == io.EOF { break return err nextUntillNewline, err := r.ReadBytes(/n) if err != io.EOF { buf = append(buf, nextUntillNewline...) wg.Add(1) go func() { ProcessChunk(buf, linesPool, stringPool, start, end) wg.Done() wg.Wait() return nil func ProcessChunk(chunk []byte, linesPool *sync.Pool, stringPool *sync.Pool, start time.Time, end time.Time) { var wg2 sync.WaitGroup logs := stringPool.Get().(string) logs = string(chunk) linesPool.Put(chunk) logsSlice := strings.Split(logs, "/n") stringPool.Put(logs) chunkSize := 300 n := len(logsSlice) noOfThread := n / chunkSize if n%chunkSize != 0 { noOfThread++ for i := 0; i (noOfThread); i++ { wg2.Add(1) go func(s int, e int) { defer wg2.Done() //to avaoid deadlocks for i := s; i i++ { text := logsSlice[i] if len(text) == 0 { continue logSlice := strings.SplitN(text, ",", 2) logCreationTimeString := logSlice[0] logCreationTime, err := time.Parse("2006-01-02T15:04:05.0000Z", logCreationTimeString) if err != nil { fmt.Printf("/n Could not able to parse the time :%s for log : %v", logCreationTimeString, text) return if logCreationTime.After(start) logCreationTime.Before(end) { //fmt.Println(text)
原文链接:https://medium.com/swlh/processing-16gb-file-in-seconds-go-lang-3982c235dfa2
(版权归原作者所有,侵删)
本文链接:http://www.yunweipai.com/39959.html
原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/52864.html
相关文章
- 【愚公系列】2022年08月 Go教学课程 039-文件操作
- 为Go编译后的文件添加图标和版本信息
- 2023-02-28:moonfdd/ffmpeg-go是用go语言绑定ffmpeg的库,目前是github上最好用的库。请用go语言将yuv文件编码为h264
- 2023-03-07:x264的视频编码器,不用ffmpeg,用libx264.dll也行。请用go语言调用libx264.dll,将yuv文件编码成h264文
- 腾讯云cos获取文件操作实践--Go SDK
- 「Go工具箱」将文件大小转换成Kb、Mb、Gb就用这个库:go-humanize
- 2023-02-21:请用go语言调用ffmpeg,解码mp4文件,输出视频信息和总帧数。
- 2023-02-22:请用go语言调用ffmpeg,保存mp4文件的视频帧,每帧用ppm图片保存。
- 2023-03-17:使用Go语言和FFmpeg库实现音频重采样解码,并将其保存为PCM格式的文件。
- Go语言go mod包依赖管理工具使用详解
- go test命令(Go语言测试命令)完全攻略
- 轻松搞定Go语言连接MySQL(go连接mysql)
- 简易教程:如何在 Linux 上安装 GO 语言(go语言安装linux)
- 如何在Linux上安装Go语言?——简单易懂的指南(linux安装go语言)
- Go语言链接Oracle数据库的实践记录(go 链接oracle)
- Go语言快速安装Oracle数据库(go语言安装oracle)
- 新技术Go语言解析Oracle数据库日志(go解析oracle日志)
- 踏上oracle编程之路,Go(go oracle编程)
- GO语言文件的创建与打开实例分析