From 7a04a22f766b4cd99e06e4275cc321f452337921 Mon Sep 17 00:00:00 2001 From: Branden J Brown Date: Thu, 13 Mar 2025 22:07:12 -0400 Subject: [PATCH] run stabilization --- chord/httpnode/server.go | 11 ++++------- main.go | 35 +++++++++++++++++++++++++++++------ 2 files changed, 33 insertions(+), 13 deletions(-) diff --git a/chord/httpnode/server.go b/chord/httpnode/server.go index 892f6a5..050763e 100644 --- a/chord/httpnode/server.go +++ b/chord/httpnode/server.go @@ -24,8 +24,9 @@ type Node struct { // New creates an instance of a Chord network that responds on HTTP. // The listener must be the same one used for the HTTP server that routes to -// [Node.ServeHTTP]. It must be bound to a single interface and port. -func New(l net.Listener, cl chord.Client) (*Node, error) { +// [Node.ServeHTTP] and its address must match self. +// It must be bound to a single interface and port. +func New(l net.Listener, cl chord.Client, self *chord.Node) (*Node, error) { addr, err := netip.ParseAddrPort(l.Addr().String()) if err != nil { return nil, err @@ -33,12 +34,8 @@ func New(l net.Listener, cl chord.Client) (*Node, error) { if addr.Addr().IsUnspecified() || addr.Port() == 0 { return nil, errors.New("listener must be bound to a single interface and port") } - n, err := chord.Create(addr) - if err != nil { - return nil, err - } r := &Node{ - self: n, + self: self, client: cl, } return r, nil diff --git a/main.go b/main.go index 7bb34da..8606715 100644 --- a/main.go +++ b/main.go @@ -14,6 +14,7 @@ import ( "github.com/urfave/cli/v3" + "git.sunturtle.xyz/zephyr/chord/chord" "git.sunturtle.xyz/zephyr/chord/chord/httpnode" ) @@ -116,21 +117,21 @@ var ( } ) -func addrflag(ip string, p uint64) (string, error) { +func addrflag(ip string, p uint64) (netip.AddrPort, error) { ap, err := netip.ParseAddrPort(ip) if err == nil { // note inverted error check if p != 0 { ap = netip.AddrPortFrom(ap.Addr(), uint16(p)) } - return ap.String(), nil + return ap, nil } a, err := netip.ParseAddr(ip) if err == nil { // note inverted error check - return netip.AddrPortFrom(a, uint16(p)).String(), nil + return netip.AddrPortFrom(a, uint16(p)), nil } - return "", fmt.Errorf("couldn't parse %q as ip:port or ip", ip) + return netip.AddrPort{}, fmt.Errorf("couldn't parse %q as ip:port or ip", ip) } func cliJoin(ctx context.Context, cmd *cli.Command) error { @@ -138,13 +139,26 @@ func cliJoin(ctx context.Context, cmd *cli.Command) error { if err != nil { return err } - l, err := net.Listen("tcp", addr) + l, err := net.Listen("tcp", addr.String()) if err != nil { return err } defer l.Close() cl := &httpnode.Client{HTTP: http.Client{Timeout: 5 * time.Second}} - s, err := httpnode.New(l, cl) + var node *chord.Node + if peer := cmd.String("c"); peer != "" { + p, err := netip.ParseAddrPort(peer) + if err != nil { + return err + } + node, err = chord.Join(ctx, cl, addr, chord.Address(p)) + } else { + node, err = chord.Create(addr) + } + if err != nil { + return err + } + s, err := httpnode.New(l, cl, node) if err != nil { return err } @@ -161,7 +175,16 @@ func cliJoin(ctx context.Context, cmd *cli.Command) error { } slog.ErrorContext(ctx, "HTTP API server closed", slog.Any("err", err)) }() + t := time.NewTicker(time.Second) + go func() { + for range t.C { + if err := chord.Stabilize(ctx, cl, node); err != nil { + slog.ErrorContext(ctx, "stabilize", slog.Any("err", err)) + } + } + }() <-ctx.Done() + t.Stop() ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() return srv.Shutdown(ctx)