Compare commits
5 Commits
462a9783e9
...
936decf0e6
Author | SHA1 | Date | |
---|---|---|---|
![]() |
936decf0e6 | ||
![]() |
85e381f4bd | ||
![]() |
68ce8c5f34 | ||
![]() |
93771480ff | ||
![]() |
54d9536692 |
@ -48,10 +48,62 @@ func (cl *Client) FindSuccessor(ctx context.Context, s chord.Peer, id chord.ID)
|
||||
|
||||
// Notify tells s we believe n to be its predecessor.
|
||||
func (cl *Client) Notify(ctx context.Context, n *chord.Node, s chord.Peer) error {
|
||||
panic("not implemented") // TODO: Implement
|
||||
_, addr := s.Values()
|
||||
if !addr.IsValid() {
|
||||
return errors.New("Notify with invalid peer")
|
||||
}
|
||||
_, self := n.Self().Values()
|
||||
url := url.URL{
|
||||
Scheme: "http",
|
||||
Host: addr.String(),
|
||||
Path: path.Join("/", cl.APIBase, "pred"),
|
||||
RawQuery: url.Values{"s": {self.String()}}.Encode(),
|
||||
}
|
||||
req, err := http.NewRequestWithContext(ctx, "POST", url.String(), nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
resp, err := cl.HTTP.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// We expect the server to only be capable of responding with No Content.
|
||||
if resp.StatusCode != http.StatusNoContent {
|
||||
return fmt.Errorf("strange response: %s", resp.Status)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Neighbors requests a peer's beliefs about its own neighbors.
|
||||
func (cl *Client) Neighbors(ctx context.Context, p chord.Peer) (pred chord.Peer, succ []chord.Peer, err error) {
|
||||
panic("not implemented") // TODO: Implement
|
||||
_, addr := p.Values()
|
||||
if !addr.IsValid() {
|
||||
return chord.Peer{}, nil, errors.New("Neighbors with invalid peer")
|
||||
}
|
||||
url := url.URL{
|
||||
Scheme: "http",
|
||||
Host: addr.String(),
|
||||
Path: path.Join("/", cl.APIBase, "neighbors"),
|
||||
}
|
||||
req, err := http.NewRequestWithContext(ctx, "GET", url.String(), nil)
|
||||
if err != nil {
|
||||
return chord.Peer{}, nil, err
|
||||
}
|
||||
resp, err := cl.HTTP.Do(req)
|
||||
if err != nil {
|
||||
return chord.Peer{}, nil, err
|
||||
}
|
||||
n, err := readResponse[neighbors](resp)
|
||||
if err != nil {
|
||||
return chord.Peer{}, nil, fmt.Errorf("%w (%s)", err, resp.Status)
|
||||
}
|
||||
pred = chord.Address(n.Pred)
|
||||
succ = make([]chord.Peer, 0, len(n.Succ))
|
||||
for _, addr := range n.Succ {
|
||||
p := chord.Address(addr)
|
||||
if p.IsValid() {
|
||||
succ = append(succ, p)
|
||||
}
|
||||
}
|
||||
return pred, succ, nil
|
||||
}
|
||||
|
@ -43,5 +43,5 @@ func readResponse[T any](r *http.Response) (x T, err error) {
|
||||
|
||||
type neighbors struct {
|
||||
Succ []netip.AddrPort `json:"succ"`
|
||||
Pred netip.AddrPort `json:"pred"`
|
||||
Pred netip.AddrPort `json:"pred,omitzero"`
|
||||
}
|
||||
|
@ -1,7 +1,9 @@
|
||||
package httpnode
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/netip"
|
||||
@ -13,6 +15,8 @@ type Node struct {
|
||||
// self is the node topology this server represents.
|
||||
self *chord.Node
|
||||
// client is the Chord client for forwarding queries to other nodes.
|
||||
// TODO(branden): really we should have a client per peer so that we can
|
||||
// interchange protocols
|
||||
client chord.Client
|
||||
}
|
||||
|
||||
@ -43,9 +47,24 @@ func (n *Node) Router() http.Handler {
|
||||
m := http.NewServeMux()
|
||||
m.HandleFunc("GET /succ", n.successor)
|
||||
m.HandleFunc("POST /pred", n.notify)
|
||||
m.HandleFunc("GET /neighbors", n.neighbors)
|
||||
return m
|
||||
}
|
||||
|
||||
// Check performs checks that implement the Chord protocol.
|
||||
// It must be called periodically while the node is alive.
|
||||
func (n *Node) Check(ctx context.Context) error {
|
||||
if err := chord.Stabilize(ctx, n.client, n.self); err != nil {
|
||||
return fmt.Errorf("failed to stabilize: %w", err)
|
||||
}
|
||||
// TODO(zeph): enable once implemented
|
||||
// if err := chord.FixFingers(ctx, n.client, n.self); err != nil {
|
||||
// return fmt.Errorf("failed to fix fingers: %w", err)
|
||||
// }
|
||||
chord.CheckPredecessor(ctx, n.client, n.self)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *Node) successor(w http.ResponseWriter, r *http.Request) {
|
||||
s := r.FormValue("s")
|
||||
id, err := chord.ParseID(s)
|
||||
@ -63,7 +82,6 @@ func (n *Node) successor(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
func (n *Node) notify(w http.ResponseWriter, r *http.Request) {
|
||||
// Another node is telling us they think they're our predecessor.
|
||||
s := r.FormValue("p")
|
||||
addr, err := netip.ParseAddrPort(s)
|
||||
if err != nil {
|
||||
@ -72,4 +90,19 @@ func (n *Node) notify(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
np := chord.Address(addr)
|
||||
chord.Notify(n.self, np)
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
}
|
||||
|
||||
func (n *Node) neighbors(w http.ResponseWriter, r *http.Request) {
|
||||
pred, succ := n.self.Neighbors(nil)
|
||||
_, paddr := pred.Values()
|
||||
u := neighbors{
|
||||
Succ: make([]netip.AddrPort, 0, len(succ)),
|
||||
Pred: paddr,
|
||||
}
|
||||
for _, s := range succ {
|
||||
_, addr := s.Values()
|
||||
u.Succ = append(u.Succ, addr)
|
||||
}
|
||||
writeOk(w, &u)
|
||||
}
|
||||
|
@ -22,6 +22,10 @@ type Node struct {
|
||||
fingers []Peer
|
||||
}
|
||||
|
||||
func (n *Node) Self() Peer {
|
||||
return n.self
|
||||
}
|
||||
|
||||
func (n *Node) Successor() Peer {
|
||||
return n.succ[0]
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user