Compare commits
	
		
			3 Commits
		
	
	
		
			6316576823
			...
			2460f745bc
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 2460f745bc | |||
| b034852b34 | |||
| afd8755131 | 
							
								
								
									
										1
									
								
								.gitignore
									
									
									
									
										vendored
									
									
										Normal file
									
								
							
							
						
						
									
										1
									
								
								.gitignore
									
									
									
									
										vendored
									
									
										Normal file
									
								
							| @@ -0,0 +1 @@ | |||||||
|  | /chord-node | ||||||
| @@ -9,38 +9,48 @@ import ( | |||||||
|  |  | ||||||
| // Client represents the communications a Chord node performs. | // Client represents the communications a Chord node performs. | ||||||
| type Client interface { | type Client interface { | ||||||
| 	// Find asks s to find a value and the peer that owns it. | 	// FindSuccessor asks s to find the peer that most closely follows a key. | ||||||
| 	// If the ID is not associated with a key, the result must be the empty | 	FindSuccessor(ctx context.Context, s Peer, id ID) (Peer, error) | ||||||
| 	// string with a nil error. |  | ||||||
| 	Find(ctx context.Context, s Peer, id ID) (Peer, string, error) |  | ||||||
| 	// Set asks s to save a value for an ID. |  | ||||||
| 	Set(ctx context.Context, s Peer, id ID, v string) error |  | ||||||
| 	// Notify tells s we believe n to be its predecessor. | 	// Notify tells s we believe n to be its predecessor. | ||||||
| 	Notify(ctx context.Context, n *Node, s Peer) error | 	Notify(ctx context.Context, n *Node, s Peer) error | ||||||
| 	// Neighbors requests a peer's beliefs about its own neighbors. | 	// Neighbors requests a peer's beliefs about its own neighbors. | ||||||
| 	Neighbors(ctx context.Context, p Peer) (pred Peer, succ []Peer, err error) | 	Neighbors(ctx context.Context, p Peer) (pred Peer, succ []Peer, err error) | ||||||
|  |  | ||||||
|  | 	// Get asks s for a saved value. | ||||||
|  | 	Get(ctx context.Context, s Peer, id ID) (string, error) | ||||||
|  | 	// Set asks s to save a value. | ||||||
|  | 	Set(ctx context.Context, s Peer, id ID, v string) error | ||||||
| } | } | ||||||
|  |  | ||||||
| // TODO(branden): Find should be plural; if we have multiple keys to | // TODO(branden): FindSuccessor should be plural; if we have multiple keys to | ||||||
| // search, we shouldn't have to do the whole query for all of them, especially | // search, we shouldn't have to do the whole query for all of them, especially | ||||||
| // considering we can sort by increasing distance from the origin and then do | // considering we can sort by increasing distance from the origin and then do | ||||||
| // the query in linear time. | // the query in linear time. | ||||||
|  |  | ||||||
| // Find gets a value and the peer that owns it. | // FindSuccessor gets the peer that most closely follows a key. | ||||||
| func Find(ctx context.Context, cl Client, n *Node, id ID) (Peer, string, error) { | func Find(ctx context.Context, cl Client, n *Node, id ID) (Peer, error) { | ||||||
| 	if n.IsLocal(id) { | 	p, ok := n.Closest(id) | ||||||
| 		return n.self, "", nil | 	if ok { | ||||||
|  | 		return p, nil | ||||||
| 	} | 	} | ||||||
| 	p, s, err := cl.Find(ctx, n.Closest(id), id) | 	p, err := cl.FindSuccessor(ctx, p, id) | ||||||
| 	return p, s, err | 	return p, err | ||||||
| } | } | ||||||
|  |  | ||||||
| // TODO(branden): Set should be plural for the same reasons. It should also | // TODO(branden): Get and Set should be plural for the same reasons. | ||||||
| // return an error if the key isn't local to the peer. |  | ||||||
|  | // Get gets a value in the Chord network. | ||||||
|  | func Get(ctx context.Context, cl Client, n *Node, key ID) (string, error) { | ||||||
|  | 	p, err := Find(ctx, cl, n, key) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return "", err | ||||||
|  | 	} | ||||||
|  | 	return cl.Get(ctx, p, key) | ||||||
|  | } | ||||||
|  |  | ||||||
| // Set saves a value in the Chord network. | // Set saves a value in the Chord network. | ||||||
| func Set(ctx context.Context, cl Client, n *Node, key ID, val string) error { | func Set(ctx context.Context, cl Client, n *Node, key ID, val string) error { | ||||||
| 	p, _, err := Find(ctx, cl, n, key) | 	p, err := Find(ctx, cl, n, key) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return fmt.Errorf("couldn't find peer to save key: %w", err) | 		return fmt.Errorf("couldn't find peer to save key: %w", err) | ||||||
| 	} | 	} | ||||||
| @@ -57,7 +67,7 @@ func Join(ctx context.Context, cl Client, addr netip.AddrPort, np Peer) (*Node, | |||||||
| 		return nil, errors.New("chord: invalid peer") | 		return nil, errors.New("chord: invalid peer") | ||||||
| 	} | 	} | ||||||
| 	self := Address(addr) | 	self := Address(addr) | ||||||
| 	p, _, err := cl.Find(ctx, np, self.id) | 	p, err := cl.FindSuccessor(ctx, np, self.id) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return nil, fmt.Errorf("couldn't query own successor: %w", err) | 		return nil, fmt.Errorf("couldn't query own successor: %w", err) | ||||||
| 	} | 	} | ||||||
|   | |||||||
| @@ -7,6 +7,7 @@ import ( | |||||||
| 	"fmt" | 	"fmt" | ||||||
| 	"log/slog" | 	"log/slog" | ||||||
| 	"net/http" | 	"net/http" | ||||||
|  | 	"net/netip" | ||||||
| 	"net/url" | 	"net/url" | ||||||
| 	"path" | 	"path" | ||||||
| 	"strings" | 	"strings" | ||||||
| @@ -21,12 +22,36 @@ type Client struct { | |||||||
| 	APIBase string | 	APIBase string | ||||||
| } | } | ||||||
|  |  | ||||||
| // Find asks s to find a value and the peer that owns it. | // FindSuccessor asks s to find a value and the peer that owns it. | ||||||
| func (cl *Client) Find(ctx context.Context, s chord.Peer, id chord.ID) (chord.Peer, string, error) { | func (cl *Client) FindSuccessor(ctx context.Context, s chord.Peer, id chord.ID) (chord.Peer, error) { | ||||||
| 	_, addr := s.Values() | 	_, addr := s.Values() | ||||||
| 	if !addr.IsValid() { | 	if !addr.IsValid() { | ||||||
| 		return chord.Peer{}, "", errors.New("Find with invalid peer") | 		return chord.Peer{}, errors.New("FindSuccessor with invalid peer") | ||||||
| 	} | 	} | ||||||
|  | 	url := url.URL{ | ||||||
|  | 		Scheme: "http", | ||||||
|  | 		Host:   addr.String(), | ||||||
|  | 		Path:   path.Join("/", cl.APIBase, "find", id.String()), | ||||||
|  | 	} | ||||||
|  | 	req, err := http.NewRequestWithContext(ctx, "GET", url.String(), nil) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return chord.Peer{}, err | ||||||
|  | 	} | ||||||
|  | 	slog.InfoContext(ctx, "find", slog.String("url", url.String())) | ||||||
|  | 	resp, err := cl.HTTP.Do(req) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return chord.Peer{}, err | ||||||
|  | 	} | ||||||
|  | 	p, err := readResponse[netip.AddrPort](resp) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return chord.Peer{}, fmt.Errorf("%w (%s)", err, resp.Status) | ||||||
|  | 	} | ||||||
|  | 	slog.InfoContext(ctx, "found", slog.String("peer", p.String())) | ||||||
|  | 	return chord.Address(p), nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (cl *Client) Get(ctx context.Context, s chord.Peer, id chord.ID) (string, error) { | ||||||
|  | 	_, addr := s.Values() | ||||||
| 	url := url.URL{ | 	url := url.URL{ | ||||||
| 		Scheme: "http", | 		Scheme: "http", | ||||||
| 		Host:   addr.String(), | 		Host:   addr.String(), | ||||||
| @@ -34,19 +59,21 @@ func (cl *Client) Find(ctx context.Context, s chord.Peer, id chord.ID) (chord.Pe | |||||||
| 	} | 	} | ||||||
| 	req, err := http.NewRequestWithContext(ctx, "GET", url.String(), nil) | 	req, err := http.NewRequestWithContext(ctx, "GET", url.String(), nil) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return chord.Peer{}, "", err | 		return "", nil | ||||||
| 	} | 	} | ||||||
| 	slog.InfoContext(ctx, "find", slog.String("url", url.String())) |  | ||||||
| 	resp, err := cl.HTTP.Do(req) | 	resp, err := cl.HTTP.Do(req) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return chord.Peer{}, "", err | 		return "", err | ||||||
| 	} | 	} | ||||||
| 	p, err := readResponse[peervalue](resp) | 	switch resp.StatusCode { | ||||||
| 	if err != nil { | 	case http.StatusOK: | ||||||
| 		return chord.Peer{}, "", fmt.Errorf("%w (%s)", err, resp.Status) | 		v, err := readResponse[string](resp) | ||||||
|  | 		return v, err | ||||||
|  | 	case http.StatusNotFound: | ||||||
|  | 		return "", nil | ||||||
|  | 	default: | ||||||
|  | 		return "", errors.New(resp.Status) | ||||||
| 	} | 	} | ||||||
| 	slog.InfoContext(ctx, "found", slog.String("peer", p.Peer.String()), slog.String("value", p.Value)) |  | ||||||
| 	return chord.Address(p.Peer), p.Value, nil |  | ||||||
| } | } | ||||||
|  |  | ||||||
| func (cl *Client) Set(ctx context.Context, s chord.Peer, id chord.ID, v string) error { | func (cl *Client) Set(ctx context.Context, s chord.Peer, id chord.ID, v string) error { | ||||||
|   | |||||||
| @@ -48,8 +48,3 @@ type neighbors struct { | |||||||
| 	Succ []netip.AddrPort `json:"succ"` | 	Succ []netip.AddrPort `json:"succ"` | ||||||
| 	Pred netip.AddrPort   `json:"pred,omitzero"` | 	Pred netip.AddrPort   `json:"pred,omitzero"` | ||||||
| } | } | ||||||
|  |  | ||||||
| type peervalue struct { |  | ||||||
| 	Peer  netip.AddrPort `json:"peer"` |  | ||||||
| 	Value string         `json:"value,omitzero"` |  | ||||||
| } |  | ||||||
|   | |||||||
| @@ -45,7 +45,8 @@ func New(l net.Listener, cl chord.Client, self *chord.Node) (*Node, error) { | |||||||
| // Router creates a handler for the Chord HTTP endpoints. | // Router creates a handler for the Chord HTTP endpoints. | ||||||
| func (n *Node) Router() http.Handler { | func (n *Node) Router() http.Handler { | ||||||
| 	m := http.NewServeMux() | 	m := http.NewServeMux() | ||||||
| 	m.HandleFunc("GET /key/{id}", n.key) | 	m.HandleFunc("GET /find/{id}", n.find) | ||||||
|  | 	m.HandleFunc("GET /key/{id}", n.get) | ||||||
| 	m.HandleFunc("POST /key/{id}", n.set) | 	m.HandleFunc("POST /key/{id}", n.set) | ||||||
| 	m.HandleFunc("POST /pred", n.notify) | 	m.HandleFunc("POST /pred", n.notify) | ||||||
| 	m.HandleFunc("GET /neighbors", n.neighbors) | 	m.HandleFunc("GET /neighbors", n.neighbors) | ||||||
| @@ -66,7 +67,7 @@ func (n *Node) Check(ctx context.Context) error { | |||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
| func (n *Node) key(w http.ResponseWriter, r *http.Request) { | func (n *Node) find(w http.ResponseWriter, r *http.Request) { | ||||||
| 	s := r.PathValue("id") | 	s := r.PathValue("id") | ||||||
| 	slog.InfoContext(r.Context(), "received find", slog.String("id", s), slog.String("from", r.RemoteAddr)) | 	slog.InfoContext(r.Context(), "received find", slog.String("id", s), slog.String("from", r.RemoteAddr)) | ||||||
| 	id, err := chord.ParseID(s) | 	id, err := chord.ParseID(s) | ||||||
| @@ -74,15 +75,29 @@ func (n *Node) key(w http.ResponseWriter, r *http.Request) { | |||||||
| 		writeError(w, http.StatusBadRequest, err.Error()) | 		writeError(w, http.StatusBadRequest, err.Error()) | ||||||
| 		return | 		return | ||||||
| 	} | 	} | ||||||
| 	p, v, err := chord.Find(r.Context(), n.client, n.self, id) | 	p, err := chord.Find(r.Context(), n.client, n.self, id) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		writeError(w, http.StatusInternalServerError, err.Error()) | 		writeError(w, http.StatusInternalServerError, err.Error()) | ||||||
| 		return | 		return | ||||||
| 	} | 	} | ||||||
| 	_, addr := p.Values() | 	_, addr := p.Values() | ||||||
| 	slog.InfoContext(r.Context(), "tell found", slog.String("id", s), slog.String("addr", addr.String()), slog.String("value", v)) | 	slog.InfoContext(r.Context(), "tell found", slog.String("id", s), slog.String("addr", addr.String())) | ||||||
| 	pv := peervalue{addr, v} | 	writeOk(w, addr) | ||||||
| 	writeOk(w, pv) | } | ||||||
|  |  | ||||||
|  | func (n *Node) get(w http.ResponseWriter, r *http.Request) { | ||||||
|  | 	s := r.PathValue("id") | ||||||
|  | 	id, err := chord.ParseID(s) | ||||||
|  | 	if err != nil { | ||||||
|  | 		writeError(w, http.StatusBadRequest, err.Error()) | ||||||
|  | 		return | ||||||
|  | 	} | ||||||
|  | 	v, ok := n.self.GetLocal(id) | ||||||
|  | 	if !ok { | ||||||
|  | 		writeError(w, http.StatusNotFound, s+" not found") | ||||||
|  | 		return | ||||||
|  | 	} | ||||||
|  | 	writeOk(w, v) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (n *Node) set(w http.ResponseWriter, r *http.Request) { | func (n *Node) set(w http.ResponseWriter, r *http.Request) { | ||||||
| @@ -97,10 +112,7 @@ func (n *Node) set(w http.ResponseWriter, r *http.Request) { | |||||||
| 		writeError(w, http.StatusInternalServerError, err.Error()) | 		writeError(w, http.StatusInternalServerError, err.Error()) | ||||||
| 		return | 		return | ||||||
| 	} | 	} | ||||||
| 	if !n.self.SetLocal(id, string(val)) { | 	n.self.SetLocal(id, string(val)) | ||||||
| 		writeError(w, http.StatusNotFound, "id does not belong to this peer") |  | ||||||
| 		return |  | ||||||
| 	} |  | ||||||
| 	w.WriteHeader(http.StatusNoContent) | 	w.WriteHeader(http.StatusNoContent) | ||||||
| } | } | ||||||
|  |  | ||||||
|   | |||||||
| @@ -47,22 +47,21 @@ func (n *Node) Neighbors(s []Peer) (Peer, []Peer) { | |||||||
| 	return n.pred, append(s, n.succ...) | 	return n.pred, append(s, n.succ...) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (n *Node) localLocked(id ID) bool { | func (n *Node) succOwnsLocked(id ID) bool { | ||||||
| 	return contains(n.self.id, n.succ[0].id, id) | 	return contains(n.self.id, n.succ[0].id, id) | ||||||
| } | } | ||||||
|  |  | ||||||
| // IsLocal reports whether this node owns the given key. |  | ||||||
| func (n *Node) IsLocal(id ID) bool { |  | ||||||
| 	n.mu.Lock() |  | ||||||
| 	defer n.mu.Unlock() |  | ||||||
| 	return n.localLocked(id) |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // Closest finds the locally known peer which is the closest predecessor of key. | // Closest finds the locally known peer which is the closest predecessor of key. | ||||||
| func (n *Node) Closest(id ID) Peer { | // If the boolean value is true, then this node believes that the peer is the | ||||||
|  | // closest predecessor in the network. | ||||||
|  | func (n *Node) Closest(id ID) (Peer, bool) { | ||||||
| 	self := n.self.id | 	self := n.self.id | ||||||
| 	n.mu.Lock() | 	n.mu.Lock() | ||||||
| 	defer n.mu.Unlock() | 	defer n.mu.Unlock() | ||||||
|  | 	// Check the immediate successor first to satisfy the contract. | ||||||
|  | 	if n.succOwnsLocked(id) { | ||||||
|  | 		return n.succ[0], true | ||||||
|  | 	} | ||||||
| 	l := n.fingers | 	l := n.fingers | ||||||
| 	for i := len(l) - 1; i >= 0; i-- { | 	for i := len(l) - 1; i >= 0; i-- { | ||||||
| 		f := l[i] | 		f := l[i] | ||||||
| @@ -74,7 +73,7 @@ func (n *Node) Closest(id ID) Peer { | |||||||
| 			if id == f.id { | 			if id == f.id { | ||||||
| 				continue | 				continue | ||||||
| 			} | 			} | ||||||
| 			return f | 			return f, false | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 	// Also try successors. | 	// Also try successors. | ||||||
| @@ -85,32 +84,26 @@ func (n *Node) Closest(id ID) Peer { | |||||||
| 			if id == f.id { | 			if id == f.id { | ||||||
| 				continue | 				continue | ||||||
| 			} | 			} | ||||||
| 			return f | 			return f, false | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 	return n.self | 	// No known node is closer than we are. | ||||||
|  | 	return n.self, true | ||||||
| } | } | ||||||
|  |  | ||||||
| // GetLocal obtains the value for a key if it is local to and owned by the node. | // GetLocal obtains the value for a key. | ||||||
| func (n *Node) GetLocal(k ID) (v string, found bool) { | func (n *Node) GetLocal(k ID) (v string, found bool) { | ||||||
| 	n.mu.Lock() | 	n.mu.Lock() | ||||||
| 	defer n.mu.Unlock() | 	defer n.mu.Unlock() | ||||||
| 	if n.localLocked(k) { | 	v, found = n.data[k] | ||||||
| 		v, found = n.data[k] |  | ||||||
| 	} |  | ||||||
| 	return v, found | 	return v, found | ||||||
| } | } | ||||||
|  |  | ||||||
| // SetLocal sets the value for a key. | // SetLocal sets the value for a key. | ||||||
| // Returns false if the key is not owned by the node. | func (n *Node) SetLocal(k ID, v string) { | ||||||
| func (n *Node) SetLocal(k ID, v string) bool { |  | ||||||
| 	n.mu.Lock() | 	n.mu.Lock() | ||||||
| 	defer n.mu.Unlock() | 	defer n.mu.Unlock() | ||||||
| 	if n.localLocked(k) { | 	n.data[k] = v | ||||||
| 		n.data[k] = v |  | ||||||
| 		return true |  | ||||||
| 	} |  | ||||||
| 	return false |  | ||||||
| } | } | ||||||
|  |  | ||||||
| // SuccessorFailed marks the node's current successor as having failed, e.g. | // SuccessorFailed marks the node's current successor as having failed, e.g. | ||||||
| @@ -175,6 +168,11 @@ func (p Peer) Values() (ID, netip.AddrPort) { | |||||||
| 	return p.id, p.addr | 	return p.id, p.addr | ||||||
| } | } | ||||||
|  |  | ||||||
|  | // String formats the peer's address for debugging. | ||||||
|  | func (p Peer) String() string { | ||||||
|  | 	return p.addr.String() | ||||||
|  | } | ||||||
|  |  | ||||||
| // Create creates a new Chord network using the given address as the initial node. | // Create creates a new Chord network using the given address as the initial node. | ||||||
| func Create(addr netip.AddrPort) (*Node, error) { | func Create(addr netip.AddrPort) (*Node, error) { | ||||||
| 	if !addr.IsValid() { | 	if !addr.IsValid() { | ||||||
|   | |||||||
							
								
								
									
										2
									
								
								main.go
									
									
									
									
									
								
							
							
						
						
									
										2
									
								
								main.go
									
									
									
									
									
								
							| @@ -183,6 +183,8 @@ func cliJoin(ctx context.Context, cmd *cli.Command) error { | |||||||
| 				slog.ErrorContext(ctx, "stabilize", slog.Any("err", err)) | 				slog.ErrorContext(ctx, "stabilize", slog.Any("err", err)) | ||||||
| 				node.SuccessorFailed() | 				node.SuccessorFailed() | ||||||
| 			} | 			} | ||||||
|  | 			pred, succ := node.Neighbors(nil) | ||||||
|  | 			slog.InfoContext(ctx, "neighbors", slog.Any("predecessor", pred), slog.Any("successors", succ)) | ||||||
| 		} | 		} | ||||||
| 	}() | 	}() | ||||||
| 	<-ctx.Done() | 	<-ctx.Done() | ||||||
|   | |||||||
							
								
								
									
										19
									
								
								test.bash
									
									
									
									
									
										Executable file
									
								
							
							
						
						
									
										19
									
								
								test.bash
									
									
									
									
									
										Executable file
									
								
							| @@ -0,0 +1,19 @@ | |||||||
|  | #!/bin/bash | ||||||
|  |  | ||||||
|  | set -ex | ||||||
|  |  | ||||||
|  | go build -o ./chord-node | ||||||
|  |  | ||||||
|  | # Test create and join. | ||||||
|  | ./chord-node join -ip 127.0.0.1:3000 & | ||||||
|  | FIRST=$! | ||||||
|  | sleep 3 | ||||||
|  |  | ||||||
|  | ./chord-node join -ip 127.0.0.1:3001 -c 127.0.0.1:3000 & | ||||||
|  | SECOND=$! | ||||||
|  | ./chord-node join -ip 127.0.0.1:3002 -c 127.0.0.1:3000 & | ||||||
|  | THIRD=$! | ||||||
|  |  | ||||||
|  | sleep 5 | ||||||
|  | # Each node logs its predecessor and successors. At this point, we see the ring. | ||||||
|  | kill $FIRST $SECOND $THIRD | ||||||
		Reference in New Issue
	
	Block a user