LINE (Large-scale Information Network Embedding) is one of the well-known algorithms in the field of network embedding. It embeds graph data into a vector space so that vector-based machine learning algorithms can be applied to graph data.
The LINE algorithm is a network representation learning algorithm (it can also be considered a preprocessing algorithm for graph data). The algorithm receives a network as input and produces a vector representation for each node. LINE mainly focuses on optimizing two objective functions:
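$$O_1 = -\sum_{(i,j) \in E} w_{ij} \log p_1(v_i, v_j), \qquad p_1(v_i, v_j) = \frac{1}{1 + \exp(-\vec{u}_i^{\,T} \vec{u}_j)}$$

$$O_2 = -\sum_{(i,j) \in E} w_{ij} \log p_2(v_j \mid v_i), \qquad p_2(v_j \mid v_i) = \frac{\exp(\vec{u}_j'^{\,T} \vec{u}_i)}{\sum_{k=1}^{|V|} \exp(\vec{u}_k'^{\,T} \vec{u}_i)}$$

Here, following the definitions in [1], $E$ is the edge set, $w_{ij}$ is the weight of edge $(i, j)$, $\vec{u}_i$ is the embedding of node $v_i$, and $\vec{u}_i'$ is its context embedding.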
where $p_1$ characterizes the first-order similarity between nodes (a direct edge) and $p_2$ depicts the second-order similarity between nodes (similar neighbors). In other words:
- If there is an edge between two nodes, then the two nodes are also close in the embedding vector space
- If the neighbors of two nodes are similar, then the two nodes are also close in the embedding vector space
For more details, please refer to the paper [1].
There are currently two completely different implementations in Spark on Angel: LINE V1 and LINE V2. Each has its own advantages, disadvantages, and application scenarios.
In LINE V1, we push the training data to the PS and train the model there. This avoids the large amount of network IO caused by pulling the model, but it is not suitable for scenarios with small node embedding dimensions because of two limitations: the number of model partitions must not be larger than the node embedding dimension, and the node embedding dimension must be an integer multiple of the number of partitions. For example, with embedding dimension 32, the model can use at most 32 partitions and the partition count must be a divisor of 32. If the node embedding dimension is small, the number of model partitions must be very small, and the PS easily becomes a bottleneck.
In LINE V2, we pull the model from the PS to the workers and train the model there, so it is suitable for cases where the node embedding dimension is not very high. In addition, LINE V2 is much more stable. In general, we strongly recommend LINE V2.
The implementation of the LINE algorithm refers to Yahoo's paper [2].
In LINE V1, the model is divided by column, which means that each partition contains a slice of the embedding dimensions for all nodes. Training proceeds as follows (a sketch follows the list):
- split the embedding vector into multiple PSs by dimension
- compute the dot product between nodes partially in each PS, and merge the partial results in the Spark executors
- calculate the gradient for each node in the executors
- push the gradients to all PSs, and then update the vectors
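To make the column split concrete, here is a minimal, self-contained Scala sketch (illustrative only; `PsPartition` and all other names are hypothetical, not Angel's actual API). Each simulated PS partition holds one slice of every node's embedding and computes a partial dot product for a node pair; the "executor" merges the partial results. It also shows why the embedding dimension must be an integer multiple of the number of model partitions: each partition owns `dim / numParts` dimensions.

```scala
// Sketch only: simulates LINE V1's column-partitioned dot product.
object LineV1Sketch {
  val dim = 8                    // node embedding dimension
  val numParts = 4               // must divide dim (the V1 constraint)
  val sliceLen = dim / numParts  // dimensions held by each model partition

  // Each "PS partition" stores its slice of every node's embedding.
  case class PsPartition(slices: Map[Long, Array[Float]]) {
    // Partial dot product over this partition's dimension slice.
    def partialDot(src: Long, dst: Long): Float =
      slices(src).zip(slices(dst)).map { case (a, b) => a * b }.sum
  }

  def main(args: Array[String]): Unit = {
    val rng = new scala.util.Random(42)
    def randSlice(): Array[Float] = Array.fill(sliceLen)(rng.nextFloat() - 0.5f)
    val nodes = Seq(0L, 1L)
    // Build numParts partitions; each holds one slice for every node.
    val partitions = Array.fill(numParts) {
      PsPartition(nodes.map(n => n -> randSlice()).toMap)
    }
    // "Executor": merge the partial dot products from every partition.
    val dot = partitions.map(_.partialDot(0L, 1L)).sum
    println(s"merged dot(0, 1) = $dot")
  }
}
```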
In LINE V2, the model is divided by node id range, which means that each partition contains a subset of the nodes. Training proceeds as follows (a sketch follows the list):
- sample negative nodes and pull the node embeddings from the PSs
- compute the dot product between nodes in the executors
- calculate the gradient for each node in the executors
- push the gradients to all PSs, and then update the node embeddings
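For intuition, here is a minimal Scala sketch of one LINE V2 training step (illustrative only; all names are hypothetical, a local mutable Map stands in for the PS, and context vectors are folded into the same map for brevity). It "pulls" the embeddings of the positive pair and the sampled negative nodes, computes the negative-sampling gradient with a sigmoid in the "executor", and then "pushes" the accumulated update back.

```scala
// Sketch only: one SGD step of LINE V2's pull-compute-push loop.
object LineV2Sketch {
  val dim = 8        // node embedding dimension
  val lr  = 0.025f   // learning rate (stepSize)

  def sigmoid(x: Float): Float = (1.0 / (1.0 + math.exp(-x))).toFloat
  def dot(a: Array[Float], b: Array[Float]): Float =
    a.zip(b).map { case (x, y) => x * y }.sum

  def main(args: Array[String]): Unit = {
    val rng = new scala.util.Random(7)
    // "PS": node id -> embedding; "pulling" is just a map lookup here.
    val ps = scala.collection.mutable.Map[Long, Array[Float]]()
    def emb(n: Long): Array[Float] =
      ps.getOrElseUpdate(n, Array.fill(dim)(rng.nextFloat() - 0.5f))

    val (src, dst) = (0L, 1L)          // a positive edge from the batch
    val negatives  = Seq(2L, 3L)       // sampled negative nodes
    val srcGrad = Array.fill(dim)(0.0f) // accumulated gradient for src

    // label 1 for the positive pair, 0 for each negative sample.
    for ((node, label) <- (dst, 1.0f) +: negatives.map(n => (n, 0.0f))) {
      val g = lr * (label - sigmoid(dot(emb(src), emb(node))))
      val ctx = emb(node)
      val srcVec = emb(src)
      var i = 0
      while (i < dim) {
        srcGrad(i) += g * ctx(i)    // accumulate gradient for src
        ctx(i) += g * srcVec(i)     // update the context/negative node
        i += 1
      }
    }
    // "Push": apply the accumulated gradient to src's embedding.
    var i = 0
    while (i < dim) { emb(src)(i) += srcGrad(i); i += 1 }
    println(s"updated src embedding head = ${emb(src).take(3).mkString(", ")}")
  }
}
```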
- input: hdfs path of the input data, an undirected graph; nodes need to be consecutively numbered starting from 0, and the endpoints of each edge are separated by whitespace or a comma, for example: 0 2 2 1 3 1 3 2 4 1 (see the example file after this list)
- output: hdfs path of the output; the final model is saved to output/epoch_checkpoint_x, where x is the epoch number
- saveModelInterval: save the model every this many epochs
- checkpointInterval: write a model checkpoint every this many epochs
- embedding: the dimension of the embedding vector and of the context vector (which means that, with the same parameters, a second-order model occupies twice the model space of a first-order model)
- negative: the number of negative samples used for each node pair (the algorithm is optimized with negative sampling)
- stepSize: the learning rate, which strongly affects the results of the algorithm
- batchSize: the size of each mini-batch
- epoch: the number of passes over the samples; the samples are shuffled after each epoch
- order: the proximity order to optimize, 1 or 2
- subSample: whether to subsample the input, true or false
- remapping: whether to remap the node ids, true or false
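For reference, the input example above, read as an edge-list file with one edge per line (endpoints separated by whitespace):

```
0 2
2 1
3 1
3 2
4 1
```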
Several steps must be completed before editing the submit script and running the job.

- confirm that Hadoop and Spark are ready in your environment
- unzip sona-\<version\>-bin.zip to a local directory (SONA_HOME)
- upload the sona-\<version\>-bin directory to HDFS (SONA_HDFS_HOME)
- edit $SONA_HOME/bin/spark-on-angel-env.sh, and set SPARK_HOME, SONA_HOME, SONA_HDFS_HOME and ANGEL_VERSION
Here's an example submit script; remember to adjust the parameters and fill in the paths according to your own task.
```bash
HADOOP_HOME=my-hadoop-home
input=hdfs://my-hdfs/data
output=hdfs://my-hdfs/model
queue=my-queue
export HADOOP_HOME=$HADOOP_HOME
source ./bin/spark-on-angel-env.sh

$SPARK_HOME/bin/spark-submit \
  --master yarn-cluster \
  --conf spark.yarn.allocation.am.maxMemory=55g \
  --conf spark.yarn.allocation.executor.maxMemory=55g \
  --conf spark.driver.maxResultSize=20g \
  --conf spark.kryoserializer.buffer.max=2000m \
  --conf spark.ps.instances=10 \
  --conf spark.ps.cores=4 \
  --conf spark.ps.jars=$SONA_ANGEL_JARS \
  --conf spark.ps.memory=50g \
  --conf spark.ps.log.level=INFO \
  --conf spark.offline.evaluate=200 \
  --conf spark.hadoop.angel.model.partitioner.max.partition.number=1000 \
  --conf spark.hadoop.angel.ps.backup.interval.ms=2000000000 \
  --conf spark.hadoop.angel.matrixtransfer.request.timeout.ms=60000 \
  --conf spark.hadoop.angel.ps.jvm.direct.factor.use.direct.buff=0.20 \
  --queue $queue \
  --name "line angel" \
  --jars $SONA_SPARK_JARS \
  --driver-memory 15g \
  --num-executors 10 \
  --executor-cores 3 \
  --executor-memory 15g \
  --class org.apache.spark.angel.examples.graph.LINEExample2 \
  ./lib/angelml-$SONA_VERSION.jar \
  input:$input output:$output embedding:32 negative:5 epoch:10 stepSize:0.01 batchSize:1000 numParts:500 subSample:false remapping:false order:2 saveModelInterval:1 checkpointInterval:1
```
Note that there are two versions of LINE. If you want to run LINE V1, set the "class" parameter above to "org.apache.spark.angel.examples.graph.LINEExample".