diff --git a/chord/client.go b/chord/client.go index ff967a5..1cd4212 100644 --- a/chord/client.go +++ b/chord/client.go @@ -15,6 +15,9 @@ type Client interface { Notify(ctx context.Context, n *Node, s Peer) error // Neighbors requests a peer's beliefs about its own neighbors. Neighbors(ctx context.Context, p Peer) (pred Peer, succ []Peer, err error) + // Bye tells p that its predecessor or successor is leaving + // and had the given successor list. + Bye(ctx context.Context, p, n Peer, succ []Peer) error // Get asks s for a saved value. Get(ctx context.Context, s Peer, id ID) (string, error) @@ -93,6 +96,32 @@ func Join(ctx context.Context, cl Client, addr netip.AddrPort, np Peer) (*Node, return r, nil } +// Leave causes n to exit the network gracefully, +// handing off its data to its successor. +func Leave(ctx context.Context, cl Client, n *Node) error { + pred, succ := n.Neighbors(nil) + if succ[0].addr == n.self.addr { + // Last node in the network. No data transfer. + return nil + } + if err := cl.Bye(ctx, succ[0], n.self, succ); err != nil { + return err + } + if pred.IsValid() { + if err := cl.Bye(ctx, pred, n.self, succ); err != nil { + return err + } + } + n.mu.Lock() + defer n.mu.Unlock() + for k, v := range n.data { + if err := cl.Set(ctx, succ[0], k, v); err != nil { + return err + } + } + return nil +} + // Stabilize recomputes n's successor. func Stabilize(ctx context.Context, cl Client, n *Node) error { pred, _, err := cl.Neighbors(ctx, n.Successor()) diff --git a/chord/httpnode/client.go b/chord/httpnode/client.go index 6500624..ef096db 100644 --- a/chord/httpnode/client.go +++ b/chord/httpnode/client.go @@ -1,6 +1,7 @@ package httpnode import ( + "bytes" "context" "encoding/base64" "errors" @@ -13,6 +14,7 @@ import ( "strings" "git.sunturtle.xyz/zephyr/chord/chord" + "github.com/go-json-experiment/json" ) type Client struct { @@ -160,3 +162,44 @@ func (cl *Client) Neighbors(ctx context.Context, p chord.Peer) (pred chord.Peer, } return pred, succ, nil } + +// Bye tells p that its predecessor or successor is leaving +// and had the given successor list. +func (cl *Client) Bye(ctx context.Context, p, n chord.Peer, succ []chord.Peer) error { + _, peer := p.Values() + _, addr := n.Values() + if !peer.IsValid() || !addr.IsValid() { + return errors.New("Bye with invalid address") + } + s := make([]netip.AddrPort, 0, len(succ)) + for _, p := range succ { + _, addr := p.Values() + if !addr.IsValid() { + continue + } + s = append(s, addr) + } + q := bye{Peer: peer, Succ: s} + b, err := json.Marshal(&q) + if err != nil { + return err + } + url := url.URL{ + Scheme: "http", + Host: addr.String(), + Path: path.Join("/", cl.APIBase, "bye"), + } + req, err := http.NewRequestWithContext(ctx, "POST", url.String(), bytes.NewReader(b)) + if err != nil { + return err + } + resp, err := cl.HTTP.Do(req) + if err != nil { + return err + } + if resp.StatusCode >= 400 { + _, err := readResponse[struct{}](resp) + return err + } + return nil +} diff --git a/chord/httpnode/httpnode.go b/chord/httpnode/httpnode.go index 16f976a..3a59c1e 100644 --- a/chord/httpnode/httpnode.go +++ b/chord/httpnode/httpnode.go @@ -48,3 +48,8 @@ type neighbors struct { Succ []netip.AddrPort `json:"succ"` Pred netip.AddrPort `json:"pred,omitzero"` } + +type bye struct { + Peer netip.AddrPort `json:"peer"` + Succ []netip.AddrPort `json:"succ"` +} diff --git a/chord/httpnode/server.go b/chord/httpnode/server.go index 45d5ee6..a1e0a77 100644 --- a/chord/httpnode/server.go +++ b/chord/httpnode/server.go @@ -12,6 +12,7 @@ import ( "net/netip" "git.sunturtle.xyz/zephyr/chord/chord" + "github.com/go-json-experiment/json" ) type Node struct { @@ -50,6 +51,7 @@ func (n *Node) Router() http.Handler { m.HandleFunc("POST /key/{id}", n.set) m.HandleFunc("POST /pred", n.notify) m.HandleFunc("GET /neighbors", n.neighbors) + m.HandleFunc("POST /bye", n.bye) return m } @@ -141,3 +143,26 @@ func (n *Node) neighbors(w http.ResponseWriter, r *http.Request) { } writeOk(w, &u) } + +func (n *Node) bye(w http.ResponseWriter, r *http.Request) { + var q bye + if err := json.UnmarshalRead(r.Body, &q); err != nil { + writeError(w, http.StatusBadRequest, err.Error()) + return + } + if !q.Peer.IsValid() { + writeError(w, http.StatusBadRequest, "invalid leaving peer") + return + } + if len(q.Succ) == 0 { + writeError(w, http.StatusBadRequest, "no successors for leaving node") + return + } + p := chord.Address(q.Peer) + s := make([]chord.Peer, 0, len(q.Succ)) + for _, p := range q.Succ { + s = append(s, chord.Address(p)) + } + n.self.Leave(p, s) + w.WriteHeader(http.StatusNoContent) +} diff --git a/chord/topology.go b/chord/topology.go index dbcf6c5..e126fb2 100644 --- a/chord/topology.go +++ b/chord/topology.go @@ -106,6 +106,28 @@ func (n *Node) SetLocal(k ID, v string) { n.data[k] = v } +// Leave tells the node that a peer is leaving. +// succ gives the leaving peer's successors. +func (n *Node) Leave(p Peer, succ []Peer) { + n.mu.Lock() + defer n.mu.Unlock() + // The leaving peer might be either this node's successor, or its + // predecessor, or both in the case of a network of two nodes. + if p.addr == n.pred.addr { + n.pred = Peer{} + } + for i, s := range n.succ { + // TODO(branden): we should explicitly merge the successor lists + if s.addr == p.addr { + // Found the successor. + // Delete it and add in the last received successor. + copy(n.succ[i:], n.succ[i+1:]) + n.succ[len(n.succ)-1] = succ[len(succ)-1] + break + } + } +} + // SuccessorFailed marks the node's current successor as having failed, e.g. // during stabilization. func (n *Node) SuccessorFailed() {