package main import ( "context" "fmt" "log/slog" "net" "net/http" "net/netip" "os" "os/signal" "time" "github.com/urfave/cli/v3" "git.sunturtle.xyz/zephyr/chord/chord" "git.sunturtle.xyz/zephyr/chord/chord/httpnode" ) var app = cli.Command{ Name: "chord", Usage: "Distributed hash table using the Chord protocol", Commands: []*cli.Command{ { Name: "join", Aliases: []string{"init", "initialize"}, Usage: "Join or initialize a Chord network.", Flags: []cli.Flag{ flagIP, flagPort, &cli.StringFlag{ Name: "c", Usage: "Address of an existing node to join.", }, }, Action: cliJoin, }, { Name: "leave", Usage: "Instruct a node to leave the network.", Flags: []cli.Flag{ flagIP, flagPort, &cli.StringFlag{ Name: "n", Usage: "Node to leave.", Required: true, }, }, Action: cliLeave, }, { Name: "lookup", Usage: "Find the node responsible for a key.", Flags: []cli.Flag{ flagKey, flagNode, flagLocal, }, Action: cliLookup, }, { Name: "put", Aliases: []string{"store", "set"}, Usage: "Set a value in the network.", Flags: []cli.Flag{ flagKey, &cli.StringFlag{ Name: "v", Usage: "Value to set.", Required: true, }, flagNode, flagLocal, }, Action: cliPut, }, { Name: "get", Aliases: []string{"load"}, Usage: "Get a value in the network.", Flags: []cli.Flag{ flagKey, flagNode, flagLocal, }, Action: cliGet, }, }, } var ( flagIP = &cli.StringFlag{ Name: "ip", Usage: "IP to bind. May include port.", Value: "127.0.0.1:3000", } flagPort = &cli.UintFlag{ Name: "p", Usage: "Port to bind. Overrides port in -ip if given.", } flagNode = &cli.StringFlag{ Name: "n", Usage: "Address of any node in the network.", Required: true, } flagKey = &cli.StringFlag{ Name: "k", Usage: "Key to operate on.", Required: true, } flagLocal = &cli.BoolFlag{ Name: "l", Usage: "Stop if the key is not local to the node.", } ) func addrflag(ip string, p uint64) (netip.AddrPort, error) { ap, err := netip.ParseAddrPort(ip) if err == nil { // note inverted error check if p != 0 { ap = netip.AddrPortFrom(ap.Addr(), uint16(p)) } return ap, nil } a, err := netip.ParseAddr(ip) if err == nil { // note inverted error check return netip.AddrPortFrom(a, uint16(p)), nil } return netip.AddrPort{}, fmt.Errorf("couldn't parse %q as ip:port or ip", ip) } func cliJoin(ctx context.Context, cmd *cli.Command) error { ctx, cancel := context.WithCancel(ctx) addr, err := addrflag(cmd.String("ip"), cmd.Uint("p")) if err != nil { return err } l, err := net.Listen("tcp", addr.String()) if err != nil { return err } defer l.Close() cl := &httpnode.Client{HTTP: http.Client{Timeout: 5 * time.Second}} var node *chord.Node if peer := cmd.String("c"); peer != "" { var p netip.AddrPort p, err = netip.ParseAddrPort(peer) if err != nil { return err } node, err = chord.Join(ctx, cl, addr, chord.Address(p)) } else { node, err = chord.Create(addr) } if err != nil { return err } s, err := httpnode.New(l, cl, node) if err != nil { return err } srv := http.Server{ Handler: s.Router(), ReadTimeout: 5 * time.Second, BaseContext: func(l net.Listener) context.Context { return ctx }, } go func() { slog.InfoContext(ctx, "HTTP API server", slog.Any("addr", l.Addr())) defer cancel() err := srv.Serve(l) if err == http.ErrServerClosed { return } slog.ErrorContext(ctx, "HTTP API server closed", slog.Any("err", err)) }() t := time.NewTicker(time.Second) go func() { for range t.C { if err := chord.Stabilize(ctx, cl, node); err != nil { slog.ErrorContext(ctx, "stabilize", slog.Any("err", err)) node.SuccessorFailed() } pred, succ := node.Neighbors(nil) slog.InfoContext(ctx, "neighbors", slog.Any("node", node), slog.Any("predecessor", pred), slog.Any("successors", succ)) } }() <-ctx.Done() t.Stop() ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second) defer cancel() return srv.Shutdown(ctx) } func cliLeave(ctx context.Context, cmd *cli.Command) error { // I'm not really clear on why this command takes a startup IP and port. // All we need is the node to tell to leave. n, err := netip.ParseAddrPort(cmd.String("n")) if err != nil { return err } cl := &httpnode.Client{HTTP: http.Client{Timeout: 5 * time.Second}} return cl.SayBye(ctx, chord.Address(n)) } func cliLookup(ctx context.Context, cmd *cli.Command) error { n, err := netip.ParseAddrPort(cmd.String("n")) if err != nil { return err } k := chord.Key(cmd.String("k")) cl := &httpnode.Client{HTTP: http.Client{Timeout: 5 * time.Second}} p, err := cl.FindSuccessor(ctx, chord.Address(n), k) if err != nil { return err } _, addr := p.Values() fmt.Println(addr) return nil } func cliPut(ctx context.Context, cmd *cli.Command) error { n, err := netip.ParseAddrPort(cmd.String("n")) if err != nil { return err } k := chord.Key(cmd.String("k")) v := cmd.String("v") cl := &httpnode.Client{HTTP: http.Client{Timeout: 5 * time.Second}} p, err := cl.FindSuccessor(ctx, chord.Address(n), k) if err != nil { return err } return cl.Set(ctx, p, k, v) } func cliGet(ctx context.Context, cmd *cli.Command) error { n, err := netip.ParseAddrPort(cmd.String("n")) if err != nil { return err } k := chord.Key(cmd.String("k")) cl := &httpnode.Client{HTTP: http.Client{Timeout: 5 * time.Second}} p, err := cl.FindSuccessor(ctx, chord.Address(n), k) if err != nil { return err } v, err := cl.Get(ctx, p, k) if err != nil { return err } fmt.Println(v) return nil } func main() { ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt) go func() { <-ctx.Done() stop() }() if err := app.Run(ctx, os.Args); err != nil { fmt.Fprintln(os.Stderr, err) } }