diff --git a/cmd/kaiyan-ingest/main.go b/cmd/kaiyan-ingest/main.go
new file mode 100644
index 0000000..6a7fa5e
--- /dev/null
+++ b/cmd/kaiyan-ingest/main.go
@@ -0,0 +1,160 @@
+// kaiyan-ingest receives chat messages as webhooks and emplaces them to Kafka.
+package main
+
+import (
+	"context"
+	"flag"
+	"fmt"
+	"log/slog"
+	"net"
+	"net/http"
+	"os"
+	"os/signal"
+	"sync"
+	"time"
+
+	"github.com/go-json-experiment/json"
+	"github.com/twmb/franz-go/pkg/kgo"
+
+	"git.sunturtle.xyz/zephyr/kaiyan/ingest"
+	"git.sunturtle.xyz/zephyr/kaiyan/queue"
+	"git.sunturtle.xyz/zephyr/kaiyan/twitch"
+)
+
+func main() {
+	var (
+		listen     string
+		secretFile string
+	)
+	flag.StringVar(&listen, "listen", ":80", "address to bind for webhooks")
+	flag.StringVar(&secretFile, "secret", "", "path to file containing the system secret")
+	flag.Parse()
+	if flag.NArg() == 0 {
+		fail("provide broker addresses as args")
+	}
+	if secretFile == "" {
+		fail("-secret is required")
+	}
+
+	secret, err := os.ReadFile(secretFile)
+	if err != nil {
+		fail(err.Error())
+	}
+	// TODO(branden): derive secrets for twitch webhook and users?
+	q, err := kgo.NewClient(
+		kgo.SeedBrokers(flag.Args()...),
+		kgo.ProducerBatchCompression(kgo.ZstdCompression()),
+	)
+	if err != nil {
+		fail(err.Error())
+	}
+	errs := make(chan error, 1)
+	go func() {
+		for err := range errs {
+			slog.Error("enqueue", slog.Any("err", err))
+		}
+	}()
+	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
+	mux := http.NewServeMux()
+	mux.Handle("POST /twitch", webhook(q, secret, errs))
+	// TODO(branden): GET /metrics
+	l, err := net.Listen("tcp", listen)
+	if err != nil {
+		fail(err.Error())
+	}
+	srv := http.Server{
+		Addr:         listen,
+		Handler:      mux,
+		ReadTimeout:  3 * time.Second,
+		WriteTimeout: 3 * time.Second,
+		IdleTimeout:  1 * time.Minute,
+		BaseContext:  func(l net.Listener) context.Context { return ctx },
+	}
+	go func() {
+		slog.InfoContext(ctx, "HTTP API", slog.Any("addr", l.Addr()))
+		// TODO(branden): tls? or do we just use caddy/nginx after all?
+		err := srv.Serve(l)
+		if err == http.ErrServerClosed {
+			return
+		}
+		slog.ErrorContext(ctx, "HTTP API closed", slog.Any("err", err))
+	}()
+	// TODO(branden): tell eventsub we're listening
+	<-ctx.Done()
+	stop()
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancel()
+	if err := srv.Shutdown(ctx); err != nil {
+		slog.ErrorContext(ctx, "HTTP API shutdown", slog.Any("err", err))
+	}
+}
+
+var byteptrPool sync.Pool
+
+func webhook(q *kgo.Client, secret []byte, errs chan<- error) http.HandlerFunc {
+	return func(w http.ResponseWriter, r *http.Request) {
+		ctx := r.Context()
+		log := slog.With(slog.Group("request",
+			slog.String("host", r.Host),
+			slog.String("remote", r.RemoteAddr),
+			slog.String("id", r.Header.Get("Twitch-Eventsub-Message-Id")),
+		))
+		p, _ := byteptrPool.Get().(*[]byte)
+		if p == nil {
+			p = new([]byte)
+		}
+		b, err := twitch.Receive(*p, secret, r)
+		defer func() {
+			*p = b[:0]
+			byteptrPool.Put(p)
+		}()
+		if err != nil {
+			log.ErrorContext(ctx, "webhook receive", slog.Any("err", err))
+			http.Error(w, "bad request", http.StatusBadRequest)
+			return
+		}
+		var ev twitch.EventSub[ingest.Twitch]
+		if err := json.Unmarshal(b, &ev); err != nil {
+			log.ErrorContext(ctx, "webhook decode", slog.Any("err", err))
+			http.Error(w, "bad request", http.StatusBadRequest)
+			return
+		}
+		switch evtype := r.Header.Get("Twitch-Eventsub-Message-Type"); evtype {
+		case twitch.Notification:
+			log.LogAttrs(ctx, slog.LevelInfo, "webhook receive",
+				slog.String("broadcaster", ev.Event.Broadcaster),
+				slog.String("id", ev.Event.ID),
+			)
+			msg := queue.Message{
+				ID:      ev.Event.ID,
+				Channel: ev.Event.Broadcaster,
+				Sender:  queue.Sender(secret, ev.Event.Broadcaster, ev.Event.Chatter),
+				Text:    ev.Event.Message.Text,
+			}
+			queue.Send(ctx, q, msg, errs)
+			w.WriteHeader(http.StatusNoContent)
+			log.LogAttrs(ctx, slog.LevelInfo, "received",
+				slog.String("broadcaster", ev.Event.Broadcaster),
+				slog.String("id", ev.Event.ID),
+			)
+
+		case twitch.Revocation:
+			log.LogAttrs(ctx, slog.LevelInfo, "webhook revoke")
+			// TODO(branden): this
+			w.WriteHeader(http.StatusNoContent)
+
+		case twitch.Verification:
+			log.LogAttrs(ctx, slog.LevelInfo, "verification challenge")
+			twitch.HandleChallenge(w, b)
+
+		default:
+			log.ErrorContext(ctx, "invalid EventSub message type", slog.String("type", evtype))
+			w.WriteHeader(http.StatusBadRequest)
+		}
+	}
+}
+
+func fail(format string, args ...any) {
+	fmt.Fprintf(os.Stderr, format+"\n", args...)
+	os.Exit(1)
+}