chord/main.go
2025-03-15 20:52:59 -04:00

269 lines
5.8 KiB
Go

package main
import (
"context"
"fmt"
"log/slog"
"net"
"net/http"
"net/netip"
"os"
"os/signal"
"time"
"github.com/urfave/cli/v3"
"git.sunturtle.xyz/zephyr/chord/chord"
"git.sunturtle.xyz/zephyr/chord/chord/httpnode"
)
// app is the root command of the chord CLI. It defines five subcommands
// (join, leave, lookup, put, get), each dispatching to the cli* action
// function of the matching name in this file.
var app = cli.Command{
Name: "chord",
Usage: "Distributed hash table using the Chord protocol",
Commands: []*cli.Command{
// join: run a node. cliJoin joins through the peer given by -c, or creates
// a new network when -c is empty.
{
Name: "join",
Aliases: []string{"init", "initialize"},
Usage: "Join or initialize a Chord network.",
Flags: []cli.Flag{
flagIP,
flagPort,
&cli.StringFlag{
Name: "c",
Usage: "Address of an existing node to join.",
},
},
Action: cliJoin,
},
// leave: tell the node named by -n to leave. -ip and -p are accepted here,
// but cliLeave only reads -n.
{
Name: "leave",
Usage: "Instruct a node to leave the network.",
Flags: []cli.Flag{
flagIP,
flagPort,
&cli.StringFlag{
Name: "n",
Usage: "Node to leave.",
Required: true,
},
},
Action: cliLeave,
},
// lookup: print the address of the node responsible for key -k, asking the
// node given by -n.
{
Name: "lookup",
Usage: "Find the node responsible for a key.",
Flags: []cli.Flag{
flagKey,
flagNode,
flagLocal,
},
Action: cliLookup,
},
// put: store value -v under key -k on the node responsible for that key.
{
Name: "put",
Aliases: []string{"store", "set"},
Usage: "Set a value in the network.",
Flags: []cli.Flag{
flagKey,
&cli.StringFlag{
Name: "v",
Usage: "Value to set.",
Required: true,
},
flagNode,
flagLocal,
},
Action: cliPut,
},
// get: print the value stored under key -k.
{
Name: "get",
Aliases: []string{"load"},
Usage: "Get a value in the network.",
Flags: []cli.Flag{
flagKey,
flagNode,
flagLocal,
},
Action: cliGet,
},
},
}
// Flag definitions shared by more than one subcommand of app.
var (
// flagIP is the address to bind, either "ip:port" or a bare IP (see
// addrflag). Defaults to 127.0.0.1:3000.
flagIP = &cli.StringFlag{
Name: "ip",
Usage: "IP to bind. May include port.",
Value: "127.0.0.1:3000",
}
// flagPort is the port to bind. It has no explicit default; addrflag keeps
// the port from -ip when this value is zero.
flagPort = &cli.UintFlag{
Name: "p",
Usage: "Port to bind. Overrides port in -ip if given.",
}
// flagNode is the ip:port of the node that lookup, put and get contact
// first. The leave command declares its own separate -n flag.
flagNode = &cli.StringFlag{
Name: "n",
Usage: "Address of any node in the network.",
Required: true,
}
// flagKey is the key for lookup, put and get.
flagKey = &cli.StringFlag{
Name: "k",
Usage: "Key to operate on.",
Required: true,
}
// flagLocal is accepted by lookup, put and get.
// NOTE(review): none of the actions visible in this file read -l, so it
// currently appears to have no effect — confirm.
flagLocal = &cli.BoolFlag{
Name: "l",
Usage: "Stop if the key is not local to the node.",
}
)
// addrflag builds the address to bind from the values of the -ip and -p flags.
//
// ip may be either "ip:port" or a bare IP. A nonzero p replaces any port given
// in ip. When p is zero the port from ip is kept, or the port is zero if ip
// carried none. p must fit in a 16-bit port number; larger values are rejected
// instead of being silently truncated to a different port.
//
// It returns an error if p is out of range or if ip parses as neither form.
func addrflag(ip string, p uint64) (netip.AddrPort, error) {
	const maxPort = 1<<16 - 1
	if p > maxPort {
		return netip.AddrPort{}, fmt.Errorf("port %d out of range 0-%d", p, maxPort)
	}
	port := uint16(p)

	// Try the more specific ip:port form first.
	if ap, err := netip.ParseAddrPort(ip); err == nil {
		if port != 0 {
			ap = netip.AddrPortFrom(ap.Addr(), port)
		}
		return ap, nil
	}
	// Fall back to a bare IP, taking the port entirely from p.
	if a, err := netip.ParseAddr(ip); err == nil {
		return netip.AddrPortFrom(a, port), nil
	}
	return netip.AddrPort{}, fmt.Errorf("couldn't parse %q as ip:port or ip", ip)
}
// cliJoin runs a Chord node until ctx is cancelled or its HTTP server stops.
//
// It binds the address described by the -ip and -p flags, then joins the
// network through the peer given by -c, or creates a new network when -c is
// empty. The node's HTTP API is served on the bound listener while a
// once-per-second loop runs stabilization and logs the node's neighbors.
// On shutdown the server is given five seconds to finish.
//
// It returns any error from parsing the flags, binding, joining or creating
// the node, constructing the HTTP node, or shutting the server down.
func cliJoin(ctx context.Context, cmd *cli.Command) error {
	ctx, cancel := context.WithCancel(ctx)
	// Release the context on every return path, including the early error
	// returns below.
	defer cancel()

	addr, err := addrflag(cmd.String("ip"), cmd.Uint("p"))
	if err != nil {
		return err
	}
	l, err := net.Listen("tcp", addr.String())
	if err != nil {
		return err
	}
	defer l.Close()

	cl := &httpnode.Client{HTTP: http.Client{Timeout: 5 * time.Second}}
	var node *chord.Node
	if peer := cmd.String("c"); peer != "" {
		var p netip.AddrPort
		p, err = netip.ParseAddrPort(peer)
		if err != nil {
			return err
		}
		node, err = chord.Join(ctx, cl, addr, chord.Address(p))
	} else {
		node, err = chord.Create(addr)
	}
	if err != nil {
		return err
	}

	s, err := httpnode.New(l, cl, node)
	if err != nil {
		return err
	}
	srv := http.Server{
		Handler:     s.Router(),
		ReadTimeout: 5 * time.Second,
		BaseContext: func(net.Listener) context.Context { return ctx },
	}
	go func() {
		slog.InfoContext(ctx, "HTTP API server", slog.Any("addr", l.Addr()))
		// If the server stops for any reason, wake the main flow below.
		defer cancel()
		err := srv.Serve(l)
		if err == http.ErrServerClosed {
			return
		}
		slog.ErrorContext(ctx, "HTTP API server closed", slog.Any("err", err))
	}()

	t := time.NewTicker(time.Second)
	defer t.Stop()
	go func() {
		for {
			// Ticker.Stop does not close t.C, so watch ctx as well; otherwise
			// this goroutine would block on the channel forever.
			select {
			case <-ctx.Done():
				return
			case <-t.C:
			}
			if err := chord.Stabilize(ctx, cl, node); err != nil {
				slog.ErrorContext(ctx, "stabilize", slog.Any("err", err))
				node.SuccessorFailed()
			}
			pred, succ := node.Neighbors(nil)
			slog.InfoContext(ctx, "neighbors", slog.Any("node", node), slog.Any("predecessor", pred), slog.Any("successors", succ))
		}
	}()

	<-ctx.Done()
	// Use a separate variable for the shutdown deadline: the goroutines above
	// still read ctx, so reassigning it here would be a data race.
	shutdownCtx, stop := context.WithTimeout(context.Background(), 5*time.Second)
	defer stop()
	return srv.Shutdown(shutdownCtx)
}
// cliLeave asks the node named by -n to leave the network.
//
// The leave command also declares -ip and -p, but only -n is needed to know
// which node to contact, so those two are not read here.
func cliLeave(ctx context.Context, cmd *cli.Command) error {
	target, parseErr := netip.ParseAddrPort(cmd.String("n"))
	if parseErr != nil {
		return parseErr
	}
	httpClient := http.Client{Timeout: 5 * time.Second}
	client := &httpnode.Client{HTTP: httpClient}
	return client.SayBye(ctx, chord.Address(target))
}
// cliLookup asks the node given by -n which node is the successor of key -k
// and prints that node's address on standard output.
//
// It returns an error if -n is not a valid ip:port or if the lookup request
// fails. The -l flag is not read here.
func cliLookup(ctx context.Context, cmd *cli.Command) error {
	entry, err := netip.ParseAddrPort(cmd.String("n"))
	if err != nil {
		return err
	}
	key := chord.Key(cmd.String("k"))
	httpClient := http.Client{Timeout: 5 * time.Second}
	client := &httpnode.Client{HTTP: httpClient}
	owner, err := client.FindSuccessor(ctx, chord.Address(entry), key)
	if err != nil {
		return err
	}
	// Only the second value (the address) is printed; the first is discarded.
	_, ownerAddr := owner.Values()
	fmt.Println(ownerAddr)
	return nil
}
// cliPut stores value -v under key -k. It first asks the node given by -n for
// the key's successor, then sends the value to that successor.
//
// It returns an error if -n is not a valid ip:port, or if either the lookup
// or the set request fails. The -l flag is not read here.
func cliPut(ctx context.Context, cmd *cli.Command) error {
	entry, err := netip.ParseAddrPort(cmd.String("n"))
	if err != nil {
		return err
	}
	key, value := chord.Key(cmd.String("k")), cmd.String("v")
	httpClient := http.Client{Timeout: 5 * time.Second}
	client := &httpnode.Client{HTTP: httpClient}
	owner, err := client.FindSuccessor(ctx, chord.Address(entry), key)
	if err != nil {
		return err
	}
	return client.Set(ctx, owner, key, value)
}
// cliGet prints the value stored under key -k. It first asks the node given
// by -n for the key's successor, then fetches the value from that successor.
//
// It returns an error if -n is not a valid ip:port, or if either the lookup
// or the get request fails. The -l flag is not read here.
func cliGet(ctx context.Context, cmd *cli.Command) error {
	entry, err := netip.ParseAddrPort(cmd.String("n"))
	if err != nil {
		return err
	}
	key := chord.Key(cmd.String("k"))
	httpClient := http.Client{Timeout: 5 * time.Second}
	client := &httpnode.Client{HTTP: httpClient}
	owner, err := client.FindSuccessor(ctx, chord.Address(entry), key)
	if err != nil {
		return err
	}
	value, err := client.Get(ctx, owner, key)
	if err != nil {
		return err
	}
	fmt.Println(value)
	return nil
}
// main runs the chord CLI with a context that is cancelled on the first
// interrupt signal. If the command returns an error, it is printed to standard
// error and the process exits with status 1 so callers can detect the failure.
func main() {
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
	go func() {
		// Once the context is done, unregister the signal handling so that a
		// further interrupt is no longer captured by this context.
		<-ctx.Done()
		stop()
	}()
	if err := app.Run(ctx, os.Args); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}