Skip to content

Commit

Permalink
ignite: dist chmod optimization (LIKE '?%') (#340)
Browse files Browse the repository at this point in the history
  • Loading branch information
gangliao authored Sep 12, 2021
1 parent f8e34e0 commit 7107f41
Show file tree
Hide file tree
Showing 8 changed files with 212 additions and 12 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ bench/hdfs
bench/voltfs
ignite/*

ignite
filescale_init/ignite/work
*.pem

work_dir
8 changes: 8 additions & 0 deletions filescale_init/src/main/java/HdfsMetaInfoSchema.java
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,14 @@ private HdfsMetaInfoSchema() throws SQLException {
} catch (SQLException ex) {
System.err.println(ex.getMessage());
}

if (env.equals("IGNITE")) {
IgniteCluster cluster = ignite_client.cluster();
cluster.active(true);
cluster.enableWal("inodes");
cluster.baselineAutoAdjustEnabled(false);
ignite_client.close();
}
}

public Connection getConnection() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@
import org.apache.ignite.binary.BinaryObject;

public class PermissionsPayload {
public Set<BinaryObject> keys;
public String path;
public long permission;

public PermissionsPayload(Set<BinaryObject> keys, long permission) {
this.keys = keys;
public PermissionsPayload(String path, long permission) {
this.path = path;
this.permission = permission;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;

// Ignite does not allow updating a primary key because the latter defines a partition the key
// and its value belong to statically. While the partition with all its data can change several
// cluster owners, the key always belongs to a single partition. The partition is calculated
// using a hash function applied to the key’s value.
// Thus, if a key needs to be updated it has to be removed and then inserted.
public class RenameSubtreeINodes implements IgniteClosure<RenamePayload, String> {

@IgniteInstanceResource
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,27 @@
package org.apache.hadoop.hdfs.db.ignite;

import java.io.File;
import java.util.List;
import java.util.TreeSet;
import java.util.Set;
import java.util.Map;
import org.apache.ignite.IgniteCache;
import java.util.HashMap;
import java.util.HashSet;
import javax.cache.Cache;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.binary.BinaryObjectBuilder;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;

public class SetPermissions implements IgniteClosure<PermissionsPayload, String> {

Expand All @@ -23,12 +32,51 @@ public class SetPermissions implements IgniteClosure<PermissionsPayload, String>
public String apply(PermissionsPayload payload) {
IgniteCache<BinaryObject, BinaryObject> inodesBinary = ignite.cache("inodes").withKeepBinary();

// Using EntryProcessor.invokeAll to set every permission value in place.
inodesBinary.invokeAll(payload.keys, (entry, object) -> {
BinaryObject inode = entry.getValue().toBuilder().setField("permission", payload.permission).build();
entry.setValue(inode);
return null;
});
File file = new File(payload.path);
String parent = file.getParent();
String name = file.getName();

Transaction tx = ignite.transactions().txStart(
TransactionConcurrency.PESSIMISTIC, TransactionIsolation.SERIALIZABLE);

// 1. query subtree inodes
List<Cache.Entry<BinaryObject, BinaryObject>> result;
ScanQuery<BinaryObject, BinaryObject> scanAddress = new ScanQuery<>(
new IgniteBiPredicate<BinaryObject, BinaryObject>() {
@Override
public boolean apply(BinaryObject binaryKey, BinaryObject binaryObject) {
return ((String)binaryKey.field("parentName")).startsWith(parent);
}
}
);
result = inodesBinary.query(scanAddress).getAll();

// 2. update subtree permission
Map<BinaryObject, BinaryObject> map = new HashMap<>();
BinaryObjectBuilder inodeKeyBuilder = ignite.binary().builder("InodeKey");
for (Cache.Entry<BinaryObject, BinaryObject> entry : result) {
BinaryObject inodeValue = entry.getValue();
inodeValue = inodeValue.toBuilder()
.setField("permission", payload.permission)
.build();
map.put(entry.getKey(), inodeValue);
}

// 3. update subtree to DB
inodesBinary.putAll(map);

BinaryObject rootKey = inodeKeyBuilder
.setField("parentName", parent)
.setField("name", name)
.build();
BinaryObject inodeValue = inodesBinary.get(rootKey);
inodeValue = inodeValue.toBuilder()
.setField("permission", payload.permission)
.build();
inodesBinary.put(rootKey, inodeValue);

tx.commit();
tx.close();

FileWriteAheadLogManager walMgr = (FileWriteAheadLogManager)(
((IgniteEx)ignite).context().cache().context().wal());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package org.apache.hadoop.hdfs.db.ignite;

import java.io.File;
import java.util.List;
import java.util.TreeSet;
import java.util.Set;
import java.util.Map;
import java.util.HashMap;
import java.util.HashSet;
import javax.cache.Cache;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.binary.BinaryObjectBuilder;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;


public class SetPermissionsV2 implements IgniteClosure<PermissionsPayload, String> {

@IgniteInstanceResource
private Ignite ignite;

@Override
public String apply(PermissionsPayload payload) {
IgniteCache<BinaryObject, BinaryObject> inodesBinary = ignite.cache("inodes").withKeepBinary();

File file = new File(payload.path);
String parent = file.getParent();
String name = file.getName();

Transaction tx = ignite.transactions().txStart(
TransactionConcurrency.PESSIMISTIC, TransactionIsolation.SERIALIZABLE);

inodesBinary.query(new SqlFieldsQuery("UPDATE inodes SET permission = ? WHERE parentName LIKE ?")
.setArgs(payload.permission, payload.path + "%")).getAll();
inodesBinary.query(new SqlFieldsQuery("UPDATE inodes SET permission = ? WHERE parentName = ? and name = ?")
.setArgs(payload.permission, parent, name)).getAll();

tx.commit();
tx.close();

FileWriteAheadLogManager walMgr = (FileWriteAheadLogManager)(
((IgniteEx)ignite).context().cache().context().wal());
return walMgr.lastWritePointer().toString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package org.apache.hadoop.hdfs.db.ignite;

import java.io.File;
import java.util.List;
import java.util.TreeSet;
import java.util.Set;
import java.util.Map;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import javax.cache.Cache;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.binary.BinaryObjectBuilder;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;

public class SetPermissionsV3 implements IgniteClosure<PermissionsPayload, String> {

@IgniteInstanceResource
private Ignite ignite;

@Override
public String apply(PermissionsPayload payload) {
IgniteCache<BinaryObject, BinaryObject> inodesBinary = ignite.cache("inodes").withKeepBinary();

File file = new File(payload.path);
String parent = file.getParent();
String name = file.getName();

Transaction tx = ignite.transactions().txStart(
TransactionConcurrency.PESSIMISTIC, TransactionIsolation.SERIALIZABLE);

// 1. query subtree inodes
ScanQuery<BinaryObject, BinaryObject> scanAddress = new ScanQuery<>(
new IgniteBiPredicate<BinaryObject, BinaryObject>() {
@Override
public boolean apply(BinaryObject binaryKey, BinaryObject binaryObject) {
return ((String)binaryKey.field("parentName")).startsWith(parent);
}
}
);
Iterator<Cache.Entry<BinaryObject, BinaryObject>> iterator = inodesBinary.
query(scanAddress).iterator();

// 2. update subtree permission
while (iterator.hasNext()) {
BinaryObject key = iterator.next().getKey();
BinaryObject value = inodesBinary.get(key);
value = value.toBuilder()
.setField("permission", payload.permission)
.build();
inodesBinary.put(key, value);
}

BinaryObjectBuilder inodeKeyBuilder = ignite.binary().builder("InodeKey");
BinaryObject rootKey = inodeKeyBuilder
.setField("parentName", parent)
.setField("name", name)
.build();
BinaryObject rootValue = inodesBinary.get(rootKey);
rootValue = rootValue.toBuilder()
.setField("permission", payload.permission)
.build();
inodesBinary.put(rootKey, rootValue);

tx.commit();
tx.close();

FileWriteAheadLogManager walMgr = (FileWriteAheadLogManager)(
((IgniteEx)ignite).context().cache().context().wal());
return walMgr.lastWritePointer().toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.hadoop.hdfs.db.ignite.RenamePayload;
import org.apache.hadoop.hdfs.db.ignite.RenameSubtreeINodes;
import org.apache.hadoop.hdfs.db.ignite.SetPermissions;
import org.apache.hadoop.hdfs.db.ignite.SetPermissionsV2;
import org.apache.hadoop.hdfs.db.ignite.PermissionsPayload;

/**
Expand Down Expand Up @@ -571,7 +572,7 @@ private final void remoteChmod(String path, Set<Pair<String, String>> mpoints) {
} else if (database.equals("IGNITE")) {
IgniteCompute compute = conn.getIgniteClient().compute();
INodeKeyedObjects.setWalOffset(
compute.apply(new SetPermissions(), new PermissionsPayload(keys, this.permission))
compute.apply(new SetPermissionsV2(), new PermissionsPayload(path, this.permission))
);
}
String end = INodeKeyedObjects.getWalOffset();
Expand Down

0 comments on commit 7107f41

Please sign in to comment.