Screenplay: Sysprog: POSIX IPC

Virtual Memory, mmap()

  • First, read /etc/passwd using regular file IO

  • Discuss copy from/to kernel space

  • Basic mmap() demo: read /etc/passwd by creating a mapping and only using memory access.

#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdio.h>
#include <sys/mman.h>
#include <assert.h>


int main()
{
    int fd = open("/etc/passwd", O_RDONLY);
    if (fd == -1) {
        perror("open");
        return 1;
    }

    struct stat stat;
    int error = fstat(fd, &stat);
    if (error) {
        perror("fstat");
        return 1;
    }

    void* addr = mmap(NULL, stat.st_size, PROT_READ, MAP_PRIVATE, fd, 0);
    if (addr == MAP_FAILED) {
        perror("mmap");
        return 1;
    }

    close(fd);

    ssize_t nwritten = write(STDOUT_FILENO, addr, stat.st_size);
    if (nwritten == -1) {
        perror("write");
        return 1;
    }
    assert(nwritten != 0);

    return 0;
}
  • Give explanation of mappings

  • Discuss copy -> whiteboard sketch

  • Show /proc/<pid>/maps

  • strace to see how address space is prepared

  • File mappings vs. anonymous

POSIX Shared Memory

Show how POSIX Shared Memory is nothing but mmap() with predefined filenames. Here we show three programs,

  • One creates a shared memory segment

  • One writes into that memory; produces

  • One reads from it; consumes

Note

Producer and consumer do not use any locking. If multiple producers are run, then this will lead to lost increments. (This is what semaphores are there for.)

Create Shared Memory Segment

  • Using shm_open() with O_CREAT

  • Ends up as a file in /dev/shm/ (a tmpfs instance)

  • Zero size after create.

  • Use ftruncate() to resize as appropriate

#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <stdio.h>
#include <unistd.h>
#include <sys/types.h>


int main()
{
    int fd = shm_open("mein-shared-dings", O_CREAT|O_RDWR, 0666);
    if (fd == -1) {
        perror("shm_open");
        return 1;
    }

    int error = ftruncate(fd, 4096);
    if (error) {
        perror("ftruncate");
        return 1;
    }

    // alternatively, instead of using ftruncate(), we could drop one
    // byte at position 4095, creating a 4K weighing one byte, and
    // having a "hole" of 4095 bytes. but no.

    // off_t off = lseek(fd, 4095, SEEK_SET);
    // if (off == -1) {
    //     perror("lseek");
    //     return 1;
    // }
    // char c = 0;
    // ssize_t nwritten = write(fd, &c, 1);
    // if (nwritten == -1) {
    //     perror("write");
    //     return 1;
    // }

    return 0;
}

Produce into Shared Memory

  • Using MAP_SHARED (as opposed to MAP_PRIVATE which makes write operations copy-on-write)

  • Opens O_RDWR: incrementing a counter requires reading it first.

#include <sys/mman.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdint.h>
#include <unistd.h>
#include <iostream>

using std::cout;
using std::endl;


int main()
{
    int fd = shm_open("mein-shared-dings", O_RDWR, /*mode unused without O_CREAT*/0);
    if (fd == -1) {
        perror("shm_open");
        return 1;
    }

    void* addr = mmap(NULL, 4096, PROT_WRITE, MAP_SHARED, fd, 0);
    if (addr == MAP_FAILED) {
        perror("mmap");
        return 1;
    }

    uint64_t* counter_location = (uint64_t*)addr;

    uint64_t counter = 0;
    while (true) {
        cout << counter << endl;
        *counter_location = counter++;
        sleep(1);
    }

    return 0;
}

Consume from Shared Memory

#include <sys/mman.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdint.h>
#include <unistd.h>
#include <iostream>

using std::cout;
using std::endl;


int main()
{
    int fd = shm_open("mein-shared-dings", O_RDONLY, /*mode unused without O_CREAT*/0);
    if (fd == -1) {
        perror("shm_open");
        return 1;
    }

    void* addr = mmap(NULL, 4096, PROT_READ, MAP_SHARED, fd, 0);
    if (addr == MAP_FAILED) {
        perror("mmap");
        return 1;
    }

    uint64_t* counter_location = (uint64_t*)addr;

    while (true) {
        cout << *counter_location << endl;
        sleep(1);
    }

    return 0;
}

POSIX Semaphores

Analogous to the shared memory demo above, this is a scenario where two parties use post and wait() on a semaphore.

Create Semaphore

We create a semaphore with initial value 7 (7 wait operations without blocking).

#include <fcntl.h>
#include <sys/stat.h>
#include <semaphore.h>
#include <stdio.h>


int main()
{
    sem_t* sem = sem_open("meine-semaphore", O_CREAT|O_RDWR|O_EXCL, 0666, 7);
    if (sem == SEM_FAILED) {
        perror("sem_open");
        return 1;
    }

    return 0;
}

Wait

#include <fcntl.h>
#include <sys/stat.h>
#include <semaphore.h>
#include <stdio.h>


int main()
{
    sem_t* sem = sem_open("meine-semaphore", O_RDWR);
    if (sem == SEM_FAILED) {
        perror("sem_open");
        return 1;
    }

    int error = sem_wait(sem);
    if (error) {
        perror("sem_wait");
        return 1;
    }

    return 0;
}

Post

#include <fcntl.h>
#include <sys/stat.h>
#include <semaphore.h>
#include <stdio.h>


int main()
{
    sem_t* sem = sem_open("meine-semaphore", O_RDWR);
    if (sem == SEM_FAILED) {
        perror("sem_open");
        return 1;
    }

    int error = sem_post(sem);
    if (error) {
        perror("sem_post");
        return 1;
    }

    return 0;
}

POSIX Message Queues

Another three program scenario

  • Create

  • Produce

  • Consume

Create

  • Gotcha: names have to start with ‘/’

  • Think about messages. Common sense is to communicate flat C structs.

  • Measurement sample: (channel, timestamp, value). Think about sizes and datatypes. Alignment :-)

  • Create the queue

  • Mount the mqueue-fs, and look into it,

    # mkdir mqueue-mount
    # mount -t mqueue blah mqueue-mount
    
#include "msg.h"

#include <fcntl.h>
#include <sys/stat.h>
#include <mqueue.h>
#include <string.h>
#include <stdio.h>


int main()
{
    mq_attr attr;
    memset(&attr, 0, sizeof(attr));
    attr.mq_maxmsg = 10;
    attr.mq_msgsize = sizeof(msg);
    mqd_t q = mq_open("/meine-queue", O_CREAT|O_EXCL, 0666, &attr);
    if (q == -1) {
        perror("mq_open");
        return 1;
    }

    return 0;
}

Produce

Produce a number of items, and see what mqueue-fs says.

#include "msg.h"

#include <fcntl.h>
#include <sys/stat.h>
#include <mqueue.h>
#include <string.h>
#include <stdio.h>
#include <time.h>


int main()
{
    mqd_t q = mq_open("/meine-queue", O_WRONLY);
    if (q == -1) {
        perror("mq_open");
        return 1;
    }

    msg msg;
    msg.timestamp = time(NULL); // seconds since epoch, roughly
    msg.value = 42.0;
    msg.channel = 7;

    int error = mq_send(q, (const char*)&msg, sizeof(msg), /*prio*/0);
    if (error) {
        perror("mq_send");
        return 1;
    }

    return 0;
}

Consume

Show how a read blocks when queue is empty.

#include "msg.h"

#include <fcntl.h>
#include <sys/stat.h>
#include <mqueue.h>
#include <string.h>
#include <stdio.h>
#include <time.h>
#include <iostream>

using std::cout;
using std::endl;


int main()
{
    mqd_t q = mq_open("/meine-queue", O_RDONLY);
    if (q == -1) {
        perror("mq_open");
        return 1;
    }

    msg msg;
    unsigned int prio;
    ssize_t nread = mq_receive(q, (char*)&msg, sizeof(msg), &prio);
    if (nread == -1) {
        perror("mq_receive");
        return 1;
    }

    cout << "prio: " << prio << endl;
    cout << "channel: " << msg.channel << endl;
    cout << "timestamp: " << msg.timestamp << endl;
    cout << "value: " << msg.value << endl;

    return 0;
}