diff --git a/libdisni/src/verbs/com_ibm_disni_verbs_impl_NativeDispatcher.cpp b/libdisni/src/verbs/com_ibm_disni_verbs_impl_NativeDispatcher.cpp
index adb79342..02f13532 100755
--- a/libdisni/src/verbs/com_ibm_disni_verbs_impl_NativeDispatcher.cpp
+++ b/libdisni/src/verbs/com_ibm_disni_verbs_impl_NativeDispatcher.cpp
@@ -430,6 +430,45 @@ JNIEXPORT void JNICALL Java_com_ibm_disni_verbs_impl_NativeDispatcher__1connect(
   }
 }
 
+/*
+ * Class: com_ibm_disni_verbs_impl_NativeDispatcher
+ * Method: _connectV2
+ * Signature: (JJ)V
+ *
+ * Calls rdma_connect on the rdma_cm_id at native address 'id' with the
+ * struct rdma_conn_param at native address 'param'. A Java exception is
+ * raised when either address is null or when rdma_connect fails.
+ */
+JNIEXPORT void JNICALL Java_com_ibm_disni_verbs_impl_NativeDispatcher__1connectV2(
+    JNIEnv *env, jobject obj, jlong id, jlong param) {
+  struct rdma_cm_id *cm_listen_id = NULL;
+  struct rdma_conn_param *conn_param = NULL;
+
+  cm_listen_id = (struct rdma_cm_id *)id;
+  conn_param = (struct rdma_conn_param *)param;
+
+  if (cm_listen_id != NULL && conn_param != NULL) {
+    int ret = rdma_connect(cm_listen_id, conn_param);
+    if (ret == 0) {
+      log("j2c::connect: ret %i, guid %" PRIu64 "\n", ret,
+          ibv_get_device_guid(cm_listen_id->verbs->device));
+    } else {
+      log("j2c::connect: rdma_connect failed\n");
+      JNU_ThrowIOExceptionWithLastError(env,
+                                        "j2c::connect: rdma_connect failed");
+    }
+  } else {
+    if (cm_listen_id == NULL) {
+      log("j2c:connect: cm_listen_id null\n");
+      JNU_ThrowIOException(env, "j2c:connect: cm_listen_id null\n");
+    } else {
+      log("j2c:connect: conn_param null\n");
+      JNU_ThrowIOException(env, "j2c:connect: conn_param null\n");
+    }
+  }
+}
+
+
 /*
  * Class: com_ibm_disni_verbs_impl_NativeDispatcher
  * Method: _accept
@@ -463,6 +502,44 @@ JNIEXPORT void JNICALL Java_com_ibm_disni_verbs_impl_NativeDispatcher__1accept(
   }
 }
 
+/*
+ * Class: com_ibm_disni_verbs_impl_NativeDispatcher
+ * Method: _acceptV2
+ * Signature: (JJ)V
+ *
+ * Calls rdma_accept on the rdma_cm_id at native address 'id' with the
+ * struct rdma_conn_param at native address 'param'. A Java exception is
+ * raised when either address is null or when rdma_accept fails.
+ */
+JNIEXPORT void JNICALL Java_com_ibm_disni_verbs_impl_NativeDispatcher__1acceptV2(
+    JNIEnv *env, jobject obj, jlong id, jlong param) {
+  struct rdma_cm_id *cm_listen_id = NULL;
+  struct rdma_conn_param *conn_param = NULL;
+
+  cm_listen_id = (struct rdma_cm_id *)id;
+  conn_param = (struct rdma_conn_param *)param;
+
+  if (cm_listen_id != NULL && conn_param != NULL) {
+    int ret = rdma_accept(cm_listen_id, conn_param);
+    if (ret == 0) {
+      log("j2c::accept: ret %i, guid %" PRIu64 "\n", ret,
+          ibv_get_device_guid(cm_listen_id->verbs->device));
+    } else {
+      log("j2c::accept: rdma_accept failed\n");
+      JNU_ThrowIOExceptionWithLastError(env,
+                                        "j2c::accept: rdma_accept failed");
+    }
+  } else {
+    if (cm_listen_id == NULL) {
+      log("j2c:accept: cm_listen_id null\n");
+      JNU_ThrowIOException(env, "j2c:accept: cm_listen_id null\n");
+    } else {
+      log("j2c:accept: conn_param null\n");
+      JNU_ThrowIOException(env, "j2c:accept: conn_param null\n");
+    }
+  }
+}
+
 /*
  * Class: com_ibm_disni_verbs_impl_NativeDispatcher
  * Method: _ackCmEvent
@@ -823,6 +900,45 @@ Java_com_ibm_disni_verbs_impl_NativeDispatcher__1queryOdpSupport(JNIEnv *env,
   return ret;
 }
 
+/*
+ * Class: com_ibm_disni_verbs_impl_NativeDispatcher
+ * Method: _queryDevice
+ * Signature: (JJ)I
+ *
+ * Fills the struct ibv_device_attr at native address 'addr' with the
+ * attributes of the device behind the ibv_context at native address 'id'.
+ * Returns 0 on success; on failure returns -1 after raising a Java exception.
+ */
+JNIEXPORT jint JNICALL
+Java_com_ibm_disni_verbs_impl_NativeDispatcher__1queryDevice(JNIEnv *env,
+                                                             jobject obj,
+                                                             jlong id,
+                                                             jlong addr) {
+  struct ibv_context *context = (struct ibv_context *)id;
+  struct ibv_device_attr *dev_attr = (struct ibv_device_attr *)addr;
+
+  if (context == NULL || dev_attr == NULL) {
+    log("j2c::queryDevice: context or attribute buffer null\n");
+    JNU_ThrowIOException(env,
+                         "j2c::queryDevice: context or attribute buffer null");
+    return -1;
+  }
+
+  int ret = ibv_query_device(context, dev_attr);
+  if (ret != 0) {
+    log("j2c::queryDevice: ibv_query_device failed, error %s\n",
+        strerror(ret));
+    /* ibv_query_device hands back the error code as its return value; copy it
+     * to errno, which the "last error" helper presumably reads (confirm). */
+    errno = ret;
+    JNU_ThrowIOExceptionWithLastError(
+        env, "j2c::queryDevice: ibv_query_device failed");
+    return -1;
+  }
+  return 0;
+}
+
+
 /*
  * Class: com_ibm_disni_verbs_impl_NativeDispatcher
  * Method: _expPrefetchMr
diff --git a/libdisni/src/verbs/com_ibm_disni_verbs_impl_NativeDispatcher.h b/libdisni/src/verbs/com_ibm_disni_verbs_impl_NativeDispatcher.h
old mode 100755
new mode 100644
index 6583636e..0f67a091
--- a/libdisni/src/verbs/com_ibm_disni_verbs_impl_NativeDispatcher.h
+++
b/libdisni/src/verbs/com_ibm_disni_verbs_impl_NativeDispatcher.h @@ -90,6 +90,14 @@ Java_com_ibm_disni_verbs_impl_NativeDispatcher__1getCmEvent(JNIEnv *, jobject, JNIEXPORT void JNICALL Java_com_ibm_disni_verbs_impl_NativeDispatcher__1connect( JNIEnv *, jobject, jlong, jint, jint, jlong, jbyte); +/* + * Class: com_ibm_disni_verbs_impl_NativeDispatcher + * Method: _connectV2 + * Signature: (JJ)V + */ +JNIEXPORT void JNICALL Java_com_ibm_disni_verbs_impl_NativeDispatcher__1connectV2( + JNIEnv *, jobject, jlong, jlong); + /* * Class: com_ibm_disni_verbs_impl_NativeDispatcher * Method: _accept @@ -98,6 +106,14 @@ JNIEXPORT void JNICALL Java_com_ibm_disni_verbs_impl_NativeDispatcher__1connect( JNIEXPORT void JNICALL Java_com_ibm_disni_verbs_impl_NativeDispatcher__1accept( JNIEnv *, jobject, jlong, jint, jint); +/* + * Class: com_ibm_disni_verbs_impl_NativeDispatcher + * Method: _acceptV2 + * Signature: (JJ)V + */ +JNIEXPORT void JNICALL Java_com_ibm_disni_verbs_impl_NativeDispatcher__1acceptV2( + JNIEnv *, jobject, jlong, jlong); + /* * Class: com_ibm_disni_verbs_impl_NativeDispatcher * Method: _ackCmEvent @@ -219,6 +235,19 @@ Java_com_ibm_disni_verbs_impl_NativeDispatcher__1queryOdpSupport(JNIEnv *env, jobject, jlong); + +/* + * Class: com_ibm_disni_verbs_impl_NativeDispatcher + * Method: _queryDevice + * Signature: (JJ)I + */ +JNIEXPORT jint JNICALL +Java_com_ibm_disni_verbs_impl_NativeDispatcher__1queryDevice(JNIEnv *env, + jobject, + jlong, + jlong); + + /* * Class: com_ibm_disni_verbs_impl_NativeDispatcher * Method: _expPrefetchMr diff --git a/pom.xml b/pom.xml index 456df05b..4c3433b3 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ com.ibm.disni disni jar - 2.1 + 2.2 disni DiSNI (Direct Storage and Networking Interface) is a Java library for direct storage and networking access from userpace. 
http://github.com/zrlio/disni diff --git a/src/main/java/com/ibm/disni/RdmaEndpointGroup.java b/src/main/java/com/ibm/disni/RdmaEndpointGroup.java index 927cf109..7898dbf0 100644 --- a/src/main/java/com/ibm/disni/RdmaEndpointGroup.java +++ b/src/main/java/com/ibm/disni/RdmaEndpointGroup.java @@ -157,6 +157,10 @@ public RdmaConnParam getConnParam() { return connParam; } + public void setConnParam(RdmaConnParam connParam) { + this.connParam = connParam; + } + public synchronized void close() throws IOException, InterruptedException { logger.info("shutting down group"); if (closed.get()){ diff --git a/src/main/java/com/ibm/disni/verbs/IbvContext.java b/src/main/java/com/ibm/disni/verbs/IbvContext.java index 9b6595fd..e6b6227a 100755 --- a/src/main/java/com/ibm/disni/verbs/IbvContext.java +++ b/src/main/java/com/ibm/disni/verbs/IbvContext.java @@ -101,4 +101,6 @@ public IbvCQ createCQ(IbvCompChannel compChannel, int ncqe, int comp_vector) thr } public int queryOdpSupport() throws IOException { return verbs.queryOdpSupport(this); } + + public IbvDeviceAttr queryDevice() throws IOException { return verbs.queryDevice(this); } } diff --git a/src/main/java/com/ibm/disni/verbs/IbvDeviceAttr.java b/src/main/java/com/ibm/disni/verbs/IbvDeviceAttr.java new file mode 100644 index 00000000..da634709 --- /dev/null +++ b/src/main/java/com/ibm/disni/verbs/IbvDeviceAttr.java @@ -0,0 +1,579 @@ +/* + * DiSNI: Direct Storage and Networking Interface + * + * Author: Konstantin Taranov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package com.ibm.disni.verbs; + +import java.io.IOException; +import java.nio.ByteBuffer; + +//struct ibv_device_attr { +// char fw_ver[64]; /* FW version */ +// uint64_t node_guid; /* Node GUID (in network byte order) */ +// uint64_t sys_image_guid; /* System image GUID (in network byte order) */ +// uint64_t max_mr_size; /* Largest contiguous block that can be registered */ +// uint64_t page_size_cap; /* Supported memory shift sizes */ +// uint32_t vendor_id; /* Vendor ID, per IEEE */ +// uint32_t vendor_part_id; /* Vendor supplied part ID */ +// uint32_t hw_ver; /* Hardware version */ +// int max_qp; /* Maximum number of supported QPs */ +// int max_qp_wr; /* Maximum number of outstanding WR on any work queue */ +// unsigned int device_cap_flags; /* HCA capabilities mask */ +// int max_sge; /* Maximum number of s/g per WR for SQ & RQ of QP for non RDMA Read operations */ +// int max_sge_rd; /* Maximum number of s/g per WR for RDMA Read operations */ +// int max_cq; /* Maximum number of supported CQs */ +// int max_cqe; /* Maximum number of CQE capacity per CQ */ +// int max_mr; /* Maximum number of supported MRs */ +// int max_pd; /* Maximum number of supported PDs */ +// int max_qp_rd_atom; /* Maximum number of RDMA Read & Atomic operations that can be outstanding per QP */ +// int max_ee_rd_atom; /* Maximum number of RDMA Read & Atomic operations that can be outstanding per EEC */ +// int max_res_rd_atom; /* Maximum number of resources used for RDMA Read & Atomic operations by this HCA as the Target */ +// int max_qp_init_rd_atom; /* Maximum depth per QP for initiation of RDMA Read & Atomic operations */ +// int max_ee_init_rd_atom; /* Maximum depth per EEC for initiation of RDMA Read & Atomic operations */ +// enum ibv_atomic_cap atomic_cap; /* Atomic operations support level */ +// int max_ee; /* Maximum number of supported EE contexts */ +// 
int max_rdd; /* Maximum number of supported RD domains */ +// int max_mw; /* Maximum number of supported MWs */ +// int max_raw_ipv6_qp; /* Maximum number of supported raw IPv6 datagram QPs */ +// int max_raw_ethy_qp; /* Maximum number of supported Ethertype datagram QPs */ +// int max_mcast_grp; /* Maximum number of supported multicast groups */ +// int max_mcast_qp_attach; /* Maximum number of QPs per multicast group which can be attached */ +// int max_total_mcast_qp_attach;/* Maximum number of QPs which can be attached to multicast groups */ +// int max_ah; /* Maximum number of supported address handles */ +// int max_fmr; /* Maximum number of supported FMRs */ +// int max_map_per_fmr; /* Maximum number of (re)maps per FMR before an unmap operation in required */ +// int max_srq; /* Maximum number of supported SRQs */ +// int max_srq_wr; /* Maximum number of WRs per SRQ */ +// int max_srq_sge; /* Maximum number of s/g per SRQ */ +// uint16_t max_pkeys; /* Maximum number of partitions */ +// uint8_t local_ca_ack_delay; /* Local CA ack delay */ +// uint8_t phys_port_cnt; /* Number of physical ports */ +//}; + + +/** + * RDMA device's attributes + */ +public class IbvDeviceAttr { + protected byte fw_ver[]; + protected long node_guid; + protected long sys_image_guid; + protected long max_mr_size; + protected long page_size_cap; + protected int vendor_id; + protected int vendor_part_id; + protected int hw_ver; + protected int max_qp; + protected int max_qp_wr; + protected int device_cap_flags; + protected int max_sge; + protected int max_sge_rd; + protected int max_cq; + protected int max_cqe; + protected int max_mr; + protected int max_pd; + protected int max_qp_rd_atom; + protected int max_ee_rd_atom; + protected int max_res_rd_atom; + protected int max_qp_init_rd_atom; + protected int max_ee_init_rd_atom; + protected int atomic_cap;// not supported + protected int max_ee; + protected int max_rdd; + protected int max_mw; + protected int max_raw_ipv6_qp; + 
protected int max_raw_ethy_qp; + protected int max_mcast_grp; + protected int max_mcast_qp_attach; + protected int max_total_mcast_qp_attach; + protected int max_ah; + protected int max_fmr; + protected int max_map_per_fmr; + protected int max_srq; + protected int max_srq_wr; + protected int max_srq_sge; + protected short max_pkeys; + protected byte local_ca_ack_delay; + protected byte phys_port_cnt; + protected int reserved; + + public static int CSIZE = 232; + + public IbvDeviceAttr(){ + this.fw_ver = new byte[64]; + this.node_guid = 0; + this.sys_image_guid = 0; + this.max_mr_size = 0; + this.page_size_cap = 0; + this.vendor_id = 0; + this.vendor_part_id = 0; + this.hw_ver = 0; + this.max_qp = 0; + this.max_qp_wr = 0; + this.device_cap_flags = 0; + this.max_sge = 0; + this.max_sge_rd = 0; + this.max_cq = 0; + this.max_cqe = 0; + this.max_mr = 0; + this.max_pd = 0; + this.max_qp_rd_atom = 0; + this.max_ee_rd_atom = 0; + this.max_res_rd_atom = 0; + this.max_qp_init_rd_atom = 0; + this.max_ee_init_rd_atom = 0; + this.atomic_cap = 0; + this.max_ee = 0; + this.max_rdd = 0; + this.max_mw = 0; + this.max_raw_ipv6_qp = 0; + this.max_raw_ethy_qp = 0; + this.max_mcast_grp = 0; + this.max_mcast_qp_attach = 0; + this.max_total_mcast_qp_attach = 0; + this.max_ah = 0; + this.max_fmr = 0; + this.max_map_per_fmr = 0; + this.max_srq = 0; + this.max_srq_wr = 0; + this.max_srq_sge = 0; + this.max_pkeys = 0; + this.local_ca_ack_delay = 0; + this.phys_port_cnt = 0; + this.reserved = 0; + } + + /** + * Gets the fw_ver. + * + * @return the fw_ver + */ + public byte[] getFw_ver() { + return fw_ver; + } + + /** + * Gets the node_guid. + * + * @return the node_guid + */ + public long getNode_guid() { + return node_guid; + } + + /** + * Gets the sys_image_guid. + * + * @return the sys_image_guid + */ + public long getSys_image_guid() { + return sys_image_guid; + } + + /** + * Gets the max_mr_size. 
+ * + * @return the max_mr_size + */ + public long getMax_mr_size() { + return max_mr_size; + } + + /** + * Gets the page_size_cap. + * + * @return the page_size_cap + */ + public long getPage_size_cap() { + return page_size_cap; + } + + /** + * Gets the vendor_id. + * + * @return the vendor_id + */ + public int getVendor_id() { + return vendor_id; + } + + /** + * Gets the vendor_part_id. + * + * @return the vendor_part_id + */ + public int getVendor_part_id() { + return vendor_part_id; + } + + /** + * Gets the hw_ver. + * + * @return the hw_ver + */ + public int getHw_ver() { + return hw_ver; + } + + /** + * Gets the max_qp. + * + * @return the max_qp + */ + public int getMax_qp() { + return max_qp; + } + + /** + * Gets the max_qp_wr. + * + * @return the max_qp_wr + */ + public int getMax_qp_wr() { + return max_qp_wr; + } + + /** + * Gets the device_cap_flags. + * + * @return the device_cap_flags + */ + public int getDevice_cap_flags() { + return device_cap_flags; + } + + /** + * Gets the max_sge. + * + * @return the max_sge + */ + public int getMax_sge() { + return max_sge; + } + + /** + * Gets the max_sge_rd. + * + * @return the max_sge_rd + */ + public int getMax_sge_rd() { + return max_sge_rd; + } + + + /** + * Gets the max_cq. + * + * @return the max_cq + */ + public int getMax_cq() { + return max_cq; + } + + /** + * Gets the max_cqe. + * + * @return the max_cqe + */ + public int getMax_cqe() { + return max_cqe; + } + + /** + * Gets the max_mr. + * + * @return the max_mr + */ + public int getMax_mr() { + return max_mr; + } + + /** + * Gets the max_pd. + * + * @return the max_pd + */ + public int getMax_pd() { + return max_pd; + } + + /** + * Gets the max_qp_rd_atom. + * + * @return the max_qp_rd_atom + */ + public int getMax_qp_rd_atom() { + return max_qp_rd_atom; + } + + /** + * Gets the max_ee_rd_atom. + * + * @return the max_ee_rd_atom + */ + public int getMax_ee_rd_atom() { + return max_ee_rd_atom; + } + + /** + * Gets the max_res_rd_atom. 
+ * + * @return the max_res_rd_atom + */ + public int getMax_res_rd_atom() { + return max_res_rd_atom; + } + + /** + * Gets the max_qp_init_rd_atom. + * + * @return the max_qp_init_rd_atom + */ + public int getMax_qp_init_rd_atom() { + return max_qp_init_rd_atom; + } + + + /** + * Gets the max_ee_init_rd_atom. + * + * @return the max_ee_init_rd_atom + */ + public int getMax_ee_init_rd_atom() { + return max_ee_init_rd_atom; + } + + /** + * Gets the atomic_cap. + * + * @return the atomic_cap + */ + public int getAtomic_cap() { + return atomic_cap; + } + + /** + * Gets the max_ee. + * + * @return the max_ee + */ + public int getMax_ee() { + return max_ee; + } + + /** + * Gets the max_rdd. + * + * @return the max_rdd + */ + public int getMax_rdd() { + return max_rdd; + } + + /** + * Gets the max_mw. + * + * @return the max_mw + */ + public int getMax_mw() { + return max_mw; + } + + /** + * Gets the max_raw_ipv6_qp. + * + * @return the max_raw_ipv6_qp + */ + public int getMax_raw_ipv6_qp() { + return max_raw_ipv6_qp; + } + + /** + * Gets the max_raw_ethy_qp. + * + * @return the max_raw_ethy_qp + */ + public int getMax_raw_ethy_qp() { + return max_raw_ethy_qp; + } + + /** + * Gets the max_mcast_grp. + * + * @return the max_mcast_grp + */ + public int getMax_mcast_grp() { + return max_mcast_grp; + } + + /** + * Gets the max_mcast_qp_attach. + * + * @return the max_mcast_qp_attach + */ + public int getMax_mcast_qp_attach() { + return max_mcast_qp_attach; + } + + /** + * Gets the max_total_mcast_qp_attach. + * + * @return the max_total_mcast_qp_attach + */ + public int getMax_total_mcast_qp_attach() { + return max_total_mcast_qp_attach; + } + + /** + * Gets the max_ah. + * + * @return the max_ah + */ + public int getMax_ah() { + return max_ah; + } + + /** + * Gets the max_fmr. + * + * @return the max_fmr + */ + public int getMax_fmr() { + return max_fmr; + } + + /** + * Gets the max_map_per_fmr. 
+ * + * @return the max_map_per_fmr + */ + public int getMax_map_per_fmr() { + return max_map_per_fmr; + } + + /** + * Gets the max_srq. + * + * @return the max_srq + */ + public int getMax_srq() { + return max_srq; + } + + /** + * Gets the max_srq_wr. + * + * @return the max_srq_wr + */ + public int getMax_srq_wr() { + return max_srq_wr; + } + + /** + * Gets the max_srq_sge. + * + * @return the max_srq_sge + */ + public int getMax_srq_sge() { + return max_srq_sge; + } + + /** + * Gets the max_pkeys. + * + * @return the max_pkeys + */ + public short getMax_pkeys() { + return max_pkeys; + } + + /** + * Gets the local_ca_ack_delay. + * + * @return the local_ca_ack_delay + */ + public byte getLocal_ca_ack_delay() { + return local_ca_ack_delay; + } + + /** + * Gets the phys_port_cnt. + * + * @return the phys_port_cnt + */ + public byte getPhys_port_cnt() { + return phys_port_cnt; + } + + /** + * Gets the reserved. + * + * @return the reserved + */ + public int getReserved() { + return reserved; + } + + public void update(ByteBuffer buffer) { + buffer.get(this.fw_ver); + this.node_guid = buffer.getLong(); + this.sys_image_guid = buffer.getLong(); + this.max_mr_size = buffer.getLong(); + this.page_size_cap = buffer.getLong(); + this.vendor_id = buffer.getInt(); + this.vendor_part_id = buffer.getInt(); + this.hw_ver = buffer.getInt(); + this.max_qp = buffer.getInt(); + this.max_qp_wr = buffer.getInt(); + this.device_cap_flags = buffer.getInt(); + this.max_sge = buffer.getInt(); + this.max_sge_rd = buffer.getInt(); + this.max_cq = buffer.getInt(); + this.max_cqe = buffer.getInt(); + this.max_mr = buffer.getInt(); + this.max_pd = buffer.getInt(); + this.max_qp_rd_atom = buffer.getInt(); + this.max_ee_rd_atom = buffer.getInt(); + this.max_res_rd_atom = buffer.getInt(); + this.max_qp_init_rd_atom = buffer.getInt(); + this.max_ee_init_rd_atom = buffer.getInt(); + this.atomic_cap = buffer.getInt(); + this.max_ee = buffer.getInt(); + this.max_rdd = buffer.getInt(); + 
this.max_mw = buffer.getInt(); + this.max_raw_ipv6_qp = buffer.getInt(); + this.max_raw_ethy_qp = buffer.getInt(); + this.max_mcast_grp = buffer.getInt(); + this.max_mcast_qp_attach = buffer.getInt(); + this.max_total_mcast_qp_attach = buffer.getInt(); + this.max_ah = buffer.getInt(); + this.max_fmr = buffer.getInt(); + this.max_map_per_fmr =buffer.getInt(); + this.max_srq = buffer.getInt(); + this.max_srq_wr = buffer.getInt(); + this.max_srq_sge = buffer.getInt(); + this.max_pkeys = buffer.getShort(); + this.local_ca_ack_delay = buffer.get(); + this.phys_port_cnt = buffer.get(); + this.reserved = buffer.getInt(); + } + + public int size() { + return CSIZE; + } +} diff --git a/src/main/java/com/ibm/disni/verbs/IbvSendWR.java b/src/main/java/com/ibm/disni/verbs/IbvSendWR.java index ac66ef51..41e89512 100644 --- a/src/main/java/com/ibm/disni/verbs/IbvSendWR.java +++ b/src/main/java/com/ibm/disni/verbs/IbvSendWR.java @@ -234,7 +234,7 @@ public Rdma getRdma() { } /** - * Unsupported. + * * * @return the atomic */ @@ -322,7 +322,7 @@ public String getClassName() { } /** - * Unsupported. 
+ * */ public static class Atomic { protected long remote_addr; diff --git a/src/main/java/com/ibm/disni/verbs/RdmaConnParam.java b/src/main/java/com/ibm/disni/verbs/RdmaConnParam.java index 90287d33..6feb8d2a 100644 --- a/src/main/java/com/ibm/disni/verbs/RdmaConnParam.java +++ b/src/main/java/com/ibm/disni/verbs/RdmaConnParam.java @@ -22,6 +22,8 @@ package com.ibm.disni.verbs; import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; // TODO: Auto-generated Javadoc //struct rdma_conn_param { @@ -49,8 +51,11 @@ public class RdmaConnParam { protected byte retry_count; protected byte rnr_retry_count; protected byte srq; + protected byte reserved; protected int qp_num; + public static int CSIZE = 24; + public RdmaConnParam() { this.private_data_addr = 0; this.private_data_len = 0; @@ -60,6 +65,7 @@ public RdmaConnParam() { this.retry_count = 0; this.rnr_retry_count = 0; this.srq = 0; + this.reserved = 0; this.qp_num = 0; } @@ -111,7 +117,7 @@ public byte getResponder_resources() { * @param responder_resources the new responder resources. */ public void setResponder_resources(byte responder_resources) throws IOException { - throw new IOException("Operation currently not supported"); + this.responder_resources = responder_resources; } /** @@ -129,7 +135,7 @@ public byte getInitiator_depth() { * @param initiator_depth the new initiater depth. */ public void setInitiator_depth(byte initiator_depth) throws IOException { - throw new IOException("Operation currently not supported"); + this.initiator_depth = initiator_depth; } /** @@ -147,7 +153,7 @@ public byte getFlow_control() { * @param flow_control the new flow control. */ public void setFlow_control(byte flow_control) throws IOException { - throw new IOException("Operation currently not supported"); + this.flow_control = flow_control; } /** @@ -201,7 +207,7 @@ public byte getSrq() { * @param srq the new shared receive queue. 
*/ public void setSrq(byte srq) throws IOException { - throw new IOException("Operation currently not supported"); + this.srq = srq; } /** @@ -219,6 +225,24 @@ public int getQp_num() { * @param qp_num the new qp_num */ public void setQp_num(int qp_num) throws IOException { - throw new IOException("Operation currently not supported"); + this.qp_num = qp_num; + } + + + public void writeBack(ByteBuffer buffer) { + buffer.putLong(private_data_addr); + buffer.put(private_data_len); + buffer.put(responder_resources); + buffer.put(initiator_depth); + buffer.put(flow_control); + buffer.put(retry_count); + buffer.put(rnr_retry_count); + buffer.put(srq); + buffer.put(reserved); + buffer.putInt(qp_num); + } + + public int size() { + return CSIZE; } } diff --git a/src/main/java/com/ibm/disni/verbs/RdmaVerbs.java b/src/main/java/com/ibm/disni/verbs/RdmaVerbs.java index 0a1fd8e4..f5a01907 100755 --- a/src/main/java/com/ibm/disni/verbs/RdmaVerbs.java +++ b/src/main/java/com/ibm/disni/verbs/RdmaVerbs.java @@ -125,6 +125,15 @@ public abstract IbvCQ createCQ(IbvContext context, */ public abstract int queryOdpSupport(IbvContext context) throws IOException; + /** + * Query the attributes of the RDMA device associated with the given context. + * + * @param context the device context. + * @return all device attributes + * @throws IOException on failure. + */ + public abstract IbvDeviceAttr queryDevice(IbvContext context) throws IOException; + /** * Prefetch part of a memory region. 
* Can be used only with MRs registered with IBV_EXP_ACCESS_ON_DEMAND diff --git a/src/main/java/com/ibm/disni/verbs/impl/NatIbvSendWR.java b/src/main/java/com/ibm/disni/verbs/impl/NatIbvSendWR.java index 8c16a5aa..1323848f 100644 --- a/src/main/java/com/ibm/disni/verbs/impl/NatIbvSendWR.java +++ b/src/main/java/com/ibm/disni/verbs/impl/NatIbvSendWR.java @@ -68,6 +68,7 @@ public class NatIbvSendWR extends IbvSendWR implements SendWRMod { public static int SENDFLAGS_OFFSET = 32; // public static int IMMDATA_OFFSET = 36; public static int REMOTEADDR_OFFSET = 40; + public static int ATOMIC_OFFSET = 40; public static int RKEY_OFFSET = 48; private NatPostSendCall postSendCall; @@ -75,10 +76,13 @@ public class NatIbvSendWR extends IbvSendWR implements SendWRMod { private long next; private long ptr_sge_list; private NatRdma natRdma; + private NatAtomic natAtomic; - public NatIbvSendWR(NatPostSendCall postSendCall, NatRdma natRdma, IbvSendWR sendWR, LinkedList sg_list) { - super(natRdma, null, null, sg_list); + public NatIbvSendWR(NatPostSendCall postSendCall, NatRdma natRdma, NatAtomic natAtomic, + IbvSendWR sendWR, LinkedList sg_list) { + super(natRdma, natAtomic, null, sg_list); this.natRdma = natRdma; + this.natAtomic = natAtomic; this.next = 0; this.ptr_sge_list = 0; @@ -102,9 +106,16 @@ public void writeBack(ByteBuffer buffer) { buffer.putInt(opcode); buffer.putInt(send_flags); buffer.putInt(imm_data); + + if(opcode >= IBV_WR_ATOMIC_CMP_AND_SWP){ + buffer.position(initialPos + NatIbvSendWR.ATOMIC_OFFSET); + natAtomic.writeBack(buffer); + } else { + buffer.position(initialPos + NatIbvSendWR.REMOTEADDR_OFFSET); + natRdma.writeBack(buffer); + } - buffer.position(initialPos + NatIbvSendWR.REMOTEADDR_OFFSET); - natRdma.writeBack(buffer); + int newPos = initialPos + CSIZE; buffer.position(newPos); } @@ -205,4 +216,63 @@ public int getBufPosition() { return bufPosition; } } + + public static class NatAtomic extends IbvSendWR.Atomic { + private NatPostSendCall postSendCall; 
+ private int bufPosition; + + public NatAtomic(Atomic atomic, NatPostSendCall postSendCall){ + this.remote_addr = atomic.getRemote_addr(); + this.compare_add = atomic.getCompare_add(); + this.swap = atomic.getSwap(); + this.rkey = atomic.getRkey(); + this.reserved = atomic.getReserved(); + + this.postSendCall = postSendCall; + } + + @Override + public void setRemote_addr(long remote_addr) { + super.setRemote_addr(remote_addr); + postSendCall.setRemote_addr(this, 0); + } + + @Override + public void setCompare_add(long compare_add) { + super.setCompare_add(compare_add); + postSendCall.setCompare_add(this, 8); + } + + @Override + public void setSwap(long swap) { + super.setSwap(swap); + postSendCall.setSwap(this, 16); + } + + + @Override + public void setRkey(int rkey) { + super.setRkey(rkey); + postSendCall.setRkey(this, 24); + } + + @Override + public void setReserved(int reserved) { + super.setReserved(reserved); + postSendCall.setReserved(this, 28); + } + + public void writeBack(ByteBuffer buffer) { + this.bufPosition = buffer.position(); + buffer.putLong(getRemote_addr()); + buffer.putLong(getCompare_add()); + buffer.putLong(getSwap()); + buffer.putInt(getRkey()); + buffer.putInt(getReserved()); + } + + public int getBufPosition() { + return bufPosition; + } + } } diff --git a/src/main/java/com/ibm/disni/verbs/impl/NatPostSendCall.java b/src/main/java/com/ibm/disni/verbs/impl/NatPostSendCall.java index c2990df7..e9d21590 100644 --- a/src/main/java/com/ibm/disni/verbs/impl/NatPostSendCall.java +++ b/src/main/java/com/ibm/disni/verbs/impl/NatPostSendCall.java @@ -31,6 +31,7 @@ import com.ibm.disni.verbs.IbvSge; import com.ibm.disni.verbs.SVCPostSend; import com.ibm.disni.verbs.impl.NatIbvSendWR.NatRdma; +import com.ibm.disni.verbs.impl.NatIbvSendWR.NatAtomic; import com.ibm.disni.util.MemBuf; import com.ibm.disni.util.MemoryAllocation; @@ -82,7 +83,8 @@ private void setWrList(List wrList) { } NatRdma natRdma = new NatRdma(sendWR.getRdma(), this); - NatIbvSendWR 
natSendWR = new NatIbvSendWR(this, natRdma, sendWR, sg_list); + NatAtomic natAtomic = new NatAtomic(sendWR.getAtomic(), this); + NatIbvSendWR natSendWR = new NatIbvSendWR(this, natRdma, natAtomic, sendWR, sg_list); natSendWR.setPtr_sge_list(sgeOffset); natSendWR.setNext(wrOffset); wrNatList.add(natSendWR); @@ -156,6 +158,31 @@ public void setReserved(NatRdma rdma, int offset) { cmd.getBuffer().putInt(position, rdma.getReserved()); } + public void setCompare_add(NatAtomic atomic, int offset) { + int position = atomic.getBufPosition() + offset; + cmd.getBuffer().putLong(position, atomic.getCompare_add()); + } + + public void setSwap(NatAtomic atomic, int offset) { + int position = atomic.getBufPosition() + offset; + cmd.getBuffer().putLong(position, atomic.getSwap()); + } + + public void setRemote_addr(NatAtomic atomic, int offset) { + int position = atomic.getBufPosition() + offset; + cmd.getBuffer().putLong(position, atomic.getRemote_addr()); + } + + public void setRkey(NatAtomic atomic, int offset) { + int position = atomic.getBufPosition() + offset; + cmd.getBuffer().putInt(position, atomic.getRkey()); + } + + public void setReserved(NatAtomic atomic, int offset) { + int position = atomic.getBufPosition() + offset; + cmd.getBuffer().putInt(position, atomic.getReserved()); + } + public void setAddr(NatIbvSge sge, int offset) { int position = sge.getBufPosition() + offset; cmd.getBuffer().putLong(position, sge.getAddr()); diff --git a/src/main/java/com/ibm/disni/verbs/impl/NativeDispatcher.java b/src/main/java/com/ibm/disni/verbs/impl/NativeDispatcher.java index 5d61a5c8..d7e16348 100755 --- a/src/main/java/com/ibm/disni/verbs/impl/NativeDispatcher.java +++ b/src/main/java/com/ibm/disni/verbs/impl/NativeDispatcher.java @@ -99,7 +99,9 @@ public native long _createQP(long id, long pd, long sendcq, long recvcq, int qpt public native int _getCmEvent(long channel, long listenid, long clientid, int timeout); public native void _connect(long id, int retrycount, int 
rnrretrycount, long privdataaddr, byte privdatalen) throws IOException;
+	public native void _connectV2(long id, long conn_param) throws IOException;
 	public native void _accept(long id, int retrycount, int rnrretrycount) throws IOException;
+	public native void _acceptV2(long id, long conn_param) throws IOException;
 	public native int _ackCmEvent(int cmEvent);
 	public native int _disconnect(long id);
 	public native int _destroyEventChannel(long fd);
@@ -116,6 +118,7 @@ public native void _connect(long id, int retrycount, int rnrretrycount, long pri
 	public native int _modifyQP(long qp, long attr) throws IOException;
 	public native long _regMr(long pd, long addr, int len, int access, long lkey, long rkey, long handle) throws IOException;
 	public native int _queryOdpSupport(long context);
+	public native int _queryDevice(long context, long addr) throws IOException;
 	public native int _expPrefetchMr(long handle, long addr, int len) throws IOException;
 	public native void _deregMr(long handle) throws IOException;
 	public native void _postSend(long qp, long wrList) throws IOException;
diff --git a/src/main/java/com/ibm/disni/verbs/impl/RdmaCmNat.java b/src/main/java/com/ibm/disni/verbs/impl/RdmaCmNat.java
index 67737b1f..21153b88 100644
--- a/src/main/java/com/ibm/disni/verbs/impl/RdmaCmNat.java
+++ b/src/main/java/com/ibm/disni/verbs/impl/RdmaCmNat.java
@@ -217,10 +217,18 @@ public void connect(RdmaCmId id, RdmaConnParam connParam)
 		if (!idPriv.isOpen()) {
 			throw new IOException("Trying to call connect() with closed ID");
 		}
-		nativeDispatcher._connect(idPriv.getObjId(), connParam.getRetry_count(),
-				connParam.getRnr_retry_count(), connParam.getPrivate_data(), connParam.getPrivate_data_len());
+
+		// Marshal connParam into a native buffer for the duration of the call only;
+		// the finally block frees it even when _connectV2 throws.
+		MemBuf memBuf = memAlloc.allocate(RdmaConnParam.CSIZE);
+		try {
+			connParam.writeBack(memBuf.getBuffer());
+			nativeDispatcher._connectV2(idPriv.getObjId(), memBuf.address());
+		} finally {
+			memBuf.free();
+		}
 		logger.info("connect, id " + id.getPs());
-		
+
 		return;
 	}
 
@@ -231,9 +239,18 @@ public void accept(RdmaCmId id, RdmaConnParam connParam)
 		if (!idPriv.isOpen()) {
 			throw new IOException("Trying to call accept() with closed ID");
 		}
-		nativeDispatcher._accept(idPriv.getObjId(), connParam.getRetry_count(), connParam.getRnr_retry_count());
+
+		// Marshal connParam into a native buffer for the duration of the call only;
+		// the finally block frees it even when _acceptV2 throws.
+		MemBuf memBuf = memAlloc.allocate(RdmaConnParam.CSIZE);
+		try {
+			connParam.writeBack(memBuf.getBuffer());
+			nativeDispatcher._acceptV2(idPriv.getObjId(), memBuf.address());
+		} finally {
+			memBuf.free();
+		}
 		logger.info("accept, id " + id.getPs());
 		
 		return;
 	}
 
diff --git a/src/main/java/com/ibm/disni/verbs/impl/RdmaVerbsNat.java b/src/main/java/com/ibm/disni/verbs/impl/RdmaVerbsNat.java
index 6c205a3f..c75a198d 100755
--- a/src/main/java/com/ibm/disni/verbs/impl/RdmaVerbsNat.java
+++ b/src/main/java/com/ibm/disni/verbs/impl/RdmaVerbsNat.java
@@ -27,6 +27,7 @@
 
 import org.slf4j.Logger;
 
+import com.ibm.disni.verbs.IbvDeviceAttr;
 import com.ibm.disni.verbs.IbvCQ;
 import com.ibm.disni.verbs.IbvCompChannel;
 import com.ibm.disni.verbs.IbvContext;
@@ -46,7 +47,7 @@
 import com.ibm.disni.verbs.SVCReqNotify;
 import com.ibm.disni.util.DiSNILogger;
 import com.ibm.disni.util.MemoryAllocation;
-
+import com.ibm.disni.util.MemBuf;
 
 public class RdmaVerbsNat extends RdmaVerbs {
 	private static final Logger logger = DiSNILogger.getLogger();
@@ -136,11 +137,36 @@ public SVCRegMr regMr(IbvPd pd, long address, int length, int access) {
 	}
 	
 
-	public int queryOdpSupport(IbvContext context){
+	public int queryOdpSupport(IbvContext context) {
 		NatIbvContext natContext = (NatIbvContext) context;
 		return nativeDispatcher._queryOdpSupport(natContext.getObjId());
 	}
 
+	/**
+	 * Queries the attributes of the RDMA device behind the given context.
+	 *
+	 * @param context the device context; it is cast to NatIbvContext.
+	 * @return the device attributes, or null if the native call returns a non-zero status.
+	 * @throws IOException if the native query raises one.
+	 */
+	public IbvDeviceAttr queryDevice(IbvContext context) throws IOException {
+		NatIbvContext natContext = (NatIbvContext) context;
+		MemBuf dstBuf = memAlloc.allocate(IbvDeviceAttr.CSIZE);
+		try {
+			int ret = nativeDispatcher._queryDevice(natContext.getObjId(), dstBuf.address());
+			if (ret != 0) {
+				return null;
+			}
+			// Parse the native buffer only after the query reported success.
+			IbvDeviceAttr deviceAttr = new IbvDeviceAttr();
+			deviceAttr.update(dstBuf.getBuffer());
+			return deviceAttr;
+		} finally {
+			// Release the native buffer on every path, including when _queryDevice throws.
+			dstBuf.free();
+		}
+	}
+
 	public int
expPrefetchMr(IbvMr ibvMr, long address, int length) throws IOException { return nativeDispatcher._expPrefetchMr(((NatIbvMr)ibvMr).getObjId(), address, length); } diff --git a/src/test/java/com/ibm/disni/benchmarks/RDMAvsTcpBenchmarkClient.java b/src/test/java/com/ibm/disni/benchmarks/RDMAvsTcpBenchmarkClient.java index 4f1df397..5fb85c3c 100644 --- a/src/test/java/com/ibm/disni/benchmarks/RDMAvsTcpBenchmarkClient.java +++ b/src/test/java/com/ibm/disni/benchmarks/RDMAvsTcpBenchmarkClient.java @@ -76,10 +76,21 @@ public void runRDMA() throws Exception { //create a EndpointGroup. The RdmaActiveEndpointGroup contains CQ processing and delivers CQ event to the endpoint.dispatchCqEvent() method. endpointGroup = new RdmaActiveEndpointGroup(1000, false, 128, 4, 128); endpointGroup.init(this); + //we have passed our own endpoint factory to the group, therefore new endpoints will be of type CustomClientEndpoint //let's create a new client endpoint CustomClientEndpoint endpoint = endpointGroup.createEndpoint(); + IbvDeviceAttr deviceAttr = endpoint.getIdPriv().getVerbs().queryDevice(); + + int maxResponderResources = deviceAttr.getMax_qp_rd_atom(); + int maxInitiatorDepth = deviceAttr.getMax_qp_init_rd_atom(); + + RdmaConnParam connParam = new RdmaConnParam(); + connParam.setResponder_resources((byte) maxResponderResources); + connParam.setInitiator_depth((byte) maxInitiatorDepth); + endpointGroup.setConnParam(connParam); + //connect to the server endpoint.connect(rdmaAddress, 1000); System.out.println("RDMAvsTcpBenchmarkClient::client channel set up "); diff --git a/src/test/java/com/ibm/disni/benchmarks/ReadClient.java b/src/test/java/com/ibm/disni/benchmarks/ReadClient.java index 00ef34ba..7dcbefa2 100644 --- a/src/test/java/com/ibm/disni/benchmarks/ReadClient.java +++ b/src/test/java/com/ibm/disni/benchmarks/ReadClient.java @@ -57,10 +57,21 @@ public ReadClient.ReadClientEndpoint createEndpoint(RdmaCmId id, boolean serverS private void run() throws Exception { 
System.out.println("ReadClient, size " + size + ", loop " + loop); - + ReadClient.ReadClientEndpoint endpoint = group.createEndpoint(); InetAddress ipAddress = InetAddress.getByName(host); - InetSocketAddress address = new InetSocketAddress(ipAddress, port); + InetSocketAddress address = new InetSocketAddress(ipAddress, port); + + IbvDeviceAttr deviceAttr = endpoint.getIdPriv().getVerbs().queryDevice(); + + int maxResponderResources = deviceAttr.getMax_qp_rd_atom(); + int maxInitiatorDepth = deviceAttr.getMax_qp_init_rd_atom(); + + RdmaConnParam connParam = new RdmaConnParam(); + connParam.setResponder_resources((byte) maxResponderResources); + connParam.setInitiator_depth((byte) maxInitiatorDepth); + group.setConnParam(connParam); + endpoint.connect(address, 1000); System.out.println("ReadClient, client connected, address " + host + ", port " + port); diff --git a/src/test/java/com/ibm/disni/benchmarks/ReadServer.java b/src/test/java/com/ibm/disni/benchmarks/ReadServer.java index f6e95750..4e50c1e2 100644 --- a/src/test/java/com/ibm/disni/benchmarks/ReadServer.java +++ b/src/test/java/com/ibm/disni/benchmarks/ReadServer.java @@ -61,6 +61,17 @@ private void run() throws Exception { InetAddress ipAddress = InetAddress.getByName(host); InetSocketAddress address = new InetSocketAddress(ipAddress, port); serverEndpoint.bind(address, 10); + + IbvDeviceAttr deviceAttr = serverEndpoint.getIdPriv().getVerbs().queryDevice(); + + int maxResponderResources = deviceAttr.getMax_qp_rd_atom(); + int maxInitiatorDepth = deviceAttr.getMax_qp_init_rd_atom(); + + RdmaConnParam connParam = new RdmaConnParam(); + connParam.setResponder_resources((byte) maxResponderResources); + connParam.setInitiator_depth((byte) maxInitiatorDepth); + group.setConnParam(connParam); + ReadServer.ReadServerEndpoint endpoint = serverEndpoint.accept(); System.out.println("ReadServer, client connected, address " + address.toString()); diff --git a/src/test/java/com/ibm/disni/examples/AtomicClient.java 
b/src/test/java/com/ibm/disni/examples/AtomicClient.java new file mode 100755 index 00000000..35c28a2f --- /dev/null +++ b/src/test/java/com/ibm/disni/examples/AtomicClient.java @@ -0,0 +1,272 @@ +/* + * DiSNI: Direct Storage and Networking Interface + * + * Author: Konstantin Taranov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package com.ibm.disni.examples; + +import com.ibm.disni.CmdLineCommon; +import com.ibm.disni.verbs.*; +import org.apache.commons.cli.ParseException; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.LinkedList; + +public class AtomicClient { + private String ipAddress; + private int port; + + public void run() throws Exception { + System.out.println("AtomicClient::starting..."); + //open the CM and the verbs interfaces + + //create a communication channel for receiving CM events + RdmaEventChannel cmChannel = RdmaEventChannel.createEventChannel(); + if (cmChannel == null){ + System.out.println("AtomicClient::cmChannel null"); + return; + } + + //create a RdmaCmId for this client + RdmaCmId idPriv = cmChannel.createId(RdmaCm.RDMA_PS_TCP); + if (idPriv == null){ + System.out.println("AtomicClient::id null"); + return; + } + + //before connecting, we have to resolve addresses + InetAddress _dst = InetAddress.getByName(ipAddress); + InetSocketAddress dst = new InetSocketAddress(_dst, port); + idPriv.resolveAddr(null, dst, 2000); + 
+ //resolve addr returns an event, we have to catch that event + RdmaCmEvent cmEvent = cmChannel.getCmEvent(-1); + if (cmEvent == null){ + System.out.println("AtomicClient::cmEvent null"); + return; + } else if (cmEvent.getEvent() != RdmaCmEvent.EventType.RDMA_CM_EVENT_ADDR_RESOLVED + .ordinal()) { + System.out.println("AtomicClient::wrong event received: " + cmEvent.getEvent()); + return; + } + cmEvent.ackEvent(); + + //we also have to resolve the route + idPriv.resolveRoute(2000); + //and catch that event too + cmEvent = cmChannel.getCmEvent(-1); + if (cmEvent == null){ + System.out.println("AtomicClient::cmEvent null"); + return; + } else if (cmEvent.getEvent() != RdmaCmEvent.EventType.RDMA_CM_EVENT_ROUTE_RESOLVED + .ordinal()) { + System.out.println("AtomicClient::wrong event received: " + cmEvent.getEvent()); + return; + } + cmEvent.ackEvent(); + + //let's create a device context + IbvContext context = idPriv.getVerbs(); + + //and a protection domain, we use that one later for registering memory + IbvPd pd = context.allocPd(); + if (pd == null){ + System.out.println("AtomicClient::pd null"); + return; + } + + //the comp channel is used for getting CQ events + IbvCompChannel compChannel = context.createCompChannel(); + if (compChannel == null){ + System.out.println("AtomicClient::compChannel null"); + return; + } + + //let's create a completion queue + IbvCQ cq = context.createCQ(compChannel, 50, 0); + if (cq == null){ + System.out.println("AtomicClient::cq null"); + return; + } + //and request to be notified for this queue + cq.reqNotification(false).execute().free(); + + //we prepare for the creation of a queue pair (QP) + IbvQPInitAttr attr = new IbvQPInitAttr(); + attr.cap().setMax_recv_sge(1); + attr.cap().setMax_recv_wr(10); + attr.cap().setMax_send_sge(1); + attr.cap().setMax_send_wr(10); + attr.setQp_type(IbvQP.IBV_QPT_RC); + attr.setRecv_cq(cq); + attr.setSend_cq(cq); + //let's create a queue pair + IbvQP qp = idPriv.createQP(pd, attr); + if (qp == 
null){ + System.out.println("AtomicClient::qp null"); + return; + } + + int buffercount = 3; + int buffersize = 100; + ByteBuffer buffers[] = new ByteBuffer[buffercount]; + IbvMr mrlist[] = new IbvMr[buffercount]; + int access = IbvMr.IBV_ACCESS_LOCAL_WRITE | IbvMr.IBV_ACCESS_REMOTE_WRITE | IbvMr.IBV_ACCESS_REMOTE_READ; + + //before we connect we also want to register some buffers + for (int i = 0; i < buffercount; i++){ + buffers[i] = ByteBuffer.allocateDirect(buffersize); + mrlist[i] = pd.regMr(buffers[i], access).execute().free().getMr(); + } + + ByteBuffer dataBuf = buffers[0]; + IbvMr dataMr = mrlist[0]; + IbvMr sendMr = mrlist[1]; + ByteBuffer recvBuf = buffers[2]; + IbvMr recvMr = mrlist[2]; + + LinkedList wrList_recv = new LinkedList(); + + IbvSge sgeRecv = new IbvSge(); + sgeRecv.setAddr(recvMr.getAddr()); + sgeRecv.setLength(recvMr.getLength()); + sgeRecv.setLkey(recvMr.getLkey()); + LinkedList sgeListRecv = new LinkedList(); + sgeListRecv.add(sgeRecv); + IbvRecvWR recvWR = new IbvRecvWR(); + recvWR.setSg_list(sgeListRecv); + recvWR.setWr_id(1000); + wrList_recv.add(recvWR); + + //it's important to post those receive operations before connecting + //otherwise the server may issue a send operation and which cannot be received + //this class wraps soem of the RDMA data operations + VerbsTools commRdma = new VerbsTools(context, compChannel, qp, cq); + commRdma.initSGRecv(wrList_recv); + + IbvDeviceAttr deviceAttr = context.queryDevice(); + + int maxResponderResources = deviceAttr.getMax_qp_rd_atom(); + int maxInitiatorDepth = deviceAttr.getMax_qp_init_rd_atom(); + + //now let's connect to the server + RdmaConnParam connParam = new RdmaConnParam(); + connParam.setRetry_count((byte) 2); + connParam.setResponder_resources((byte) maxResponderResources); + connParam.setInitiator_depth((byte) maxInitiatorDepth); + idPriv.connect(connParam); + + + //wait until we are really connected + cmEvent = cmChannel.getCmEvent(-1); + if (cmEvent == null){ + 
System.out.println("AtomicClient::cmEvent null"); + return; + } else if (cmEvent.getEvent() != RdmaCmEvent.EventType.RDMA_CM_EVENT_ESTABLISHED + .ordinal()) { + System.out.println("AtomicClient::wrong event received: " + cmEvent.getEvent()); + return; + } + cmEvent.ackEvent(); + + //let's wait for the first message to be received from the server + commRdma.completeSGRecv(wrList_recv, false); + + //here we go, it contains the RDMA information of a remote buffer + recvBuf.clear(); + long addr = recvBuf.getLong(); + int length = recvBuf.getInt(); + int lkey = recvBuf.getInt(); + recvBuf.clear(); + System.out.println("AtomicClient::receiving rdma information, addr " + addr + ", length " + length + ", key " + lkey); + System.out.println("AtomicClient::preparing atomic operation..."); + + dataBuf.order(ByteOrder.LITTLE_ENDIAN); + System.out.println("AtomicClient::initial value in the buffer: " + dataBuf.getLong()); + + //let's prepare a one-sided RDMA atomic operation to fetch the content of that remote buffer + LinkedList wrList_send = new LinkedList(); + IbvSge sgeSend = new IbvSge(); + sgeSend.setAddr(dataMr.getAddr()); + sgeSend.setLength(8); + sgeSend.setLkey(dataMr.getLkey()); + LinkedList sgeList = new LinkedList(); + sgeList.add(sgeSend); + IbvSendWR sendWR = new IbvSendWR(); + sendWR.setWr_id(1001); + sendWR.setSg_list(sgeList); + sendWR.setOpcode(IbvSendWR.IBV_WR_ATOMIC_FETCH_AND_ADD); + sendWR.setSend_flags(IbvSendWR.IBV_SEND_SIGNALED); + sendWR.getAtomic().setRemote_addr(addr); + sendWR.getAtomic().setRkey(lkey); + sendWR.getAtomic().setCompare_add(10); + wrList_send.add(sendWR); + + //now we post the operation, the RDMA/atomic operation will take off + //the wrapper class will also wait of the CQ event + //once the CQ event is received we know the RDMA/atomic operation has completed + //we should have the content of the remote buffer stored in our own local buffer + //let's print it + commRdma.send(buffers, wrList_send, true, false); + dataBuf.clear(); + 
System.out.println("AtomicClient::the remote server values has been incremented by 10"); + System.out.println("AtomicClient::the fetched value from the remote server: " + dataBuf.getLong()); + + //now we send a final message to signal everything went fine + sgeSend = new IbvSge(); + sgeSend.setAddr(sendMr.getAddr()); + sgeSend.setLength(sendMr.getLength()); + sgeSend.setLkey(sendMr.getLkey()); + sgeList.clear(); + sgeList.add(sgeSend); + sendWR = new IbvSendWR(); + sendWR.setWr_id(1002); + sendWR.setSg_list(sgeList); + sendWR.setOpcode(IbvSendWR.IBV_WR_SEND); + sendWR.setSend_flags(IbvSendWR.IBV_SEND_SIGNALED); + wrList_send.clear(); + wrList_send.add(sendWR); + + //let's post the final message + commRdma.send(buffers, wrList_send, true, false); + } + + + public void launch(String[] args) throws Exception { + CmdLineCommon cmdLine = new CmdLineCommon("AtomicClient"); + + try { + cmdLine.parse(args); + } catch (ParseException e) { + cmdLine.printHelp(); + System.exit(-1); + } + ipAddress = cmdLine.getIp(); + port = cmdLine.getPort(); + + this.run(); + } + + public static void main(String[] args) throws Exception { + AtomicClient AtomicClient = new AtomicClient(); + AtomicClient.launch(args); + } +} + diff --git a/src/test/java/com/ibm/disni/examples/AtomicServer.java b/src/test/java/com/ibm/disni/examples/AtomicServer.java new file mode 100755 index 00000000..9f40558f --- /dev/null +++ b/src/test/java/com/ibm/disni/examples/AtomicServer.java @@ -0,0 +1,248 @@ +/* + * DiSNI: Direct Storage and Networking Interface + * + * Author: Konstantin Taranov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package com.ibm.disni.examples; + +import com.ibm.disni.CmdLineCommon; +import com.ibm.disni.verbs.*; +import org.apache.commons.cli.ParseException; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.LinkedList; + +public class AtomicServer { + private String ipAddress; + private int port; + + public void run() throws Exception { + System.out.println("AtomicServer::starting..."); + + //create a communication channel for receiving CM events + RdmaEventChannel cmChannel = RdmaEventChannel.createEventChannel(); + if (cmChannel == null){ + System.out.println("AtomicServer::CM channel null"); + return; + } + + //create a RdmaCmId for the server + RdmaCmId idPriv = cmChannel.createId(RdmaCm.RDMA_PS_TCP); + if (idPriv == null){ + System.out.println("idPriv null"); + return; + } + + InetAddress _src = InetAddress.getByName(ipAddress); + InetSocketAddress src = new InetSocketAddress(_src, port); + idPriv.bindAddr(src); + + //listen on the id + idPriv.listen(10); + + //wait for new connect requests + RdmaCmEvent cmEvent = cmChannel.getCmEvent(-1); + if (cmEvent == null){ + System.out.println("cmEvent null"); + return; + } + else if (cmEvent.getEvent() != RdmaCmEvent.EventType.RDMA_CM_EVENT_CONNECT_REQUEST + .ordinal()) { + System.out.println("AtomicServer::wrong event received: " + cmEvent.getEvent()); + return; + } + //always acknowledge CM events + cmEvent.ackEvent(); + + //get the id of the newly connection + RdmaCmId connId = cmEvent.getConnIdPriv(); + 
if (connId == null){ + System.out.println("AtomicServer::connId null"); + return; + } + + //get the device context of the new connection, typically the same as with the server id + IbvContext context = connId.getVerbs(); + if (context == null){ + System.out.println("AtomicServer::context null"); + return; + } + + //create a new protection domain, we will use the pd later when registering memory + IbvPd pd = context.allocPd(); + if (pd == null){ + System.out.println("AtomicServer::pd null"); + return; + } + + //the comp channel is used to get CQ notifications + IbvCompChannel compChannel = context.createCompChannel(); + if (compChannel == null){ + System.out.println("AtomicServer::compChannel null"); + return; + } + + //create a completion queue + IbvCQ cq = context.createCQ(compChannel, 50, 0); + if (cq == null){ + System.out.println("AtomicServer::cq null"); + return; + } + //request to be notified on that CQ + cq.reqNotification(false).execute().free(); + + //prepare a new queue pair + IbvQPInitAttr attr = new IbvQPInitAttr(); + attr.cap().setMax_recv_sge(1); + attr.cap().setMax_recv_wr(10); + attr.cap().setMax_send_sge(1); + attr.cap().setMax_send_wr(10); + attr.setQp_type(IbvQP.IBV_QPT_RC); + attr.setRecv_cq(cq); + attr.setSend_cq(cq); + //create the queue pair for the client connection + IbvQP qp = connId.createQP(pd, attr); + if (qp == null){ + System.out.println("AtomicServer::qp null"); + return; + } + + int buffercount = 3; + int buffersize = 100; + ByteBuffer buffers[] = new ByteBuffer[buffercount]; + IbvMr mrlist[] = new IbvMr[buffercount]; + int access = IbvMr.IBV_ACCESS_LOCAL_WRITE | IbvMr.IBV_ACCESS_REMOTE_WRITE | IbvMr.IBV_ACCESS_REMOTE_READ | IbvMr.IBV_ACCESS_REMOTE_ATOMIC; + + IbvDeviceAttr deviceAttr = context.queryDevice(); + + int maxResponderResources = deviceAttr.getMax_qp_rd_atom(); + int maxInitiatorDepth = deviceAttr.getMax_qp_init_rd_atom(); + + RdmaConnParam connParam = new RdmaConnParam(); + connParam.setRetry_count((byte) 2); + 
connParam.setResponder_resources((byte) maxResponderResources); + connParam.setInitiator_depth((byte) maxInitiatorDepth); + //once the client id is set up, accept the connection + connId.accept(connParam); + //wait until the connection is officially switched into established mode + cmEvent = cmChannel.getCmEvent(-1); + if (cmEvent.getEvent() != RdmaCmEvent.EventType.RDMA_CM_EVENT_ESTABLISHED + .ordinal()) { + System.out.println("AtomicServer::wrong event received: " + cmEvent.getEvent()); + return; + } + //always ack CM events + cmEvent.ackEvent(); + + //register some buffers to be used later + for (int i = 0; i < buffercount; i++){ + buffers[i] = ByteBuffer.allocateDirect(buffersize); + mrlist[i] = pd.regMr(buffers[i], access).execute().free().getMr(); + } + + ByteBuffer dataBuf = buffers[0]; + IbvMr dataMr = mrlist[0]; + ByteBuffer sendBuf = buffers[1]; + IbvMr sendMr = mrlist[1]; + IbvMr recvMr = mrlist[2]; + + dataBuf.order(ByteOrder.LITTLE_ENDIAN); + dataBuf.putLong(dataMr.getAddr()); + dataBuf.clear(); + + System.out.println("AtomicServer::content of buffer before: " + dataBuf.getLong()); + dataBuf.clear(); + + sendBuf.putLong(dataMr.getAddr()); + sendBuf.putInt(dataMr.getLength()); + sendBuf.putInt(dataMr.getLkey()); + sendBuf.clear(); + + //this class is a thin wrapper over some of the data operations in jverbs + //we use it to issue data transfer operations + VerbsTools commRdma = new VerbsTools(context, compChannel, qp, cq); + LinkedList wrList_send = new LinkedList(); + + //let's prepare some work requests for sending + IbvSge sgeSend = new IbvSge(); + sgeSend.setAddr(sendMr.getAddr()); + sgeSend.setLength(sendMr.getLength()); + sgeSend.setLkey(sendMr.getLkey()); + LinkedList sgeList = new LinkedList(); + sgeList.add(sgeSend); + IbvSendWR sendWR = new IbvSendWR(); + sendWR.setWr_id(2000); + sendWR.setSg_list(sgeList); + sendWR.setOpcode(IbvSendWR.IBV_WR_SEND); + sendWR.setSend_flags(IbvSendWR.IBV_SEND_SIGNALED); + wrList_send.add(sendWR); + + LinkedList 
wrList_recv = new LinkedList(); + + //let's prepare some work requests for receiving + IbvSge sgeRecv = new IbvSge(); + sgeRecv.setAddr(recvMr.getAddr()); + sgeRecv.setLength(recvMr.getLength()); + int lkey = recvMr.getLkey(); + sgeRecv.setLkey(lkey); + LinkedList sgeListRecv = new LinkedList(); + sgeListRecv.add(sgeRecv); + IbvRecvWR recvWR = new IbvRecvWR(); + recvWR.setSg_list(sgeListRecv); + recvWR.setWr_id(2001); + wrList_recv.add(recvWR); + + //post a receive call + commRdma.initSGRecv(wrList_recv); + System.out.println("AtomicServer::initiated recv, about to send stag info"); + //post a send call, here we send a message which include the RDMA information of a data buffer + commRdma.send(buffers, wrList_send, true, false); + System.out.println("AtomicServer::stag info sent"); + + //wait for the final message from the server + commRdma.completeSGRecv(wrList_recv, false); + + System.out.println("AtomicServer::content of buffer after: " + dataBuf.getLong()); + dataBuf.clear(); + + System.out.println("AtomicServer::done"); + } + + public void launch(String[] args) throws Exception { + CmdLineCommon cmdLine = new CmdLineCommon("AtomicServer"); + + try { + cmdLine.parse(args); + } catch (ParseException e) { + cmdLine.printHelp(); + System.exit(-1); + } + ipAddress = cmdLine.getIp(); + port = cmdLine.getPort(); + + this.run(); + } + + public static void main(String[] args) throws Exception { + AtomicServer AtomicServer = new AtomicServer(); + AtomicServer.launch(args); + } +} + diff --git a/src/test/java/com/ibm/disni/examples/ReadClient.java b/src/test/java/com/ibm/disni/examples/ReadClient.java index 7ca6804d..32847b8b 100644 --- a/src/test/java/com/ibm/disni/examples/ReadClient.java +++ b/src/test/java/com/ibm/disni/examples/ReadClient.java @@ -48,13 +48,25 @@ public void run() throws Exception { //create a EndpointGroup. The RdmaActiveEndpointGroup contains CQ processing and delivers CQ event to the endpoint.dispatchCqEvent() method. 
endpointGroup = new RdmaActiveEndpointGroup(1000, false, 128, 4, 128); endpointGroup.init(this); + //we have passed our own endpoint factory to the group, therefore new endpoints will be of type CustomClientEndpoint //let's create a new client endpoint ReadClient.CustomClientEndpoint endpoint = endpointGroup.createEndpoint(); //connect to the server InetAddress ipAddress = InetAddress.getByName(host); - InetSocketAddress address = new InetSocketAddress(ipAddress, port); + InetSocketAddress address = new InetSocketAddress(ipAddress, port); + + IbvDeviceAttr deviceAttr = endpoint.getIdPriv().getVerbs().queryDevice(); + + int maxResponderResources = deviceAttr.getMax_qp_rd_atom(); + int maxInitiatorDepth = deviceAttr.getMax_qp_init_rd_atom(); + + RdmaConnParam connParam = new RdmaConnParam(); + connParam.setResponder_resources((byte) maxResponderResources); + connParam.setInitiator_depth((byte) maxInitiatorDepth); + endpointGroup.setConnParam(connParam); + endpoint.connect(address, 1000); InetSocketAddress _addr = (InetSocketAddress) endpoint.getDstAddr(); System.out.println("ReadClient::client connected, address " + _addr.toString()); diff --git a/src/test/java/com/ibm/disni/examples/ReadServer.java b/src/test/java/com/ibm/disni/examples/ReadServer.java index 5da92048..b2fd490a 100644 --- a/src/test/java/com/ibm/disni/examples/ReadServer.java +++ b/src/test/java/com/ibm/disni/examples/ReadServer.java @@ -49,6 +49,7 @@ public void run() throws Exception { //create a EndpointGroup. The RdmaActiveEndpointGroup contains CQ processing and delivers CQ event to the endpoint.dispatchCqEvent() method. 
endpointGroup = new RdmaActiveEndpointGroup(1000, false, 128, 4, 128); endpointGroup.init(this); + //create a server endpoint RdmaServerEndpoint serverEndpoint = endpointGroup.createServerEndpoint(); @@ -58,6 +59,16 @@ public void run() throws Exception { serverEndpoint.bind(address, 10); System.out.println("ReadServer::server bound to address" + address.toString()); + IbvDeviceAttr deviceAttr = serverEndpoint.getIdPriv().getVerbs().queryDevice(); + + int maxResponderResources = deviceAttr.getMax_qp_rd_atom(); + int maxInitiatorDepth = deviceAttr.getMax_qp_init_rd_atom(); + + RdmaConnParam connParam = new RdmaConnParam(); + connParam.setResponder_resources((byte) maxResponderResources); + connParam.setInitiator_depth((byte) maxInitiatorDepth); + endpointGroup.setConnParam(connParam); + //we can accept new connections ReadServer.CustomServerEndpoint endpoint = serverEndpoint.accept(); System.out.println("ReadServer::connection accepted "); diff --git a/src/test/java/com/ibm/disni/examples/VerbsClient.java b/src/test/java/com/ibm/disni/examples/VerbsClient.java index 9bce8adc..6ee8f41c 100755 --- a/src/test/java/com/ibm/disni/examples/VerbsClient.java +++ b/src/test/java/com/ibm/disni/examples/VerbsClient.java @@ -165,9 +165,11 @@ public void run() throws Exception { //now let's connect to the server RdmaConnParam connParam = new RdmaConnParam(); connParam.setRetry_count((byte) 2); - idPriv.connect(connParam); - + connParam.setResponder_resources((byte) 1); + connParam.setInitiator_depth((byte) 1); + idPriv.connect(connParam); + //wait until we are really connected cmEvent = cmChannel.getCmEvent(-1); if (cmEvent == null){ diff --git a/src/test/java/com/ibm/disni/examples/VerbsServer.java b/src/test/java/com/ibm/disni/examples/VerbsServer.java index 6fe14693..1bfb80c3 100755 --- a/src/test/java/com/ibm/disni/examples/VerbsServer.java +++ b/src/test/java/com/ibm/disni/examples/VerbsServer.java @@ -131,8 +131,11 @@ else if (cmEvent.getEvent() != 
RdmaCmEvent.EventType.RDMA_CM_EVENT_CONNECT_REQUE IbvMr mrlist[] = new IbvMr[buffercount]; int access = IbvMr.IBV_ACCESS_LOCAL_WRITE | IbvMr.IBV_ACCESS_REMOTE_WRITE | IbvMr.IBV_ACCESS_REMOTE_READ; + RdmaConnParam connParam = new RdmaConnParam(); connParam.setRetry_count((byte) 2); + connParam.setResponder_resources((byte) 1); + connParam.setInitiator_depth((byte) 1); //once the client id is set up, accept the connection connId.accept(connParam); //wait until the connection is officially switched into established mode