Compare commits

...

2 Commits

Author SHA1 Message Date
Branden J Brown
7a04a22f76 run stabilization 2025-03-13 22:07:12 -04:00
Branden J Brown
1e77429a91 join command implementation with create only 2025-03-13 21:57:33 -04:00
2 changed files with 85 additions and 8 deletions

View File

@ -24,8 +24,9 @@ type Node struct {
// New creates an instance of a Chord network that responds on HTTP.
// The listener must be the same one used for the HTTP server that routes to
// [Node.ServeHTTP]. It must be bound to a single interface and port.
func New(l net.Listener, cl chord.Client) (*Node, error) {
// [Node.ServeHTTP] and its address must match self.
// It must be bound to a single interface and port.
func New(l net.Listener, cl chord.Client, self *chord.Node) (*Node, error) {
addr, err := netip.ParseAddrPort(l.Addr().String())
if err != nil {
return nil, err
@ -33,12 +34,8 @@ func New(l net.Listener, cl chord.Client) (*Node, error) {
if addr.Addr().IsUnspecified() || addr.Port() == 0 {
return nil, errors.New("listener must be bound to a single interface and port")
}
n, err := chord.Create(addr)
if err != nil {
return nil, err
}
r := &Node{
self: n,
self: self,
client: cl,
}
return r, nil

82
main.go
View File

@ -4,10 +4,18 @@ import (
"context"
"errors"
"fmt"
"log/slog"
"net"
"net/http"
"net/netip"
"os"
"os/signal"
"time"
"github.com/urfave/cli/v3"
"git.sunturtle.xyz/zephyr/chord/chord"
"git.sunturtle.xyz/zephyr/chord/chord/httpnode"
)
var app = cli.Command{
@ -109,7 +117,79 @@ var (
}
)
func cliJoin(ctx context.Context, cmd *cli.Command) error { return errors.New("not implemented") }
func addrflag(ip string, p uint64) (netip.AddrPort, error) {
ap, err := netip.ParseAddrPort(ip)
if err == nil {
// note inverted error check
if p != 0 {
ap = netip.AddrPortFrom(ap.Addr(), uint16(p))
}
return ap, nil
}
a, err := netip.ParseAddr(ip)
if err == nil {
// note inverted error check
return netip.AddrPortFrom(a, uint16(p)), nil
}
return netip.AddrPort{}, fmt.Errorf("couldn't parse %q as ip:port or ip", ip)
}
func cliJoin(ctx context.Context, cmd *cli.Command) error {
addr, err := addrflag(cmd.String("ip"), cmd.Uint("p"))
if err != nil {
return err
}
l, err := net.Listen("tcp", addr.String())
if err != nil {
return err
}
defer l.Close()
cl := &httpnode.Client{HTTP: http.Client{Timeout: 5 * time.Second}}
var node *chord.Node
if peer := cmd.String("c"); peer != "" {
p, err := netip.ParseAddrPort(peer)
if err != nil {
return err
}
node, err = chord.Join(ctx, cl, addr, chord.Address(p))
} else {
node, err = chord.Create(addr)
}
if err != nil {
return err
}
s, err := httpnode.New(l, cl, node)
if err != nil {
return err
}
srv := http.Server{
Handler: s.Router(),
ReadTimeout: 5 * time.Second,
BaseContext: func(l net.Listener) context.Context { return ctx },
}
go func() {
slog.InfoContext(ctx, "HTTP API server", slog.Any("addr", l.Addr()))
err := srv.Serve(l)
if err == http.ErrServerClosed {
return
}
slog.ErrorContext(ctx, "HTTP API server closed", slog.Any("err", err))
}()
t := time.NewTicker(time.Second)
go func() {
for range t.C {
if err := chord.Stabilize(ctx, cl, node); err != nil {
slog.ErrorContext(ctx, "stabilize", slog.Any("err", err))
}
}
}()
<-ctx.Done()
t.Stop()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
return srv.Shutdown(ctx)
}
func cliLeave(ctx context.Context, cmd *cli.Command) error { return errors.New("not implemented") }
func cliLookup(ctx context.Context, cmd *cli.Command) error { return errors.New("not implemented") }
func cliPut(ctx context.Context, cmd *cli.Command) error { return errors.New("not implemented") }