package main

import (
	"context"
	"errors"
	"fmt"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"

	"github.com/IBM/sarama"
)

var version = sarama.DefaultVersion
var eps = []string{"127.0.0.1:9094"}
var wg sync.WaitGroup
// Producer publishes a "hello <n>" message once per second until ctx is cancelled.
func Producer(topic string, ctx context.Context) {
	defer wg.Done()
	config := sarama.NewConfig()
	config.Version = version
	config.Producer.RequiredAcks = sarama.NoResponse
	config.Producer.Compression = sarama.CompressionSnappy
	config.Producer.Return.Successes = true
	producer, err := sarama.NewAsyncProducer(eps, config)
	if err != nil {
		panic(err)
	}
	defer producer.Close()

	// Note: the goroutine draining the result channels must exit before this
	// function returns, so track it with its own WaitGroup.
	iwg := &sync.WaitGroup{}
	iwg.Add(1)
	go func(ctx context.Context, p sarama.AsyncProducer) {
		defer iwg.Done()
		for {
			select {
			case <-ctx.Done():
				return
			case err := <-p.Errors():
				fmt.Println(err)
			case <-p.Successes():
			}
		}
	}(ctx, producer)

	cnt := 1
	for {
		msg := &sarama.ProducerMessage{
			Topic: topic,
			Value: sarama.StringEncoder(fmt.Sprintf("hello %d", cnt)),
		}
		select {
		case <-ctx.Done():
			// Wait for the result-draining goroutine to exit.
			iwg.Wait()
			return
		case producer.Input() <- msg:
			cnt++
		}
		time.Sleep(1 * time.Second)
	}
}
// TestKafkaGroup implements sarama.ConsumerGroupHandler.
type TestKafkaGroup struct {
	ctx context.Context
}

func (t *TestKafkaGroup) Setup(session sarama.ConsumerGroupSession) error {
	fmt.Println("setup")
	return nil
}

func (t *TestKafkaGroup) Cleanup(session sarama.ConsumerGroupSession) error {
	fmt.Println("cleanup")
	return nil
}

func (t *TestKafkaGroup) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for {
		select {
		case message, ok := <-claim.Messages():
			if !ok {
				fmt.Println("message channel was closed")
				return nil
			}
			fmt.Printf("Message claimed: value = %s, partition = %d, timestamp = %v\n", string(message.Value), message.Partition, message.Timestamp)
			session.MarkMessage(message, "")
		// Should return when `session.Context()` is done.
		// If not, it will raise `ErrRebalanceInProgress` or `read tcp <ip>:<port>: i/o timeout` when Kafka rebalances. See:
		// https://github.com/IBM/sarama/issues/1192
		case <-session.Context().Done():
			return nil
		case <-t.ctx.Done():
			return nil
		}
	}
}
// Consumer joins a consumer group (the group ID is the topic name here) and
// prints every message it claims until ctx is cancelled.
func Consumer(topic string, ctx context.Context) {
	defer wg.Done()
	config := sarama.NewConfig()
	config.Version = version
	config.Consumer.Return.Errors = true
	consumer, err := sarama.NewConsumerGroup(eps, topic, config)
	if err != nil {
		fmt.Println(err)
		return
	}
	defer consumer.Close()

	tc := &TestKafkaGroup{ctx: ctx}
	for {
		// Consume must be called in a loop: each session ends when a
		// rebalance happens, and the group has to rejoin by calling it again.
		if err := consumer.Consume(ctx, []string{topic}, tc); err != nil {
			fmt.Println(err)
			if errors.Is(err, sarama.ErrClosedConsumerGroup) {
				return
			}
		}
		if ctx.Err() != nil {
			return
		}
	}
}
func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	quit := make(chan os.Signal, 1)
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)

	// Two goroutines (producer and consumer) each call wg.Done.
	wg.Add(2)
	go Producer("test", ctx)
	go Consumer("test", ctx)

	<-quit
	cancel()
	wg.Wait()
}