Compare commits
	
		
			3 Commits
		
	
	
		
			6316576823
			...
			2460f745bc
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 2460f745bc | |||
| b034852b34 | |||
| afd8755131 | 
							
								
								
									
										1
									
								
								.gitignore
									
									
									
									
										vendored
									
									
										Normal file
									
								
							
							
						
						
									
										1
									
								
								.gitignore
									
									
									
									
										vendored
									
									
										Normal file
									
								
							| @@ -0,0 +1 @@ | ||||
| /chord-node | ||||
| @@ -9,38 +9,48 @@ import ( | ||||
|  | ||||
| // Client represents the communications a Chord node performs. | ||||
| type Client interface { | ||||
| 	// Find asks s to find a value and the peer that owns it. | ||||
| 	// If the ID is not associated with a key, the result must be the empty | ||||
| 	// string with a nil error. | ||||
| 	Find(ctx context.Context, s Peer, id ID) (Peer, string, error) | ||||
| 	// Set asks s to save a value for an ID. | ||||
| 	Set(ctx context.Context, s Peer, id ID, v string) error | ||||
| 	// FindSuccessor asks s to find the peer that most closely follows a key. | ||||
| 	FindSuccessor(ctx context.Context, s Peer, id ID) (Peer, error) | ||||
| 	// Notify tells s we believe n to be its predecessor. | ||||
| 	Notify(ctx context.Context, n *Node, s Peer) error | ||||
| 	// Neighbors requests a peer's beliefs about its own neighbors. | ||||
| 	Neighbors(ctx context.Context, p Peer) (pred Peer, succ []Peer, err error) | ||||
|  | ||||
| 	// Get asks s for a saved value. | ||||
| 	Get(ctx context.Context, s Peer, id ID) (string, error) | ||||
| 	// Set asks s to save a value. | ||||
| 	Set(ctx context.Context, s Peer, id ID, v string) error | ||||
| } | ||||
|  | ||||
| // TODO(branden): Find should be plural; if we have multiple keys to | ||||
| // TODO(branden): FindSuccessor should be plural; if we have multiple keys to | ||||
| // search, we shouldn't have to do the whole query for all of them, especially | ||||
| // considering we can sort by increasing distance from the origin and then do | ||||
| // the query in linear time. | ||||
|  | ||||
| // Find gets a value and the peer that owns it. | ||||
| func Find(ctx context.Context, cl Client, n *Node, id ID) (Peer, string, error) { | ||||
| 	if n.IsLocal(id) { | ||||
| 		return n.self, "", nil | ||||
| // FindSuccessor gets the peer that most closely follows a key. | ||||
| func Find(ctx context.Context, cl Client, n *Node, id ID) (Peer, error) { | ||||
| 	p, ok := n.Closest(id) | ||||
| 	if ok { | ||||
| 		return p, nil | ||||
| 	} | ||||
| 	p, s, err := cl.Find(ctx, n.Closest(id), id) | ||||
| 	return p, s, err | ||||
| 	p, err := cl.FindSuccessor(ctx, p, id) | ||||
| 	return p, err | ||||
| } | ||||
|  | ||||
| // TODO(branden): Set should be plural for the same reasons. It should also | ||||
| // return an error if the key isn't local to the peer. | ||||
| // TODO(branden): Get and Set should be plural for the same reasons. | ||||
|  | ||||
| // Get gets a value in the Chord network. | ||||
| func Get(ctx context.Context, cl Client, n *Node, key ID) (string, error) { | ||||
| 	p, err := Find(ctx, cl, n, key) | ||||
| 	if err != nil { | ||||
| 		return "", err | ||||
| 	} | ||||
| 	return cl.Get(ctx, p, key) | ||||
| } | ||||
|  | ||||
| // Set saves a value in the Chord network. | ||||
| func Set(ctx context.Context, cl Client, n *Node, key ID, val string) error { | ||||
| 	p, _, err := Find(ctx, cl, n, key) | ||||
| 	p, err := Find(ctx, cl, n, key) | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("couldn't find peer to save key: %w", err) | ||||
| 	} | ||||
| @@ -57,7 +67,7 @@ func Join(ctx context.Context, cl Client, addr netip.AddrPort, np Peer) (*Node, | ||||
| 		return nil, errors.New("chord: invalid peer") | ||||
| 	} | ||||
| 	self := Address(addr) | ||||
| 	p, _, err := cl.Find(ctx, np, self.id) | ||||
| 	p, err := cl.FindSuccessor(ctx, np, self.id) | ||||
| 	if err != nil { | ||||
| 		return nil, fmt.Errorf("couldn't query own successor: %w", err) | ||||
| 	} | ||||
|   | ||||
| @@ -7,6 +7,7 @@ import ( | ||||
| 	"fmt" | ||||
| 	"log/slog" | ||||
| 	"net/http" | ||||
| 	"net/netip" | ||||
| 	"net/url" | ||||
| 	"path" | ||||
| 	"strings" | ||||
| @@ -21,12 +22,36 @@ type Client struct { | ||||
| 	APIBase string | ||||
| } | ||||
|  | ||||
| // Find asks s to find a value and the peer that owns it. | ||||
| func (cl *Client) Find(ctx context.Context, s chord.Peer, id chord.ID) (chord.Peer, string, error) { | ||||
| // FindSuccessor asks s to find a value and the peer that owns it. | ||||
| func (cl *Client) FindSuccessor(ctx context.Context, s chord.Peer, id chord.ID) (chord.Peer, error) { | ||||
| 	_, addr := s.Values() | ||||
| 	if !addr.IsValid() { | ||||
| 		return chord.Peer{}, "", errors.New("Find with invalid peer") | ||||
| 		return chord.Peer{}, errors.New("FindSuccessor with invalid peer") | ||||
| 	} | ||||
| 	url := url.URL{ | ||||
| 		Scheme: "http", | ||||
| 		Host:   addr.String(), | ||||
| 		Path:   path.Join("/", cl.APIBase, "find", id.String()), | ||||
| 	} | ||||
| 	req, err := http.NewRequestWithContext(ctx, "GET", url.String(), nil) | ||||
| 	if err != nil { | ||||
| 		return chord.Peer{}, err | ||||
| 	} | ||||
| 	slog.InfoContext(ctx, "find", slog.String("url", url.String())) | ||||
| 	resp, err := cl.HTTP.Do(req) | ||||
| 	if err != nil { | ||||
| 		return chord.Peer{}, err | ||||
| 	} | ||||
| 	p, err := readResponse[netip.AddrPort](resp) | ||||
| 	if err != nil { | ||||
| 		return chord.Peer{}, fmt.Errorf("%w (%s)", err, resp.Status) | ||||
| 	} | ||||
| 	slog.InfoContext(ctx, "found", slog.String("peer", p.String())) | ||||
| 	return chord.Address(p), nil | ||||
| } | ||||
|  | ||||
| func (cl *Client) Get(ctx context.Context, s chord.Peer, id chord.ID) (string, error) { | ||||
| 	_, addr := s.Values() | ||||
| 	url := url.URL{ | ||||
| 		Scheme: "http", | ||||
| 		Host:   addr.String(), | ||||
| @@ -34,19 +59,21 @@ func (cl *Client) Find(ctx context.Context, s chord.Peer, id chord.ID) (chord.Pe | ||||
| 	} | ||||
| 	req, err := http.NewRequestWithContext(ctx, "GET", url.String(), nil) | ||||
| 	if err != nil { | ||||
| 		return chord.Peer{}, "", err | ||||
| 		return "", nil | ||||
| 	} | ||||
| 	slog.InfoContext(ctx, "find", slog.String("url", url.String())) | ||||
| 	resp, err := cl.HTTP.Do(req) | ||||
| 	if err != nil { | ||||
| 		return chord.Peer{}, "", err | ||||
| 		return "", err | ||||
| 	} | ||||
| 	p, err := readResponse[peervalue](resp) | ||||
| 	if err != nil { | ||||
| 		return chord.Peer{}, "", fmt.Errorf("%w (%s)", err, resp.Status) | ||||
| 	switch resp.StatusCode { | ||||
| 	case http.StatusOK: | ||||
| 		v, err := readResponse[string](resp) | ||||
| 		return v, err | ||||
| 	case http.StatusNotFound: | ||||
| 		return "", nil | ||||
| 	default: | ||||
| 		return "", errors.New(resp.Status) | ||||
| 	} | ||||
| 	slog.InfoContext(ctx, "found", slog.String("peer", p.Peer.String()), slog.String("value", p.Value)) | ||||
| 	return chord.Address(p.Peer), p.Value, nil | ||||
| } | ||||
|  | ||||
| func (cl *Client) Set(ctx context.Context, s chord.Peer, id chord.ID, v string) error { | ||||
|   | ||||
| @@ -48,8 +48,3 @@ type neighbors struct { | ||||
| 	Succ []netip.AddrPort `json:"succ"` | ||||
| 	Pred netip.AddrPort   `json:"pred,omitzero"` | ||||
| } | ||||
|  | ||||
| type peervalue struct { | ||||
| 	Peer  netip.AddrPort `json:"peer"` | ||||
| 	Value string         `json:"value,omitzero"` | ||||
| } | ||||
|   | ||||
| @@ -45,7 +45,8 @@ func New(l net.Listener, cl chord.Client, self *chord.Node) (*Node, error) { | ||||
| // Router creates a handler for the Chord HTTP endpoints. | ||||
| func (n *Node) Router() http.Handler { | ||||
| 	m := http.NewServeMux() | ||||
| 	m.HandleFunc("GET /key/{id}", n.key) | ||||
| 	m.HandleFunc("GET /find/{id}", n.find) | ||||
| 	m.HandleFunc("GET /key/{id}", n.get) | ||||
| 	m.HandleFunc("POST /key/{id}", n.set) | ||||
| 	m.HandleFunc("POST /pred", n.notify) | ||||
| 	m.HandleFunc("GET /neighbors", n.neighbors) | ||||
| @@ -66,7 +67,7 @@ func (n *Node) Check(ctx context.Context) error { | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (n *Node) key(w http.ResponseWriter, r *http.Request) { | ||||
| func (n *Node) find(w http.ResponseWriter, r *http.Request) { | ||||
| 	s := r.PathValue("id") | ||||
| 	slog.InfoContext(r.Context(), "received find", slog.String("id", s), slog.String("from", r.RemoteAddr)) | ||||
| 	id, err := chord.ParseID(s) | ||||
| @@ -74,15 +75,29 @@ func (n *Node) key(w http.ResponseWriter, r *http.Request) { | ||||
| 		writeError(w, http.StatusBadRequest, err.Error()) | ||||
| 		return | ||||
| 	} | ||||
| 	p, v, err := chord.Find(r.Context(), n.client, n.self, id) | ||||
| 	p, err := chord.Find(r.Context(), n.client, n.self, id) | ||||
| 	if err != nil { | ||||
| 		writeError(w, http.StatusInternalServerError, err.Error()) | ||||
| 		return | ||||
| 	} | ||||
| 	_, addr := p.Values() | ||||
| 	slog.InfoContext(r.Context(), "tell found", slog.String("id", s), slog.String("addr", addr.String()), slog.String("value", v)) | ||||
| 	pv := peervalue{addr, v} | ||||
| 	writeOk(w, pv) | ||||
| 	slog.InfoContext(r.Context(), "tell found", slog.String("id", s), slog.String("addr", addr.String())) | ||||
| 	writeOk(w, addr) | ||||
| } | ||||
|  | ||||
| func (n *Node) get(w http.ResponseWriter, r *http.Request) { | ||||
| 	s := r.PathValue("id") | ||||
| 	id, err := chord.ParseID(s) | ||||
| 	if err != nil { | ||||
| 		writeError(w, http.StatusBadRequest, err.Error()) | ||||
| 		return | ||||
| 	} | ||||
| 	v, ok := n.self.GetLocal(id) | ||||
| 	if !ok { | ||||
| 		writeError(w, http.StatusNotFound, s+" not found") | ||||
| 		return | ||||
| 	} | ||||
| 	writeOk(w, v) | ||||
| } | ||||
|  | ||||
| func (n *Node) set(w http.ResponseWriter, r *http.Request) { | ||||
| @@ -97,10 +112,7 @@ func (n *Node) set(w http.ResponseWriter, r *http.Request) { | ||||
| 		writeError(w, http.StatusInternalServerError, err.Error()) | ||||
| 		return | ||||
| 	} | ||||
| 	if !n.self.SetLocal(id, string(val)) { | ||||
| 		writeError(w, http.StatusNotFound, "id does not belong to this peer") | ||||
| 		return | ||||
| 	} | ||||
| 	n.self.SetLocal(id, string(val)) | ||||
| 	w.WriteHeader(http.StatusNoContent) | ||||
| } | ||||
|  | ||||
|   | ||||
| @@ -47,22 +47,21 @@ func (n *Node) Neighbors(s []Peer) (Peer, []Peer) { | ||||
| 	return n.pred, append(s, n.succ...) | ||||
| } | ||||
|  | ||||
| func (n *Node) localLocked(id ID) bool { | ||||
| func (n *Node) succOwnsLocked(id ID) bool { | ||||
| 	return contains(n.self.id, n.succ[0].id, id) | ||||
| } | ||||
|  | ||||
| // IsLocal reports whether this node owns the given key. | ||||
| func (n *Node) IsLocal(id ID) bool { | ||||
| 	n.mu.Lock() | ||||
| 	defer n.mu.Unlock() | ||||
| 	return n.localLocked(id) | ||||
| } | ||||
|  | ||||
| // Closest finds the locally known peer which is the closest predecessor of key. | ||||
| func (n *Node) Closest(id ID) Peer { | ||||
| // If the boolean value is true, then this node believes that the peer is the | ||||
| // closest predecessor in the network. | ||||
| func (n *Node) Closest(id ID) (Peer, bool) { | ||||
| 	self := n.self.id | ||||
| 	n.mu.Lock() | ||||
| 	defer n.mu.Unlock() | ||||
| 	// Check the immediate successor first to satisfy the contract. | ||||
| 	if n.succOwnsLocked(id) { | ||||
| 		return n.succ[0], true | ||||
| 	} | ||||
| 	l := n.fingers | ||||
| 	for i := len(l) - 1; i >= 0; i-- { | ||||
| 		f := l[i] | ||||
| @@ -74,7 +73,7 @@ func (n *Node) Closest(id ID) Peer { | ||||
| 			if id == f.id { | ||||
| 				continue | ||||
| 			} | ||||
| 			return f | ||||
| 			return f, false | ||||
| 		} | ||||
| 	} | ||||
| 	// Also try successors. | ||||
| @@ -85,32 +84,26 @@ func (n *Node) Closest(id ID) Peer { | ||||
| 			if id == f.id { | ||||
| 				continue | ||||
| 			} | ||||
| 			return f | ||||
| 			return f, false | ||||
| 		} | ||||
| 	} | ||||
| 	return n.self | ||||
| 	// No known node is closer than we are. | ||||
| 	return n.self, true | ||||
| } | ||||
|  | ||||
| // GetLocal obtains the value for a key if it is local to and owned by the node. | ||||
| // GetLocal obtains the value for a key. | ||||
| func (n *Node) GetLocal(k ID) (v string, found bool) { | ||||
| 	n.mu.Lock() | ||||
| 	defer n.mu.Unlock() | ||||
| 	if n.localLocked(k) { | ||||
| 		v, found = n.data[k] | ||||
| 	} | ||||
| 	v, found = n.data[k] | ||||
| 	return v, found | ||||
| } | ||||
|  | ||||
| // SetLocal sets the value for a key. | ||||
| // Returns false if the key is not owned by the node. | ||||
| func (n *Node) SetLocal(k ID, v string) bool { | ||||
| func (n *Node) SetLocal(k ID, v string) { | ||||
| 	n.mu.Lock() | ||||
| 	defer n.mu.Unlock() | ||||
| 	if n.localLocked(k) { | ||||
| 		n.data[k] = v | ||||
| 		return true | ||||
| 	} | ||||
| 	return false | ||||
| 	n.data[k] = v | ||||
| } | ||||
|  | ||||
| // SuccessorFailed marks the node's current successor as having failed, e.g. | ||||
| @@ -175,6 +168,11 @@ func (p Peer) Values() (ID, netip.AddrPort) { | ||||
| 	return p.id, p.addr | ||||
| } | ||||
|  | ||||
| // String formats the peer's address for debugging. | ||||
| func (p Peer) String() string { | ||||
| 	return p.addr.String() | ||||
| } | ||||
|  | ||||
| // Create creates a new Chord network using the given address as the initial node. | ||||
| func Create(addr netip.AddrPort) (*Node, error) { | ||||
| 	if !addr.IsValid() { | ||||
|   | ||||
							
								
								
									
										2
									
								
								main.go
									
									
									
									
									
								
							
							
						
						
									
										2
									
								
								main.go
									
									
									
									
									
								
							| @@ -183,6 +183,8 @@ func cliJoin(ctx context.Context, cmd *cli.Command) error { | ||||
| 				slog.ErrorContext(ctx, "stabilize", slog.Any("err", err)) | ||||
| 				node.SuccessorFailed() | ||||
| 			} | ||||
| 			pred, succ := node.Neighbors(nil) | ||||
| 			slog.InfoContext(ctx, "neighbors", slog.Any("predecessor", pred), slog.Any("successors", succ)) | ||||
| 		} | ||||
| 	}() | ||||
| 	<-ctx.Done() | ||||
|   | ||||
							
								
								
									
										19
									
								
								test.bash
									
									
									
									
									
										Executable file
									
								
							
							
						
						
									
										19
									
								
								test.bash
									
									
									
									
									
										Executable file
									
								
							| @@ -0,0 +1,19 @@ | ||||
| #!/bin/bash | ||||
|  | ||||
| set -ex | ||||
|  | ||||
| go build -o ./chord-node | ||||
|  | ||||
| # Test create and join. | ||||
| ./chord-node join -ip 127.0.0.1:3000 & | ||||
| FIRST=$! | ||||
| sleep 3 | ||||
|  | ||||
| ./chord-node join -ip 127.0.0.1:3001 -c 127.0.0.1:3000 & | ||||
| SECOND=$! | ||||
| ./chord-node join -ip 127.0.0.1:3002 -c 127.0.0.1:3000 & | ||||
| THIRD=$! | ||||
|  | ||||
| sleep 5 | ||||
| # Each node logs its predecessor and successors. At this point, we see the ring. | ||||
| kill $FIRST $SECOND $THIRD | ||||
		Reference in New Issue
	
	Block a user