gRPC 使用手册

gRPC 使用手册


环境准备

grpc 是使用protobuf 协议的 需要安装对应的编译器

1
2
 go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.28
 go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.2

定义好 .proto 文件之后可以使用 protoc 编译器来生成对应语言的代码

1
2
protoc --go_out=./proto/ --go_opt=paths=source_relative 
    --go-grpc_out=./proto/ --go-grpc_opt=paths=source_relative  ./proto/your.proto

基础的流程

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// 单次调用
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	resp, err := client.UnaryEcho(ctx, &ecpb.EchoRequest{Message: message})
	if err != nil {
		log.Fatalf("client.UnaryEcho(_) = _, %v: ", err)
	}

// 流接收
func recvMessage(stream pb.Echo_BidirectionalStreamingEchoClient, wantErrCode codes.Code) {
	res, err := stream.Recv()
	if status.Code(err) != wantErrCode {
		log.Fatalf("stream.Recv() = %v, %v; want _, status.Code(err)=%v", res, err, wantErrCode)
	}
	if err != nil {
		fmt.Printf("stream.Recv() returned expected error %v\n", err)
		return
	}
	fmt.Printf("received message %q\n", res.GetMessage())
}

// 在接受流的时候要验证 err 是不是EOF
for {
		in, err := stream.Recv()
		if err != nil {
			fmt.Printf("server: error receiving from stream: %v\n", err)
			if err == io.EOF {
				return nil
			}
			return err
		}
		fmt.Printf("echoing message %q\n", in.Message)
		stream.Send(&pb.EchoResponse{Message: in.Message})
	}

OAuth token 验证

因为有两种rpc 调用 一种是 单次调用 一种是流式调用; 在客户端 client 建立连接时使用的opts 中使用

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
// fetchToken 表示获取token 的动作,使用 tokensource 获取带时效时间的 token 
	perRPC := oauth.TokenSource{TokenSource: oauth2.StaticTokenSource(fetchToken())}
	creds, err := credentials.NewClientTLSFromFile(data.Path("x509/ca_cert.pem"), "x.test.example.com")
	if err != nil {
		log.Fatalf("failed to load credentials: %v", err)
	}
	opts := []grpc.DialOption{
		// In addition to the following grpc.DialOption, callers may also use
		// the grpc.CallOption grpc.PerRPCCredentials with the RPC invocation
		// itself.
		// See: https://godoc.org/google.golang.org/grpc#PerRPCCredentials
		grpc.WithPerRPCCredentials(perRPC),
		// oauth.TokenSource requires the configuration of transport
		// credentials.
		grpc.WithTransportCredentials(creds),
	}

在服务端则是要通过拦截器来分别处理两种 rpc 调用的验证

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 流式的 验证 
func ensureValidToken(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
	md, ok := metadata.FromIncomingContext(ctx)
	if !ok {
		return nil, errMissingMetadata
	}
	// 下面这个是将客户端传的token 和服务器端的校验逻辑来比较
	if !valid(md["authorization"]) {
		return nil, errInvalidToken
	}
	// Continue execution of handler after ensuring a valid token.
	return handler(ctx, req)
}
	cert, err := tls.LoadX509KeyPair(data.Path("x509/server_cert.pem"), data.Path("x509/server_key.pem"))
	if err != nil {
		log.Fatalf("failed to load key pair: %s", err)
	}
	opts := []grpc.ServerOption{
		grpc.UnaryInterceptor(ensureValidToken),
		// Enable TLS for all incoming connections.
		grpc.Creds(credentials.NewServerTLSFromCert(&cert)),
	}

这里也是可以用 go-grpc-middleware 提供的auth 中间件来实现验证函数的包装

取消调用

取消调用里面要在用grpc 调用时传入上下文作为第一个参数来控制rpc 调用过程;

1
2
3
4
5
6
7
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	stream, err := c.BidirectionalStreamingEcho(ctx)
	if err != nil {
		log.Fatalf("error creating stream: %v", err)
	}
    cancel()
    // 此时已经取消任务了,

压缩请求

1
2
3
// 旧版本在 NewClient() 的时候传一个 grpc.WithCompressor(grpc.NewGZIPCompressor())
// 新版本需要在调用的时候传入
grpc.UseCompressor(gzip.Name)

grpc限流

用go-grpc-middleware实现一个接口来在grpc 中间件里做限流,限流中间件必须排在后面,避免令牌被浪费了,使用原生的方式可以基于服务来做特定任务的限流; 在示例中用定时器触发模拟限流机制产生,当服务端调用阻塞的时候,退出后续的批量任务,

请求失败重试策略配置

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
	
	var retryPolicy = `{
		"methodConfig": [{ 
		  "name": [{"service": "grpc.examples.echo.Echo"}], //应用的服务
		  "waitForReady": true,	// 是否等待
		  "retryPolicy": {
			  "MaxAttempts": 4,
			  "InitialBackoff": ".01s",
			  "MaxBackoff": ".01s",
			  "BackoffMultiplier": 1.0,
			  "RetryableStatusCodes": [ "UNAVAILABLE" ]
		  }
		}]}`

// use grpc.WithDefaultServiceConfig() to set service config
func retryDial() (*grpc.ClientConn, error) {
	return grpc.NewClient(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(retryPolicy))
}

等待对端恢复

1
2
// 在需要等待对端恢复服务的时候可以加入这个option
grpc.WaitForReady(true)

携带元数据

这个元数据有点像 http 里面的 header 的作用,携带一些用于配置的的内容 客户端这边需要用

1
2
3
4
5
6
 metadata.Pairs("timestamp", time.Now().Format(timestampFormat)) // 来添加组装键值对,两个字符串作为一组,转换成一个KV对,键值对的键可以有重复的
 ctx := metadata.NewOutgoingContext(context.Background(), md)
 // 然后 封装成一个上下文通过 grpc 调用传过去
 var header, trailer metadata.MD
 r, err := c.UnaryEcho(ctx, &pb.EchoRequest{Message: message}, grpc.Header(&header), grpc.Trailer(&trailer))
 // 这个 header 和 trailer 是

服务端对这个元数据做交互

1
2
3
4
5
6
7
md, ok := metadata.FromIncomingContext(ctx)
	header := metadata.New(map[string]string{"location": "MTV", "timestamp": time.Now().Format(timestampFormat)})
	grpc.SendHeader(ctx, header)
	// 执行grpc 服务
	// 下面逻辑要在defer 函数里面执行
	trailer := metadata.Pairs("timestamp", time.Now().Format(timestampFormat))
	grpc.SetTrailer(ctx, trailer)

感觉可以用来做rpc 调用的时延监控,或者调用前后状态的跟踪点

grpc 长连接保活

1
2
3
4
5
6
7
var kacp = keepalive.ClientParameters{
	Time:                10 * time.Second, // send pings every 10 seconds if there is no activity
	Timeout:             time.Second,      // wait 1 second for ping ack before considering the connection dead
	PermitWithoutStream: true,             // send pings even without active streams
}
// 新建客户端时带上这个 grpc.DialOption
conn, err := grpc.NewClient(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithKeepaliveParams(kacp))

负载平衡

默认的连接构建策略是 使用首个配置构建两件,如果需要使用负载平衡机制

1
2
3
4
5
6
// 使用轮转策略
roundrobinConn, err := grpc.NewClient(
		fmt.Sprintf("%s:///%s", exampleScheme, exampleServiceName),
		grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`), // This sets the initial balancing policy.
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	)
往日已经不在,未来尚未开始
使用 Hugo 构建
主题 StackJimmy 设计