Compare commits
No commits in common. "2460f745bc6fb79ab01967e55782b5a6688c3ea6" and "63165768230d90a9c7ef2d582050db9af009e094" have entirely different histories.
2460f745bc
...
6316576823
1
.gitignore
vendored
1
.gitignore
vendored
@ -1 +0,0 @@
|
|||||||
/chord-node
|
|
@ -9,48 +9,38 @@ import (
|
|||||||
|
|
||||||
// Client represents the communications a Chord node performs.
|
// Client represents the communications a Chord node performs.
|
||||||
type Client interface {
|
type Client interface {
|
||||||
// FindSuccessor asks s to find the peer that most closely follows a key.
|
// Find asks s to find a value and the peer that owns it.
|
||||||
FindSuccessor(ctx context.Context, s Peer, id ID) (Peer, error)
|
// If the ID is not associated with a key, the result must be the empty
|
||||||
|
// string with a nil error.
|
||||||
|
Find(ctx context.Context, s Peer, id ID) (Peer, string, error)
|
||||||
|
// Set asks s to save a value for an ID.
|
||||||
|
Set(ctx context.Context, s Peer, id ID, v string) error
|
||||||
// Notify tells s we believe n to be its predecessor.
|
// Notify tells s we believe n to be its predecessor.
|
||||||
Notify(ctx context.Context, n *Node, s Peer) error
|
Notify(ctx context.Context, n *Node, s Peer) error
|
||||||
// Neighbors requests a peer's beliefs about its own neighbors.
|
// Neighbors requests a peer's beliefs about its own neighbors.
|
||||||
Neighbors(ctx context.Context, p Peer) (pred Peer, succ []Peer, err error)
|
Neighbors(ctx context.Context, p Peer) (pred Peer, succ []Peer, err error)
|
||||||
|
|
||||||
// Get asks s for a saved value.
|
|
||||||
Get(ctx context.Context, s Peer, id ID) (string, error)
|
|
||||||
// Set asks s to save a value.
|
|
||||||
Set(ctx context.Context, s Peer, id ID, v string) error
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO(branden): FindSuccessor should be plural; if we have multiple keys to
|
// TODO(branden): Find should be plural; if we have multiple keys to
|
||||||
// search, we shouldn't have to do the whole query for all of them, especially
|
// search, we shouldn't have to do the whole query for all of them, especially
|
||||||
// considering we can sort by increasing distance from the origin and then do
|
// considering we can sort by increasing distance from the origin and then do
|
||||||
// the query in linear time.
|
// the query in linear time.
|
||||||
|
|
||||||
// FindSuccessor gets the peer that most closely follows a key.
|
// Find gets a value and the peer that owns it.
|
||||||
func Find(ctx context.Context, cl Client, n *Node, id ID) (Peer, error) {
|
func Find(ctx context.Context, cl Client, n *Node, id ID) (Peer, string, error) {
|
||||||
p, ok := n.Closest(id)
|
if n.IsLocal(id) {
|
||||||
if ok {
|
return n.self, "", nil
|
||||||
return p, nil
|
|
||||||
}
|
}
|
||||||
p, err := cl.FindSuccessor(ctx, p, id)
|
p, s, err := cl.Find(ctx, n.Closest(id), id)
|
||||||
return p, err
|
return p, s, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO(branden): Get and Set should be plural for the same reasons.
|
// TODO(branden): Set should be plural for the same reasons. It should also
|
||||||
|
// return an error if the key isn't local to the peer.
|
||||||
// Get gets a value in the Chord network.
|
|
||||||
func Get(ctx context.Context, cl Client, n *Node, key ID) (string, error) {
|
|
||||||
p, err := Find(ctx, cl, n, key)
|
|
||||||
if err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
return cl.Get(ctx, p, key)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Set saves a value in the Chord network.
|
// Set saves a value in the Chord network.
|
||||||
func Set(ctx context.Context, cl Client, n *Node, key ID, val string) error {
|
func Set(ctx context.Context, cl Client, n *Node, key ID, val string) error {
|
||||||
p, err := Find(ctx, cl, n, key)
|
p, _, err := Find(ctx, cl, n, key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("couldn't find peer to save key: %w", err)
|
return fmt.Errorf("couldn't find peer to save key: %w", err)
|
||||||
}
|
}
|
||||||
@ -67,7 +57,7 @@ func Join(ctx context.Context, cl Client, addr netip.AddrPort, np Peer) (*Node,
|
|||||||
return nil, errors.New("chord: invalid peer")
|
return nil, errors.New("chord: invalid peer")
|
||||||
}
|
}
|
||||||
self := Address(addr)
|
self := Address(addr)
|
||||||
p, err := cl.FindSuccessor(ctx, np, self.id)
|
p, _, err := cl.Find(ctx, np, self.id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("couldn't query own successor: %w", err)
|
return nil, fmt.Errorf("couldn't query own successor: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -7,7 +7,6 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"log/slog"
|
"log/slog"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/netip"
|
|
||||||
"net/url"
|
"net/url"
|
||||||
"path"
|
"path"
|
||||||
"strings"
|
"strings"
|
||||||
@ -22,36 +21,12 @@ type Client struct {
|
|||||||
APIBase string
|
APIBase string
|
||||||
}
|
}
|
||||||
|
|
||||||
// FindSuccessor asks s to find a value and the peer that owns it.
|
// Find asks s to find a value and the peer that owns it.
|
||||||
func (cl *Client) FindSuccessor(ctx context.Context, s chord.Peer, id chord.ID) (chord.Peer, error) {
|
func (cl *Client) Find(ctx context.Context, s chord.Peer, id chord.ID) (chord.Peer, string, error) {
|
||||||
_, addr := s.Values()
|
_, addr := s.Values()
|
||||||
if !addr.IsValid() {
|
if !addr.IsValid() {
|
||||||
return chord.Peer{}, errors.New("FindSuccessor with invalid peer")
|
return chord.Peer{}, "", errors.New("Find with invalid peer")
|
||||||
}
|
}
|
||||||
url := url.URL{
|
|
||||||
Scheme: "http",
|
|
||||||
Host: addr.String(),
|
|
||||||
Path: path.Join("/", cl.APIBase, "find", id.String()),
|
|
||||||
}
|
|
||||||
req, err := http.NewRequestWithContext(ctx, "GET", url.String(), nil)
|
|
||||||
if err != nil {
|
|
||||||
return chord.Peer{}, err
|
|
||||||
}
|
|
||||||
slog.InfoContext(ctx, "find", slog.String("url", url.String()))
|
|
||||||
resp, err := cl.HTTP.Do(req)
|
|
||||||
if err != nil {
|
|
||||||
return chord.Peer{}, err
|
|
||||||
}
|
|
||||||
p, err := readResponse[netip.AddrPort](resp)
|
|
||||||
if err != nil {
|
|
||||||
return chord.Peer{}, fmt.Errorf("%w (%s)", err, resp.Status)
|
|
||||||
}
|
|
||||||
slog.InfoContext(ctx, "found", slog.String("peer", p.String()))
|
|
||||||
return chord.Address(p), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (cl *Client) Get(ctx context.Context, s chord.Peer, id chord.ID) (string, error) {
|
|
||||||
_, addr := s.Values()
|
|
||||||
url := url.URL{
|
url := url.URL{
|
||||||
Scheme: "http",
|
Scheme: "http",
|
||||||
Host: addr.String(),
|
Host: addr.String(),
|
||||||
@ -59,21 +34,19 @@ func (cl *Client) Get(ctx context.Context, s chord.Peer, id chord.ID) (string, e
|
|||||||
}
|
}
|
||||||
req, err := http.NewRequestWithContext(ctx, "GET", url.String(), nil)
|
req, err := http.NewRequestWithContext(ctx, "GET", url.String(), nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", nil
|
return chord.Peer{}, "", err
|
||||||
}
|
}
|
||||||
|
slog.InfoContext(ctx, "find", slog.String("url", url.String()))
|
||||||
resp, err := cl.HTTP.Do(req)
|
resp, err := cl.HTTP.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return chord.Peer{}, "", err
|
||||||
}
|
}
|
||||||
switch resp.StatusCode {
|
p, err := readResponse[peervalue](resp)
|
||||||
case http.StatusOK:
|
if err != nil {
|
||||||
v, err := readResponse[string](resp)
|
return chord.Peer{}, "", fmt.Errorf("%w (%s)", err, resp.Status)
|
||||||
return v, err
|
|
||||||
case http.StatusNotFound:
|
|
||||||
return "", nil
|
|
||||||
default:
|
|
||||||
return "", errors.New(resp.Status)
|
|
||||||
}
|
}
|
||||||
|
slog.InfoContext(ctx, "found", slog.String("peer", p.Peer.String()), slog.String("value", p.Value))
|
||||||
|
return chord.Address(p.Peer), p.Value, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cl *Client) Set(ctx context.Context, s chord.Peer, id chord.ID, v string) error {
|
func (cl *Client) Set(ctx context.Context, s chord.Peer, id chord.ID, v string) error {
|
||||||
|
@ -48,3 +48,8 @@ type neighbors struct {
|
|||||||
Succ []netip.AddrPort `json:"succ"`
|
Succ []netip.AddrPort `json:"succ"`
|
||||||
Pred netip.AddrPort `json:"pred,omitzero"`
|
Pred netip.AddrPort `json:"pred,omitzero"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type peervalue struct {
|
||||||
|
Peer netip.AddrPort `json:"peer"`
|
||||||
|
Value string `json:"value,omitzero"`
|
||||||
|
}
|
||||||
|
@ -45,8 +45,7 @@ func New(l net.Listener, cl chord.Client, self *chord.Node) (*Node, error) {
|
|||||||
// Router creates a handler for the Chord HTTP endpoints.
|
// Router creates a handler for the Chord HTTP endpoints.
|
||||||
func (n *Node) Router() http.Handler {
|
func (n *Node) Router() http.Handler {
|
||||||
m := http.NewServeMux()
|
m := http.NewServeMux()
|
||||||
m.HandleFunc("GET /find/{id}", n.find)
|
m.HandleFunc("GET /key/{id}", n.key)
|
||||||
m.HandleFunc("GET /key/{id}", n.get)
|
|
||||||
m.HandleFunc("POST /key/{id}", n.set)
|
m.HandleFunc("POST /key/{id}", n.set)
|
||||||
m.HandleFunc("POST /pred", n.notify)
|
m.HandleFunc("POST /pred", n.notify)
|
||||||
m.HandleFunc("GET /neighbors", n.neighbors)
|
m.HandleFunc("GET /neighbors", n.neighbors)
|
||||||
@ -67,7 +66,7 @@ func (n *Node) Check(ctx context.Context) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *Node) find(w http.ResponseWriter, r *http.Request) {
|
func (n *Node) key(w http.ResponseWriter, r *http.Request) {
|
||||||
s := r.PathValue("id")
|
s := r.PathValue("id")
|
||||||
slog.InfoContext(r.Context(), "received find", slog.String("id", s), slog.String("from", r.RemoteAddr))
|
slog.InfoContext(r.Context(), "received find", slog.String("id", s), slog.String("from", r.RemoteAddr))
|
||||||
id, err := chord.ParseID(s)
|
id, err := chord.ParseID(s)
|
||||||
@ -75,29 +74,15 @@ func (n *Node) find(w http.ResponseWriter, r *http.Request) {
|
|||||||
writeError(w, http.StatusBadRequest, err.Error())
|
writeError(w, http.StatusBadRequest, err.Error())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
p, err := chord.Find(r.Context(), n.client, n.self, id)
|
p, v, err := chord.Find(r.Context(), n.client, n.self, id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
writeError(w, http.StatusInternalServerError, err.Error())
|
writeError(w, http.StatusInternalServerError, err.Error())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
_, addr := p.Values()
|
_, addr := p.Values()
|
||||||
slog.InfoContext(r.Context(), "tell found", slog.String("id", s), slog.String("addr", addr.String()))
|
slog.InfoContext(r.Context(), "tell found", slog.String("id", s), slog.String("addr", addr.String()), slog.String("value", v))
|
||||||
writeOk(w, addr)
|
pv := peervalue{addr, v}
|
||||||
}
|
writeOk(w, pv)
|
||||||
|
|
||||||
func (n *Node) get(w http.ResponseWriter, r *http.Request) {
|
|
||||||
s := r.PathValue("id")
|
|
||||||
id, err := chord.ParseID(s)
|
|
||||||
if err != nil {
|
|
||||||
writeError(w, http.StatusBadRequest, err.Error())
|
|
||||||
return
|
|
||||||
}
|
|
||||||
v, ok := n.self.GetLocal(id)
|
|
||||||
if !ok {
|
|
||||||
writeError(w, http.StatusNotFound, s+" not found")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
writeOk(w, v)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *Node) set(w http.ResponseWriter, r *http.Request) {
|
func (n *Node) set(w http.ResponseWriter, r *http.Request) {
|
||||||
@ -112,7 +97,10 @@ func (n *Node) set(w http.ResponseWriter, r *http.Request) {
|
|||||||
writeError(w, http.StatusInternalServerError, err.Error())
|
writeError(w, http.StatusInternalServerError, err.Error())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
n.self.SetLocal(id, string(val))
|
if !n.self.SetLocal(id, string(val)) {
|
||||||
|
writeError(w, http.StatusNotFound, "id does not belong to this peer")
|
||||||
|
return
|
||||||
|
}
|
||||||
w.WriteHeader(http.StatusNoContent)
|
w.WriteHeader(http.StatusNoContent)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -47,21 +47,22 @@ func (n *Node) Neighbors(s []Peer) (Peer, []Peer) {
|
|||||||
return n.pred, append(s, n.succ...)
|
return n.pred, append(s, n.succ...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *Node) succOwnsLocked(id ID) bool {
|
func (n *Node) localLocked(id ID) bool {
|
||||||
return contains(n.self.id, n.succ[0].id, id)
|
return contains(n.self.id, n.succ[0].id, id)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// IsLocal reports whether this node owns the given key.
|
||||||
|
func (n *Node) IsLocal(id ID) bool {
|
||||||
|
n.mu.Lock()
|
||||||
|
defer n.mu.Unlock()
|
||||||
|
return n.localLocked(id)
|
||||||
|
}
|
||||||
|
|
||||||
// Closest finds the locally known peer which is the closest predecessor of key.
|
// Closest finds the locally known peer which is the closest predecessor of key.
|
||||||
// If the boolean value is true, then this node believes that the peer is the
|
func (n *Node) Closest(id ID) Peer {
|
||||||
// closest predecessor in the network.
|
|
||||||
func (n *Node) Closest(id ID) (Peer, bool) {
|
|
||||||
self := n.self.id
|
self := n.self.id
|
||||||
n.mu.Lock()
|
n.mu.Lock()
|
||||||
defer n.mu.Unlock()
|
defer n.mu.Unlock()
|
||||||
// Check the immediate successor first to satisfy the contract.
|
|
||||||
if n.succOwnsLocked(id) {
|
|
||||||
return n.succ[0], true
|
|
||||||
}
|
|
||||||
l := n.fingers
|
l := n.fingers
|
||||||
for i := len(l) - 1; i >= 0; i-- {
|
for i := len(l) - 1; i >= 0; i-- {
|
||||||
f := l[i]
|
f := l[i]
|
||||||
@ -73,7 +74,7 @@ func (n *Node) Closest(id ID) (Peer, bool) {
|
|||||||
if id == f.id {
|
if id == f.id {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
return f, false
|
return f
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Also try successors.
|
// Also try successors.
|
||||||
@ -84,26 +85,32 @@ func (n *Node) Closest(id ID) (Peer, bool) {
|
|||||||
if id == f.id {
|
if id == f.id {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
return f, false
|
return f
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// No known node is closer than we are.
|
return n.self
|
||||||
return n.self, true
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetLocal obtains the value for a key.
|
// GetLocal obtains the value for a key if it is local to and owned by the node.
|
||||||
func (n *Node) GetLocal(k ID) (v string, found bool) {
|
func (n *Node) GetLocal(k ID) (v string, found bool) {
|
||||||
n.mu.Lock()
|
n.mu.Lock()
|
||||||
defer n.mu.Unlock()
|
defer n.mu.Unlock()
|
||||||
|
if n.localLocked(k) {
|
||||||
v, found = n.data[k]
|
v, found = n.data[k]
|
||||||
|
}
|
||||||
return v, found
|
return v, found
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetLocal sets the value for a key.
|
// SetLocal sets the value for a key.
|
||||||
func (n *Node) SetLocal(k ID, v string) {
|
// Returns false if the key is not owned by the node.
|
||||||
|
func (n *Node) SetLocal(k ID, v string) bool {
|
||||||
n.mu.Lock()
|
n.mu.Lock()
|
||||||
defer n.mu.Unlock()
|
defer n.mu.Unlock()
|
||||||
|
if n.localLocked(k) {
|
||||||
n.data[k] = v
|
n.data[k] = v
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
// SuccessorFailed marks the node's current successor as having failed, e.g.
|
// SuccessorFailed marks the node's current successor as having failed, e.g.
|
||||||
@ -168,11 +175,6 @@ func (p Peer) Values() (ID, netip.AddrPort) {
|
|||||||
return p.id, p.addr
|
return p.id, p.addr
|
||||||
}
|
}
|
||||||
|
|
||||||
// String formats the peer's address for debugging.
|
|
||||||
func (p Peer) String() string {
|
|
||||||
return p.addr.String()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create creates a new Chord network using the given address as the initial node.
|
// Create creates a new Chord network using the given address as the initial node.
|
||||||
func Create(addr netip.AddrPort) (*Node, error) {
|
func Create(addr netip.AddrPort) (*Node, error) {
|
||||||
if !addr.IsValid() {
|
if !addr.IsValid() {
|
||||||
|
2
main.go
2
main.go
@ -183,8 +183,6 @@ func cliJoin(ctx context.Context, cmd *cli.Command) error {
|
|||||||
slog.ErrorContext(ctx, "stabilize", slog.Any("err", err))
|
slog.ErrorContext(ctx, "stabilize", slog.Any("err", err))
|
||||||
node.SuccessorFailed()
|
node.SuccessorFailed()
|
||||||
}
|
}
|
||||||
pred, succ := node.Neighbors(nil)
|
|
||||||
slog.InfoContext(ctx, "neighbors", slog.Any("predecessor", pred), slog.Any("successors", succ))
|
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
<-ctx.Done()
|
<-ctx.Done()
|
||||||
|
19
test.bash
19
test.bash
@ -1,19 +0,0 @@
|
|||||||
#!/bin/bash
|
|
||||||
|
|
||||||
set -ex
|
|
||||||
|
|
||||||
go build -o ./chord-node
|
|
||||||
|
|
||||||
# Test create and join.
|
|
||||||
./chord-node join -ip 127.0.0.1:3000 &
|
|
||||||
FIRST=$!
|
|
||||||
sleep 3
|
|
||||||
|
|
||||||
./chord-node join -ip 127.0.0.1:3001 -c 127.0.0.1:3000 &
|
|
||||||
SECOND=$!
|
|
||||||
./chord-node join -ip 127.0.0.1:3002 -c 127.0.0.1:3000 &
|
|
||||||
THIRD=$!
|
|
||||||
|
|
||||||
sleep 5
|
|
||||||
# Each node logs its predecessor and successors. At this point, we see the ring.
|
|
||||||
kill $FIRST $SECOND $THIRD
|
|
Loading…
Reference in New Issue
Block a user