Skip to content

Commit

Permalink
chore: refactor acquirelock function (#18)
Browse files: browse the repository at this point in the history
  • Loading branch information
thevilledev authored Oct 29, 2024
1 parent 6f84385 commit c0377ab
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 55 deletions.
9 changes: 9 additions & 0 deletions helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,12 @@ func isAWSErrorCode(err error, code string) bool {

return false
}

// must unwraps the (data, err) pair produced by a marshal call.
// It hands back data untouched when err is nil and panics with err otherwise.
func must(data []byte, err error) []byte {
	if err == nil {
		return data
	}

	panic(err)
}
115 changes: 60 additions & 55 deletions manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,139 +118,144 @@ func (m *Manager) getCurrentTerm() int64 {

// acquireLock takes the lock when none exists or the existing one has expired.
//
// It checks the current lock, builds the replacement record, writes it under a
// per-version attempt key, and then verifies and writes the main lock key. The
// attempt key is deleted once it has been created, whether or not the
// acquisition succeeded; the lease is updated only on success.
func (m *Manager) acquireLock(ctx context.Context) error {
	// Shift the reference time back by the grace period to prevent rapid failover.
	now := time.Now().Add(-m.gracePeriod)

	existing, err := m.checkExistingLock(ctx, now)
	if err != nil {
		return err
	}

	candidate := m.prepareLockInfo(now, existing)
	attemptKey := fmt.Sprintf("%s.attempt.%s", m.lockKey, candidate.Version)

	if err = m.createLockAttempt(ctx, attemptKey, candidate); err != nil {
		return err
	}

	acquireErr := m.verifyAndAcquireLock(ctx, candidate, now)
	if acquireErr == nil {
		m.lease.UpdateLease(&candidate)
	}

	// The attempt object has served its purpose on both outcomes.
	m.cleanupAttempt(ctx, attemptKey)

	return acquireErr
}

func (m *Manager) checkExistingLock(ctx context.Context, now time.Time) (*LockInfo, error) {
currentLock, err := m.GetLockInfo(ctx)
if err != nil && !errors.Is(err, ErrLockNotFound) {
return fmt.Errorf("failed to check existing lock: %w", err)
return nil, fmt.Errorf("failed to check existing lock: %w", err)
}

// If lock exists and is not expired, we can't acquire it
if err == nil && now.Before(currentLock.Expiry) {
return ErrLockExists
return nil, ErrLockExists
}

// Important: Check the last known term and ensure we advance it
var newTerm int64
if currentLock == nil {
newTerm = 1
} else {
return currentLock, nil
}

func (m *Manager) prepareLockInfo(now time.Time, currentLock *LockInfo) LockInfo {
// Set term
newTerm := int64(1)
if currentLock != nil {
newTerm = currentLock.Term + 1
}

m.term.Store(newTerm)

// Create new fence token and last known leader
// Prepare observers and leader info
existingObservers := make(map[string]observerInfo)
lastKnownLeader := ""
newFenceToken := int64(0)
existingObservers := make(map[string]observerInfo)

if currentLock != nil {
newFenceToken = currentLock.FenceToken + 1

lastKnownLeader = currentLock.Node
// Preserve existing observers but mark them as needing heartbeat renewal

for id, observer := range currentLock.Observers {
observer.IsActive = false // Require new heartbeat
observer.IsActive = false
existingObservers[id] = observer
}
}

// Lock doesn't exist or is expired, try to acquire it
newVersion := fmt.Sprintf("%d-%s-%d", now.UnixNano(), m.nodeID, newTerm)
lockInfo := LockInfo{
return LockInfo{
Node: m.nodeID,
Timestamp: now,
Expiry: now.Add(m.ttl),
Term: newTerm,
Version: newVersion,
Version: fmt.Sprintf("%d-%s-%d", now.UnixNano(), m.nodeID, newTerm),
FenceToken: newFenceToken,
LastKnownLeader: lastKnownLeader,
Observers: existingObservers,
}
}

// createLockAttempt writes lockInfo as JSON to attemptKey in the manager's
// bucket, using an If-None-Match: * precondition so the object is only created
// when the key does not already exist.
//
// It returns:
//   - nil when the attempt object was created;
//   - ErrLockExists when S3 reports PreconditionFailed, i.e. the attempt key
//     is already present (another node is also trying to acquire the lock);
//   - an error wrapping ErrFailedToMarshalLockInfo when lockInfo cannot be
//     encoded;
//   - a wrapped error for any other PutObject failure.
func (m *Manager) createLockAttempt(ctx context.Context, attemptKey string, lockInfo LockInfo) error {
	lockData, err := json.Marshal(lockInfo)
	if err != nil {
		return fmt.Errorf("%w: %w", ErrFailedToMarshalLockInfo, err)
	}

	input := &s3.PutObjectInput{
		Bucket:      aws.String(m.bucket),
		Key:         aws.String(attemptKey),
		Body:        bytes.NewReader(lockData),
		ContentType: aws.String("application/json"),
		IfNoneMatch: aws.String("*"), // Ensure atomic creation.
	}

	if _, err = m.s3Client.PutObject(ctx, input); err != nil {
		if isAWSErrorCode(err, "PreconditionFailed") {
			return ErrLockExists
		}

		return fmt.Errorf("failed to create lock attempt: %w", err)
	}

	return nil
}

func (m *Manager) verifyAndAcquireLock(ctx context.Context, lockInfo LockInfo, now time.Time) error {
afterAttempt, err := m.GetLockInfo(ctx)
if err != nil && !errors.Is(err, ErrLockNotFound) {
// Clean up our attempt
_, _ = m.s3Client.DeleteObject(ctx, &s3.DeleteObjectInput{
Bucket: aws.String(m.bucket),
Key: aws.String(attemptKey),
})

return fmt.Errorf("failed to verify lock state: %w", err)
}

// If there's a valid lock now, someone beat us to it
if err == nil && now.Before(afterAttempt.Expiry) {
// Clean up our attempt
_, _ = m.s3Client.DeleteObject(ctx, &s3.DeleteObjectInput{
Bucket: aws.String(m.bucket),
Key: aws.String(attemptKey),
})

return ErrLockExists
}

// Move our attempt to the main lock key
input = &s3.PutObjectInput{
input := &s3.PutObjectInput{
Bucket: aws.String(m.bucket),
Key: aws.String(m.lockKey),
Body: bytes.NewReader(lockData),
ContentType: aws.String(jsonContentType),
Body: bytes.NewReader(must(json.Marshal(lockInfo))),
ContentType: aws.String("application/json"),
}

_, err = m.s3Client.PutObject(ctx, input)
if err != nil {
// Clean up our attempt
_, _ = m.s3Client.DeleteObject(ctx, &s3.DeleteObjectInput{
Bucket: aws.String(m.bucket),
Key: aws.String(attemptKey),
})

return fmt.Errorf("failed to acquire lock: %w", err)
}

// Update lease information
m.lease.UpdateLease(&lockInfo)
return nil
}

// cleanupAttempt deletes the attempt object stored under attemptKey in the
// manager's bucket.
//
// The delete is best-effort: its result and error are intentionally discarded,
// so a failed cleanup never changes the outcome of the acquisition that
// triggered it.
func (m *Manager) cleanupAttempt(ctx context.Context, attemptKey string) {
	_, _ = m.s3Client.DeleteObject(ctx, &s3.DeleteObjectInput{
		Bucket: aws.String(m.bucket),
		Key:    aws.String(attemptKey),
	})
}

// renewLock attempts to update the lock using atomic operations.
Expand Down

0 comments on commit c0377ab

Please sign in to comment.