Compare commits

...

5 Commits

Author SHA1 Message Date
Branden J Brown
936decf0e6 implement periodic checks in a function 2025-03-12 09:54:59 -04:00
Branden J Brown
85e381f4bd implement client neighbors 2025-03-11 09:58:20 -04:00
Branden J Brown
68ce8c5f34 don't transmit nil predecessors 2025-03-11 09:27:45 -04:00
Branden J Brown
93771480ff implement client notify 2025-03-11 09:23:51 -04:00
Branden J Brown
54d9536692 finish node server 2025-03-11 09:15:18 -04:00
4 changed files with 93 additions and 4 deletions

View File

@ -48,10 +48,62 @@ func (cl *Client) FindSuccessor(ctx context.Context, s chord.Peer, id chord.ID)
// Notify tells s we believe n to be its predecessor.
func (cl *Client) Notify(ctx context.Context, n *chord.Node, s chord.Peer) error {
panic("not implemented") // TODO: Implement
_, addr := s.Values()
if !addr.IsValid() {
return errors.New("Notify with invalid peer")
}
_, self := n.Self().Values()
url := url.URL{
Scheme: "http",
Host: addr.String(),
Path: path.Join("/", cl.APIBase, "pred"),
RawQuery: url.Values{"s": {self.String()}}.Encode(),
}
req, err := http.NewRequestWithContext(ctx, "POST", url.String(), nil)
if err != nil {
return err
}
resp, err := cl.HTTP.Do(req)
if err != nil {
return err
}
// We expect the server to only be capable of responding with No Content.
if resp.StatusCode != http.StatusNoContent {
return fmt.Errorf("strange response: %s", resp.Status)
}
return nil
}
// Neighbors requests a peer's beliefs about its own neighbors.
func (cl *Client) Neighbors(ctx context.Context, p chord.Peer) (pred chord.Peer, succ []chord.Peer, err error) {
panic("not implemented") // TODO: Implement
_, addr := p.Values()
if !addr.IsValid() {
return chord.Peer{}, nil, errors.New("Neighbors with invalid peer")
}
url := url.URL{
Scheme: "http",
Host: addr.String(),
Path: path.Join("/", cl.APIBase, "neighbors"),
}
req, err := http.NewRequestWithContext(ctx, "GET", url.String(), nil)
if err != nil {
return chord.Peer{}, nil, err
}
resp, err := cl.HTTP.Do(req)
if err != nil {
return chord.Peer{}, nil, err
}
n, err := readResponse[neighbors](resp)
if err != nil {
return chord.Peer{}, nil, fmt.Errorf("%w (%s)", err, resp.Status)
}
pred = chord.Address(n.Pred)
succ = make([]chord.Peer, 0, len(n.Succ))
for _, addr := range n.Succ {
p := chord.Address(addr)
if p.IsValid() {
succ = append(succ, p)
}
}
return pred, succ, nil
}

View File

@ -43,5 +43,5 @@ func readResponse[T any](r *http.Response) (x T, err error) {
type neighbors struct {
Succ []netip.AddrPort `json:"succ"`
Pred netip.AddrPort `json:"pred"`
Pred netip.AddrPort `json:"pred,omitzero"`
}

View File

@ -1,7 +1,9 @@
package httpnode
import (
"context"
"errors"
"fmt"
"net"
"net/http"
"net/netip"
@ -13,6 +15,8 @@ type Node struct {
// self is the node topology this server represents.
self *chord.Node
// client is the Chord client for forwarding queries to other nodes.
// TODO(branden): really we should have a client per peer so that we can
// interchange protocols
client chord.Client
}
@ -43,9 +47,24 @@ func (n *Node) Router() http.Handler {
m := http.NewServeMux()
m.HandleFunc("GET /succ", n.successor)
m.HandleFunc("POST /pred", n.notify)
m.HandleFunc("GET /neighbors", n.neighbors)
return m
}
// Check performs checks that implement the Chord protocol.
// It must be called periodically while the node is alive.
func (n *Node) Check(ctx context.Context) error {
if err := chord.Stabilize(ctx, n.client, n.self); err != nil {
return fmt.Errorf("failed to stabilize: %w", err)
}
// TODO(zeph): enable once implemented
// if err := chord.FixFingers(ctx, n.client, n.self); err != nil {
// return fmt.Errorf("failed to fix fingers: %w", err)
// }
chord.CheckPredecessor(ctx, n.client, n.self)
return nil
}
func (n *Node) successor(w http.ResponseWriter, r *http.Request) {
s := r.FormValue("s")
id, err := chord.ParseID(s)
@ -63,7 +82,6 @@ func (n *Node) successor(w http.ResponseWriter, r *http.Request) {
}
func (n *Node) notify(w http.ResponseWriter, r *http.Request) {
// Another node is telling us they think they're our predecessor.
s := r.FormValue("p")
addr, err := netip.ParseAddrPort(s)
if err != nil {
@ -72,4 +90,19 @@ func (n *Node) notify(w http.ResponseWriter, r *http.Request) {
}
np := chord.Address(addr)
chord.Notify(n.self, np)
w.WriteHeader(http.StatusNoContent)
}
func (n *Node) neighbors(w http.ResponseWriter, r *http.Request) {
pred, succ := n.self.Neighbors(nil)
_, paddr := pred.Values()
u := neighbors{
Succ: make([]netip.AddrPort, 0, len(succ)),
Pred: paddr,
}
for _, s := range succ {
_, addr := s.Values()
u.Succ = append(u.Succ, addr)
}
writeOk(w, &u)
}

View File

@ -22,6 +22,10 @@ type Node struct {
fingers []Peer
}
func (n *Node) Self() Peer {
return n.self
}
func (n *Node) Successor() Peer {
return n.succ[0]
}