diff --git a/cmd/kaiyan-index/main.go b/cmd/kaiyan-index/main.go new file mode 100644 index 0000000..024e301 --- /dev/null +++ b/cmd/kaiyan-index/main.go @@ -0,0 +1,137 @@ +package main + +import ( + "context" + "flag" + "fmt" + "log/slog" + "os" + "os/signal" + "time" + + "github.com/twmb/franz-go/pkg/kgo" + "zombiezen.com/go/sqlite" + "zombiezen.com/go/sqlite/sqlitex" + + "git.sunturtle.xyz/zephyr/kaiyan/emote" + "git.sunturtle.xyz/zephyr/kaiyan/emote/sqlitestore" + "git.sunturtle.xyz/zephyr/kaiyan/queue" +) + +func main() { + var ( + dburi string + ) + flag.StringVar(&dburi, "db", "", "SQLite database URI") + flag.Parse() + if flag.NArg() == 0 { + fail("provide broker addresses as args") + } + if dburi == "" { + fail("-db is mandatory") + } + + ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt) + go func() { + <-ctx.Done() + stop() + }() + + opts := sqlitex.PoolOptions{ + Flags: sqlite.OpenCreate | sqlite.OpenReadWrite | sqlite.OpenURI, + PrepareConn: func(conn *sqlite.Conn) error { + pragmas := []string{ + `PRAGMA journal_mode = WAL`, + `PRAGMA synchronous = NORMAL`, + } + for _, p := range pragmas { + s, _, err := conn.PrepareTransient(p) + if err != nil { + // If this function returns an error, then the pool will + // continue to invoke it for every connection. + // Explode violently instead. + panic(fmt.Errorf("couldn't set %s: %w", p, err)) + } + if _, err := s.Step(); err != nil { + // This one is probably ok to retry, though. + return fmt.Errorf("couldn't run %s: %w", p, err) + } + if err := s.Finalize(); err != nil { + panic(fmt.Errorf("couldn't finalize statement for %s: %w", p, err)) + } + } + return nil + }, + } + db, err := sqlitex.NewPool(dburi, opts) + if err != nil { + fail("%v", err) + } + st, err := sqlitestore.Open(ctx, db) + if err != nil { + fail("%v", err) + } + q, err := kgo.NewClient( + kgo.SeedBrokers(flag.Args()...), + kgo.ConsumerGroup("kaiyan"), + kgo.ConsumeTopics(queue.Topic), + kgo.FetchIsolationLevel(kgo.ReadCommitted()), + kgo.GreedyAutoCommit(), + kgo.WithLogger(kgo.BasicLogger(os.Stderr, kgo.LogLevelDebug, nil)), + ) + if err != nil { + fail("%v", err) + } + + var msgs []queue.Message + var ems []emote.Emote + // TODO(branden): emote lists + for ctx.Err() == nil { + msgs, ems, err = batch(ctx, nil, q, st, msgs[:0], ems[:0]) + if err != nil { + fail("%v", err) + } + } + st.Close() +} + +func batch(ctx context.Context, ps map[string]emote.Parser, q *kgo.Client, st *sqlitestore.Store, msgs []queue.Message, ems []emote.Emote) ([]queue.Message, []emote.Emote, error) { + slog.InfoContext(ctx, "receive") + msgs, err := queue.Recv(ctx, q, msgs) + switch { + case err == nil: // do nothing + case ctx.Err() != nil: + // Exiting normally. + // Return no error and let the caller check the context. + return nil, nil, nil + default: + // TODO(branden): there are a variety of errors that are non-fatal; + // we should check for them and continue instead of giving up + return nil, nil, fmt.Errorf("queue recv: %w", err) + } + n := 0 + slog.InfoContext(ctx, "process", slog.Int("count", len(msgs))) + for _, msg := range msgs { + p := ps[msg.Channel] + text := msg.Text + for { + em, rest := p.Next(text) + if em.ID == "" { + break + } + ems = append(ems, em) + text = rest + } + if err := st.Record(ctx, msg.Channel, msg.ID, msg.Sender.String(), time.Unix(0, msg.Timestamp), ems); err != nil { + return nil, nil, err + } + n += len(ems) + } + slog.InfoContext(ctx, "recorded", slog.Int("emotes", n)) + return msgs, ems, nil +} + +func fail(format string, args ...any) { + fmt.Fprintf(os.Stderr, format+"\n", args...) + os.Exit(1) +}