From 7def034fa2c3ef7dffb08dddd9ee47dea9e09c40 Mon Sep 17 00:00:00 2001 From: Branden J Brown Date: Tue, 15 Apr 2025 19:58:13 -0400 Subject: [PATCH] kaiyan-ingest: initial ingest server --- cmd/kaiyan-ingest/main.go | 160 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 160 insertions(+) create mode 100644 cmd/kaiyan-ingest/main.go diff --git a/cmd/kaiyan-ingest/main.go b/cmd/kaiyan-ingest/main.go new file mode 100644 index 0000000..6a7fa5e --- /dev/null +++ b/cmd/kaiyan-ingest/main.go @@ -0,0 +1,160 @@ +// kaiyan-ingest receives chat messages as webhooks and emplaces them to Kafka. +package main + +import ( + "context" + "flag" + "fmt" + "log/slog" + "net" + "net/http" + "os" + "os/signal" + "sync" + "time" + + "github.com/go-json-experiment/json" + "github.com/twmb/franz-go/pkg/kgo" + + "git.sunturtle.xyz/zephyr/kaiyan/ingest" + "git.sunturtle.xyz/zephyr/kaiyan/queue" + "git.sunturtle.xyz/zephyr/kaiyan/twitch" +) + +func main() { + var ( + listen string + secretFile string + ) + flag.StringVar(&listen, "listen", ":80", "address to bind for webhooks") + flag.StringVar(&secretFile, "secret", "", "path to file containing the system secret") + flag.Parse() + if flag.NArg() == 0 { + fail("provide broker addresses as args") + } + if secretFile == "" { + fail("-secret is required") + } + + secret, err := os.ReadFile(secretFile) + if err != nil { + fail(err.Error()) + } + // TODO(branden): derive secrets for twitch webhook and users? + q, err := kgo.NewClient( + kgo.SeedBrokers(flag.Args()...), + kgo.ProducerBatchCompression(kgo.ZstdCompression()), + ) + if err != nil { + fail(err.Error()) + } + errs := make(chan error, 1) + go func() { + for err := range errs { + slog.Error("enqueue", slog.Any("err", err)) + } + }() + ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt) + mux := http.NewServeMux() + mux.Handle("POST /twitch", webhook(q, secret, errs)) + // TODO(branden): GET /metrics + l, err := net.Listen("tcp", listen) + if err != nil { + fail(err.Error()) + } + srv := http.Server{ + Addr: listen, + Handler: mux, + ReadTimeout: 3 * time.Second, + WriteTimeout: 3 * time.Second, + IdleTimeout: 1 * time.Minute, + BaseContext: func(l net.Listener) context.Context { return ctx }, + } + go func() { + slog.InfoContext(ctx, "HTTP API", slog.Any("addr", l.Addr())) + // TODO(branden): tls? or do we just use caddy/nginx after all? + err := srv.Serve(l) + if err == http.ErrServerClosed { + return + } + slog.ErrorContext(ctx, "HTTP API closed", slog.Any("err", err)) + }() + // TODO(branden): tell eventsub we're listening + <-ctx.Done() + stop() + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := srv.Shutdown(ctx); err != nil { + slog.ErrorContext(ctx, "HTTP API shutdown", slog.Any("err", err)) + } +} + +var byteptrPool sync.Pool + +func webhook(q *kgo.Client, secret []byte, errs chan<- error) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + log := slog.With(slog.Group("request", + slog.String("host", r.Host), + slog.String("remote", r.RemoteAddr), + slog.String("id", r.Header.Get("Twitch-Eventsub-Message-Id")), + )) + p, _ := byteptrPool.Get().(*[]byte) + if p == nil { + p = new([]byte) + } + b, err := twitch.Receive(*p, secret, r) + defer func() { + *p = b[:0] + byteptrPool.Put(p) + }() + if err != nil { + log.ErrorContext(ctx, "webhook receive", slog.Any("err", err)) + http.Error(w, "bad request", http.StatusBadRequest) + return + } + var ev twitch.EventSub[ingest.Twitch] + if err := json.Unmarshal(b, &ev); err != nil { + log.ErrorContext(ctx, "webhook decode", slog.Any("err", err)) + http.Error(w, "bad request", http.StatusBadRequest) + return + } + switch evtype := r.Header.Get("Twitch-Eventsub-Message-Type"); evtype { + case twitch.Notification: + log.LogAttrs(ctx, slog.LevelInfo, "webhook receive", + slog.String("broadcaster", ev.Event.Broadcaster), + slog.String("id", ev.Event.ID), + ) + msg := queue.Message{ + ID: ev.Event.ID, + Channel: ev.Event.Broadcaster, + Sender: queue.Sender(secret, ev.Event.Broadcaster, ev.Event.Chatter), + Text: ev.Event.Message.Text, + } + queue.Send(ctx, q, msg, errs) + w.WriteHeader(http.StatusNoContent) + log.LogAttrs(ctx, slog.LevelInfo, "received", + slog.String("broadcaster", ev.Event.Broadcaster), + slog.String("id", ev.Event.ID), + ) + + case twitch.Revocation: + log.LogAttrs(ctx, slog.LevelInfo, "webhook revoke") + // TODO(branden): this + w.WriteHeader(http.StatusNoContent) + + case twitch.Verification: + log.LogAttrs(ctx, slog.LevelInfo, "verification challenge") + twitch.HandleChallenge(w, b) + + default: + log.ErrorContext(ctx, "invalid EventSub message type", slog.String("type", evtype)) + w.WriteHeader(http.StatusBadRequest) + } + } +} + +func fail(format string, args ...any) { + fmt.Fprintf(os.Stderr, format+"\n", args...) + os.Exit(1) +}