Compare commits
No commits in common. "936decf0e6bd6c345ad4171cdcdeb3972eaa28e3" and "462a9783e9360a992b69ba223fa556a60e4fcfe0" have entirely different histories.
936decf0e6
...
462a9783e9
@ -48,62 +48,10 @@ func (cl *Client) FindSuccessor(ctx context.Context, s chord.Peer, id chord.ID)
|
|||||||
|
|
||||||
// Notify tells s we believe n to be its predecessor.
|
// Notify tells s we believe n to be its predecessor.
|
||||||
func (cl *Client) Notify(ctx context.Context, n *chord.Node, s chord.Peer) error {
|
func (cl *Client) Notify(ctx context.Context, n *chord.Node, s chord.Peer) error {
|
||||||
_, addr := s.Values()
|
panic("not implemented") // TODO: Implement
|
||||||
if !addr.IsValid() {
|
|
||||||
return errors.New("Notify with invalid peer")
|
|
||||||
}
|
|
||||||
_, self := n.Self().Values()
|
|
||||||
url := url.URL{
|
|
||||||
Scheme: "http",
|
|
||||||
Host: addr.String(),
|
|
||||||
Path: path.Join("/", cl.APIBase, "pred"),
|
|
||||||
RawQuery: url.Values{"s": {self.String()}}.Encode(),
|
|
||||||
}
|
|
||||||
req, err := http.NewRequestWithContext(ctx, "POST", url.String(), nil)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
resp, err := cl.HTTP.Do(req)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
// We expect the server to only be capable of responding with No Content.
|
|
||||||
if resp.StatusCode != http.StatusNoContent {
|
|
||||||
return fmt.Errorf("strange response: %s", resp.Status)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Neighbors requests a peer's beliefs about its own neighbors.
|
// Neighbors requests a peer's beliefs about its own neighbors.
|
||||||
func (cl *Client) Neighbors(ctx context.Context, p chord.Peer) (pred chord.Peer, succ []chord.Peer, err error) {
|
func (cl *Client) Neighbors(ctx context.Context, p chord.Peer) (pred chord.Peer, succ []chord.Peer, err error) {
|
||||||
_, addr := p.Values()
|
panic("not implemented") // TODO: Implement
|
||||||
if !addr.IsValid() {
|
|
||||||
return chord.Peer{}, nil, errors.New("Neighbors with invalid peer")
|
|
||||||
}
|
|
||||||
url := url.URL{
|
|
||||||
Scheme: "http",
|
|
||||||
Host: addr.String(),
|
|
||||||
Path: path.Join("/", cl.APIBase, "neighbors"),
|
|
||||||
}
|
|
||||||
req, err := http.NewRequestWithContext(ctx, "GET", url.String(), nil)
|
|
||||||
if err != nil {
|
|
||||||
return chord.Peer{}, nil, err
|
|
||||||
}
|
|
||||||
resp, err := cl.HTTP.Do(req)
|
|
||||||
if err != nil {
|
|
||||||
return chord.Peer{}, nil, err
|
|
||||||
}
|
|
||||||
n, err := readResponse[neighbors](resp)
|
|
||||||
if err != nil {
|
|
||||||
return chord.Peer{}, nil, fmt.Errorf("%w (%s)", err, resp.Status)
|
|
||||||
}
|
|
||||||
pred = chord.Address(n.Pred)
|
|
||||||
succ = make([]chord.Peer, 0, len(n.Succ))
|
|
||||||
for _, addr := range n.Succ {
|
|
||||||
p := chord.Address(addr)
|
|
||||||
if p.IsValid() {
|
|
||||||
succ = append(succ, p)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return pred, succ, nil
|
|
||||||
}
|
}
|
||||||
|
@ -43,5 +43,5 @@ func readResponse[T any](r *http.Response) (x T, err error) {
|
|||||||
|
|
||||||
type neighbors struct {
|
type neighbors struct {
|
||||||
Succ []netip.AddrPort `json:"succ"`
|
Succ []netip.AddrPort `json:"succ"`
|
||||||
Pred netip.AddrPort `json:"pred,omitzero"`
|
Pred netip.AddrPort `json:"pred"`
|
||||||
}
|
}
|
||||||
|
@ -1,9 +1,7 @@
|
|||||||
package httpnode
|
package httpnode
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/netip"
|
"net/netip"
|
||||||
@ -15,8 +13,6 @@ type Node struct {
|
|||||||
// self is the node topology this server represents.
|
// self is the node topology this server represents.
|
||||||
self *chord.Node
|
self *chord.Node
|
||||||
// client is the Chord client for forwarding queries to other nodes.
|
// client is the Chord client for forwarding queries to other nodes.
|
||||||
// TODO(branden): really we should have a client per peer so that we can
|
|
||||||
// interchange protocols
|
|
||||||
client chord.Client
|
client chord.Client
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -47,24 +43,9 @@ func (n *Node) Router() http.Handler {
|
|||||||
m := http.NewServeMux()
|
m := http.NewServeMux()
|
||||||
m.HandleFunc("GET /succ", n.successor)
|
m.HandleFunc("GET /succ", n.successor)
|
||||||
m.HandleFunc("POST /pred", n.notify)
|
m.HandleFunc("POST /pred", n.notify)
|
||||||
m.HandleFunc("GET /neighbors", n.neighbors)
|
|
||||||
return m
|
return m
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check performs checks that implement the Chord protocol.
|
|
||||||
// It must be called periodically while the node is alive.
|
|
||||||
func (n *Node) Check(ctx context.Context) error {
|
|
||||||
if err := chord.Stabilize(ctx, n.client, n.self); err != nil {
|
|
||||||
return fmt.Errorf("failed to stabilize: %w", err)
|
|
||||||
}
|
|
||||||
// TODO(zeph): enable once implemented
|
|
||||||
// if err := chord.FixFingers(ctx, n.client, n.self); err != nil {
|
|
||||||
// return fmt.Errorf("failed to fix fingers: %w", err)
|
|
||||||
// }
|
|
||||||
chord.CheckPredecessor(ctx, n.client, n.self)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (n *Node) successor(w http.ResponseWriter, r *http.Request) {
|
func (n *Node) successor(w http.ResponseWriter, r *http.Request) {
|
||||||
s := r.FormValue("s")
|
s := r.FormValue("s")
|
||||||
id, err := chord.ParseID(s)
|
id, err := chord.ParseID(s)
|
||||||
@ -82,6 +63,7 @@ func (n *Node) successor(w http.ResponseWriter, r *http.Request) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (n *Node) notify(w http.ResponseWriter, r *http.Request) {
|
func (n *Node) notify(w http.ResponseWriter, r *http.Request) {
|
||||||
|
// Another node is telling us they think they're our predecessor.
|
||||||
s := r.FormValue("p")
|
s := r.FormValue("p")
|
||||||
addr, err := netip.ParseAddrPort(s)
|
addr, err := netip.ParseAddrPort(s)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -90,19 +72,4 @@ func (n *Node) notify(w http.ResponseWriter, r *http.Request) {
|
|||||||
}
|
}
|
||||||
np := chord.Address(addr)
|
np := chord.Address(addr)
|
||||||
chord.Notify(n.self, np)
|
chord.Notify(n.self, np)
|
||||||
w.WriteHeader(http.StatusNoContent)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (n *Node) neighbors(w http.ResponseWriter, r *http.Request) {
|
|
||||||
pred, succ := n.self.Neighbors(nil)
|
|
||||||
_, paddr := pred.Values()
|
|
||||||
u := neighbors{
|
|
||||||
Succ: make([]netip.AddrPort, 0, len(succ)),
|
|
||||||
Pred: paddr,
|
|
||||||
}
|
|
||||||
for _, s := range succ {
|
|
||||||
_, addr := s.Values()
|
|
||||||
u.Succ = append(u.Succ, addr)
|
|
||||||
}
|
|
||||||
writeOk(w, &u)
|
|
||||||
}
|
}
|
||||||
|
@ -22,10 +22,6 @@ type Node struct {
|
|||||||
fingers []Peer
|
fingers []Peer
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *Node) Self() Peer {
|
|
||||||
return n.self
|
|
||||||
}
|
|
||||||
|
|
||||||
func (n *Node) Successor() Peer {
|
func (n *Node) Successor() Peer {
|
||||||
return n.succ[0]
|
return n.succ[0]
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user