Compare commits
12 Commits
7def034fa2
...
main
Author | SHA1 | Date | |
---|---|---|---|
dba10cee1a | |||
d70bdba98c | |||
2849b42745 | |||
e88d4b6cc0 | |||
039013abb6 | |||
b2f3a2ed87 | |||
3a075a3c6a | |||
7cc4482965 | |||
0928197781 | |||
76fc85b7c0 | |||
5c530a41fb | |||
8ca6af330f |
167
cmd/kaiyan-api/main.go
Normal file
167
cmd/kaiyan-api/main.go
Normal file
@@ -0,0 +1,167 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"flag"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/signal"
|
||||
"time"
|
||||
|
||||
"github.com/go-json-experiment/json"
|
||||
"zombiezen.com/go/sqlite"
|
||||
"zombiezen.com/go/sqlite/sqlitex"
|
||||
|
||||
"git.sunturtle.xyz/zephyr/kaiyan/emote"
|
||||
"git.sunturtle.xyz/zephyr/kaiyan/emote/sqlitestore"
|
||||
)
|
||||
|
||||
func main() {
|
||||
var (
|
||||
listen string
|
||||
dburi string
|
||||
)
|
||||
flag.StringVar(&listen, "listen", ":80", "address to bind")
|
||||
flag.StringVar(&dburi, "db", "", "SQLite database URI")
|
||||
flag.Parse()
|
||||
if dburi == "" {
|
||||
fail("-db is mandatory")
|
||||
}
|
||||
|
||||
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
stop()
|
||||
}()
|
||||
|
||||
opts := sqlitex.PoolOptions{
|
||||
Flags: sqlite.OpenReadWrite | sqlite.OpenURI,
|
||||
}
|
||||
db, err := sqlitex.NewPool(dburi, opts)
|
||||
if err != nil {
|
||||
fail("%v", err)
|
||||
}
|
||||
st, err := sqlitestore.Open(ctx, db)
|
||||
if err != nil {
|
||||
fail("%v", err)
|
||||
}
|
||||
|
||||
mux := http.NewServeMux()
|
||||
mux.Handle("GET /twitch/{channel}", metrics(st))
|
||||
// TODO(branden): GET /metrics
|
||||
l, err := net.Listen("tcp", listen)
|
||||
if err != nil {
|
||||
fail(err.Error())
|
||||
}
|
||||
srv := http.Server{
|
||||
Addr: listen,
|
||||
Handler: mux,
|
||||
ReadHeaderTimeout: 3 * time.Second,
|
||||
WriteTimeout: 30 * time.Second,
|
||||
IdleTimeout: 1 * time.Minute,
|
||||
BaseContext: func(l net.Listener) context.Context { return ctx },
|
||||
}
|
||||
go func() {
|
||||
slog.InfoContext(ctx, "HTTP API", slog.Any("addr", l.Addr()))
|
||||
// TODO(branden): tls? or do we just use caddy/nginx after all?
|
||||
err := srv.Serve(l)
|
||||
if err == http.ErrServerClosed {
|
||||
return
|
||||
}
|
||||
slog.ErrorContext(ctx, "HTTP API closed", slog.Any("err", err))
|
||||
}()
|
||||
<-ctx.Done()
|
||||
stop()
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
if err := srv.Shutdown(ctx); err != nil {
|
||||
slog.ErrorContext(ctx, "HTTP API shutdown", slog.Any("err", err))
|
||||
}
|
||||
}
|
||||
|
||||
func metrics(st *sqlitestore.Store) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
log := slog.With(slog.Group("request",
|
||||
slog.String("host", r.Host),
|
||||
slog.String("remote", r.RemoteAddr),
|
||||
))
|
||||
channel := r.PathValue("channel")
|
||||
var start, end time.Time
|
||||
if s := r.FormValue("start"); s != "" {
|
||||
t, err := time.Parse(time.RFC3339, s)
|
||||
if err != nil {
|
||||
writerror(w, http.StatusBadRequest, err.Error())
|
||||
return
|
||||
}
|
||||
start = t
|
||||
} else {
|
||||
writerror(w, http.StatusBadRequest, "start time is required")
|
||||
return
|
||||
}
|
||||
if s := r.FormValue("end"); s != "" {
|
||||
t, err := time.Parse(time.RFC3339, s)
|
||||
if err != nil {
|
||||
writerror(w, http.StatusBadRequest, err.Error())
|
||||
return
|
||||
}
|
||||
end = t
|
||||
} else {
|
||||
// We can fill in with the current time.
|
||||
end = time.Now().Truncate(time.Second)
|
||||
}
|
||||
if end.Before(start) {
|
||||
// Be liberal and just swap them.
|
||||
start, end = end, start
|
||||
}
|
||||
if end.Sub(start) > 366*24*time.Hour {
|
||||
writerror(w, http.StatusBadRequest, "timespan is too long")
|
||||
return
|
||||
}
|
||||
log.LogAttrs(r.Context(), slog.LevelInfo, "metrics",
|
||||
slog.String("channel", channel),
|
||||
slog.Time("start", start),
|
||||
slog.Time("end", end),
|
||||
)
|
||||
m, err := st.Metrics(r.Context(), channel, start, end, nil)
|
||||
if err != nil {
|
||||
log.ErrorContext(r.Context(), "metrics", slog.Any("err", err))
|
||||
writerror(w, http.StatusInternalServerError, "data unavailable")
|
||||
return
|
||||
}
|
||||
log.LogAttrs(r.Context(), slog.LevelInfo, "done",
|
||||
slog.String("channel", channel),
|
||||
slog.Time("start", start),
|
||||
slog.Time("end", end),
|
||||
)
|
||||
writedata(w, m)
|
||||
}
|
||||
}
|
||||
|
||||
func writerror(w http.ResponseWriter, status int, err string) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(status)
|
||||
v := struct {
|
||||
Error string `json:"error"`
|
||||
}{err}
|
||||
json.MarshalWrite(w, &v)
|
||||
}
|
||||
|
||||
func writedata(w http.ResponseWriter, data []emote.Metric) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
if len(data) == 0 {
|
||||
w.Write([]byte(`{"data":[]}`))
|
||||
return
|
||||
}
|
||||
v := struct {
|
||||
Data []emote.Metric `json:"data"`
|
||||
}{data}
|
||||
json.MarshalWrite(w, &v)
|
||||
}
|
||||
|
||||
func fail(format string, args ...any) {
|
||||
fmt.Fprintf(os.Stderr, format+"\n", args...)
|
||||
os.Exit(1)
|
||||
}
|
182
cmd/kaiyan-index/main.go
Normal file
182
cmd/kaiyan-index/main.go
Normal file
@@ -0,0 +1,182 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"flag"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/signal"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/twmb/franz-go/pkg/kgo"
|
||||
"zombiezen.com/go/sqlite"
|
||||
"zombiezen.com/go/sqlite/sqlitex"
|
||||
|
||||
"git.sunturtle.xyz/zephyr/kaiyan/emote"
|
||||
"git.sunturtle.xyz/zephyr/kaiyan/emote/sqlitestore"
|
||||
"git.sunturtle.xyz/zephyr/kaiyan/queue"
|
||||
)
|
||||
|
||||
func main() {
|
||||
var (
|
||||
dburi string
|
||||
streams []string
|
||||
)
|
||||
flag.StringVar(&dburi, "db", "", "SQLite database URI")
|
||||
flag.Func("b", "Twitch broadcaster user ID", func(s string) error {
|
||||
streams = append(streams, s)
|
||||
_, err := strconv.Atoi(s)
|
||||
return err
|
||||
})
|
||||
flag.Parse()
|
||||
if flag.NArg() == 0 {
|
||||
fail("provide broker addresses as args")
|
||||
}
|
||||
if dburi == "" {
|
||||
fail("-db is mandatory")
|
||||
}
|
||||
if len(streams) == 0 {
|
||||
fail("no broadcasters specified")
|
||||
}
|
||||
|
||||
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
stop()
|
||||
}()
|
||||
|
||||
parsers := make(map[string]emote.Parser)
|
||||
{
|
||||
var mu sync.Mutex
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(len(streams))
|
||||
for _, b := range streams {
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
req, err := http.NewRequestWithContext(ctx, "GET", "https://7tv.io/v3/users/twitch/"+b, nil)
|
||||
if err != nil {
|
||||
fail("%v", err)
|
||||
}
|
||||
resp, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
fail("%v", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode != 200 {
|
||||
slog.ErrorContext(ctx, "7tv", slog.String("broadcaster", b), slog.String("status", resp.Status))
|
||||
return
|
||||
}
|
||||
em, err := emote.SevenTVv3(resp.Body)
|
||||
if err != nil {
|
||||
slog.ErrorContext(ctx, "7tv", slog.String("broadcaster", b), slog.Any("err", err))
|
||||
return
|
||||
}
|
||||
mu.Lock()
|
||||
parsers[b] = emote.NewParser(em...)
|
||||
mu.Unlock()
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
opts := sqlitex.PoolOptions{
|
||||
Flags: sqlite.OpenCreate | sqlite.OpenReadWrite | sqlite.OpenURI,
|
||||
PrepareConn: func(conn *sqlite.Conn) error {
|
||||
pragmas := []string{
|
||||
`PRAGMA journal_mode = WAL`,
|
||||
`PRAGMA synchronous = NORMAL`,
|
||||
}
|
||||
for _, p := range pragmas {
|
||||
s, _, err := conn.PrepareTransient(p)
|
||||
if err != nil {
|
||||
// If this function returns an error, then the pool will
|
||||
// continue to invoke it for every connection.
|
||||
// Explode violently instead.
|
||||
panic(fmt.Errorf("couldn't set %s: %w", p, err))
|
||||
}
|
||||
if _, err := s.Step(); err != nil {
|
||||
// This one is probably ok to retry, though.
|
||||
return fmt.Errorf("couldn't run %s: %w", p, err)
|
||||
}
|
||||
if err := s.Finalize(); err != nil {
|
||||
panic(fmt.Errorf("couldn't finalize statement for %s: %w", p, err))
|
||||
}
|
||||
}
|
||||
return nil
|
||||
},
|
||||
}
|
||||
db, err := sqlitex.NewPool(dburi, opts)
|
||||
if err != nil {
|
||||
fail("%v", err)
|
||||
}
|
||||
st, err := sqlitestore.Open(ctx, db)
|
||||
if err != nil {
|
||||
fail("%v", err)
|
||||
}
|
||||
q, err := kgo.NewClient(
|
||||
kgo.SeedBrokers(flag.Args()...),
|
||||
kgo.ConsumerGroup("kaiyan"),
|
||||
kgo.ConsumeTopics(queue.Topic),
|
||||
kgo.FetchIsolationLevel(kgo.ReadCommitted()),
|
||||
kgo.GreedyAutoCommit(),
|
||||
)
|
||||
if err != nil {
|
||||
fail("%v", err)
|
||||
}
|
||||
|
||||
var msgs []queue.Message
|
||||
var ems []emote.Emote
|
||||
// TODO(branden): emote lists
|
||||
for ctx.Err() == nil {
|
||||
msgs, ems, err = batch(ctx, parsers, q, st, msgs[:0], ems[:0])
|
||||
if err != nil {
|
||||
fail("%v", err)
|
||||
}
|
||||
}
|
||||
st.Close()
|
||||
}
|
||||
|
||||
func batch(ctx context.Context, ps map[string]emote.Parser, q *kgo.Client, st *sqlitestore.Store, msgs []queue.Message, ems []emote.Emote) ([]queue.Message, []emote.Emote, error) {
|
||||
slog.InfoContext(ctx, "receive")
|
||||
msgs, err := queue.Recv(ctx, q, msgs)
|
||||
switch {
|
||||
case err == nil: // do nothing
|
||||
case ctx.Err() != nil:
|
||||
// Exiting normally.
|
||||
// Return no error and let the caller check the context.
|
||||
return nil, nil, nil
|
||||
default:
|
||||
// TODO(branden): there are a variety of errors that are non-fatal;
|
||||
// we should check for them and continue instead of giving up
|
||||
return nil, nil, fmt.Errorf("queue recv: %w", err)
|
||||
}
|
||||
n := 0
|
||||
slog.InfoContext(ctx, "process", slog.Int("count", len(msgs)))
|
||||
for _, msg := range msgs {
|
||||
p := ps[msg.Channel]
|
||||
text := msg.Text
|
||||
for {
|
||||
em, rest := p.Next(text)
|
||||
if em.ID == "" {
|
||||
break
|
||||
}
|
||||
ems = append(ems, em)
|
||||
text = rest
|
||||
}
|
||||
if err := st.Record(ctx, msg.Channel, msg.ID, msg.Sender.String(), time.Unix(0, msg.Timestamp), ems); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
n += len(ems)
|
||||
}
|
||||
slog.InfoContext(ctx, "recorded", slog.Int("emotes", n))
|
||||
return msgs, ems, nil
|
||||
}
|
||||
|
||||
func fail(format string, args ...any) {
|
||||
fmt.Fprintf(os.Stderr, format+"\n", args...)
|
||||
os.Exit(1)
|
||||
}
|
@@ -125,13 +125,26 @@ func webhook(q *kgo.Client, secret []byte, errs chan<- error) http.HandlerFunc {
|
||||
slog.String("broadcaster", ev.Event.Broadcaster),
|
||||
slog.String("id", ev.Event.ID),
|
||||
)
|
||||
// Use the external clock for timestamps.
|
||||
tm, err := time.Parse(time.RFC3339Nano, r.Header.Get("Twitch-Eventsub-Message-Timestamp"))
|
||||
if err != nil {
|
||||
log.LogAttrs(ctx, slog.LevelWarn, "bad timestamp",
|
||||
slog.String("timestamp", r.Header.Get("Twitch-Eventsub-Message-Timestamp")),
|
||||
slog.Any("err", err),
|
||||
)
|
||||
tm = time.Now()
|
||||
}
|
||||
msg := queue.Message{
|
||||
ID: ev.Event.ID,
|
||||
Channel: ev.Event.Broadcaster,
|
||||
Sender: queue.Sender(secret, ev.Event.Broadcaster, ev.Event.Chatter),
|
||||
Timestamp: tm.UnixNano(),
|
||||
Text: ev.Event.Message.Text,
|
||||
}
|
||||
queue.Send(ctx, q, msg, errs)
|
||||
// TODO(branden): the context in the http request cancels once this
|
||||
// handler returns, which means the producer doesn't relay it.
|
||||
// what's a good context to use, then?
|
||||
queue.Send(context.TODO(), q, msg, errs)
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
log.LogAttrs(ctx, slog.LevelInfo, "received",
|
||||
slog.String("broadcaster", ev.Event.Broadcaster),
|
||||
|
@@ -18,6 +18,6 @@ func main() {
|
||||
if _, err := f.Write(b); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
os.Exit(0)
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
}))
|
||||
}
|
||||
|
44
emote/7tv.go
Normal file
44
emote/7tv.go
Normal file
@@ -0,0 +1,44 @@
|
||||
package emote
|
||||
|
||||
import (
|
||||
"io"
|
||||
|
||||
"github.com/go-json-experiment/json"
|
||||
)
|
||||
|
||||
// seventvUser has the fields of a 7TV user model which are relevant to Kaiyan.
|
||||
type seventvUser struct {
|
||||
EmoteSet emoteSet `json:"emote_set"`
|
||||
}
|
||||
|
||||
type emoteSet struct {
|
||||
Emotes []seventvEmote `json:"emotes"`
|
||||
}
|
||||
|
||||
type seventvEmote struct {
|
||||
ID string `json:"id"`
|
||||
Name string `json:"name"`
|
||||
// 7TV provides the CDN URL as well, but it's nested in further objects,
|
||||
// not a complete URL, and it's also trivially derivable from the ID.
|
||||
}
|
||||
|
||||
// SevenTVv3 parses the emotes from a 7TV v3/users/{platform}/{platform_id}
|
||||
// response body.
|
||||
func SevenTVv3(r io.Reader) ([]Emote, error) {
|
||||
var u seventvUser
|
||||
if err := json.UnmarshalRead(r, &u); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ems := make([]Emote, 0, len(u.EmoteSet.Emotes))
|
||||
for _, em := range u.EmoteSet.Emotes {
|
||||
v := Emote{
|
||||
ID: em.ID,
|
||||
Name: em.Name,
|
||||
Source: "7TV",
|
||||
Link: "https://7tv.app/emotes/" + em.ID,
|
||||
Image: "https://cdn.7tv.app/emote/" + em.ID + "/4x.webp",
|
||||
}
|
||||
ems = append(ems, v)
|
||||
}
|
||||
return ems, nil
|
||||
}
|
46
emote/7tv_test.go
Normal file
46
emote/7tv_test.go
Normal file
@@ -0,0 +1,46 @@
|
||||
package emote_test
|
||||
|
||||
import (
|
||||
_ "embed"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"git.sunturtle.xyz/zephyr/kaiyan/emote"
|
||||
"github.com/google/go-cmp/cmp"
|
||||
)
|
||||
|
||||
func TestSevenTVv3(t *testing.T) {
|
||||
got, err := emote.SevenTVv3(strings.NewReader(seventvTwin))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
want := []emote.Emote{
|
||||
{
|
||||
ID: "01HBHC9CE00007KVA84SNERRE8",
|
||||
Name: "powerSip",
|
||||
Source: "7TV",
|
||||
Link: "https://7tv.app/emotes/01HBHC9CE00007KVA84SNERRE8",
|
||||
Image: "https://cdn.7tv.app/emote/01HBHC9CE00007KVA84SNERRE8/4x.webp",
|
||||
},
|
||||
{
|
||||
ID: "01JDTNVJADW1R4YZB3F85VVT9T",
|
||||
Name: "Jin",
|
||||
Source: "7TV",
|
||||
Link: "https://7tv.app/emotes/01JDTNVJADW1R4YZB3F85VVT9T",
|
||||
Image: "https://cdn.7tv.app/emote/01JDTNVJADW1R4YZB3F85VVT9T/4x.webp",
|
||||
},
|
||||
{
|
||||
ID: "01JE08SKFXM94FTG52GVMGFQZW",
|
||||
Name: "IMissTwin",
|
||||
Source: "7TV",
|
||||
Link: "https://7tv.app/emotes/01JE08SKFXM94FTG52GVMGFQZW",
|
||||
Image: "https://cdn.7tv.app/emote/01JE08SKFXM94FTG52GVMGFQZW/4x.webp",
|
||||
},
|
||||
}
|
||||
if diff := cmp.Diff(want, got); diff != "" {
|
||||
t.Errorf("wrong emotes (+got/-want):\n%s", diff)
|
||||
}
|
||||
}
|
||||
|
||||
//go:embed testdata/twoinone1_.7tvv3.json
|
||||
var seventvTwin string
|
@@ -8,38 +8,38 @@ import (
|
||||
// Emote is the information Kaiyan needs about an emote.
|
||||
type Emote struct {
|
||||
// ID is the emote ID per the source.
|
||||
ID string
|
||||
ID string `json:"id"`
|
||||
// Name is the text of the emote as would be parsed from message text.
|
||||
Name string
|
||||
Name string `json:"name"`
|
||||
// Source is the name of the emote source, e.g. "7TV", "Twitch:cirno_tv", &c.
|
||||
Source string
|
||||
Source string `json:"source"`
|
||||
// Link is a hyperlink to manage the emote.
|
||||
Link string
|
||||
Link string `json:"link"`
|
||||
// Image is a hyperlink to the emote image of any size.
|
||||
Image string
|
||||
Image string `json:"image"`
|
||||
}
|
||||
|
||||
// Parser finds emotes in a message.
|
||||
//
|
||||
// Parser assumes that emotes are bound by whitespace.
|
||||
type Parser struct {
|
||||
m map[string]string
|
||||
m map[string]Emote
|
||||
// TODO(branden): more efficient data structure; trie?
|
||||
}
|
||||
|
||||
// NewParser creates a Parser for the given list of emotes.
|
||||
func NewParser(emotes ...Emote) Parser {
|
||||
m := make(map[string]string, len(emotes))
|
||||
m := make(map[string]Emote, len(emotes))
|
||||
for _, e := range emotes {
|
||||
m[e.Name] = e.ID
|
||||
m[e.Name] = e
|
||||
}
|
||||
return Parser{m}
|
||||
}
|
||||
|
||||
// Next parses the next emote instance from the message and returns the
|
||||
// remainder of the message text following it.
|
||||
// If there is no emote in the message the returned emote is the empty string.
|
||||
func (p Parser) Next(text string) (name, id, following string) {
|
||||
// If there is no emote in the message the returned emote has an empty ID.
|
||||
func (p Parser) Next(text string) (emote Emote, following string) {
|
||||
for text != "" {
|
||||
// First trim any existing space.
|
||||
text = strings.TrimSpace(text)
|
||||
@@ -54,12 +54,12 @@ func (p Parser) Next(text string) (name, id, following string) {
|
||||
} else {
|
||||
following = ""
|
||||
}
|
||||
id = p.m[word]
|
||||
if id != "" {
|
||||
return word, id, following
|
||||
em := p.m[word]
|
||||
if em.ID != "" {
|
||||
return em, following
|
||||
}
|
||||
text = following
|
||||
}
|
||||
// No emote found.
|
||||
return "", "", ""
|
||||
return Emote{}, ""
|
||||
}
|
||||
|
@@ -57,16 +57,16 @@ func TestParser(t *testing.T) {
|
||||
var got []string
|
||||
text := c.text
|
||||
for {
|
||||
name, id, rest := p.Next(text)
|
||||
if id == "" {
|
||||
em, rest := p.Next(text)
|
||||
if em.ID == "" {
|
||||
break
|
||||
}
|
||||
if name != id {
|
||||
if em.Name != em.ID {
|
||||
// Not normally the case, but this test is constructed
|
||||
// so that it is.
|
||||
t.Errorf("wrong id %q for name %q", id, name)
|
||||
t.Errorf("wrong id %q for name %q", em.ID, em.Name)
|
||||
}
|
||||
got = append(got, name)
|
||||
got = append(got, em.Name)
|
||||
text = rest
|
||||
}
|
||||
if diff := cmp.Diff(c.want, got); diff != "" {
|
||||
@@ -103,7 +103,7 @@ func BenchmarkParser(b *testing.B) {
|
||||
for b.Loop() {
|
||||
text := c.text
|
||||
for text != "" {
|
||||
_, _, text = p.Next(text)
|
||||
_, text = p.Next(text)
|
||||
}
|
||||
}
|
||||
})
|
||||
|
12
emote/sqlitestore/metrics.sql
Normal file
12
emote/sqlitestore/metrics.sql
Normal file
@@ -0,0 +1,12 @@
|
||||
SELECT
|
||||
id,
|
||||
name, -- name, source, &c. are constant per id, and sqlite lets us select without aggregate
|
||||
source,
|
||||
link,
|
||||
image,
|
||||
COUNT(*) AS tokens,
|
||||
COUNT(DISTINCT message) AS messages,
|
||||
COUNT(DISTINCT sender) AS senders
|
||||
FROM emote
|
||||
WHERE channel = :channel AND time BETWEEN :start AND :end
|
||||
GROUP BY id
|
17
emote/sqlitestore/schema.sql
Normal file
17
emote/sqlitestore/schema.sql
Normal file
@@ -0,0 +1,17 @@
|
||||
PRAGMA journal_mode = WAL;
|
||||
|
||||
CREATE TABLE IF NOT EXISTS emote (
|
||||
channel TEXT NOT NULL,
|
||||
message TEXT NOT NULL,
|
||||
time INTEGER NOT NULL,
|
||||
sender TEXT NOT NULL,
|
||||
id TEXT NOT NULL,
|
||||
-- The following columns could be normalized in a separate table,
|
||||
-- but they're here for simplicity.
|
||||
name TEXT NOT NULL,
|
||||
source TEXT NOT NULL,
|
||||
link TEXT NOT NULL,
|
||||
image TEXT NOT NULL
|
||||
) STRICT;
|
||||
|
||||
CREATE INDEX IF NOT EXISTS channel_time ON emote (channel, time);
|
114
emote/sqlitestore/store.go
Normal file
114
emote/sqlitestore/store.go
Normal file
@@ -0,0 +1,114 @@
|
||||
package sqlitestore
|
||||
|
||||
import (
|
||||
"context"
|
||||
_ "embed"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"zombiezen.com/go/sqlite/sqlitex"
|
||||
|
||||
"git.sunturtle.xyz/zephyr/kaiyan/emote"
|
||||
)
|
||||
|
||||
type Store struct {
|
||||
pool *sqlitex.Pool
|
||||
}
|
||||
|
||||
//go:embed schema.sql
|
||||
var schemaSQL string
|
||||
|
||||
//go:embed metrics.sql
|
||||
var metricsSQL string
|
||||
|
||||
// Open creates a metrics store using db as its underlying storage.
|
||||
// It initializes the schema used for Kaiyan.
|
||||
func Open(ctx context.Context, db *sqlitex.Pool) (*Store, error) {
|
||||
conn, err := db.Take(ctx)
|
||||
defer db.Put(conn)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("couldn't get connection from pool: %w", err)
|
||||
}
|
||||
if err := sqlitex.ExecuteScript(conn, schemaSQL, nil); err != nil {
|
||||
return nil, fmt.Errorf("couldn't run migration: %w", err)
|
||||
}
|
||||
st := Store{db}
|
||||
return &st, nil
|
||||
}
|
||||
|
||||
// Close closes the database.
|
||||
func (db *Store) Close() error {
|
||||
return db.pool.Close()
|
||||
}
|
||||
|
||||
// Record stores the emotes in a given message.
|
||||
func (db *Store) Record(ctx context.Context, channel, message, sender string, tm time.Time, emotes []emote.Emote) (err error) {
|
||||
conn, err := db.pool.Take(ctx)
|
||||
defer db.pool.Put(conn)
|
||||
if err != nil {
|
||||
return fmt.Errorf("couldn't get connection to record emotes: %w", err)
|
||||
}
|
||||
defer sqlitex.Transaction(conn)(&err)
|
||||
|
||||
st, err := conn.Prepare(`INSERT INTO emote (channel, message, time, sender, id, name, source, link, image) VALUES (:channel, :message, :time, :sender, :id, :name, :source, :link, :image)`)
|
||||
if err != nil {
|
||||
return fmt.Errorf("couldn't prepare emote insert: %w", err)
|
||||
}
|
||||
st.SetText(":channel", channel)
|
||||
st.SetText(":message", message)
|
||||
st.SetInt64(":time", tm.UnixNano())
|
||||
st.SetText(":sender", sender)
|
||||
for _, em := range emotes {
|
||||
st.SetText(":id", em.ID)
|
||||
st.SetText(":name", em.Name)
|
||||
st.SetText(":source", em.Source)
|
||||
st.SetText(":link", em.Link)
|
||||
st.SetText(":image", em.Image)
|
||||
_, err := st.Step()
|
||||
if err != nil {
|
||||
return fmt.Errorf("couldn't insert emote: %w", err)
|
||||
}
|
||||
st.Reset() // NOTE(branden): bound parameters are retained
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Metrics appends emote usage information for a channel in a given time range.
|
||||
func (db *Store) Metrics(ctx context.Context, channel string, start, end time.Time, onto []emote.Metric) ([]emote.Metric, error) {
|
||||
conn, err := db.pool.Take(ctx)
|
||||
defer db.pool.Put(conn)
|
||||
if err != nil {
|
||||
return onto, fmt.Errorf("couldn't get connection for metrics: %w", err)
|
||||
}
|
||||
st, err := conn.Prepare(metricsSQL)
|
||||
if err != nil {
|
||||
return onto, fmt.Errorf("couldn't prepare metrics statement: %w", err)
|
||||
}
|
||||
st.SetText(":channel", channel)
|
||||
st.SetInt64(":start", start.UnixNano())
|
||||
st.SetInt64(":end", end.UnixNano())
|
||||
for {
|
||||
ok, err := st.Step()
|
||||
if err != nil {
|
||||
return onto, fmt.Errorf("couldn't step metrics selection: %w", err)
|
||||
}
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
|
||||
m := emote.Metric{
|
||||
Emote: emote.Emote{
|
||||
ID: st.ColumnText(0),
|
||||
Name: st.ColumnText(1),
|
||||
Source: st.ColumnText(2),
|
||||
Link: st.ColumnText(3),
|
||||
Image: st.ColumnText(4),
|
||||
},
|
||||
Tokens: st.ColumnInt64(5),
|
||||
Messages: st.ColumnInt64(6),
|
||||
Users: st.ColumnInt64(7),
|
||||
}
|
||||
onto = append(onto, m)
|
||||
}
|
||||
return onto, st.Reset()
|
||||
}
|
141
emote/sqlitestore/store_test.go
Normal file
141
emote/sqlitestore/store_test.go
Normal file
@@ -0,0 +1,141 @@
|
||||
package sqlitestore_test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"slices"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"git.sunturtle.xyz/zephyr/kaiyan/emote"
|
||||
"git.sunturtle.xyz/zephyr/kaiyan/emote/sqlitestore"
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"zombiezen.com/go/sqlite"
|
||||
"zombiezen.com/go/sqlite/sqlitex"
|
||||
)
|
||||
|
||||
func testDB(name string) *sqlitex.Pool {
|
||||
opts := sqlitex.PoolOptions{
|
||||
Flags: sqlite.OpenCreate | sqlite.OpenReadWrite | sqlite.OpenMemory | sqlite.OpenSharedCache | sqlite.OpenURI,
|
||||
}
|
||||
pool, err := sqlitex.NewPool(fmt.Sprintf("file:%s.db?mode=memory&cache=shared", name), opts)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return pool
|
||||
}
|
||||
|
||||
func TestStore(t *testing.T) {
|
||||
ems := []struct {
|
||||
channel, message, sender string
|
||||
ts int64
|
||||
emotes []emote.Emote
|
||||
}{
|
||||
{
|
||||
channel: "kessoku",
|
||||
message: "1",
|
||||
sender: "ryo",
|
||||
ts: 1000,
|
||||
emotes: []emote.Emote{
|
||||
{ID: "nijika", Name: "nijika"},
|
||||
{ID: "kita", Name: "kita"},
|
||||
{ID: "nijika", Name: "nijika"},
|
||||
{ID: "kita", Name: "kita"},
|
||||
},
|
||||
},
|
||||
{
|
||||
channel: "kessoku",
|
||||
message: "2",
|
||||
sender: "seika",
|
||||
ts: 2000,
|
||||
emotes: []emote.Emote{
|
||||
{ID: "nijika", Name: "nijika"},
|
||||
{ID: "kita", Name: "kita"},
|
||||
{ID: "nijika", Name: "nijika"},
|
||||
{ID: "kita", Name: "kita"},
|
||||
},
|
||||
},
|
||||
{
|
||||
channel: "sickhack",
|
||||
message: "3",
|
||||
sender: "ryo",
|
||||
ts: 1500,
|
||||
emotes: []emote.Emote{
|
||||
{ID: "nijika", Name: "nijika"},
|
||||
{ID: "kita", Name: "kita"},
|
||||
{ID: "nijika", Name: "nijika"},
|
||||
{ID: "kita", Name: "kita"},
|
||||
},
|
||||
},
|
||||
}
|
||||
st, err := sqlitestore.Open(t.Context(), testDB("TestStore"))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
for _, m := range ems {
|
||||
err := st.Record(t.Context(), m.channel, m.message, m.sender, time.Unix(m.ts, 0), m.emotes)
|
||||
if err != nil {
|
||||
t.Errorf("couldn't record message %s: %v", m.message, err)
|
||||
}
|
||||
}
|
||||
cases := []struct {
|
||||
name string
|
||||
channel string
|
||||
start, end int64
|
||||
want []emote.Metric
|
||||
}{
|
||||
{
|
||||
name: "channel",
|
||||
channel: "kessoku",
|
||||
start: 0,
|
||||
end: 1e6,
|
||||
want: []emote.Metric{
|
||||
{
|
||||
Emote: emote.Emote{ID: "kita", Name: "kita"},
|
||||
Tokens: 4,
|
||||
Messages: 2,
|
||||
Users: 2,
|
||||
},
|
||||
{
|
||||
Emote: emote.Emote{ID: "nijika", Name: "nijika"},
|
||||
Tokens: 4,
|
||||
Messages: 2,
|
||||
Users: 2,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "time",
|
||||
channel: "kessoku",
|
||||
start: 1250,
|
||||
end: 1e6,
|
||||
want: []emote.Metric{
|
||||
{
|
||||
Emote: emote.Emote{ID: "kita", Name: "kita"},
|
||||
Tokens: 2,
|
||||
Messages: 1,
|
||||
Users: 1,
|
||||
},
|
||||
{
|
||||
Emote: emote.Emote{ID: "nijika", Name: "nijika"},
|
||||
Tokens: 2,
|
||||
Messages: 1,
|
||||
Users: 1,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, c := range cases {
|
||||
// Don't use Run because we have earlier t.Fails.
|
||||
got, err := st.Metrics(t.Context(), c.channel, time.Unix(c.start, 0), time.Unix(c.end, 0), nil)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
// Sort results by emote ID so we can easily diff.
|
||||
// In particular, we don't expect the store to sort for us.
|
||||
slices.SortFunc(got, func(a, b emote.Metric) int { return strings.Compare(a.Emote.ID, b.Emote.ID) })
|
||||
if diff := cmp.Diff(c.want, got); diff != "" {
|
||||
t.Errorf("wrong results (+got/-want):\n%s", diff)
|
||||
}
|
||||
}
|
||||
}
|
26
emote/store.go
Normal file
26
emote/store.go
Normal file
@@ -0,0 +1,26 @@
|
||||
package emote
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Store is a store of emote usage records.
|
||||
type Store interface {
|
||||
// Record stores the emotes in a given message.
|
||||
Record(ctx context.Context, channel, message, sender string, tm time.Time, emotes []Emote) error
|
||||
// Metrics appends emote usage information for a channel in a given time range.
|
||||
Metrics(ctx context.Context, channel string, start, end time.Time, onto []Metric) ([]Metric, error)
|
||||
}
|
||||
|
||||
// Metric is the metrics for a single emote.
|
||||
type Metric struct {
|
||||
// Emote is the emote that the metrics describe.
|
||||
Emote Emote `json:"emote"`
|
||||
// Tokens is the total number of occurrences of the emote.
|
||||
Tokens int64 `json:"tokens"`
|
||||
// Messages is the number of unique messages which contained the emote.
|
||||
Messages int64 `json:"messages"`
|
||||
// Users is the number of users who used the emote.
|
||||
Users int64 `json:"users"`
|
||||
}
|
538
emote/testdata/twoinone1_.7tvv3.json
vendored
Normal file
538
emote/testdata/twoinone1_.7tvv3.json
vendored
Normal file
@@ -0,0 +1,538 @@
|
||||
{
|
||||
"id": "116723523",
|
||||
"platform": "TWITCH",
|
||||
"username": "twoinone1_",
|
||||
"display_name": "twoinone1_",
|
||||
"linked_at": 1730685664536,
|
||||
"emote_capacity": 1000,
|
||||
"emote_set_id": "01JD8YZCMPF628X8DZ7NAYFG8C",
|
||||
"emote_set": {
|
||||
"id": "01JD8YZCMPF628X8DZ7NAYFG8C",
|
||||
"name": "twoinone1_'s Emotes",
|
||||
"flags": 0,
|
||||
"tags": [],
|
||||
"immutable": false,
|
||||
"privileged": false,
|
||||
"emotes": [
|
||||
{
|
||||
"id": "01HBHC9CE00007KVA84SNERRE8",
|
||||
"name": "powerSip",
|
||||
"flags": 0,
|
||||
"timestamp": 1696021656000,
|
||||
"actor_id": "01JBTEEZ8R6A1J8RMZ581A1V02",
|
||||
"data": {
|
||||
"id": "01HBHC9CE00007KVA84SNERRE8",
|
||||
"name": "powerSip",
|
||||
"flags": 0,
|
||||
"tags": [
|
||||
"csm",
|
||||
"weeb",
|
||||
"anime",
|
||||
"sip"
|
||||
],
|
||||
"lifecycle": 3,
|
||||
"state": [
|
||||
"LISTED"
|
||||
],
|
||||
"listed": true,
|
||||
"animated": false,
|
||||
"owner": {
|
||||
"id": "01FBXP96C0000APPS8GMQY5HFA",
|
||||
"username": "kodyparbs",
|
||||
"display_name": "Kodyparbs",
|
||||
"avatar_url": "https://static-cdn.jtvnw.net/jtv_user_pictures/88453ec4-d53f-4413-a1c0-57319ccc679a-profile_image-300x300.png",
|
||||
"style": {},
|
||||
"role_ids": [
|
||||
"01G68MMQFR0007J6GNM9E2M0TM"
|
||||
],
|
||||
"connections": [
|
||||
{
|
||||
"id": "417084954",
|
||||
"platform": "TWITCH",
|
||||
"username": "kodyparbs",
|
||||
"display_name": "Kodyparbs",
|
||||
"linked_at": 1627715312000,
|
||||
"emote_capacity": 1000,
|
||||
"emote_set_id": "01FBXP96C0000APPS8GMQY5HFA"
|
||||
}
|
||||
]
|
||||
},
|
||||
"host": {
|
||||
"url": "//cdn.7tv.app/emote/01HBHC9CE00007KVA84SNERRE8",
|
||||
"files": [
|
||||
{
|
||||
"name": "1x.webp",
|
||||
"static_name": "1x.webp",
|
||||
"width": 32,
|
||||
"height": 32,
|
||||
"frame_count": 1,
|
||||
"size": 2568,
|
||||
"format": "WEBP"
|
||||
},
|
||||
{
|
||||
"name": "2x.webp",
|
||||
"static_name": "2x.webp",
|
||||
"width": 64,
|
||||
"height": 64,
|
||||
"frame_count": 1,
|
||||
"size": 6388,
|
||||
"format": "WEBP"
|
||||
},
|
||||
{
|
||||
"name": "3x.webp",
|
||||
"static_name": "3x.webp",
|
||||
"width": 96,
|
||||
"height": 96,
|
||||
"frame_count": 1,
|
||||
"size": 10816,
|
||||
"format": "WEBP"
|
||||
},
|
||||
{
|
||||
"name": "4x.webp",
|
||||
"static_name": "4x.webp",
|
||||
"width": 128,
|
||||
"height": 128,
|
||||
"frame_count": 1,
|
||||
"size": 16062,
|
||||
"format": "WEBP"
|
||||
},
|
||||
{
|
||||
"name": "1x.avif",
|
||||
"static_name": "1x.avif",
|
||||
"width": 32,
|
||||
"height": 32,
|
||||
"frame_count": 1,
|
||||
"size": 1632,
|
||||
"format": "AVIF"
|
||||
},
|
||||
{
|
||||
"name": "2x.avif",
|
||||
"static_name": "2x.avif",
|
||||
"width": 64,
|
||||
"height": 64,
|
||||
"frame_count": 1,
|
||||
"size": 3607,
|
||||
"format": "AVIF"
|
||||
},
|
||||
{
|
||||
"name": "3x.avif",
|
||||
"static_name": "3x.avif",
|
||||
"width": 96,
|
||||
"height": 96,
|
||||
"frame_count": 1,
|
||||
"size": 5813,
|
||||
"format": "AVIF"
|
||||
},
|
||||
{
|
||||
"name": "4x.avif",
|
||||
"static_name": "4x.avif",
|
||||
"width": 128,
|
||||
"height": 128,
|
||||
"frame_count": 1,
|
||||
"size": 8011,
|
||||
"format": "AVIF"
|
||||
},
|
||||
{
|
||||
"name": "1x.png",
|
||||
"static_name": "1x.png",
|
||||
"width": 32,
|
||||
"height": 32,
|
||||
"frame_count": 1,
|
||||
"size": 2919,
|
||||
"format": "PNG"
|
||||
},
|
||||
{
|
||||
"name": "2x.png",
|
||||
"static_name": "2x.png",
|
||||
"width": 64,
|
||||
"height": 64,
|
||||
"frame_count": 1,
|
||||
"size": 8672,
|
||||
"format": "PNG"
|
||||
},
|
||||
{
|
||||
"name": "3x.png",
|
||||
"static_name": "3x.png",
|
||||
"width": 96,
|
||||
"height": 96,
|
||||
"frame_count": 1,
|
||||
"size": 15712,
|
||||
"format": "PNG"
|
||||
},
|
||||
{
|
||||
"name": "4x.png",
|
||||
"static_name": "4x.png",
|
||||
"width": 128,
|
||||
"height": 128,
|
||||
"frame_count": 1,
|
||||
"size": 23966,
|
||||
"format": "PNG"
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
"origin_id": null
|
||||
},
|
||||
{
|
||||
"id": "01JDTNVJADW1R4YZB3F85VVT9T",
|
||||
"name": "Jin",
|
||||
"flags": 0,
|
||||
"timestamp": 1732840900941,
|
||||
"actor_id": "01JBTEEZ8R6A1J8RMZ581A1V02",
|
||||
"data": {
|
||||
"id": "01JDTNVJADW1R4YZB3F85VVT9T",
|
||||
"name": "Jin",
|
||||
"flags": 0,
|
||||
"lifecycle": 3,
|
||||
"state": [
|
||||
"LISTED"
|
||||
],
|
||||
"listed": true,
|
||||
"animated": false,
|
||||
"owner": {
|
||||
"id": "01JBTEEZ8R6A1J8RMZ581A1V02",
|
||||
"username": "twoinone1_",
|
||||
"display_name": "twoinone1_",
|
||||
"avatar_url": "https://static-cdn.jtvnw.net/jtv_user_pictures/3d606741-9960-484d-a484-1aa9a0f002b6-profile_image-300x300.png",
|
||||
"style": {
|
||||
"color": -5635841,
|
||||
"paint_id": "01JQM68WYPTC7PFSE1QZKGWSTX"
|
||||
},
|
||||
"role_ids": [
|
||||
"01F37R3RFR0000K96678WEQT01",
|
||||
"01G68MMQFR0007J6GNM9E2M0TM"
|
||||
],
|
||||
"connections": [
|
||||
{
|
||||
"id": "116723523",
|
||||
"platform": "TWITCH",
|
||||
"username": "twoinone1_",
|
||||
"display_name": "twoinone1_",
|
||||
"linked_at": 1730685664536,
|
||||
"emote_capacity": 1000,
|
||||
"emote_set_id": "01JD8YZCMPF628X8DZ7NAYFG8C"
|
||||
}
|
||||
]
|
||||
},
|
||||
"host": {
|
||||
"url": "//cdn.7tv.app/emote/01JDTNVJADW1R4YZB3F85VVT9T",
|
||||
"files": [
|
||||
{
|
||||
"name": "1x.webp",
|
||||
"static_name": "1x.webp",
|
||||
"width": 74,
|
||||
"height": 32,
|
||||
"frame_count": 1,
|
||||
"size": 774,
|
||||
"format": "WEBP"
|
||||
},
|
||||
{
|
||||
"name": "2x.webp",
|
||||
"static_name": "2x.webp",
|
||||
"width": 148,
|
||||
"height": 64,
|
||||
"frame_count": 1,
|
||||
"size": 1768,
|
||||
"format": "WEBP"
|
||||
},
|
||||
{
|
||||
"name": "3x.webp",
|
||||
"static_name": "3x.webp",
|
||||
"width": 222,
|
||||
"height": 96,
|
||||
"frame_count": 1,
|
||||
"size": 2880,
|
||||
"format": "WEBP"
|
||||
},
|
||||
{
|
||||
"name": "4x.webp",
|
||||
"static_name": "4x.webp",
|
||||
"width": 296,
|
||||
"height": 128,
|
||||
"frame_count": 1,
|
||||
"size": 3916,
|
||||
"format": "WEBP"
|
||||
},
|
||||
{
|
||||
"name": "1x.avif",
|
||||
"static_name": "1x.avif",
|
||||
"width": 74,
|
||||
"height": 32,
|
||||
"frame_count": 1,
|
||||
"size": 1192,
|
||||
"format": "AVIF"
|
||||
},
|
||||
{
|
||||
"name": "2x.avif",
|
||||
"static_name": "2x.avif",
|
||||
"width": 148,
|
||||
"height": 64,
|
||||
"frame_count": 1,
|
||||
"size": 2598,
|
||||
"format": "AVIF"
|
||||
},
|
||||
{
|
||||
"name": "3x.avif",
|
||||
"static_name": "3x.avif",
|
||||
"width": 222,
|
||||
"height": 96,
|
||||
"frame_count": 1,
|
||||
"size": 3963,
|
||||
"format": "AVIF"
|
||||
},
|
||||
{
|
||||
"name": "4x.avif",
|
||||
"static_name": "4x.avif",
|
||||
"width": 296,
|
||||
"height": 128,
|
||||
"frame_count": 1,
|
||||
"size": 5406,
|
||||
"format": "AVIF"
|
||||
},
|
||||
{
|
||||
"name": "1x.png",
|
||||
"static_name": "1x.png",
|
||||
"width": 74,
|
||||
"height": 32,
|
||||
"frame_count": 1,
|
||||
"size": 4913,
|
||||
"format": "PNG"
|
||||
},
|
||||
{
|
||||
"name": "2x.png",
|
||||
"static_name": "2x.png",
|
||||
"width": 148,
|
||||
"height": 64,
|
||||
"frame_count": 1,
|
||||
"size": 13376,
|
||||
"format": "PNG"
|
||||
},
|
||||
{
|
||||
"name": "3x.png",
|
||||
"static_name": "3x.png",
|
||||
"width": 222,
|
||||
"height": 96,
|
||||
"frame_count": 1,
|
||||
"size": 24123,
|
||||
"format": "PNG"
|
||||
},
|
||||
{
|
||||
"name": "4x.png",
|
||||
"static_name": "4x.png",
|
||||
"width": 296,
|
||||
"height": 128,
|
||||
"frame_count": 1,
|
||||
"size": 34810,
|
||||
"format": "PNG"
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
"origin_id": null
|
||||
},
|
||||
{
|
||||
"id": "01JE08SKFXM94FTG52GVMGFQZW",
|
||||
"name": "IMissTwin",
|
||||
"flags": 0,
|
||||
"timestamp": 1733028531709,
|
||||
"actor_id": "01JBTEEZ8R6A1J8RMZ581A1V02",
|
||||
"data": {
|
||||
"id": "01JE08SKFXM94FTG52GVMGFQZW",
|
||||
"name": "IMissTwin",
|
||||
"flags": 0,
|
||||
"lifecycle": 3,
|
||||
"state": [
|
||||
"LISTED"
|
||||
],
|
||||
"listed": true,
|
||||
"animated": false,
|
||||
"owner": {
|
||||
"id": "01JBTEEZ8R6A1J8RMZ581A1V02",
|
||||
"username": "twoinone1_",
|
||||
"display_name": "twoinone1_",
|
||||
"avatar_url": "https://static-cdn.jtvnw.net/jtv_user_pictures/3d606741-9960-484d-a484-1aa9a0f002b6-profile_image-300x300.png",
|
||||
"style": {
|
||||
"color": -5635841,
|
||||
"paint_id": "01JQM68WYPTC7PFSE1QZKGWSTX"
|
||||
},
|
||||
"role_ids": [
|
||||
"01F37R3RFR0000K96678WEQT01",
|
||||
"01G68MMQFR0007J6GNM9E2M0TM"
|
||||
],
|
||||
"connections": [
|
||||
{
|
||||
"id": "116723523",
|
||||
"platform": "TWITCH",
|
||||
"username": "twoinone1_",
|
||||
"display_name": "twoinone1_",
|
||||
"linked_at": 1730685664536,
|
||||
"emote_capacity": 1000,
|
||||
"emote_set_id": "01JD8YZCMPF628X8DZ7NAYFG8C"
|
||||
}
|
||||
]
|
||||
},
|
||||
"host": {
|
||||
"url": "//cdn.7tv.app/emote/01JE08SKFXM94FTG52GVMGFQZW",
|
||||
"files": [
|
||||
{
|
||||
"name": "1x.webp",
|
||||
"static_name": "1x.webp",
|
||||
"width": 44,
|
||||
"height": 32,
|
||||
"frame_count": 1,
|
||||
"size": 748,
|
||||
"format": "WEBP"
|
||||
},
|
||||
{
|
||||
"name": "2x.webp",
|
||||
"static_name": "2x.webp",
|
||||
"width": 88,
|
||||
"height": 64,
|
||||
"frame_count": 1,
|
||||
"size": 1964,
|
||||
"format": "WEBP"
|
||||
},
|
||||
{
|
||||
"name": "3x.webp",
|
||||
"static_name": "3x.webp",
|
||||
"width": 132,
|
||||
"height": 96,
|
||||
"frame_count": 1,
|
||||
"size": 3444,
|
||||
"format": "WEBP"
|
||||
},
|
||||
{
|
||||
"name": "4x.webp",
|
||||
"static_name": "4x.webp",
|
||||
"width": 176,
|
||||
"height": 128,
|
||||
"frame_count": 1,
|
||||
"size": 5210,
|
||||
"format": "WEBP"
|
||||
},
|
||||
{
|
||||
"name": "1x.avif",
|
||||
"static_name": "1x.avif",
|
||||
"width": 44,
|
||||
"height": 32,
|
||||
"frame_count": 1,
|
||||
"size": 1408,
|
||||
"format": "AVIF"
|
||||
},
|
||||
{
|
||||
"name": "2x.avif",
|
||||
"static_name": "2x.avif",
|
||||
"width": 88,
|
||||
"height": 64,
|
||||
"frame_count": 1,
|
||||
"size": 3291,
|
||||
"format": "AVIF"
|
||||
},
|
||||
{
|
||||
"name": "3x.avif",
|
||||
"static_name": "3x.avif",
|
||||
"width": 132,
|
||||
"height": 96,
|
||||
"frame_count": 1,
|
||||
"size": 5577,
|
||||
"format": "AVIF"
|
||||
},
|
||||
{
|
||||
"name": "4x.avif",
|
||||
"static_name": "4x.avif",
|
||||
"width": 176,
|
||||
"height": 128,
|
||||
"frame_count": 1,
|
||||
"size": 8071,
|
||||
"format": "AVIF"
|
||||
},
|
||||
{
|
||||
"name": "1x.png",
|
||||
"static_name": "1x.png",
|
||||
"width": 44,
|
||||
"height": 32,
|
||||
"frame_count": 1,
|
||||
"size": 4904,
|
||||
"format": "PNG"
|
||||
},
|
||||
{
|
||||
"name": "2x.png",
|
||||
"static_name": "2x.png",
|
||||
"width": 88,
|
||||
"height": 64,
|
||||
"frame_count": 1,
|
||||
"size": 16973,
|
||||
"format": "PNG"
|
||||
},
|
||||
{
|
||||
"name": "3x.png",
|
||||
"static_name": "3x.png",
|
||||
"width": 132,
|
||||
"height": 96,
|
||||
"frame_count": 1,
|
||||
"size": 34591,
|
||||
"format": "PNG"
|
||||
},
|
||||
{
|
||||
"name": "4x.png",
|
||||
"static_name": "4x.png",
|
||||
"width": 176,
|
||||
"height": 128,
|
||||
"frame_count": 1,
|
||||
"size": 56643,
|
||||
"format": "PNG"
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
"origin_id": null
|
||||
}
|
||||
],
|
||||
"emote_count": 3,
|
||||
"capacity": 1000,
|
||||
"owner": null
|
||||
},
|
||||
"user": {
|
||||
"id": "01JBTEEZ8R6A1J8RMZ581A1V02",
|
||||
"username": "twoinone1_",
|
||||
"display_name": "twoinone1_",
|
||||
"created_at": 1730685664536,
|
||||
"avatar_url": "https://static-cdn.jtvnw.net/jtv_user_pictures/3d606741-9960-484d-a484-1aa9a0f002b6-profile_image-300x300.png",
|
||||
"style": {
|
||||
"color": -5635841,
|
||||
"paint_id": "01JQM68WYPTC7PFSE1QZKGWSTX"
|
||||
},
|
||||
"emote_sets": [
|
||||
{
|
||||
"id": "01JD8YZCMPF628X8DZ7NAYFG8C",
|
||||
"name": "twoinone1_'s Emotes",
|
||||
"flags": 0,
|
||||
"tags": [],
|
||||
"capacity": 1000
|
||||
},
|
||||
{
|
||||
"id": "01JR4JFB07TJHTFCCF2BDZM8MH",
|
||||
"name": "Personal Emote Set",
|
||||
"flags": 4,
|
||||
"tags": [],
|
||||
"capacity": 5
|
||||
}
|
||||
],
|
||||
"roles": [
|
||||
"01G68MMQFR0007J6GNM9E2M0TM",
|
||||
"01F37R3RFR0000K96678WEQT01"
|
||||
],
|
||||
"connections": [
|
||||
{
|
||||
"id": "116723523",
|
||||
"platform": "TWITCH",
|
||||
"username": "twoinone1_",
|
||||
"display_name": "twoinone1_",
|
||||
"linked_at": 1730685664536,
|
||||
"emote_capacity": 1000,
|
||||
"emote_set_id": "01JD8YZCMPF628X8DZ7NAYFG8C",
|
||||
"emote_set": null
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
11
go.mod
11
go.mod
@@ -6,10 +6,21 @@ require (
|
||||
github.com/go-json-experiment/json v0.0.0-20250223041408-d3c622f1b874
|
||||
github.com/google/go-cmp v0.7.0
|
||||
github.com/twmb/franz-go v1.18.1
|
||||
zombiezen.com/go/sqlite v1.4.0
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/dustin/go-humanize v1.0.1 // indirect
|
||||
github.com/google/uuid v1.6.0 // indirect
|
||||
github.com/klauspost/compress v1.17.11 // indirect
|
||||
github.com/mattn/go-isatty v0.0.20 // indirect
|
||||
github.com/ncruces/go-strftime v0.1.9 // indirect
|
||||
github.com/pierrec/lz4/v4 v4.1.22 // indirect
|
||||
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
|
||||
github.com/twmb/franz-go/pkg/kmsg v1.9.0 // indirect
|
||||
golang.org/x/sys v0.22.0 // indirect
|
||||
modernc.org/libc v1.55.3 // indirect
|
||||
modernc.org/mathutil v1.6.0 // indirect
|
||||
modernc.org/memory v1.8.0 // indirect
|
||||
modernc.org/sqlite v1.33.1 // indirect
|
||||
)
|
||||
|
45
go.sum
45
go.sum
@@ -1,14 +1,59 @@
|
||||
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
|
||||
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
|
||||
github.com/go-json-experiment/json v0.0.0-20250223041408-d3c622f1b874 h1:F8d1AJ6M9UQCavhwmO6ZsrYLfG8zVFWfEfMS2MXPkSY=
|
||||
github.com/go-json-experiment/json v0.0.0-20250223041408-d3c622f1b874/go.mod h1:TiCD2a1pcmjd7YnhGH0f/zKNcCD06B029pHhzV23c2M=
|
||||
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
|
||||
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
|
||||
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
||||
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc=
|
||||
github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0=
|
||||
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
|
||||
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
|
||||
github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdhx/f4=
|
||||
github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls=
|
||||
github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU=
|
||||
github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
|
||||
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=
|
||||
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
|
||||
github.com/twmb/franz-go v1.18.1 h1:D75xxCDyvTqBSiImFx2lkPduE39jz1vaD7+FNc+vMkc=
|
||||
github.com/twmb/franz-go v1.18.1/go.mod h1:Uzo77TarcLTUZeLuGq+9lNpSkfZI+JErv7YJhlDjs9M=
|
||||
github.com/twmb/franz-go/pkg/kmsg v1.9.0 h1:JojYUph2TKAau6SBtErXpXGC7E3gg4vGZMv9xFU/B6M=
|
||||
github.com/twmb/franz-go/pkg/kmsg v1.9.0/go.mod h1:CMbfazviCyY6HM0SXuG5t9vOwYDHRCSrJJyBAe5paqg=
|
||||
golang.org/x/crypto v0.32.0 h1:euUpcYgM8WcP71gNpTqQCn6rC2t6ULUPiOzfWaXVVfc=
|
||||
golang.org/x/crypto v0.32.0/go.mod h1:ZnnJkOaASj8g0AjIduWNlq2NRxL0PlBrbKVyZ6V/Ugc=
|
||||
golang.org/x/mod v0.16.0 h1:QX4fJ0Rr5cPQCF7O9lh9Se4pmwfwskqZfq5moyldzic=
|
||||
golang.org/x/mod v0.16.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
|
||||
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI=
|
||||
golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
|
||||
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
|
||||
golang.org/x/tools v0.19.0 h1:tfGCXNR1OsFG+sVdLAitlpjAvD/I6dHDKnYrpEZUHkw=
|
||||
golang.org/x/tools v0.19.0/go.mod h1:qoJWxmGSIBmAeriMx19ogtrEPrGtDbPK634QFIcLAhc=
|
||||
modernc.org/cc/v4 v4.21.4 h1:3Be/Rdo1fpr8GrQ7IVw9OHtplU4gWbb+wNgeoBMmGLQ=
|
||||
modernc.org/cc/v4 v4.21.4/go.mod h1:HM7VJTZbUCR3rV8EYBi9wxnJ0ZBRiGE5OeGXNA0IsLQ=
|
||||
modernc.org/ccgo/v4 v4.19.2 h1:lwQZgvboKD0jBwdaeVCTouxhxAyN6iawF3STraAal8Y=
|
||||
modernc.org/ccgo/v4 v4.19.2/go.mod h1:ysS3mxiMV38XGRTTcgo0DQTeTmAO4oCmJl1nX9VFI3s=
|
||||
modernc.org/fileutil v1.3.0 h1:gQ5SIzK3H9kdfai/5x41oQiKValumqNTDXMvKo62HvE=
|
||||
modernc.org/fileutil v1.3.0/go.mod h1:XatxS8fZi3pS8/hKG2GH/ArUogfxjpEKs3Ku3aK4JyQ=
|
||||
modernc.org/gc/v2 v2.4.1 h1:9cNzOqPyMJBvrUipmynX0ZohMhcxPtMccYgGOJdOiBw=
|
||||
modernc.org/gc/v2 v2.4.1/go.mod h1:wzN5dK1AzVGoH6XOzc3YZ+ey/jPgYHLuVckd62P0GYU=
|
||||
modernc.org/libc v1.55.3 h1:AzcW1mhlPNrRtjS5sS+eW2ISCgSOLLNyFzRh/V3Qj/U=
|
||||
modernc.org/libc v1.55.3/go.mod h1:qFXepLhz+JjFThQ4kzwzOjA/y/artDeg+pcYnY+Q83w=
|
||||
modernc.org/mathutil v1.6.0 h1:fRe9+AmYlaej+64JsEEhoWuAYBkOtQiMEU7n/XgfYi4=
|
||||
modernc.org/mathutil v1.6.0/go.mod h1:Ui5Q9q1TR2gFm0AQRqQUaBWFLAhQpCwNcuhBOSedWPo=
|
||||
modernc.org/memory v1.8.0 h1:IqGTL6eFMaDZZhEWwcREgeMXYwmW83LYW8cROZYkg+E=
|
||||
modernc.org/memory v1.8.0/go.mod h1:XPZ936zp5OMKGWPqbD3JShgd/ZoQ7899TUuQqxY+peU=
|
||||
modernc.org/opt v0.1.3 h1:3XOZf2yznlhC+ibLltsDGzABUGVx8J6pnFMS3E4dcq4=
|
||||
modernc.org/opt v0.1.3/go.mod h1:WdSiB5evDcignE70guQKxYUl14mgWtbClRi5wmkkTX0=
|
||||
modernc.org/sortutil v1.2.0 h1:jQiD3PfS2REGJNzNCMMaLSp/wdMNieTbKX920Cqdgqc=
|
||||
modernc.org/sortutil v1.2.0/go.mod h1:TKU2s7kJMf1AE84OoiGppNHJwvB753OYfNl2WRb++Ss=
|
||||
modernc.org/sqlite v1.33.1 h1:trb6Z3YYoeM9eDL1O8do81kP+0ejv+YzgyFo+Gwy0nM=
|
||||
modernc.org/sqlite v1.33.1/go.mod h1:pXV2xHxhzXZsgT/RtTFAPY6JJDEvOTcTdwADQCCWD4k=
|
||||
modernc.org/strutil v1.2.0 h1:agBi9dp1I+eOnxXeiZawM8F4LawKv4NzGWSaLfyeNZA=
|
||||
modernc.org/strutil v1.2.0/go.mod h1:/mdcBmfOibveCTBxUl5B5l6W+TTH1FXPLHZE6bTosX0=
|
||||
modernc.org/token v1.1.0 h1:Xl7Ap9dKaEs5kLoOQeQmPWevfnk/DM5qcLcYlA8ys6Y=
|
||||
modernc.org/token v1.1.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM=
|
||||
zombiezen.com/go/sqlite v1.4.0 h1:N1s3RIljwtp4541Y8rM880qgGIgq3fTD2yks1xftnKU=
|
||||
zombiezen.com/go/sqlite v1.4.0/go.mod h1:0w9F1DN9IZj9AcLS9YDKMboubCACkwYCGkzoy3eG5ik=
|
||||
|
@@ -2,6 +2,8 @@ package queue
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/go-json-experiment/json"
|
||||
@@ -16,6 +18,9 @@ type Message struct {
|
||||
Channel string `json:"ch"`
|
||||
// Sender is an obfuscated identifier for the sending user.
|
||||
Sender sender `json:"u"`
|
||||
// Timestamp is the time at which the message was sent in nanoseconds since
|
||||
// the Unix epoch.
|
||||
Timestamp int64 `json:"ts"`
|
||||
// Text is the message content.
|
||||
Text string `json:"t"`
|
||||
}
|
||||
@@ -25,7 +30,7 @@ type Producer interface {
|
||||
Produce(ctx context.Context, rec *kgo.Record, promise func(*kgo.Record, error))
|
||||
}
|
||||
|
||||
const topic = "kaiyan.chat"
|
||||
const Topic = "kaiyan.chat"
|
||||
|
||||
var recordPool sync.Pool
|
||||
|
||||
@@ -37,7 +42,7 @@ func Send(ctx context.Context, cl Producer, msg Message, errs chan<- error) {
|
||||
}
|
||||
rec, _ := recordPool.Get().(*kgo.Record)
|
||||
if rec == nil {
|
||||
rec = &kgo.Record{Topic: topic}
|
||||
rec = &kgo.Record{Topic: Topic}
|
||||
}
|
||||
rec.Key = append(rec.Key[:0], msg.ID...)
|
||||
rec.Value = append(rec.Value[:0], b...)
|
||||
@@ -51,3 +56,23 @@ func Send(ctx context.Context, cl Producer, msg Message, errs chan<- error) {
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
type Consumer interface {
|
||||
PollFetches(ctx context.Context) kgo.Fetches
|
||||
}
|
||||
|
||||
func Recv(ctx context.Context, cl Consumer, onto []Message) ([]Message, error) {
|
||||
f := cl.PollFetches(ctx)
|
||||
var errs error
|
||||
f.EachError(func(s string, i int32, err error) {
|
||||
errs = errors.Join(errs, fmt.Errorf("partition %d: %w", i, err))
|
||||
})
|
||||
f.EachRecord(func(r *kgo.Record) {
|
||||
var msg Message
|
||||
if err := json.Unmarshal(r.Value, &msg); err != nil {
|
||||
errs = errors.Join(errs, fmt.Errorf("partition %d: %w", r.Partition, err))
|
||||
}
|
||||
onto = append(onto, msg)
|
||||
})
|
||||
return onto, errs
|
||||
}
|
||||
|
@@ -4,9 +4,10 @@ import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"git.sunturtle.xyz/zephyr/kaiyan/queue"
|
||||
"github.com/go-json-experiment/json"
|
||||
"github.com/twmb/franz-go/pkg/kgo"
|
||||
|
||||
"git.sunturtle.xyz/zephyr/kaiyan/queue"
|
||||
)
|
||||
|
||||
type spyProducer struct {
|
||||
@@ -23,6 +24,7 @@ func TestSend(t *testing.T) {
|
||||
ID: "bocchi",
|
||||
Channel: "kessoku",
|
||||
Sender: queue.Sender(make([]byte, 16), "kessoku", "ryō"),
|
||||
Timestamp: 1,
|
||||
Text: "bocchi the rock!",
|
||||
}
|
||||
var q spyProducer
|
||||
@@ -48,3 +50,57 @@ func TestSend(t *testing.T) {
|
||||
t.Errorf("message did not round-trip:\nwant %+v\ngot %+v", msg, got)
|
||||
}
|
||||
}
|
||||
|
||||
type spyConsumer struct {
|
||||
get kgo.Fetches
|
||||
}
|
||||
|
||||
func (c *spyConsumer) PollFetches(ctx context.Context) kgo.Fetches {
|
||||
return c.get
|
||||
}
|
||||
|
||||
func TestRecv(t *testing.T) {
|
||||
msg := queue.Message{
|
||||
ID: "bocchi",
|
||||
Channel: "kessoku",
|
||||
Sender: queue.Sender(make([]byte, 16), "kessoku", "ryō"),
|
||||
Text: "bocchi the rock!",
|
||||
}
|
||||
val, err := json.Marshal(&msg)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
p := spyConsumer{
|
||||
get: kgo.Fetches{
|
||||
{
|
||||
Topics: []kgo.FetchTopic{
|
||||
{
|
||||
Topic: queue.Topic,
|
||||
Partitions: []kgo.FetchPartition{
|
||||
{
|
||||
Partition: 1,
|
||||
Records: []*kgo.Record{
|
||||
{
|
||||
Key: []byte("bocchi"),
|
||||
Value: val,
|
||||
Partition: 1,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
got, err := queue.Recv(context.Background(), &p, nil)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if len(got) != 1 {
|
||||
t.Fatalf("wrong number of messages: want 1, got %d", len(got))
|
||||
}
|
||||
if got[0] != msg {
|
||||
t.Errorf("message did not round-trip:\nwant %+v\ngot %+v", msg, got[0])
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user