
Despite being certain I was doing it wrong, the behavior I had implemented was to assign ownership of a key to the peer that knows the key's successor, not to the successor of the key.
144 lines
3.8 KiB
Go
144 lines
3.8 KiB
Go
package httpnode
|
|
|
|
import (
|
|
"context"
|
|
"encoding/base64"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"log/slog"
|
|
"net"
|
|
"net/http"
|
|
"net/netip"
|
|
|
|
"git.sunturtle.xyz/zephyr/chord/chord"
|
|
)
|
|
|
|
type Node struct {
|
|
// self is the node topology this server represents.
|
|
self *chord.Node
|
|
// client is the Chord client for forwarding queries to other nodes.
|
|
// TODO(branden): really we should have a client per peer so that we can
|
|
// interchange protocols
|
|
client chord.Client
|
|
}
|
|
|
|
// New creates an instance of a Chord network that responds on HTTP.
|
|
// The listener must be the same one used for the HTTP server that routes to
|
|
// [Node.ServeHTTP] and its address must match self.
|
|
// It must be bound to a single interface and port.
|
|
func New(l net.Listener, cl chord.Client, self *chord.Node) (*Node, error) {
|
|
addr, err := netip.ParseAddrPort(l.Addr().String())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if addr.Addr().IsUnspecified() || addr.Port() == 0 {
|
|
return nil, errors.New("listener must be bound to a single interface and port")
|
|
}
|
|
r := &Node{
|
|
self: self,
|
|
client: cl,
|
|
}
|
|
return r, nil
|
|
}
|
|
|
|
// Router creates a handler for the Chord HTTP endpoints.
|
|
func (n *Node) Router() http.Handler {
|
|
m := http.NewServeMux()
|
|
m.HandleFunc("GET /find/{id}", n.find)
|
|
m.HandleFunc("GET /key/{id}", n.get)
|
|
m.HandleFunc("POST /key/{id}", n.set)
|
|
m.HandleFunc("POST /pred", n.notify)
|
|
m.HandleFunc("GET /neighbors", n.neighbors)
|
|
return m
|
|
}
|
|
|
|
// Check performs checks that implement the Chord protocol.
|
|
// It must be called periodically while the node is alive.
|
|
func (n *Node) Check(ctx context.Context) error {
|
|
chord.CheckPredecessor(ctx, n.client, n.self)
|
|
if err := chord.Stabilize(ctx, n.client, n.self); err != nil {
|
|
return fmt.Errorf("failed to stabilize: %w", err)
|
|
}
|
|
// TODO(zeph): enable once implemented
|
|
// if err := chord.FixFingers(ctx, n.client, n.self); err != nil {
|
|
// return fmt.Errorf("failed to fix fingers: %w", err)
|
|
// }
|
|
return nil
|
|
}
|
|
|
|
func (n *Node) find(w http.ResponseWriter, r *http.Request) {
|
|
s := r.PathValue("id")
|
|
slog.InfoContext(r.Context(), "received find", slog.String("id", s), slog.String("from", r.RemoteAddr))
|
|
id, err := chord.ParseID(s)
|
|
if err != nil {
|
|
writeError(w, http.StatusBadRequest, err.Error())
|
|
return
|
|
}
|
|
p, err := chord.Find(r.Context(), n.client, n.self, id)
|
|
if err != nil {
|
|
writeError(w, http.StatusInternalServerError, err.Error())
|
|
return
|
|
}
|
|
_, addr := p.Values()
|
|
slog.InfoContext(r.Context(), "tell found", slog.String("id", s), slog.String("addr", addr.String()))
|
|
writeOk(w, addr)
|
|
}
|
|
|
|
func (n *Node) get(w http.ResponseWriter, r *http.Request) {
|
|
s := r.PathValue("id")
|
|
id, err := chord.ParseID(s)
|
|
if err != nil {
|
|
writeError(w, http.StatusBadRequest, err.Error())
|
|
return
|
|
}
|
|
v, ok := n.self.GetLocal(id)
|
|
if !ok {
|
|
writeError(w, http.StatusNotFound, s+" not found")
|
|
return
|
|
}
|
|
writeOk(w, v)
|
|
}
|
|
|
|
func (n *Node) set(w http.ResponseWriter, r *http.Request) {
|
|
s := r.PathValue("id")
|
|
id, err := chord.ParseID(s)
|
|
if err != nil {
|
|
writeError(w, http.StatusBadRequest, err.Error())
|
|
return
|
|
}
|
|
val, err := io.ReadAll(base64.NewDecoder(base64.StdEncoding, r.Body))
|
|
if err != nil {
|
|
writeError(w, http.StatusInternalServerError, err.Error())
|
|
return
|
|
}
|
|
n.self.SetLocal(id, string(val))
|
|
w.WriteHeader(http.StatusNoContent)
|
|
}
|
|
|
|
func (n *Node) notify(w http.ResponseWriter, r *http.Request) {
|
|
s := r.FormValue("p")
|
|
addr, err := netip.ParseAddrPort(s)
|
|
if err != nil {
|
|
writeError(w, http.StatusBadRequest, err.Error())
|
|
return
|
|
}
|
|
np := chord.Address(addr)
|
|
chord.Notify(n.self, np)
|
|
w.WriteHeader(http.StatusNoContent)
|
|
}
|
|
|
|
func (n *Node) neighbors(w http.ResponseWriter, r *http.Request) {
|
|
pred, succ := n.self.Neighbors(nil)
|
|
_, paddr := pred.Values()
|
|
u := neighbors{
|
|
Succ: make([]netip.AddrPort, 0, len(succ)),
|
|
Pred: paddr,
|
|
}
|
|
for _, s := range succ {
|
|
_, addr := s.Values()
|
|
u.Succ = append(u.Succ, addr)
|
|
}
|
|
writeOk(w, &u)
|
|
}
|