Skip to content

Commit

Permalink
(fix:pd) become a leader before the addLeaderStateListener (#1062)
Browse files Browse the repository at this point in the history
Co-authored-by: 叶晓炜 <[email protected]>
  • Loading branch information
ye-xiaowei and 叶晓炜 authored Jan 3, 2024
1 parent 1e9590e commit ffe1abf
Showing 1 changed file with 40 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,15 @@
*/
package com.alipay.sofa.jraft.rhea;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;

import com.alipay.sofa.jraft.conf.Configuration;
import com.alipay.sofa.jraft.rhea.options.RegionEngineOptions;
import com.alipay.sofa.jraft.rhea.options.StoreEngineOptions;
import com.alipay.sofa.jraft.rhea.util.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -128,11 +133,14 @@ public synchronized boolean init(final PlacementDriverServerOptions opts) {
final RheaKVStoreOptions rheaOpts = opts.getRheaKVStoreOptions();
Requires.requireNonNull(rheaOpts, "opts.rheaKVStoreOptions");
this.rheaKVStore = new DefaultRheaKVStore();
this.placementDriverService = new DefaultPlacementDriverService(this.rheaKVStore);
// Set up a listener before becoming a leader
this.rheaKVStore.addLeaderStateListener(getPdReginId(rheaOpts),
((DefaultPlacementDriverService) this.placementDriverService));
if (!this.rheaKVStore.init(rheaOpts)) {
LOG.error("Fail to init [RheaKVStore].");
return false;
}
this.placementDriverService = new DefaultPlacementDriverService(this.rheaKVStore);
if (!this.placementDriverService.init(opts)) {
LOG.error("Fail to init [PlacementDriverService].");
return false;
Expand All @@ -147,13 +155,42 @@ public synchronized boolean init(final PlacementDriverServerOptions opts) {
throw new IllegalArgumentException("Only support single region for [PlacementDriverServer]");
}
this.regionEngine = regionEngines.get(0);
this.rheaKVStore.addLeaderStateListener(this.regionEngine.getRegion().getId(),
((DefaultPlacementDriverService) this.placementDriverService));
addPlacementDriverProcessor(storeEngine.getRpcServer());
LOG.info("[PlacementDriverServer] start successfully, options: {}.", opts);
return this.started = true;
}

private long getPdReginId(final RheaKVStoreOptions rheaOpts) {
StoreEngineOptions storeEngineOptions = rheaOpts.getStoreEngineOptions();
Requires.requireNonNull(storeEngineOptions, "storeEngineOptions");
List<RegionEngineOptions> rOptsList = storeEngineOptions.getRegionEngineOptionsList();
if (rOptsList == null || rOptsList.isEmpty()) {
return Constants.DEFAULT_REGION_ID;
}
List<RegionEngineOptions> filteredOptsList = new ArrayList<>();
for (RegionEngineOptions rOpts : rOptsList) {
if (inConfiguration(rOpts.getServerAddress().toString(), rOpts.getInitialServerList())) {
filteredOptsList.add(rOpts);
}
}
if (filteredOptsList.size() > 1) {
throw new IllegalArgumentException("Only support single region for [PlacementDriverServer]");
}
return rOptsList.get(0).getRegionId();
}

private boolean inConfiguration(final String curr, final String all) {
final PeerId currPeer = new PeerId();
if (!currPeer.parse(curr)) {
return false;
}
final Configuration allConf = new Configuration();
if (!allConf.parse(all)) {
return false;
}
return allConf.contains(currPeer) || allConf.getLearners().contains(currPeer);
}

@Override
public synchronized void shutdown() {
if (!this.started) {
Expand Down

0 comments on commit ffe1abf

Please sign in to comment.