Skip to content

Commit

Permalink
add getdents
Browse files Browse the repository at this point in the history
  • Loading branch information
KoyamaSohei committed Sep 22, 2024
1 parent 487a0e5 commit 6ef7841
Show file tree
Hide file tree
Showing 7 changed files with 274 additions and 0 deletions.
102 changes: 102 additions & 0 deletions finchfsd/fs_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <sys/types.h>
#include <time.h>
#include <fcntl.h>
#include <dirent.h>
#include "finchfs.h"
#include "fs_types.h"
#include "fs_rpc.h"
Expand Down Expand Up @@ -1155,6 +1156,94 @@ fs_rpc_find_recv(void *arg, const void *header, size_t header_length,
return (status);
}

struct finchfs_dirent {
unsigned long d_ino;
unsigned long d_off;
unsigned short d_reclen;
char d_name[128];
char d_type;
};

ucs_status_t
fs_rpc_getdents_recv(void *arg, const void *header, size_t header_length,
void *data, size_t length,
const ucp_am_recv_param_t *param)
{
uint64_t eid = *(uint64_t *)data;
getdents_header_t *hdr = (getdents_header_t *)header;

log_debug("fs_rpc_getdents_recv() called");

contig_req_t *user_data = malloc(sizeof(contig_req_t));
user_data->header = malloc(header_length);
memcpy(user_data->header, header, header_length);
getdents_header_t *rhdr = (getdents_header_t *)user_data->header;
user_data->buf = malloc(hdr->count);

ucp_request_param_t rparam = {
.op_attr_mask = UCP_OP_ATTR_FIELD_DATATYPE |
UCP_OP_ATTR_FIELD_CALLBACK |
UCP_OP_ATTR_FIELD_USER_DATA,
.cb =
{
.send = fs_rpc_contig_reply_cb,
},
.datatype = ucp_dt_make_contig(sizeof(char)),
.user_data = user_data,
};

entry_t *dir = (entry_t *)eid;
entry_t *child = (hdr->pos == 0)
? entrytree_RB_MINMAX(&dir->entries, -1)
: ((entry_t *)hdr->pos);
rhdr->ret = FINCH_ENOENT;
rhdr->count = 0;
rhdr->pos = 0;
struct finchfs_dirent *ent = NULL;
for (; (child) != ((void *)0); (child) = entrytree_RB_NEXT(child)) {
if (S_ISDIR(child->mode) && ctx.rank != 0) {
continue;
}
ent = (struct finchfs_dirent *)(user_data->buf + rhdr->count);
if (rhdr->count + sizeof(struct finchfs_dirent) > hdr->count) {
rhdr->ret = FINCH_INPROGRESS;
rhdr->pos = (uint64_t)child;
break;
}
rhdr->ret = FINCH_OK;
rhdr->count += sizeof(struct finchfs_dirent);
ent->d_ino = child->i_ino;
ent->d_off = rhdr->count;
ent->d_reclen = sizeof(struct finchfs_dirent);
strcpy(ent->d_name, child->name);
ent->d_type = S_ISDIR(child->mode) ? DT_DIR : DT_REG;
}
if (ent != NULL) {
ent->d_off = 0;
}
log_debug("fs_rpc_getdents_recv() sending count=%d", rhdr->count);

ucs_status_ptr_t req = ucp_am_send_nbx(
param->reply_ep, RPC_GETDENTS_REP, rhdr, sizeof(*rhdr),
user_data->buf, rhdr->count == 0 ? 1 : rhdr->count, &rparam);

if (req == NULL) {
free(user_data->header);
free(user_data->buf);
free(user_data);
} else if (UCS_PTR_IS_ERR(req)) {
log_error("ucp_am_send_nbx() failed: %s",
ucs_status_string(UCS_PTR_STATUS(req)));
free(user_data->header);
free(user_data->buf);
free(user_data);
ucs_status_t status = UCS_PTR_STATUS(req);
ucp_request_free(req);
return (status);
}
return (UCS_OK);
}

int
fs_server_init(char *db_dir, size_t db_size, int rank, int nprocs, int lrank,
int lnprocs, int *shutdown)
Expand Down Expand Up @@ -1325,6 +1414,19 @@ fs_server_init(char *db_dir, size_t db_size, int rank, int nprocs, int lrank,
ucs_status_string(status));
return (-1);
}
ucp_am_handler_param_t getdents_param = {
.field_mask = UCP_AM_HANDLER_PARAM_FIELD_ID |
UCP_AM_HANDLER_PARAM_FIELD_ARG |
UCP_AM_HANDLER_PARAM_FIELD_CB,
.id = RPC_GETDENTS_REQ,
.cb = fs_rpc_getdents_recv,
};
if ((status = ucp_worker_set_am_recv_handler(
ctx.ucp_worker, &getdents_param)) != UCS_OK) {
log_error("ucp_worker_set_am_recv_handler(getdents) failed: %s",
ucs_status_string(status));
return (-1);
}

ctx.fs = fs_inode_init(db_dir, db_size, lrank);
return (0);
Expand Down
45 changes: 45 additions & 0 deletions lib/finchfs.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ static struct fd_table {
size_t size;
uint64_t i_ino;
uint64_t *eid;
struct {
int rank;
uint64_t pos;
} getdents_state;
} *fd_table;

#define IS_NULL_STRING(str) (str == NULL || str[0] == '\0')
Expand Down Expand Up @@ -153,6 +157,8 @@ finchfs_open(const char *path, int32_t flags)
fd_table[fd].path = NULL;
return (-1);
}
fd_table[fd].getdents_state.rank = 0;
fd_table[fd].getdents_state.pos = 0;
} else {
ret = fs_rpc_inode_stat(NULL, p, &st,
(((flags & O_TRUNC) != 0) << 3) +
Expand Down Expand Up @@ -723,3 +729,42 @@ finchfs_fstatat(int dirfd, const char *pathname, struct stat *st, int flags)
free(p);
return (0);
}

ssize_t
finchfs_getdents(int fd, void *dirp, size_t count)
{
log_debug("finchfs_getdents() called fd=%d", fd);
if (fd < 0 || fd >= fd_table_size || fd_table[fd].path == NULL) {
errno = EBADF;
return (-1);
}
if (!S_ISDIR(fd_table[fd].mode)) {
errno = ENOTDIR;
return (-1);
}
int ret;
size_t c = count;

while (fd_table[fd].getdents_state.rank < nvprocs) {
ret = fs_rpc_getdents(
fd_table[fd].getdents_state.rank, fd_table[fd].eid,
&fd_table[fd].getdents_state.pos, dirp, &c);
switch (ret) {
case FINCH_ENOENT:
fd_table[fd].getdents_state.rank++;
fd_table[fd].getdents_state.pos = 0;
c = count;
continue;
case FINCH_INPROGRESS:
return (c);
case FINCH_OK:
fd_table[fd].getdents_state.rank++;
fd_table[fd].getdents_state.pos = 0;
return (c);
default:
errno = -ret;
return (-1);
}
}
return (0);
}
1 change: 1 addition & 0 deletions lib/finchfs.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,4 @@ int finchfs_createat_chunk_size(int dirfd, const char *pathname, int flags,
int finchfs_openat(int dirfd, const char *pathname, int flags);
int finchfs_fstatat(int dirfd, const char *pathname, struct stat *buf,
int flags);
ssize_t finchfs_getdents(int fd, void *dirp, size_t count);
93 changes: 93 additions & 0 deletions lib/fs_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,29 @@ fs_rpc_find_reply(void *arg, const void *header, size_t header_length,
return (UCS_OK);
}

typedef struct {
int ret;
void *buf;
getdents_header_t header;
uint64_t pos;
size_t count;
} getdents_handle_t;

ucs_status_t
fs_rpc_getdents_reply(void *arg, const void *header, size_t header_length,
void *data, size_t length,
const ucp_am_recv_param_t *param)
{
getdents_header_t *hdr = (getdents_header_t *)header;
getdents_handle_t *handle = hdr->handle;
log_debug("fs_rpc_getdents_reply: count=%zu", hdr->count);
memcpy(handle->buf, data, hdr->count);
handle->ret = hdr->ret;
handle->pos = hdr->pos;
handle->count = hdr->count;
return (UCS_OK);
}

static void
ep_err_cb(void *arg, ucp_ep_h ep, ucs_status_t status)
{
Expand Down Expand Up @@ -495,6 +518,21 @@ fs_client_init(char *addrfile, int *nvprocs)
ucs_status_string(status));
return (-1);
}
ucp_am_handler_param_t getdents_reply_param = {
.field_mask = UCP_AM_HANDLER_PARAM_FIELD_ID |
UCP_AM_HANDLER_PARAM_FIELD_ARG |
UCP_AM_HANDLER_PARAM_FIELD_CB,
.id = RPC_GETDENTS_REP,
.cb = fs_rpc_getdents_reply,
.arg = NULL,
};
if ((status = ucp_worker_set_am_recv_handler(
env.ucp_worker, &getdents_reply_param)) != UCS_OK) {
log_error("ucp_worker_set_am_recv_handler(getdents) failed: "
"%s",
ucs_status_string(status));
return (-1);
}
return (0);
}

Expand Down Expand Up @@ -1395,6 +1433,7 @@ fs_rpc_inode_open_dir(uint64_t *base, const char *path, uint64_t *eid,
log_debug("fs_rpc_inode_open_dir: ucp_am_send_nbx() succeeded");
}
free(reqs);
free(iov);
int **rets = malloc(sizeof(int *) * nokreq);
for (int i = 0; i < nokreq; i++) {
rets[i] = &handle[okidx[i]].ret;
Expand Down Expand Up @@ -1703,3 +1742,57 @@ fs_rpc_find(const char *path, const char *query,
free(handles);
return (r);
}

int
fs_rpc_getdents(int target, uint64_t *eid, uint64_t *pos, void *buf,
size_t *count)
{
ucp_dt_iov_t iov[1];
iov[0].buffer = &eid[target];
iov[0].length = sizeof(eid[target]);

getdents_handle_t handle = {.ret = FINCH_INPROGRESS,
.buf = buf,
.header = {
.count = *count,
.pos = *pos,
.ret = FINCH_INPROGRESS,
.handle = &handle,
}};
int *ret_addr = &handle.ret;

ucp_request_param_t rparam = {
.op_attr_mask =
UCP_OP_ATTR_FIELD_DATATYPE | UCP_OP_ATTR_FIELD_FLAGS,
.flags = UCP_AM_SEND_FLAG_EAGER | UCP_AM_SEND_FLAG_REPLY,
.datatype = UCP_DATATYPE_IOV,
};

ucs_status_ptr_t req;
req = ucp_am_send_nbx(env.ucp_eps[target], RPC_GETDENTS_REQ,
&handle.header, sizeof(handle.header), iov, 1,
&rparam);

ucs_status_t status;
while (!all_req_finish(&req, 1)) {
ucp_worker_progress(env.ucp_worker);
}
if (req != NULL) {
status = ucp_request_check_status(req);
if (status != UCS_OK) {
log_error(
"fs_rpc_getdents: ucp_am_send_nbx() failed: %s",
ucs_status_string(status));
return (-1);
}
ucp_request_free(req);
}

log_debug("fs_rpc_getdents: ucp_am_send_nbx() succeeded");
while (!all_ret_finish(&ret_addr, 1)) {
ucp_worker_progress(env.ucp_worker);
}
*pos = handle.pos;
*count = handle.count;
return (handle.ret);
}
4 changes: 4 additions & 0 deletions lib/fs_rpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
#define RPC_READDIR_REP 0x10
#define RPC_FIND_REQ 0x11
#define RPC_FIND_REP 0x12
#define RPC_GETDENTS_REQ 0x13
#define RPC_GETDENTS_REP 0x14

int fs_rpc_mkdir(const char *path, mode_t mode);
int fs_rpc_inode_create(uint64_t *base, const char *path, uint8_t flags,
Expand All @@ -42,6 +44,8 @@ int fs_rpc_dir_move(const char *oldpath, const char *newpath);
int fs_rpc_find(const char *path, const char *query,
struct finchfs_find_param *param, void *arg,
void (*filler)(void *, const char *));
int fs_rpc_getdents(int target, uint64_t *eid, uint64_t *pos, void *buf,
size_t *count);

int fs_client_init(char *addrfile, int *nvprocs);
int fs_client_term(void);
Expand Down
7 changes: 7 additions & 0 deletions lib/fs_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,13 @@ typedef struct {
char path[];
} find_entry_t;

typedef struct {
void *handle;
uint64_t pos;
size_t count;
int ret;
} getdents_header_t;

/* Number of 512B blocks */
#define NUM_BLOCKS(size) ((size + 511) / 512)

Expand Down
22 changes: 22 additions & 0 deletions tests/finchfs_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <ucp/api/ucp.h>
#include <gtest/gtest.h>
#include <fcntl.h>
#include <dirent.h>
extern "C" {
#include <finchfs.h>
}
Expand Down Expand Up @@ -919,6 +920,27 @@ TEST(FinchfsTest, Fstatat)
EXPECT_EQ(finchfs_term(), 0);
}

struct test_dirent {
unsigned long d_ino;
unsigned long d_off;
unsigned short d_reclen;
char d_name[];
};

TEST(FinchfsTest, Getdents)
{
EXPECT_EQ(finchfs_init(NULL), 0);
int dirfd;
dirfd = finchfs_open("/createat", O_RDWR | O_DIRECTORY);
EXPECT_EQ(dirfd, 0);
char buf[2048];
EXPECT_EQ(finchfs_getdents(dirfd, buf, 2048), 152);
struct test_dirent *ent = (struct test_dirent *)buf;
EXPECT_STREQ(ent->d_name, "file1");
EXPECT_EQ(finchfs_close(dirfd), 0);
EXPECT_EQ(finchfs_term(), 0);
}

static int
path_to_target_hash(const char *path, int div)
{
Expand Down

0 comments on commit 6ef7841

Please sign in to comment.