Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for RDMA Atomics + Support for user-defined initiator and responder resources on connect/accept #51

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 71 additions & 2 deletions libdisni/src/verbs/com_ibm_disni_verbs_impl_NativeDispatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ JNIEXPORT void JNICALL Java_com_ibm_disni_verbs_impl_NativeDispatcher__1connect(

if (cm_listen_id != NULL) {
memset(&conn_param, 0, sizeof(conn_param));
conn_param.initiator_depth = dev_attr.max_qp_rd_atom;
conn_param.initiator_depth = dev_attr.max_qp_init_rd_atom;
conn_param.responder_resources = dev_attr.max_qp_rd_atom;
conn_param.retry_count = (unsigned char)retry;
conn_param.rnr_retry_count = (unsigned char)rnr_retry;
Expand All @@ -430,6 +430,41 @@ JNIEXPORT void JNICALL Java_com_ibm_disni_verbs_impl_NativeDispatcher__1connect(
}
}

/*
* Class: com_ibm_disni_verbs_impl_NativeDispatcher
* Method: _connectV2
* Signature: (JJ)V
*/
JNIEXPORT void JNICALL Java_com_ibm_disni_verbs_impl_NativeDispatcher__1connectV2(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't like the idea of multiple connect implementations. Why not only keep V2 and expose query_device to Java. This solves multiple problems at once: 1) setting initiator/responder resource without knowing what the max are 2) two implementations of the same function

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense. However, since the libdisni is shipped separately, it may force some developers to recompile their java code which uses the library.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That should be ok. Developers can always choose to use an old version of the library if needed.

JNIEnv *env, jobject obj, jlong id, jlong param) {
struct rdma_cm_id *cm_listen_id = NULL;
struct rdma_conn_param *conn_param = NULL;

cm_listen_id = (struct rdma_cm_id *)id;
conn_param = (struct rdma_conn_param *)param;

if (cm_listen_id != NULL && conn_param!=NULL) {
int ret = rdma_connect(cm_listen_id, conn_param);
if (ret == 0) {
log("j2c::connect: ret %i, guid %" PRIu64 "\n", ret,
ibv_get_device_guid(cm_listen_id->verbs->device));
} else {
log("j2c::connect: rdma_connect failed\n");
JNU_ThrowIOExceptionWithLastError(env,
"j2c::connect: rdma_connect failed");
}
} else {
if(cm_listen_id == NULL){
log("j2c:connect: cm_listen_id null\n");
JNU_ThrowIOException(env, "j2c:connect: cm_listen_id null\n");
} else {
log("j2c:connect: conn_param null\n");
JNU_ThrowIOException(env, "j2c:connect: conn_param null\n");
}
}
}


/*
* Class: com_ibm_disni_verbs_impl_NativeDispatcher
* Method: _accept
Expand All @@ -446,7 +481,7 @@ JNIEXPORT void JNICALL Java_com_ibm_disni_verbs_impl_NativeDispatcher__1accept(

if (cm_listen_id != NULL) {
memset(&conn_param, 0, sizeof(conn_param));
conn_param.initiator_depth = dev_attr.max_qp_rd_atom;
conn_param.initiator_depth = dev_attr.max_qp_init_rd_atom;
conn_param.responder_resources = dev_attr.max_qp_rd_atom;
conn_param.retry_count = (unsigned char)retry;
conn_param.rnr_retry_count = (unsigned char)rnr_retry;
Expand All @@ -463,6 +498,40 @@ JNIEXPORT void JNICALL Java_com_ibm_disni_verbs_impl_NativeDispatcher__1accept(
}
}

/*
* Class: com_ibm_disni_verbs_impl_NativeDispatcher
* Method: _acceptV2
* Signature: (JJ)V
*/
JNIEXPORT void JNICALL Java_com_ibm_disni_verbs_impl_NativeDispatcher__1acceptV2(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See comment to connectV2

JNIEnv *env, jobject obj, jlong id, jlong param) {
struct rdma_cm_id *cm_listen_id = NULL;
struct rdma_conn_param *conn_param = NULL;

cm_listen_id = (struct rdma_cm_id *)id;
conn_param = (struct rdma_conn_param *)param;

if (cm_listen_id != NULL && conn_param!=NULL) {
int ret = rdma_accept(cm_listen_id, conn_param);
if (ret == 0) {
log("j2c::connect: ret %i, guid %" PRIu64 "\n", ret,
ibv_get_device_guid(cm_listen_id->verbs->device));
} else {
log("j2c::connect: rdma_connect failed\n");
JNU_ThrowIOExceptionWithLastError(env,
"j2c::connect: rdma_connect failed");
}
} else {
if(cm_listen_id == NULL){
log("j2c:connect: cm_listen_id null\n");
JNU_ThrowIOException(env, "j2c:connect: cm_listen_id null\n");
} else {
log("j2c:connect: conn_param null\n");
JNU_ThrowIOException(env, "j2c:connect: conn_param null\n");
}
}
}

/*
* Class: com_ibm_disni_verbs_impl_NativeDispatcher
* Method: _ackCmEvent
Expand Down
16 changes: 16 additions & 0 deletions libdisni/src/verbs/com_ibm_disni_verbs_impl_NativeDispatcher.h
100755 → 100644

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions src/main/java/com/ibm/disni/RdmaEndpointGroup.java
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,10 @@ public RdmaConnParam getConnParam() {
return connParam;
}

public void setConnParam(RdmaConnParam connParam) {
this.connParam = connParam;
}

public synchronized void close() throws IOException, InterruptedException {
logger.info("shutting down group");
if (closed.get()){
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/com/ibm/disni/verbs/IbvSendWR.java
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ public Rdma getRdma() {
}

/**
* Unsupported.
*
*
* @return the atomic
*/
Expand Down Expand Up @@ -322,7 +322,7 @@ public String getClassName() {
}

/**
* Unsupported.
*
*/
public static class Atomic {
protected long remote_addr;
Expand Down
27 changes: 25 additions & 2 deletions src/main/java/com/ibm/disni/verbs/RdmaConnParam.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
package com.ibm.disni.verbs;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// TODO: Auto-generated Javadoc
//struct rdma_conn_param {
Expand Down Expand Up @@ -51,6 +53,8 @@ public class RdmaConnParam {
protected byte srq;
protected int qp_num;

public static int CSIZE = 24;

public RdmaConnParam() {
this.private_data_addr = 0;
this.private_data_len = 0;
Expand Down Expand Up @@ -111,7 +115,7 @@ public byte getResponder_resources() {
* @param responder_resources the new responder resources.
*/
public void setResponder_resources(byte responder_resources) throws IOException {
throw new IOException("Operation currently not supported");
this.responder_resources = responder_resources;
}

/**
Expand All @@ -129,7 +133,7 @@ public byte getInitiator_depth() {
* @param initiator_depth the new initiater depth.
*/
public void setInitiator_depth(byte initiator_depth) throws IOException {
throw new IOException("Operation currently not supported");
this.initiator_depth = initiator_depth;
}

/**
Expand Down Expand Up @@ -221,4 +225,23 @@ public int getQp_num() {
public void setQp_num(int qp_num) throws IOException {
throw new IOException("Operation currently not supported");
}


public void writeBack(ByteBuffer buffer) {
buffer.order(ByteOrder.LITTLE_ENDIAN);
buffer.putLong(private_data_addr);
TaranovK marked this conversation as resolved.
Show resolved Hide resolved
buffer.order(ByteOrder.nativeOrder());
buffer.put(private_data_len);
buffer.put(responder_resources);
buffer.put(initiator_depth);
buffer.put(flow_control);
buffer.put(retry_count);
buffer.put(rnr_retry_count);
buffer.put(srq);
buffer.put((byte)0);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cq_num is u32 not u8, so this should be putInt(0)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here I do not agree. Actually, buffer.put((byte)0); is a padding. cq_num field is 8 byte aligned. To add cq_num the code should be: buffer.put(srq); buffer.put((byte)0); buffer.putInt(cq_num);
I did not add cq_num as it is ignored anyway and does not cause any problems.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense.

}

public int size() {
return CSIZE;
}
}
78 changes: 74 additions & 4 deletions src/main/java/com/ibm/disni/verbs/impl/NatIbvSendWR.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,17 +68,21 @@ public class NatIbvSendWR extends IbvSendWR implements SendWRMod {
public static int SENDFLAGS_OFFSET = 32;
// public static int IMMDATA_OFFSET = 36;
public static int REMOTEADDR_OFFSET = 40;
public static int ATOMIC_OFFSET = 40;
public static int RKEY_OFFSET = 48;

private NatPostSendCall postSendCall;
private int bufPosition;
private long next;
private long ptr_sge_list;
private NatRdma natRdma;
private NatAtomic natAtomic;

public NatIbvSendWR(NatPostSendCall postSendCall, NatRdma natRdma, IbvSendWR sendWR, LinkedList<IbvSge> sg_list) {
super(natRdma, null, null, sg_list);
public NatIbvSendWR(NatPostSendCall postSendCall, NatRdma natRdma, NatAtomic natAtomic,
IbvSendWR sendWR, LinkedList<IbvSge> sg_list) {
super(natRdma, natAtomic, null, sg_list);
this.natRdma = natRdma;
this.natAtomic = natAtomic;
this.next = 0;
this.ptr_sge_list = 0;

Expand All @@ -102,9 +106,16 @@ public void writeBack(ByteBuffer buffer) {
buffer.putInt(opcode);
buffer.putInt(send_flags);
buffer.putInt(imm_data);

if(opcode >= IBV_WR_ATOMIC_CMP_AND_SWP){
buffer.position(initialPos + NatIbvSendWR.ATOMIC_OFFSET);
natAtomic.writeBack(buffer);
} else {
buffer.position(initialPos + NatIbvSendWR.REMOTEADDR_OFFSET);
natRdma.writeBack(buffer);
}

buffer.position(initialPos + NatIbvSendWR.REMOTEADDR_OFFSET);
natRdma.writeBack(buffer);

int newPos = initialPos + CSIZE;
buffer.position(newPos);
}
Expand Down Expand Up @@ -205,4 +216,63 @@ public int getBufPosition() {
return bufPosition;
}
}

public static class NatAtomic extends IbvSendWR.Atomic {
private NatPostSendCall postSendCall;
private int bufPosition;

public NatAtomic(Atomic atomic, NatPostSendCall postSendCall){
this.remote_addr = atomic.getRemote_addr();
this.compare_add = atomic.getCompare_add();
this.swap = atomic.getSwap();
this.rkey = atomic.getRkey();
this.reserved = atomic.getReserved();

this.postSendCall = postSendCall;
}

@Override
public void setRemote_addr(long remote_addr) {
super.setRemote_addr(remote_addr);
postSendCall.setRemote_addr(this, 0);
}

@Override
public void setCompare_add(long compare_add) {
super.setCompare_add(compare_add);
postSendCall.setCompare_add(this, 8);
}

@Override
public void setSwap(long swap) {
super.setSwap(swap);
postSendCall.setSwap(this, 16);
}


@Override
public void setRkey(int rkey) {
super.setRkey(rkey);
postSendCall.setRkey(this, 24);
}

@Override
public void setReserved(int reserved) {
super.setReserved(reserved);
postSendCall.setReserved(this, 28);
}

public void writeBack(ByteBuffer buffer) {
this.bufPosition = buffer.position();
buffer.putLong(getRemote_addr());
buffer.putLong(getCompare_add());
buffer.putLong(getSwap());
buffer.putInt(getRkey());
buffer.putInt(getReserved());
}

public int getBufPosition() {
return bufPosition;
}
}
}
29 changes: 28 additions & 1 deletion src/main/java/com/ibm/disni/verbs/impl/NatPostSendCall.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.ibm.disni.verbs.IbvSge;
import com.ibm.disni.verbs.SVCPostSend;
import com.ibm.disni.verbs.impl.NatIbvSendWR.NatRdma;
import com.ibm.disni.verbs.impl.NatIbvSendWR.NatAtomic;
import com.ibm.disni.util.MemBuf;
import com.ibm.disni.util.MemoryAllocation;

Expand Down Expand Up @@ -82,7 +83,8 @@ private void setWrList(List<IbvSendWR> wrList) {
}

NatRdma natRdma = new NatRdma(sendWR.getRdma(), this);
NatIbvSendWR natSendWR = new NatIbvSendWR(this, natRdma, sendWR, sg_list);
NatAtomic natAtomic = new NatAtomic(sendWR.getAtomic(), this);
NatIbvSendWR natSendWR = new NatIbvSendWR(this, natRdma, natAtomic, sendWR, sg_list);
natSendWR.setPtr_sge_list(sgeOffset);
natSendWR.setNext(wrOffset);
wrNatList.add(natSendWR);
Expand Down Expand Up @@ -156,6 +158,31 @@ public void setReserved(NatRdma rdma, int offset) {
cmd.getBuffer().putInt(position, rdma.getReserved());
}

public void setCompare_add(NatAtomic atomic, int offset) {
int position = atomic.getBufPosition() + offset;
cmd.getBuffer().putLong(position, atomic.getCompare_add());
}

public void setSwap(NatAtomic atomic, int offset) {
int position = atomic.getBufPosition() + offset;
cmd.getBuffer().putLong(position, atomic.getSwap());
}

public void setRemote_addr(NatAtomic atomic, int offset) {
int position = atomic.getBufPosition() + offset;
cmd.getBuffer().putLong(position, atomic.getRemote_addr());
}

public void setRkey(NatAtomic atomic, int offset) {
int position = atomic.getBufPosition() + offset;
cmd.getBuffer().putInt(position, atomic.getRkey());
}

public void setReserved(NatAtomic atomic, int offset) {
int position = atomic.getBufPosition() + offset;
cmd.getBuffer().putInt(position, atomic.getReserved());
}

public void setAddr(NatIbvSge sge, int offset) {
int position = sge.getBufPosition() + offset;
cmd.getBuffer().putLong(position, sge.getAddr());
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/com/ibm/disni/verbs/impl/NativeDispatcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,9 @@ public native long _createQP(long id, long pd, long sendcq, long recvcq, int qpt
public native int _getCmEvent(long channel, long listenid, long clientid, int timeout);
public native void _connect(long id, int retrycount, int rnrretrycount, long privdataaddr, byte privdatalen)
throws IOException;
public native void _connectV2(long id, long conn_param) throws IOException;
public native void _accept(long id, int retrycount, int rnrretrycount) throws IOException;
public native void _acceptV2(long id, long conn_param) throws IOException;
public native int _ackCmEvent(int cmEvent);
public native int _disconnect(long id);
public native int _destroyEventChannel(long fd);
Expand Down
Loading