diff --git a/libdisni/src/verbs/com_ibm_disni_verbs_impl_NativeDispatcher.cpp b/libdisni/src/verbs/com_ibm_disni_verbs_impl_NativeDispatcher.cpp
index adb79342..02f13532 100755
--- a/libdisni/src/verbs/com_ibm_disni_verbs_impl_NativeDispatcher.cpp
+++ b/libdisni/src/verbs/com_ibm_disni_verbs_impl_NativeDispatcher.cpp
@@ -430,6 +430,41 @@ JNIEXPORT void JNICALL Java_com_ibm_disni_verbs_impl_NativeDispatcher__1connect(
}
}
+/*
+ * Class: com_ibm_disni_verbs_impl_NativeDispatcher
+ * Method: _connectV2
+ * Signature: (JJ)V
+ */
+JNIEXPORT void JNICALL Java_com_ibm_disni_verbs_impl_NativeDispatcher__1connectV2(
+ JNIEnv *env, jobject obj, jlong id, jlong param) {
+ struct rdma_cm_id *cm_listen_id = NULL;
+ struct rdma_conn_param *conn_param = NULL;
+
+ cm_listen_id = (struct rdma_cm_id *)id;
+ conn_param = (struct rdma_conn_param *)param;
+
+ if (cm_listen_id != NULL && conn_param!=NULL) {
+ int ret = rdma_connect(cm_listen_id, conn_param);
+ if (ret == 0) {
+ log("j2c::connect: ret %i, guid %" PRIu64 "\n", ret,
+ ibv_get_device_guid(cm_listen_id->verbs->device));
+ } else {
+ log("j2c::connect: rdma_connect failed\n");
+ JNU_ThrowIOExceptionWithLastError(env,
+ "j2c::connect: rdma_connect failed");
+ }
+ } else {
+ if(cm_listen_id == NULL){
+ log("j2c:connect: cm_listen_id null\n");
+ JNU_ThrowIOException(env, "j2c:connect: cm_listen_id null\n");
+ } else {
+ log("j2c:connect: conn_param null\n");
+ JNU_ThrowIOException(env, "j2c:connect: conn_param null\n");
+ }
+ }
+}
+
+
/*
* Class: com_ibm_disni_verbs_impl_NativeDispatcher
* Method: _accept
@@ -463,6 +498,40 @@ JNIEXPORT void JNICALL Java_com_ibm_disni_verbs_impl_NativeDispatcher__1accept(
}
}
+/*
+ * Class: com_ibm_disni_verbs_impl_NativeDispatcher
+ * Method: _acceptV2
+ * Signature: (JJ)V
+ */
+JNIEXPORT void JNICALL Java_com_ibm_disni_verbs_impl_NativeDispatcher__1acceptV2(
+ JNIEnv *env, jobject obj, jlong id, jlong param) {
+ struct rdma_cm_id *cm_listen_id = NULL;
+ struct rdma_conn_param *conn_param = NULL;
+
+ cm_listen_id = (struct rdma_cm_id *)id;
+ conn_param = (struct rdma_conn_param *)param;
+
+ if (cm_listen_id != NULL && conn_param!=NULL) {
+ int ret = rdma_accept(cm_listen_id, conn_param);
+ if (ret == 0) {
+ log("j2c::connect: ret %i, guid %" PRIu64 "\n", ret,
+ ibv_get_device_guid(cm_listen_id->verbs->device));
+ } else {
+ log("j2c::connect: rdma_connect failed\n");
+ JNU_ThrowIOExceptionWithLastError(env,
+ "j2c::connect: rdma_connect failed");
+ }
+ } else {
+ if(cm_listen_id == NULL){
+ log("j2c:connect: cm_listen_id null\n");
+ JNU_ThrowIOException(env, "j2c:connect: cm_listen_id null\n");
+ } else {
+ log("j2c:connect: conn_param null\n");
+ JNU_ThrowIOException(env, "j2c:connect: conn_param null\n");
+ }
+ }
+}
+
/*
* Class: com_ibm_disni_verbs_impl_NativeDispatcher
* Method: _ackCmEvent
@@ -823,6 +892,32 @@ Java_com_ibm_disni_verbs_impl_NativeDispatcher__1queryOdpSupport(JNIEnv *env,
return ret;
}
+/*
+ * Class: com_ibm_disni_verbs_impl_NativeDispatcher
+ * Method: _queryDevice
+ * Signature: (JJ)I
+ */
+JNIEXPORT jint JNICALL
+Java_com_ibm_disni_verbs_impl_NativeDispatcher__1queryDevice(JNIEnv *env,
+ jobject obj,
+ jlong id,
+ jlong addr) {
+ jint ret = -1;
+ struct ibv_context *context = (struct ibv_context *)id;
+
+ struct ibv_device_attr *dev_attr = (struct ibv_device_attr *)addr;
+ ret = ibv_query_device(context, dev_attr);
+
+ if(ret != 0) {
+ log("j2c::queryDevice: ibv_query_device failed, error %s\n",
+ strerror(ret));
+ ret = -1;
+ JNU_ThrowIOExceptionWithLastError(env, "j2c::queryDevice: ibv_query_device failed");
+ }
+ return ret;
+}
+
+
/*
* Class: com_ibm_disni_verbs_impl_NativeDispatcher
* Method: _expPrefetchMr
diff --git a/libdisni/src/verbs/com_ibm_disni_verbs_impl_NativeDispatcher.h b/libdisni/src/verbs/com_ibm_disni_verbs_impl_NativeDispatcher.h
old mode 100755
new mode 100644
index 6583636e..0f67a091
--- a/libdisni/src/verbs/com_ibm_disni_verbs_impl_NativeDispatcher.h
+++ b/libdisni/src/verbs/com_ibm_disni_verbs_impl_NativeDispatcher.h
@@ -90,6 +90,14 @@ Java_com_ibm_disni_verbs_impl_NativeDispatcher__1getCmEvent(JNIEnv *, jobject,
JNIEXPORT void JNICALL Java_com_ibm_disni_verbs_impl_NativeDispatcher__1connect(
JNIEnv *, jobject, jlong, jint, jint, jlong, jbyte);
+/*
+ * Class: com_ibm_disni_verbs_impl_NativeDispatcher
+ * Method: _connectV2
+ * Signature: (JJ)V
+ */
+JNIEXPORT void JNICALL Java_com_ibm_disni_verbs_impl_NativeDispatcher__1connectV2(
+ JNIEnv *, jobject, jlong, jlong);
+
/*
* Class: com_ibm_disni_verbs_impl_NativeDispatcher
* Method: _accept
@@ -98,6 +106,14 @@ JNIEXPORT void JNICALL Java_com_ibm_disni_verbs_impl_NativeDispatcher__1connect(
JNIEXPORT void JNICALL Java_com_ibm_disni_verbs_impl_NativeDispatcher__1accept(
JNIEnv *, jobject, jlong, jint, jint);
+/*
+ * Class: com_ibm_disni_verbs_impl_NativeDispatcher
+ * Method: _acceptV2
+ * Signature: (JJ)V
+ */
+JNIEXPORT void JNICALL Java_com_ibm_disni_verbs_impl_NativeDispatcher__1acceptV2(
+ JNIEnv *, jobject, jlong, jlong);
+
/*
* Class: com_ibm_disni_verbs_impl_NativeDispatcher
* Method: _ackCmEvent
@@ -219,6 +235,19 @@ Java_com_ibm_disni_verbs_impl_NativeDispatcher__1queryOdpSupport(JNIEnv *env,
jobject,
jlong);
+
+/*
+ * Class: com_ibm_disni_verbs_impl_NativeDispatcher
+ * Method: _queryDevice
+ * Signature: (JJ)I
+ */
+JNIEXPORT jint JNICALL
+Java_com_ibm_disni_verbs_impl_NativeDispatcher__1queryDevice(JNIEnv *env,
+ jobject,
+ jlong,
+ jlong);
+
+
/*
* Class: com_ibm_disni_verbs_impl_NativeDispatcher
* Method: _expPrefetchMr
diff --git a/pom.xml b/pom.xml
index 456df05b..4c3433b3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -5,7 +5,7 @@
com.ibm.disni
disni
jar
- 2.1
+ 2.2
disni
DiSNI (Direct Storage and Networking Interface) is a Java library for direct storage and networking access from userpace.
http://github.com/zrlio/disni
diff --git a/src/main/java/com/ibm/disni/RdmaEndpointGroup.java b/src/main/java/com/ibm/disni/RdmaEndpointGroup.java
index 927cf109..7898dbf0 100644
--- a/src/main/java/com/ibm/disni/RdmaEndpointGroup.java
+++ b/src/main/java/com/ibm/disni/RdmaEndpointGroup.java
@@ -157,6 +157,10 @@ public RdmaConnParam getConnParam() {
return connParam;
}
+ public void setConnParam(RdmaConnParam connParam) {
+ this.connParam = connParam;
+ }
+
public synchronized void close() throws IOException, InterruptedException {
logger.info("shutting down group");
if (closed.get()){
diff --git a/src/main/java/com/ibm/disni/verbs/IbvContext.java b/src/main/java/com/ibm/disni/verbs/IbvContext.java
index 9b6595fd..e6b6227a 100755
--- a/src/main/java/com/ibm/disni/verbs/IbvContext.java
+++ b/src/main/java/com/ibm/disni/verbs/IbvContext.java
@@ -101,4 +101,6 @@ public IbvCQ createCQ(IbvCompChannel compChannel, int ncqe, int comp_vector) thr
}
public int queryOdpSupport() throws IOException { return verbs.queryOdpSupport(this); }
+
+ public IbvDeviceAttr queryDevice() throws IOException { return verbs.queryDevice(this); }
}
diff --git a/src/main/java/com/ibm/disni/verbs/IbvDeviceAttr.java b/src/main/java/com/ibm/disni/verbs/IbvDeviceAttr.java
new file mode 100644
index 00000000..da634709
--- /dev/null
+++ b/src/main/java/com/ibm/disni/verbs/IbvDeviceAttr.java
@@ -0,0 +1,579 @@
+/*
+ * DiSNI: Direct Storage and Networking Interface
+ *
+ * Author: Konstantin Taranov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package com.ibm.disni.verbs;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+//struct ibv_device_attr {
+// char fw_ver[64]; /* FW version */
+// uint64_t node_guid; /* Node GUID (in network byte order) */
+// uint64_t sys_image_guid; /* System image GUID (in network byte order) */
+// uint64_t max_mr_size; /* Largest contiguous block that can be registered */
+// uint64_t page_size_cap; /* Supported memory shift sizes */
+// uint32_t vendor_id; /* Vendor ID, per IEEE */
+// uint32_t vendor_part_id; /* Vendor supplied part ID */
+// uint32_t hw_ver; /* Hardware version */
+// int max_qp; /* Maximum number of supported QPs */
+// int max_qp_wr; /* Maximum number of outstanding WR on any work queue */
+// unsigned int device_cap_flags; /* HCA capabilities mask */
+// int max_sge; /* Maximum number of s/g per WR for SQ & RQ of QP for non RDMA Read operations */
+// int max_sge_rd; /* Maximum number of s/g per WR for RDMA Read operations */
+// int max_cq; /* Maximum number of supported CQs */
+// int max_cqe; /* Maximum number of CQE capacity per CQ */
+// int max_mr; /* Maximum number of supported MRs */
+// int max_pd; /* Maximum number of supported PDs */
+// int max_qp_rd_atom; /* Maximum number of RDMA Read & Atomic operations that can be outstanding per QP */
+// int max_ee_rd_atom; /* Maximum number of RDMA Read & Atomic operations that can be outstanding per EEC */
+// int max_res_rd_atom; /* Maximum number of resources used for RDMA Read & Atomic operations by this HCA as the Target */
+// int max_qp_init_rd_atom; /* Maximum depth per QP for initiation of RDMA Read & Atomic operations */
+// int max_ee_init_rd_atom; /* Maximum depth per EEC for initiation of RDMA Read & Atomic operations */
+// enum ibv_atomic_cap atomic_cap; /* Atomic operations support level */
+// int max_ee; /* Maximum number of supported EE contexts */
+// int max_rdd; /* Maximum number of supported RD domains */
+// int max_mw; /* Maximum number of supported MWs */
+// int max_raw_ipv6_qp; /* Maximum number of supported raw IPv6 datagram QPs */
+// int max_raw_ethy_qp; /* Maximum number of supported Ethertype datagram QPs */
+// int max_mcast_grp; /* Maximum number of supported multicast groups */
+// int max_mcast_qp_attach; /* Maximum number of QPs per multicast group which can be attached */
+// int max_total_mcast_qp_attach;/* Maximum number of QPs which can be attached to multicast groups */
+// int max_ah; /* Maximum number of supported address handles */
+// int max_fmr; /* Maximum number of supported FMRs */
+// int max_map_per_fmr; /* Maximum number of (re)maps per FMR before an unmap operation in required */
+// int max_srq; /* Maximum number of supported SRQs */
+// int max_srq_wr; /* Maximum number of WRs per SRQ */
+// int max_srq_sge; /* Maximum number of s/g per SRQ */
+// uint16_t max_pkeys; /* Maximum number of partitions */
+// uint8_t local_ca_ack_delay; /* Local CA ack delay */
+// uint8_t phys_port_cnt; /* Number of physical ports */
+//};
+
+
+/**
+ * RDMA device's attributes
+ */
+public class IbvDeviceAttr {
+ protected byte fw_ver[];
+ protected long node_guid;
+ protected long sys_image_guid;
+ protected long max_mr_size;
+ protected long page_size_cap;
+ protected int vendor_id;
+ protected int vendor_part_id;
+ protected int hw_ver;
+ protected int max_qp;
+ protected int max_qp_wr;
+ protected int device_cap_flags;
+ protected int max_sge;
+ protected int max_sge_rd;
+ protected int max_cq;
+ protected int max_cqe;
+ protected int max_mr;
+ protected int max_pd;
+ protected int max_qp_rd_atom;
+ protected int max_ee_rd_atom;
+ protected int max_res_rd_atom;
+ protected int max_qp_init_rd_atom;
+ protected int max_ee_init_rd_atom;
+ protected int atomic_cap;// not supported
+ protected int max_ee;
+ protected int max_rdd;
+ protected int max_mw;
+ protected int max_raw_ipv6_qp;
+ protected int max_raw_ethy_qp;
+ protected int max_mcast_grp;
+ protected int max_mcast_qp_attach;
+ protected int max_total_mcast_qp_attach;
+ protected int max_ah;
+ protected int max_fmr;
+ protected int max_map_per_fmr;
+ protected int max_srq;
+ protected int max_srq_wr;
+ protected int max_srq_sge;
+ protected short max_pkeys;
+ protected byte local_ca_ack_delay;
+ protected byte phys_port_cnt;
+ protected int reserved;
+
+ public static int CSIZE = 232;
+
+ public IbvDeviceAttr(){
+ this.fw_ver = new byte[64];
+ this.node_guid = 0;
+ this.sys_image_guid = 0;
+ this.max_mr_size = 0;
+ this.page_size_cap = 0;
+ this.vendor_id = 0;
+ this.vendor_part_id = 0;
+ this.hw_ver = 0;
+ this.max_qp = 0;
+ this.max_qp_wr = 0;
+ this.device_cap_flags = 0;
+ this.max_sge = 0;
+ this.max_sge_rd = 0;
+ this.max_cq = 0;
+ this.max_cqe = 0;
+ this.max_mr = 0;
+ this.max_pd = 0;
+ this.max_qp_rd_atom = 0;
+ this.max_ee_rd_atom = 0;
+ this.max_res_rd_atom = 0;
+ this.max_qp_init_rd_atom = 0;
+ this.max_ee_init_rd_atom = 0;
+ this.atomic_cap = 0;
+ this.max_ee = 0;
+ this.max_rdd = 0;
+ this.max_mw = 0;
+ this.max_raw_ipv6_qp = 0;
+ this.max_raw_ethy_qp = 0;
+ this.max_mcast_grp = 0;
+ this.max_mcast_qp_attach = 0;
+ this.max_total_mcast_qp_attach = 0;
+ this.max_ah = 0;
+ this.max_fmr = 0;
+ this.max_map_per_fmr = 0;
+ this.max_srq = 0;
+ this.max_srq_wr = 0;
+ this.max_srq_sge = 0;
+ this.max_pkeys = 0;
+ this.local_ca_ack_delay = 0;
+ this.phys_port_cnt = 0;
+ this.reserved = 0;
+ }
+
+ /**
+ * Gets the fw_ver.
+ *
+ * @return the fw_ver
+ */
+ public byte[] getFw_ver() {
+ return fw_ver;
+ }
+
+ /**
+ * Gets the node_guid.
+ *
+ * @return the node_guid
+ */
+ public long getNode_guid() {
+ return node_guid;
+ }
+
+ /**
+ * Gets the sys_image_guid.
+ *
+ * @return the sys_image_guid
+ */
+ public long getSys_image_guid() {
+ return sys_image_guid;
+ }
+
+ /**
+ * Gets the max_mr_size.
+ *
+ * @return the max_mr_size
+ */
+ public long getMax_mr_size() {
+ return max_mr_size;
+ }
+
+ /**
+ * Gets the page_size_cap.
+ *
+ * @return the page_size_cap
+ */
+ public long getPage_size_cap() {
+ return page_size_cap;
+ }
+
+ /**
+ * Gets the vendor_id.
+ *
+ * @return the vendor_id
+ */
+ public int getVendor_id() {
+ return vendor_id;
+ }
+
+ /**
+ * Gets the vendor_part_id.
+ *
+ * @return the vendor_part_id
+ */
+ public int getVendor_part_id() {
+ return vendor_part_id;
+ }
+
+ /**
+ * Gets the hw_ver.
+ *
+ * @return the hw_ver
+ */
+ public int getHw_ver() {
+ return hw_ver;
+ }
+
+ /**
+ * Gets the max_qp.
+ *
+ * @return the max_qp
+ */
+ public int getMax_qp() {
+ return max_qp;
+ }
+
+ /**
+ * Gets the max_qp_wr.
+ *
+ * @return the max_qp_wr
+ */
+ public int getMax_qp_wr() {
+ return max_qp_wr;
+ }
+
+ /**
+ * Gets the device_cap_flags.
+ *
+ * @return the device_cap_flags
+ */
+ public int getDevice_cap_flags() {
+ return device_cap_flags;
+ }
+
+ /**
+ * Gets the max_sge.
+ *
+ * @return the max_sge
+ */
+ public int getMax_sge() {
+ return max_sge;
+ }
+
+ /**
+ * Gets the max_sge_rd.
+ *
+ * @return the max_sge_rd
+ */
+ public int getMax_sge_rd() {
+ return max_sge_rd;
+ }
+
+
+ /**
+ * Gets the max_cq.
+ *
+ * @return the max_cq
+ */
+ public int getMax_cq() {
+ return max_cq;
+ }
+
+ /**
+ * Gets the max_cqe.
+ *
+ * @return the max_cqe
+ */
+ public int getMax_cqe() {
+ return max_cqe;
+ }
+
+ /**
+ * Gets the max_mr.
+ *
+ * @return the max_mr
+ */
+ public int getMax_mr() {
+ return max_mr;
+ }
+
+ /**
+ * Gets the max_pd.
+ *
+ * @return the max_pd
+ */
+ public int getMax_pd() {
+ return max_pd;
+ }
+
+ /**
+ * Gets the max_qp_rd_atom.
+ *
+ * @return the max_qp_rd_atom
+ */
+ public int getMax_qp_rd_atom() {
+ return max_qp_rd_atom;
+ }
+
+ /**
+ * Gets the max_ee_rd_atom.
+ *
+ * @return the max_ee_rd_atom
+ */
+ public int getMax_ee_rd_atom() {
+ return max_ee_rd_atom;
+ }
+
+ /**
+ * Gets the max_res_rd_atom.
+ *
+ * @return the max_res_rd_atom
+ */
+ public int getMax_res_rd_atom() {
+ return max_res_rd_atom;
+ }
+
+ /**
+ * Gets the max_qp_init_rd_atom.
+ *
+ * @return the max_qp_init_rd_atom
+ */
+ public int getMax_qp_init_rd_atom() {
+ return max_qp_init_rd_atom;
+ }
+
+
+ /**
+ * Gets the max_ee_init_rd_atom.
+ *
+ * @return the max_ee_init_rd_atom
+ */
+ public int getMax_ee_init_rd_atom() {
+ return max_ee_init_rd_atom;
+ }
+
+ /**
+ * Gets the atomic_cap.
+ *
+ * @return the atomic_cap
+ */
+ public int getAtomic_cap() {
+ return atomic_cap;
+ }
+
+ /**
+ * Gets the max_ee.
+ *
+ * @return the max_ee
+ */
+ public int getMax_ee() {
+ return max_ee;
+ }
+
+ /**
+ * Gets the max_rdd.
+ *
+ * @return the max_rdd
+ */
+ public int getMax_rdd() {
+ return max_rdd;
+ }
+
+ /**
+ * Gets the max_mw.
+ *
+ * @return the max_mw
+ */
+ public int getMax_mw() {
+ return max_mw;
+ }
+
+ /**
+ * Gets the max_raw_ipv6_qp.
+ *
+ * @return the max_raw_ipv6_qp
+ */
+ public int getMax_raw_ipv6_qp() {
+ return max_raw_ipv6_qp;
+ }
+
+ /**
+ * Gets the max_raw_ethy_qp.
+ *
+ * @return the max_raw_ethy_qp
+ */
+ public int getMax_raw_ethy_qp() {
+ return max_raw_ethy_qp;
+ }
+
+ /**
+ * Gets the max_mcast_grp.
+ *
+ * @return the max_mcast_grp
+ */
+ public int getMax_mcast_grp() {
+ return max_mcast_grp;
+ }
+
+ /**
+ * Gets the max_mcast_qp_attach.
+ *
+ * @return the max_mcast_qp_attach
+ */
+ public int getMax_mcast_qp_attach() {
+ return max_mcast_qp_attach;
+ }
+
+ /**
+ * Gets the max_total_mcast_qp_attach.
+ *
+ * @return the max_total_mcast_qp_attach
+ */
+ public int getMax_total_mcast_qp_attach() {
+ return max_total_mcast_qp_attach;
+ }
+
+ /**
+ * Gets the max_ah.
+ *
+ * @return the max_ah
+ */
+ public int getMax_ah() {
+ return max_ah;
+ }
+
+ /**
+ * Gets the max_fmr.
+ *
+ * @return the max_fmr
+ */
+ public int getMax_fmr() {
+ return max_fmr;
+ }
+
+ /**
+ * Gets the max_map_per_fmr.
+ *
+ * @return the max_map_per_fmr
+ */
+ public int getMax_map_per_fmr() {
+ return max_map_per_fmr;
+ }
+
+ /**
+ * Gets the max_srq.
+ *
+ * @return the max_srq
+ */
+ public int getMax_srq() {
+ return max_srq;
+ }
+
+ /**
+ * Gets the max_srq_wr.
+ *
+ * @return the max_srq_wr
+ */
+ public int getMax_srq_wr() {
+ return max_srq_wr;
+ }
+
+ /**
+ * Gets the max_srq_sge.
+ *
+ * @return the max_srq_sge
+ */
+ public int getMax_srq_sge() {
+ return max_srq_sge;
+ }
+
+ /**
+ * Gets the max_pkeys.
+ *
+ * @return the max_pkeys
+ */
+ public short getMax_pkeys() {
+ return max_pkeys;
+ }
+
+ /**
+ * Gets the local_ca_ack_delay.
+ *
+ * @return the local_ca_ack_delay
+ */
+ public byte getLocal_ca_ack_delay() {
+ return local_ca_ack_delay;
+ }
+
+ /**
+ * Gets the local_ca_ack_delay.
+ *
+ * @return the local_ca_ack_delay
+ */
+ public byte getPhys_port_cnt() {
+ return phys_port_cnt;
+ }
+
+ /**
+ * Gets the reserved.
+ *
+ * @return the reserved
+ */
+ public int getReserved() {
+ return reserved;
+ }
+
+ public void update(ByteBuffer buffer) {
+ buffer.get(this.fw_ver);
+ this.node_guid = buffer.getLong();
+ this.sys_image_guid = buffer.getLong();
+ this.max_mr_size = buffer.getLong();
+ this.page_size_cap = buffer.getLong();
+ this.vendor_id = buffer.getInt();
+ this.vendor_part_id = buffer.getInt();
+ this.hw_ver = buffer.getInt();
+ this.max_qp = buffer.getInt();
+ this.max_qp_wr = buffer.getInt();
+ this.device_cap_flags = buffer.getInt();
+ this.max_sge = buffer.getInt();
+ this.max_sge_rd = buffer.getInt();
+ this.max_cq = buffer.getInt();
+ this.max_cqe = buffer.getInt();
+ this.max_mr = buffer.getInt();
+ this.max_pd = buffer.getInt();
+ this.max_qp_rd_atom = buffer.getInt();
+ this.max_ee_rd_atom = buffer.getInt();
+ this.max_res_rd_atom = buffer.getInt();
+ this.max_qp_init_rd_atom = buffer.getInt();
+ this.max_ee_init_rd_atom = buffer.getInt();
+ this.atomic_cap = buffer.getInt();
+ this.max_ee = buffer.getInt();
+ this.max_rdd = buffer.getInt();
+ this.max_mw = buffer.getInt();
+ this.max_raw_ipv6_qp = buffer.getInt();
+ this.max_raw_ethy_qp = buffer.getInt();
+ this.max_mcast_grp = buffer.getInt();
+ this.max_mcast_qp_attach = buffer.getInt();
+ this.max_total_mcast_qp_attach = buffer.getInt();
+ this.max_ah = buffer.getInt();
+ this.max_fmr = buffer.getInt();
+ this.max_map_per_fmr =buffer.getInt();
+ this.max_srq = buffer.getInt();
+ this.max_srq_wr = buffer.getInt();
+ this.max_srq_sge = buffer.getInt();
+ this.max_pkeys = buffer.getShort();
+ this.local_ca_ack_delay = buffer.get();
+ this.phys_port_cnt = buffer.get();
+ this.reserved = buffer.getInt();
+ }
+
+ public int size() {
+ return CSIZE;
+ }
+}
diff --git a/src/main/java/com/ibm/disni/verbs/IbvSendWR.java b/src/main/java/com/ibm/disni/verbs/IbvSendWR.java
index ac66ef51..41e89512 100644
--- a/src/main/java/com/ibm/disni/verbs/IbvSendWR.java
+++ b/src/main/java/com/ibm/disni/verbs/IbvSendWR.java
@@ -234,7 +234,7 @@ public Rdma getRdma() {
}
/**
- * Unsupported.
+ *
*
* @return the atomic
*/
@@ -322,7 +322,7 @@ public String getClassName() {
}
/**
- * Unsupported.
+ *
*/
public static class Atomic {
protected long remote_addr;
diff --git a/src/main/java/com/ibm/disni/verbs/RdmaConnParam.java b/src/main/java/com/ibm/disni/verbs/RdmaConnParam.java
index 90287d33..6feb8d2a 100644
--- a/src/main/java/com/ibm/disni/verbs/RdmaConnParam.java
+++ b/src/main/java/com/ibm/disni/verbs/RdmaConnParam.java
@@ -22,6 +22,8 @@
package com.ibm.disni.verbs;
import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
// TODO: Auto-generated Javadoc
//struct rdma_conn_param {
@@ -49,8 +51,11 @@ public class RdmaConnParam {
protected byte retry_count;
protected byte rnr_retry_count;
protected byte srq;
+ protected byte reserved;
protected int qp_num;
+ public static int CSIZE = 24;
+
public RdmaConnParam() {
this.private_data_addr = 0;
this.private_data_len = 0;
@@ -60,6 +65,7 @@ public RdmaConnParam() {
this.retry_count = 0;
this.rnr_retry_count = 0;
this.srq = 0;
+ this.reserved = 0;
this.qp_num = 0;
}
@@ -111,7 +117,7 @@ public byte getResponder_resources() {
* @param responder_resources the new responder resources.
*/
public void setResponder_resources(byte responder_resources) throws IOException {
- throw new IOException("Operation currently not supported");
+ this.responder_resources = responder_resources;
}
/**
@@ -129,7 +135,7 @@ public byte getInitiator_depth() {
* @param initiator_depth the new initiater depth.
*/
public void setInitiator_depth(byte initiator_depth) throws IOException {
- throw new IOException("Operation currently not supported");
+ this.initiator_depth = initiator_depth;
}
/**
@@ -147,7 +153,7 @@ public byte getFlow_control() {
* @param flow_control the new flow control.
*/
public void setFlow_control(byte flow_control) throws IOException {
- throw new IOException("Operation currently not supported");
+ this.flow_control = flow_control;
}
/**
@@ -201,7 +207,7 @@ public byte getSrq() {
* @param srq the new shared receive queue.
*/
public void setSrq(byte srq) throws IOException {
- throw new IOException("Operation currently not supported");
+ this.srq = srq;
}
/**
@@ -219,6 +225,24 @@ public int getQp_num() {
* @param qp_num the new qp_num
*/
public void setQp_num(int qp_num) throws IOException {
- throw new IOException("Operation currently not supported");
+ this.qp_num = qp_num;
+ }
+
+
+ public void writeBack(ByteBuffer buffer) {
+ buffer.putLong(private_data_addr);
+ buffer.put(private_data_len);
+ buffer.put(responder_resources);
+ buffer.put(initiator_depth);
+ buffer.put(flow_control);
+ buffer.put(retry_count);
+ buffer.put(rnr_retry_count);
+ buffer.put(srq);
+ buffer.put(reserved);
+ buffer.putInt(qp_num);
+ }
+
+ public int size() {
+ return CSIZE;
}
}
diff --git a/src/main/java/com/ibm/disni/verbs/RdmaVerbs.java b/src/main/java/com/ibm/disni/verbs/RdmaVerbs.java
index 0a1fd8e4..f5a01907 100755
--- a/src/main/java/com/ibm/disni/verbs/RdmaVerbs.java
+++ b/src/main/java/com/ibm/disni/verbs/RdmaVerbs.java
@@ -125,6 +125,15 @@ public abstract IbvCQ createCQ(IbvContext context,
*/
public abstract int queryOdpSupport(IbvContext context) throws IOException;
+ /**
+ * Query the maximum number of incoming RDMA read and atomic operations that the local side can accept.
+ *
+ * @param context the device context.
+ * @return all device attributes
+ * @throws Exception on failure.
+ */
+ public abstract IbvDeviceAttr queryDevice(IbvContext context) throws IOException;
+
/**
* Prefetch part of a memory region.
* Can be used only with MRs registered with IBV_EXP_ACCESS_ON_DEMAND
diff --git a/src/main/java/com/ibm/disni/verbs/impl/NatIbvSendWR.java b/src/main/java/com/ibm/disni/verbs/impl/NatIbvSendWR.java
index 8c16a5aa..1323848f 100644
--- a/src/main/java/com/ibm/disni/verbs/impl/NatIbvSendWR.java
+++ b/src/main/java/com/ibm/disni/verbs/impl/NatIbvSendWR.java
@@ -68,6 +68,7 @@ public class NatIbvSendWR extends IbvSendWR implements SendWRMod {
public static int SENDFLAGS_OFFSET = 32;
// public static int IMMDATA_OFFSET = 36;
public static int REMOTEADDR_OFFSET = 40;
+ public static int ATOMIC_OFFSET = 40;
public static int RKEY_OFFSET = 48;
private NatPostSendCall postSendCall;
@@ -75,10 +76,13 @@ public class NatIbvSendWR extends IbvSendWR implements SendWRMod {
private long next;
private long ptr_sge_list;
private NatRdma natRdma;
+ private NatAtomic natAtomic;
- public NatIbvSendWR(NatPostSendCall postSendCall, NatRdma natRdma, IbvSendWR sendWR, LinkedList sg_list) {
- super(natRdma, null, null, sg_list);
+ public NatIbvSendWR(NatPostSendCall postSendCall, NatRdma natRdma, NatAtomic natAtomic,
+ IbvSendWR sendWR, LinkedList sg_list) {
+ super(natRdma, natAtomic, null, sg_list);
this.natRdma = natRdma;
+ this.natAtomic = natAtomic;
this.next = 0;
this.ptr_sge_list = 0;
@@ -102,9 +106,16 @@ public void writeBack(ByteBuffer buffer) {
buffer.putInt(opcode);
buffer.putInt(send_flags);
buffer.putInt(imm_data);
+
+ if(opcode >= IBV_WR_ATOMIC_CMP_AND_SWP){
+ buffer.position(initialPos + NatIbvSendWR.ATOMIC_OFFSET);
+ natAtomic.writeBack(buffer);
+ } else {
+ buffer.position(initialPos + NatIbvSendWR.REMOTEADDR_OFFSET);
+ natRdma.writeBack(buffer);
+ }
- buffer.position(initialPos + NatIbvSendWR.REMOTEADDR_OFFSET);
- natRdma.writeBack(buffer);
+
int newPos = initialPos + CSIZE;
buffer.position(newPos);
}
@@ -205,4 +216,63 @@ public int getBufPosition() {
return bufPosition;
}
}
+
+ public static class NatAtomic extends IbvSendWR.Atomic {
+ private NatPostSendCall postSendCall;
+ private int bufPosition;
+
+ public NatAtomic(Atomic atomic, NatPostSendCall postSendCall){
+ this.remote_addr = atomic.getRemote_addr();
+ this.compare_add = atomic.getCompare_add();
+ this.swap = atomic.getSwap();
+ this.rkey = atomic.getRkey();
+ this.reserved = atomic.getReserved();
+
+ this.postSendCall = postSendCall;
+ }
+
+ @Override
+ public void setRemote_addr(long remote_addr) {
+ super.setRemote_addr(remote_addr);
+ postSendCall.setRemote_addr(this, 0);
+ }
+
+ @Override
+ public void setCompare_add(long compare_add) {
+ super.setCompare_add(compare_add);
+ postSendCall.setCompare_add(this, 8);
+ }
+
+ @Override
+ public void setSwap(long swap) {
+ super.setSwap(swap);
+ postSendCall.setSwap(this, 16);
+ }
+
+
+ @Override
+ public void setRkey(int rkey) {
+ super.setRkey(rkey);
+ postSendCall.setRkey(this, 24);
+ }
+
+ @Override
+ public void setReserved(int reserved) {
+ super.setReserved(reserved);
+ postSendCall.setReserved(this, 28);
+ }
+
+ public void writeBack(ByteBuffer buffer) {
+ this.bufPosition = buffer.position();
+ buffer.putLong(getRemote_addr());
+ buffer.putLong(getCompare_add());
+ buffer.putLong(getSwap());
+ buffer.putInt(getRkey());
+ buffer.putInt(getReserved());
+ }
+
+ public int getBufPosition() {
+ return bufPosition;
+ }
+ }
}
diff --git a/src/main/java/com/ibm/disni/verbs/impl/NatPostSendCall.java b/src/main/java/com/ibm/disni/verbs/impl/NatPostSendCall.java
index c2990df7..e9d21590 100644
--- a/src/main/java/com/ibm/disni/verbs/impl/NatPostSendCall.java
+++ b/src/main/java/com/ibm/disni/verbs/impl/NatPostSendCall.java
@@ -31,6 +31,7 @@
import com.ibm.disni.verbs.IbvSge;
import com.ibm.disni.verbs.SVCPostSend;
import com.ibm.disni.verbs.impl.NatIbvSendWR.NatRdma;
+import com.ibm.disni.verbs.impl.NatIbvSendWR.NatAtomic;
import com.ibm.disni.util.MemBuf;
import com.ibm.disni.util.MemoryAllocation;
@@ -82,7 +83,8 @@ private void setWrList(List wrList) {
}
NatRdma natRdma = new NatRdma(sendWR.getRdma(), this);
- NatIbvSendWR natSendWR = new NatIbvSendWR(this, natRdma, sendWR, sg_list);
+ NatAtomic natAtomic = new NatAtomic(sendWR.getAtomic(), this);
+ NatIbvSendWR natSendWR = new NatIbvSendWR(this, natRdma, natAtomic, sendWR, sg_list);
natSendWR.setPtr_sge_list(sgeOffset);
natSendWR.setNext(wrOffset);
wrNatList.add(natSendWR);
@@ -156,6 +158,31 @@ public void setReserved(NatRdma rdma, int offset) {
cmd.getBuffer().putInt(position, rdma.getReserved());
}
+ public void setCompare_add(NatAtomic atomic, int offset) {
+ int position = atomic.getBufPosition() + offset;
+ cmd.getBuffer().putLong(position, atomic.getCompare_add());
+ }
+
+ public void setSwap(NatAtomic atomic, int offset) {
+ int position = atomic.getBufPosition() + offset;
+ cmd.getBuffer().putLong(position, atomic.getSwap());
+ }
+
+ public void setRemote_addr(NatAtomic atomic, int offset) {
+ int position = atomic.getBufPosition() + offset;
+ cmd.getBuffer().putLong(position, atomic.getRemote_addr());
+ }
+
+ public void setRkey(NatAtomic atomic, int offset) {
+ int position = atomic.getBufPosition() + offset;
+ cmd.getBuffer().putInt(position, atomic.getRkey());
+ }
+
+ public void setReserved(NatAtomic atomic, int offset) {
+ int position = atomic.getBufPosition() + offset;
+ cmd.getBuffer().putInt(position, atomic.getReserved());
+ }
+
public void setAddr(NatIbvSge sge, int offset) {
int position = sge.getBufPosition() + offset;
cmd.getBuffer().putLong(position, sge.getAddr());
diff --git a/src/main/java/com/ibm/disni/verbs/impl/NativeDispatcher.java b/src/main/java/com/ibm/disni/verbs/impl/NativeDispatcher.java
index 5d61a5c8..d7e16348 100755
--- a/src/main/java/com/ibm/disni/verbs/impl/NativeDispatcher.java
+++ b/src/main/java/com/ibm/disni/verbs/impl/NativeDispatcher.java
@@ -99,7 +99,9 @@ public native long _createQP(long id, long pd, long sendcq, long recvcq, int qpt
public native int _getCmEvent(long channel, long listenid, long clientid, int timeout);
public native void _connect(long id, int retrycount, int rnrretrycount, long privdataaddr, byte privdatalen)
throws IOException;
+ public native void _connectV2(long id, long conn_param) throws IOException;
public native void _accept(long id, int retrycount, int rnrretrycount) throws IOException;
+ public native void _acceptV2(long id, long conn_param) throws IOException;
public native int _ackCmEvent(int cmEvent);
public native int _disconnect(long id);
public native int _destroyEventChannel(long fd);
@@ -116,6 +118,7 @@ public native void _connect(long id, int retrycount, int rnrretrycount, long pri
public native int _modifyQP(long qp, long attr) throws IOException;
public native long _regMr(long pd, long addr, int len, int access, long lkey, long rkey, long handle) throws IOException;
public native int _queryOdpSupport(long context);
+ public native int _queryDevice(long context, long addr);
public native int _expPrefetchMr(long handle, long addr, int len) throws IOException;
public native void _deregMr(long handle) throws IOException;
public native void _postSend(long qp, long wrList) throws IOException;
diff --git a/src/main/java/com/ibm/disni/verbs/impl/RdmaCmNat.java b/src/main/java/com/ibm/disni/verbs/impl/RdmaCmNat.java
index 67737b1f..21153b88 100644
--- a/src/main/java/com/ibm/disni/verbs/impl/RdmaCmNat.java
+++ b/src/main/java/com/ibm/disni/verbs/impl/RdmaCmNat.java
@@ -217,10 +217,14 @@ public void connect(RdmaCmId id, RdmaConnParam connParam)
if (!idPriv.isOpen()) {
throw new IOException("Trying to call connect() with closed ID");
}
- nativeDispatcher._connect(idPriv.getObjId(), connParam.getRetry_count(),
- connParam.getRnr_retry_count(), connParam.getPrivate_data(), connParam.getPrivate_data_len());
+
+ MemBuf memBuf = memAlloc.allocate(connParam.CSIZE);
+ connParam.writeBack(memBuf.getBuffer());
+
+ nativeDispatcher._connectV2(idPriv.getObjId(), memBuf.address());
logger.info("connect, id " + id.getPs());
-
+
+ memBuf.free();
return;
}
@@ -231,9 +235,14 @@ public void accept(RdmaCmId id, RdmaConnParam connParam)
if (!idPriv.isOpen()) {
throw new IOException("Trying to call accept() with closed ID");
}
- nativeDispatcher._accept(idPriv.getObjId(), connParam.getRetry_count(), connParam.getRnr_retry_count());
+
+ MemBuf memBuf = memAlloc.allocate(connParam.CSIZE);
+ connParam.writeBack(memBuf.getBuffer());
+
+ nativeDispatcher._acceptV2(idPriv.getObjId(), memBuf.address());
logger.info("accept, id " + id.getPs());
+ memBuf.free();
return;
}
diff --git a/src/main/java/com/ibm/disni/verbs/impl/RdmaVerbsNat.java b/src/main/java/com/ibm/disni/verbs/impl/RdmaVerbsNat.java
index 6c205a3f..c75a198d 100755
--- a/src/main/java/com/ibm/disni/verbs/impl/RdmaVerbsNat.java
+++ b/src/main/java/com/ibm/disni/verbs/impl/RdmaVerbsNat.java
@@ -27,6 +27,7 @@
import org.slf4j.Logger;
+import com.ibm.disni.verbs.IbvDeviceAttr;
import com.ibm.disni.verbs.IbvCQ;
import com.ibm.disni.verbs.IbvCompChannel;
import com.ibm.disni.verbs.IbvContext;
@@ -46,7 +47,7 @@
import com.ibm.disni.verbs.SVCReqNotify;
import com.ibm.disni.util.DiSNILogger;
import com.ibm.disni.util.MemoryAllocation;
-
+import com.ibm.disni.util.MemBuf;
public class RdmaVerbsNat extends RdmaVerbs {
private static final Logger logger = DiSNILogger.getLogger();
@@ -136,11 +137,24 @@ public SVCRegMr regMr(IbvPd pd, long address, int length, int access) {
}
- public int queryOdpSupport(IbvContext context){
+ public int queryOdpSupport(IbvContext context) {
NatIbvContext natContext = (NatIbvContext) context;
return nativeDispatcher._queryOdpSupport(natContext.getObjId());
}
+ public IbvDeviceAttr queryDevice(IbvContext context) throws IOException{
+ NatIbvContext natContext = (NatIbvContext) context;
+ MemBuf dstBuf = memAlloc.allocate(IbvDeviceAttr.CSIZE);
+ int ret = nativeDispatcher._queryDevice(natContext.getObjId(),dstBuf.address());
+ IbvDeviceAttr deviceAttr = new IbvDeviceAttr();
+ deviceAttr.update(dstBuf.getBuffer());
+ dstBuf.free();
+ if(ret == 0) {
+ return deviceAttr;
+ }
+ return null;
+ }
+
public int expPrefetchMr(IbvMr ibvMr, long address, int length) throws IOException {
return nativeDispatcher._expPrefetchMr(((NatIbvMr)ibvMr).getObjId(), address, length);
}
diff --git a/src/test/java/com/ibm/disni/benchmarks/RDMAvsTcpBenchmarkClient.java b/src/test/java/com/ibm/disni/benchmarks/RDMAvsTcpBenchmarkClient.java
index 4f1df397..5fb85c3c 100644
--- a/src/test/java/com/ibm/disni/benchmarks/RDMAvsTcpBenchmarkClient.java
+++ b/src/test/java/com/ibm/disni/benchmarks/RDMAvsTcpBenchmarkClient.java
@@ -76,10 +76,21 @@ public void runRDMA() throws Exception {
//create a EndpointGroup. The RdmaActiveEndpointGroup contains CQ processing and delivers CQ event to the endpoint.dispatchCqEvent() method.
endpointGroup = new RdmaActiveEndpointGroup(1000, false, 128, 4, 128);
endpointGroup.init(this);
+
//we have passed our own endpoint factory to the group, therefore new endpoints will be of type CustomClientEndpoint
//let's create a new client endpoint
CustomClientEndpoint endpoint = endpointGroup.createEndpoint();
+ IbvDeviceAttr deviceAttr = endpoint.getIdPriv().getVerbs().queryDevice();
+
+ int maxResponderResources = deviceAttr.getMax_qp_rd_atom();
+ int maxInitiatorDepth = deviceAttr.getMax_qp_init_rd_atom();
+
+ RdmaConnParam connParam = new RdmaConnParam();
+ connParam.setResponder_resources((byte) maxResponderResources);
+ connParam.setInitiator_depth((byte) maxInitiatorDepth);
+ endpointGroup.setConnParam(connParam);
+
//connect to the server
endpoint.connect(rdmaAddress, 1000);
System.out.println("RDMAvsTcpBenchmarkClient::client channel set up ");
diff --git a/src/test/java/com/ibm/disni/benchmarks/ReadClient.java b/src/test/java/com/ibm/disni/benchmarks/ReadClient.java
index 00ef34ba..7dcbefa2 100644
--- a/src/test/java/com/ibm/disni/benchmarks/ReadClient.java
+++ b/src/test/java/com/ibm/disni/benchmarks/ReadClient.java
@@ -57,10 +57,21 @@ public ReadClient.ReadClientEndpoint createEndpoint(RdmaCmId id, boolean serverS
private void run() throws Exception {
System.out.println("ReadClient, size " + size + ", loop " + loop);
-
+
ReadClient.ReadClientEndpoint endpoint = group.createEndpoint();
InetAddress ipAddress = InetAddress.getByName(host);
- InetSocketAddress address = new InetSocketAddress(ipAddress, port);
+ InetSocketAddress address = new InetSocketAddress(ipAddress, port);
+
+ IbvDeviceAttr deviceAttr = endpoint.getIdPriv().getVerbs().queryDevice();
+
+ int maxResponderResources = deviceAttr.getMax_qp_rd_atom();
+ int maxInitiatorDepth = deviceAttr.getMax_qp_init_rd_atom();
+
+ RdmaConnParam connParam = new RdmaConnParam();
+ connParam.setResponder_resources((byte) maxResponderResources);
+ connParam.setInitiator_depth((byte) maxInitiatorDepth);
+ group.setConnParam(connParam);
+
endpoint.connect(address, 1000);
System.out.println("ReadClient, client connected, address " + host + ", port " + port);
diff --git a/src/test/java/com/ibm/disni/benchmarks/ReadServer.java b/src/test/java/com/ibm/disni/benchmarks/ReadServer.java
index f6e95750..4e50c1e2 100644
--- a/src/test/java/com/ibm/disni/benchmarks/ReadServer.java
+++ b/src/test/java/com/ibm/disni/benchmarks/ReadServer.java
@@ -61,6 +61,17 @@ private void run() throws Exception {
InetAddress ipAddress = InetAddress.getByName(host);
InetSocketAddress address = new InetSocketAddress(ipAddress, port);
serverEndpoint.bind(address, 10);
+
+ IbvDeviceAttr deviceAttr = serverEndpoint.getIdPriv().getVerbs().queryDevice();
+
+ int maxResponderResources = deviceAttr.getMax_qp_rd_atom();
+ int maxInitiatorDepth = deviceAttr.getMax_qp_init_rd_atom();
+
+ RdmaConnParam connParam = new RdmaConnParam();
+ connParam.setResponder_resources((byte) maxResponderResources);
+ connParam.setInitiator_depth((byte) maxInitiatorDepth);
+ group.setConnParam(connParam);
+
ReadServer.ReadServerEndpoint endpoint = serverEndpoint.accept();
System.out.println("ReadServer, client connected, address " + address.toString());
diff --git a/src/test/java/com/ibm/disni/examples/AtomicClient.java b/src/test/java/com/ibm/disni/examples/AtomicClient.java
new file mode 100755
index 00000000..35c28a2f
--- /dev/null
+++ b/src/test/java/com/ibm/disni/examples/AtomicClient.java
@@ -0,0 +1,272 @@
+/*
+ * DiSNI: Direct Storage and Networking Interface
+ *
+ * Author: Konstantin Taranov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package com.ibm.disni.examples;
+
+import com.ibm.disni.CmdLineCommon;
+import com.ibm.disni.verbs.*;
+import org.apache.commons.cli.ParseException;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.LinkedList;
+
+public class AtomicClient {
+ private String ipAddress;
+ private int port;
+
+ public void run() throws Exception {
+ System.out.println("AtomicClient::starting...");
+ //open the CM and the verbs interfaces
+
+ //create a communication channel for receiving CM events
+ RdmaEventChannel cmChannel = RdmaEventChannel.createEventChannel();
+ if (cmChannel == null){
+ System.out.println("AtomicClient::cmChannel null");
+ return;
+ }
+
+ //create a RdmaCmId for this client
+ RdmaCmId idPriv = cmChannel.createId(RdmaCm.RDMA_PS_TCP);
+ if (idPriv == null){
+ System.out.println("AtomicClient::id null");
+ return;
+ }
+
+ //before connecting, we have to resolve addresses
+ InetAddress _dst = InetAddress.getByName(ipAddress);
+ InetSocketAddress dst = new InetSocketAddress(_dst, port);
+ idPriv.resolveAddr(null, dst, 2000);
+
+ //resolve addr returns an event, we have to catch that event
+ RdmaCmEvent cmEvent = cmChannel.getCmEvent(-1);
+ if (cmEvent == null){
+ System.out.println("AtomicClient::cmEvent null");
+ return;
+ } else if (cmEvent.getEvent() != RdmaCmEvent.EventType.RDMA_CM_EVENT_ADDR_RESOLVED
+ .ordinal()) {
+ System.out.println("AtomicClient::wrong event received: " + cmEvent.getEvent());
+ return;
+ }
+ cmEvent.ackEvent();
+
+ //we also have to resolve the route
+ idPriv.resolveRoute(2000);
+ //and catch that event too
+ cmEvent = cmChannel.getCmEvent(-1);
+ if (cmEvent == null){
+ System.out.println("AtomicClient::cmEvent null");
+ return;
+ } else if (cmEvent.getEvent() != RdmaCmEvent.EventType.RDMA_CM_EVENT_ROUTE_RESOLVED
+ .ordinal()) {
+ System.out.println("AtomicClient::wrong event received: " + cmEvent.getEvent());
+ return;
+ }
+ cmEvent.ackEvent();
+
+ //let's create a device context
+ IbvContext context = idPriv.getVerbs();
+
+ //and a protection domain, we use that one later for registering memory
+ IbvPd pd = context.allocPd();
+ if (pd == null){
+ System.out.println("AtomicClient::pd null");
+ return;
+ }
+
+ //the comp channel is used for getting CQ events
+ IbvCompChannel compChannel = context.createCompChannel();
+ if (compChannel == null){
+ System.out.println("AtomicClient::compChannel null");
+ return;
+ }
+
+ //let's create a completion queue
+ IbvCQ cq = context.createCQ(compChannel, 50, 0);
+ if (cq == null){
+ System.out.println("AtomicClient::cq null");
+ return;
+ }
+ //and request to be notified for this queue
+ cq.reqNotification(false).execute().free();
+
+ //we prepare for the creation of a queue pair (QP)
+ IbvQPInitAttr attr = new IbvQPInitAttr();
+ attr.cap().setMax_recv_sge(1);
+ attr.cap().setMax_recv_wr(10);
+ attr.cap().setMax_send_sge(1);
+ attr.cap().setMax_send_wr(10);
+ attr.setQp_type(IbvQP.IBV_QPT_RC);
+ attr.setRecv_cq(cq);
+ attr.setSend_cq(cq);
+ //let's create a queue pair
+ IbvQP qp = idPriv.createQP(pd, attr);
+ if (qp == null){
+ System.out.println("AtomicClient::qp null");
+ return;
+ }
+
+ int buffercount = 3;
+ int buffersize = 100;
+ ByteBuffer buffers[] = new ByteBuffer[buffercount];
+ IbvMr mrlist[] = new IbvMr[buffercount];
+ int access = IbvMr.IBV_ACCESS_LOCAL_WRITE | IbvMr.IBV_ACCESS_REMOTE_WRITE | IbvMr.IBV_ACCESS_REMOTE_READ;
+
+ //before we connect we also want to register some buffers
+ for (int i = 0; i < buffercount; i++){
+ buffers[i] = ByteBuffer.allocateDirect(buffersize);
+ mrlist[i] = pd.regMr(buffers[i], access).execute().free().getMr();
+ }
+
+ ByteBuffer dataBuf = buffers[0];
+ IbvMr dataMr = mrlist[0];
+ IbvMr sendMr = mrlist[1];
+ ByteBuffer recvBuf = buffers[2];
+ IbvMr recvMr = mrlist[2];
+
+ LinkedList wrList_recv = new LinkedList();
+
+ IbvSge sgeRecv = new IbvSge();
+ sgeRecv.setAddr(recvMr.getAddr());
+ sgeRecv.setLength(recvMr.getLength());
+ sgeRecv.setLkey(recvMr.getLkey());
+ LinkedList sgeListRecv = new LinkedList();
+ sgeListRecv.add(sgeRecv);
+ IbvRecvWR recvWR = new IbvRecvWR();
+ recvWR.setSg_list(sgeListRecv);
+ recvWR.setWr_id(1000);
+ wrList_recv.add(recvWR);
+
+ //it's important to post those receive operations before connecting
+ //otherwise the server may issue a send operation and which cannot be received
+ //this class wraps soem of the RDMA data operations
+ VerbsTools commRdma = new VerbsTools(context, compChannel, qp, cq);
+ commRdma.initSGRecv(wrList_recv);
+
+ IbvDeviceAttr deviceAttr = context.queryDevice();
+
+ int maxResponderResources = deviceAttr.getMax_qp_rd_atom();
+ int maxInitiatorDepth = deviceAttr.getMax_qp_init_rd_atom();
+
+ //now let's connect to the server
+ RdmaConnParam connParam = new RdmaConnParam();
+ connParam.setRetry_count((byte) 2);
+ connParam.setResponder_resources((byte) maxResponderResources);
+ connParam.setInitiator_depth((byte) maxInitiatorDepth);
+ idPriv.connect(connParam);
+
+
+ //wait until we are really connected
+ cmEvent = cmChannel.getCmEvent(-1);
+ if (cmEvent == null){
+ System.out.println("AtomicClient::cmEvent null");
+ return;
+ } else if (cmEvent.getEvent() != RdmaCmEvent.EventType.RDMA_CM_EVENT_ESTABLISHED
+ .ordinal()) {
+ System.out.println("AtomicClient::wrong event received: " + cmEvent.getEvent());
+ return;
+ }
+ cmEvent.ackEvent();
+
+ //let's wait for the first message to be received from the server
+ commRdma.completeSGRecv(wrList_recv, false);
+
+ //here we go, it contains the RDMA information of a remote buffer
+ recvBuf.clear();
+ long addr = recvBuf.getLong();
+ int length = recvBuf.getInt();
+ int lkey = recvBuf.getInt();
+ recvBuf.clear();
+ System.out.println("AtomicClient::receiving rdma information, addr " + addr + ", length " + length + ", key " + lkey);
+ System.out.println("AtomicClient::preparing atomic operation...");
+
+ dataBuf.order(ByteOrder.LITTLE_ENDIAN);
+ System.out.println("AtomicClient::initial value in the buffer: " + dataBuf.getLong());
+
+ //let's prepare a one-sided RDMA atomic operation to fetch the content of that remote buffer
+ LinkedList wrList_send = new LinkedList();
+ IbvSge sgeSend = new IbvSge();
+ sgeSend.setAddr(dataMr.getAddr());
+ sgeSend.setLength(8);
+ sgeSend.setLkey(dataMr.getLkey());
+ LinkedList sgeList = new LinkedList();
+ sgeList.add(sgeSend);
+ IbvSendWR sendWR = new IbvSendWR();
+ sendWR.setWr_id(1001);
+ sendWR.setSg_list(sgeList);
+ sendWR.setOpcode(IbvSendWR.IBV_WR_ATOMIC_FETCH_AND_ADD);
+ sendWR.setSend_flags(IbvSendWR.IBV_SEND_SIGNALED);
+ sendWR.getAtomic().setRemote_addr(addr);
+ sendWR.getAtomic().setRkey(lkey);
+ sendWR.getAtomic().setCompare_add(10);
+ wrList_send.add(sendWR);
+
+ //now we post the operation, the RDMA/atomic operation will take off
+ //the wrapper class will also wait of the CQ event
+ //once the CQ event is received we know the RDMA/atomic operation has completed
+ //we should have the content of the remote buffer stored in our own local buffer
+ //let's print it
+ commRdma.send(buffers, wrList_send, true, false);
+ dataBuf.clear();
+ System.out.println("AtomicClient::the remote server values has been incremented by 10");
+ System.out.println("AtomicClient::the fetched value from the remote server: " + dataBuf.getLong());
+
+ //now we send a final message to signal everything went fine
+ sgeSend = new IbvSge();
+ sgeSend.setAddr(sendMr.getAddr());
+ sgeSend.setLength(sendMr.getLength());
+ sgeSend.setLkey(sendMr.getLkey());
+ sgeList.clear();
+ sgeList.add(sgeSend);
+ sendWR = new IbvSendWR();
+ sendWR.setWr_id(1002);
+ sendWR.setSg_list(sgeList);
+ sendWR.setOpcode(IbvSendWR.IBV_WR_SEND);
+ sendWR.setSend_flags(IbvSendWR.IBV_SEND_SIGNALED);
+ wrList_send.clear();
+ wrList_send.add(sendWR);
+
+ //let's post the final message
+ commRdma.send(buffers, wrList_send, true, false);
+ }
+
+
+ public void launch(String[] args) throws Exception {
+ CmdLineCommon cmdLine = new CmdLineCommon("AtomicClient");
+
+ try {
+ cmdLine.parse(args);
+ } catch (ParseException e) {
+ cmdLine.printHelp();
+ System.exit(-1);
+ }
+ ipAddress = cmdLine.getIp();
+ port = cmdLine.getPort();
+
+ this.run();
+ }
+
+ public static void main(String[] args) throws Exception {
+ AtomicClient AtomicClient = new AtomicClient();
+ AtomicClient.launch(args);
+ }
+}
+
diff --git a/src/test/java/com/ibm/disni/examples/AtomicServer.java b/src/test/java/com/ibm/disni/examples/AtomicServer.java
new file mode 100755
index 00000000..9f40558f
--- /dev/null
+++ b/src/test/java/com/ibm/disni/examples/AtomicServer.java
@@ -0,0 +1,248 @@
+/*
+ * DiSNI: Direct Storage and Networking Interface
+ *
+ * Author: Konstantin Taranov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package com.ibm.disni.examples;
+
+import com.ibm.disni.CmdLineCommon;
+import com.ibm.disni.verbs.*;
+import org.apache.commons.cli.ParseException;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.LinkedList;
+
+public class AtomicServer {
+ private String ipAddress;
+ private int port;
+
+ public void run() throws Exception {
+ System.out.println("AtomicServer::starting...");
+
+ //create a communication channel for receiving CM events
+ RdmaEventChannel cmChannel = RdmaEventChannel.createEventChannel();
+ if (cmChannel == null){
+ System.out.println("AtomicServer::CM channel null");
+ return;
+ }
+
+ //create a RdmaCmId for the server
+ RdmaCmId idPriv = cmChannel.createId(RdmaCm.RDMA_PS_TCP);
+ if (idPriv == null){
+ System.out.println("idPriv null");
+ return;
+ }
+
+ InetAddress _src = InetAddress.getByName(ipAddress);
+ InetSocketAddress src = new InetSocketAddress(_src, port);
+ idPriv.bindAddr(src);
+
+ //listen on the id
+ idPriv.listen(10);
+
+ //wait for new connect requests
+ RdmaCmEvent cmEvent = cmChannel.getCmEvent(-1);
+ if (cmEvent == null){
+ System.out.println("cmEvent null");
+ return;
+ }
+ else if (cmEvent.getEvent() != RdmaCmEvent.EventType.RDMA_CM_EVENT_CONNECT_REQUEST
+ .ordinal()) {
+ System.out.println("AtomicServer::wrong event received: " + cmEvent.getEvent());
+ return;
+ }
+ //always acknowledge CM events
+ cmEvent.ackEvent();
+
+ //get the id of the newly connection
+ RdmaCmId connId = cmEvent.getConnIdPriv();
+ if (connId == null){
+ System.out.println("AtomicServer::connId null");
+ return;
+ }
+
+ //get the device context of the new connection, typically the same as with the server id
+ IbvContext context = connId.getVerbs();
+ if (context == null){
+ System.out.println("AtomicServer::context null");
+ return;
+ }
+
+ //create a new protection domain, we will use the pd later when registering memory
+ IbvPd pd = context.allocPd();
+ if (pd == null){
+ System.out.println("AtomicServer::pd null");
+ return;
+ }
+
+ //the comp channel is used to get CQ notifications
+ IbvCompChannel compChannel = context.createCompChannel();
+ if (compChannel == null){
+ System.out.println("AtomicServer::compChannel null");
+ return;
+ }
+
+ //create a completion queue
+ IbvCQ cq = context.createCQ(compChannel, 50, 0);
+ if (cq == null){
+ System.out.println("AtomicServer::cq null");
+ return;
+ }
+ //request to be notified on that CQ
+ cq.reqNotification(false).execute().free();
+
+ //prepare a new queue pair
+ IbvQPInitAttr attr = new IbvQPInitAttr();
+ attr.cap().setMax_recv_sge(1);
+ attr.cap().setMax_recv_wr(10);
+ attr.cap().setMax_send_sge(1);
+ attr.cap().setMax_send_wr(10);
+ attr.setQp_type(IbvQP.IBV_QPT_RC);
+ attr.setRecv_cq(cq);
+ attr.setSend_cq(cq);
+ //create the queue pair for the client connection
+ IbvQP qp = connId.createQP(pd, attr);
+ if (qp == null){
+ System.out.println("AtomicServer::qp null");
+ return;
+ }
+
+ int buffercount = 3;
+ int buffersize = 100;
+ ByteBuffer buffers[] = new ByteBuffer[buffercount];
+ IbvMr mrlist[] = new IbvMr[buffercount];
+ int access = IbvMr.IBV_ACCESS_LOCAL_WRITE | IbvMr.IBV_ACCESS_REMOTE_WRITE | IbvMr.IBV_ACCESS_REMOTE_READ | IbvMr.IBV_ACCESS_REMOTE_ATOMIC;
+
+ IbvDeviceAttr deviceAttr = context.queryDevice();
+
+ int maxResponderResources = deviceAttr.getMax_qp_rd_atom();
+ int maxInitiatorDepth = deviceAttr.getMax_qp_init_rd_atom();
+
+ RdmaConnParam connParam = new RdmaConnParam();
+ connParam.setRetry_count((byte) 2);
+ connParam.setResponder_resources((byte) maxResponderResources);
+ connParam.setInitiator_depth((byte) maxInitiatorDepth);
+ //once the client id is set up, accept the connection
+ connId.accept(connParam);
+ //wait until the connection is officially switched into established mode
+ cmEvent = cmChannel.getCmEvent(-1);
+ if (cmEvent.getEvent() != RdmaCmEvent.EventType.RDMA_CM_EVENT_ESTABLISHED
+ .ordinal()) {
+ System.out.println("AtomicServer::wrong event received: " + cmEvent.getEvent());
+ return;
+ }
+ //always ack CM events
+ cmEvent.ackEvent();
+
+ //register some buffers to be used later
+ for (int i = 0; i < buffercount; i++){
+ buffers[i] = ByteBuffer.allocateDirect(buffersize);
+ mrlist[i] = pd.regMr(buffers[i], access).execute().free().getMr();
+ }
+
+ ByteBuffer dataBuf = buffers[0];
+ IbvMr dataMr = mrlist[0];
+ ByteBuffer sendBuf = buffers[1];
+ IbvMr sendMr = mrlist[1];
+ IbvMr recvMr = mrlist[2];
+
+ dataBuf.order(ByteOrder.LITTLE_ENDIAN);
+ dataBuf.putLong(dataMr.getAddr());
+ dataBuf.clear();
+
+ System.out.println("AtomicServer::content of buffer before: " + dataBuf.getLong());
+ dataBuf.clear();
+
+ sendBuf.putLong(dataMr.getAddr());
+ sendBuf.putInt(dataMr.getLength());
+ sendBuf.putInt(dataMr.getLkey());
+ sendBuf.clear();
+
+ //this class is a thin wrapper over some of the data operations in jverbs
+ //we use it to issue data transfer operations
+ VerbsTools commRdma = new VerbsTools(context, compChannel, qp, cq);
+ LinkedList wrList_send = new LinkedList();
+
+ //let's prepare some work requests for sending
+ IbvSge sgeSend = new IbvSge();
+ sgeSend.setAddr(sendMr.getAddr());
+ sgeSend.setLength(sendMr.getLength());
+ sgeSend.setLkey(sendMr.getLkey());
+ LinkedList sgeList = new LinkedList();
+ sgeList.add(sgeSend);
+ IbvSendWR sendWR = new IbvSendWR();
+ sendWR.setWr_id(2000);
+ sendWR.setSg_list(sgeList);
+ sendWR.setOpcode(IbvSendWR.IBV_WR_SEND);
+ sendWR.setSend_flags(IbvSendWR.IBV_SEND_SIGNALED);
+ wrList_send.add(sendWR);
+
+ LinkedList wrList_recv = new LinkedList();
+
+ //let's prepare some work requests for receiving
+ IbvSge sgeRecv = new IbvSge();
+ sgeRecv.setAddr(recvMr.getAddr());
+ sgeRecv.setLength(recvMr.getLength());
+ int lkey = recvMr.getLkey();
+ sgeRecv.setLkey(lkey);
+ LinkedList sgeListRecv = new LinkedList();
+ sgeListRecv.add(sgeRecv);
+ IbvRecvWR recvWR = new IbvRecvWR();
+ recvWR.setSg_list(sgeListRecv);
+ recvWR.setWr_id(2001);
+ wrList_recv.add(recvWR);
+
+ //post a receive call
+ commRdma.initSGRecv(wrList_recv);
+ System.out.println("AtomicServer::initiated recv, about to send stag info");
+ //post a send call, here we send a message which include the RDMA information of a data buffer
+ commRdma.send(buffers, wrList_send, true, false);
+ System.out.println("AtomicServer::stag info sent");
+
+ //wait for the final message from the server
+ commRdma.completeSGRecv(wrList_recv, false);
+
+ System.out.println("AtomicServer::content of buffer after: " + dataBuf.getLong());
+ dataBuf.clear();
+
+ System.out.println("AtomicServer::done");
+ }
+
+ public void launch(String[] args) throws Exception {
+ CmdLineCommon cmdLine = new CmdLineCommon("AtomicServer");
+
+ try {
+ cmdLine.parse(args);
+ } catch (ParseException e) {
+ cmdLine.printHelp();
+ System.exit(-1);
+ }
+ ipAddress = cmdLine.getIp();
+ port = cmdLine.getPort();
+
+ this.run();
+ }
+
+ public static void main(String[] args) throws Exception {
+ AtomicServer AtomicServer = new AtomicServer();
+ AtomicServer.launch(args);
+ }
+}
+
diff --git a/src/test/java/com/ibm/disni/examples/ReadClient.java b/src/test/java/com/ibm/disni/examples/ReadClient.java
index 7ca6804d..32847b8b 100644
--- a/src/test/java/com/ibm/disni/examples/ReadClient.java
+++ b/src/test/java/com/ibm/disni/examples/ReadClient.java
@@ -48,13 +48,25 @@ public void run() throws Exception {
//create a EndpointGroup. The RdmaActiveEndpointGroup contains CQ processing and delivers CQ event to the endpoint.dispatchCqEvent() method.
endpointGroup = new RdmaActiveEndpointGroup(1000, false, 128, 4, 128);
endpointGroup.init(this);
+
//we have passed our own endpoint factory to the group, therefore new endpoints will be of type CustomClientEndpoint
//let's create a new client endpoint
ReadClient.CustomClientEndpoint endpoint = endpointGroup.createEndpoint();
//connect to the server
InetAddress ipAddress = InetAddress.getByName(host);
- InetSocketAddress address = new InetSocketAddress(ipAddress, port);
+ InetSocketAddress address = new InetSocketAddress(ipAddress, port);
+
+ IbvDeviceAttr deviceAttr = endpoint.getIdPriv().getVerbs().queryDevice();
+
+ int maxResponderResources = deviceAttr.getMax_qp_rd_atom();
+ int maxInitiatorDepth = deviceAttr.getMax_qp_init_rd_atom();
+
+ RdmaConnParam connParam = new RdmaConnParam();
+ connParam.setResponder_resources((byte) maxResponderResources);
+ connParam.setInitiator_depth((byte) maxInitiatorDepth);
+ endpointGroup.setConnParam(connParam);
+
endpoint.connect(address, 1000);
InetSocketAddress _addr = (InetSocketAddress) endpoint.getDstAddr();
System.out.println("ReadClient::client connected, address " + _addr.toString());
diff --git a/src/test/java/com/ibm/disni/examples/ReadServer.java b/src/test/java/com/ibm/disni/examples/ReadServer.java
index 5da92048..b2fd490a 100644
--- a/src/test/java/com/ibm/disni/examples/ReadServer.java
+++ b/src/test/java/com/ibm/disni/examples/ReadServer.java
@@ -49,6 +49,7 @@ public void run() throws Exception {
//create a EndpointGroup. The RdmaActiveEndpointGroup contains CQ processing and delivers CQ event to the endpoint.dispatchCqEvent() method.
endpointGroup = new RdmaActiveEndpointGroup(1000, false, 128, 4, 128);
endpointGroup.init(this);
+
//create a server endpoint
RdmaServerEndpoint serverEndpoint = endpointGroup.createServerEndpoint();
@@ -58,6 +59,16 @@ public void run() throws Exception {
serverEndpoint.bind(address, 10);
System.out.println("ReadServer::server bound to address" + address.toString());
+ IbvDeviceAttr deviceAttr = serverEndpoint.getIdPriv().getVerbs().queryDevice();
+
+ int maxResponderResources = deviceAttr.getMax_qp_rd_atom();
+ int maxInitiatorDepth = deviceAttr.getMax_qp_init_rd_atom();
+
+ RdmaConnParam connParam = new RdmaConnParam();
+ connParam.setResponder_resources((byte) maxResponderResources);
+ connParam.setInitiator_depth((byte) maxInitiatorDepth);
+ endpointGroup.setConnParam(connParam);
+
//we can accept new connections
ReadServer.CustomServerEndpoint endpoint = serverEndpoint.accept();
System.out.println("ReadServer::connection accepted ");
diff --git a/src/test/java/com/ibm/disni/examples/VerbsClient.java b/src/test/java/com/ibm/disni/examples/VerbsClient.java
index 9bce8adc..6ee8f41c 100755
--- a/src/test/java/com/ibm/disni/examples/VerbsClient.java
+++ b/src/test/java/com/ibm/disni/examples/VerbsClient.java
@@ -165,9 +165,11 @@ public void run() throws Exception {
//now let's connect to the server
RdmaConnParam connParam = new RdmaConnParam();
connParam.setRetry_count((byte) 2);
- idPriv.connect(connParam);
-
+ connParam.setResponder_resources((byte) 1);
+ connParam.setInitiator_depth((byte) 1);
+ idPriv.connect(connParam);
+
//wait until we are really connected
cmEvent = cmChannel.getCmEvent(-1);
if (cmEvent == null){
diff --git a/src/test/java/com/ibm/disni/examples/VerbsServer.java b/src/test/java/com/ibm/disni/examples/VerbsServer.java
index 6fe14693..1bfb80c3 100755
--- a/src/test/java/com/ibm/disni/examples/VerbsServer.java
+++ b/src/test/java/com/ibm/disni/examples/VerbsServer.java
@@ -131,8 +131,11 @@ else if (cmEvent.getEvent() != RdmaCmEvent.EventType.RDMA_CM_EVENT_CONNECT_REQUE
IbvMr mrlist[] = new IbvMr[buffercount];
int access = IbvMr.IBV_ACCESS_LOCAL_WRITE | IbvMr.IBV_ACCESS_REMOTE_WRITE | IbvMr.IBV_ACCESS_REMOTE_READ;
+
RdmaConnParam connParam = new RdmaConnParam();
connParam.setRetry_count((byte) 2);
+ connParam.setResponder_resources((byte) 1);
+ connParam.setInitiator_depth((byte) 1);
//once the client id is set up, accept the connection
connId.accept(connParam);
//wait until the connection is officially switched into established mode