package queue import ( "context" "errors" "fmt" "sync" "github.com/go-json-experiment/json" "github.com/twmb/franz-go/pkg/kgo" ) // Message is the shape of messages handed from ingest to indexers. type Message struct { // ID is the message ID. ID string `json:"id"` // Channel is an identifier for the chat channel where the message was sent. Channel string `json:"ch"` // Sender is an obfuscated identifier for the sending user. Sender sender `json:"u"` // Timestamp is the time at which the message was sent in nanoseconds since // the Unix epoch. Timestamp int64 `json:"ts"` // Text is the message content. Text string `json:"t"` } // Producer handles sending Kafka records. type Producer interface { Produce(ctx context.Context, rec *kgo.Record, promise func(*kgo.Record, error)) } const Topic = "kaiyan.chat" var recordPool sync.Pool func Send(ctx context.Context, cl Producer, msg Message, errs chan<- error) { b, err := json.Marshal(&msg) if err != nil { // Should never happen, and we don't have a good way to report it here. panic(err) } rec, _ := recordPool.Get().(*kgo.Record) if rec == nil { rec = &kgo.Record{Topic: Topic} } rec.Key = append(rec.Key[:0], msg.ID...) rec.Value = append(rec.Value[:0], b...) cl.Produce(ctx, rec, func(r *kgo.Record, err error) { recordPool.Put(rec) if err != nil { select { case <-ctx.Done(): // do nothing case errs <- err: // do nothing } } }) } type Consumer interface { PollFetches(ctx context.Context) kgo.Fetches } func Recv(ctx context.Context, cl Consumer, onto []Message) ([]Message, error) { f := cl.PollFetches(ctx) var errs error f.EachError(func(s string, i int32, err error) { errs = errors.Join(errs, fmt.Errorf("partition %d: %w", i, err)) }) f.EachRecord(func(r *kgo.Record) { var msg Message if err := json.Unmarshal(r.Value, &msg); err != nil { errs = errors.Join(errs, fmt.Errorf("partition %d: %w", r.Partition, err)) } onto = append(onto, msg) }) return onto, errs }