Skip to content

Commit

Permalink
Notification interface added in mds
Browse files Browse the repository at this point in the history
Signed-off-by: Md Mahamudur Rahaman Sajib <[email protected]>
  • Loading branch information
sajibreadd committed Sep 26, 2024
1 parent 4dc4e36 commit b81b825
Show file tree
Hide file tree
Showing 22 changed files with 1,463 additions and 10 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -584,6 +584,7 @@ endif (WITH_RADOSGW)

#option for CephFS
option(WITH_CEPHFS "CephFS is enabled" ON)
option(WITH_CEPHFS_NOTIFICATION, "CephFS notification is disabled" OFF)

if(NOT WIN32)
# Please specify 3.x if you want to build with a certain version of python3.
Expand Down
33 changes: 33 additions & 0 deletions src/common/options/mds.yaml.in
Original file line number Diff line number Diff line change
Expand Up @@ -1730,3 +1730,36 @@ options:
- mds
flags:
- runtime
- name: mds_allow_notification_secrets_in_cleartext
type: bool
level: advanced
desc: Allows sending secrets (e.g. passwords) over non encrypted HTTP messages.
long_desc: When notification endpoint require secrets (e.g. passwords),
we allow the topic creation. This parameter can be set to "true" to bypass
this check. Use this only if mds is on a trusted private network, and
the message broker cannot be configured without password authentication.
Otherwise, this will leak the credentials of your message broker and
compromise its security.
default: false
services:
- mds
- name: mds_kafka_sleep_timeout
type: uint
level: advanced
desc: Time in milliseconds to sleep while polling for kafka replies
long_desc: This will be used to prevent busy waiting for the kafka replies
As well as for the cases where the broker is down and we try to reconnect.
The same values times 3 will be used to sleep if there were no messages
sent or received across all kafka connections
default: 10
services:
- mds
- name: mds_kafka_message_timeout
type: uint
level: advanced
desc: This is the maximum time in milliseconds to deliver a message (including retries)
long_desc: Delivery error occurs when the message timeout is exceeded.
Value must be greater than zero, if set to zero, a value of 1 millisecond will be used.
default: 5000
services:
- mds
22 changes: 22 additions & 0 deletions src/include/ceph_fs.h
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,28 @@ enum {
CEPH_MDS_OP_LOCK_PATH = 0x0150a,
};

#ifdef WITH_CEPHFS_NOTIFICATION
enum {
CEPH_MDS_NOTIFY_ACCESS = 0x0000000000000001,
CEPH_MDS_NOTIFY_ATTRIB = 0x0000000000000002,
CEPH_MDS_NOTIFY_CLOSE_WRITE = 0x0000000000000004,
CEPH_MDS_NOTIFY_CLOSE_NOWRITE = 0x0000000000000008,
CEPH_MDS_NOTIFY_CREATE = 0x0000000000000010,
CEPH_MDS_NOTIFY_DELETE = 0x0000000000000020,
CEPH_MDS_NOTIFY_DELETE_SELF = 0x0000000000000040,
CEPH_MDS_NOTIFY_MODIFY = 0x0000000000000080,
CEPH_MDS_NOTIFY_MOVE_SELF = 0x0000000000000100,
CEPH_MDS_NOTIFY_MOVED_FROM = 0x0000000000000200,
CEPH_MDS_NOTIFY_MOVED_TO = 0x0000000000000400,
CEPH_MDS_NOTIFY_OPEN = 0x0000000000000800,
CEPH_MDS_NOTIFY_CLOSE = 0x0000000000001000,
CEPH_MDS_NOTIFY_MOVE = 0x0000000000002000,
CEPH_MDS_NOTIFY_ONESHOT = 0x0000000000004000,
CEPH_MDS_NOTIFY_IGNORED = 0x0000000000008000,
CEPH_MDS_NOTIFY_ONLYDIR = 0x0000000000010000
};
#endif

#define IS_CEPH_MDS_OP_NEWINODE(op) (op == CEPH_MDS_OP_CREATE || \
op == CEPH_MDS_OP_MKNOD || \
op == CEPH_MDS_OP_MKDIR || \
Expand Down
3 changes: 3 additions & 0 deletions src/include/config-h.in.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@
/* define if cephfs enabled */
#cmakedefine WITH_CEPHFS

/* define if cephfs notification enabled */
#cmakedefine WITH_CEPHFS_NOTIFICATION

/* define if systemed is enabled */
#cmakedefine WITH_SYSTEMD

Expand Down
24 changes: 22 additions & 2 deletions src/mds/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@

if (WITH_CEPHFS_NOTIFICATION)
find_package(RDKafka 0.9.2 REQUIRED)
endif()

set(mds_srcs
BatchOp.cc
Capability.cc
Expand Down Expand Up @@ -49,8 +54,23 @@ set(mds_srcs
${CMAKE_SOURCE_DIR}/src/common/MemoryModel.cc
${CMAKE_SOURCE_DIR}/src/osdc/Journaler.cc
${CMAKE_SOURCE_DIR}/src/mgr/MDSPerfMetricTypes.cc)

if (WITH_CEPHFS_NOTIFICATION)
list(APPEND mds_srcs MDSKafka.cc MDSUDPEndpoint.cc MDSNotificationMessage.cc MDSNotificationManager.cc)
endif()

add_library(mds STATIC ${mds_srcs})
target_link_libraries(mds PRIVATE
legacy-option-headers Boost::url
heap_profiler cpu_profiler osdc ${LUA_LIBRARIES})
legacy-option-headers Boost::url RDKafka::RDKafka
heap_profiler cpu_profiler osdc ${LUA_LIBRARIES}
${Boost_LIBRARIES})

if (WITH_CEPHFS_NOTIFICATION)
target_link_libraries(mds PRIVATE RDKafka::RDKafka)
endif()

target_include_directories(mds PRIVATE "${LUA_INCLUDE_DIR}")

if (WITH_CEPHFS_NOTIFICATION)
target_include_directories(mds PRIVATE ${Boost_INCLUDE_DIRS})
endif()
25 changes: 25 additions & 0 deletions src/mds/MDSDaemon.cc
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,31 @@ void MDSDaemon::set_up_admin_socket()
asok_hook,
"run cpu profiling on daemon");
ceph_assert(r == 0);

#ifdef WITH_CEPHFS_NOTIFICATION
r = admin_socket->register_command(
"add_topic "
"name=topic_name,type=CephString,req=true "
"name=broker,type=CephString,req=true "
"name=use_ssl,type=CephBool,req=false "
"name=username,type=CephString,req=false "
"name=password,type=CephString,req=false "
"name=ca_location,type=CephString,req=false "
"name=mechanism,type=CephString,req=false",
asok_hook,
"add topic for notification"
);
ceph_assert(r == 0);
r = admin_socket->register_command(
"add_udp_endpoint "
"name=entity,type=CephString,req=true "
"name=ip,type=CephString,req=true "
"name=port,type=CephInt,req=true",
asok_hook,
"add udp endpoint for notification"
);
ceph_assert(r == 0);
#endif
}

void MDSDaemon::clean_up_admin_socket()
Expand Down
Loading

0 comments on commit b81b825

Please sign in to comment.