emote/sqlitestore: implement index in terms of sqlite
This commit is contained in:
12
emote/sqlitestore/metrics.sql
Normal file
12
emote/sqlitestore/metrics.sql
Normal file
@@ -0,0 +1,12 @@
|
||||
SELECT
|
||||
id,
|
||||
name, -- name, source, &c. are constant per id, and sqlite lets us select without aggregate
|
||||
source,
|
||||
link,
|
||||
image,
|
||||
COUNT(*) AS tokens,
|
||||
COUNT(DISTINCT message) AS messages,
|
||||
COUNT(DISTINCT sender) AS senders
|
||||
FROM emote
|
||||
WHERE channel = :channel AND time BETWEEN :start AND :end
|
||||
GROUP BY id
|
17
emote/sqlitestore/schema.sql
Normal file
17
emote/sqlitestore/schema.sql
Normal file
@@ -0,0 +1,17 @@
|
||||
PRAGMA journal_mode = WAL;
|
||||
|
||||
CREATE TABLE IF NOT EXISTS emote (
|
||||
channel TEXT NOT NULL,
|
||||
message TEXT NOT NULL,
|
||||
time INTEGER NOT NULL,
|
||||
sender TEXT NOT NULL,
|
||||
id TEXT NOT NULL,
|
||||
-- The following columns could be normalized in a separate table,
|
||||
-- but they're here for simplicity.
|
||||
name TEXT NOT NULL,
|
||||
source TEXT NOT NULL,
|
||||
link TEXT NOT NULL,
|
||||
image TEXT NOT NULL
|
||||
) STRICT;
|
||||
|
||||
CREATE INDEX IF NOT EXISTS channel_time ON emote (channel, time);
|
114
emote/sqlitestore/store.go
Normal file
114
emote/sqlitestore/store.go
Normal file
@@ -0,0 +1,114 @@
|
||||
package sqlitestore
|
||||
|
||||
import (
|
||||
"context"
|
||||
_ "embed"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"zombiezen.com/go/sqlite/sqlitex"
|
||||
|
||||
"git.sunturtle.xyz/zephyr/kaiyan/emote"
|
||||
)
|
||||
|
||||
type Store struct {
|
||||
pool *sqlitex.Pool
|
||||
}
|
||||
|
||||
//go:embed schema.sql
|
||||
var schemaSQL string
|
||||
|
||||
//go:embed metrics.sql
|
||||
var metricsSQL string
|
||||
|
||||
// Open creates a metrics store using db as its underlying storage.
|
||||
// It initializes the schema used for Kaiyan.
|
||||
func Open(ctx context.Context, db *sqlitex.Pool) (*Store, error) {
|
||||
conn, err := db.Take(ctx)
|
||||
defer db.Put(conn)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("couldn't get connection from pool: %w", err)
|
||||
}
|
||||
if err := sqlitex.ExecuteScript(conn, schemaSQL, nil); err != nil {
|
||||
return nil, fmt.Errorf("couldn't run migration: %w", err)
|
||||
}
|
||||
st := Store{db}
|
||||
return &st, nil
|
||||
}
|
||||
|
||||
// Close closes the database.
|
||||
func (db *Store) Close() error {
|
||||
return db.pool.Close()
|
||||
}
|
||||
|
||||
// Record stores the emotes in a given message.
|
||||
func (db *Store) Record(ctx context.Context, channel, message, sender string, tm time.Time, emotes []emote.Emote) (err error) {
|
||||
conn, err := db.pool.Take(ctx)
|
||||
defer db.pool.Put(conn)
|
||||
if err != nil {
|
||||
return fmt.Errorf("couldn't get connection to record emotes: %w", err)
|
||||
}
|
||||
defer sqlitex.Transaction(conn)(&err)
|
||||
|
||||
st, err := conn.Prepare(`INSERT INTO emote (channel, message, time, sender, id, name, source, link, image) VALUES (:channel, :message, :time, :sender, :id, :name, :source, :link, :image)`)
|
||||
if err != nil {
|
||||
return fmt.Errorf("couldn't prepare emote insert: %w", err)
|
||||
}
|
||||
st.SetText(":channel", channel)
|
||||
st.SetText(":message", message)
|
||||
st.SetInt64(":time", tm.UnixNano())
|
||||
st.SetText(":sender", sender)
|
||||
for _, em := range emotes {
|
||||
st.SetText(":id", em.ID)
|
||||
st.SetText(":name", em.Name)
|
||||
st.SetText(":source", em.Source)
|
||||
st.SetText(":link", em.Link)
|
||||
st.SetText(":image", em.Image)
|
||||
_, err := st.Step()
|
||||
if err != nil {
|
||||
return fmt.Errorf("couldn't insert emote: %w", err)
|
||||
}
|
||||
st.Reset() // NOTE(branden): bound parameters are retained
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Metrics appends emote usage information for a channel in a given time range.
|
||||
func (db *Store) Metrics(ctx context.Context, channel string, start, end time.Time, onto []emote.Metric) ([]emote.Metric, error) {
|
||||
conn, err := db.pool.Take(ctx)
|
||||
defer db.pool.Put(conn)
|
||||
if err != nil {
|
||||
return onto, fmt.Errorf("couldn't get connection for metrics: %w", err)
|
||||
}
|
||||
st, err := conn.Prepare(metricsSQL)
|
||||
if err != nil {
|
||||
return onto, fmt.Errorf("couldn't prepare metrics statement: %w", err)
|
||||
}
|
||||
st.SetText(":channel", channel)
|
||||
st.SetInt64(":start", start.UnixNano())
|
||||
st.SetInt64(":end", end.UnixNano())
|
||||
for {
|
||||
ok, err := st.Step()
|
||||
if err != nil {
|
||||
return onto, fmt.Errorf("couldn't step metrics selection: %w", err)
|
||||
}
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
|
||||
m := emote.Metric{
|
||||
Emote: emote.Emote{
|
||||
ID: st.ColumnText(0),
|
||||
Name: st.ColumnText(1),
|
||||
Source: st.ColumnText(2),
|
||||
Link: st.ColumnText(3),
|
||||
Image: st.ColumnText(4),
|
||||
},
|
||||
Tokens: st.ColumnInt64(5),
|
||||
Messages: st.ColumnInt64(6),
|
||||
Users: st.ColumnInt64(7),
|
||||
}
|
||||
onto = append(onto, m)
|
||||
}
|
||||
return onto, st.Reset()
|
||||
}
|
141
emote/sqlitestore/store_test.go
Normal file
141
emote/sqlitestore/store_test.go
Normal file
@@ -0,0 +1,141 @@
|
||||
package sqlitestore_test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"slices"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"git.sunturtle.xyz/zephyr/kaiyan/emote"
|
||||
"git.sunturtle.xyz/zephyr/kaiyan/emote/sqlitestore"
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"zombiezen.com/go/sqlite"
|
||||
"zombiezen.com/go/sqlite/sqlitex"
|
||||
)
|
||||
|
||||
func testDB(name string) *sqlitex.Pool {
|
||||
opts := sqlitex.PoolOptions{
|
||||
Flags: sqlite.OpenCreate | sqlite.OpenReadWrite | sqlite.OpenMemory | sqlite.OpenSharedCache | sqlite.OpenURI,
|
||||
}
|
||||
pool, err := sqlitex.NewPool(fmt.Sprintf("file:%s.db?mode=memory&cache=shared", name), opts)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return pool
|
||||
}
|
||||
|
||||
func TestStore(t *testing.T) {
|
||||
ems := []struct {
|
||||
channel, message, sender string
|
||||
ts int64
|
||||
emotes []emote.Emote
|
||||
}{
|
||||
{
|
||||
channel: "kessoku",
|
||||
message: "1",
|
||||
sender: "ryo",
|
||||
ts: 1000,
|
||||
emotes: []emote.Emote{
|
||||
{ID: "nijika", Name: "nijika"},
|
||||
{ID: "kita", Name: "kita"},
|
||||
{ID: "nijika", Name: "nijika"},
|
||||
{ID: "kita", Name: "kita"},
|
||||
},
|
||||
},
|
||||
{
|
||||
channel: "kessoku",
|
||||
message: "2",
|
||||
sender: "seika",
|
||||
ts: 2000,
|
||||
emotes: []emote.Emote{
|
||||
{ID: "nijika", Name: "nijika"},
|
||||
{ID: "kita", Name: "kita"},
|
||||
{ID: "nijika", Name: "nijika"},
|
||||
{ID: "kita", Name: "kita"},
|
||||
},
|
||||
},
|
||||
{
|
||||
channel: "sickhack",
|
||||
message: "3",
|
||||
sender: "ryo",
|
||||
ts: 1500,
|
||||
emotes: []emote.Emote{
|
||||
{ID: "nijika", Name: "nijika"},
|
||||
{ID: "kita", Name: "kita"},
|
||||
{ID: "nijika", Name: "nijika"},
|
||||
{ID: "kita", Name: "kita"},
|
||||
},
|
||||
},
|
||||
}
|
||||
st, err := sqlitestore.Open(t.Context(), testDB("TestStore"))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
for _, m := range ems {
|
||||
err := st.Record(t.Context(), m.channel, m.message, m.sender, time.Unix(m.ts, 0), m.emotes)
|
||||
if err != nil {
|
||||
t.Errorf("couldn't record message %s: %v", m.message, err)
|
||||
}
|
||||
}
|
||||
cases := []struct {
|
||||
name string
|
||||
channel string
|
||||
start, end int64
|
||||
want []emote.Metric
|
||||
}{
|
||||
{
|
||||
name: "channel",
|
||||
channel: "kessoku",
|
||||
start: 0,
|
||||
end: 1e6,
|
||||
want: []emote.Metric{
|
||||
{
|
||||
Emote: emote.Emote{ID: "kita", Name: "kita"},
|
||||
Tokens: 4,
|
||||
Messages: 2,
|
||||
Users: 2,
|
||||
},
|
||||
{
|
||||
Emote: emote.Emote{ID: "nijika", Name: "nijika"},
|
||||
Tokens: 4,
|
||||
Messages: 2,
|
||||
Users: 2,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "time",
|
||||
channel: "kessoku",
|
||||
start: 1250,
|
||||
end: 1e6,
|
||||
want: []emote.Metric{
|
||||
{
|
||||
Emote: emote.Emote{ID: "kita", Name: "kita"},
|
||||
Tokens: 2,
|
||||
Messages: 1,
|
||||
Users: 1,
|
||||
},
|
||||
{
|
||||
Emote: emote.Emote{ID: "nijika", Name: "nijika"},
|
||||
Tokens: 2,
|
||||
Messages: 1,
|
||||
Users: 1,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, c := range cases {
|
||||
// Don't use Run because we have earlier t.Fails.
|
||||
got, err := st.Metrics(t.Context(), c.channel, time.Unix(c.start, 0), time.Unix(c.end, 0), nil)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
// Sort results by emote ID so we can easily diff.
|
||||
// In particular, we don't expect the store to sort for us.
|
||||
slices.SortFunc(got, func(a, b emote.Metric) int { return strings.Compare(a.Emote.ID, b.Emote.ID) })
|
||||
if diff := cmp.Diff(c.want, got); diff != "" {
|
||||
t.Errorf("wrong results (+got/-want):\n%s", diff)
|
||||
}
|
||||
}
|
||||
}
|
26
emote/store.go
Normal file
26
emote/store.go
Normal file
@@ -0,0 +1,26 @@
|
||||
package emote
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Store is a store of emote usage records.
|
||||
type Store interface {
|
||||
// Record stores the emotes in a given message.
|
||||
Record(ctx context.Context, channel, message, sender string, tm time.Time, emotes []Emote) error
|
||||
// Metrics appends emote usage information for a channel in a given time range.
|
||||
Metrics(ctx context.Context, channel string, start, end time.Time, onto []Metric) ([]Metric, error)
|
||||
}
|
||||
|
||||
// Metric is the metrics for a single emote.
|
||||
type Metric struct {
|
||||
// Emote is the emote that the metrics describe.
|
||||
Emote Emote
|
||||
// Tokens is the total number of occurrences of the emote.
|
||||
Tokens int64
|
||||
// Messages is the number of unique messages which contained the emote.
|
||||
Messages int64
|
||||
// Users is the number of users who used the emote.
|
||||
Users int64
|
||||
}
|
Reference in New Issue
Block a user