Compare commits
No commits in common. "b2f3a2ed87e21d37b69c6df10dbef01d53507ee8" and "092819778177e15fcc34af661207be8f9b924661" have entirely different histories.
b2f3a2ed87
...
0928197781
@ -1,137 +0,0 @@
|
|||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"flag"
|
|
||||||
"fmt"
|
|
||||||
"log/slog"
|
|
||||||
"os"
|
|
||||||
"os/signal"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/twmb/franz-go/pkg/kgo"
|
|
||||||
"zombiezen.com/go/sqlite"
|
|
||||||
"zombiezen.com/go/sqlite/sqlitex"
|
|
||||||
|
|
||||||
"git.sunturtle.xyz/zephyr/kaiyan/emote"
|
|
||||||
"git.sunturtle.xyz/zephyr/kaiyan/emote/sqlitestore"
|
|
||||||
"git.sunturtle.xyz/zephyr/kaiyan/queue"
|
|
||||||
)
|
|
||||||
|
|
||||||
func main() {
|
|
||||||
var (
|
|
||||||
dburi string
|
|
||||||
)
|
|
||||||
flag.StringVar(&dburi, "db", "", "SQLite database URI")
|
|
||||||
flag.Parse()
|
|
||||||
if flag.NArg() == 0 {
|
|
||||||
fail("provide broker addresses as args")
|
|
||||||
}
|
|
||||||
if dburi == "" {
|
|
||||||
fail("-db is mandatory")
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
|
|
||||||
go func() {
|
|
||||||
<-ctx.Done()
|
|
||||||
stop()
|
|
||||||
}()
|
|
||||||
|
|
||||||
opts := sqlitex.PoolOptions{
|
|
||||||
Flags: sqlite.OpenCreate | sqlite.OpenReadWrite | sqlite.OpenURI,
|
|
||||||
PrepareConn: func(conn *sqlite.Conn) error {
|
|
||||||
pragmas := []string{
|
|
||||||
`PRAGMA journal_mode = WAL`,
|
|
||||||
`PRAGMA synchronous = NORMAL`,
|
|
||||||
}
|
|
||||||
for _, p := range pragmas {
|
|
||||||
s, _, err := conn.PrepareTransient(p)
|
|
||||||
if err != nil {
|
|
||||||
// If this function returns an error, then the pool will
|
|
||||||
// continue to invoke it for every connection.
|
|
||||||
// Explode violently instead.
|
|
||||||
panic(fmt.Errorf("couldn't set %s: %w", p, err))
|
|
||||||
}
|
|
||||||
if _, err := s.Step(); err != nil {
|
|
||||||
// This one is probably ok to retry, though.
|
|
||||||
return fmt.Errorf("couldn't run %s: %w", p, err)
|
|
||||||
}
|
|
||||||
if err := s.Finalize(); err != nil {
|
|
||||||
panic(fmt.Errorf("couldn't finalize statement for %s: %w", p, err))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
},
|
|
||||||
}
|
|
||||||
db, err := sqlitex.NewPool(dburi, opts)
|
|
||||||
if err != nil {
|
|
||||||
fail("%v", err)
|
|
||||||
}
|
|
||||||
st, err := sqlitestore.Open(ctx, db)
|
|
||||||
if err != nil {
|
|
||||||
fail("%v", err)
|
|
||||||
}
|
|
||||||
q, err := kgo.NewClient(
|
|
||||||
kgo.SeedBrokers(flag.Args()...),
|
|
||||||
kgo.ConsumerGroup("kaiyan"),
|
|
||||||
kgo.ConsumeTopics(queue.Topic),
|
|
||||||
kgo.FetchIsolationLevel(kgo.ReadCommitted()),
|
|
||||||
kgo.GreedyAutoCommit(),
|
|
||||||
kgo.WithLogger(kgo.BasicLogger(os.Stderr, kgo.LogLevelDebug, nil)),
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
fail("%v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
var msgs []queue.Message
|
|
||||||
var ems []emote.Emote
|
|
||||||
// TODO(branden): emote lists
|
|
||||||
for ctx.Err() == nil {
|
|
||||||
msgs, ems, err = batch(ctx, nil, q, st, msgs[:0], ems[:0])
|
|
||||||
if err != nil {
|
|
||||||
fail("%v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
st.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
func batch(ctx context.Context, ps map[string]emote.Parser, q *kgo.Client, st *sqlitestore.Store, msgs []queue.Message, ems []emote.Emote) ([]queue.Message, []emote.Emote, error) {
|
|
||||||
slog.InfoContext(ctx, "receive")
|
|
||||||
msgs, err := queue.Recv(ctx, q, msgs)
|
|
||||||
switch {
|
|
||||||
case err == nil: // do nothing
|
|
||||||
case ctx.Err() != nil:
|
|
||||||
// Exiting normally.
|
|
||||||
// Return no error and let the caller check the context.
|
|
||||||
return nil, nil, nil
|
|
||||||
default:
|
|
||||||
// TODO(branden): there are a variety of errors that are non-fatal;
|
|
||||||
// we should check for them and continue instead of giving up
|
|
||||||
return nil, nil, fmt.Errorf("queue recv: %w", err)
|
|
||||||
}
|
|
||||||
n := 0
|
|
||||||
slog.InfoContext(ctx, "process", slog.Int("count", len(msgs)))
|
|
||||||
for _, msg := range msgs {
|
|
||||||
p := ps[msg.Channel]
|
|
||||||
text := msg.Text
|
|
||||||
for {
|
|
||||||
em, rest := p.Next(text)
|
|
||||||
if em.ID == "" {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
ems = append(ems, em)
|
|
||||||
text = rest
|
|
||||||
}
|
|
||||||
if err := st.Record(ctx, msg.Channel, msg.ID, msg.Sender.String(), time.Unix(0, msg.Timestamp), ems); err != nil {
|
|
||||||
return nil, nil, err
|
|
||||||
}
|
|
||||||
n += len(ems)
|
|
||||||
}
|
|
||||||
slog.InfoContext(ctx, "recorded", slog.Int("emotes", n))
|
|
||||||
return msgs, ems, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func fail(format string, args ...any) {
|
|
||||||
fmt.Fprintf(os.Stderr, format+"\n", args...)
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
@ -125,21 +125,11 @@ func webhook(q *kgo.Client, secret []byte, errs chan<- error) http.HandlerFunc {
|
|||||||
slog.String("broadcaster", ev.Event.Broadcaster),
|
slog.String("broadcaster", ev.Event.Broadcaster),
|
||||||
slog.String("id", ev.Event.ID),
|
slog.String("id", ev.Event.ID),
|
||||||
)
|
)
|
||||||
// Use the external clock for timestamps.
|
|
||||||
tm, err := time.Parse(time.RFC3339Nano, r.Header.Get("Twitch-Eventsub-Message-Timestamp"))
|
|
||||||
if err != nil {
|
|
||||||
log.LogAttrs(ctx, slog.LevelWarn, "bad timestamp",
|
|
||||||
slog.String("timestamp", r.Header.Get("Twitch-Eventsub-Message-Timestamp")),
|
|
||||||
slog.Any("err", err),
|
|
||||||
)
|
|
||||||
tm = time.Now()
|
|
||||||
}
|
|
||||||
msg := queue.Message{
|
msg := queue.Message{
|
||||||
ID: ev.Event.ID,
|
ID: ev.Event.ID,
|
||||||
Channel: ev.Event.Broadcaster,
|
Channel: ev.Event.Broadcaster,
|
||||||
Sender: queue.Sender(secret, ev.Event.Broadcaster, ev.Event.Chatter),
|
Sender: queue.Sender(secret, ev.Event.Broadcaster, ev.Event.Chatter),
|
||||||
Timestamp: tm.UnixNano(),
|
Text: ev.Event.Message.Text,
|
||||||
Text: ev.Event.Message.Text,
|
|
||||||
}
|
}
|
||||||
// TODO(branden): the context in the http request cancels once this
|
// TODO(branden): the context in the http request cancels once this
|
||||||
// handler returns, which means the producer doesn't relay it.
|
// handler returns, which means the producer doesn't relay it.
|
||||||
|
@ -18,6 +18,6 @@ func main() {
|
|||||||
if _, err := f.Write(b); err != nil {
|
if _, err := f.Write(b); err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
w.WriteHeader(http.StatusNoContent)
|
os.Exit(0)
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
@ -18,9 +18,6 @@ type Message struct {
|
|||||||
Channel string `json:"ch"`
|
Channel string `json:"ch"`
|
||||||
// Sender is an obfuscated identifier for the sending user.
|
// Sender is an obfuscated identifier for the sending user.
|
||||||
Sender sender `json:"u"`
|
Sender sender `json:"u"`
|
||||||
// Timestamp is the time at which the message was sent in nanoseconds since
|
|
||||||
// the Unix epoch.
|
|
||||||
Timestamp int64 `json:"ts"`
|
|
||||||
// Text is the message content.
|
// Text is the message content.
|
||||||
Text string `json:"t"`
|
Text string `json:"t"`
|
||||||
}
|
}
|
||||||
|
@ -21,11 +21,10 @@ func (p *spyProducer) Produce(ctx context.Context, rec *kgo.Record, promise func
|
|||||||
|
|
||||||
func TestSend(t *testing.T) {
|
func TestSend(t *testing.T) {
|
||||||
msg := queue.Message{
|
msg := queue.Message{
|
||||||
ID: "bocchi",
|
ID: "bocchi",
|
||||||
Channel: "kessoku",
|
Channel: "kessoku",
|
||||||
Sender: queue.Sender(make([]byte, 16), "kessoku", "ryō"),
|
Sender: queue.Sender(make([]byte, 16), "kessoku", "ryō"),
|
||||||
Timestamp: 1,
|
Text: "bocchi the rock!",
|
||||||
Text: "bocchi the rock!",
|
|
||||||
}
|
}
|
||||||
var q spyProducer
|
var q spyProducer
|
||||||
errs := make(chan error, 1)
|
errs := make(chan error, 1)
|
||||||
|
Loading…
Reference in New Issue
Block a user