implement graceful leaving
This commit is contained in:
parent
2460f745bc
commit
ca35fd19db
@ -15,6 +15,9 @@ type Client interface {
|
||||
Notify(ctx context.Context, n *Node, s Peer) error
|
||||
// Neighbors requests a peer's beliefs about its own neighbors.
|
||||
Neighbors(ctx context.Context, p Peer) (pred Peer, succ []Peer, err error)
|
||||
// Bye tells p that its predecessor or successor is leaving
|
||||
// and had the given successor list.
|
||||
Bye(ctx context.Context, p, n Peer, succ []Peer) error
|
||||
|
||||
// Get asks s for a saved value.
|
||||
Get(ctx context.Context, s Peer, id ID) (string, error)
|
||||
@ -93,6 +96,32 @@ func Join(ctx context.Context, cl Client, addr netip.AddrPort, np Peer) (*Node,
|
||||
return r, nil
|
||||
}
|
||||
|
||||
// Leave causes n to exit the network gracefully,
|
||||
// handing off its data to its successor.
|
||||
func Leave(ctx context.Context, cl Client, n *Node) error {
|
||||
pred, succ := n.Neighbors(nil)
|
||||
if succ[0].addr == n.self.addr {
|
||||
// Last node in the network. No data transfer.
|
||||
return nil
|
||||
}
|
||||
if err := cl.Bye(ctx, succ[0], n.self, succ); err != nil {
|
||||
return err
|
||||
}
|
||||
if pred.IsValid() {
|
||||
if err := cl.Bye(ctx, pred, n.self, succ); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
n.mu.Lock()
|
||||
defer n.mu.Unlock()
|
||||
for k, v := range n.data {
|
||||
if err := cl.Set(ctx, succ[0], k, v); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stabilize recomputes n's successor.
|
||||
func Stabilize(ctx context.Context, cl Client, n *Node) error {
|
||||
pred, _, err := cl.Neighbors(ctx, n.Successor())
|
||||
|
@ -1,6 +1,7 @@
|
||||
package httpnode
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"errors"
|
||||
@ -13,6 +14,7 @@ import (
|
||||
"strings"
|
||||
|
||||
"git.sunturtle.xyz/zephyr/chord/chord"
|
||||
"github.com/go-json-experiment/json"
|
||||
)
|
||||
|
||||
type Client struct {
|
||||
@ -160,3 +162,44 @@ func (cl *Client) Neighbors(ctx context.Context, p chord.Peer) (pred chord.Peer,
|
||||
}
|
||||
return pred, succ, nil
|
||||
}
|
||||
|
||||
// Bye tells p that its predecessor or successor is leaving
|
||||
// and had the given successor list.
|
||||
func (cl *Client) Bye(ctx context.Context, p, n chord.Peer, succ []chord.Peer) error {
|
||||
_, peer := p.Values()
|
||||
_, addr := n.Values()
|
||||
if !peer.IsValid() || !addr.IsValid() {
|
||||
return errors.New("Bye with invalid address")
|
||||
}
|
||||
s := make([]netip.AddrPort, 0, len(succ))
|
||||
for _, p := range succ {
|
||||
_, addr := p.Values()
|
||||
if !addr.IsValid() {
|
||||
continue
|
||||
}
|
||||
s = append(s, addr)
|
||||
}
|
||||
q := bye{Peer: peer, Succ: s}
|
||||
b, err := json.Marshal(&q)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
url := url.URL{
|
||||
Scheme: "http",
|
||||
Host: addr.String(),
|
||||
Path: path.Join("/", cl.APIBase, "bye"),
|
||||
}
|
||||
req, err := http.NewRequestWithContext(ctx, "POST", url.String(), bytes.NewReader(b))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
resp, err := cl.HTTP.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if resp.StatusCode >= 400 {
|
||||
_, err := readResponse[struct{}](resp)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -48,3 +48,8 @@ type neighbors struct {
|
||||
Succ []netip.AddrPort `json:"succ"`
|
||||
Pred netip.AddrPort `json:"pred,omitzero"`
|
||||
}
|
||||
|
||||
type bye struct {
|
||||
Peer netip.AddrPort `json:"peer"`
|
||||
Succ []netip.AddrPort `json:"succ"`
|
||||
}
|
||||
|
@ -12,6 +12,7 @@ import (
|
||||
"net/netip"
|
||||
|
||||
"git.sunturtle.xyz/zephyr/chord/chord"
|
||||
"github.com/go-json-experiment/json"
|
||||
)
|
||||
|
||||
type Node struct {
|
||||
@ -50,6 +51,7 @@ func (n *Node) Router() http.Handler {
|
||||
m.HandleFunc("POST /key/{id}", n.set)
|
||||
m.HandleFunc("POST /pred", n.notify)
|
||||
m.HandleFunc("GET /neighbors", n.neighbors)
|
||||
m.HandleFunc("POST /bye", n.bye)
|
||||
return m
|
||||
}
|
||||
|
||||
@ -141,3 +143,26 @@ func (n *Node) neighbors(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
writeOk(w, &u)
|
||||
}
|
||||
|
||||
func (n *Node) bye(w http.ResponseWriter, r *http.Request) {
|
||||
var q bye
|
||||
if err := json.UnmarshalRead(r.Body, &q); err != nil {
|
||||
writeError(w, http.StatusBadRequest, err.Error())
|
||||
return
|
||||
}
|
||||
if !q.Peer.IsValid() {
|
||||
writeError(w, http.StatusBadRequest, "invalid leaving peer")
|
||||
return
|
||||
}
|
||||
if len(q.Succ) == 0 {
|
||||
writeError(w, http.StatusBadRequest, "no successors for leaving node")
|
||||
return
|
||||
}
|
||||
p := chord.Address(q.Peer)
|
||||
s := make([]chord.Peer, 0, len(q.Succ))
|
||||
for _, p := range q.Succ {
|
||||
s = append(s, chord.Address(p))
|
||||
}
|
||||
n.self.Leave(p, s)
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
}
|
||||
|
@ -106,6 +106,28 @@ func (n *Node) SetLocal(k ID, v string) {
|
||||
n.data[k] = v
|
||||
}
|
||||
|
||||
// Leave tells the node that a peer is leaving.
|
||||
// succ gives the leaving peer's successors.
|
||||
func (n *Node) Leave(p Peer, succ []Peer) {
|
||||
n.mu.Lock()
|
||||
defer n.mu.Unlock()
|
||||
// The leaving peer might be either this node's successor, or its
|
||||
// predecessor, or both in the case of a network of two nodes.
|
||||
if p.addr == n.pred.addr {
|
||||
n.pred = Peer{}
|
||||
}
|
||||
for i, s := range n.succ {
|
||||
// TODO(branden): we should explicitly merge the successor lists
|
||||
if s.addr == p.addr {
|
||||
// Found the successor.
|
||||
// Delete it and add in the last received successor.
|
||||
copy(n.succ[i:], n.succ[i+1:])
|
||||
n.succ[len(n.succ)-1] = succ[len(succ)-1]
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// SuccessorFailed marks the node's current successor as having failed, e.g.
|
||||
// during stabilization.
|
||||
func (n *Node) SuccessorFailed() {
|
||||
|
Loading…
Reference in New Issue
Block a user