Golang使用Kafka-自动刷新主题

在公司业务中有场景需要实时订阅Topic,也就是当有新的Topic出现时,需要自动发现、监听、消费

诸多比较之后选择了用户群体最多的 sarama,但是遇到了一个问题,这个包并没有实现像Java的一样的正则匹配策略,不要说正则,连实时刷新机制都没有,所以需要我们自己来实现 Java 客户端 subscribe(Pattern)的通配符模式,废话不多说,直接上代码:


package main
import (
 "context"
 "errors"
 "log"
 "os"
 "os/signal"
 "reflect"
 "sort"
 "strings"
 "sync"
 "syscall"
 "time"
 "github.com/IBM/sarama"
)
// Sarama configuration options
var (
 brokers = "127.0.0.1:9092"
 version = "2.8.1" // Note: kafka broker version (not Sarama version)
 group = "kfk_group_id"
 assignor = "sticky"
)
// main wires everything together: it parses the Kafka version, builds the
// sarama config, starts a consumer group on the initial topic list, launches
// the background topic-refresh goroutine and blocks until a signal or the
// context terminates the program.
func main() {
	keepRunning := true
	log.Println("Starting a new Sarama consumer")
	sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)

	// Parse the broker protocol version string; shadows the package-level var.
	version, err := sarama.ParseKafkaVersion(version)
	if err != nil {
		log.Panicf("Error parsing Kafka version: %v", err)
	}

	/**
	 * Construct a new Sarama configuration.
	 * The Kafka cluster version has to be defined before the consumer/producer is initialized.
	 */
	config := sarama.NewConfig()
	config.Version = version
	// Start from the oldest offset so the very message that caused a brand-new
	// topic to appear is not skipped when that topic is picked up later.
	config.Consumer.Offsets.Initial = sarama.OffsetOldest

	switch assignor {
	case "sticky":
		config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategySticky()}
	case "roundrobin":
		config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRoundRobin()}
	case "range":
		config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRange()}
	default:
		log.Panicf("Unrecognized consumer group partition assignor: %s", assignor)
	}

	/**
	 * Set up a new Sarama consumer group
	 */
	consumer := Consumer{
		ready: make(chan bool),
	}
	ctx, cancel := context.WithCancel(context.Background())

	newClient, err := sarama.NewClient(strings.Split(brokers, ","), config)
	if err != nil { // BUG FIX: this error was previously overwritten unchecked
		log.Fatalf("Error creating Kafka client: %v", err)
	}
	consumerGroup, err := sarama.NewConsumerGroupFromClient(group, newClient)
	if err != nil {
		log.Fatalf("Error creating consumer group client: %v", err)
	}

	wg := &sync.WaitGroup{}
	wg.Add(1)

	// Get all the topics currently known to the cluster.
	topics, err := newClient.Topics()
	if err != nil { // BUG FIX: this error was previously ignored
		log.Fatalf("Error fetching topics: %v", err)
	}
	topics = filterTopics(topics)

	go func() {
		defer wg.Done()
		for {
			// `Consume` should be called inside an infinite loop: when a
			// server-side rebalance happens, the consumer session will need to
			// be recreated to get the new claims.
			if err := consumerGroup.Consume(ctx, topics, &consumer); err != nil {
				if errors.Is(err, sarama.ErrClosedConsumerGroup) {
					return
				}
				log.Panicf("Error from consumer: %v", err)
			}
			// Check if context was cancelled, signaling that the consumer should stop.
			if ctx.Err() != nil {
				log.Printf("Context err from consumer: %v", ctx.Err())
				return
			}
			// Re-arm the readiness channel for the next session's Setup.
			consumer.ready = make(chan bool)
		}
	}()

	<-consumer.ready // Await till the consumer has been set up
	log.Println("Sarama consumer up and running!...")

	// Poll for new topics in the background and re-subscribe on changes.
	go refreshTopics(newClient, consumerGroup, topics)

	sigusr1 := make(chan os.Signal, 1)
	signal.Notify(sigusr1, syscall.SIGUSR1)
	sigterm := make(chan os.Signal, 1)
	signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
	for keepRunning {
		select {
		case <-ctx.Done():
			log.Println("terminating: context cancelled")
			keepRunning = false
		case <-sigterm:
			log.Println("terminating: via signal")
			keepRunning = false
		case <-sigusr1:
		}
	}
	cancel()
	wg.Wait()
	if err = consumerGroup.Close(); err != nil {
		log.Panicf("Error closing client: %v", err)
	}
}
// EqualSlices reports whether s1 and s2 contain the same elements with the
// same multiplicities, ignoring order.
//
// BUG FIX: the previous implementation compared element *sets*, so slices with
// equal length but different duplicate counts (e.g. ["a","a","b"] vs
// ["a","b","b"]) were wrongly considered equal. Counting occurrences fixes
// that while remaining order-insensitive.
func EqualSlices(s1, s2 []string) bool {
	if len(s1) != len(s2) {
		return false
	}
	counts1 := make(map[string]int, len(s1))
	counts2 := make(map[string]int, len(s2))
	for _, v := range s1 {
		counts1[v]++
	}
	for _, v := range s2 {
		counts2[v]++
	}
	return reflect.DeepEqual(counts1, counts2)
}
// filterTopics returns a copy of topics with Kafka-internal topics removed.
// Currently the only topic filtered out is "__consumer_offsets".
func filterTopics(topics []string) []string {
	kept := make([]string, 0, len(topics))
	for _, t := range topics {
		if t == "__consumer_offsets" {
			continue
		}
		kept = append(kept, t)
	}
	return kept
}
// refreshTopics polls the cluster every 5 seconds for topic changes. When the
// topic set differs from the last observed set, it stops and closes the
// currently running consumer group, creates a fresh group on the same client
// and starts consuming the updated topic list.
//
// BUG FIXES vs. the previous version:
//   - the `defer newConsumerGroup.Close()` inside the infinite loop never ran
//     (defers fire at function return) and accumulated one closure per change;
//   - `prevConsumerGroup` was never updated, so after the first change every
//     subsequent change re-closed the already-closed original group and leaked
//     each newly created group;
//   - `Consume` was called only once, so the first server-side rebalance
//     silently ended consumption; it now runs in the required loop.
func refreshTopics(client sarama.Client, prevConsumerGroup sarama.ConsumerGroup, topicsOld []string) {
	currentGroup := prevConsumerGroup
	cancelCurrent := func() {} // no-op until we own a consume loop

	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()
	for range ticker.C {
		if err := client.RefreshMetadata(); err != nil {
			log.Printf("Error refreshing metadata: %v", err)
			continue
		}
		topics, err := client.Topics()
		if err != nil {
			log.Printf("Error refreshing topics: %v", err)
			continue
		}
		filteredTopics := filterTopics(topics) // filter "__consumer_offsets"
		sort.Strings(filteredTopics)
		log.Printf("All Topics: %v", filteredTopics)
		if EqualSlices(filteredTopics, topicsOld) {
			continue
		}
		topicsOld = filteredTopics

		// Stop the consume loop of the group that is currently running, then
		// close the group itself.
		cancelCurrent()
		if currentGroup != nil {
			if err := currentGroup.Close(); err != nil {
				log.Printf("Error closing prev consumer group: %v", err)
			}
		}

		newGroup, err := sarama.NewConsumerGroupFromClient(group, client)
		if err != nil {
			log.Printf("Error creating new consumer group: %v", err)
			return
		}
		ctx, cancel := context.WithCancel(context.Background())
		currentGroup, cancelCurrent = newGroup, cancel

		newConsumer := &Consumer{ready: make(chan bool)}
		go func() {
			for {
				// `Consume` must run in a loop so the session is re-created
				// after each server-side rebalance.
				if err := newGroup.Consume(ctx, filteredTopics, newConsumer); err != nil {
					if errors.Is(err, sarama.ErrClosedConsumerGroup) {
						return
					}
					log.Printf("Error from consumer: %v", err)
					return
				}
				if ctx.Err() != nil {
					return
				}
				newConsumer.ready = make(chan bool)
			}
		}()
	}
}
// Consumer represents a Sarama consumer group consumer.
// It implements sarama.ConsumerGroupHandler (Setup/Cleanup/ConsumeClaim).
type Consumer struct {
 // ready is closed in Setup to signal that the consumer has joined the
 // group and a session is about to start consuming.
 ready chan bool
}
// Setup is run at the beginning of a new session, before ConsumeClaim.
func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
 // Mark the consumer as ready: closing the channel releases anything
 // blocked on <-consumer.ready (see main).
 close(consumer.ready)
 return nil
}
// Cleanup is run at the end of a session, once all ConsumeClaim goroutines
// have exited. Nothing to tear down here.
func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
 return nil
}
// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
// Once the Messages() channel is closed, the Handler must finish its processing
// loop and exit.
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	// NOTE: do not move this loop into a goroutine — sarama already invokes
	// ConsumeClaim from one, see:
	// https://github.com/IBM/sarama/blob/main/consumer_group.go#L27-L29
	msgs := claim.Messages()
	for {
		select {
		// Returning when `session.Context()` is done avoids
		// `ErrRebalanceInProgress` or `read tcp <ip>:<port>: i/o timeout`
		// during a Kafka rebalance, see:
		// https://github.com/IBM/sarama/issues/1192
		case <-session.Context().Done():
			return nil
		case msg, ok := <-msgs:
			if !ok {
				log.Printf("message channel was closed")
				return nil
			}
			log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", string(msg.Value), msg.Timestamp, msg.Topic)
			session.MarkMessage(msg, "")
		}
	}
}

代码很简单,相信大家都能看懂我是在做什么了,其实就是增加了一个 refreshTopics 来刷新Topic,当检测到新的Topic的时候关掉之前的消费者组、创建新的组、订阅新的主题

注意:此处有一个大坑!当新的Topic加入的时候,refreshTopics 监测到新的Topic并开始消费,这里如果不进行配置,则只会消费订阅之后的消息。你不要忘记,能触发订阅新主题的条件是有了一条新的消息,那这条触发订阅的消息去哪了呢?首先肯定不会被原消费者组消费,因为它们还没订阅;其次,新启动的消费者组虽然订阅了新的Topic,但是因为你没有配置,默认情况下没有消费记录的新Topic会从消息的最后位置开始消费(有消费记录的Topic从未消费的位置开始消费),所以根本就不会消费到那条消息!那怎么办呢?你只需要在启动的时候增加一个配置即可:

config.Consumer.Offsets.Initial = sarama.OffsetOldest

问题解决,撒花···

作者:少年吖少年 原文地址:https://segmentfault.com/a/1190000044174430

0 个评论

要回复文章请先登录注册