diff --git a/go.mod b/go.mod index 968ed7a..8c3d1ca 100644 --- a/go.mod +++ b/go.mod @@ -5,4 +5,11 @@ go 1.24.1 require ( github.com/go-json-experiment/json v0.0.0-20250223041408-d3c622f1b874 github.com/google/go-cmp v0.7.0 + github.com/twmb/franz-go v1.18.1 +) + +require ( + github.com/klauspost/compress v1.17.11 // indirect + github.com/pierrec/lz4/v4 v4.1.22 // indirect + github.com/twmb/franz-go/pkg/kmsg v1.9.0 // indirect ) diff --git a/go.sum b/go.sum index 81a0c2f..6affcd4 100644 --- a/go.sum +++ b/go.sum @@ -2,3 +2,13 @@ github.com/go-json-experiment/json v0.0.0-20250223041408-d3c622f1b874 h1:F8d1AJ6 github.com/go-json-experiment/json v0.0.0-20250223041408-d3c622f1b874/go.mod h1:TiCD2a1pcmjd7YnhGH0f/zKNcCD06B029pHhzV23c2M= github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= +github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= +github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU= +github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/twmb/franz-go v1.18.1 h1:D75xxCDyvTqBSiImFx2lkPduE39jz1vaD7+FNc+vMkc= +github.com/twmb/franz-go v1.18.1/go.mod h1:Uzo77TarcLTUZeLuGq+9lNpSkfZI+JErv7YJhlDjs9M= +github.com/twmb/franz-go/pkg/kmsg v1.9.0 h1:JojYUph2TKAau6SBtErXpXGC7E3gg4vGZMv9xFU/B6M= +github.com/twmb/franz-go/pkg/kmsg v1.9.0/go.mod h1:CMbfazviCyY6HM0SXuG5t9vOwYDHRCSrJJyBAe5paqg= +golang.org/x/crypto v0.32.0 h1:euUpcYgM8WcP71gNpTqQCn6rC2t6ULUPiOzfWaXVVfc= +golang.org/x/crypto v0.32.0/go.mod h1:ZnnJkOaASj8g0AjIduWNlq2NRxL0PlBrbKVyZ6V/Ugc= diff --git a/queue/message.go b/queue/message.go index 39f43e6..fea16f5 100644 --- a/queue/message.go +++ b/queue/message.go @@ -1,5 +1,13 @@ package queue +import ( + "context" + "sync" + + "github.com/go-json-experiment/json" + "github.com/twmb/franz-go/pkg/kgo" +) + // Message is the shape of messages handed from ingest to indexers. type Message struct { // ID is the message ID. @@ -11,3 +19,35 @@ type Message struct { // Text is the message content. Text string `json:"t"` } + +// Producer handles sending Kafka records. +type Producer interface { + Produce(ctx context.Context, rec *kgo.Record, promise func(*kgo.Record, error)) +} + +const topic = "kaiyan.chat" + +var recordPool sync.Pool + +func Send(ctx context.Context, cl Producer, msg Message, errs chan<- error) { + b, err := json.Marshal(&msg) + if err != nil { + // Should never happen, and we don't have a good way to report it here. + panic(err) + } + rec, _ := recordPool.Get().(*kgo.Record) + if rec == nil { + rec = &kgo.Record{Topic: topic} + } + rec.Key = append(rec.Key[:0], msg.ID...) + rec.Value = append(rec.Value[:0], b...) + cl.Produce(ctx, rec, func(r *kgo.Record, err error) { + recordPool.Put(rec) + if err != nil { + select { + case <-ctx.Done(): // do nothing + case errs <- err: // do nothing + } + } + }) +} diff --git a/queue/message_test.go b/queue/message_test.go new file mode 100644 index 0000000..0fe233f --- /dev/null +++ b/queue/message_test.go @@ -0,0 +1,50 @@ +package queue_test + +import ( + "context" + "testing" + + "git.sunturtle.xyz/zephyr/kaiyan/queue" + "github.com/go-json-experiment/json" + "github.com/twmb/franz-go/pkg/kgo" +) + +type spyProducer struct { + got []kgo.Record +} + +func (p *spyProducer) Produce(ctx context.Context, rec *kgo.Record, promise func(*kgo.Record, error)) { + p.got = append(p.got, *rec) + promise(rec, nil) +} + +func TestSend(t *testing.T) { + msg := queue.Message{ + ID: "bocchi", + Channel: "kessoku", + Sender: queue.Sender(make([]byte, 16), "kessoku", "ryō"), + Text: "bocchi the rock!", + } + var q spyProducer + errs := make(chan error, 1) + queue.Send(context.Background(), &q, msg, errs) + select { + case err := <-errs: + t.Error(err) + default: // do nothing + } + if len(q.got) != 1 { + t.Fatalf("wrong number of records produced: %d", len(q.got)) + } + rec := &q.got[0] + if string(rec.Key) != msg.ID { + t.Errorf("record has wrong key: want %q, got %q", msg.ID, rec.Key) + } + var got queue.Message + if err := json.Unmarshal(rec.Value, &got); err != nil { + t.Error(err) + } + if got != msg { + t.Errorf("message did not round-trip:\nwant %+v\ngot %+v", msg, got) + } +}