Skip to content

Commit

Permalink
fix for #61
Browse files Browse the repository at this point in the history
  • Loading branch information
dsukhoroslov committed Sep 1, 2016
1 parent 1f6f165 commit fd02103
Show file tree
Hide file tree
Showing 11 changed files with 89 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@

<hz:topic name="xdm-health"/>
<hz:topic name="xdm-counters"/>
<hz:topic name="xdm-population"/>

<hz:replicatedmap name="xdm-query" in-memory-format="BINARY" replication-delay-millis="0"/>
<hz:replicatedmap name="dict-index" in-memory-format="BINARY" replication-delay-millis="0"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ private void initialize(HazelcastInstance hzInstance) {
this.hzInstance = hzInstance;
ITopic<Counter> cTopic = hzInstance.getTopic(TPN_XDM_COUNTERS);
cTopic.addMessageListener(this);
ITopic<Long> pTopic = hzInstance.getTopic(TPN_XDM_POPULATION);
pTopic.addMessageListener(new PopulationStateListener());
hTopic = hzInstance.getTopic(TPN_XDM_HEALTH);
xddCache = hzInstance.getMap(CN_XDM_DOCUMENT);
hzInstance.getPartitionService().addPartitionLostListener(this);
Expand All @@ -75,7 +77,7 @@ private void updateState(Counter counter) {

private void checkState() {
int docSize = xddCache.size();
logger.trace("checkStats; active count: {}; inactive count: {}; cache size: {}", cntActive, cntInactive, docSize);
logger.trace("checkState; active count: {}; inactive count: {}; cache size: {}", cntActive, cntInactive, docSize);
long fullSize = cntActive.get() + cntInactive.get();
HealthState hState;
if (fullSize < docSize - thLow) {
Expand Down Expand Up @@ -126,6 +128,18 @@ public void partitionLost(PartitionLostEvent event) {
checkState();
}

/**
 * Listener on the population topic. After schema population completes, the
 * populator publishes a single packed long: the low 32 bits carry the active
 * document count, the high 32 bits the inactive count. This listener unpacks
 * the payload and seeds the health state with those counts.
 */
private class PopulationStateListener implements MessageListener<Long> {

    @Override
    public void onMessage(Message<Long> message) {
        long packed = message.getMessageObject().longValue();
        int activeCount = (int) packed;           // low 32 bits
        int inactiveCount = (int) (packed >> 32); // high 32 bits
        initState(activeCount, inactiveCount);
        // NOTE(review): once the initial state has been received this listener
        // could be deregistered from the topic; it currently stays subscribed.
    }

}

/*
@Override
public void checkClusterState() throws XDMException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,23 +142,28 @@ public void shutdown(boolean terminate) {
}
}

public void checkPopulation(int currentSize) {
public void checkPopulation(int currentSize) throws Exception {
logger.info("checkPopulation; populationSize: {}; currentSize: {}", populationSize, currentSize);
if (enabled) {
activateDocStore();
xddCache.addEntryListener(this, true);
}
if (populationSize == currentSize && xddCache.size() == 0) {
SchemaPopulator pop = new SchemaPopulator(schemaName);
// try to run it from the same thread..
nodeEngine.getHazelcastInstance().getExecutorService(PN_XDM_SCHEMA_POOL).submitToMember(pop, nodeEngine.getLocalMember());
//nodeEngine.getHazelcastInstance().getExecutorService(PN_XDM_SCHEMA_POOL).submitToAllMembers(pop);
//pop.call();
}
}

/**
 * Returns the stored Document entry for the given key.
 *
 * @param docKey the internal document key
 * @return the stored entry, or null when the persistent store is not enabled
 */
public Document getDocument(Long docKey) {
    if (!enabled) {
        return null;
    }
    return docStore.getEntry(docKey);
}

/**
 * Returns the number of active entries in the persistent document store.
 *
 * @return the active entry count, or 0 when the store is not enabled
 */
public int getActiveCount() {
    if (enabled) {
        return docStore.getActiveEntryCount();
    }
    return 0;
}

public int getDocumentCount() {
return enabled ? docStore.getFullEntryCount() : 0;
}
Expand Down Expand Up @@ -192,12 +197,6 @@ private void activateDocStore() {

ApplicationContext schemaCtx = (ApplicationContext) getContext(schemaName, schema_context);
docMgr = schemaCtx.getBean(DocumentManagementImpl.class);

// only local HM should be notified!
HealthManagementImpl hMgr = schemaCtx.getBean(HealthManagementImpl.class);
int actCount = docStore.getActiveEntryCount();
int docCount = docStore.getFullEntryCount();
hMgr.initState(actCount, docCount - actCount);
}

private KeyFactory getKeyFactory() {
Expand Down Expand Up @@ -242,7 +241,7 @@ public void memberAdded(MembershipEvent membershipEvent) {

@Override
public void memberRemoved(MembershipEvent membershipEvent) {
    // log cache/store state at the moment a member leaves the cluster
    int docsSize = xddCache.size();
    int activeCount = docStore.getActiveEntryCount();
    logger.info("memberRemoved; event: {}; docs size: {}; active count: {}", membershipEvent, docsSize, activeCount);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,7 @@
import static com.bagri.xdm.api.XDMException.ecTransNoNested;
import static com.bagri.xdm.api.XDMException.ecTransNotFound;
import static com.bagri.xdm.api.XDMException.ecTransWrongState;
import static com.bagri.xdm.cache.api.CacheConstants.CN_XDM_TRANSACTION;
import static com.bagri.xdm.cache.api.CacheConstants.PN_XDM_SCHEMA_POOL;
import static com.bagri.xdm.cache.api.CacheConstants.SQN_TRANSACTION;
import static com.bagri.xdm.cache.api.CacheConstants.TPN_XDM_COUNTERS;
import static com.bagri.xdm.cache.api.CacheConstants.*;

import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -51,13 +48,13 @@
public class TransactionManagementImpl implements TransactionManagement, StatisticsProvider, MultiExecutionCallback {

private static final Logger logger = LoggerFactory.getLogger(TransactionManagementImpl.class);
private static final long TX_START = 5L;
private static final long TX_START = 5L;

// transaction id bound to the current thread; starts out as TX_NO
// (no transaction associated with the thread yet)
private ThreadLocal<Long> thTx = new ThreadLocal<Long>() {

    @Override
    protected Long initialValue() {
        return TX_NO;
    }

};
Expand Down Expand Up @@ -90,8 +87,7 @@ public void setHzInstance(HazelcastInstance hzInstance) {
txGen = new IdGeneratorImpl(hzInstance.getAtomicLong(SQN_TRANSACTION));
txGen.adjust(TX_START);
cTopic = hzInstance.getTopic(TPN_XDM_COUNTERS);
//execService = hzInstance.getExecutorService(PN_XDM_SCHEMA_POOL);
execService = hzInstance.getExecutorService("xdm-trans-pool");
execService = hzInstance.getExecutorService(PN_XDM_TRANS_POOL);
}

public long getTransactionTimeout() {
Expand Down Expand Up @@ -201,7 +197,8 @@ private void cleanAffectedDocuments(Transaction xTx) {
// asynchronous cleaning..
//execService.submitToAllMembers(new DocumentCleaner(xTx), this);

// synchronous cleaning.. causes a deadlock!
// synchronous cleaning.. causes a deadlock if used from the common schema exec-pool.
// that is why we use separate exec-pool for transaction tasks
Map<Member, Future<Transaction>> values = execService.submitToAllMembers(new DocumentCleaner(xTx));
Transaction txClean = null;
for (Future<Transaction> value: values.values()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import static com.bagri.xdm.common.Constants.xdm_schema_store_data_path;
import static com.bagri.xdm.common.Constants.xdm_schema_store_tx_buffer_size;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
Expand Down Expand Up @@ -63,14 +64,19 @@ public void init(HazelcastInstance hazelcastInstance, Properties properties, Str
int size = bit2pos(bSize);
try {
raf = new RandomAccessFile(fileName, "rw");
} catch (FileNotFoundException ex) {
throw new RuntimeException("Path " + fileName + " does not exists", ex);
}

int txCount = 0;
try {
if (raf.length() > 0) {
logger.info("init; opened tx log with length: {}", raf.length());
// not sure we have to do this..
if (raf.length() > size) {
size = (int) raf.length();
}
}
int txCount = 0;
fc = raf.getChannel();
buff = fc.map(MapMode.READ_WRITE, 0, size);
if (raf.length() > 0) {
Expand All @@ -79,13 +85,11 @@ public void init(HazelcastInstance hazelcastInstance, Properties properties, Str
} else {
transactions = new HashMap<>();
}
logger.info("init; tx buffer initialized; tx count: {}", txCount);
loadTransactions(txCount);
} catch (IOException ex) {
logger.error("init.error", ex);
throw new RuntimeException("Cannot initialize Transaction Store", ex);
}

logger.info("init; tx buffer initialized; tx count: {}", txCount);
loadTransactions(txCount);
instance = this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,22 @@

import static com.bagri.xdm.cache.api.CacheConstants.CN_XDM_DOCUMENT;
import static com.bagri.xdm.cache.api.CacheConstants.CN_XDM_TRANSACTION;
import static com.bagri.xdm.cache.api.CacheConstants.TPN_XDM_POPULATION;
import static com.bagri.xdm.cache.hazelcast.util.SpringContextHolder.*;
import static com.bagri.xdm.cache.hazelcast.serialize.DataSerializationFactoryImpl.cli_PopulateSchemaTask;

import java.util.concurrent.Callable;

import org.springframework.context.ApplicationContext;

import com.bagri.xdm.cache.hazelcast.impl.PopulationManagementImpl;
import com.bagri.xdm.cache.hazelcast.impl.TransactionManagementImpl;
import com.bagri.xdm.domain.Document;
import com.bagri.xdm.domain.Transaction;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;
import com.hazelcast.core.ITopic;

public class SchemaPopulator extends SchemaProcessingTask implements Callable<Boolean> {

Expand All @@ -30,9 +33,8 @@ public SchemaPopulator(String schemaName) {
public Boolean call() throws Exception {
logger.debug("call.enter; schema: {}", schemaName);
boolean result = false;
// get hzInstance and close it...
// get hzInstance
HazelcastInstance hz = Hazelcast.getHazelcastInstanceByName(schemaName);
//hz = hzInstance;
if (hz != null) {
try {
// TODO: ensure that partitions migration has been already finished!
Expand All @@ -59,34 +61,25 @@ private boolean populateSchema(HazelcastInstance hz) {
return false;
}

//ApplicationContext storeCtx = (ApplicationContext) getContext(schemaName, store_context);
//if (storeCtx == null) {
// schema configured with no persistent store
// logger.debug("populateSchema.exit; No persistent store configured");
// return false;
//}

//MapLoader docCacheStore = storeCtx.getBean("docCacheStore", MapLoader.class);

//Properties props = new Properties();
//props.put("documentIdGenerator", schemaCtx.getBean("xdm.document"));
//props.put("keyFactory", schemaCtx.getBean(XDMFactoryImpl.class));
//props.put("xdmModel", schemaCtx.getBean(XDMModelManagement.class));
//props.put("xdmManager", schemaCtx.getBean(DocumentManagementImpl.class));
//((MapLoaderLifecycleSupport) docCacheStore).init(hz, props, CN_XDM_DOCUMENT);

IMap<Long, Document> xddCache = hz.getMap(CN_XDM_DOCUMENT);
xddCache.loadAll(false);
logger.info("populateSchema; documents size after loadAll: {}", xddCache.size());

IMap<Long, Transaction> xtxCache = hz.getMap(CN_XDM_TRANSACTION);
xtxCache.loadAll(false);
logger.info("populateSchema; transactions size after loadAll: {}", xtxCache.size());

IMap<Long, Document> xddCache = hz.getMap(CN_XDM_DOCUMENT);
xddCache.loadAll(false);
logger.info("populateSchema; documents size after loadAll: {}", xddCache.size());

// adjusting tx idGen!
TransactionManagementImpl txMgr = schemaCtx.getBean("txManager", TransactionManagementImpl.class);
txMgr.adjustTxCounter();


ITopic<Long> pTopic = hz.getTopic(TPN_XDM_POPULATION);
PopulationManagementImpl pm = (PopulationManagementImpl) hz.getUserContext().get("popManager");
int lo = pm.getActiveCount();
int hi = pm.getDocumentCount() - lo;
long counts = ((long) hi << 32) + lo;
pTopic.publish(counts);

return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@

<hz:topic name="xdm-health"/>
<hz:topic name="xdm-counters"/>
<hz:topic name="xdm-population"/>

<hz:replicatedmap name="xdm-query" in-memory-format="BINARY" replication-delay-millis="0"/>
<hz:replicatedmap name="dict-index" in-memory-format="BINARY" replication-delay-millis="0"/>
Expand Down
3 changes: 2 additions & 1 deletion bagri-xdm/src/main/java/com/bagri/xdm/api/ResultCursor.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,14 +101,15 @@ public interface ResultCursor extends AutoCloseable {

/**
*
* @param props result production properties
* @return transforms currently selected item to String
* @throws XDMException in case of data access error
*/
String getItemAsString(Properties props) throws XDMException;

/**
*
* @return true if cursor has a List of results fetched, false otherwise
* @return true if cursor has a static List of results fetched, false otherwise
*/
boolean isFixed();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,11 @@ public class CacheConstants {
*/
public static final String PN_XDM_SCHEMA_POOL = "xdm-exec-pool";

/**
* XDM Schema transaction execution service; a separate pool, distinct from the common schema execution service
*/
public static final String PN_XDM_TRANS_POOL = "xdm-trans-pool";

/**
* XDM System cluster execution service
*/
Expand All @@ -149,4 +154,9 @@ public class CacheConstants {
*/
public static final String TPN_XDM_COUNTERS = "xdm-counters";

/**
* Population notification topic
*/
public static final String TPN_XDM_POPULATION = "xdm-population";

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,53 +6,59 @@
import com.bagri.xdm.common.DocumentKey;

/**
* Helper component which manages distributed population behavior
* Helper component which manages distributed population behavior. For cases when there is no simple mechanism to
* construct Document's natural identifier (URI) from the internal Document's key this component manages mapping between
* DocumentKeys and URIs.
*
* @author Denis Sukhoroslov
*
*/
public interface PopulationManagement {

/**
* Returns stored mapping for the key provided
*
* @param key
* @return
* @param key the internal Document key
* @return URI of mapping found
*/
String getKeyMapping(DocumentKey key);

/**
* Stores provided key/mapping pair
*
* @param key
* @param mapping
* @param key the internal Document key
* @param mapping the Document uri
*/
void setKeyMapping(DocumentKey key, String mapping);

/**
* provide mappings between internal DocumentKey structure and Document's URI
* Return mappings between internal DocumentKey structure and Document's URI
*
* @param keys DocumentKey's
* @param keys the collection of DocumentKey's
* @return mappings for the keys provided
*/
Map<DocumentKey, String> getKeyMappings(Set<DocumentKey> keys);

/**
* store mappings
* Store mappings provided as a Map
*
* @param mappings
* @param mappings the Map of key/uri pairs
*/
void setKeyMappings(Map<DocumentKey, String> mappings);

/**
* Deletes mapping identified by key
*
* @param key
* @return
* @param key the internal Document key
* @return the URI of deleted mapping
*/
String deleteKeyMapping(DocumentKey key);

/**
* Delete mappings identified by keys
*
* @param keys
* @return
* @param keys the Collection of internal Document keys
* @return the number of deleted mappings
*/
int deleteKeyMappings(Set<DocumentKey> keys);

Expand Down
Loading

0 comments on commit fd02103

Please sign in to comment.