// kaiyan-ingest receives chat messages as webhooks and emplaces them to Kafka. package main import ( "context" "flag" "fmt" "log/slog" "net" "net/http" "os" "os/signal" "sync" "time" "github.com/go-json-experiment/json" "github.com/twmb/franz-go/pkg/kgo" "git.sunturtle.xyz/zephyr/kaiyan/ingest" "git.sunturtle.xyz/zephyr/kaiyan/queue" "git.sunturtle.xyz/zephyr/kaiyan/twitch" ) func main() { var ( listen string secretFile string ) flag.StringVar(&listen, "listen", ":80", "address to bind for webhooks") flag.StringVar(&secretFile, "secret", "", "path to file containing the system secret") flag.Parse() if flag.NArg() == 0 { fail("provide broker addresses as args") } if secretFile == "" { fail("-secret is required") } secret, err := os.ReadFile(secretFile) if err != nil { fail(err.Error()) } // TODO(branden): derive secrets for twitch webhook and users? q, err := kgo.NewClient( kgo.SeedBrokers(flag.Args()...), kgo.ProducerBatchCompression(kgo.ZstdCompression()), ) if err != nil { fail(err.Error()) } errs := make(chan error, 1) go func() { for err := range errs { slog.Error("enqueue", slog.Any("err", err)) } }() ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt) mux := http.NewServeMux() mux.Handle("POST /twitch", webhook(q, secret, errs)) // TODO(branden): GET /metrics l, err := net.Listen("tcp", listen) if err != nil { fail(err.Error()) } srv := http.Server{ Addr: listen, Handler: mux, ReadTimeout: 3 * time.Second, WriteTimeout: 3 * time.Second, IdleTimeout: 1 * time.Minute, BaseContext: func(l net.Listener) context.Context { return ctx }, } go func() { slog.InfoContext(ctx, "HTTP API", slog.Any("addr", l.Addr())) // TODO(branden): tls? or do we just use caddy/nginx after all? err := srv.Serve(l) if err == http.ErrServerClosed { return } slog.ErrorContext(ctx, "HTTP API closed", slog.Any("err", err)) }() // TODO(branden): tell eventsub we're listening <-ctx.Done() stop() ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() if err := srv.Shutdown(ctx); err != nil { slog.ErrorContext(ctx, "HTTP API shutdown", slog.Any("err", err)) } } var byteptrPool sync.Pool func webhook(q *kgo.Client, secret []byte, errs chan<- error) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { ctx := r.Context() log := slog.With(slog.Group("request", slog.String("host", r.Host), slog.String("remote", r.RemoteAddr), slog.String("id", r.Header.Get("Twitch-Eventsub-Message-Id")), )) p, _ := byteptrPool.Get().(*[]byte) if p == nil { p = new([]byte) } b, err := twitch.Receive(*p, secret, r) defer func() { *p = b[:0] byteptrPool.Put(p) }() if err != nil { log.ErrorContext(ctx, "webhook receive", slog.Any("err", err)) http.Error(w, "bad request", http.StatusBadRequest) return } var ev twitch.EventSub[ingest.Twitch] if err := json.Unmarshal(b, &ev); err != nil { log.ErrorContext(ctx, "webhook decode", slog.Any("err", err)) http.Error(w, "bad request", http.StatusBadRequest) return } switch evtype := r.Header.Get("Twitch-Eventsub-Message-Type"); evtype { case twitch.Notification: log.LogAttrs(ctx, slog.LevelInfo, "webhook receive", slog.String("broadcaster", ev.Event.Broadcaster), slog.String("id", ev.Event.ID), ) msg := queue.Message{ ID: ev.Event.ID, Channel: ev.Event.Broadcaster, Sender: queue.Sender(secret, ev.Event.Broadcaster, ev.Event.Chatter), Text: ev.Event.Message.Text, } // TODO(branden): the context in the http request cancels once this // handler returns, which means the producer doesn't relay it. // what's a good context to use, then? queue.Send(context.TODO(), q, msg, errs) w.WriteHeader(http.StatusNoContent) log.LogAttrs(ctx, slog.LevelInfo, "received", slog.String("broadcaster", ev.Event.Broadcaster), slog.String("id", ev.Event.ID), ) case twitch.Revocation: log.LogAttrs(ctx, slog.LevelInfo, "webhook revoke") // TODO(branden): this w.WriteHeader(http.StatusNoContent) case twitch.Verification: log.LogAttrs(ctx, slog.LevelInfo, "verification challenge") twitch.HandleChallenge(w, b) default: log.ErrorContext(ctx, "invalid EventSub message type", slog.String("type", evtype)) w.WriteHeader(http.StatusBadRequest) } } } func fail(format string, args ...any) { fmt.Fprintf(os.Stderr, format+"\n", args...) os.Exit(1) }