kaiyan/queue/message.go
2025-04-20 13:55:36 -04:00

79 lines
2.0 KiB
Go

package queue
import (
"context"
"errors"
"fmt"
"sync"
"github.com/go-json-experiment/json"
"github.com/twmb/franz-go/pkg/kgo"
)
// Message is the shape of messages handed from ingest to indexers.
// Send encodes it as JSON for the record value and Recv decodes it back,
// using the abbreviated keys given in the field tags.
type Message struct {
// ID is the message ID. Send also uses it as the record key.
ID string `json:"id"`
// Channel is an identifier for the chat channel where the message was sent.
Channel string `json:"ch"`
// Sender is an obfuscated identifier for the sending user.
// The sender type is declared outside this section of the file.
Sender sender `json:"u"`
// Timestamp is the time at which the message was sent in nanoseconds since
// the Unix epoch.
Timestamp int64 `json:"ts"`
// Text is the message content.
Text string `json:"t"`
}
// Producer handles sending Kafka records.
// It is the only client behavior Send depends on, which lets callers
// substitute their own implementation.
// NOTE(review): the method signature appears to mirror (*kgo.Client).Produce;
// confirm against franz-go.
type Producer interface {
// Produce submits rec for delivery and reports the outcome through promise.
// Send relies on promise being called with a non-nil error when delivery
// fails, and uses that call to recycle the record.
Produce(ctx context.Context, rec *kgo.Record, promise func(*kgo.Record, error))
}
// Topic is the Kafka topic that Send assigns to the records it produces.
const Topic = "kaiyan.chat"

// recordPool recycles *kgo.Record values across calls to Send so that their
// Key and Value buffers can be reused. Send returns a record to the pool from
// its produce promise.
var recordPool sync.Pool
// Send encodes msg as JSON and hands it to cl for asynchronous production to
// Topic, using the message ID as the record key.
//
// Send itself does not report delivery failures. If cl reports an error
// through the produce promise, that error is sent on errs; the send is
// abandoned if ctx is done first, so a caller that has stopped reading errs
// must cancel ctx to avoid blocking the promise.
//
// Send panics if msg cannot be marshaled.
func Send(ctx context.Context, cl Producer, msg Message, errs chan<- error) {
	b, err := json.Marshal(&msg)
	if err != nil {
		// Should never happen, and we don't have a good way to report it here.
		panic(err)
	}
	rec, _ := recordPool.Get().(*kgo.Record)
	if rec == nil {
		rec = new(kgo.Record)
	}
	// Start from a clean record, keeping only the Key and Value buffers for
	// reuse. The kgo.Record documentation says Timestamp and Context are only
	// filled in when unset, so a recycled record would otherwise be produced
	// with the previous send's timestamp and context.
	*rec = kgo.Record{
		Topic: Topic,
		Key:   append(rec.Key[:0], msg.ID...),
		Value: append(rec.Value[:0], b...),
	}
	cl.Produce(ctx, rec, func(_ *kgo.Record, err error) {
		// The record is finished once the promise runs, so it can be recycled
		// before the error is reported.
		recordPool.Put(rec)
		if err == nil {
			return
		}
		select {
		case <-ctx.Done(): // caller is gone; drop the error
		case errs <- err: // delivered
		}
	})
}
// Consumer is the source of fetched Kafka records for Recv.
// NOTE(review): the method signature appears to mirror
// (*kgo.Client).PollFetches; confirm against franz-go.
type Consumer interface {
// PollFetches returns the next batch of fetches. Recv reads both the fetch
// errors and the records from the result.
PollFetches(ctx context.Context) kgo.Fetches
}
// Recv polls cl once and appends each message decoded from the fetched records
// to onto, returning the extended slice.
//
// The returned error joins every fetch error and every decoding error, each
// prefixed with the partition it came from. Messages and errors may both be
// returned from the same call: a record whose value fails to decode
// contributes an error but no message, and the remaining records are still
// processed.
func Recv(ctx context.Context, cl Consumer, onto []Message) ([]Message, error) {
	f := cl.PollFetches(ctx)
	var errs error
	f.EachError(func(_ string, p int32, err error) {
		errs = errors.Join(errs, fmt.Errorf("partition %d: %w", p, err))
	})
	f.EachRecord(func(r *kgo.Record) {
		var msg Message
		if err := json.Unmarshal(r.Value, &msg); err != nil {
			errs = errors.Join(errs, fmt.Errorf("partition %d: %w", r.Partition, err))
			// Do not hand a zero or half-decoded message to the caller.
			return
		}
		onto = append(onto, msg)
	})
	return onto, errs
}