kaiyan/cmd/kaiyan-index/main.go
2025-04-20 20:26:40 -04:00

183 lines
4.4 KiB
Go

package main
import (
"context"
"flag"
"fmt"
"log/slog"
"net/http"
"os"
"os/signal"
"strconv"
"sync"
"time"
"github.com/twmb/franz-go/pkg/kgo"
"zombiezen.com/go/sqlite"
"zombiezen.com/go/sqlite/sqlitex"
"git.sunturtle.xyz/zephyr/kaiyan/emote"
"git.sunturtle.xyz/zephyr/kaiyan/emote/sqlitestore"
"git.sunturtle.xyz/zephyr/kaiyan/queue"
)
func main() {
var (
dburi string
streams []string
)
flag.StringVar(&dburi, "db", "", "SQLite database URI")
flag.Func("b", "Twitch broadcaster user ID", func(s string) error {
streams = append(streams, s)
_, err := strconv.Atoi(s)
return err
})
flag.Parse()
if flag.NArg() == 0 {
fail("provide broker addresses as args")
}
if dburi == "" {
fail("-db is mandatory")
}
if len(streams) == 0 {
fail("no broadcasters specified")
}
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
go func() {
<-ctx.Done()
stop()
}()
parsers := make(map[string]emote.Parser)
{
var mu sync.Mutex
var wg sync.WaitGroup
wg.Add(len(streams))
for _, b := range streams {
go func() {
defer wg.Done()
req, err := http.NewRequestWithContext(ctx, "GET", "https://7tv.io/v3/users/twitch/"+b, nil)
if err != nil {
fail("%v", err)
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
fail("%v", err)
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
slog.ErrorContext(ctx, "7tv", slog.String("broadcaster", b), slog.String("status", resp.Status))
return
}
em, err := emote.SevenTVv3(resp.Body)
if err != nil {
slog.ErrorContext(ctx, "7tv", slog.String("broadcaster", b), slog.Any("err", err))
return
}
mu.Lock()
parsers[b] = emote.NewParser(em...)
mu.Unlock()
}()
}
wg.Wait()
}
opts := sqlitex.PoolOptions{
Flags: sqlite.OpenCreate | sqlite.OpenReadWrite | sqlite.OpenURI,
PrepareConn: func(conn *sqlite.Conn) error {
pragmas := []string{
`PRAGMA journal_mode = WAL`,
`PRAGMA synchronous = NORMAL`,
}
for _, p := range pragmas {
s, _, err := conn.PrepareTransient(p)
if err != nil {
// If this function returns an error, then the pool will
// continue to invoke it for every connection.
// Explode violently instead.
panic(fmt.Errorf("couldn't set %s: %w", p, err))
}
if _, err := s.Step(); err != nil {
// This one is probably ok to retry, though.
return fmt.Errorf("couldn't run %s: %w", p, err)
}
if err := s.Finalize(); err != nil {
panic(fmt.Errorf("couldn't finalize statement for %s: %w", p, err))
}
}
return nil
},
}
db, err := sqlitex.NewPool(dburi, opts)
if err != nil {
fail("%v", err)
}
st, err := sqlitestore.Open(ctx, db)
if err != nil {
fail("%v", err)
}
q, err := kgo.NewClient(
kgo.SeedBrokers(flag.Args()...),
kgo.ConsumerGroup("kaiyan"),
kgo.ConsumeTopics(queue.Topic),
kgo.FetchIsolationLevel(kgo.ReadCommitted()),
kgo.GreedyAutoCommit(),
)
if err != nil {
fail("%v", err)
}
var msgs []queue.Message
var ems []emote.Emote
// TODO(branden): emote lists
for ctx.Err() == nil {
msgs, ems, err = batch(ctx, parsers, q, st, msgs[:0], ems[:0])
if err != nil {
fail("%v", err)
}
}
st.Close()
}
// batch receives one batch of chat messages from q and records each one in
// st, together with the emotes found in it. Emotes are found by the parser
// registered in ps for the message's channel.
//
// msgs and ems are scratch buffers whose backing arrays are reused. The
// returned slices are meant to be passed back, truncated to length zero, on
// the next call. If ctx is canceled while receiving, batch returns nil slices
// and a nil error; the caller is expected to check ctx itself. Any other
// receive error, or a failure to record a message, is returned.
func batch(ctx context.Context, ps map[string]emote.Parser, q *kgo.Client, st *sqlitestore.Store, msgs []queue.Message, ems []emote.Emote) ([]queue.Message, []emote.Emote, error) {
	slog.InfoContext(ctx, "receive")
	msgs, err := queue.Recv(ctx, q, msgs)
	switch {
	case err == nil: // do nothing
	case ctx.Err() != nil:
		// Exiting normally.
		// Return no error and let the caller check the context.
		return nil, nil, nil
	default:
		// TODO(branden): there are a variety of errors that are non-fatal;
		// we should check for them and continue instead of giving up
		return nil, nil, fmt.Errorf("queue recv: %w", err)
	}
	n := 0
	slog.InfoContext(ctx, "process", slog.Int("count", len(msgs)))
	for _, msg := range msgs {
		// ems must hold only this message's emotes. Truncating (rather than
		// reallocating) keeps the backing array for reuse.
		// NOTE(review): this assumes st.Record does not retain ems after it
		// returns, which the caller's reuse of ems across batches already
		// relies on.
		ems = ems[:0]
		// A channel with no parser (e.g. a broadcaster whose 7TV lookup
		// failed at startup) is still recorded, just with no emotes, rather
		// than relying on the zero-value Parser.
		if p, ok := ps[msg.Channel]; ok {
			text := msg.Text
			for {
				em, rest := p.Next(text)
				if em.ID == "" {
					// An empty ID means no further emote was found.
					break
				}
				ems = append(ems, em)
				text = rest
			}
		}
		// msg.Timestamp is interpreted as nanoseconds since the Unix epoch.
		if err := st.Record(ctx, msg.Channel, msg.ID, msg.Sender.String(), time.Unix(0, msg.Timestamp), ems); err != nil {
			return nil, nil, fmt.Errorf("record message %v: %w", msg.ID, err)
		}
		n += len(ems)
	}
	slog.InfoContext(ctx, "recorded", slog.Int("emotes", n))
	return msgs, ems, nil
}
// fail reports a fatal error and exits.
//
// It formats format and args printf-style, adds a trailing newline, writes
// the result to standard error and exits with status 1. It never returns,
// and because it uses os.Exit, deferred calls in the caller do not run.
func fail(format string, args ...any) {
	line := fmt.Sprintf(format+"\n", args...)
	// Nothing useful can be done if stderr is unwritable; we exit regardless.
	_, _ = os.Stderr.WriteString(line)
	os.Exit(1)
}