1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
|
type TailReader struct {
rc *os.File
buf []byte // 用来缓存剩余字节
temp []byte // 提供给 Read
sep []byte // 兼容不同系统架构分隔符
offset int64 // 记录offset
size int64 // 文件大小
skipempty bool // 控制是否跳过空行行为
atEnd bool // 记录offset 是否被移动到文件开始位置了
}
var (
Sep_win = []byte("\r\n")
Sep_linux = []byte("\n")
)
func NewTailReader(fname string, sep []byte, skip bool) (*TailReader, error) {
file, err := os.Open(fname)
if err != nil {
return nil, err
}
stat, _ := file.Stat()
size := stat.Size()
var offset int64 = 1024
if size < offset {
offset = size
}
_, errs := file.Seek(int64(-offset), 2)
if errs != nil {
return nil, errs
}
offset2, _ := file.Seek(0, io.SeekCurrent)
fmt.Printf("seek to offset %d, file size is %d\n", offset2, size)
atEnd := false
if offset == size {
atEnd = true
}
return &TailReader{rc: file, buf: make([]byte, 0, 1024), temp: make([]byte, 1024), sep: sep, skipempty: skip, offset: int64(offset), size: size, atEnd: atEnd}, nil
}
func (t *TailReader) Close() {
t.rc.Close()
}
func (t *TailReader) ReadBytes() ([]byte, error) {
// 如果上次缓存没清完,检查是否有换行符
sepsize := 0
if t.skipempty {
sepsize = len(t.sep)
}
// 处理上次遗留的缓存
if len(t.buf) > 0 {
if idx := bytes.LastIndex(t.buf, t.sep); idx != -1 {
temp := append([]byte{}, t.buf[idx+sepsize:]...)
t.buf = t.buf[:idx]
return temp, nil
}
if t.atEnd {
p := slices.Clone(t.buf[:len(t.buf)])
t.buf = t.buf[:0]
return p, nil
}
}
if t.size < t.offset {
return nil, io.EOF
}
// 拷贝重置缓存
var p []byte
// 先将这部分尾巴给卸除出去
if len(t.buf) > 0 {
p = append([]byte{}, t.buf...)
}
n, err := t.rc.Read(t.temp)
if err == nil && n > 0 {
idx := bytes.LastIndex(t.temp[:n], t.sep)
if idx != -1 {
temp := append([]byte{}, t.temp[idx+sepsize:n]...)
temp = append(temp, p...)
t.buf = t.buf[:0]
t.buf = append(t.buf, t.temp[:idx]...)
if err := t.move(n); err != nil {
return nil, err
}
return temp, nil
}
var cur, next []byte
cur = slices.Concat(t.temp[:n], p)
if err := t.move(n); err != nil {
return nil, err
}
// 用来预防二进制大文件,堆爆slice
for i := 0; i < 3 && idx == -1; i++ {
n, err = t.rc.ReadAt(t.buf, 0)
if err != nil {
return nil, err
}
if err := t.move(n); err != nil {
return nil, err
}
idx = bytes.LastIndex(t.buf[:n], t.sep)
if idx != -1 {
next = slices.Concat(t.temp[idx:n], cur)
// 尽量复用
t.buf = t.buf[:0]
t.buf = append(t.buf, t.temp[:idx]...)
break
}
next = slices.Concat(t.temp[:n], cur)
}
if idx != -1 {
return nil, errors.New("cant found sep in many times try")
}
return next, nil
}
return nil, err
}
func (t *TailReader) move(delta int) error {
t.offset += int64(delta)
if t.offset > t.size {
t.offset = t.size
}
// 避免重复移动
if t.offset <= t.size && !t.atEnd {
_, err := t.rc.Seek(-t.offset, 2)
if err != nil {
return err
}
if t.offset == t.size {
t.atEnd = true
}
}
if len(t.buf) > 0 {
return nil
}
return io.EOF
}
|