Skip to content

Commit

Permalink
[Java] Change role to leader after winning election so leadership can…
Browse files Browse the repository at this point in the history
… be asserted during replay or replication.

(cherry picked from commit 796d050)
  • Loading branch information
mjpt777 authored and vyazelenko committed Nov 9, 2023
1 parent 30bc137 commit 5f90f54
Showing 1 changed file with 15 additions and 34 deletions.
49 changes: 15 additions & 34 deletions aeron-cluster/src/main/java/io/aeron/cluster/Election.java
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,11 @@ boolean isLeaderStartup()
return isLeaderStartup;
}

int thisMemberId()
{
return thisMember.id();
}

int doWork(final long nowNs)
{
int workCount = 0;
Expand Down Expand Up @@ -309,17 +314,9 @@ void onCanvassPosition(

if (logLeadershipTermId < this.leadershipTermId)
{
switch (state)
if (Cluster.Role.LEADER == consensusModuleAgent.role())
{
case LEADER_LOG_REPLICATION:
case LEADER_INIT:
case LEADER_READY:
case LEADER_REPLAY:
publishNewLeadershipTerm(follower, logLeadershipTermId, ctx.clusterClock().time());
break;

default:
break;
publishNewLeadershipTerm(follower, logLeadershipTermId, ctx.clusterClock().time());
}
}
else if (logLeadershipTermId > this.leadershipTermId)
Expand Down Expand Up @@ -663,15 +660,6 @@ private int init(final long nowNs)
return 1;
}

private void prepareForNewLeadership(final long nowNs)
{
final long lastAppendPosition = consensusModuleAgent.prepareForNewLeadership(logPosition, nowNs);
if (NULL_POSITION != lastAppendPosition)
{
appendPosition = lastAppendPosition;
}
}

private int canvass(final long nowNs)
{
int workCount = 0;
Expand Down Expand Up @@ -790,14 +778,6 @@ private int followerBallot(final long nowNs)
return workCount;
}

/**
* Leader log replication must wait until we have consensus on the leaders append position. However,
* we want to be careful about updating the commit position as this will cause the clustered service to progress
* forward to early.
*
* @param nowNs current time
* @return work done
*/
private int leaderLogReplication(final long nowNs)
{
int workCount = 0;
Expand Down Expand Up @@ -917,7 +897,7 @@ private int followerLogReplication(final long nowNs)
if (replicationCommitPosition >= appendPosition)
{
ConsensusModuleAgent.logReplicationEnded(
thisMemberId(),
thisMember.id(),
"ELECTION",
logReplication.srcArchiveChannel(),
logReplication.recordingId(),
Expand Down Expand Up @@ -1313,11 +1293,8 @@ private void state(final ElectionState newState, final long nowNs)
break;

case LEADER_LOG_REPLICATION:
logSessionId = consensusModuleAgent.addLogPublication(appendPosition);
break;

case LEADER_INIT:
consensusModuleAgent.role(Cluster.Role.LEADER);
logSessionId = consensusModuleAgent.addLogPublication(appendPosition);
break;

case FOLLOWER_LOG_REPLICATION:
Expand Down Expand Up @@ -1486,9 +1463,13 @@ private void logStateChange(
*/
}

int thisMemberId()
private void prepareForNewLeadership(final long nowNs)
{
return thisMember.id();
final long lastAppendPosition = consensusModuleAgent.prepareForNewLeadership(logPosition, nowNs);
if (NULL_POSITION != lastAppendPosition)
{
appendPosition = lastAppendPosition;
}
}

public String toString()
Expand Down

0 comments on commit 5f90f54

Please sign in to comment.