8000 util/poll: `kqueue` support by iliasotnikov · Pull Request #11050 · ofiwg/libfabric · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

util/poll: kqueue support #11050

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ dnl Copyright (c) 2019-2020 Amazon.com, Inc. or its affiliates. All rights reser
dnl (C) Copyright 2020,2024 Hewlett Packard Enterprise Development LP
dnl Copyright (c) 2022 DataDirect Networks, Inc. All rights reserved.
dnl Copyright (c) 2023 Tactical Computing Labs, LLC. All rights reserved.
dnl Copyright (c) 2025 VDURA, Inc. All rights reserved.
dnl
dnl Process this file with autoconf to produce a configure script.

Expand Down Expand Up @@ -239,6 +240,20 @@ fi
AC_DEFINE_UNQUOTED([PT_LOCK_SPIN], [$have_spinlock],
[Define to 1 if pthread_spin_init is available.])

dnl Allow the user to opt out of kqueue with --disable-kqueue.  When the
dnl option is not given at all, enable_kqueue is set to "auto".
AC_ARG_ENABLE([kqueue],
[AS_HELP_STRING([--disable-kqueue],
[Disable kqueue if available@<:@default=no@:>@])],
[],
[enable_kqueue=auto]
)

dnl Probe for kqueue() unless it was explicitly disabled.  AC_CHECK_FUNCS
dnl stores its result in the ac_cv_func_kqueue cache variable, which is the
dnl one that must be tested before defining HAVE_KQUEUE.
AS_IF([test x"$enable_kqueue" != x"no"],
      [AC_CHECK_FUNCS([kqueue])
       if test "$ac_cv_func_kqueue" = yes; then
           AC_DEFINE([HAVE_KQUEUE], [1], [Define if you have kqueue support.])
       fi]
)

AC_ARG_ENABLE([epoll],
[AS_HELP_STRING([--disable-epoll],
[Disable epoll if available@<:@default=no@:>@])],
Expand Down
274 changes: 215 additions & 59 deletions include/ofi_epoll.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/*
* Copyright (c) 2011-2018 Intel Corporation. All rights reserved.
* Copyright (c) 2025 VDURA, Inc. All rights reserved.
*
* This software is available to you under a choice of one of two
* licenses. You may choose to be licensed under the terms of the GNU
Expand Down Expand Up @@ -43,87 +44,171 @@

#include <ofi_list.h>
#include <ofi_signal.h>
#include <ofi.h>

#ifdef HAVE_KQUEUE
#include <sys/event.h>
#include <sys/time.h>
#include <time.h>

#ifdef HAVE_EPOLL
#include <sys/epoll.h>
#define ofi_epollfds_event epoll_event
#else
struct ofi_epollfds_event {
uint32_t events;
union {
void *ptr;
} data;
};
#endif
#define ofi_epollfds_event kevent
/*
 * Accessors for an event returned by kevent().  Every expansion and every
 * use of the argument is parenthesized so that the macros compose correctly
 * with '!', '&&', comparisons and non-trivial argument expressions.
 */
#define OFI_EPOLL_EVT_HAS_INPUT(evt)	((evt).filter == EVFILT_READ)
#define OFI_EPOLL_EVT_HAS_OUTPUT(evt)	((evt).filter == EVFILT_WRITE)
#define OFI_EPOLL_EVT_HAS_ERR(evt)	((evt).flags & EV_ERROR)
#define OFI_EPOLL_EVT_DATA(evt)		((evt).udata)
#define OFI_EPOLL_EVT_EVENTS(evt)	((evt).filter)
Comment on lines +55 to +59
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These macros need to be defined for the else branch as well.

Copy link
Contributor Author
@iliasotnikov iliasotnikov May 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks - for both epoll and poll those are defined in L50-56 - didn't want to introduce more duplication than necessary. Please do let me know if you'd like those explicitly defined in corresponding #ifdef branches to ease of understanding.

Copy link
Contributor
@j-xiong j-xiong May 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would reorganize these definitions as:

#ifdef HAVE_KQUEUE
L62-66
#else
L51-55
#endif

updated the line number to match the code shown here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the hint - I went further and reorganized ofi_epoll.h so there is only one single conditional around each HAVE_EPOLL and HAVE_KQUEUE, which hopefully would improve readability.


enum ofi_pollfds_ctl {
POLLFDS_CTL_ADD,
POLLFDS_CTL_DEL,
POLLFDS_CTL_MOD,
};
#define OFI_EPOLL_IN EVFILT_READ
#define OFI_EPOLL_OUT EVFILT_WRITE

struct ofi_pollfds_work_item {
int fd;
uint32_t events;
void *context;
enum ofi_pollfds_ctl op;
struct slist_entry entry;
};
typedef int ofi_epoll_t;
#define OFI_EPOLL_INVALID -1

struct ofi_pollfds_ctx {
void *context;
int index;
};
static const bool ofi_have_epoll = true;

struct ofi_pollfds {
int size;
int nfds;
struct pollfd *fds;
struct ofi_pollfds_ctx *ctx;
struct fd_signal signal;
struct slist work_item_list;
struct ofi_genlock lock;
/*
 * Create a new kqueue and store its descriptor in '*ep'.
 *
 * '*ep' is always written, even on failure (it then holds the negative
 * value returned by kqueue()).  Returns 0 on success or the negated
 * ofi_syserr() value on failure.
 */
static inline int ofi_epoll_create(int *ep)
{
	int kq = kqueue();

	*ep = kq;
	if (kq < 0)
		return -ofi_syserr();

	return 0;
}

int (*add)(struct ofi_pollfds *pfds, int fd, uint32_t events,
void *context);
int (*del)(struct ofi_pollfds *pfds, int fd);
};
/*
 * Register 'fd' with kqueue 'ep' for the poll-style 'events' bitmask.
 *
 * kqueue filters are distinct values rather than bit flags, so POLLIN and
 * POLLOUT are each turned into their own EV_ADD change (EVFILT_READ and
 * EVFILT_WRITE respectively), both carrying 'context' as user data.
 *
 * Returns 0 on success or the negated ofi_syserr() value if kevent() fails.
 */
static inline int
ofi_epoll_add(int ep, int fd, uint32_t events, void *context)
{
	struct kevent changes[2];
	int nchanges = 0;
	int ret;

	if (events & POLLIN) {
		EV_SET(&changes[nchanges], fd, EVFILT_READ, EV_ADD,
		       0, 0, context);
		nchanges++;
	}

	if (events & POLLOUT) {
		EV_SET(&changes[nchanges], fd, EVFILT_WRITE, EV_ADD,
		       0, 0, context);
		nchanges++;
	}

	ret = kevent(ep, changes, nchanges, NULL, 0, NULL);
	if (ret < 0)
		return -ofi_syserr();

	return 0;
}

int ofi_pollfds_create_(struct ofi_pollfds **pfds, enum ofi_lock_type lock_type);
int ofi_pollfds_create(struct ofi_pollfds **pfds);
int ofi_pollfds_grow(struct ofi_pollfds *pfds, int max_size);
static inline int
ofi_epoll_mod(int ep, int fd, uint32_t events, void *context)
{
/* Ensure no filters not included into the events are present -
* contrary to 'epoll' the 'kqueue' stores '(ident, filter)' pairs
* separately, while 'epoll' has single entry per FD (since events are
* bitmask). As an illustrative example, 'epoll' semantics switching
* from POLLIN to POLLOUT would work just fine, while 'kqueue' would
* have excessive 'EVFILT_READ' still present.
*/
struct kevent kevents[2];
int kevent_cnt = 0;
int rc;

/* Remove filters not having corresponding events bit set */
if (!(events & POLLIN))
EV_SET(&kevents[kevent_cnt++], fd, EVFILT_READ, EV_DELETE,
0, 0, NULL);

if (!(events & POLLOUT))
EV_SET(&kevents[kevent_cnt++], fd, EVFILT_WRITE, EV_DELETE,
0, 0, NULL);

rc = kevent(ep, kevents, kevent_cnt, NULL, 0, NULL);
/* Ignore error if no filter to delete has been found */
if (rc < 0 && errno != ENOENT)
return -ofi_syserr();

/* Add requested events as filters */
return ofi_epoll_add(ep, fd, events, context);
}

static inline int
ofi_pollfds_add(struct ofi_pollfds *pfds, int fd, uint32_t events, void *context)
ofi_epoll_del(int ep, int fd)
{
return pfds->add(pfds, fd, events, context);
struct kevent event[2];
int rc;

EV_SET(&event[0], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
EV_SET(&event[1], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);

rc = kevent(ep, event, 2, NULL, 0, NULL);

/* Ignore error if no filter to delete has been found */
if (rc < 0 && errno != ENOENT)
return -ofi_syserr();

return 0;
}

int ofi_pollfds_mod(struct ofi_pollfds *pfds, int fd, uint32_t events,
void *context);
/*
 * Wait for events on kqueue 'ep', following the epoll_wait() calling
 * convention.
 *
 * 'timeout' is expressed in milliseconds.  A negative value passes a NULL
 * timespec to kevent() (no time limit); zero or a positive value is
 * converted into a 'struct timespec'.
 *
 * Returns the number of entries stored in 'events', or the negated
 * ofi_syserr() value if kevent() fails.
 */
static inline int
ofi_epoll_wait(int ep, struct ofi_epollfds_event *events,
	       int maxevents, int timeout)
{
	struct timespec limit;
	struct timespec *limit_ptr = NULL;
	int nready;

	if (timeout >= 0) {
		/* Whole seconds, then the millisecond remainder in ns */
		limit.tv_sec = timeout / 1000;
		limit.tv_nsec = (timeout % 1000) * 1000000;
		limit_ptr = &limit;
	}

	/* kevent() reports read and write readiness of the same FD as two
	 * separate entries, where epoll_wait() would report one.  For a given
	 * 'maxevents' fewer distinct FDs may therefore be returned; this is
	 * assumed to be fine since upper layers pick up the remaining events
	 * on the next call.
	 */
	nready = kevent(ep, NULL, 0, events, maxevents, limit_ptr);

	return nready < 0 ? -ofi_syserr() : nready;
}

/*
 * Close the kqueue descriptor 'ep'.  The result of close() is not checked.
 */
static inline void ofi_epoll_close(int ep)
{
	(void) close(ep);
}

int ofi_pollfds_wait(struct ofi_pollfds *pfds,
struct ofi_epollfds_event *events,
int maxevents, int timeout);
void ofi_pollfds_close(struct ofi_pollfds *pfds);
/*
 * Return the file descriptor behind the poll handle.  With kqueue the
 * handle is the descriptor itself, so it is handed back unchanged.
 */
static inline int ofi_epoll_fd(int ep)
{
	const int kq_fd = ep;

	return kq_fd;
}

/* OS specific */
struct ofi_pollfds_ctx *ofi_pollfds_get_ctx(struct ofi_pollfds *pfds, int fd);
struct ofi_pollfds_ctx *ofi_pollfds_alloc_ctx(struct ofi_pollfds *pfds, int fd);
#else /* HAVE_KQUEUE */

/* Both `epoll` and `poll` share same data structures.
 *
 * Every expansion and every use of the argument is parenthesized so that
 * the macros compose correctly with '!', '&&', comparisons and non-trivial
 * argument expressions.
 */
#define OFI_EPOLL_EVT_HAS_INPUT(evt)	((evt).events & POLLIN)
#define OFI_EPOLL_EVT_HAS_OUTPUT(evt)	((evt).events & POLLOUT)
#define OFI_EPOLL_EVT_HAS_ERR(evt)	((evt).events & POLLERR)
#define OFI_EPOLL_EVT_DATA(evt)		((evt).data.ptr)
#define OFI_EPOLL_EVT_EVENTS(evt)	((evt).events)

#ifdef HAVE_EPOLL
#include <sys/epoll.h>
#define ofi_epollfds_event epoll_event

#define OFI_EPOLL_IN EPOLLIN
#define OFI_EPOLL_OUT EPOLLOUT
#define OFI_EPOLL_ERR EPOLLERR

typedef int ofi_epoll_t;
#define OFI_EPOLL_INVALID -1
Expand Down Expand Up @@ -187,11 +272,16 @@ static inline int ofi_epoll_fd(int ep)
return ep;
}

#else
#else /* HAVE_EPOLL */
struct ofi_epollfds_event {
uint32_t events;
union {
void *ptr;
} data;
};

#define OFI_EPOLL_IN POLLIN
#define OFI_EPOLL_OUT POLLOUT
#define OFI_EPOLL_ERR POLLERR

typedef struct ofi_pollfds *ofi_epoll_t;
#define OFI_EPOLL_INVALID NULL
Expand All @@ -214,7 +304,68 @@ static inline int ofi_epoll_fd(ofi_epoll_t ep)
#define EPOLL_CTL_DEL POLLFDS_CTL_DEL
#define EPOLL_CTL_MOD POLLFDS_CTL_MOD

#endif /* HAVE_EPOLL */
#endif /* HAVE_EPOLL */
#endif /* HAVE_KQUEUE */

/*
 * Operation codes for a poll set.  They are the type of the 'op' field of
 * struct ofi_pollfds_work_item, and the poll() fallback maps the
 * EPOLL_CTL_ADD/DEL/MOD names onto them.
 */
enum ofi_pollfds_ctl {
POLLFDS_CTL_ADD,
POLLFDS_CTL_DEL,
POLLFDS_CTL_MOD,
};

/*
 * One requested operation on a poll set, linkable into a singly-linked
 * list through 'entry'.
 * NOTE(review): presumably these are queued on
 * ofi_pollfds.work_item_list and applied later - confirm against the
 * implementation.
 */
struct ofi_pollfds_work_item {
int fd;
uint32_t events;
void *context;
/* Which operation (add / delete / modify) this item requests */
enum ofi_pollfds_ctl op;
struct slist_entry entry;
};

/*
 * Per-descriptor bookkeeping entry, as returned by ofi_pollfds_get_ctx()
 * and ofi_pollfds_alloc_ctx().
 * NOTE(review): 'index' presumably refers to the descriptor's slot in
 * ofi_pollfds.fds - confirm against the implementation.
 */
struct ofi_pollfds_ctx {
void *context;
int index;
};

/*
 * State of a poll()-based poll set.
 * NOTE(review): field roles below are inferred from their names and types
 * where not visible in this header - confirm against the implementation.
 */
struct ofi_pollfds {
/* presumably the allocated capacity and the number of entries in use */
int size;
int nfds;
struct pollfd *fds;
struct ofi_pollfds_ctx *ctx;
struct fd_signal signal;
struct slist work_item_list;
struct ofi_genlock lock;

/* Implementation hooks invoked by ofi_pollfds_add() / ofi_pollfds_del() */
int (*add)(struct ofi_pollfds *pfds, int fd, uint32_t events,
void *context);
int (*del)(struct ofi_pollfds *pfds, int fd);
};

int ofi_pollfds_create_(struct ofi_pollfds **pfds, enum ofi_lock_type lock_type);
int ofi_pollfds_create(struct ofi_pollfds **pfds);
int ofi_pollfds_grow(struct ofi_pollfds *pfds, int max_size);

/*
 * Add 'fd' to the poll set by dispatching to the set's 'add' hook.
 * Returns whatever the hook returns.
 */
static inline int
ofi_pollfds_add(struct ofi_pollfds *pfds, int fd, uint32_t events, void *context)
{
	int ret;

	ret = pfds->add(pfds, fd, events, context);
	return ret;
}

int ofi_pollfds_mod(struct ofi_pollfds *pfds, int fd, uint32_t events,
void *context);

/*
 * Remove 'fd' from the poll set by dispatching to the set's 'del' hook.
 * Returns whatever the hook returns.
 */
static inline int ofi_pollfds_del(struct ofi_pollfds *pfds, int fd)
{
	int ret;

	ret = pfds->del(pfds, fd);
	return ret;
}

int ofi_pollfds_wait(struct ofi_pollfds *pfds,
struct ofi_epollfds_event *events,
int maxevents, int timeout);
void ofi_pollfds_close(struct ofi_pollfds *pfds);

/* OS specific */
struct ofi_pollfds_ctx *ofi_pollfds_get_ctx(struct ofi_pollfds *pfds, int fd);
struct ofi_pollfds_ctx *ofi_pollfds_alloc_ctx(struct ofi_pollfds *pfds, int fd);

/* If we HAVE_EPOLL, the values for EPOLLIN and EPOLLOUT are the same as
* POLLIN and POLLOUT, at least in the gnu headers. If we don't have
Expand All @@ -224,6 +375,11 @@ static inline int ofi_epoll_fd(ofi_epoll_t ep)
* This use of this function helps make it clear that we're passing the
* correct event values to epoll, versus poll, without actually incurring
* the unnecessary overhead of converting the values.
*
* For the HAVE_KQUEUE case, 'struct kevent.filter' values are not bit
* flags, so the bitmask events used with 'struct pollfd' cannot be passed
* to 'kevent()' directly; 'ofi_epoll_add()' handles that semantic
* difference by translating them into separate filters.
*/
static inline uint32_t ofi_poll_to_epoll(uint32_t events)
{
Expand Down
3 changes: 2 additions & 1 deletion prov/sockets/src/sock_conn.c
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
/*
* Copyright (c) 2014 Intel Corporation, Inc. All rights reserved.
* Copyright (c) 2017 DataDirect Networks, Inc. All rights reserved.
* Copyright (c) 2025 VDURA, Inc. All rights reserved.
*
* This software is available to you under a choice of one of two
* licenses. You may choose to be licensed under the terms of the GNU
Expand Down Expand Up @@ -364,7 +365,7 @@ static void *sock_conn_listener_thread(void *arg)
}

for (i = 0; i < num_fds; i++) {
conn_handle = events[i].data.ptr;
conn_handle = OFI_EPOLL_EVT_DATA(events[i]);

if (conn_handle == NULL) { /* signal event */
fd_signal_reset(&conn_listener->signal);
Expand Down
3 changes: 2 additions & 1 deletion prov/sockets/src/sock_ep_msg.c
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
/*
* Copyright (c) 2014 Intel Corporation, Inc. All rights reserved.
* Copyright (c) 2017 DataDirect Networks, Inc. All rights reserved.
* Copyright (c) 2025 VDURA, Inc. All rights reserved.
*
* This software is available to you under a choice of one of two
* licenses. You may choose to be licensed under the terms of the GNU
Expand Down Expand Up @@ -1199,7 +1200,7 @@ static void *sock_ep_cm_thread(void *arg)
goto skip;
}
for (i = 0; i < num_fds; i++) {
handle = events[i].data.ptr;
handle = OFI_EPOLL_EVT_DATA(events[i]);

if (handle == NULL) { /* Signal event */
fd_signal_reset(&cm_head->signal);
Expand Down
Loading
0