Compare commits
5 Commits
936decf0e6
...
f4a8b104ab
Author | SHA1 | Date | |
---|---|---|---|
![]() |
f4a8b104ab | ||
![]() |
d5f26bb2bf | ||
![]() |
9c49d89637 | ||
![]() |
ad0015a2be | ||
![]() |
d9b1b5349a |
@ -9,8 +9,10 @@ import (
|
|||||||
|
|
||||||
// Client represents the communications a Chord node performs.
|
// Client represents the communications a Chord node performs.
|
||||||
type Client interface {
|
type Client interface {
|
||||||
// FindSuccessor asks s to find the first peer in the network preceding id.
|
// Find asks s to find a value and the peer that owns it.
|
||||||
FindSuccessor(ctx context.Context, s Peer, id ID) (Peer, error)
|
// If the ID is not associated with a key, the result must be the empty
|
||||||
|
// string with a nil error.
|
||||||
|
Find(ctx context.Context, s Peer, id ID) (Peer, string, error)
|
||||||
// Notify tells s we believe n to be its predecessor.
|
// Notify tells s we believe n to be its predecessor.
|
||||||
Notify(ctx context.Context, n *Node, s Peer) error
|
Notify(ctx context.Context, n *Node, s Peer) error
|
||||||
// Neighbors requests a peer's beliefs about its own neighbors.
|
// Neighbors requests a peer's beliefs about its own neighbors.
|
||||||
@ -22,13 +24,13 @@ type Client interface {
|
|||||||
// considering we can sort by increasing distance from the origin and then do
|
// considering we can sort by increasing distance from the origin and then do
|
||||||
// the query in linear time.
|
// the query in linear time.
|
||||||
|
|
||||||
// Find finds the first node in the network preceding id.
|
// Find gets a value and the peer that owns it.
|
||||||
func Find(ctx context.Context, cl Client, n *Node, id ID) (Peer, error) {
|
func Find(ctx context.Context, cl Client, n *Node, id ID) (Peer, string, error) {
|
||||||
if n.IsLocal(id) {
|
if n.IsLocal(id) {
|
||||||
return n.self, nil
|
return n.self, "", nil
|
||||||
}
|
}
|
||||||
p, err := cl.FindSuccessor(ctx, n.Closest(id), id)
|
p, s, err := cl.Find(ctx, n.Closest(id), id)
|
||||||
return p, err
|
return p, s, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Join creates a new node joining an existing Chord network by communicating
|
// Join creates a new node joining an existing Chord network by communicating
|
||||||
@ -41,7 +43,7 @@ func Join(ctx context.Context, cl Client, addr netip.AddrPort, np Peer) (*Node,
|
|||||||
return nil, errors.New("chord: invalid peer")
|
return nil, errors.New("chord: invalid peer")
|
||||||
}
|
}
|
||||||
self := Address(addr)
|
self := Address(addr)
|
||||||
p, err := cl.FindSuccessor(ctx, np, self.id)
|
p, _, err := cl.Find(ctx, np, self.id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("couldn't query own successor: %w", err)
|
return nil, fmt.Errorf("couldn't query own successor: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -5,7 +5,6 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/netip"
|
|
||||||
"net/url"
|
"net/url"
|
||||||
"path"
|
"path"
|
||||||
|
|
||||||
@ -19,11 +18,11 @@ type Client struct {
|
|||||||
APIBase string
|
APIBase string
|
||||||
}
|
}
|
||||||
|
|
||||||
// FindSuccessor asks s to find the first peer in the network preceding id.
|
// Find asks s to find a value and the peer that owns it.
|
||||||
func (cl *Client) FindSuccessor(ctx context.Context, s chord.Peer, id chord.ID) (chord.Peer, error) {
|
func (cl *Client) Find(ctx context.Context, s chord.Peer, id chord.ID) (chord.Peer, string, error) {
|
||||||
_, addr := s.Values()
|
_, addr := s.Values()
|
||||||
if !addr.IsValid() {
|
if !addr.IsValid() {
|
||||||
return chord.Peer{}, errors.New("FindSuccessor with invalid peer")
|
return chord.Peer{}, "", errors.New("Find with invalid peer")
|
||||||
}
|
}
|
||||||
url := url.URL{
|
url := url.URL{
|
||||||
Scheme: "http",
|
Scheme: "http",
|
||||||
@ -33,17 +32,17 @@ func (cl *Client) FindSuccessor(ctx context.Context, s chord.Peer, id chord.ID)
|
|||||||
}
|
}
|
||||||
req, err := http.NewRequestWithContext(ctx, "GET", url.String(), nil)
|
req, err := http.NewRequestWithContext(ctx, "GET", url.String(), nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return chord.Peer{}, err
|
return chord.Peer{}, "", err
|
||||||
}
|
}
|
||||||
resp, err := cl.HTTP.Do(req)
|
resp, err := cl.HTTP.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return chord.Peer{}, err
|
return chord.Peer{}, "", err
|
||||||
}
|
}
|
||||||
p, err := readResponse[netip.AddrPort](resp)
|
p, err := readResponse[peervalue](resp)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return chord.Peer{}, fmt.Errorf("%w (%s)", err, resp.Status)
|
return chord.Peer{}, "", fmt.Errorf("%w (%s)", err, resp.Status)
|
||||||
}
|
}
|
||||||
return chord.Address(p), nil
|
return chord.Address(p.Peer), p.Value, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Notify tells s we believe n to be its predecessor.
|
// Notify tells s we believe n to be its predecessor.
|
||||||
|
@ -45,3 +45,8 @@ type neighbors struct {
|
|||||||
Succ []netip.AddrPort `json:"succ"`
|
Succ []netip.AddrPort `json:"succ"`
|
||||||
Pred netip.AddrPort `json:"pred,omitzero"`
|
Pred netip.AddrPort `json:"pred,omitzero"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type peervalue struct {
|
||||||
|
Peer netip.AddrPort `json:"peer"`
|
||||||
|
Value string `json:"value,omitzero"`
|
||||||
|
}
|
||||||
|
@ -45,7 +45,7 @@ func New(l net.Listener, cl chord.Client) (*Node, error) {
|
|||||||
// Router creates a handler for the Chord HTTP endpoints.
|
// Router creates a handler for the Chord HTTP endpoints.
|
||||||
func (n *Node) Router() http.Handler {
|
func (n *Node) Router() http.Handler {
|
||||||
m := http.NewServeMux()
|
m := http.NewServeMux()
|
||||||
m.HandleFunc("GET /succ", n.successor)
|
m.HandleFunc("GET /key", n.key)
|
||||||
m.HandleFunc("POST /pred", n.notify)
|
m.HandleFunc("POST /pred", n.notify)
|
||||||
m.HandleFunc("GET /neighbors", n.neighbors)
|
m.HandleFunc("GET /neighbors", n.neighbors)
|
||||||
return m
|
return m
|
||||||
@ -65,20 +65,21 @@ func (n *Node) Check(ctx context.Context) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *Node) successor(w http.ResponseWriter, r *http.Request) {
|
func (n *Node) key(w http.ResponseWriter, r *http.Request) {
|
||||||
s := r.FormValue("s")
|
s := r.FormValue("s")
|
||||||
id, err := chord.ParseID(s)
|
id, err := chord.ParseID(s)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
writeError(w, http.StatusBadRequest, err.Error())
|
writeError(w, http.StatusBadRequest, err.Error())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
p, err := chord.Find(r.Context(), n.client, n.self, id)
|
p, v, err := chord.Find(r.Context(), n.client, n.self, id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
writeError(w, http.StatusInternalServerError, err.Error())
|
writeError(w, http.StatusInternalServerError, err.Error())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
_, addr := p.Values()
|
_, addr := p.Values()
|
||||||
writeOk(w, addr)
|
pv := peervalue{addr, v}
|
||||||
|
writeOk(w, pv)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *Node) notify(w http.ResponseWriter, r *http.Request) {
|
func (n *Node) notify(w http.ResponseWriter, r *http.Request) {
|
||||||
|
@ -16,10 +16,11 @@ func addrID(addr netip.AddrPort) ID {
|
|||||||
}
|
}
|
||||||
b := make([]byte, 0, len("[ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff%enp5s0]:65535"))
|
b := make([]byte, 0, len("[ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff%enp5s0]:65535"))
|
||||||
b = addr.AppendTo(b)
|
b = addr.AppendTo(b)
|
||||||
return keyID(b)
|
return Key(b)
|
||||||
}
|
}
|
||||||
|
|
||||||
func keyID[T string | []byte](key T) ID {
|
// Key obtains the ID for an arbitrary key.
|
||||||
|
func Key[T string | []byte](key T) ID {
|
||||||
s := sha1.Sum([]byte(key))
|
s := sha1.Sum([]byte(key))
|
||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
|
@ -3,11 +3,16 @@ package chord
|
|||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"net/netip"
|
"net/netip"
|
||||||
|
"sync"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Node represents a node's topological relations within a Chord network.
|
// Node represents a node's topological relations within a Chord network.
|
||||||
|
// It is safe to use the methods of Node concurrently.
|
||||||
type Node struct {
|
type Node struct {
|
||||||
|
// mu guards the mutable fields of the node.
|
||||||
|
mu sync.Mutex
|
||||||
// self is the node's own address.
|
// self is the node's own address.
|
||||||
|
// It is fully immutable; the mutex does not protect it.
|
||||||
self Peer
|
self Peer
|
||||||
// pred is the node's predecessor.
|
// pred is the node's predecessor.
|
||||||
// If not valid, the predecessor is not currently known.
|
// If not valid, the predecessor is not currently known.
|
||||||
@ -20,6 +25,9 @@ type Node struct {
|
|||||||
succ []Peer
|
succ []Peer
|
||||||
// fingers is the node's finger table.
|
// fingers is the node's finger table.
|
||||||
fingers []Peer
|
fingers []Peer
|
||||||
|
|
||||||
|
// data is the data the node owns.
|
||||||
|
data map[ID]string
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *Node) Self() Peer {
|
func (n *Node) Self() Peer {
|
||||||
@ -27,11 +35,15 @@ func (n *Node) Self() Peer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (n *Node) Successor() Peer {
|
func (n *Node) Successor() Peer {
|
||||||
|
n.mu.Lock()
|
||||||
|
defer n.mu.Unlock()
|
||||||
return n.succ[0]
|
return n.succ[0]
|
||||||
}
|
}
|
||||||
|
|
||||||
// Neighbors returns the node's predecessor and appends its successor list to s.
|
// Neighbors returns the node's predecessor and appends its successor list to s.
|
||||||
func (n *Node) Neighbors(s []Peer) (Peer, []Peer) {
|
func (n *Node) Neighbors(s []Peer) (Peer, []Peer) {
|
||||||
|
n.mu.Lock()
|
||||||
|
defer n.mu.Unlock()
|
||||||
return n.pred, append(s, n.succ...)
|
return n.pred, append(s, n.succ...)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -43,6 +55,8 @@ func (n *Node) IsLocal(id ID) bool {
|
|||||||
// Closest finds the locally known peer which is the closest predecessor of key.
|
// Closest finds the locally known peer which is the closest predecessor of key.
|
||||||
func (n *Node) Closest(id ID) Peer {
|
func (n *Node) Closest(id ID) Peer {
|
||||||
self := n.self.id
|
self := n.self.id
|
||||||
|
n.mu.Lock()
|
||||||
|
defer n.mu.Unlock()
|
||||||
l := n.fingers
|
l := n.fingers
|
||||||
for i := len(l) - 1; i >= 0; i-- {
|
for i := len(l) - 1; i >= 0; i-- {
|
||||||
f := l[i]
|
f := l[i]
|
||||||
@ -71,6 +85,21 @@ func (n *Node) Closest(id ID) Peer {
|
|||||||
return n.self
|
return n.self
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Get obtains the value for a key owned by the node.
|
||||||
|
func (n *Node) Get(k ID) (v string, found bool) {
|
||||||
|
n.mu.Lock()
|
||||||
|
defer n.mu.Unlock()
|
||||||
|
v, found = n.data[k]
|
||||||
|
return v, found
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set sets the value for a key.
|
||||||
|
func (n *Node) Set(k ID, v string) {
|
||||||
|
n.mu.Lock()
|
||||||
|
defer n.mu.Unlock()
|
||||||
|
n.data[k] = v
|
||||||
|
}
|
||||||
|
|
||||||
// Peer is the ID and address of a node.
|
// Peer is the ID and address of a node.
|
||||||
type Peer struct {
|
type Peer struct {
|
||||||
id ID
|
id ID
|
||||||
|
Loading…
Reference in New Issue
Block a user