流处理方式
分片处理
去年的面试中我被问到超大文件你怎么处理,这个问题确实当时没多想,回来之后仔细研究和讨论了下这个问题,对大文件读取做了一个分析
比如我们有一个log文件,运行了几年,有100G之大。按照我们之前的操作可能代码会这样写:
func ReadFile(filePath string) []byte{ content, err := ioutil.ReadFile(filePath) if err != nil { log.Println('Read error') } return content}
上面的代码读取几兆的文件可以,但是如果大于你本身及其内存,那就直接翻车了。因为上面的代码,是把文件所有的内容全部都读取到内存之后返回,几兆的文件,你内存够大可以处理,但是一旦上几百兆的文件,就没那么好处理了。
那么,正确的方法有两种
第一个是使用流处理方式代码如下func ReadFile(filePath string, handle func(string)) error { f, err := os.Open(filePath) defer f.Close() if err != nil { return err } buf := bufio.NewReader(f) for { line, err := buf.ReadLine('n') line = strings.TrimSpace(line) handle(line) if err != nil { if err == io.EOF{ return nil } return err } return nil }}第二个方案就是分片处理
当读取的是二进制文件,没有换行符的时候,使用下面的方案一样处理大文件
func ReadBigFile(fileName string, handle func([]byte)) error { f, err := os.Open(fileName) if err != nil { fmt.Println('can’t opened this file') return err } defer f.Close() s := make([]byte, 4096) for { switch nr, err := f.Read(s[:]); true { case nr < 0: fmt.Fprintf(os.Stderr, 'cat: error reading: %sn
补充:golang 读取大文件处理sync.pool + bufio.NewReader(f)
看代码吧~文件大小
package mainimport ('bufio''fmt''io'//'math''os''strings''sync''time')func main() {/*文件数据样例{'remark': '来电时间: 2021/04/15 13:52:07客户电话:13913xx39xx ', 'no': '600020510132021101310210547639', 'title': 'b-ae0e-0242ac100907', 'call_in_date': '2021-04-15 13:52:12', 'name': '张三', '_date': '2021-06-15', 'name': '张三', 'meet': '1'}1、我们取出 call_in_date': '2021-04-15 13:52:1的数据写入另一个文件*/var (s time.Time //当前时间file *os.FilefileStat os.FileInfoerr errorlastLineSize int64)s = time.Now()if file, err = os.Open('/Users/zhangsan/Downloads/log.txt');err != nil{fmt.Println(err)}defer func() {err = file.Close() //close after checking err}()//queryStartTime, err := time.Parse('2006-01-02T15:04:05.0000Z', startTimeArg)//if err != nil {//fmt.Println('Could not able to parse the start time', startTimeArg)//return//}////queryFinishTime, err := time.Parse('2006-01-02T15:04:05.0000Z', finishTimeArg)//if err != nil {//fmt.Println('Could not able to parse the finish time', finishTimeArg)//return//}/*** {name:'log.log', size:911100961, mode:0x1a4,modTime:time.Time{wall:0x656c25c, ext:63742660691,loc:(*time.Location)(0x1192c80)}, sys:syscall.Stat_t{Dev:16777220,Mode:0x81a4, Nlink:0x1, Ino:0x118cba7, Uid:0x1f5, Gid:0x14, Rdev:0,Pad_cgo_0:[4]uint8{0x0, 0x0, 0x0, 0x0}, Atimespec:syscall.Timespec{Sec:1607063899, Nsec:977970393},Mtimespec:syscall.Timespec{Sec:1607063891, Nsec:106349148}, Ctimespec:syscall.Timespec{Sec:1607063891,Nsec:258847043}, Birthtimespec:syscall.Timespec{Sec:1607063883, Nsec:425808150},Size:911100961, Blocks:1784104, Blksize:4096, Flags:0x0, Gen:0x0, Lspare:0, Qspare:[2]int64{0, 0}}**/if fileStat, err = file.Stat();err != nil {return}fileSize := fileStat.Size()//72849354767offset := fileSize - 1//检测是不是都是空行 只有nfor {var (b []byten intchar string)b = make([]byte, 1)//从指定位置读取if n, err = file.ReadAt(b, offset);err != nil {fmt.Println('Error reading file ', err)break}char = string(b[0])if char == 'n' {break}offset--//获取一行的大小lastLineSize += int64(n)}var (lastLine []bytelogSlice []stringlogSlice1 []string)//初始化一行大小的空间lastLine = make([]byte, lastLineSize)_, err = file.ReadAt(lastLine, offset)if err != nil {fmt.Println('Could not able to read last line with offset', offset, 'and lastline size', lastLineSize)return}//根据条件进行区分logSlice = strings.Split(strings.Trim(string(lastLine),'n'),'next_pay_date')logSlice1 = strings.Split(logSlice[1],''')if logSlice1[2] == '2021-06-15'{Process(file)}fmt.Println('nTime taken - ', time.Since(s))fmt.Println(err)}func Process(f *os.File) error {//读取数据的key,减小gc压力linesPool := sync.Pool{New: func() interface{} {lines := make([]byte, 250*1024)return lines}}//读取回来的数据池stringPool := sync.Pool{New: func() interface{} {lines := ''return lines}}//一个文件对象本身是实现了io.Reader的 使用bufio.NewReader去初始化一个Reader对象,存在buffer中的,读取一次就会被清空r := bufio.NewReader(f) ////设置读取缓冲池大小 默认16r = bufio.NewReaderSize(r,250 *1024)var wg sync.WaitGroupfor {buf := linesPool.Get().([]byte)//读取Reader对象中的内容到[]byte类型的buf中n, err := r.Read(buf)buf = buf[:n]if n == 0 {if err != nil {fmt.Println(err)break}if err == io.EOF {break}return err}//补齐剩下没满足的剩余nextUntillNewline, err := r.ReadBytes(’n’)//fmt.Println(string(nextUntillNewline))if err != io.EOF {buf = append(buf, nextUntillNewline...)}wg.Add(1)go func() {ProcessChunk(buf, &linesPool, &stringPool)wg.Done()}()}wg.Wait()return nil}func ProcessChunk(chunk []byte, linesPool *sync.Pool,stringPool *sync.Pool) {//做相应的处理}
执行
go run test2.go '2020-01-01T00:00:00.0000Z' '2020-02-02T00:00:00.0000Z' /Users/zhangsan/go/src/workspace/test/log.log
EOFTime taken - 20.023517675s<nil>
以上为个人经验,希望能给大家一个参考,也希望大家多多支持乐呵呵网。如有错误或未考虑完全的地方,望不吝赐教。