Compare commits
5 Commits
2460f745bc
...
5d5155d8d8
Author | SHA1 | Date | |
---|---|---|---|
![]() |
5d5155d8d8 | ||
![]() |
c0afd85f11 | ||
![]() |
bd2085ee87 | ||
![]() |
73068a9c1f | ||
![]() |
ca35fd19db |
@ -15,6 +15,11 @@ type Client interface {
|
|||||||
Notify(ctx context.Context, n *Node, s Peer) error
|
Notify(ctx context.Context, n *Node, s Peer) error
|
||||||
// Neighbors requests a peer's beliefs about its own neighbors.
|
// Neighbors requests a peer's beliefs about its own neighbors.
|
||||||
Neighbors(ctx context.Context, p Peer) (pred Peer, succ []Peer, err error)
|
Neighbors(ctx context.Context, p Peer) (pred Peer, succ []Peer, err error)
|
||||||
|
// Bye tells p that its predecessor or successor is leaving
|
||||||
|
// and had the given successor list.
|
||||||
|
Bye(ctx context.Context, p, n Peer, succ []Peer) error
|
||||||
|
// SayBye tells p to leave the network.
|
||||||
|
SayBye(ctx context.Context, p Peer) error
|
||||||
|
|
||||||
// Get asks s for a saved value.
|
// Get asks s for a saved value.
|
||||||
Get(ctx context.Context, s Peer, id ID) (string, error)
|
Get(ctx context.Context, s Peer, id ID) (string, error)
|
||||||
@ -93,6 +98,32 @@ func Join(ctx context.Context, cl Client, addr netip.AddrPort, np Peer) (*Node,
|
|||||||
return r, nil
|
return r, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Leave causes n to exit the network gracefully,
|
||||||
|
// handing off its data to its successor.
|
||||||
|
func Leave(ctx context.Context, cl Client, n *Node) error {
|
||||||
|
pred, succ := n.Neighbors(nil)
|
||||||
|
if succ[0].addr == n.self.addr {
|
||||||
|
// Last node in the network. No data transfer.
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if err := cl.Bye(ctx, succ[0], n.self, succ); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if pred.IsValid() {
|
||||||
|
if err := cl.Bye(ctx, pred, n.self, succ); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
n.mu.Lock()
|
||||||
|
defer n.mu.Unlock()
|
||||||
|
for k, v := range n.data {
|
||||||
|
if err := cl.Set(ctx, succ[0], k, v); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// Stabilize recomputes n's successor.
|
// Stabilize recomputes n's successor.
|
||||||
func Stabilize(ctx context.Context, cl Client, n *Node) error {
|
func Stabilize(ctx context.Context, cl Client, n *Node) error {
|
||||||
pred, _, err := cl.Neighbors(ctx, n.Successor())
|
pred, _, err := cl.Neighbors(ctx, n.Successor())
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
package httpnode
|
package httpnode
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"encoding/base64"
|
"encoding/base64"
|
||||||
"errors"
|
"errors"
|
||||||
@ -13,6 +14,7 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"git.sunturtle.xyz/zephyr/chord/chord"
|
"git.sunturtle.xyz/zephyr/chord/chord"
|
||||||
|
"github.com/go-json-experiment/json"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Client struct {
|
type Client struct {
|
||||||
@ -160,3 +162,69 @@ func (cl *Client) Neighbors(ctx context.Context, p chord.Peer) (pred chord.Peer,
|
|||||||
}
|
}
|
||||||
return pred, succ, nil
|
return pred, succ, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Bye tells p that its predecessor or successor is leaving
|
||||||
|
// and had the given successor list.
|
||||||
|
func (cl *Client) Bye(ctx context.Context, p, n chord.Peer, succ []chord.Peer) error {
|
||||||
|
_, peer := p.Values()
|
||||||
|
_, addr := n.Values()
|
||||||
|
if !peer.IsValid() || !addr.IsValid() {
|
||||||
|
return errors.New("Bye with invalid address")
|
||||||
|
}
|
||||||
|
s := make([]netip.AddrPort, 0, len(succ))
|
||||||
|
for _, p := range succ {
|
||||||
|
_, addr := p.Values()
|
||||||
|
if !addr.IsValid() {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
s = append(s, addr)
|
||||||
|
}
|
||||||
|
q := bye{Peer: peer, Succ: s}
|
||||||
|
b, err := json.Marshal(&q)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
url := url.URL{
|
||||||
|
Scheme: "http",
|
||||||
|
Host: addr.String(),
|
||||||
|
Path: path.Join("/", cl.APIBase, "bye"),
|
||||||
|
}
|
||||||
|
req, err := http.NewRequestWithContext(ctx, "POST", url.String(), bytes.NewReader(b))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
resp, err := cl.HTTP.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if resp.StatusCode >= 400 {
|
||||||
|
_, err := readResponse[struct{}](resp)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// SayBye tells p to leave the network.
|
||||||
|
func (cl *Client) SayBye(ctx context.Context, p chord.Peer) error {
|
||||||
|
_, addr := p.Values()
|
||||||
|
if !addr.IsValid() {
|
||||||
|
return errors.New("SayBye with invalid peer")
|
||||||
|
}
|
||||||
|
url := url.URL{
|
||||||
|
Scheme: "http",
|
||||||
|
Host: addr.String(),
|
||||||
|
Path: path.Join("/", cl.APIBase),
|
||||||
|
}
|
||||||
|
req, err := http.NewRequestWithContext(ctx, "DELETE", url.String(), nil)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
resp, err := cl.HTTP.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if resp.StatusCode != http.StatusOK {
|
||||||
|
return errors.New(resp.Status)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
@ -48,3 +48,8 @@ type neighbors struct {
|
|||||||
Succ []netip.AddrPort `json:"succ"`
|
Succ []netip.AddrPort `json:"succ"`
|
||||||
Pred netip.AddrPort `json:"pred,omitzero"`
|
Pred netip.AddrPort `json:"pred,omitzero"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type bye struct {
|
||||||
|
Peer netip.AddrPort `json:"peer"`
|
||||||
|
Succ []netip.AddrPort `json:"succ"`
|
||||||
|
}
|
||||||
|
@ -11,6 +11,8 @@ import (
|
|||||||
"net/http"
|
"net/http"
|
||||||
"net/netip"
|
"net/netip"
|
||||||
|
|
||||||
|
"github.com/go-json-experiment/json"
|
||||||
|
|
||||||
"git.sunturtle.xyz/zephyr/chord/chord"
|
"git.sunturtle.xyz/zephyr/chord/chord"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -21,6 +23,9 @@ type Node struct {
|
|||||||
// TODO(branden): really we should have a client per peer so that we can
|
// TODO(branden): really we should have a client per peer so that we can
|
||||||
// interchange protocols
|
// interchange protocols
|
||||||
client chord.Client
|
client chord.Client
|
||||||
|
// l is the listener that the HTTP server serves.
|
||||||
|
// When the server leaves, it closes the listener.
|
||||||
|
l net.Listener
|
||||||
}
|
}
|
||||||
|
|
||||||
// New creates an instance of a Chord network that responds on HTTP.
|
// New creates an instance of a Chord network that responds on HTTP.
|
||||||
@ -38,6 +43,7 @@ func New(l net.Listener, cl chord.Client, self *chord.Node) (*Node, error) {
|
|||||||
r := &Node{
|
r := &Node{
|
||||||
self: self,
|
self: self,
|
||||||
client: cl,
|
client: cl,
|
||||||
|
l: l,
|
||||||
}
|
}
|
||||||
return r, nil
|
return r, nil
|
||||||
}
|
}
|
||||||
@ -50,6 +56,8 @@ func (n *Node) Router() http.Handler {
|
|||||||
m.HandleFunc("POST /key/{id}", n.set)
|
m.HandleFunc("POST /key/{id}", n.set)
|
||||||
m.HandleFunc("POST /pred", n.notify)
|
m.HandleFunc("POST /pred", n.notify)
|
||||||
m.HandleFunc("GET /neighbors", n.neighbors)
|
m.HandleFunc("GET /neighbors", n.neighbors)
|
||||||
|
m.HandleFunc("POST /bye", n.bye)
|
||||||
|
m.HandleFunc("DELETE /", n.saybye)
|
||||||
return m
|
return m
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -141,3 +149,36 @@ func (n *Node) neighbors(w http.ResponseWriter, r *http.Request) {
|
|||||||
}
|
}
|
||||||
writeOk(w, &u)
|
writeOk(w, &u)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (n *Node) bye(w http.ResponseWriter, r *http.Request) {
|
||||||
|
var q bye
|
||||||
|
if err := json.UnmarshalRead(r.Body, &q); err != nil {
|
||||||
|
writeError(w, http.StatusBadRequest, err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if !q.Peer.IsValid() {
|
||||||
|
writeError(w, http.StatusBadRequest, "invalid leaving peer")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if len(q.Succ) == 0 {
|
||||||
|
writeError(w, http.StatusBadRequest, "no successors for leaving node")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
p := chord.Address(q.Peer)
|
||||||
|
s := make([]chord.Peer, 0, len(q.Succ))
|
||||||
|
for _, p := range q.Succ {
|
||||||
|
s = append(s, chord.Address(p))
|
||||||
|
}
|
||||||
|
n.self.Leave(p, s)
|
||||||
|
w.WriteHeader(http.StatusNoContent)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *Node) saybye(w http.ResponseWriter, r *http.Request) {
|
||||||
|
err := chord.Leave(r.Context(), n.client, n.self)
|
||||||
|
if err != nil {
|
||||||
|
writeError(w, http.StatusInternalServerError, err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
n.l.Close()
|
||||||
|
}
|
||||||
|
@ -106,6 +106,28 @@ func (n *Node) SetLocal(k ID, v string) {
|
|||||||
n.data[k] = v
|
n.data[k] = v
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Leave tells the node that a peer is leaving.
|
||||||
|
// succ gives the leaving peer's successors.
|
||||||
|
func (n *Node) Leave(p Peer, succ []Peer) {
|
||||||
|
n.mu.Lock()
|
||||||
|
defer n.mu.Unlock()
|
||||||
|
// The leaving peer might be either this node's successor, or its
|
||||||
|
// predecessor, or both in the case of a network of two nodes.
|
||||||
|
if p.addr == n.pred.addr {
|
||||||
|
n.pred = Peer{}
|
||||||
|
}
|
||||||
|
for i, s := range n.succ {
|
||||||
|
// TODO(branden): we should explicitly merge the successor lists
|
||||||
|
if s.addr == p.addr {
|
||||||
|
// Found the successor.
|
||||||
|
// Delete it and add in the last received successor.
|
||||||
|
copy(n.succ[i:], n.succ[i+1:])
|
||||||
|
n.succ[len(n.succ)-1] = succ[len(succ)-1]
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// SuccessorFailed marks the node's current successor as having failed, e.g.
|
// SuccessorFailed marks the node's current successor as having failed, e.g.
|
||||||
// during stabilization.
|
// during stabilization.
|
||||||
func (n *Node) SuccessorFailed() {
|
func (n *Node) SuccessorFailed() {
|
||||||
|
37
main.go
37
main.go
@ -135,6 +135,7 @@ func addrflag(ip string, p uint64) (netip.AddrPort, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func cliJoin(ctx context.Context, cmd *cli.Command) error {
|
func cliJoin(ctx context.Context, cmd *cli.Command) error {
|
||||||
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
addr, err := addrflag(cmd.String("ip"), cmd.Uint("p"))
|
addr, err := addrflag(cmd.String("ip"), cmd.Uint("p"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -170,6 +171,7 @@ func cliJoin(ctx context.Context, cmd *cli.Command) error {
|
|||||||
}
|
}
|
||||||
go func() {
|
go func() {
|
||||||
slog.InfoContext(ctx, "HTTP API server", slog.Any("addr", l.Addr()))
|
slog.InfoContext(ctx, "HTTP API server", slog.Any("addr", l.Addr()))
|
||||||
|
defer cancel()
|
||||||
err := srv.Serve(l)
|
err := srv.Serve(l)
|
||||||
if err == http.ErrServerClosed {
|
if err == http.ErrServerClosed {
|
||||||
return
|
return
|
||||||
@ -189,15 +191,40 @@ func cliJoin(ctx context.Context, cmd *cli.Command) error {
|
|||||||
}()
|
}()
|
||||||
<-ctx.Done()
|
<-ctx.Done()
|
||||||
t.Stop()
|
t.Stop()
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
return srv.Shutdown(ctx)
|
return srv.Shutdown(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
func cliLeave(ctx context.Context, cmd *cli.Command) error { return errors.New("not implemented") }
|
func cliLeave(ctx context.Context, cmd *cli.Command) error {
|
||||||
func cliLookup(ctx context.Context, cmd *cli.Command) error { return errors.New("not implemented") }
|
// I'm not really clear on why this command takes a startup IP and port.
|
||||||
func cliPut(ctx context.Context, cmd *cli.Command) error { return errors.New("not implemented") }
|
// All we need is the node to tell to leave.
|
||||||
func cliGet(ctx context.Context, cmd *cli.Command) error { return errors.New("not implemented") }
|
n, err := netip.ParseAddrPort(cmd.String("n"))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
cl := &httpnode.Client{HTTP: http.Client{Timeout: 5 * time.Second}}
|
||||||
|
return cl.SayBye(ctx, chord.Address(n))
|
||||||
|
}
|
||||||
|
|
||||||
|
func cliLookup(ctx context.Context, cmd *cli.Command) error {
|
||||||
|
n, err := netip.ParseAddrPort(cmd.String("n"))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
k := chord.Key(cmd.String("k"))
|
||||||
|
cl := &httpnode.Client{HTTP: http.Client{Timeout: 5 * time.Second}}
|
||||||
|
p, err := cl.FindSuccessor(ctx, chord.Address(n), k)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
_, addr := p.Values()
|
||||||
|
fmt.Println(addr)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func cliPut(ctx context.Context, cmd *cli.Command) error { return errors.New("not implemented") }
|
||||||
|
func cliGet(ctx context.Context, cmd *cli.Command) error { return errors.New("not implemented") }
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
|
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
|
||||||
|
13
test.bash
13
test.bash
@ -16,4 +16,17 @@ THIRD=$!
|
|||||||
|
|
||||||
sleep 5
|
sleep 5
|
||||||
# Each node logs its predecessor and successors. At this point, we see the ring.
|
# Each node logs its predecessor and successors. At this point, we see the ring.
|
||||||
|
|
||||||
|
# Test lookup.
|
||||||
|
# First check that the lookup is independent of the node we ask.
|
||||||
|
./chord-node lookup -k key1 -n 127.0.0.1:3000
|
||||||
|
./chord-node lookup -k key1 -n 127.0.0.1:3001
|
||||||
|
./chord-node lookup -k key1 -n 127.0.0.1:3002
|
||||||
|
# Now check that we get some different nodes for different keys.
|
||||||
|
./chord-node lookup -k key2 -n 127.0.0.1:3000
|
||||||
|
./chord-node lookup -k key3 -n 127.0.0.1:3000
|
||||||
|
|
||||||
|
# Test leaving.
|
||||||
|
./chord-node leave -n 127.0.0.1:3000
|
||||||
|
|
||||||
kill $FIRST $SECOND $THIRD
|
kill $FIRST $SECOND $THIRD
|
||||||
|
Loading…
Reference in New Issue
Block a user