chord/chord/httpnode/client.go

231 lines
5.6 KiB
Go

package httpnode
import (
"bytes"
"context"
"encoding/base64"
"errors"
"fmt"
"log/slog"
"net/http"
"net/netip"
"net/url"
"path"
"strings"
"git.sunturtle.xyz/zephyr/chord/chord"
"github.com/go-json-experiment/json"
)
type Client struct {
// HTTP is the client used to make requests.
HTTP http.Client
// APIBase is the path under which the Chord API is served.
APIBase string
}
// FindSuccessor asks s to find a value and the peer that owns it.
func (cl *Client) FindSuccessor(ctx context.Context, s chord.Peer, id chord.ID) (chord.Peer, error) {
_, addr := s.Values()
if !addr.IsValid() {
return chord.Peer{}, errors.New("FindSuccessor with invalid peer")
}
url := url.URL{
Scheme: "http",
Host: addr.String(),
Path: path.Join("/", cl.APIBase, "find", id.String()),
}
req, err := http.NewRequestWithContext(ctx, "GET", url.String(), nil)
if err != nil {
return chord.Peer{}, err
}
slog.InfoContext(ctx, "find", slog.String("url", url.String()))
resp, err := cl.HTTP.Do(req)
if err != nil {
return chord.Peer{}, err
}
p, err := readResponse[netip.AddrPort](resp)
if err != nil {
return chord.Peer{}, fmt.Errorf("%w (%s)", err, resp.Status)
}
slog.InfoContext(ctx, "found", slog.String("peer", p.String()))
return chord.Address(p), nil
}
func (cl *Client) Get(ctx context.Context, s chord.Peer, id chord.ID) (string, error) {
_, addr := s.Values()
url := url.URL{
Scheme: "http",
Host: addr.String(),
Path: path.Join("/", cl.APIBase, "key", id.String()),
}
req, err := http.NewRequestWithContext(ctx, "GET", url.String(), nil)
if err != nil {
return "", nil
}
resp, err := cl.HTTP.Do(req)
if err != nil {
return "", err
}
switch resp.StatusCode {
case http.StatusOK:
v, err := readResponse[string](resp)
return v, err
case http.StatusNotFound:
return "", nil
default:
return "", errors.New(resp.Status)
}
}
func (cl *Client) Set(ctx context.Context, s chord.Peer, id chord.ID, v string) error {
_, addr := s.Values()
url := url.URL{
Scheme: "http",
Host: addr.String(),
Path: path.Join("/", cl.APIBase, "key", id.String()),
}
body := strings.NewReader(base64.StdEncoding.EncodeToString([]byte(v)))
req, err := http.NewRequestWithContext(ctx, "POST", url.String(), body)
if err != nil {
return err
}
resp, err := cl.HTTP.Do(req)
if err != nil {
return err
}
if resp.StatusCode >= 400 {
_, err := readResponse[struct{}](resp)
return err
}
return nil
}
// Notify tells s we believe n to be its predecessor.
func (cl *Client) Notify(ctx context.Context, n *chord.Node, s chord.Peer) error {
_, addr := s.Values()
if !addr.IsValid() {
return errors.New("Notify with invalid peer")
}
_, self := n.Self().Values()
url := url.URL{
Scheme: "http",
Host: addr.String(),
Path: path.Join("/", cl.APIBase, "pred"),
RawQuery: url.Values{"p": {self.String()}}.Encode(),
}
req, err := http.NewRequestWithContext(ctx, "POST", url.String(), nil)
if err != nil {
return err
}
resp, err := cl.HTTP.Do(req)
if err != nil {
return err
}
// We expect the server to only be capable of responding with No Content.
if resp.StatusCode != http.StatusNoContent {
return fmt.Errorf("strange response: %s", resp.Status)
}
return nil
}
// Neighbors requests a peer's beliefs about its own neighbors.
func (cl *Client) Neighbors(ctx context.Context, p chord.Peer) (pred chord.Peer, succ []chord.Peer, err error) {
_, addr := p.Values()
if !addr.IsValid() {
return chord.Peer{}, nil, errors.New("Neighbors with invalid peer")
}
url := url.URL{
Scheme: "http",
Host: addr.String(),
Path: path.Join("/", cl.APIBase, "neighbors"),
}
req, err := http.NewRequestWithContext(ctx, "GET", url.String(), nil)
if err != nil {
return chord.Peer{}, nil, err
}
resp, err := cl.HTTP.Do(req)
if err != nil {
return chord.Peer{}, nil, err
}
n, err := readResponse[neighbors](resp)
if err != nil {
return chord.Peer{}, nil, fmt.Errorf("%w (%s)", err, resp.Status)
}
pred = chord.Address(n.Pred)
succ = make([]chord.Peer, 0, len(n.Succ))
for _, addr := range n.Succ {
p := chord.Address(addr)
if p.IsValid() {
succ = append(succ, p)
}
}
return pred, succ, nil
}
// Bye tells p that its predecessor or successor is leaving
// and had the given successor list.
func (cl *Client) Bye(ctx context.Context, p, n chord.Peer, succ []chord.Peer) error {
_, peer := p.Values()
_, addr := n.Values()
if !peer.IsValid() || !addr.IsValid() {
return errors.New("Bye with invalid address")
}
s := make([]netip.AddrPort, 0, len(succ))
for _, p := range succ {
_, addr := p.Values()
if !addr.IsValid() {
continue
}
s = append(s, addr)
}
q := bye{Peer: peer, Succ: s}
b, err := json.Marshal(&q)
if err != nil {
return err
}
url := url.URL{
Scheme: "http",
Host: addr.String(),
Path: path.Join("/", cl.APIBase, "bye"),
}
req, err := http.NewRequestWithContext(ctx, "POST", url.String(), bytes.NewReader(b))
if err != nil {
return err
}
resp, err := cl.HTTP.Do(req)
if err != nil {
return err
}
if resp.StatusCode >= 400 {
_, err := readResponse[struct{}](resp)
return err
}
return nil
}
// SayBye tells p to leave the network.
func (cl *Client) SayBye(ctx context.Context, p chord.Peer) error {
_, addr := p.Values()
if !addr.IsValid() {
return errors.New("SayBye with invalid peer")
}
url := url.URL{
Scheme: "http",
Host: addr.String(),
Path: path.Join("/", cl.APIBase),
}
req, err := http.NewRequestWithContext(ctx, "DELETE", url.String(), nil)
if err != nil {
return err
}
resp, err := cl.HTTP.Do(req)
if err != nil {
return err
}
if resp.StatusCode != http.StatusOK {
return errors.New(resp.Status)
}
return nil
}