feat: account deletion
This commit is contained in:
parent
4769760b93
commit
2db260f849
20 changed files with 785 additions and 29 deletions
132
internal/repository/account_deletion_request.go
Normal file
132
internal/repository/account_deletion_request.go
Normal file
|
|
@ -0,0 +1,132 @@
|
|||
package repository
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
"git.juancwu.dev/juancwu/budgit/internal/model"
|
||||
"github.com/jmoiron/sqlx"
|
||||
)
|
||||
|
||||
var ErrAccountDeletionRequestNotFound = errors.New("account deletion request not found")
|
||||
|
||||
type AccountDeletionRequestRepository interface {
|
||||
CreateTx(tx *sqlx.Tx, req *model.AccountDeletionRequest) error
|
||||
HasPendingForUser(userID string) (bool, error)
|
||||
|
||||
// ClaimNextPending atomically transitions the oldest pending request to
|
||||
// "processing" and returns it. Returns ErrAccountDeletionRequestNotFound
|
||||
// when no pending row exists. Uses SKIP LOCKED so multiple workers can
|
||||
// safely run in parallel without colliding.
|
||||
ClaimNextPending() (*model.AccountDeletionRequest, error)
|
||||
|
||||
// MarkCompletedTx marks the request as completed within the same tx that
|
||||
// deletes the user's data, so the queue row and the data wipe always
|
||||
// agree.
|
||||
MarkCompletedTx(tx *sqlx.Tx, id string, spacesDeleted int) error
|
||||
|
||||
// MarkFailedRetryable records the error and returns the request to the
|
||||
// pending state so the next worker tick will retry it.
|
||||
MarkFailedRetryable(id string, errMsg string) error
|
||||
|
||||
// MarkFailedTerminal records the error and parks the request in the
|
||||
// failed state for human investigation. Called once attempts exceed the
|
||||
// retry budget.
|
||||
MarkFailedTerminal(id string, errMsg string) error
|
||||
}
|
||||
|
||||
type accountDeletionRequestRepository struct {
|
||||
db *sqlx.DB
|
||||
}
|
||||
|
||||
func NewAccountDeletionRequestRepository(db *sqlx.DB) AccountDeletionRequestRepository {
|
||||
return &accountDeletionRequestRepository{db: db}
|
||||
}
|
||||
|
||||
func (r *accountDeletionRequestRepository) CreateTx(tx *sqlx.Tx, req *model.AccountDeletionRequest) error {
|
||||
_, err := tx.Exec(
|
||||
`INSERT INTO account_deletion_requests
|
||||
(id, user_id, email, name, reason, ip_address, status, attempts,
|
||||
requested_at, updated_at)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $9);`,
|
||||
req.ID, req.UserID, req.Email, req.Name, req.Reason, req.IPAddress,
|
||||
req.Status, req.Attempts, req.RequestedAt,
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
func (r *accountDeletionRequestRepository) HasPendingForUser(userID string) (bool, error) {
|
||||
var n int
|
||||
err := r.db.Get(&n,
|
||||
`SELECT COUNT(*) FROM account_deletion_requests
|
||||
WHERE user_id = $1 AND status IN ('pending', 'processing');`,
|
||||
userID,
|
||||
)
|
||||
return n > 0, err
|
||||
}
|
||||
|
||||
func (r *accountDeletionRequestRepository) ClaimNextPending() (*model.AccountDeletionRequest, error) {
|
||||
var req model.AccountDeletionRequest
|
||||
err := r.db.Get(&req,
|
||||
`UPDATE account_deletion_requests
|
||||
SET status = 'processing',
|
||||
attempts = attempts + 1,
|
||||
updated_at = $1
|
||||
WHERE id = (
|
||||
SELECT id FROM account_deletion_requests
|
||||
WHERE status = 'pending'
|
||||
ORDER BY requested_at
|
||||
FOR UPDATE SKIP LOCKED
|
||||
LIMIT 1
|
||||
)
|
||||
RETURNING *;`,
|
||||
time.Now(),
|
||||
)
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return nil, ErrAccountDeletionRequestNotFound
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &req, nil
|
||||
}
|
||||
|
||||
func (r *accountDeletionRequestRepository) MarkCompletedTx(tx *sqlx.Tx, id string, spacesDeleted int) error {
|
||||
now := time.Now()
|
||||
_, err := tx.Exec(
|
||||
`UPDATE account_deletion_requests
|
||||
SET status = 'completed',
|
||||
spaces_deleted = $2,
|
||||
completed_at = $3,
|
||||
updated_at = $3,
|
||||
last_error = NULL
|
||||
WHERE id = $1;`,
|
||||
id, spacesDeleted, now,
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
func (r *accountDeletionRequestRepository) MarkFailedRetryable(id string, errMsg string) error {
|
||||
_, err := r.db.Exec(
|
||||
`UPDATE account_deletion_requests
|
||||
SET status = 'pending',
|
||||
last_error = $2,
|
||||
updated_at = $3
|
||||
WHERE id = $1;`,
|
||||
id, errMsg, time.Now(),
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
func (r *accountDeletionRequestRepository) MarkFailedTerminal(id string, errMsg string) error {
|
||||
_, err := r.db.Exec(
|
||||
`UPDATE account_deletion_requests
|
||||
SET status = 'failed',
|
||||
last_error = $2,
|
||||
updated_at = $3
|
||||
WHERE id = $1;`,
|
||||
id, errMsg, time.Now(),
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
|
@ -4,6 +4,7 @@ import (
|
|||
"database/sql"
|
||||
"errors"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"git.juancwu.dev/juancwu/budgit/internal/model"
|
||||
"github.com/jmoiron/sqlx"
|
||||
|
|
@ -20,6 +21,11 @@ type UserRepository interface {
|
|||
ByEmail(email string) (*model.User, error)
|
||||
Update(user *model.User) error
|
||||
Delete(id string) error
|
||||
|
||||
// MarkPendingDeletionTx sets the pending_deletion_at flag inside the
|
||||
// supplied transaction. The flag is what middleware checks on every
|
||||
// request to lock the user out of any further actions.
|
||||
MarkPendingDeletionTx(tx *sqlx.Tx, userID string, at time.Time) error
|
||||
}
|
||||
|
||||
type userRepository struct {
|
||||
|
|
@ -77,6 +83,32 @@ func (r *userRepository) Update(user *model.User) error {
|
|||
return err
|
||||
}
|
||||
|
||||
func (r *userRepository) MarkPendingDeletionTx(tx *sqlx.Tx, userID string, at time.Time) error {
|
||||
res, err := tx.Exec(
|
||||
`UPDATE users SET pending_deletion_at = $1, updated_at = $1 WHERE id = $2 AND pending_deletion_at IS NULL;`,
|
||||
at, userID,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
rows, err := res.RowsAffected()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if rows == 0 {
|
||||
// Either the user does not exist or is already pending. Verify so we
|
||||
// can distinguish.
|
||||
var exists bool
|
||||
if err := tx.Get(&exists, `SELECT EXISTS(SELECT 1 FROM users WHERE id = $1);`, userID); err != nil {
|
||||
return err
|
||||
}
|
||||
if !exists {
|
||||
return ErrUserNotFound
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *userRepository) Delete(id string) error {
|
||||
query := `DELETE FROM users WHERE id = $1;`
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue