269 lines
5.8 KiB
Go
269 lines
5.8 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log/slog"
|
|
"net"
|
|
"net/http"
|
|
"net/netip"
|
|
"os"
|
|
"os/signal"
|
|
"time"
|
|
|
|
"github.com/urfave/cli/v3"
|
|
|
|
"git.sunturtle.xyz/zephyr/chord/chord"
|
|
"git.sunturtle.xyz/zephyr/chord/chord/httpnode"
|
|
)
|
|
|
|
var app = cli.Command{
|
|
Name: "chord",
|
|
Usage: "Distributed hash table using the Chord protocol",
|
|
|
|
Commands: []*cli.Command{
|
|
{
|
|
Name: "join",
|
|
Aliases: []string{"init", "initialize"},
|
|
Usage: "Join or initialize a Chord network.",
|
|
Flags: []cli.Flag{
|
|
flagIP,
|
|
flagPort,
|
|
&cli.StringFlag{
|
|
Name: "c",
|
|
Usage: "Address of an existing node to join.",
|
|
},
|
|
},
|
|
Action: cliJoin,
|
|
},
|
|
{
|
|
Name: "leave",
|
|
Usage: "Instruct a node to leave the network.",
|
|
Flags: []cli.Flag{
|
|
flagIP,
|
|
flagPort,
|
|
&cli.StringFlag{
|
|
Name: "n",
|
|
Usage: "Node to leave.",
|
|
Required: true,
|
|
},
|
|
},
|
|
Action: cliLeave,
|
|
},
|
|
{
|
|
Name: "lookup",
|
|
Usage: "Find the node responsible for a key.",
|
|
Flags: []cli.Flag{
|
|
flagKey,
|
|
flagNode,
|
|
flagLocal,
|
|
},
|
|
Action: cliLookup,
|
|
},
|
|
{
|
|
Name: "put",
|
|
Aliases: []string{"store", "set"},
|
|
Usage: "Set a value in the network.",
|
|
Flags: []cli.Flag{
|
|
flagKey,
|
|
&cli.StringFlag{
|
|
Name: "v",
|
|
Usage: "Value to set.",
|
|
Required: true,
|
|
},
|
|
flagNode,
|
|
flagLocal,
|
|
},
|
|
Action: cliPut,
|
|
},
|
|
{
|
|
Name: "get",
|
|
Aliases: []string{"load"},
|
|
Usage: "Get a value in the network.",
|
|
Flags: []cli.Flag{
|
|
flagKey,
|
|
flagNode,
|
|
flagLocal,
|
|
},
|
|
Action: cliGet,
|
|
},
|
|
},
|
|
}
|
|
|
|
var (
|
|
flagIP = &cli.StringFlag{
|
|
Name: "ip",
|
|
Usage: "IP to bind. May include port.",
|
|
Value: "127.0.0.1:3000",
|
|
}
|
|
flagPort = &cli.UintFlag{
|
|
Name: "p",
|
|
Usage: "Port to bind. Overrides port in -ip if given.",
|
|
}
|
|
flagNode = &cli.StringFlag{
|
|
Name: "n",
|
|
Usage: "Address of any node in the network.",
|
|
Required: true,
|
|
}
|
|
flagKey = &cli.StringFlag{
|
|
Name: "k",
|
|
Usage: "Key to operate on.",
|
|
Required: true,
|
|
}
|
|
flagLocal = &cli.BoolFlag{
|
|
Name: "l",
|
|
Usage: "Stop if the key is not local to the node.",
|
|
}
|
|
)
|
|
|
|
// addrflag resolves a bind address from the values of the -ip and -p flags.
//
// ip may be either "ip:port" or a bare IP address. When ip carries a port, a
// nonzero p replaces it and a zero p leaves it alone. When ip is a bare
// address, p is used as the port as-is, so a zero p yields port 0.
//
// An error is returned if p does not fit in a 16-bit port number, or if ip
// parses as neither "ip:port" nor a bare IP.
func addrflag(ip string, p uint64) (netip.AddrPort, error) {
	// Reject out-of-range ports explicitly; converting straight to uint16
	// would silently wrap (e.g. 65536 -> 0, 70000 -> 4464).
	const maxPort = 1<<16 - 1
	if p > maxPort {
		return netip.AddrPort{}, fmt.Errorf("port %d out of range (max %d)", p, maxPort)
	}
	port := uint16(p)

	// note inverted error checks: success returns early, failure falls
	// through to the next form.
	if ap, err := netip.ParseAddrPort(ip); err == nil {
		if port != 0 {
			ap = netip.AddrPortFrom(ap.Addr(), port)
		}
		return ap, nil
	}
	if a, err := netip.ParseAddr(ip); err == nil {
		return netip.AddrPortFrom(a, port), nil
	}
	return netip.AddrPort{}, fmt.Errorf("couldn't parse %q as ip:port or ip", ip)
}
|
|
|
|
func cliJoin(ctx context.Context, cmd *cli.Command) error {
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
addr, err := addrflag(cmd.String("ip"), cmd.Uint("p"))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
l, err := net.Listen("tcp", addr.String())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer l.Close()
|
|
cl := &httpnode.Client{HTTP: http.Client{Timeout: 5 * time.Second}}
|
|
var node *chord.Node
|
|
if peer := cmd.String("c"); peer != "" {
|
|
var p netip.AddrPort
|
|
p, err = netip.ParseAddrPort(peer)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
node, err = chord.Join(ctx, cl, addr, chord.Address(p))
|
|
} else {
|
|
node, err = chord.Create(addr)
|
|
}
|
|
if err != nil {
|
|
return err
|
|
}
|
|
s, err := httpnode.New(l, cl, node)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
srv := http.Server{
|
|
Handler: s.Router(),
|
|
ReadTimeout: 5 * time.Second,
|
|
BaseContext: func(l net.Listener) context.Context { return ctx },
|
|
}
|
|
go func() {
|
|
slog.InfoContext(ctx, "HTTP API server", slog.Any("addr", l.Addr()))
|
|
defer cancel()
|
|
err := srv.Serve(l)
|
|
if err == http.ErrServerClosed {
|
|
return
|
|
}
|
|
slog.ErrorContext(ctx, "HTTP API server closed", slog.Any("err", err))
|
|
}()
|
|
t := time.NewTicker(time.Second)
|
|
go func() {
|
|
for range t.C {
|
|
if err := chord.Stabilize(ctx, cl, node); err != nil {
|
|
slog.ErrorContext(ctx, "stabilize", slog.Any("err", err))
|
|
node.SuccessorFailed()
|
|
}
|
|
pred, succ := node.Neighbors(nil)
|
|
slog.InfoContext(ctx, "neighbors", slog.Any("node", node), slog.Any("predecessor", pred), slog.Any("successors", succ))
|
|
}
|
|
}()
|
|
<-ctx.Done()
|
|
t.Stop()
|
|
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer cancel()
|
|
return srv.Shutdown(ctx)
|
|
}
|
|
|
|
func cliLeave(ctx context.Context, cmd *cli.Command) error {
|
|
// I'm not really clear on why this command takes a startup IP and port.
|
|
// All we need is the node to tell to leave.
|
|
n, err := netip.ParseAddrPort(cmd.String("n"))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
cl := &httpnode.Client{HTTP: http.Client{Timeout: 5 * time.Second}}
|
|
return cl.SayBye(ctx, chord.Address(n))
|
|
}
|
|
|
|
func cliLookup(ctx context.Context, cmd *cli.Command) error {
|
|
n, err := netip.ParseAddrPort(cmd.String("n"))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
k := chord.Key(cmd.String("k"))
|
|
cl := &httpnode.Client{HTTP: http.Client{Timeout: 5 * time.Second}}
|
|
p, err := cl.FindSuccessor(ctx, chord.Address(n), k)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
_, addr := p.Values()
|
|
fmt.Println(addr)
|
|
return nil
|
|
}
|
|
|
|
func cliPut(ctx context.Context, cmd *cli.Command) error {
|
|
n, err := netip.ParseAddrPort(cmd.String("n"))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
k := chord.Key(cmd.String("k"))
|
|
v := cmd.String("v")
|
|
cl := &httpnode.Client{HTTP: http.Client{Timeout: 5 * time.Second}}
|
|
p, err := cl.FindSuccessor(ctx, chord.Address(n), k)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return cl.Set(ctx, p, k, v)
|
|
}
|
|
|
|
func cliGet(ctx context.Context, cmd *cli.Command) error {
|
|
n, err := netip.ParseAddrPort(cmd.String("n"))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
k := chord.Key(cmd.String("k"))
|
|
cl := &httpnode.Client{HTTP: http.Client{Timeout: 5 * time.Second}}
|
|
p, err := cl.FindSuccessor(ctx, chord.Address(n), k)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
v, err := cl.Get(ctx, p, k)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
fmt.Println(v)
|
|
return nil
|
|
}
|
|
|
|
func main() {
|
|
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
|
|
go func() {
|
|
<-ctx.Done()
|
|
stop()
|
|
}()
|
|
if err := app.Run(ctx, os.Args); err != nil {
|
|
fmt.Fprintln(os.Stderr, err)
|
|
}
|
|
}
|