cubby/client.go
2026-04-29 12:36:08 +00:00

323 lines
7.5 KiB
Go

// Package cubby is a Go client for cubby, the tiny shared in-memory cache.
//
// Typical use:
//
// c, err := cubby.Dial("/tmp/cubby.sock")
// if err != nil { ... }
// defer c.Close()
//
// c.Set("hello", "world", 0)
// v, ok, _ := c.Get("hello")
//
// A Client wraps a single connection and is NOT safe for concurrent use.
// For concurrent callers, use a Pool.
package cubby
import (
"bufio"
"errors"
"fmt"
"net"
"strconv"
"strings"
"sync"
"time"
)
// Client is a single connection to a cubby server. Not safe for concurrent
// use — wrap with a Pool if multiple goroutines need access.
type Client struct {
conn net.Conn
r *bufio.Reader
w *bufio.Writer
}
// Dial connects to the cubby server at the given Unix socket path.
func Dial(socketPath string) (*Client, error) {
conn, err := net.Dial("unix", socketPath)
if err != nil {
return nil, fmt.Errorf("cubby: dial %s: %w", socketPath, err)
}
return &Client{
conn: conn,
r: bufio.NewReader(conn),
w: bufio.NewWriter(conn),
}, nil
}
// Close closes the underlying connection. Calling Close more than once is safe.
func (c *Client) Close() error {
if c.conn == nil {
return nil
}
err := c.conn.Close()
c.conn = nil
return err
}
// SetDeadline sets read/write deadlines on the underlying connection.
// Pass a zero time.Time to clear.
func (c *Client) SetDeadline(t time.Time) error {
return c.conn.SetDeadline(t)
}
// Ping sends PING and returns an error if the server doesn't reply with PONG.
func (c *Client) Ping() error {
if err := c.send("PING"); err != nil {
return err
}
line, err := c.readLine()
if err != nil {
return err
}
if line != "+PONG" {
return fmt.Errorf("cubby: unexpected ping reply: %q", line)
}
return nil
}
// Get fetches a key. The bool reports whether the key was present;
// a missing key is not an error.
func (c *Client) Get(key string) (string, bool, error) {
if err := c.send("GET", key); err != nil {
return "", false, err
}
line, err := c.readLine()
if err != nil {
return "", false, err
}
switch {
case line == "_":
return "", false, nil
case strings.HasPrefix(line, "$"):
v, err := strconv.Unquote(line[1:])
if err != nil {
return "", false, fmt.Errorf("cubby: bad value encoding: %w", err)
}
return v, true, nil
case strings.HasPrefix(line, "-"):
return "", false, fmt.Errorf("cubby: %s", line[1:])
default:
return "", false, fmt.Errorf("cubby: unexpected get reply: %q", line)
}
}
// Set stores a value. Pass ttl=0 for no expiry.
//
// TTLs are rounded up to the nearest second (cubby's protocol is
// second-granular). A non-zero ttl below one second becomes one second.
func (c *Client) Set(key, value string, ttl time.Duration) error {
args := []string{"SET", key, value}
if ttl > 0 {
secs := int((ttl + time.Second - 1) / time.Second) // round up
args = append(args, strconv.Itoa(secs))
}
if err := c.send(args...); err != nil {
return err
}
return c.expectOK()
}
// Del removes a key. Returns true if the key was present.
func (c *Client) Del(key string) (bool, error) {
if err := c.send("DEL", key); err != nil {
return false, err
}
n, err := c.readCount()
if err != nil {
return false, err
}
return n > 0, nil
}
// Keys returns all live keys. For a busy cache this can be a lot — use
// sparingly.
func (c *Client) Keys() ([]string, error) {
if err := c.send("KEYS"); err != nil {
return nil, err
}
n, err := c.readCount()
if err != nil {
return nil, err
}
out := make([]string, 0, n)
for range n {
line, err := c.readLine()
if err != nil {
return nil, err
}
if !strings.HasPrefix(line, "$") {
return nil, fmt.Errorf("cubby: expected value, got %q", line)
}
k, err := strconv.Unquote(line[1:])
if err != nil {
return nil, fmt.Errorf("cubby: bad key encoding: %w", err)
}
out = append(out, k)
}
return out, nil
}
// ----- Internals -----
// send writes a single command. Arguments containing whitespace, quotes,
// or non-printable bytes are Go-quoted; bare arguments are sent as-is.
// The command verb (args[0]) is never quoted.
func (c *Client) send(args ...string) error {
if c.conn == nil {
return errors.New("cubby: client is closed")
}
var b strings.Builder
for i, a := range args {
if i > 0 {
b.WriteByte(' ')
}
if i == 0 || isBareSafe(a) {
b.WriteString(a)
} else {
b.WriteString(strconv.Quote(a))
}
}
b.WriteByte('\n')
if _, err := c.w.WriteString(b.String()); err != nil {
return err
}
return c.w.Flush()
}
// isBareSafe reports whether s can be sent without Go-quoting.
// We're conservative: only printable ASCII, no whitespace, no quote chars.
func isBareSafe(s string) bool {
if s == "" {
return false
}
for i := 0; i < len(s); i++ {
ch := s[i]
if ch < 0x21 || ch > 0x7e || ch == '"' || ch == '\\' {
return false
}
}
return true
}
func (c *Client) readLine() (string, error) {
line, err := c.r.ReadString('\n')
if err != nil {
return "", err
}
return strings.TrimRight(line, "\r\n"), nil
}
func (c *Client) expectOK() error {
line, err := c.readLine()
if err != nil {
return err
}
switch {
case line == "+OK":
return nil
case strings.HasPrefix(line, "-"):
return fmt.Errorf("cubby: %s", line[1:])
default:
return fmt.Errorf("cubby: unexpected reply: %q", line)
}
}
func (c *Client) readCount() (int, error) {
line, err := c.readLine()
if err != nil {
return 0, err
}
if strings.HasPrefix(line, "-") {
return 0, fmt.Errorf("cubby: %s", line[1:])
}
if !strings.HasPrefix(line, "#") {
return 0, fmt.Errorf("cubby: expected count, got %q", line)
}
return strconv.Atoi(line[1:])
}
// ----- Pool -----
// Pool is a small connection pool for concurrent callers. It dials on demand
// and caps the number of idle connections it keeps around.
type Pool struct {
socket string
maxIdle int
mu sync.Mutex
idle []*Client
closed bool
}
// NewPool creates a pool that dials socketPath on demand and keeps up to
// maxIdle connections cached. maxIdle <= 0 means "don't cache, dial each time."
func NewPool(socketPath string, maxIdle int) *Pool {
return &Pool{socket: socketPath, maxIdle: maxIdle}
}
// Get returns a client from the pool, dialing a new one if none are idle.
// Call Put to return it (or Close it directly to drop it).
func (p *Pool) Get() (*Client, error) {
p.mu.Lock()
if p.closed {
p.mu.Unlock()
return nil, errors.New("cubby: pool is closed")
}
if n := len(p.idle); n > 0 {
c := p.idle[n-1]
p.idle = p.idle[:n-1]
p.mu.Unlock()
return c, nil
}
p.mu.Unlock()
return Dial(p.socket)
}
// Put returns a client to the pool. If the pool is full or the client looks
// broken, the client is closed instead. Pass the error from your last call so
// Put can decide whether to keep the connection.
func (p *Pool) Put(c *Client, lastErr error) {
if c == nil {
return
}
if lastErr != nil {
_ = c.Close()
return
}
p.mu.Lock()
if p.closed || len(p.idle) >= p.maxIdle {
p.mu.Unlock()
_ = c.Close()
return
}
p.idle = append(p.idle, c)
p.mu.Unlock()
}
// Close shuts down all idle connections. In-flight clients held by callers
// are not affected; closing the pool just prevents new Gets and drops the
// idle cache.
func (p *Pool) Close() error {
p.mu.Lock()
p.closed = true
idle := p.idle
p.idle = nil
p.mu.Unlock()
for _, c := range idle {
_ = c.Close()
}
return nil
}
// Do is a convenience that gets a client, runs fn, and returns it to the pool.
// Use this for one-shot operations; for multi-step sequences, hold the client
// yourself with Get/Put.
func (p *Pool) Do(fn func(*Client) error) error {
c, err := p.Get()
if err != nil {
return err
}
err = fn(c)
p.Put(c, err)
return err
}