Compare commits

...

3 Commits

Author SHA1 Message Date
Branden J Brown
2460f745bc keys are owned by successor
Despite being certain I was doing it wrong, the behavior I had
implemented was to assign ownership of a key to the peer that knows the
key's successor, not to the successor of the key.
2025-03-14 15:49:41 -04:00
Branden J Brown
b034852b34 add testing script 2025-03-14 12:32:34 -04:00
Branden J Brown
afd8755131 log neighbors when stabilizing 2025-03-14 12:22:23 -04:00
8 changed files with 130 additions and 66 deletions

1
.gitignore vendored Normal file
View File

@ -0,0 +1 @@
/chord-node

View File

@ -9,38 +9,48 @@ import (
// Client represents the communications a Chord node performs. // Client represents the communications a Chord node performs.
type Client interface { type Client interface {
// Find asks s to find a value and the peer that owns it. // FindSuccessor asks s to find the peer that most closely follows a key.
// If the ID is not associated with a key, the result must be the empty FindSuccessor(ctx context.Context, s Peer, id ID) (Peer, error)
// string with a nil error.
Find(ctx context.Context, s Peer, id ID) (Peer, string, error)
// Set asks s to save a value for an ID.
Set(ctx context.Context, s Peer, id ID, v string) error
// Notify tells s we believe n to be its predecessor. // Notify tells s we believe n to be its predecessor.
Notify(ctx context.Context, n *Node, s Peer) error Notify(ctx context.Context, n *Node, s Peer) error
// Neighbors requests a peer's beliefs about its own neighbors. // Neighbors requests a peer's beliefs about its own neighbors.
Neighbors(ctx context.Context, p Peer) (pred Peer, succ []Peer, err error) Neighbors(ctx context.Context, p Peer) (pred Peer, succ []Peer, err error)
// Get asks s for a saved value.
Get(ctx context.Context, s Peer, id ID) (string, error)
// Set asks s to save a value.
Set(ctx context.Context, s Peer, id ID, v string) error
} }
// TODO(branden): Find should be plural; if we have multiple keys to // TODO(branden): FindSuccessor should be plural; if we have multiple keys to
// search, we shouldn't have to do the whole query for all of them, especially // search, we shouldn't have to do the whole query for all of them, especially
// considering we can sort by increasing distance from the origin and then do // considering we can sort by increasing distance from the origin and then do
// the query in linear time. // the query in linear time.
// Find gets a value and the peer that owns it. // FindSuccessor gets the peer that most closely follows a key.
func Find(ctx context.Context, cl Client, n *Node, id ID) (Peer, string, error) { func Find(ctx context.Context, cl Client, n *Node, id ID) (Peer, error) {
if n.IsLocal(id) { p, ok := n.Closest(id)
return n.self, "", nil if ok {
return p, nil
} }
p, s, err := cl.Find(ctx, n.Closest(id), id) p, err := cl.FindSuccessor(ctx, p, id)
return p, s, err return p, err
} }
// TODO(branden): Set should be plural for the same reasons. It should also // TODO(branden): Get and Set should be plural for the same reasons.
// return an error if the key isn't local to the peer.
// Get gets a value in the Chord network.
func Get(ctx context.Context, cl Client, n *Node, key ID) (string, error) {
p, err := Find(ctx, cl, n, key)
if err != nil {
return "", err
}
return cl.Get(ctx, p, key)
}
// Set saves a value in the Chord network. // Set saves a value in the Chord network.
func Set(ctx context.Context, cl Client, n *Node, key ID, val string) error { func Set(ctx context.Context, cl Client, n *Node, key ID, val string) error {
p, _, err := Find(ctx, cl, n, key) p, err := Find(ctx, cl, n, key)
if err != nil { if err != nil {
return fmt.Errorf("couldn't find peer to save key: %w", err) return fmt.Errorf("couldn't find peer to save key: %w", err)
} }
@ -57,7 +67,7 @@ func Join(ctx context.Context, cl Client, addr netip.AddrPort, np Peer) (*Node,
return nil, errors.New("chord: invalid peer") return nil, errors.New("chord: invalid peer")
} }
self := Address(addr) self := Address(addr)
p, _, err := cl.Find(ctx, np, self.id) p, err := cl.FindSuccessor(ctx, np, self.id)
if err != nil { if err != nil {
return nil, fmt.Errorf("couldn't query own successor: %w", err) return nil, fmt.Errorf("couldn't query own successor: %w", err)
} }

View File

@ -7,6 +7,7 @@ import (
"fmt" "fmt"
"log/slog" "log/slog"
"net/http" "net/http"
"net/netip"
"net/url" "net/url"
"path" "path"
"strings" "strings"
@ -21,12 +22,36 @@ type Client struct {
APIBase string APIBase string
} }
// Find asks s to find a value and the peer that owns it. // FindSuccessor asks s to find a value and the peer that owns it.
func (cl *Client) Find(ctx context.Context, s chord.Peer, id chord.ID) (chord.Peer, string, error) { func (cl *Client) FindSuccessor(ctx context.Context, s chord.Peer, id chord.ID) (chord.Peer, error) {
_, addr := s.Values() _, addr := s.Values()
if !addr.IsValid() { if !addr.IsValid() {
return chord.Peer{}, "", errors.New("Find with invalid peer") return chord.Peer{}, errors.New("FindSuccessor with invalid peer")
} }
url := url.URL{
Scheme: "http",
Host: addr.String(),
Path: path.Join("/", cl.APIBase, "find", id.String()),
}
req, err := http.NewRequestWithContext(ctx, "GET", url.String(), nil)
if err != nil {
return chord.Peer{}, err
}
slog.InfoContext(ctx, "find", slog.String("url", url.String()))
resp, err := cl.HTTP.Do(req)
if err != nil {
return chord.Peer{}, err
}
p, err := readResponse[netip.AddrPort](resp)
if err != nil {
return chord.Peer{}, fmt.Errorf("%w (%s)", err, resp.Status)
}
slog.InfoContext(ctx, "found", slog.String("peer", p.String()))
return chord.Address(p), nil
}
func (cl *Client) Get(ctx context.Context, s chord.Peer, id chord.ID) (string, error) {
_, addr := s.Values()
url := url.URL{ url := url.URL{
Scheme: "http", Scheme: "http",
Host: addr.String(), Host: addr.String(),
@ -34,19 +59,21 @@ func (cl *Client) Find(ctx context.Context, s chord.Peer, id chord.ID) (chord.Pe
} }
req, err := http.NewRequestWithContext(ctx, "GET", url.String(), nil) req, err := http.NewRequestWithContext(ctx, "GET", url.String(), nil)
if err != nil { if err != nil {
return chord.Peer{}, "", err return "", nil
} }
slog.InfoContext(ctx, "find", slog.String("url", url.String()))
resp, err := cl.HTTP.Do(req) resp, err := cl.HTTP.Do(req)
if err != nil { if err != nil {
return chord.Peer{}, "", err return "", err
} }
p, err := readResponse[peervalue](resp) switch resp.StatusCode {
if err != nil { case http.StatusOK:
return chord.Peer{}, "", fmt.Errorf("%w (%s)", err, resp.Status) v, err := readResponse[string](resp)
return v, err
case http.StatusNotFound:
return "", nil
default:
return "", errors.New(resp.Status)
} }
slog.InfoContext(ctx, "found", slog.String("peer", p.Peer.String()), slog.String("value", p.Value))
return chord.Address(p.Peer), p.Value, nil
} }
func (cl *Client) Set(ctx context.Context, s chord.Peer, id chord.ID, v string) error { func (cl *Client) Set(ctx context.Context, s chord.Peer, id chord.ID, v string) error {

View File

@ -48,8 +48,3 @@ type neighbors struct {
Succ []netip.AddrPort `json:"succ"` Succ []netip.AddrPort `json:"succ"`
Pred netip.AddrPort `json:"pred,omitzero"` Pred netip.AddrPort `json:"pred,omitzero"`
} }
type peervalue struct {
Peer netip.AddrPort `json:"peer"`
Value string `json:"value,omitzero"`
}

View File

@ -45,7 +45,8 @@ func New(l net.Listener, cl chord.Client, self *chord.Node) (*Node, error) {
// Router creates a handler for the Chord HTTP endpoints. // Router creates a handler for the Chord HTTP endpoints.
func (n *Node) Router() http.Handler { func (n *Node) Router() http.Handler {
m := http.NewServeMux() m := http.NewServeMux()
m.HandleFunc("GET /key/{id}", n.key) m.HandleFunc("GET /find/{id}", n.find)
m.HandleFunc("GET /key/{id}", n.get)
m.HandleFunc("POST /key/{id}", n.set) m.HandleFunc("POST /key/{id}", n.set)
m.HandleFunc("POST /pred", n.notify) m.HandleFunc("POST /pred", n.notify)
m.HandleFunc("GET /neighbors", n.neighbors) m.HandleFunc("GET /neighbors", n.neighbors)
@ -66,7 +67,7 @@ func (n *Node) Check(ctx context.Context) error {
return nil return nil
} }
func (n *Node) key(w http.ResponseWriter, r *http.Request) { func (n *Node) find(w http.ResponseWriter, r *http.Request) {
s := r.PathValue("id") s := r.PathValue("id")
slog.InfoContext(r.Context(), "received find", slog.String("id", s), slog.String("from", r.RemoteAddr)) slog.InfoContext(r.Context(), "received find", slog.String("id", s), slog.String("from", r.RemoteAddr))
id, err := chord.ParseID(s) id, err := chord.ParseID(s)
@ -74,15 +75,29 @@ func (n *Node) key(w http.ResponseWriter, r *http.Request) {
writeError(w, http.StatusBadRequest, err.Error()) writeError(w, http.StatusBadRequest, err.Error())
return return
} }
p, v, err := chord.Find(r.Context(), n.client, n.self, id) p, err := chord.Find(r.Context(), n.client, n.self, id)
if err != nil { if err != nil {
writeError(w, http.StatusInternalServerError, err.Error()) writeError(w, http.StatusInternalServerError, err.Error())
return return
} }
_, addr := p.Values() _, addr := p.Values()
slog.InfoContext(r.Context(), "tell found", slog.String("id", s), slog.String("addr", addr.String()), slog.String("value", v)) slog.InfoContext(r.Context(), "tell found", slog.String("id", s), slog.String("addr", addr.String()))
pv := peervalue{addr, v} writeOk(w, addr)
writeOk(w, pv) }
func (n *Node) get(w http.ResponseWriter, r *http.Request) {
s := r.PathValue("id")
id, err := chord.ParseID(s)
if err != nil {
writeError(w, http.StatusBadRequest, err.Error())
return
}
v, ok := n.self.GetLocal(id)
if !ok {
writeError(w, http.StatusNotFound, s+" not found")
return
}
writeOk(w, v)
} }
func (n *Node) set(w http.ResponseWriter, r *http.Request) { func (n *Node) set(w http.ResponseWriter, r *http.Request) {
@ -97,10 +112,7 @@ func (n *Node) set(w http.ResponseWriter, r *http.Request) {
writeError(w, http.StatusInternalServerError, err.Error()) writeError(w, http.StatusInternalServerError, err.Error())
return return
} }
if !n.self.SetLocal(id, string(val)) { n.self.SetLocal(id, string(val))
writeError(w, http.StatusNotFound, "id does not belong to this peer")
return
}
w.WriteHeader(http.StatusNoContent) w.WriteHeader(http.StatusNoContent)
} }

View File

@ -47,22 +47,21 @@ func (n *Node) Neighbors(s []Peer) (Peer, []Peer) {
return n.pred, append(s, n.succ...) return n.pred, append(s, n.succ...)
} }
func (n *Node) localLocked(id ID) bool { func (n *Node) succOwnsLocked(id ID) bool {
return contains(n.self.id, n.succ[0].id, id) return contains(n.self.id, n.succ[0].id, id)
} }
// IsLocal reports whether this node owns the given key.
func (n *Node) IsLocal(id ID) bool {
n.mu.Lock()
defer n.mu.Unlock()
return n.localLocked(id)
}
// Closest finds the locally known peer which is the closest predecessor of key. // Closest finds the locally known peer which is the closest predecessor of key.
func (n *Node) Closest(id ID) Peer { // If the boolean value is true, then this node believes that the peer is the
// closest predecessor in the network.
func (n *Node) Closest(id ID) (Peer, bool) {
self := n.self.id self := n.self.id
n.mu.Lock() n.mu.Lock()
defer n.mu.Unlock() defer n.mu.Unlock()
// Check the immediate successor first to satisfy the contract.
if n.succOwnsLocked(id) {
return n.succ[0], true
}
l := n.fingers l := n.fingers
for i := len(l) - 1; i >= 0; i-- { for i := len(l) - 1; i >= 0; i-- {
f := l[i] f := l[i]
@ -74,7 +73,7 @@ func (n *Node) Closest(id ID) Peer {
if id == f.id { if id == f.id {
continue continue
} }
return f return f, false
} }
} }
// Also try successors. // Also try successors.
@ -85,32 +84,26 @@ func (n *Node) Closest(id ID) Peer {
if id == f.id { if id == f.id {
continue continue
} }
return f return f, false
} }
} }
return n.self // No known node is closer than we are.
return n.self, true
} }
// GetLocal obtains the value for a key if it is local to and owned by the node. // GetLocal obtains the value for a key.
func (n *Node) GetLocal(k ID) (v string, found bool) { func (n *Node) GetLocal(k ID) (v string, found bool) {
n.mu.Lock() n.mu.Lock()
defer n.mu.Unlock() defer n.mu.Unlock()
if n.localLocked(k) { v, found = n.data[k]
v, found = n.data[k]
}
return v, found return v, found
} }
// SetLocal sets the value for a key. // SetLocal sets the value for a key.
// Returns false if the key is not owned by the node. func (n *Node) SetLocal(k ID, v string) {
func (n *Node) SetLocal(k ID, v string) bool {
n.mu.Lock() n.mu.Lock()
defer n.mu.Unlock() defer n.mu.Unlock()
if n.localLocked(k) { n.data[k] = v
n.data[k] = v
return true
}
return false
} }
// SuccessorFailed marks the node's current successor as having failed, e.g. // SuccessorFailed marks the node's current successor as having failed, e.g.
@ -175,6 +168,11 @@ func (p Peer) Values() (ID, netip.AddrPort) {
return p.id, p.addr return p.id, p.addr
} }
// String formats the peer's address for debugging.
func (p Peer) String() string {
return p.addr.String()
}
// Create creates a new Chord network using the given address as the initial node. // Create creates a new Chord network using the given address as the initial node.
func Create(addr netip.AddrPort) (*Node, error) { func Create(addr netip.AddrPort) (*Node, error) {
if !addr.IsValid() { if !addr.IsValid() {

View File

@ -183,6 +183,8 @@ func cliJoin(ctx context.Context, cmd *cli.Command) error {
slog.ErrorContext(ctx, "stabilize", slog.Any("err", err)) slog.ErrorContext(ctx, "stabilize", slog.Any("err", err))
node.SuccessorFailed() node.SuccessorFailed()
} }
pred, succ := node.Neighbors(nil)
slog.InfoContext(ctx, "neighbors", slog.Any("predecessor", pred), slog.Any("successors", succ))
} }
}() }()
<-ctx.Done() <-ctx.Done()

19
test.bash Executable file
View File

@ -0,0 +1,19 @@
#!/bin/bash
set -ex
go build -o ./chord-node
# Test create and join.
./chord-node join -ip 127.0.0.1:3000 &
FIRST=$!
sleep 3
./chord-node join -ip 127.0.0.1:3001 -c 127.0.0.1:3000 &
SECOND=$!
./chord-node join -ip 127.0.0.1:3002 -c 127.0.0.1:3000 &
THIRD=$!
sleep 5
# Each node logs its predecessor and successors. At this point, we see the ring.
kill $FIRST $SECOND $THIRD