start work on node server
This commit is contained in:
parent
ae5f39fb34
commit
462a9783e9
@ -9,8 +9,6 @@ import (
|
||||
"net/url"
|
||||
"path"
|
||||
|
||||
"github.com/go-json-experiment/json"
|
||||
|
||||
"git.sunturtle.xyz/zephyr/chord/chord"
|
||||
)
|
||||
|
||||
@ -41,14 +39,11 @@ func (cl *Client) FindSuccessor(ctx context.Context, s chord.Peer, id chord.ID)
|
||||
if err != nil {
|
||||
return chord.Peer{}, err
|
||||
}
|
||||
var r response[netip.AddrPort]
|
||||
if err := json.UnmarshalRead(resp.Body, &r); err != nil {
|
||||
return chord.Peer{}, err
|
||||
p, err := readResponse[netip.AddrPort](resp)
|
||||
if err != nil {
|
||||
return chord.Peer{}, fmt.Errorf("%w (%s)", err, resp.Status)
|
||||
}
|
||||
if r.Error != "" {
|
||||
return chord.Peer{}, fmt.Errorf("%s (%s)", r.Error, resp.Status)
|
||||
}
|
||||
return chord.Address(*r.Data), nil
|
||||
return chord.Address(p), nil
|
||||
}
|
||||
|
||||
// Notify tells s we believe n to be its predecessor.
|
||||
|
@ -2,11 +2,43 @@
|
||||
// using JSON over HTTP.
|
||||
package httpnode
|
||||
|
||||
import "net/netip"
|
||||
import (
|
||||
"errors"
|
||||
"net/http"
|
||||
"net/netip"
|
||||
|
||||
type response[T any] struct {
|
||||
Data *T `json:"data,omitzero"`
|
||||
Error string `json:"error,omitzero"`
|
||||
"github.com/go-json-experiment/json"
|
||||
)
|
||||
|
||||
func writeOk[T any](w http.ResponseWriter, x T) {
|
||||
w.Header().Set("content-type", "application/json")
|
||||
r := struct {
|
||||
Data T `json:"data"`
|
||||
}{x}
|
||||
json.MarshalWrite(w, &r)
|
||||
}
|
||||
|
||||
func writeError(w http.ResponseWriter, status int, msg string) {
|
||||
w.Header().Set("content-type", "application/json")
|
||||
w.WriteHeader(status)
|
||||
r := struct {
|
||||
Error string `json:"error"`
|
||||
}{msg}
|
||||
json.MarshalWrite(w, &r)
|
||||
}
|
||||
|
||||
func readResponse[T any](r *http.Response) (x T, err error) {
|
||||
var b struct {
|
||||
Data T `json:"data"`
|
||||
Error string `json:"error"`
|
||||
}
|
||||
if err := json.UnmarshalRead(r.Body, &b); err != nil {
|
||||
return x, err
|
||||
}
|
||||
if b.Error != "" {
|
||||
return x, errors.New(b.Error)
|
||||
}
|
||||
return x, nil
|
||||
}
|
||||
|
||||
type neighbors struct {
|
||||
|
75
chord/httpnode/server.go
Normal file
75
chord/httpnode/server.go
Normal file
@ -0,0 +1,75 @@
|
||||
package httpnode
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/netip"
|
||||
|
||||
"git.sunturtle.xyz/zephyr/chord/chord"
|
||||
)
|
||||
|
||||
type Node struct {
|
||||
// self is the node topology this server represents.
|
||||
self *chord.Node
|
||||
// client is the Chord client for forwarding queries to other nodes.
|
||||
client chord.Client
|
||||
}
|
||||
|
||||
// New creates an instance of a Chord network that responds on HTTP.
|
||||
// The listener must be the same one used for the HTTP server that routes to
|
||||
// [Node.ServeHTTP]. It must be bound to a single interface and port.
|
||||
func New(l net.Listener, cl chord.Client) (*Node, error) {
|
||||
addr, err := netip.ParseAddrPort(l.Addr().String())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if addr.Addr().IsUnspecified() || addr.Port() == 0 {
|
||||
return nil, errors.New("listener must be bound to a single interface and port")
|
||||
}
|
||||
n, err := chord.Create(addr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
r := &Node{
|
||||
self: n,
|
||||
client: cl,
|
||||
}
|
||||
return r, nil
|
||||
}
|
||||
|
||||
// Router creates a handler for the Chord HTTP endpoints.
|
||||
func (n *Node) Router() http.Handler {
|
||||
m := http.NewServeMux()
|
||||
m.HandleFunc("GET /succ", n.successor)
|
||||
m.HandleFunc("POST /pred", n.notify)
|
||||
return m
|
||||
}
|
||||
|
||||
func (n *Node) successor(w http.ResponseWriter, r *http.Request) {
|
||||
s := r.FormValue("s")
|
||||
id, err := chord.ParseID(s)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusBadRequest, err.Error())
|
||||
return
|
||||
}
|
||||
p, err := chord.Find(r.Context(), n.client, n.self, id)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
_, addr := p.Values()
|
||||
writeOk(w, addr)
|
||||
}
|
||||
|
||||
func (n *Node) notify(w http.ResponseWriter, r *http.Request) {
|
||||
// Another node is telling us they think they're our predecessor.
|
||||
s := r.FormValue("p")
|
||||
addr, err := netip.ParseAddrPort(s)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusBadRequest, err.Error())
|
||||
return
|
||||
}
|
||||
np := chord.Address(addr)
|
||||
chord.Notify(n.self, np)
|
||||
}
|
Loading…
Reference in New Issue
Block a user