I/O Multiplexing#
The Paradigm#
Watch multiple file descriptors
Input
Output
Exceptional conditions
Block until at least one of these conditions is true - e.g. data can be read from a socket without blocking
Perform the desired action (read from socket)
Enter poll()
#
#include <poll.h>
struct pollfd {
int fd; /* file descriptor */
short events; /* requested events */
short revents; /* returned events */
};
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
timeout
in milliseconds (0 for “don’t block”, -1 for infinite)Original I/O multiplexing system call:
select()
(man -s 2 select)poll()
was added to POSIX later⟶ More resource (and developer) friendly
Possible Events#
From man -s 2 poll (reformatted and condensed):
|
There is data to read. |
|
Writing is now possible, though a write larger than the
available space in a socket or pipe will still block (unless
|
|
Some exceptional condition on the file descriptor, for example out-of-band data on a TCP socket (man -s 7 tcp). |
|
Stream socket peer closed connection, or shut down writing half of connection. |
|
Error condition ( |
|
Hang up ( |
|
Invalid request: fd not open ( |
Solution: Read From Two Input Sources#
Continuing from where we left …
Two input sources
Standard input
UDP socket
Use
poll()
to wait for both (notimeout
⟶ -1)Multiplex on return
#include "database.h"
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <poll.h>
#include <assert.h>
#include <regex>
int main()
{
Database db;
int sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
if (sock == -1) {
perror("socket");
return 1;
}
struct sockaddr_in addr = {
.sin_family = AF_INET,
.sin_port = htons(1234),
.sin_addr = INADDR_ANY,
};
int error = bind(sock, (struct sockaddr *)&addr, sizeof(addr));
if (error == -1) {
perror("bind");
return 1;
}
bool quit = false;
while (!quit) {
struct pollfd watches[] = {
{
.fd = STDIN_FILENO, // <-- watch STDIN_FILENO
.events = POLLIN, // for input
},
{
.fd = sock, // <-- watch UDP socket
.events = POLLIN, // for input
},
};
int nready = poll(watches, 2, -1); // <-- wait for either to become ready
if (nready == -1) { // (-1 ... no timeout)
perror("poll");
return 1;
}
assert(nready != 0); // <-- no timeout requested
std::vector<std::string> lines; // <-- possibly reading multiple lines in one swoop
if (watches[0].revents & POLLIN) { // <-- only if STDIN_FILENO has something
char line[64];
ssize_t nread = read(STDIN_FILENO, line, sizeof(line)-1);
if (nread == -1) {
perror("read(stdin)");
return 1;
}
else if (nread == 0) // <-- EOF on stdin quits us
quit = true;
else
lines.push_back(std::string(line, nread));
}
if (watches[1].revents & POLLIN) { // <-- only if USP socket has something
char line[64];
ssize_t nread = read(sock, line, sizeof(line)-1);
if (nread == -1) {
perror("read(udp)");
return 1;
}
assert(nread > 0); // <-- no EOF over UDP
lines.push_back(std::string(line, nread));
}
for (const auto& line: lines)
if (Record r = split_line(line))
db.insert(r);
else
std::println(stderr, "invalid line: \"{}\"", line);
}
db.commit();
return 0;
};
Afterword#
Of course this works for more than two sources
Many possible kinds of events
⟶ Code complexity ahead
⟶ best to encapsulate (see here for a naive C++ eventloop implementation)