ebpf postgresql慢查询日志服务

ebpf postgresql慢查询日志服务


因为需要使用用户态探针(uprobe)来绑定函数,要求可执行文件带有符号信息,所以需要在编译 pg 时启用 `--enable-debug`

用户态程序

这里主要实现一个慢查询日志服务,可以通过 systemd 来管理。目前还没有为这个程序实现配置文件管理的部分,不过实现起来也很简单。

//go:build amd64

package main

import (
	"bytes"
	"encoding/binary"
	"errors"
	"flag"
	"fmt"
	"log"
	"os"
	"os/signal"
	"path/filepath"
	"syscall"

	"github.com/cilium/ebpf/link"
	"github.com/cilium/ebpf/perf"
	"github.com/cilium/ebpf/rlimit"
	"golang.org/x/sys/unix"
)
// The directive below runs bpf2go on dbstats.c to generate the eBPF Go
// bindings used in this file (bpfObjects, loadBpfObjects, bpfEvent).
//go:generate go run github.com/cilium/ebpf/cmd/bpf2go  -cc clang -target amd64 -type event bpf dbstats.c -- -I../headers

// symbol is the function in the postgres executable that both the uprobe and
// the uretprobe are attached to.
const symbol = "exec_simple_query"

// Command-line settings. The flags are registered in init and parsed in Config.
var (
	binPath string   // path of the postgres executable to probe (-P)
	logPath string   // optional log output file path (-l)
	logFile *os.File // opened log file; nil when no log file is in use
	pid     int      // pid of a running postgres process (-p)
	slow    int      // slow-query threshold in milliseconds (-t)
)

// init registers the command-line flags; they are parsed later by Config.
func init() {
	// String-valued flags.
	flag.StringVar(&binPath, "P", "", "the path of postgres")
	flag.StringVar(&logPath, "l", "", "the log output file path")

	// Integer-valued flags.
	flag.IntVar(&pid, "p", 0, "pid")
	flag.IntVar(&slow, "t", 200, "the threshold of slow query")
}
// Config parses the command-line flags and prepares the global state used by
// main.
//
// When only -p is given, binPath is resolved from /proc/<pid>/exe. When -l is
// given, the log file is opened in append mode and the standard logger is
// redirected to it. Opening the log file is best effort: on failure a message
// is printed and the logger keeps its existing output.
//
// It returns true when the program should exit instead of attaching probes:
// neither -P nor -p was given, -t is outside [0, 2000] ms, or the executable
// path cannot be resolved from the pid.
func Config() bool {
	flag.Parse()
	if binPath == "" && pid == 0 {
		fmt.Printf("invalid argument path [%s] pid %d\n", binPath, pid)
		return true
	}
	// Validate the threshold before opening any file, and report why we exit.
	// The BPF side stores it as u32 nanoseconds (ms * 1000000), and 2000 ms
	// keeps that product within u32 range.
	if slow < 0 || slow > 2000 {
		fmt.Printf("invalid slow query threshold %d, must be in [0, 2000] ms\n", slow)
		return true
	}
	if binPath == "" {
		path, err := filepath.EvalSymlinks(fmt.Sprintf("/proc/%d/exe", pid))
		if err != nil {
			fmt.Printf("pid %d err %v\n", pid, err)
			return true
		}
		binPath = path
	}
	if logPath != "" {
		path, err := filepath.Abs(logPath)
		if err != nil {
			// Report the actual error and do not try to open an empty path.
			fmt.Printf("get abs path failed %v\n", err)
		} else if file, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666); err != nil {
			fmt.Printf("open log file failed %v\n", err)
		} else {
			logFile = file
			log.SetOutput(file)
		}
	}
	return false
}
// main parses flags via Config and loads the bpf2go-generated eBPF objects. It
// then publishes the slow-query threshold through SlowMap and attaches a
// uretprobe and a uprobe to exec_simple_query in the executable at binPath.
// Each event read from the perf buffer is logged until SIGINT or SIGTERM
// closes the reader.
func main() {
	if Config() {
		return
	}
	if logFile != nil {
		defer logFile.Close()
	}

	stopper := make(chan os.Signal, 1)
	signal.Notify(stopper, os.Interrupt, syscall.SIGTERM)

	// Allow the current process to lock memory for eBPF resources.
	if err := rlimit.RemoveMemlock(); err != nil {
		log.Fatal(err)
	}

	// Load pre-compiled programs and maps into the kernel.
	objs := bpfObjects{}
	if err := loadBpfObjects(&objs, nil); err != nil {
		log.Fatalf("loading objects: [%v]", err)
	}
	defer objs.Close()

	// Publish the user-supplied slow-query threshold (milliseconds) at key 0.
	// The BPF map is declared with 4-byte (u32) keys and values, so both are
	// passed as uint32. A Go int is 8 bytes on amd64 and does not fit that
	// slot. The error is checked: if the Put fails, the BPF programs never
	// receive the threshold.
	if err := objs.SlowMap.Put(uint32(0), uint32(slow)); err != nil {
		log.Fatalf("setting slow query threshold: %s", err)
	}

	// Open an ELF binary and read its symbols.
	ex, err := link.OpenExecutable(binPath)
	if err != nil {
		log.Fatalf("opening executable: %s", err)
	}

	// Open a Uretprobe at the exit point of the symbol and attach
	// the pre-compiled eBPF program to it.
	up, err := ex.Uretprobe(symbol, objs.UretprobeExecSimpleQuery, nil)
	if err != nil {
		log.Fatalf("creating uretprobe: %s", err)
	}
	defer up.Close()

	// Open a Uprobe at the entry point of the symbol.
	do, err := ex.Uprobe(symbol, objs.UprobeExecSimpleQuery, nil)
	if err != nil {
		log.Fatalf("creating uprobe: %s", err)
	}
	defer do.Close()

	// Open a perf event reader from userspace on the PERF_EVENT_ARRAY map
	// described in the eBPF C program.
	rd, err := perf.NewReader(objs.Events, os.Getpagesize())
	if err != nil {
		log.Fatalf("creating perf event reader: %s", err)
	}
	defer rd.Close()

	go func() {
		// Wait for a signal and close the perf reader,
		// which will interrupt rd.Read() and make the program exit.
		<-stopper
		log.Println("Received signal, exiting program..")

		if err := rd.Close(); err != nil {
			log.Fatalf("closing perf event reader: %s", err)
		}
	}()

	log.Printf("Listening for events..")

	// bpfEvent is generated by bpf2go.
	var event bpfEvent
	for {
		record, err := rd.Read()
		if err != nil {
			if errors.Is(err, perf.ErrClosed) {
				return
			}
			log.Printf("reading from perf event reader: %s", err)
			continue
		}

		if record.LostSamples != 0 {
			log.Printf("perf event ring buffer full, dropped %d samples", record.LostSamples)
			continue
		}

		// Parse the perf event entry into a bpfEvent structure.
		if err := binary.Read(bytes.NewBuffer(record.RawSample), binary.LittleEndian, &event); err != nil {
			log.Printf("parsing perf event: %s", err)
			continue
		}

		// The uretprobe overwrites Timestamp with the elapsed time in
		// nanoseconds before emitting; print it in milliseconds.
		log.Printf("%s:%f/ms\n", unix.ByteSliceToString(event.Cmd[:]), float64(event.Timestamp)/1000000.0)
	}
}

ebpf uprobe

这部分代码主要负责拷贝查询语句,并对 exec_simple_query 函数的执行进行计时。

//go:build ignore

#include "common.h"

#include "bpf_tracing.h"

char __license[] SEC("license") = "Dual MIT/GPL";
// Maximum number of bytes of query text copied into an event.
#define MAX_DATA_LEN 256
// Slow-query threshold in nanoseconds, cached from slow_map by the uprobe.
// 0 means it has not been loaded yet (or the configured threshold is 0 ms).
u32 slowThreshold=0;
// Multiplier converting the millisecond value stored in slow_map to nanoseconds.
const u32 timebase = 1000000;
// Per-query record kept in kprobe_map and sent to user space.
struct event {
    u64 pid;       // process id (upper 32 bits of bpf_get_current_pid_tgid)
    u64 timestamp; // start time in ns; rewritten to elapsed ns by the uretprobe
    u8  cmd[MAX_DATA_LEN]; // copy of the query text read from the first argument
    
};
// Single-slot array: key 0 holds the slow-query threshold in milliseconds,
// written by the Go loader.
struct bpf_map_def SEC("maps") slow_map = {
	.type        = BPF_MAP_TYPE_ARRAY,
	.key_size    = sizeof(u32),
	.value_size  = sizeof(u32),
	.max_entries = 1,
};

// In-flight queries keyed by process id. The uprobe inserts the entry and the
// uretprobe consumes and deletes it.
// NOTE(review): at most 256 entries; inserts beyond that fail and the uprobe
// does not check the result of bpf_map_update_elem.
struct bpf_map_def SEC("maps") kprobe_map = {
	.type        = BPF_MAP_TYPE_HASH,
	.key_size    = sizeof(u64),
	.value_size  = sizeof(struct event),
	.max_entries = 256,
};

// Perf event array used to stream struct event records to user space.
// NOTE(review): max_entries is not set here; presumably the loader
// (cilium/ebpf) sizes it to the CPU count. Confirm this.
struct {
	__uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
} events SEC(".maps");


// Force emitting struct event into the ELF.
const struct event *unused __attribute__((unused));

// uretprobe_exec_simple_query runs when exec_simple_query returns.
// It looks up the entry the uprobe stored for this process. If present, it
// turns the stored start time into the elapsed time in nanoseconds. The event
// is emitted to user space only when that duration exceeds slowThreshold, and
// the entry is then removed.
SEC("uretprobe/exec_simple_query")
int uretprobe_exec_simple_query(struct pt_regs *ctx) {
	u64 pid = bpf_get_current_pid_tgid() >> 32;
	struct event *valp = bpf_map_lookup_elem(&kprobe_map, &pid);
	if (!valp) {
		// No matching entry (e.g. the probe attached mid-query): nothing to report.
		return 0;
	}

	valp->timestamp = bpf_ktime_get_ns() - valp->timestamp;
	// Only slow queries are reported. The previous condition
	// (slowThreshold > 0 || duration > slowThreshold) was true for virtually
	// every query, so the threshold never filtered anything.
	// A threshold of 0 still reports every query.
	if (valp->timestamp > slowThreshold) {
		bpf_perf_event_output(ctx, &events, BPF_F_CURRENT_CPU, valp, sizeof(struct event));
	}
	bpf_map_delete_elem(&kprobe_map, &pid);
	return 0;
}

// uprobe_exec_simple_query runs on entry to exec_simple_query.
// It first lazily caches the slow-query threshold from slow_map (ms -> ns)
// into slowThreshold. It then stores the start time and a copy of the query
// text (read from the first argument) in kprobe_map, keyed by process id.
SEC("uprobe/exec_simple_query")
int uprobe_exec_simple_query(struct pt_regs *ctx) {
	if (slowThreshold == 0) {
		// slow_map is declared with a u32 key; use a matching key type.
		u32 key = 0;
		u32 *ret = (u32 *)bpf_map_lookup_elem(&slow_map, &key);
		if (ret)
			slowThreshold = *ret * timebase;
	}

	// Zero-initialise so no stale stack bytes reach the map or user space.
	struct event event = {};
	u64 pid = bpf_get_current_pid_tgid() >> 32;
	event.pid = pid;
	event.timestamp = bpf_ktime_get_ns();

	// The first argument is read as a NUL-terminated query string.
	// bpf_probe_read_str stops at the terminator and always NUL-terminates,
	// truncating long queries. A fixed 256-byte bpf_probe_read fails entirely
	// (leaving cmd zeroed) when those 256 bytes run past the end of a mapping.
	const char *sql_string = (const char *)PT_REGS_PARM1(ctx);
	bpf_probe_read_str(&event.cmd, sizeof(event.cmd), sql_string);
	bpf_map_update_elem(&kprobe_map, &pid, &event, BPF_ANY);
	return 0;
}

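这里因为 ebpf-go(cilium/ebpf)没有提供从 Go 程序在运行时修改 eBPF 全局变量的接口,所以通过一个 BPF_MAP_TYPE_ARRAY 类型的 map 间接传递慢查询阈值。阈值在 eBPF 侧以 u32 纳秒存储,程序将上限限制为 2s(2000ms);如果觉得这个上限偏低,可以将数据类型改成 u64。理论上只有在首次触发时才会读取 map 并给全局变量赋值,后续都直接使用全局变量作为快速路径。(注:cilium/ebpf 的 CollectionSpec.RewriteConstants 可以在加载前改写 `const volatile` 全局常量,也是一种可选方案。)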

往日已经不在,未来尚未开始
使用 Hugo 构建
主题 StackJimmy 设计