chord/main.go
2025-03-13 22:33:04 -04:00

209 lines
4.5 KiB
Go

package main
import (
"context"
"errors"
"fmt"
"log/slog"
"net"
"net/http"
"net/netip"
"os"
"os/signal"
"time"
"github.com/urfave/cli/v3"
"git.sunturtle.xyz/zephyr/chord/chord"
"git.sunturtle.xyz/zephyr/chord/chord/httpnode"
)
// app is the root command of the chord CLI. Each subcommand is wired to the
// cli* action function of the matching name.
var app = cli.Command{
	Name:  "chord",
	Usage: "Distributed hash table using the Chord protocol",
	Commands: []*cli.Command{
		// join: run a node, optionally connecting to an existing peer via -c.
		{
			Name:    "join",
			Aliases: []string{"init", "initialize"},
			Usage:   "Join or initialize a Chord network.",
			Action:  cliJoin,
			Flags: []cli.Flag{
				flagIP,
				flagPort,
				&cli.StringFlag{
					Name:  "c",
					Usage: "Address of an existing node to join.",
				},
			},
		},
		// leave: ask the node named by -n to leave the network.
		{
			Name:   "leave",
			Usage:  "Instruct a node to leave the network.",
			Action: cliLeave,
			Flags: []cli.Flag{
				flagIP,
				flagPort,
				&cli.StringFlag{
					Name:     "n",
					Usage:    "Node to leave.",
					Required: true,
				},
			},
		},
		// lookup: resolve which node owns a key.
		{
			Name:   "lookup",
			Usage:  "Find the node responsible for a key.",
			Action: cliLookup,
			Flags: []cli.Flag{
				flagKey,
				flagNode,
				flagLocal,
			},
		},
		// put: store a value under a key.
		{
			Name:    "put",
			Aliases: []string{"store", "set"},
			Usage:   "Set a value in the network.",
			Action:  cliPut,
			Flags: []cli.Flag{
				flagKey,
				&cli.StringFlag{
					Name:     "v",
					Usage:    "Value to set.",
					Required: true,
				},
				flagNode,
				flagLocal,
			},
		},
		// get: fetch the value stored under a key.
		{
			Name:    "get",
			Aliases: []string{"load"},
			Usage:   "Get a value in the network.",
			Action:  cliGet,
			Flags: []cli.Flag{
				flagKey,
				flagNode,
				flagLocal,
			},
		},
	},
}
// Flags shared by more than one subcommand of app.
var (
	// flagIP is the bind address; its default already carries a port.
	flagIP = &cli.StringFlag{
		Name:  "ip",
		Value: "127.0.0.1:3000",
		Usage: "IP to bind. May include port.",
	}

	// flagPort optionally replaces the port taken from flagIP.
	flagPort = &cli.UintFlag{
		Name:  "p",
		Usage: "Port to bind. Overrides port in -ip if given.",
	}

	// flagNode names the node through which a request enters the network.
	flagNode = &cli.StringFlag{
		Name:     "n",
		Required: true,
		Usage:    "Address of any node in the network.",
	}

	// flagKey is the key a lookup/put/get operates on.
	flagKey = &cli.StringFlag{
		Name:     "k",
		Required: true,
		Usage:    "Key to operate on.",
	}

	// flagLocal restricts an operation to the contacted node.
	flagLocal = &cli.BoolFlag{
		Name:  "l",
		Usage: "Stop if the key is not local to the node.",
	}
)
// addrflag combines the values of the -ip and -p flags into one address.
//
// ip may be either "ip:port" or a bare IP address. A nonzero p replaces any
// port present in ip. When p is zero, the port from ip is kept, or port 0 is
// used if ip has none.
//
// It returns an error if p does not fit in a 16-bit port number or if ip
// parses as neither form.
func addrflag(ip string, p uint64) (netip.AddrPort, error) {
	// Converting an oversized value to uint16 would silently wrap (65536 -> 0),
	// so reject it before any conversion happens.
	const maxPort = 1<<16 - 1
	if p > maxPort {
		return netip.AddrPort{}, fmt.Errorf("port %d out of range (max %d)", p, maxPort)
	}
	port := uint16(p)

	// First form: ip already carries a port, which -p may override.
	if ap, err := netip.ParseAddrPort(ip); err == nil {
		if port != 0 {
			ap = netip.AddrPortFrom(ap.Addr(), port)
		}
		return ap, nil
	}

	// Second form: bare address; the port comes from -p alone.
	if a, err := netip.ParseAddr(ip); err == nil {
		return netip.AddrPortFrom(a, port), nil
	}

	return netip.AddrPort{}, fmt.Errorf("couldn't parse %q as ip:port or ip", ip)
}
// cliJoin is the action for the "join" subcommand. It binds the address given
// by -ip/-p, obtains a node either by joining the peer named by -c or, when -c
// is empty, by creating a new one, serves the node's HTTP API on the listener,
// and calls chord.Stabilize once per second.
//
// It blocks until ctx is cancelled and then shuts the HTTP server down,
// allowing up to five seconds for that. It returns any setup error, or the
// result of the server shutdown.
func cliJoin(ctx context.Context, cmd *cli.Command) error {
	addr, err := addrflag(cmd.String("ip"), cmd.Uint("p"))
	if err != nil {
		return err
	}
	l, err := net.Listen("tcp", addr.String())
	if err != nil {
		return err
	}
	defer l.Close()

	cl := &httpnode.Client{HTTP: http.Client{Timeout: 5 * time.Second}}
	var node *chord.Node
	if peer := cmd.String("c"); peer != "" {
		var p netip.AddrPort
		p, err = netip.ParseAddrPort(peer)
		if err != nil {
			return err
		}
		node, err = chord.Join(ctx, cl, addr, chord.Address(p))
	} else {
		node, err = chord.Create(addr)
	}
	if err != nil {
		return err
	}

	s, err := httpnode.New(l, cl, node)
	if err != nil {
		return err
	}
	srv := http.Server{
		Handler:     s.Router(),
		ReadTimeout: 5 * time.Second,
		BaseContext: func(l net.Listener) context.Context { return ctx },
	}
	go func() {
		slog.InfoContext(ctx, "HTTP API server", slog.Any("addr", l.Addr()))
		err := srv.Serve(l)
		if errors.Is(err, http.ErrServerClosed) {
			return
		}
		slog.ErrorContext(ctx, "HTTP API server closed", slog.Any("err", err))
	}()

	t := time.NewTicker(time.Second)
	defer t.Stop()
	go func() {
		// Ticker.Stop does not close t.C, so ranging over the channel would
		// block forever after shutdown; watch ctx to end the goroutine.
		for {
			select {
			case <-ctx.Done():
				return
			case <-t.C:
				if err := chord.Stabilize(ctx, cl, node); err != nil {
					slog.ErrorContext(ctx, "stabilize", slog.Any("err", err))
				}
			}
		}
	}()

	<-ctx.Done()
	// The goroutines and BaseContext above capture ctx, so the shutdown
	// deadline must live in its own variable: reassigning ctx here would race
	// with them and hand them a live context during shutdown.
	shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	return srv.Shutdown(shutdownCtx)
}
// cliLeave is the action for the "leave" subcommand. It is a placeholder that
// always fails with a "not implemented" error.
func cliLeave(ctx context.Context, cmd *cli.Command) error {
	return errors.New("not implemented")
}
// cliLookup is the action for the "lookup" subcommand. It is a placeholder
// that always fails with a "not implemented" error.
func cliLookup(ctx context.Context, cmd *cli.Command) error {
	return errors.New("not implemented")
}
// cliPut is the action for the "put" subcommand. It is a placeholder that
// always fails with a "not implemented" error.
func cliPut(ctx context.Context, cmd *cli.Command) error {
	return errors.New("not implemented")
}
// cliGet is the action for the "get" subcommand. It is a placeholder that
// always fails with a "not implemented" error.
func cliGet(ctx context.Context, cmd *cli.Command) error {
	return errors.New("not implemented")
}
// main runs the chord CLI with a context that is cancelled on the first
// interrupt signal. Any error from the command is printed to standard error
// and the process exits with status 1.
func main() {
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
	go func() {
		// Unregister the handler once the first interrupt arrives so that a
		// second interrupt gets the default behaviour and ends the process.
		<-ctx.Done()
		stop()
	}()

	err := app.Run(ctx, os.Args)
	// Release the signal registration and the watcher goroutine even when the
	// command finished without an interrupt; calling stop again is harmless.
	stop()
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		// Report failure through the exit status, not just the message.
		os.Exit(1)
	}
}