budgit/internal/repository/account_deletion_request.go
juancwu 43e6f76c01
All checks were successful
Deploy / build-and-deploy (push) Successful in 1m45s
feat: notify user on account deletion
2026-05-17 15:40:03 +00:00

164 lines
4.7 KiB
Go

package repository
import (
"database/sql"
"errors"
"time"
"git.juancwu.dev/juancwu/budgit/internal/model"
"github.com/jmoiron/sqlx"
)
var ErrAccountDeletionRequestNotFound = errors.New("account deletion request not found")
type AccountDeletionRequestRepository interface {
CreateTx(tx *sqlx.Tx, req *model.AccountDeletionRequest) error
ByID(id string) (*model.AccountDeletionRequest, error)
HasPendingForUser(userID string) (bool, error)
LatestForUser(userID string) (*model.AccountDeletionRequest, error)
// ClaimNextPending atomically transitions the oldest pending request to
// "processing" and returns it. Returns ErrAccountDeletionRequestNotFound
// when no pending row exists. Uses SKIP LOCKED so multiple workers can
// safely run in parallel without colliding.
ClaimNextPending() (*model.AccountDeletionRequest, error)
// MarkCompletedTx marks the request as completed within the same tx that
// deletes the user's data, so the queue row and the data wipe always
// agree.
MarkCompletedTx(tx *sqlx.Tx, id string, spacesDeleted int) error
// MarkFailedRetryable records the error and returns the request to the
// pending state so the next worker tick will retry it.
MarkFailedRetryable(id string, errMsg string) error
// MarkFailedTerminal records the error and parks the request in the
// failed state for human investigation. Called once attempts exceed the
// retry budget.
MarkFailedTerminal(id string, errMsg string) error
}
type accountDeletionRequestRepository struct {
db *sqlx.DB
}
func NewAccountDeletionRequestRepository(db *sqlx.DB) AccountDeletionRequestRepository {
return &accountDeletionRequestRepository{db: db}
}
func (r *accountDeletionRequestRepository) CreateTx(tx *sqlx.Tx, req *model.AccountDeletionRequest) error {
_, err := tx.Exec(
`INSERT INTO account_deletion_requests
(id, user_id, email, name, reason, ip_address, status, attempts,
requested_at, updated_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $9);`,
req.ID, req.UserID, req.Email, req.Name, req.Reason, req.IPAddress,
req.Status, req.Attempts, req.RequestedAt,
)
return err
}
func (r *accountDeletionRequestRepository) ByID(id string) (*model.AccountDeletionRequest, error) {
var req model.AccountDeletionRequest
err := r.db.Get(&req, `SELECT * FROM account_deletion_requests WHERE id = $1;`, id)
if errors.Is(err, sql.ErrNoRows) {
return nil, ErrAccountDeletionRequestNotFound
}
if err != nil {
return nil, err
}
return &req, nil
}
func (r *accountDeletionRequestRepository) LatestForUser(userID string) (*model.AccountDeletionRequest, error) {
var req model.AccountDeletionRequest
err := r.db.Get(&req,
`SELECT * FROM account_deletion_requests
WHERE user_id = $1
ORDER BY requested_at DESC
LIMIT 1;`,
userID,
)
if errors.Is(err, sql.ErrNoRows) {
return nil, ErrAccountDeletionRequestNotFound
}
if err != nil {
return nil, err
}
return &req, nil
}
func (r *accountDeletionRequestRepository) HasPendingForUser(userID string) (bool, error) {
var n int
err := r.db.Get(&n,
`SELECT COUNT(*) FROM account_deletion_requests
WHERE user_id = $1 AND status IN ('pending', 'processing');`,
userID,
)
return n > 0, err
}
func (r *accountDeletionRequestRepository) ClaimNextPending() (*model.AccountDeletionRequest, error) {
var req model.AccountDeletionRequest
err := r.db.Get(&req,
`UPDATE account_deletion_requests
SET status = 'processing',
attempts = attempts + 1,
updated_at = $1
WHERE id = (
SELECT id FROM account_deletion_requests
WHERE status = 'pending'
ORDER BY requested_at
FOR UPDATE SKIP LOCKED
LIMIT 1
)
RETURNING *;`,
time.Now(),
)
if errors.Is(err, sql.ErrNoRows) {
return nil, ErrAccountDeletionRequestNotFound
}
if err != nil {
return nil, err
}
return &req, nil
}
func (r *accountDeletionRequestRepository) MarkCompletedTx(tx *sqlx.Tx, id string, spacesDeleted int) error {
now := time.Now()
_, err := tx.Exec(
`UPDATE account_deletion_requests
SET status = 'completed',
spaces_deleted = $2,
completed_at = $3,
updated_at = $3,
last_error = NULL
WHERE id = $1;`,
id, spacesDeleted, now,
)
return err
}
func (r *accountDeletionRequestRepository) MarkFailedRetryable(id string, errMsg string) error {
_, err := r.db.Exec(
`UPDATE account_deletion_requests
SET status = 'pending',
last_error = $2,
updated_at = $3
WHERE id = $1;`,
id, errMsg, time.Now(),
)
return err
}
func (r *accountDeletionRequestRepository) MarkFailedTerminal(id string, errMsg string) error {
_, err := r.db.Exec(
`UPDATE account_deletion_requests
SET status = 'failed',
last_error = $2,
updated_at = $3
WHERE id = $1;`,
id, errMsg, time.Now(),
)
return err
}