Compare commits

...

2 Commits

Author SHA1 Message Date
Branden J Brown
462a9783e9 start work on node server 2025-03-11 08:52:20 -04:00
Branden J Brown
ae5f39fb34 implement more primitives 2025-03-11 08:51:46 -04:00
6 changed files with 141 additions and 17 deletions

View File

@ -22,6 +22,15 @@ type Client interface {
// considering we can sort by increasing distance from the origin and then do
// the query in linear time.
// Find finds the first node in the network preceding id.
func Find(ctx context.Context, cl Client, n *Node, id ID) (Peer, error) {
if n.IsLocal(id) {
return n.self, nil
}
p, err := cl.FindSuccessor(ctx, n.Closest(id), id)
return p, err
}
// Join creates a new node joining an existing Chord network by communicating
// with any peer already in the network.
func Join(ctx context.Context, cl Client, addr netip.AddrPort, np Peer) (*Node, error) {

View File

@ -9,8 +9,6 @@ import (
"net/url"
"path"
"github.com/go-json-experiment/json"
"git.sunturtle.xyz/zephyr/chord/chord"
)
@ -41,14 +39,11 @@ func (cl *Client) FindSuccessor(ctx context.Context, s chord.Peer, id chord.ID)
if err != nil {
return chord.Peer{}, err
}
var r response[netip.AddrPort]
if err := json.UnmarshalRead(resp.Body, &r); err != nil {
return chord.Peer{}, err
p, err := readResponse[netip.AddrPort](resp)
if err != nil {
return chord.Peer{}, fmt.Errorf("%w (%s)", err, resp.Status)
}
if r.Error != "" {
return chord.Peer{}, fmt.Errorf("%s (%s)", r.Error, resp.Status)
}
return chord.Address(*r.Data), nil
return chord.Address(p), nil
}
// Notify tells s we believe n to be its predecessor.

View File

@ -2,11 +2,43 @@
// using JSON over HTTP.
package httpnode
import "net/netip"
import (
"errors"
"net/http"
"net/netip"
type response[T any] struct {
Data *T `json:"data,omitzero"`
Error string `json:"error,omitzero"`
"github.com/go-json-experiment/json"
)
func writeOk[T any](w http.ResponseWriter, x T) {
w.Header().Set("content-type", "application/json")
r := struct {
Data T `json:"data"`
}{x}
json.MarshalWrite(w, &r)
}
func writeError(w http.ResponseWriter, status int, msg string) {
w.Header().Set("content-type", "application/json")
w.WriteHeader(status)
r := struct {
Error string `json:"error"`
}{msg}
json.MarshalWrite(w, &r)
}
func readResponse[T any](r *http.Response) (x T, err error) {
var b struct {
Data T `json:"data"`
Error string `json:"error"`
}
if err := json.UnmarshalRead(r.Body, &b); err != nil {
return x, err
}
if b.Error != "" {
return x, errors.New(b.Error)
}
return x, nil
}
type neighbors struct {

75
chord/httpnode/server.go Normal file
View File

@ -0,0 +1,75 @@
package httpnode
import (
"errors"
"net"
"net/http"
"net/netip"
"git.sunturtle.xyz/zephyr/chord/chord"
)
type Node struct {
// self is the node topology this server represents.
self *chord.Node
// client is the Chord client for forwarding queries to other nodes.
client chord.Client
}
// New creates an instance of a Chord network that responds on HTTP.
// The listener must be the same one used for the HTTP server that routes to
// [Node.ServeHTTP]. It must be bound to a single interface and port.
func New(l net.Listener, cl chord.Client) (*Node, error) {
addr, err := netip.ParseAddrPort(l.Addr().String())
if err != nil {
return nil, err
}
if addr.Addr().IsUnspecified() || addr.Port() == 0 {
return nil, errors.New("listener must be bound to a single interface and port")
}
n, err := chord.Create(addr)
if err != nil {
return nil, err
}
r := &Node{
self: n,
client: cl,
}
return r, nil
}
// Router creates a handler for the Chord HTTP endpoints.
func (n *Node) Router() http.Handler {
m := http.NewServeMux()
m.HandleFunc("GET /succ", n.successor)
m.HandleFunc("POST /pred", n.notify)
return m
}
func (n *Node) successor(w http.ResponseWriter, r *http.Request) {
s := r.FormValue("s")
id, err := chord.ParseID(s)
if err != nil {
writeError(w, http.StatusBadRequest, err.Error())
return
}
p, err := chord.Find(r.Context(), n.client, n.self, id)
if err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
_, addr := p.Values()
writeOk(w, addr)
}
func (n *Node) notify(w http.ResponseWriter, r *http.Request) {
// Another node is telling us they think they're our predecessor.
s := r.FormValue("p")
addr, err := netip.ParseAddrPort(s)
if err != nil {
writeError(w, http.StatusBadRequest, err.Error())
return
}
np := chord.Address(addr)
chord.Notify(n.self, np)
}

View File

@ -4,6 +4,7 @@ import (
"bytes"
"crypto/sha1"
"encoding/hex"
"errors"
"net/netip"
)
@ -49,3 +50,12 @@ func (id ID) String() string {
b = hex.AppendEncode(b, id[:])
return string(b)
}
func ParseID(s string) (ID, error) {
if len(s) != len(ID{})*2 {
return ID{}, errors.New("invalid ID")
}
var id ID
_, err := hex.AppendDecode(id[:], []byte(s))
return id, err
}

View File

@ -26,16 +26,19 @@ func (n *Node) Successor() Peer {
return n.succ[0]
}
// Neighbors returns the node's predecessor and appends its successor list to s.
func (n *Node) Neighbors(s []Peer) (Peer, []Peer) {
return n.pred, append(s, n.succ...)
}
// IsLocal reports whether this node owns the given key.
func (n *Node) IsLocal(key string) bool {
id := keyID(key)
func (n *Node) IsLocal(id ID) bool {
return contains(n.self.id, n.Successor().id, id)
}
// Closest finds the locally known peer which is the closest predecessor of key.
func (n *Node) Closest(key string) Peer {
func (n *Node) Closest(id ID) Peer {
self := n.self.id
id := keyID(key)
l := n.fingers
for i := len(l) - 1; i >= 0; i-- {
f := l[i]