diff --git a/client.go b/client.go new file mode 100644 index 0000000..3654cf3 --- /dev/null +++ b/client.go @@ -0,0 +1,323 @@ +// Package cubby is a Go client for cubby, the tiny shared in-memory cache. +// +// Typical use: +// +// c, err := cubby.Dial("/tmp/cubby.sock") +// if err != nil { ... } +// defer c.Close() +// +// c.Set("hello", "world", 0) +// v, ok, _ := c.Get("hello") +// +// A Client wraps a single connection and is NOT safe for concurrent use. +// For concurrent callers, use a Pool. +package cubby + +import ( + "bufio" + "errors" + "fmt" + "net" + "strconv" + "strings" + "sync" + "time" +) + +// Client is a single connection to a cubby server. Not safe for concurrent +// use — wrap with a Pool if multiple goroutines need access. +type Client struct { + conn net.Conn + r *bufio.Reader + w *bufio.Writer +} + +// Dial connects to the cubby server at the given Unix socket path. +func Dial(socketPath string) (*Client, error) { + conn, err := net.Dial("unix", socketPath) + if err != nil { + return nil, fmt.Errorf("cubby: dial %s: %w", socketPath, err) + } + return &Client{ + conn: conn, + r: bufio.NewReader(conn), + w: bufio.NewWriter(conn), + }, nil +} + +// Close closes the underlying connection. Calling Close more than once is safe. +func (c *Client) Close() error { + if c.conn == nil { + return nil + } + err := c.conn.Close() + c.conn = nil + return err +} + +// SetDeadline sets read/write deadlines on the underlying connection. +// Pass a zero time.Time to clear. +func (c *Client) SetDeadline(t time.Time) error { + return c.conn.SetDeadline(t) +} + +// Ping sends PING and returns an error if the server doesn't reply with PONG. +func (c *Client) Ping() error { + if err := c.send("PING"); err != nil { + return err + } + line, err := c.readLine() + if err != nil { + return err + } + if line != "+PONG" { + return fmt.Errorf("cubby: unexpected ping reply: %q", line) + } + return nil +} + +// Get fetches a key. The bool reports whether the key was present; +// a missing key is not an error. +func (c *Client) Get(key string) (string, bool, error) { + if err := c.send("GET", key); err != nil { + return "", false, err + } + line, err := c.readLine() + if err != nil { + return "", false, err + } + switch { + case line == "_": + return "", false, nil + case strings.HasPrefix(line, "$"): + v, err := strconv.Unquote(line[1:]) + if err != nil { + return "", false, fmt.Errorf("cubby: bad value encoding: %w", err) + } + return v, true, nil + case strings.HasPrefix(line, "-"): + return "", false, fmt.Errorf("cubby: %s", line[1:]) + default: + return "", false, fmt.Errorf("cubby: unexpected get reply: %q", line) + } +} + +// Set stores a value. Pass ttl=0 for no expiry. +// +// TTLs are rounded up to the nearest second (cubby's protocol is +// second-granular). A non-zero ttl below one second becomes one second. +func (c *Client) Set(key, value string, ttl time.Duration) error { + args := []string{"SET", key, value} + if ttl > 0 { + secs := int((ttl + time.Second - 1) / time.Second) // round up + args = append(args, strconv.Itoa(secs)) + } + if err := c.send(args...); err != nil { + return err + } + return c.expectOK() +} + +// Del removes a key. Returns true if the key was present. +func (c *Client) Del(key string) (bool, error) { + if err := c.send("DEL", key); err != nil { + return false, err + } + n, err := c.readCount() + if err != nil { + return false, err + } + return n > 0, nil +} + +// Keys returns all live keys. For a busy cache this can be a lot — use +// sparingly. +func (c *Client) Keys() ([]string, error) { + if err := c.send("KEYS"); err != nil { + return nil, err + } + n, err := c.readCount() + if err != nil { + return nil, err + } + out := make([]string, 0, n) + for range n { + line, err := c.readLine() + if err != nil { + return nil, err + } + if !strings.HasPrefix(line, "$") { + return nil, fmt.Errorf("cubby: expected value, got %q", line) + } + k, err := strconv.Unquote(line[1:]) + if err != nil { + return nil, fmt.Errorf("cubby: bad key encoding: %w", err) + } + out = append(out, k) + } + return out, nil +} + +// ----- Internals ----- + +// send writes a single command. Arguments containing whitespace, quotes, +// or non-printable bytes are Go-quoted; bare arguments are sent as-is. +// The command verb (args[0]) is never quoted. +func (c *Client) send(args ...string) error { + if c.conn == nil { + return errors.New("cubby: client is closed") + } + var b strings.Builder + for i, a := range args { + if i > 0 { + b.WriteByte(' ') + } + if i == 0 || isBareSafe(a) { + b.WriteString(a) + } else { + b.WriteString(strconv.Quote(a)) + } + } + b.WriteByte('\n') + if _, err := c.w.WriteString(b.String()); err != nil { + return err + } + return c.w.Flush() +} + +// isBareSafe reports whether s can be sent without Go-quoting. +// We're conservative: only printable ASCII, no whitespace, no quote chars. +func isBareSafe(s string) bool { + if s == "" { + return false + } + for i := 0; i < len(s); i++ { + ch := s[i] + if ch < 0x21 || ch > 0x7e || ch == '"' || ch == '\\' { + return false + } + } + return true +} + +func (c *Client) readLine() (string, error) { + line, err := c.r.ReadString('\n') + if err != nil { + return "", err + } + return strings.TrimRight(line, "\r\n"), nil +} + +func (c *Client) expectOK() error { + line, err := c.readLine() + if err != nil { + return err + } + switch { + case line == "+OK": + return nil + case strings.HasPrefix(line, "-"): + return fmt.Errorf("cubby: %s", line[1:]) + default: + return fmt.Errorf("cubby: unexpected reply: %q", line) + } +} + +func (c *Client) readCount() (int, error) { + line, err := c.readLine() + if err != nil { + return 0, err + } + if strings.HasPrefix(line, "-") { + return 0, fmt.Errorf("cubby: %s", line[1:]) + } + if !strings.HasPrefix(line, "#") { + return 0, fmt.Errorf("cubby: expected count, got %q", line) + } + return strconv.Atoi(line[1:]) +} + +// ----- Pool ----- + +// Pool is a small connection pool for concurrent callers. It dials on demand +// and caps the number of idle connections it keeps around. +type Pool struct { + socket string + maxIdle int + mu sync.Mutex + idle []*Client + closed bool +} + +// NewPool creates a pool that dials socketPath on demand and keeps up to +// maxIdle connections cached. maxIdle <= 0 means "don't cache, dial each time." +func NewPool(socketPath string, maxIdle int) *Pool { + return &Pool{socket: socketPath, maxIdle: maxIdle} +} + +// Get returns a client from the pool, dialing a new one if none are idle. +// Call Put to return it (or Close it directly to drop it). +func (p *Pool) Get() (*Client, error) { + p.mu.Lock() + if p.closed { + p.mu.Unlock() + return nil, errors.New("cubby: pool is closed") + } + if n := len(p.idle); n > 0 { + c := p.idle[n-1] + p.idle = p.idle[:n-1] + p.mu.Unlock() + return c, nil + } + p.mu.Unlock() + return Dial(p.socket) +} + +// Put returns a client to the pool. If the pool is full or the client looks +// broken, the client is closed instead. Pass the error from your last call so +// Put can decide whether to keep the connection. +func (p *Pool) Put(c *Client, lastErr error) { + if c == nil { + return + } + if lastErr != nil { + _ = c.Close() + return + } + p.mu.Lock() + if p.closed || len(p.idle) >= p.maxIdle { + p.mu.Unlock() + _ = c.Close() + return + } + p.idle = append(p.idle, c) + p.mu.Unlock() +} + +// Close shuts down all idle connections. In-flight clients held by callers +// are not affected; closing the pool just prevents new Gets and drops the +// idle cache. +func (p *Pool) Close() error { + p.mu.Lock() + p.closed = true + idle := p.idle + p.idle = nil + p.mu.Unlock() + for _, c := range idle { + _ = c.Close() + } + return nil +} + +// Do is a convenience that gets a client, runs fn, and returns it to the pool. +// Use this for one-shot operations; for multi-step sequences, hold the client +// yourself with Get/Put. +func (p *Pool) Do(fn func(*Client) error) error { + c, err := p.Get() + if err != nil { + return err + } + err = fn(c) + p.Put(c, err) + return err +} diff --git a/client_test.go b/client_test.go new file mode 100644 index 0000000..2135fab --- /dev/null +++ b/client_test.go @@ -0,0 +1,181 @@ +package cubby + +import ( + "os" + "path/filepath" + "strconv" + "sync" + "testing" + "time" +) + +// startServer launches the cubby server binary as a subprocess for the test. +// Returns the socket path. Caller doesn't need to clean up; t.Cleanup handles it. +// +// We can't import the server's main package, so this test exercises the wire +// protocol via a real subprocess. That's actually what we want — it catches +// any drift between client and server. +func startServer(t *testing.T) string { + t.Helper() + + dir := t.TempDir() + sock := filepath.Join(dir, "cubby.sock") + + // Build the server binary into the temp dir. + bin := filepath.Join(dir, "cubby") + if out, err := runCmd(t, "go", "build", "-o", bin, "./cmd/cubby"); err != nil { + t.Fatalf("build server: %v\n%s", err, out) + } + + // Start the server. + srv := startCmd(t, bin, "-socket", sock) + t.Cleanup(func() { + _ = srv.Process.Signal(os.Interrupt) + _, _ = srv.Process.Wait() + }) + + // Wait for the socket to appear. + deadline := time.Now().Add(2 * time.Second) + for time.Now().Before(deadline) { + if _, err := os.Stat(sock); err == nil { + return sock + } + time.Sleep(20 * time.Millisecond) + } + t.Fatal("server didn't start in time") + return "" +} + +func TestBasic(t *testing.T) { + sock := startServer(t) + c, err := Dial(sock) + if err != nil { + t.Fatal(err) + } + defer c.Close() + + if err := c.Ping(); err != nil { + t.Fatal(err) + } + + if err := c.Set("k", "v", 0); err != nil { + t.Fatal(err) + } + v, ok, err := c.Get("k") + if err != nil || !ok || v != "v" { + t.Fatalf("Get(k) = %q,%v,%v; want v,true,nil", v, ok, err) + } + + // Missing key: ok=false, no error. + _, ok, err = c.Get("nope") + if err != nil || ok { + t.Fatalf("Get(nope) = _,%v,%v; want _,false,nil", ok, err) + } +} + +func TestValueWithSpacesAndNewlines(t *testing.T) { + sock := startServer(t) + c, err := Dial(sock) + if err != nil { + t.Fatal(err) + } + defer c.Close() + + tricky := "hello world\nline two\twith tab\nand a quote: \"" + if err := c.Set("trick", tricky, 0); err != nil { + t.Fatal(err) + } + got, ok, err := c.Get("trick") + if err != nil || !ok || got != tricky { + t.Fatalf("round-trip failed:\n got %q\nwant %q\n err=%v ok=%v", got, tricky, err, ok) + } +} + +func TestTTL(t *testing.T) { + sock := startServer(t) + c, err := Dial(sock) + if err != nil { + t.Fatal(err) + } + defer c.Close() + + if err := c.Set("ephemeral", "soon-gone", time.Second); err != nil { + t.Fatal(err) + } + // Right away: present. + _, ok, _ := c.Get("ephemeral") + if !ok { + t.Fatal("expected key present immediately after set") + } + // After TTL: gone. Pad a bit for janitor + clock granularity. + time.Sleep(1500 * time.Millisecond) + _, ok, _ = c.Get("ephemeral") + if ok { + t.Fatal("expected key to expire") + } +} + +func TestDelAndKeys(t *testing.T) { + sock := startServer(t) + c, err := Dial(sock) + if err != nil { + t.Fatal(err) + } + defer c.Close() + + for _, k := range []string{"a", "b", "c"} { + if err := c.Set(k, k+"-val", 0); err != nil { + t.Fatal(err) + } + } + keys, err := c.Keys() + if err != nil || len(keys) != 3 { + t.Fatalf("Keys() = %v,%v; want 3 keys", keys, err) + } + + existed, err := c.Del("b") + if err != nil || !existed { + t.Fatalf("Del(b) = %v,%v; want true,nil", existed, err) + } + existed, err = c.Del("b") // again — already gone + if err != nil || existed { + t.Fatalf("Del(b) twice = %v,%v; want false,nil", existed, err) + } +} + +func TestPoolConcurrent(t *testing.T) { + sock := startServer(t) + pool := NewPool(sock, 4) + defer pool.Close() + + const workers = 16 + const ops = 50 + var wg sync.WaitGroup + errCh := make(chan error, workers) + + for w := range workers { + wg.Add(1) + go func(w int) { + defer wg.Done() + for i := range ops { + err := pool.Do(func(c *Client) error { + key := "w" + strconv.Itoa(w) + "-" + strconv.Itoa(i) + if err := c.Set(key, "x", 0); err != nil { + return err + } + _, _, err := c.Get(key) + return err + }) + if err != nil { + errCh <- err + return + } + } + }(w) + } + wg.Wait() + close(errCh) + for err := range errCh { + t.Fatal(err) + } +} diff --git a/server.go b/cmd/cubby/main.go similarity index 100% rename from server.go rename to cmd/cubby/main.go diff --git a/testhelpers_test.go b/testhelpers_test.go new file mode 100644 index 0000000..ec0e12c --- /dev/null +++ b/testhelpers_test.go @@ -0,0 +1,20 @@ +package cubby + +import ( + "os/exec" + "testing" +) + +func runCmd(t *testing.T, name string, args ...string) ([]byte, error) { + t.Helper() + return exec.Command(name, args...).CombinedOutput() +} + +func startCmd(t *testing.T, name string, args ...string) *exec.Cmd { + t.Helper() + cmd := exec.Command(name, args...) + if err := cmd.Start(); err != nil { + t.Fatalf("start %s: %v", name, err) + } + return cmd +}