Compare commits
No commits in common. "7def034fa2c3ef7dffb08dddd9ee47dea9e09c40" and "2d8807685c8fbb7631f21fa0dfb13266828d7115" have entirely different histories.
7def034fa2
...
2d8807685c
@ -1,160 +0,0 @@
|
|||||||
// kaiyan-ingest receives chat messages as webhooks and emplaces them to Kafka.
|
|
||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"flag"
|
|
||||||
"fmt"
|
|
||||||
"log/slog"
|
|
||||||
"net"
|
|
||||||
"net/http"
|
|
||||||
"os"
|
|
||||||
"os/signal"
|
|
||||||
"sync"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/go-json-experiment/json"
|
|
||||||
"github.com/twmb/franz-go/pkg/kgo"
|
|
||||||
|
|
||||||
"git.sunturtle.xyz/zephyr/kaiyan/ingest"
|
|
||||||
"git.sunturtle.xyz/zephyr/kaiyan/queue"
|
|
||||||
"git.sunturtle.xyz/zephyr/kaiyan/twitch"
|
|
||||||
)
|
|
||||||
|
|
||||||
func main() {
|
|
||||||
var (
|
|
||||||
listen string
|
|
||||||
secretFile string
|
|
||||||
)
|
|
||||||
flag.StringVar(&listen, "listen", ":80", "address to bind for webhooks")
|
|
||||||
flag.StringVar(&secretFile, "secret", "", "path to file containing the system secret")
|
|
||||||
flag.Parse()
|
|
||||||
if flag.NArg() == 0 {
|
|
||||||
fail("provide broker addresses as args")
|
|
||||||
}
|
|
||||||
if secretFile == "" {
|
|
||||||
fail("-secret is required")
|
|
||||||
}
|
|
||||||
|
|
||||||
secret, err := os.ReadFile(secretFile)
|
|
||||||
if err != nil {
|
|
||||||
fail(err.Error())
|
|
||||||
}
|
|
||||||
// TODO(branden): derive secrets for twitch webhook and users?
|
|
||||||
q, err := kgo.NewClient(
|
|
||||||
kgo.SeedBrokers(flag.Args()...),
|
|
||||||
kgo.ProducerBatchCompression(kgo.ZstdCompression()),
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
fail(err.Error())
|
|
||||||
}
|
|
||||||
errs := make(chan error, 1)
|
|
||||||
go func() {
|
|
||||||
for err := range errs {
|
|
||||||
slog.Error("enqueue", slog.Any("err", err))
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
|
|
||||||
mux := http.NewServeMux()
|
|
||||||
mux.Handle("POST /twitch", webhook(q, secret, errs))
|
|
||||||
// TODO(branden): GET /metrics
|
|
||||||
l, err := net.Listen("tcp", listen)
|
|
||||||
if err != nil {
|
|
||||||
fail(err.Error())
|
|
||||||
}
|
|
||||||
srv := http.Server{
|
|
||||||
Addr: listen,
|
|
||||||
Handler: mux,
|
|
||||||
ReadTimeout: 3 * time.Second,
|
|
||||||
WriteTimeout: 3 * time.Second,
|
|
||||||
IdleTimeout: 1 * time.Minute,
|
|
||||||
BaseContext: func(l net.Listener) context.Context { return ctx },
|
|
||||||
}
|
|
||||||
go func() {
|
|
||||||
slog.InfoContext(ctx, "HTTP API", slog.Any("addr", l.Addr()))
|
|
||||||
// TODO(branden): tls? or do we just use caddy/nginx after all?
|
|
||||||
err := srv.Serve(l)
|
|
||||||
if err == http.ErrServerClosed {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
slog.ErrorContext(ctx, "HTTP API closed", slog.Any("err", err))
|
|
||||||
}()
|
|
||||||
// TODO(branden): tell eventsub we're listening
|
|
||||||
<-ctx.Done()
|
|
||||||
stop()
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
||||||
defer cancel()
|
|
||||||
if err := srv.Shutdown(ctx); err != nil {
|
|
||||||
slog.ErrorContext(ctx, "HTTP API shutdown", slog.Any("err", err))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
var byteptrPool sync.Pool
|
|
||||||
|
|
||||||
func webhook(q *kgo.Client, secret []byte, errs chan<- error) http.HandlerFunc {
|
|
||||||
return func(w http.ResponseWriter, r *http.Request) {
|
|
||||||
ctx := r.Context()
|
|
||||||
log := slog.With(slog.Group("request",
|
|
||||||
slog.String("host", r.Host),
|
|
||||||
slog.String("remote", r.RemoteAddr),
|
|
||||||
slog.String("id", r.Header.Get("Twitch-Eventsub-Message-Id")),
|
|
||||||
))
|
|
||||||
p, _ := byteptrPool.Get().(*[]byte)
|
|
||||||
if p == nil {
|
|
||||||
p = new([]byte)
|
|
||||||
}
|
|
||||||
b, err := twitch.Receive(*p, secret, r)
|
|
||||||
defer func() {
|
|
||||||
*p = b[:0]
|
|
||||||
byteptrPool.Put(p)
|
|
||||||
}()
|
|
||||||
if err != nil {
|
|
||||||
log.ErrorContext(ctx, "webhook receive", slog.Any("err", err))
|
|
||||||
http.Error(w, "bad request", http.StatusBadRequest)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
var ev twitch.EventSub[ingest.Twitch]
|
|
||||||
if err := json.Unmarshal(b, &ev); err != nil {
|
|
||||||
log.ErrorContext(ctx, "webhook decode", slog.Any("err", err))
|
|
||||||
http.Error(w, "bad request", http.StatusBadRequest)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
switch evtype := r.Header.Get("Twitch-Eventsub-Message-Type"); evtype {
|
|
||||||
case twitch.Notification:
|
|
||||||
log.LogAttrs(ctx, slog.LevelInfo, "webhook receive",
|
|
||||||
slog.String("broadcaster", ev.Event.Broadcaster),
|
|
||||||
slog.String("id", ev.Event.ID),
|
|
||||||
)
|
|
||||||
msg := queue.Message{
|
|
||||||
ID: ev.Event.ID,
|
|
||||||
Channel: ev.Event.Broadcaster,
|
|
||||||
Sender: queue.Sender(secret, ev.Event.Broadcaster, ev.Event.Chatter),
|
|
||||||
Text: ev.Event.Message.Text,
|
|
||||||
}
|
|
||||||
queue.Send(ctx, q, msg, errs)
|
|
||||||
w.WriteHeader(http.StatusNoContent)
|
|
||||||
log.LogAttrs(ctx, slog.LevelInfo, "received",
|
|
||||||
slog.String("broadcaster", ev.Event.Broadcaster),
|
|
||||||
slog.String("id", ev.Event.ID),
|
|
||||||
)
|
|
||||||
|
|
||||||
case twitch.Revocation:
|
|
||||||
log.LogAttrs(ctx, slog.LevelInfo, "webhook revoke")
|
|
||||||
// TODO(branden): this
|
|
||||||
w.WriteHeader(http.StatusNoContent)
|
|
||||||
|
|
||||||
case twitch.Verification:
|
|
||||||
log.LogAttrs(ctx, slog.LevelInfo, "verification challenge")
|
|
||||||
twitch.HandleChallenge(w, b)
|
|
||||||
|
|
||||||
default:
|
|
||||||
log.ErrorContext(ctx, "invalid EventSub message type", slog.String("type", evtype))
|
|
||||||
w.WriteHeader(http.StatusBadRequest)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func fail(format string, args ...any) {
|
|
||||||
fmt.Fprintf(os.Stderr, format+"\n", args...)
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
7
go.mod
7
go.mod
@ -5,11 +5,4 @@ go 1.24.1
|
|||||||
require (
|
require (
|
||||||
github.com/go-json-experiment/json v0.0.0-20250223041408-d3c622f1b874
|
github.com/go-json-experiment/json v0.0.0-20250223041408-d3c622f1b874
|
||||||
github.com/google/go-cmp v0.7.0
|
github.com/google/go-cmp v0.7.0
|
||||||
github.com/twmb/franz-go v1.18.1
|
|
||||||
)
|
|
||||||
|
|
||||||
require (
|
|
||||||
github.com/klauspost/compress v1.17.11 // indirect
|
|
||||||
github.com/pierrec/lz4/v4 v4.1.22 // indirect
|
|
||||||
github.com/twmb/franz-go/pkg/kmsg v1.9.0 // indirect
|
|
||||||
)
|
)
|
||||||
|
10
go.sum
10
go.sum
@ -2,13 +2,3 @@ github.com/go-json-experiment/json v0.0.0-20250223041408-d3c622f1b874 h1:F8d1AJ6
|
|||||||
github.com/go-json-experiment/json v0.0.0-20250223041408-d3c622f1b874/go.mod h1:TiCD2a1pcmjd7YnhGH0f/zKNcCD06B029pHhzV23c2M=
|
github.com/go-json-experiment/json v0.0.0-20250223041408-d3c622f1b874/go.mod h1:TiCD2a1pcmjd7YnhGH0f/zKNcCD06B029pHhzV23c2M=
|
||||||
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
|
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
|
||||||
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
|
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
|
||||||
github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc=
|
|
||||||
github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0=
|
|
||||||
github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU=
|
|
||||||
github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
|
|
||||||
github.com/twmb/franz-go v1.18.1 h1:D75xxCDyvTqBSiImFx2lkPduE39jz1vaD7+FNc+vMkc=
|
|
||||||
github.com/twmb/franz-go v1.18.1/go.mod h1:Uzo77TarcLTUZeLuGq+9lNpSkfZI+JErv7YJhlDjs9M=
|
|
||||||
github.com/twmb/franz-go/pkg/kmsg v1.9.0 h1:JojYUph2TKAau6SBtErXpXGC7E3gg4vGZMv9xFU/B6M=
|
|
||||||
github.com/twmb/franz-go/pkg/kmsg v1.9.0/go.mod h1:CMbfazviCyY6HM0SXuG5t9vOwYDHRCSrJJyBAe5paqg=
|
|
||||||
golang.org/x/crypto v0.32.0 h1:euUpcYgM8WcP71gNpTqQCn6rC2t6ULUPiOzfWaXVVfc=
|
|
||||||
golang.org/x/crypto v0.32.0/go.mod h1:ZnnJkOaASj8g0AjIduWNlq2NRxL0PlBrbKVyZ6V/Ugc=
|
|
||||||
|
@ -1,13 +1,5 @@
|
|||||||
package queue
|
package queue
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"sync"
|
|
||||||
|
|
||||||
"github.com/go-json-experiment/json"
|
|
||||||
"github.com/twmb/franz-go/pkg/kgo"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Message is the shape of messages handed from ingest to indexers.
|
// Message is the shape of messages handed from ingest to indexers.
|
||||||
type Message struct {
|
type Message struct {
|
||||||
// ID is the message ID.
|
// ID is the message ID.
|
||||||
@ -19,35 +11,3 @@ type Message struct {
|
|||||||
// Text is the message content.
|
// Text is the message content.
|
||||||
Text string `json:"t"`
|
Text string `json:"t"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// Producer handles sending Kafka records.
|
|
||||||
type Producer interface {
|
|
||||||
Produce(ctx context.Context, rec *kgo.Record, promise func(*kgo.Record, error))
|
|
||||||
}
|
|
||||||
|
|
||||||
const topic = "kaiyan.chat"
|
|
||||||
|
|
||||||
var recordPool sync.Pool
|
|
||||||
|
|
||||||
func Send(ctx context.Context, cl Producer, msg Message, errs chan<- error) {
|
|
||||||
b, err := json.Marshal(&msg)
|
|
||||||
if err != nil {
|
|
||||||
// Should never happen, and we don't have a good way to report it here.
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
rec, _ := recordPool.Get().(*kgo.Record)
|
|
||||||
if rec == nil {
|
|
||||||
rec = &kgo.Record{Topic: topic}
|
|
||||||
}
|
|
||||||
rec.Key = append(rec.Key[:0], msg.ID...)
|
|
||||||
rec.Value = append(rec.Value[:0], b...)
|
|
||||||
cl.Produce(ctx, rec, func(r *kgo.Record, err error) {
|
|
||||||
recordPool.Put(rec)
|
|
||||||
if err != nil {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done(): // do nothing
|
|
||||||
case errs <- err: // do nothing
|
|
||||||
}
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
@ -1,50 +0,0 @@
|
|||||||
package queue_test
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"git.sunturtle.xyz/zephyr/kaiyan/queue"
|
|
||||||
"github.com/go-json-experiment/json"
|
|
||||||
"github.com/twmb/franz-go/pkg/kgo"
|
|
||||||
)
|
|
||||||
|
|
||||||
type spyProducer struct {
|
|
||||||
got []kgo.Record
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *spyProducer) Produce(ctx context.Context, rec *kgo.Record, promise func(*kgo.Record, error)) {
|
|
||||||
p.got = append(p.got, *rec)
|
|
||||||
promise(rec, nil)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestSend(t *testing.T) {
|
|
||||||
msg := queue.Message{
|
|
||||||
ID: "bocchi",
|
|
||||||
Channel: "kessoku",
|
|
||||||
Sender: queue.Sender(make([]byte, 16), "kessoku", "ryō"),
|
|
||||||
Text: "bocchi the rock!",
|
|
||||||
}
|
|
||||||
var q spyProducer
|
|
||||||
errs := make(chan error, 1)
|
|
||||||
queue.Send(context.Background(), &q, msg, errs)
|
|
||||||
select {
|
|
||||||
case err := <-errs:
|
|
||||||
t.Error(err)
|
|
||||||
default: // do nothing
|
|
||||||
}
|
|
||||||
if len(q.got) != 1 {
|
|
||||||
t.Fatalf("wrong number of records produced: %d", len(q.got))
|
|
||||||
}
|
|
||||||
rec := &q.got[0]
|
|
||||||
if string(rec.Key) != msg.ID {
|
|
||||||
t.Errorf("record has wrong key: want %q, got %q", msg.ID, rec.Key)
|
|
||||||
}
|
|
||||||
var got queue.Message
|
|
||||||
if err := json.Unmarshal(rec.Value, &got); err != nil {
|
|
||||||
t.Error(err)
|
|
||||||
}
|
|
||||||
if got != msg {
|
|
||||||
t.Errorf("message did not round-trip:\nwant %+v\ngot %+v", msg, got)
|
|
||||||
}
|
|
||||||
}
|
|
@ -6,21 +6,8 @@ type EventSub[Event any] struct {
|
|||||||
Subscription Subscription `json:"subscription"`
|
Subscription Subscription `json:"subscription"`
|
||||||
// Event is the event payload.
|
// Event is the event payload.
|
||||||
Event Event `json:"event"`
|
Event Event `json:"event"`
|
||||||
// Challenge is payload for verification challenges.
|
|
||||||
Challenge string `json:"challenge"`
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Subscription is the fields of an EventSub subscription
|
// Subscription is the fields of an EventSub subscription
|
||||||
// which are relevant to Kaiyan.
|
// which are relevant to Kaiyan.
|
||||||
type Subscription struct {
|
type Subscription struct{}
|
||||||
// Status is the subscription status.
|
|
||||||
// For revocation messages, this holds the reason for revocation.
|
|
||||||
Status string `json:"status"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// EventSub message types.
|
|
||||||
const (
|
|
||||||
Notification = "notification"
|
|
||||||
Verification = "webhook_callback_verification"
|
|
||||||
Revocation = "revocation"
|
|
||||||
)
|
|
||||||
|
@ -10,10 +10,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strconv"
|
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/go-json-experiment/json"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// ErrVerificationFailed is the error returned when a webhook request does not
|
// ErrVerificationFailed is the error returned when a webhook request does not
|
||||||
@ -42,26 +39,14 @@ func Receive(b, secret []byte, req *http.Request) ([]byte, error) {
|
|||||||
|
|
||||||
got := req.Header.Get("Twitch-Eventsub-Message-Signature")
|
got := req.Header.Get("Twitch-Eventsub-Message-Signature")
|
||||||
if !hmac.Equal(want, []byte(got)) {
|
if !hmac.Equal(want, []byte(got)) {
|
||||||
|
fmt.Printf("req message id: %q\n", req.Header.Get("Twitch-Eventsub-Message-Id"))
|
||||||
|
fmt.Printf("req timestamp: %q\n", req.Header.Get("Twitch-Eventsub-Message-Timestamp"))
|
||||||
|
fmt.Printf("req body: %q\n", b)
|
||||||
return nil, fmt.Errorf("%w: computed %q, header has %q", ErrVerificationFailed, want, got)
|
return nil, fmt.Errorf("%w: computed %q, header has %q", ErrVerificationFailed, want, got)
|
||||||
}
|
}
|
||||||
return b, nil
|
return b, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// HandleChallenge responds to an EventSub verification challenge.
|
|
||||||
func HandleChallenge(w http.ResponseWriter, body []byte) error {
|
|
||||||
var ev EventSub[struct{}]
|
|
||||||
if err := json.Unmarshal(body, &ev); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if ev.Challenge == "" {
|
|
||||||
return errors.New("no challenge in body")
|
|
||||||
}
|
|
||||||
w.Header().Set("Content-Type", "text/plain")
|
|
||||||
w.Header().Set("Content-Length", strconv.Itoa(len(ev.Challenge)))
|
|
||||||
_, err := io.WriteString(w, ev.Challenge)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Secret returns a process-wide secret for webhook subscriptions.
|
// Secret returns a process-wide secret for webhook subscriptions.
|
||||||
var Secret = sync.OnceValue(func() []byte {
|
var Secret = sync.OnceValue(func() []byte {
|
||||||
b := make([]byte, 32)
|
b := make([]byte, 32)
|
||||||
|
Loading…
Reference in New Issue
Block a user