package main import ( "context" "flag" "fmt" "log/slog" "os" "os/signal" "time" "github.com/twmb/franz-go/pkg/kgo" "zombiezen.com/go/sqlite" "zombiezen.com/go/sqlite/sqlitex" "git.sunturtle.xyz/zephyr/kaiyan/emote" "git.sunturtle.xyz/zephyr/kaiyan/emote/sqlitestore" "git.sunturtle.xyz/zephyr/kaiyan/queue" ) func main() { var ( dburi string ) flag.StringVar(&dburi, "db", "", "SQLite database URI") flag.Parse() if flag.NArg() == 0 { fail("provide broker addresses as args") } if dburi == "" { fail("-db is mandatory") } ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt) go func() { <-ctx.Done() stop() }() opts := sqlitex.PoolOptions{ Flags: sqlite.OpenCreate | sqlite.OpenReadWrite | sqlite.OpenURI, PrepareConn: func(conn *sqlite.Conn) error { pragmas := []string{ `PRAGMA journal_mode = WAL`, `PRAGMA synchronous = NORMAL`, } for _, p := range pragmas { s, _, err := conn.PrepareTransient(p) if err != nil { // If this function returns an error, then the pool will // continue to invoke it for every connection. // Explode violently instead. panic(fmt.Errorf("couldn't set %s: %w", p, err)) } if _, err := s.Step(); err != nil { // This one is probably ok to retry, though. return fmt.Errorf("couldn't run %s: %w", p, err) } if err := s.Finalize(); err != nil { panic(fmt.Errorf("couldn't finalize statement for %s: %w", p, err)) } } return nil }, } db, err := sqlitex.NewPool(dburi, opts) if err != nil { fail("%v", err) } st, err := sqlitestore.Open(ctx, db) if err != nil { fail("%v", err) } q, err := kgo.NewClient( kgo.SeedBrokers(flag.Args()...), kgo.ConsumerGroup("kaiyan"), kgo.ConsumeTopics(queue.Topic), kgo.FetchIsolationLevel(kgo.ReadCommitted()), kgo.GreedyAutoCommit(), ) if err != nil { fail("%v", err) } var msgs []queue.Message var ems []emote.Emote // TODO(branden): emote lists for ctx.Err() == nil { msgs, ems, err = batch(ctx, nil, q, st, msgs[:0], ems[:0]) if err != nil { fail("%v", err) } } st.Close() } func batch(ctx context.Context, ps map[string]emote.Parser, q *kgo.Client, st *sqlitestore.Store, msgs []queue.Message, ems []emote.Emote) ([]queue.Message, []emote.Emote, error) { slog.InfoContext(ctx, "receive") msgs, err := queue.Recv(ctx, q, msgs) switch { case err == nil: // do nothing case ctx.Err() != nil: // Exiting normally. // Return no error and let the caller check the context. return nil, nil, nil default: // TODO(branden): there are a variety of errors that are non-fatal; // we should check for them and continue instead of giving up return nil, nil, fmt.Errorf("queue recv: %w", err) } n := 0 slog.InfoContext(ctx, "process", slog.Int("count", len(msgs))) for _, msg := range msgs { p := ps[msg.Channel] text := msg.Text for { em, rest := p.Next(text) if em.ID == "" { break } ems = append(ems, em) text = rest } if err := st.Record(ctx, msg.Channel, msg.ID, msg.Sender.String(), time.Unix(0, msg.Timestamp), ems); err != nil { return nil, nil, err } n += len(ems) } slog.InfoContext(ctx, "recorded", slog.Int("emotes", n)) return msgs, ems, nil } func fail(format string, args ...any) { fmt.Fprintf(os.Stderr, format+"\n", args...) os.Exit(1) }