54 lines
1.3 KiB
Go
54 lines
1.3 KiB
Go
package queue
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
|
|
"github.com/go-json-experiment/json"
|
|
"github.com/twmb/franz-go/pkg/kgo"
|
|
)
|
|
|
|
// Message is the shape of messages handed from ingest to indexers.
|
|
type Message struct {
|
|
// ID is the message ID.
|
|
ID string `json:"id"`
|
|
// Channel is an identifier for the chat channel where the message was sent.
|
|
Channel string `json:"ch"`
|
|
// Sender is an obfuscated identifier for the sending user.
|
|
Sender sender `json:"u"`
|
|
// Text is the message content.
|
|
Text string `json:"t"`
|
|
}
|
|
|
|
// Producer handles sending Kafka records.
|
|
type Producer interface {
|
|
Produce(ctx context.Context, rec *kgo.Record, promise func(*kgo.Record, error))
|
|
}
|
|
|
|
const topic = "kaiyan.chat"
|
|
|
|
var recordPool sync.Pool
|
|
|
|
func Send(ctx context.Context, cl Producer, msg Message, errs chan<- error) {
|
|
b, err := json.Marshal(&msg)
|
|
if err != nil {
|
|
// Should never happen, and we don't have a good way to report it here.
|
|
panic(err)
|
|
}
|
|
rec, _ := recordPool.Get().(*kgo.Record)
|
|
if rec == nil {
|
|
rec = &kgo.Record{Topic: topic}
|
|
}
|
|
rec.Key = append(rec.Key[:0], msg.ID...)
|
|
rec.Value = append(rec.Value[:0], b...)
|
|
cl.Produce(ctx, rec, func(r *kgo.Record, err error) {
|
|
recordPool.Put(rec)
|
|
if err != nil {
|
|
select {
|
|
case <-ctx.Done(): // do nothing
|
|
case errs <- err: // do nothing
|
|
}
|
|
}
|
|
})
|
|
}
|