From 7cc448296586c196b1a18da7f878eac93af93712 Mon Sep 17 00:00:00 2001 From: Branden J Brown Date: Sun, 20 Apr 2025 13:55:36 -0400 Subject: [PATCH] queue: include send time in messages --- cmd/kaiyan-ingest/main.go | 18 ++++++++++++++---- queue/message.go | 3 +++ queue/message_test.go | 9 +++++---- 3 files changed, 22 insertions(+), 8 deletions(-) diff --git a/cmd/kaiyan-ingest/main.go b/cmd/kaiyan-ingest/main.go index bc68bb4..e8ec214 100644 --- a/cmd/kaiyan-ingest/main.go +++ b/cmd/kaiyan-ingest/main.go @@ -125,11 +125,21 @@ func webhook(q *kgo.Client, secret []byte, errs chan<- error) http.HandlerFunc { slog.String("broadcaster", ev.Event.Broadcaster), slog.String("id", ev.Event.ID), ) + // Use the external clock for timestamps. + tm, err := time.Parse(time.RFC3339Nano, r.Header.Get("Twitch-Eventsub-Message-Timestamp")) + if err != nil { + log.LogAttrs(ctx, slog.LevelWarn, "bad timestamp", + slog.String("timestamp", r.Header.Get("Twitch-Eventsub-Message-Timestamp")), + slog.Any("err", err), + ) + tm = time.Now() + } msg := queue.Message{ - ID: ev.Event.ID, - Channel: ev.Event.Broadcaster, - Sender: queue.Sender(secret, ev.Event.Broadcaster, ev.Event.Chatter), - Text: ev.Event.Message.Text, + ID: ev.Event.ID, + Channel: ev.Event.Broadcaster, + Sender: queue.Sender(secret, ev.Event.Broadcaster, ev.Event.Chatter), + Timestamp: tm.UnixNano(), + Text: ev.Event.Message.Text, } // TODO(branden): the context in the http request cancels once this // handler returns, which means the producer doesn't relay it. diff --git a/queue/message.go b/queue/message.go index 298cd00..be762dc 100644 --- a/queue/message.go +++ b/queue/message.go @@ -18,6 +18,9 @@ type Message struct { Channel string `json:"ch"` // Sender is an obfuscated identifier for the sending user. Sender sender `json:"u"` + // Timestamp is the time at which the message was sent in nanoseconds since + // the Unix epoch. + Timestamp int64 `json:"ts"` // Text is the message content. Text string `json:"t"` } diff --git a/queue/message_test.go b/queue/message_test.go index 4cc9ae0..632ed05 100644 --- a/queue/message_test.go +++ b/queue/message_test.go @@ -21,10 +21,11 @@ func (p *spyProducer) Produce(ctx context.Context, rec *kgo.Record, promise func func TestSend(t *testing.T) { msg := queue.Message{ - ID: "bocchi", - Channel: "kessoku", - Sender: queue.Sender(make([]byte, 16), "kessoku", "ryō"), - Text: "bocchi the rock!", + ID: "bocchi", + Channel: "kessoku", + Sender: queue.Sender(make([]byte, 16), "kessoku", "ryō"), + Timestamp: 1, + Text: "bocchi the rock!", } var q spyProducer errs := make(chan error, 1)