Compare commits
5 Commits
f4a8b104ab
...
9610513384
Author | SHA1 | Date | |
---|---|---|---|
![]() |
9610513384 | ||
![]() |
4b44ffcd13 | ||
![]() |
8047f4f13e | ||
![]() |
70f4e149d4 | ||
![]() |
ef03e13a87 |
@ -13,13 +13,15 @@ type Client interface {
|
|||||||
// If the ID is not associated with a key, the result must be the empty
|
// If the ID is not associated with a key, the result must be the empty
|
||||||
// string with a nil error.
|
// string with a nil error.
|
||||||
Find(ctx context.Context, s Peer, id ID) (Peer, string, error)
|
Find(ctx context.Context, s Peer, id ID) (Peer, string, error)
|
||||||
|
// Set asks s to save a value for an ID.
|
||||||
|
Set(ctx context.Context, s Peer, id ID, v string) error
|
||||||
// Notify tells s we believe n to be its predecessor.
|
// Notify tells s we believe n to be its predecessor.
|
||||||
Notify(ctx context.Context, n *Node, s Peer) error
|
Notify(ctx context.Context, n *Node, s Peer) error
|
||||||
// Neighbors requests a peer's beliefs about its own neighbors.
|
// Neighbors requests a peer's beliefs about its own neighbors.
|
||||||
Neighbors(ctx context.Context, p Peer) (pred Peer, succ []Peer, err error)
|
Neighbors(ctx context.Context, p Peer) (pred Peer, succ []Peer, err error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO(branden): FindSuccessor should be plural; if we have multiple keys to
|
// TODO(branden): Find should be plural; if we have multiple keys to
|
||||||
// search, we shouldn't have to do the whole query for all of them, especially
|
// search, we shouldn't have to do the whole query for all of them, especially
|
||||||
// considering we can sort by increasing distance from the origin and then do
|
// considering we can sort by increasing distance from the origin and then do
|
||||||
// the query in linear time.
|
// the query in linear time.
|
||||||
@ -33,6 +35,18 @@ func Find(ctx context.Context, cl Client, n *Node, id ID) (Peer, string, error)
|
|||||||
return p, s, err
|
return p, s, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO(branden): Set should be plural for the same reasons. It should also
|
||||||
|
// return an error if the key isn't local to the peer.
|
||||||
|
|
||||||
|
// Set saves a value in the Chord network.
|
||||||
|
func Set(ctx context.Context, cl Client, n *Node, key ID, val string) error {
|
||||||
|
p, _, err := Find(ctx, cl, n, key)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("couldn't find peer to save key: %w", err)
|
||||||
|
}
|
||||||
|
return cl.Set(ctx, p, key, val)
|
||||||
|
}
|
||||||
|
|
||||||
// Join creates a new node joining an existing Chord network by communicating
|
// Join creates a new node joining an existing Chord network by communicating
|
||||||
// with any peer already in the network.
|
// with any peer already in the network.
|
||||||
func Join(ctx context.Context, cl Client, addr netip.AddrPort, np Peer) (*Node, error) {
|
func Join(ctx context.Context, cl Client, addr netip.AddrPort, np Peer) (*Node, error) {
|
||||||
|
@ -2,11 +2,13 @@ package httpnode
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/base64"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
"path"
|
"path"
|
||||||
|
"strings"
|
||||||
|
|
||||||
"git.sunturtle.xyz/zephyr/chord/chord"
|
"git.sunturtle.xyz/zephyr/chord/chord"
|
||||||
)
|
)
|
||||||
@ -27,8 +29,7 @@ func (cl *Client) Find(ctx context.Context, s chord.Peer, id chord.ID) (chord.Pe
|
|||||||
url := url.URL{
|
url := url.URL{
|
||||||
Scheme: "http",
|
Scheme: "http",
|
||||||
Host: addr.String(),
|
Host: addr.String(),
|
||||||
Path: path.Join("/", cl.APIBase, "succ"),
|
Path: path.Join("/", cl.APIBase, "key", id.String()),
|
||||||
RawQuery: url.Values{"s": {id.String()}}.Encode(),
|
|
||||||
}
|
}
|
||||||
req, err := http.NewRequestWithContext(ctx, "GET", url.String(), nil)
|
req, err := http.NewRequestWithContext(ctx, "GET", url.String(), nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -45,6 +46,29 @@ func (cl *Client) Find(ctx context.Context, s chord.Peer, id chord.ID) (chord.Pe
|
|||||||
return chord.Address(p.Peer), p.Value, nil
|
return chord.Address(p.Peer), p.Value, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (cl *Client) Set(ctx context.Context, s chord.Peer, id chord.ID, v string) error {
|
||||||
|
_, addr := s.Values()
|
||||||
|
url := url.URL{
|
||||||
|
Scheme: "http",
|
||||||
|
Host: addr.String(),
|
||||||
|
Path: path.Join("/", cl.APIBase, "key", id.String()),
|
||||||
|
}
|
||||||
|
body := strings.NewReader(base64.StdEncoding.EncodeToString([]byte(v)))
|
||||||
|
req, err := http.NewRequestWithContext(ctx, "POST", url.String(), body)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
resp, err := cl.HTTP.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if resp.StatusCode >= 400 {
|
||||||
|
_, err := readResponse[struct{}](resp)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// Notify tells s we believe n to be its predecessor.
|
// Notify tells s we believe n to be its predecessor.
|
||||||
func (cl *Client) Notify(ctx context.Context, n *chord.Node, s chord.Peer) error {
|
func (cl *Client) Notify(ctx context.Context, n *chord.Node, s chord.Peer) error {
|
||||||
_, addr := s.Values()
|
_, addr := s.Values()
|
||||||
|
@ -2,8 +2,10 @@ package httpnode
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/base64"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io"
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/netip"
|
"net/netip"
|
||||||
@ -45,7 +47,8 @@ func New(l net.Listener, cl chord.Client) (*Node, error) {
|
|||||||
// Router creates a handler for the Chord HTTP endpoints.
|
// Router creates a handler for the Chord HTTP endpoints.
|
||||||
func (n *Node) Router() http.Handler {
|
func (n *Node) Router() http.Handler {
|
||||||
m := http.NewServeMux()
|
m := http.NewServeMux()
|
||||||
m.HandleFunc("GET /key", n.key)
|
m.HandleFunc("GET /key/{id}", n.key)
|
||||||
|
m.HandleFunc("POST /key/{id}", n.set)
|
||||||
m.HandleFunc("POST /pred", n.notify)
|
m.HandleFunc("POST /pred", n.notify)
|
||||||
m.HandleFunc("GET /neighbors", n.neighbors)
|
m.HandleFunc("GET /neighbors", n.neighbors)
|
||||||
return m
|
return m
|
||||||
@ -66,7 +69,7 @@ func (n *Node) Check(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (n *Node) key(w http.ResponseWriter, r *http.Request) {
|
func (n *Node) key(w http.ResponseWriter, r *http.Request) {
|
||||||
s := r.FormValue("s")
|
s := r.PathValue("id")
|
||||||
id, err := chord.ParseID(s)
|
id, err := chord.ParseID(s)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
writeError(w, http.StatusBadRequest, err.Error())
|
writeError(w, http.StatusBadRequest, err.Error())
|
||||||
@ -82,6 +85,25 @@ func (n *Node) key(w http.ResponseWriter, r *http.Request) {
|
|||||||
writeOk(w, pv)
|
writeOk(w, pv)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (n *Node) set(w http.ResponseWriter, r *http.Request) {
|
||||||
|
s := r.PathValue("id")
|
||||||
|
id, err := chord.ParseID(s)
|
||||||
|
if err != nil {
|
||||||
|
writeError(w, http.StatusBadRequest, err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
val, err := io.ReadAll(base64.NewDecoder(base64.StdEncoding, r.Body))
|
||||||
|
if err != nil {
|
||||||
|
writeError(w, http.StatusInternalServerError, err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if !n.self.SetLocal(id, string(val)) {
|
||||||
|
writeError(w, http.StatusNotFound, "id does not belong to this peer")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
w.WriteHeader(http.StatusNoContent)
|
||||||
|
}
|
||||||
|
|
||||||
func (n *Node) notify(w http.ResponseWriter, r *http.Request) {
|
func (n *Node) notify(w http.ResponseWriter, r *http.Request) {
|
||||||
s := r.FormValue("p")
|
s := r.FormValue("p")
|
||||||
addr, err := netip.ParseAddrPort(s)
|
addr, err := netip.ParseAddrPort(s)
|
||||||
|
@ -47,9 +47,15 @@ func (n *Node) Neighbors(s []Peer) (Peer, []Peer) {
|
|||||||
return n.pred, append(s, n.succ...)
|
return n.pred, append(s, n.succ...)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (n *Node) localLocked(id ID) bool {
|
||||||
|
return contains(n.self.id, n.succ[0].id, id)
|
||||||
|
}
|
||||||
|
|
||||||
// IsLocal reports whether this node owns the given key.
|
// IsLocal reports whether this node owns the given key.
|
||||||
func (n *Node) IsLocal(id ID) bool {
|
func (n *Node) IsLocal(id ID) bool {
|
||||||
return contains(n.self.id, n.Successor().id, id)
|
n.mu.Lock()
|
||||||
|
defer n.mu.Unlock()
|
||||||
|
return n.localLocked(id)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Closest finds the locally known peer which is the closest predecessor of key.
|
// Closest finds the locally known peer which is the closest predecessor of key.
|
||||||
@ -85,19 +91,26 @@ func (n *Node) Closest(id ID) Peer {
|
|||||||
return n.self
|
return n.self
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get obtains the value for a key owned by the node.
|
// GetLocal obtains the value for a key if it is local to and owned by the node.
|
||||||
func (n *Node) Get(k ID) (v string, found bool) {
|
func (n *Node) GetLocal(k ID) (v string, found bool) {
|
||||||
n.mu.Lock()
|
n.mu.Lock()
|
||||||
defer n.mu.Unlock()
|
defer n.mu.Unlock()
|
||||||
|
if n.localLocked(k) {
|
||||||
v, found = n.data[k]
|
v, found = n.data[k]
|
||||||
|
}
|
||||||
return v, found
|
return v, found
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set sets the value for a key.
|
// SetLocal sets the value for a key.
|
||||||
func (n *Node) Set(k ID, v string) {
|
// Returns false if the key is not owned by the node.
|
||||||
|
func (n *Node) SetLocal(k ID, v string) bool {
|
||||||
n.mu.Lock()
|
n.mu.Lock()
|
||||||
defer n.mu.Unlock()
|
defer n.mu.Unlock()
|
||||||
|
if n.localLocked(k) {
|
||||||
n.data[k] = v
|
n.data[k] = v
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
// Peer is the ID and address of a node.
|
// Peer is the ID and address of a node.
|
||||||
|
Loading…
Reference in New Issue
Block a user