I/O Multiplexing

The Paradigm

  • Watch multiple file descriptors

    • Input

    • Output

    • Exceptional conditions

  • Block until at least one of these conditions is true, e.g. until data can be read from a socket without blocking

  • Perform the desired action (e.g. read from the socket)

[Figure: The Paradigm]

Enter poll()

#include <poll.h>
struct pollfd {
    int   fd;         /* file descriptor */
    short events;     /* requested events */
    short revents;    /* returned events */
};
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
  • timeout in milliseconds (0 for “don’t block”, -1 for infinite); returns the number of entries with nonzero revents, 0 on timeout, -1 on error

  • Original I/O multiplexing system call: select() (man -s 2 select)

  • poll() was added to POSIX later

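  • ⟶ More resource (and developer) friendly: no FD_SETSIZE cap on descriptor numbers, and results go to revents only, so the requested events survive the call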
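To make the select()/poll() comparison concrete, here is a sketch of the same job done both ways: waiting until one descriptor is readable, with a timeout in milliseconds. The helper names wait_readable_select and wait_readable_poll are hypothetical. select() needs its descriptor set rebuilt before every call, cannot handle descriptors at or above FD_SETSIZE, and takes the highest descriptor plus one as its first argument; poll() takes a plain array.

```cpp
#include <poll.h>
#include <sys/select.h>
#include <unistd.h>     // pipe(), write() for trying the helpers out
#include <cassert>

// Hypothetical helpers: return 1 if fd is readable (read() won't block),
// 0 on timeout, -1 on error. timeout_ms: 0 = don't block, -1 = infinite.

int wait_readable_select(int fd, int timeout_ms)
{
    if (fd < 0 || fd >= FD_SETSIZE)              // fd_set is a fixed-size bitmask
        return -1;
    fd_set readfds;
    FD_ZERO(&readfds);                           // select() overwrites the set,
    FD_SET(fd, &readfds);                        // so it is rebuilt on every call
    struct timeval tv = { .tv_sec = timeout_ms / 1000,
                          .tv_usec = (timeout_ms % 1000) * 1000 };
    struct timeval *ptv = timeout_ms < 0 ? nullptr : &tv;      // nullptr = wait forever
    int n = select(fd + 1, &readfds, nullptr, nullptr, ptv);   // nfds = highest fd + 1
    if (n == -1)
        return -1;
    return FD_ISSET(fd, &readfds) ? 1 : 0;
}

int wait_readable_poll(int fd, int timeout_ms)
{
    struct pollfd pfd = { .fd = fd, .events = POLLIN, .revents = 0 };
    int n = poll(&pfd, 1, timeout_ms);           // only revents is written back
    if (n == -1)
        return -1;
    // POLLHUP without POLLIN means EOF, which read() also reports without blocking
    return (pfd.revents & (POLLIN | POLLHUP)) ? 1 : 0;
}
```

With an empty pipe and a timeout of 0, both helpers return 0 immediately; after one byte has been written to the pipe, both return 1.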

Possible Events

From man -s 2 poll (reformatted and condensed):

POLLIN

There is data to read.

POLLOUT

Writing is now possible, though a write larger than the available space in a socket or pipe will still block (unless O_NONBLOCK is set).

POLLPRI

Some exceptional condition on the file descriptor, for example out-of-band data on a TCP socket (man -s 7 tcp).

POLLRDHUP

Stream socket peer closed connection, or shut down writing half of connection (Linux-specific; define _GNU_SOURCE to obtain it).

POLLERR

Error condition (revents only). For example, the write end of a pipe whose read end has been closed.

POLLHUP

Hang up (revents only). Note that when reading from a channel such as a pipe or a stream socket, this event merely indicates that the peer closed its end of the channel. Subsequent reads from the channel will return 0 (end of file) only after all outstanding data in the channel has been consumed.

POLLNVAL

Invalid request: fd not open (revents only).
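Several of the revents-only conditions above are easy to provoke with pipes. The following sketch does exactly that; poll_revents, Observed and provoke_conditions are hypothetical names, and the POLLERR case reflects Linux behavior as described in the man page.

```cpp
#include <poll.h>
#include <unistd.h>
#include <cassert>

// Hypothetical helper: poll one fd without blocking; returns revents, or -1 on error.
int poll_revents(int fd, short events)
{
    struct pollfd pfd = { .fd = fd, .events = events, .revents = 0 };
    if (poll(&pfd, 1, 0) == -1)                 // timeout 0: only report the current state
        return -1;
    return pfd.revents;
}

struct Observed {
    int hup;    // read end of a pipe after the write end was closed
    int err;    // write end of a pipe after the read end was closed
    int nval;   // a descriptor that is not open
};

// Hypothetical demo: provokes three revents-only conditions (-1 = setup failed).
Observed provoke_conditions()
{
    Observed o = { -1, -1, -1 };
    int p[2];
    if (pipe(p) == -1)
        return o;
    close(p[1]);                                // no writer left ...
    o.hup = poll_revents(p[0], POLLIN);         // ... so POLLHUP, though only POLLIN was asked for
    close(p[0]);
    o.nval = poll_revents(p[0], POLLIN);        // p[0] is no longer open: POLLNVAL

    int q[2];
    if (pipe(q) == -1)
        return o;
    close(q[0]);                                // no reader left ...
    o.err = poll_revents(q[1], POLLOUT);        // ... so POLLERR (on Linux alongside POLLOUT)
    close(q[1]);
    return o;
}
```

On Linux, provoke_conditions() should report POLLHUP in hup, POLLERR in err and exactly POLLNVAL in nval. None of these three had to be requested in events: poll() always reports them.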

Solution: Read From Two Input Sources

Continuing from where we left off:

  • Two input sources

    • Standard input

    • UDP socket

  • Use poll() to wait for both (no timeout ⟶ -1)

  • On return, check each entry’s revents and handle only the sources that are ready

#include "database.h"

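#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <poll.h>
#include <assert.h>
#include <stdio.h>          // perror(), stderr
#include <print>            // std::println() (C++23)
#include <string>
#include <vector>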

int main()
{
    Database db;

    int sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
    if (sock == -1) {
        perror("socket");
        return 1;
    }
    struct sockaddr_in addr = {
        .sin_family = AF_INET,
        .sin_port = htons(1234),
        .sin_addr = { .s_addr = htonl(INADDR_ANY) },   // any local address
    };
    int error = bind(sock, (struct sockaddr *)&addr, sizeof(addr));
    if (error == -1) {
        perror("bind");
        return 1;
    }

    bool quit = false;
    while (!quit) {
        struct pollfd watches[] = {
            {
                .fd = STDIN_FILENO,                    // <-- watch STDIN_FILENO
                .events = POLLIN,                      //     for input
            },
            {
                .fd = sock,                            // <-- watch UDP socket
                .events = POLLIN,                      //     for input
            },
        };

        int nready = poll(watches, 2, -1);             // <-- wait for either to become ready
        if (nready == -1) {                            //     (-1 ... no timeout)
            perror("poll");
            return 1;
        }
        assert(nready != 0);                           // <-- no timeout requested
        
        std::vector<std::string> lines;                // <-- possibly reading multiple lines in one swoop

        if (watches[0].revents & (POLLIN | POLLHUP)) { // <-- STDIN_FILENO has something, or was closed
                                                       //     (a closed, empty pipe reports only POLLHUP)
            char line[64];
            ssize_t nread = read(STDIN_FILENO, line, sizeof(line)-1);
            if (nread == -1) {
                perror("read(stdin)");
                return 1;
            }
            else if (nread == 0)                       // <-- EOF on stdin quits us
                quit = true;
            else 
                lines.push_back(std::string(line, nread));
        }
        if (watches[1].revents & POLLIN) {             // <-- only if UDP socket has something
            char line[64];
            ssize_t nread = read(sock, line, sizeof(line)-1);
            if (nread == -1) {
                perror("read(udp)");
                return 1;
            }
            if (nread > 0)                             // <-- no EOF over UDP; skip empty datagrams
                lines.push_back(std::string(line, nread));
        }

        for (const auto& line: lines)
            if (Record r = split_line(line))
                db.insert(r);
            else
                std::println(stderr, "invalid line: \"{}\"", line);
    }

    db.commit();
    return 0;
}
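To exercise the UDP side of the solution, something has to send datagrams to port 1234. The sketch below is one way to do that; send_line and recv_line are hypothetical names, and sending to the loopback address 127.0.0.1 is an assumption. As a counterpart, recv_line waits for a datagram with a finite poll() timeout instead of the -1 used above.

```cpp
#include <arpa/inet.h>
#include <netinet/in.h>
#include <poll.h>
#include <sys/socket.h>
#include <unistd.h>
#include <cassert>
#include <cstdint>
#include <optional>
#include <string>

// Hypothetical helper: send `text` as one UDP datagram to 127.0.0.1:port.
bool send_line(const std::string& text, uint16_t port)
{
    int sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
    if (sock == -1)
        return false;
    struct sockaddr_in dest = {};
    dest.sin_family = AF_INET;
    dest.sin_port = htons(port);                        // network byte order
    dest.sin_addr.s_addr = htonl(INADDR_LOOPBACK);      // 127.0.0.1
    ssize_t sent = sendto(sock, text.data(), text.size(), 0,
                          (struct sockaddr *)&dest, sizeof(dest));
    close(sock);
    return sent == (ssize_t)text.size();
}

// Hypothetical helper: wait at most timeout_ms for one datagram on sock.
std::optional<std::string> recv_line(int sock, int timeout_ms)
{
    struct pollfd pfd = { .fd = sock, .events = POLLIN, .revents = 0 };
    if (poll(&pfd, 1, timeout_ms) != 1)                 // 0 = timed out, -1 = error
        return std::nullopt;
    char buf[64];                                       // same size as in the solution
    ssize_t n = recv(sock, buf, sizeof(buf), 0);        // longer datagrams get truncated
    if (n == -1)
        return std::nullopt;
    return std::string(buf, n);                         // may be empty (0-byte datagram)
}
```

While the solution runs, send_line(text, 1234) should arrive as one line; whether it is accepted depends on split_line() from database.h, which is not shown here.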

Afterword

  • Of course this works for more than two sources

  • Many possible kinds of events

  • ⟶ Code complexity ahead

  • ⟶ best to encapsulate (see here for a naive C++ eventloop implementation)
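A minimal sketch of such an encapsulation, assuming one callback per descriptor, might look as follows. It is not the implementation referred to above; EventLoop, watch, unwatch, run_once and run are hypothetical names. The pollfd array is rebuilt from a map on every iteration, so callbacks may add or remove watches while the loop runs.

```cpp
#include <poll.h>
#include <unistd.h>     // read() etc. in typical callbacks
#include <cassert>
#include <functional>
#include <map>
#include <utility>
#include <vector>

// Hypothetical, naive event loop: at most one callback per file descriptor.
class EventLoop
{
public:
    using Callback = std::function<void(int fd, short revents)>;

    void watch(int fd, short events, Callback cb)
    {
        assert(fd >= 0);                            // poll() silently ignores negative fds
        _watches[fd] = Watch{events, std::move(cb)};
    }

    void unwatch(int fd) { _watches.erase(fd); }

    // One iteration: poll all watched fds (timeout as for poll()) and
    // dispatch to their callbacks. Returns poll()'s result.
    int run_once(int timeout_ms)
    {
        std::vector<struct pollfd> fds;
        for (const auto& [fd, w] : _watches) {
            struct pollfd p = { .fd = fd, .events = w.events, .revents = 0 };
            fds.push_back(p);
        }
        int nready = poll(fds.data(), fds.size(), timeout_ms);
        if (nready <= 0)
            return nready;                          // 0 = timeout, -1 = error
        for (const auto& p : fds) {
            if (p.revents == 0)
                continue;
            auto it = _watches.find(p.fd);          // an earlier callback may have unwatched it
            if (it == _watches.end())
                continue;
            Callback cb = it->second.callback;      // copy: the callback may unwatch itself
            cb(p.fd, p.revents);
        }
        return nready;
    }

    // Loops until nothing is watched anymore (or poll() fails).
    void run()
    {
        while (!_watches.empty()) {
            if (run_once(-1) == -1)
                break;
        }
    }

private:
    struct Watch
    {
        short events;
        Callback callback;
    };
    std::map<int, Watch> _watches;
};
```

A typical input callback reads from its descriptor and calls unwatch() when read() returns 0; once the last descriptor is gone, run() returns. Copying the callback before invoking it keeps the std::function alive in exactly that case.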