queue: implement sending messages to kafka
This commit is contained in:
@@ -1,5 +1,13 @@
|
||||
package queue
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"github.com/go-json-experiment/json"
|
||||
"github.com/twmb/franz-go/pkg/kgo"
|
||||
)
|
||||
|
||||
// Message is the shape of messages handed from ingest to indexers.
|
||||
type Message struct {
|
||||
// ID is the message ID.
|
||||
@@ -11,3 +19,35 @@ type Message struct {
|
||||
// Text is the message content.
|
||||
Text string `json:"t"`
|
||||
}
|
||||
|
||||
// Producer handles sending Kafka records.
|
||||
type Producer interface {
|
||||
Produce(ctx context.Context, rec *kgo.Record, promise func(*kgo.Record, error))
|
||||
}
|
||||
|
||||
const topic = "kaiyan.chat"
|
||||
|
||||
var recordPool sync.Pool
|
||||
|
||||
func Send(ctx context.Context, cl Producer, msg Message, errs chan<- error) {
|
||||
b, err := json.Marshal(&msg)
|
||||
if err != nil {
|
||||
// Should never happen, and we don't have a good way to report it here.
|
||||
panic(err)
|
||||
}
|
||||
rec, _ := recordPool.Get().(*kgo.Record)
|
||||
if rec == nil {
|
||||
rec = &kgo.Record{Topic: topic}
|
||||
}
|
||||
rec.Key = append(rec.Key[:0], msg.ID...)
|
||||
rec.Value = append(rec.Value[:0], b...)
|
||||
cl.Produce(ctx, rec, func(r *kgo.Record, err error) {
|
||||
recordPool.Put(rec)
|
||||
if err != nil {
|
||||
select {
|
||||
case <-ctx.Done(): // do nothing
|
||||
case errs <- err: // do nothing
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
50
queue/message_test.go
Normal file
50
queue/message_test.go
Normal file
@@ -0,0 +1,50 @@
|
||||
package queue_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"git.sunturtle.xyz/zephyr/kaiyan/queue"
|
||||
"github.com/go-json-experiment/json"
|
||||
"github.com/twmb/franz-go/pkg/kgo"
|
||||
)
|
||||
|
||||
type spyProducer struct {
|
||||
got []kgo.Record
|
||||
}
|
||||
|
||||
func (p *spyProducer) Produce(ctx context.Context, rec *kgo.Record, promise func(*kgo.Record, error)) {
|
||||
p.got = append(p.got, *rec)
|
||||
promise(rec, nil)
|
||||
}
|
||||
|
||||
func TestSend(t *testing.T) {
|
||||
msg := queue.Message{
|
||||
ID: "bocchi",
|
||||
Channel: "kessoku",
|
||||
Sender: queue.Sender(make([]byte, 16), "kessoku", "ryō"),
|
||||
Text: "bocchi the rock!",
|
||||
}
|
||||
var q spyProducer
|
||||
errs := make(chan error, 1)
|
||||
queue.Send(context.Background(), &q, msg, errs)
|
||||
select {
|
||||
case err := <-errs:
|
||||
t.Error(err)
|
||||
default: // do nothing
|
||||
}
|
||||
if len(q.got) != 1 {
|
||||
t.Fatalf("wrong number of records produced: %d", len(q.got))
|
||||
}
|
||||
rec := &q.got[0]
|
||||
if string(rec.Key) != msg.ID {
|
||||
t.Errorf("record has wrong key: want %q, got %q", msg.ID, rec.Key)
|
||||
}
|
||||
var got queue.Message
|
||||
if err := json.Unmarshal(rec.Value, &got); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if got != msg {
|
||||
t.Errorf("message did not round-trip:\nwant %+v\ngot %+v", msg, got)
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user