add a client to use cubby in go projects
This commit is contained in:
parent
5f9c7813df
commit
699aa30b07
4 changed files with 524 additions and 0 deletions
323
client.go
Normal file
323
client.go
Normal file
|
|
@ -0,0 +1,323 @@
|
||||||
|
// Package cubby is a Go client for cubby, the tiny shared in-memory cache.
|
||||||
|
//
|
||||||
|
// Typical use:
|
||||||
|
//
|
||||||
|
// c, err := cubby.Dial("/tmp/cubby.sock")
|
||||||
|
// if err != nil { ... }
|
||||||
|
// defer c.Close()
|
||||||
|
//
|
||||||
|
// c.Set("hello", "world", 0)
|
||||||
|
// v, ok, _ := c.Get("hello")
|
||||||
|
//
|
||||||
|
// A Client wraps a single connection and is NOT safe for concurrent use.
|
||||||
|
// For concurrent callers, use a Pool.
|
||||||
|
package cubby
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bufio"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"net"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Client is a single connection to a cubby server. Not safe for concurrent
|
||||||
|
// use — wrap with a Pool if multiple goroutines need access.
|
||||||
|
type Client struct {
|
||||||
|
conn net.Conn
|
||||||
|
r *bufio.Reader
|
||||||
|
w *bufio.Writer
|
||||||
|
}
|
||||||
|
|
||||||
|
// Dial connects to the cubby server at the given Unix socket path.
|
||||||
|
func Dial(socketPath string) (*Client, error) {
|
||||||
|
conn, err := net.Dial("unix", socketPath)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("cubby: dial %s: %w", socketPath, err)
|
||||||
|
}
|
||||||
|
return &Client{
|
||||||
|
conn: conn,
|
||||||
|
r: bufio.NewReader(conn),
|
||||||
|
w: bufio.NewWriter(conn),
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close closes the underlying connection. Calling Close more than once is safe.
|
||||||
|
func (c *Client) Close() error {
|
||||||
|
if c.conn == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
err := c.conn.Close()
|
||||||
|
c.conn = nil
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetDeadline sets read/write deadlines on the underlying connection.
|
||||||
|
// Pass a zero time.Time to clear.
|
||||||
|
func (c *Client) SetDeadline(t time.Time) error {
|
||||||
|
return c.conn.SetDeadline(t)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ping sends PING and returns an error if the server doesn't reply with PONG.
|
||||||
|
func (c *Client) Ping() error {
|
||||||
|
if err := c.send("PING"); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
line, err := c.readLine()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if line != "+PONG" {
|
||||||
|
return fmt.Errorf("cubby: unexpected ping reply: %q", line)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get fetches a key. The bool reports whether the key was present;
|
||||||
|
// a missing key is not an error.
|
||||||
|
func (c *Client) Get(key string) (string, bool, error) {
|
||||||
|
if err := c.send("GET", key); err != nil {
|
||||||
|
return "", false, err
|
||||||
|
}
|
||||||
|
line, err := c.readLine()
|
||||||
|
if err != nil {
|
||||||
|
return "", false, err
|
||||||
|
}
|
||||||
|
switch {
|
||||||
|
case line == "_":
|
||||||
|
return "", false, nil
|
||||||
|
case strings.HasPrefix(line, "$"):
|
||||||
|
v, err := strconv.Unquote(line[1:])
|
||||||
|
if err != nil {
|
||||||
|
return "", false, fmt.Errorf("cubby: bad value encoding: %w", err)
|
||||||
|
}
|
||||||
|
return v, true, nil
|
||||||
|
case strings.HasPrefix(line, "-"):
|
||||||
|
return "", false, fmt.Errorf("cubby: %s", line[1:])
|
||||||
|
default:
|
||||||
|
return "", false, fmt.Errorf("cubby: unexpected get reply: %q", line)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set stores a value. Pass ttl=0 for no expiry.
|
||||||
|
//
|
||||||
|
// TTLs are rounded up to the nearest second (cubby's protocol is
|
||||||
|
// second-granular). A non-zero ttl below one second becomes one second.
|
||||||
|
func (c *Client) Set(key, value string, ttl time.Duration) error {
|
||||||
|
args := []string{"SET", key, value}
|
||||||
|
if ttl > 0 {
|
||||||
|
secs := int((ttl + time.Second - 1) / time.Second) // round up
|
||||||
|
args = append(args, strconv.Itoa(secs))
|
||||||
|
}
|
||||||
|
if err := c.send(args...); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return c.expectOK()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Del removes a key. Returns true if the key was present.
|
||||||
|
func (c *Client) Del(key string) (bool, error) {
|
||||||
|
if err := c.send("DEL", key); err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
n, err := c.readCount()
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
return n > 0, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Keys returns all live keys. For a busy cache this can be a lot — use
|
||||||
|
// sparingly.
|
||||||
|
func (c *Client) Keys() ([]string, error) {
|
||||||
|
if err := c.send("KEYS"); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
n, err := c.readCount()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
out := make([]string, 0, n)
|
||||||
|
for range n {
|
||||||
|
line, err := c.readLine()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if !strings.HasPrefix(line, "$") {
|
||||||
|
return nil, fmt.Errorf("cubby: expected value, got %q", line)
|
||||||
|
}
|
||||||
|
k, err := strconv.Unquote(line[1:])
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("cubby: bad key encoding: %w", err)
|
||||||
|
}
|
||||||
|
out = append(out, k)
|
||||||
|
}
|
||||||
|
return out, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ----- Internals -----
|
||||||
|
|
||||||
|
// send writes a single command. Arguments containing whitespace, quotes,
|
||||||
|
// or non-printable bytes are Go-quoted; bare arguments are sent as-is.
|
||||||
|
// The command verb (args[0]) is never quoted.
|
||||||
|
func (c *Client) send(args ...string) error {
|
||||||
|
if c.conn == nil {
|
||||||
|
return errors.New("cubby: client is closed")
|
||||||
|
}
|
||||||
|
var b strings.Builder
|
||||||
|
for i, a := range args {
|
||||||
|
if i > 0 {
|
||||||
|
b.WriteByte(' ')
|
||||||
|
}
|
||||||
|
if i == 0 || isBareSafe(a) {
|
||||||
|
b.WriteString(a)
|
||||||
|
} else {
|
||||||
|
b.WriteString(strconv.Quote(a))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
b.WriteByte('\n')
|
||||||
|
if _, err := c.w.WriteString(b.String()); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return c.w.Flush()
|
||||||
|
}
|
||||||
|
|
||||||
|
// isBareSafe reports whether s can be sent without Go-quoting.
|
||||||
|
// We're conservative: only printable ASCII, no whitespace, no quote chars.
|
||||||
|
func isBareSafe(s string) bool {
|
||||||
|
if s == "" {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
for i := 0; i < len(s); i++ {
|
||||||
|
ch := s[i]
|
||||||
|
if ch < 0x21 || ch > 0x7e || ch == '"' || ch == '\\' {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) readLine() (string, error) {
|
||||||
|
line, err := c.r.ReadString('\n')
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
return strings.TrimRight(line, "\r\n"), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) expectOK() error {
|
||||||
|
line, err := c.readLine()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
switch {
|
||||||
|
case line == "+OK":
|
||||||
|
return nil
|
||||||
|
case strings.HasPrefix(line, "-"):
|
||||||
|
return fmt.Errorf("cubby: %s", line[1:])
|
||||||
|
default:
|
||||||
|
return fmt.Errorf("cubby: unexpected reply: %q", line)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) readCount() (int, error) {
|
||||||
|
line, err := c.readLine()
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
if strings.HasPrefix(line, "-") {
|
||||||
|
return 0, fmt.Errorf("cubby: %s", line[1:])
|
||||||
|
}
|
||||||
|
if !strings.HasPrefix(line, "#") {
|
||||||
|
return 0, fmt.Errorf("cubby: expected count, got %q", line)
|
||||||
|
}
|
||||||
|
return strconv.Atoi(line[1:])
|
||||||
|
}
|
||||||
|
|
||||||
|
// ----- Pool -----
|
||||||
|
|
||||||
|
// Pool is a small connection pool for concurrent callers. It dials on demand
|
||||||
|
// and caps the number of idle connections it keeps around.
|
||||||
|
type Pool struct {
|
||||||
|
socket string
|
||||||
|
maxIdle int
|
||||||
|
mu sync.Mutex
|
||||||
|
idle []*Client
|
||||||
|
closed bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewPool creates a pool that dials socketPath on demand and keeps up to
|
||||||
|
// maxIdle connections cached. maxIdle <= 0 means "don't cache, dial each time."
|
||||||
|
func NewPool(socketPath string, maxIdle int) *Pool {
|
||||||
|
return &Pool{socket: socketPath, maxIdle: maxIdle}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get returns a client from the pool, dialing a new one if none are idle.
|
||||||
|
// Call Put to return it (or Close it directly to drop it).
|
||||||
|
func (p *Pool) Get() (*Client, error) {
|
||||||
|
p.mu.Lock()
|
||||||
|
if p.closed {
|
||||||
|
p.mu.Unlock()
|
||||||
|
return nil, errors.New("cubby: pool is closed")
|
||||||
|
}
|
||||||
|
if n := len(p.idle); n > 0 {
|
||||||
|
c := p.idle[n-1]
|
||||||
|
p.idle = p.idle[:n-1]
|
||||||
|
p.mu.Unlock()
|
||||||
|
return c, nil
|
||||||
|
}
|
||||||
|
p.mu.Unlock()
|
||||||
|
return Dial(p.socket)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Put returns a client to the pool. If the pool is full or the client looks
|
||||||
|
// broken, the client is closed instead. Pass the error from your last call so
|
||||||
|
// Put can decide whether to keep the connection.
|
||||||
|
func (p *Pool) Put(c *Client, lastErr error) {
|
||||||
|
if c == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if lastErr != nil {
|
||||||
|
_ = c.Close()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
p.mu.Lock()
|
||||||
|
if p.closed || len(p.idle) >= p.maxIdle {
|
||||||
|
p.mu.Unlock()
|
||||||
|
_ = c.Close()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
p.idle = append(p.idle, c)
|
||||||
|
p.mu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close shuts down all idle connections. In-flight clients held by callers
|
||||||
|
// are not affected; closing the pool just prevents new Gets and drops the
|
||||||
|
// idle cache.
|
||||||
|
func (p *Pool) Close() error {
|
||||||
|
p.mu.Lock()
|
||||||
|
p.closed = true
|
||||||
|
idle := p.idle
|
||||||
|
p.idle = nil
|
||||||
|
p.mu.Unlock()
|
||||||
|
for _, c := range idle {
|
||||||
|
_ = c.Close()
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Do is a convenience that gets a client, runs fn, and returns it to the pool.
|
||||||
|
// Use this for one-shot operations; for multi-step sequences, hold the client
|
||||||
|
// yourself with Get/Put.
|
||||||
|
func (p *Pool) Do(fn func(*Client) error) error {
|
||||||
|
c, err := p.Get()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
err = fn(c)
|
||||||
|
p.Put(c, err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
181
client_test.go
Normal file
181
client_test.go
Normal file
|
|
@ -0,0 +1,181 @@
|
||||||
|
package cubby
|
||||||
|
|
||||||
|
import (
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"strconv"
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// startServer launches the cubby server binary as a subprocess for the test.
|
||||||
|
// Returns the socket path. Caller doesn't need to clean up; t.Cleanup handles it.
|
||||||
|
//
|
||||||
|
// We can't import the server's main package, so this test exercises the wire
|
||||||
|
// protocol via a real subprocess. That's actually what we want — it catches
|
||||||
|
// any drift between client and server.
|
||||||
|
func startServer(t *testing.T) string {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
dir := t.TempDir()
|
||||||
|
sock := filepath.Join(dir, "cubby.sock")
|
||||||
|
|
||||||
|
// Build the server binary into the temp dir.
|
||||||
|
bin := filepath.Join(dir, "cubby")
|
||||||
|
if out, err := runCmd(t, "go", "build", "-o", bin, "./cmd/cubby"); err != nil {
|
||||||
|
t.Fatalf("build server: %v\n%s", err, out)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start the server.
|
||||||
|
srv := startCmd(t, bin, "-socket", sock)
|
||||||
|
t.Cleanup(func() {
|
||||||
|
_ = srv.Process.Signal(os.Interrupt)
|
||||||
|
_, _ = srv.Process.Wait()
|
||||||
|
})
|
||||||
|
|
||||||
|
// Wait for the socket to appear.
|
||||||
|
deadline := time.Now().Add(2 * time.Second)
|
||||||
|
for time.Now().Before(deadline) {
|
||||||
|
if _, err := os.Stat(sock); err == nil {
|
||||||
|
return sock
|
||||||
|
}
|
||||||
|
time.Sleep(20 * time.Millisecond)
|
||||||
|
}
|
||||||
|
t.Fatal("server didn't start in time")
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBasic(t *testing.T) {
|
||||||
|
sock := startServer(t)
|
||||||
|
c, err := Dial(sock)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
defer c.Close()
|
||||||
|
|
||||||
|
if err := c.Ping(); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := c.Set("k", "v", 0); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
v, ok, err := c.Get("k")
|
||||||
|
if err != nil || !ok || v != "v" {
|
||||||
|
t.Fatalf("Get(k) = %q,%v,%v; want v,true,nil", v, ok, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Missing key: ok=false, no error.
|
||||||
|
_, ok, err = c.Get("nope")
|
||||||
|
if err != nil || ok {
|
||||||
|
t.Fatalf("Get(nope) = _,%v,%v; want _,false,nil", ok, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestValueWithSpacesAndNewlines(t *testing.T) {
|
||||||
|
sock := startServer(t)
|
||||||
|
c, err := Dial(sock)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
defer c.Close()
|
||||||
|
|
||||||
|
tricky := "hello world\nline two\twith tab\nand a quote: \""
|
||||||
|
if err := c.Set("trick", tricky, 0); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
got, ok, err := c.Get("trick")
|
||||||
|
if err != nil || !ok || got != tricky {
|
||||||
|
t.Fatalf("round-trip failed:\n got %q\nwant %q\n err=%v ok=%v", got, tricky, err, ok)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestTTL(t *testing.T) {
|
||||||
|
sock := startServer(t)
|
||||||
|
c, err := Dial(sock)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
defer c.Close()
|
||||||
|
|
||||||
|
if err := c.Set("ephemeral", "soon-gone", time.Second); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
// Right away: present.
|
||||||
|
_, ok, _ := c.Get("ephemeral")
|
||||||
|
if !ok {
|
||||||
|
t.Fatal("expected key present immediately after set")
|
||||||
|
}
|
||||||
|
// After TTL: gone. Pad a bit for janitor + clock granularity.
|
||||||
|
time.Sleep(1500 * time.Millisecond)
|
||||||
|
_, ok, _ = c.Get("ephemeral")
|
||||||
|
if ok {
|
||||||
|
t.Fatal("expected key to expire")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestDelAndKeys(t *testing.T) {
|
||||||
|
sock := startServer(t)
|
||||||
|
c, err := Dial(sock)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
defer c.Close()
|
||||||
|
|
||||||
|
for _, k := range []string{"a", "b", "c"} {
|
||||||
|
if err := c.Set(k, k+"-val", 0); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
keys, err := c.Keys()
|
||||||
|
if err != nil || len(keys) != 3 {
|
||||||
|
t.Fatalf("Keys() = %v,%v; want 3 keys", keys, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
existed, err := c.Del("b")
|
||||||
|
if err != nil || !existed {
|
||||||
|
t.Fatalf("Del(b) = %v,%v; want true,nil", existed, err)
|
||||||
|
}
|
||||||
|
existed, err = c.Del("b") // again — already gone
|
||||||
|
if err != nil || existed {
|
||||||
|
t.Fatalf("Del(b) twice = %v,%v; want false,nil", existed, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPoolConcurrent(t *testing.T) {
|
||||||
|
sock := startServer(t)
|
||||||
|
pool := NewPool(sock, 4)
|
||||||
|
defer pool.Close()
|
||||||
|
|
||||||
|
const workers = 16
|
||||||
|
const ops = 50
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
errCh := make(chan error, workers)
|
||||||
|
|
||||||
|
for w := range workers {
|
||||||
|
wg.Add(1)
|
||||||
|
go func(w int) {
|
||||||
|
defer wg.Done()
|
||||||
|
for i := range ops {
|
||||||
|
err := pool.Do(func(c *Client) error {
|
||||||
|
key := "w" + strconv.Itoa(w) + "-" + strconv.Itoa(i)
|
||||||
|
if err := c.Set(key, "x", 0); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
_, _, err := c.Get(key)
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
errCh <- err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}(w)
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
close(errCh)
|
||||||
|
for err := range errCh {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
20
testhelpers_test.go
Normal file
20
testhelpers_test.go
Normal file
|
|
@ -0,0 +1,20 @@
|
||||||
|
package cubby
|
||||||
|
|
||||||
|
import (
|
||||||
|
"os/exec"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func runCmd(t *testing.T, name string, args ...string) ([]byte, error) {
|
||||||
|
t.Helper()
|
||||||
|
return exec.Command(name, args...).CombinedOutput()
|
||||||
|
}
|
||||||
|
|
||||||
|
func startCmd(t *testing.T, name string, args ...string) *exec.Cmd {
|
||||||
|
t.Helper()
|
||||||
|
cmd := exec.Command(name, args...)
|
||||||
|
if err := cmd.Start(); err != nil {
|
||||||
|
t.Fatalf("start %s: %v", name, err)
|
||||||
|
}
|
||||||
|
return cmd
|
||||||
|
}
|
||||||
Loading…
Add table
Add a link
Reference in a new issue