「OS」HDU-OS-Lab3-Linux 进程管理(二)管道通信

实验三的知识点是进程通信,进程通信的方式多种多样,既包括锁机制、信号量机制在内的低级通信方式,低级在于其交换的信息量少且效率较低,又包括共享服务器、消息传递系统、管道通信以及客户 - 服务器系统通信在内的高级通信方式,本实验是实验三的第二部分,介绍了管道通信方式的基本原理以及具体实现。

源码地址:

https://github.com/leslievan/Operator_System/tree/master/Operator_System_Lab3/Operator_System_Exp3_2

基本介绍

管道是一个文件,用于连接两个进程以实现进程通信。管道是半双工的,即同一时间同一进程只能读取或者写入。管道又分为有名管道和无名管道两种,无名管道存在于高速缓存 cache 中,用于有亲缘关系的父子进程或兄弟进程之间的通信,有名管道存在于磁盘中,是看得见摸得着的真实文件,只要知道路径名就可以调用,所以它可以用于任意进程之间的通信。前面提到管道是一个文件,所以不论是有名管道还是无名管道,它们写入或读取的方式都是一样的 —— 使用 write 进行写入,使用 read 进行读取。

实验内容

实现一个管道通信程序

由父进程创建一个管道,然后再创建三个子进程,并由这三个子进程利用管道与父进程之间进行通信:子进程发送信息,父进程等三个子进程全部发完消息后再接收信息。通信的具体内容可根据自己的需要随意设计,要求能试验阻塞型读写过程中的各种情况,测试管道的默认大小,并且要求利用 Posix 信号量机制实现进程间对管道的互斥访问。运行程序,观察各种情况下,进程实际读写的字节数以及进程阻塞唤醒的情况。

根据实验要求可知,这里直接选用无名管道即可,实验要求有:

  • 试验阻塞型读写过程中的各种情况
  • 测试管道的默认大小
  • 利用 Posix 信号量机制实现进程间对管道的互斥访问

先放上能够实现这些功能的代码。

  1/*
  2 * @file        main.c
  3 * @author      Arcana
  4 * @date        2018.11.12
  5 * @brief       Children process communicate with parent by pipe.
  6 */
  7
  8#include "errno.h"
  9#include "fcntl.h"
 10#include "semaphore.h"
 11#include "stdio.h"
 12#include "stdlib.h"
 13#include "string.h"
 14#include "sys/ipc.h"
 15#include "sys/sem.h"
 16#include "sys/types.h"
 17#include "sys/wait.h"
 18#include "unistd.h"
 19
 20#define BUF_MAX_SIZE 8192
 21// 如果x为假,则报错,打印出错代码所在函数及行数
 22#define CHECK(x)                                            \
 23    do {                                                    \
 24        if (!(x)) {                                         \
 25            fprintf(stderr, "%s:%d: ", __func__, __LINE__); \
 26            perror(#x);                                     \
 27            exit(-1);                                       \
 28        }                                                   \
 29    } while (0)
 30
 31/**
 32 * Create three children processes to test pipe communication.
 33 * @param argc Argument count.
 34 * @param argv Argument vector.
 35 * @return status code.
 36 */
 37int main(int argc, char **argv) {
 38    int pipefd[2], pid, i = 0;
 39    int flag = 0;
 40    ssize_t n;
 41    char buf[BUF_MAX_SIZE];
 42    char str[BUF_MAX_SIZE];
 43
 44    // 创建有名信号量,若不存在则创建,若存在则直接打开,默认值为0
 45    sem_t *write_mutex;
 46    sem_t *read_mutex1;
 47    sem_t *read_mutex2;
 48    write_mutex = sem_open("pipe_test_wm", O_CREAT | O_RDWR, 0666, 0);
 49    read_mutex1 = sem_open("pipe_test_rm_1", O_CREAT | O_RDWR, 0666, 0);
 50    read_mutex2 = sem_open("pipe_test_rm_2", O_CREAT | O_RDWR, 0666, 0);
 51
 52    memset(buf, 0, BUF_MAX_SIZE);
 53    memset(str, 0, BUF_MAX_SIZE);
 54
 55    // 创建管道并检查操作是否成功
 56    CHECK(pipe(pipefd) >= 0);
 57
 58    // 创建第一个子进程并检查操作是否成功
 59    CHECK((pid = fork()) >= 0);
 60
 61    // 第一个子进程,利用非阻塞写测试管道大小
 62    if (pid == 0) {
 63        int count = 0;
 64        close(pipefd[0]);
 65        int flags = fcntl(pipefd[1], F_GETFL);
 66
 67        // 管道默认是阻塞写,通过`fcntl`设置成非阻塞写,在管道满无法继续写入时返回-EAGAIN,作为循环终止条件
 68        fcntl(pipefd[1], F_SETFL, flags | O_NONBLOCK);
 69    
 70        // 写入管道
 71        while (!flag) {
 72            n = write(pipefd[1], buf, BUF_MAX_SIZE);
 73            if (n == -1) {
 74                flag = 1;
 75            } else {
 76                count++;
 77                printf("children 1 write %dB\n", n);
 78            }
 79        }
 80        printf("space = %dKB\n", (count * BUF_MAX_SIZE) / 1024);
 81        exit(0);
 82    }
 83
 84    // 创建第二个子进程并检查操作是否成功
 85    CHECK((pid = fork()) >= 0);
 86    if (pid == 0) {
 87        sem_wait(write_mutex);
 88        close(pipefd[0]);
 89        n = write(pipefd[1], "This is the second children.\n", 29);
 90        printf("children 2 write %dB\n", n);
 91        sem_post(write_mutex);
 92        sem_post(read_mutex1);
 93        exit(0);
 94    }
 95
 96    // 创建第三个子进程并检查操作是否成功
 97    CHECK((pid = fork()) >= 0);
 98    if (pid == 0) {
 99        sem_wait(write_mutex);
100        close(pipefd[0]);
101        n = write(pipefd[1], "This is the third children.\n", 28);
102        printf("children 3 write %dB\n", n);
103        sem_post(write_mutex);
104        sem_post(read_mutex2);
105        exit(0);
106    }
107
108    // 等待第一个子进程运行完成,父进程继续运行
109    wait(0);
110    close(pipefd[1]);
111    int flags = fcntl(pipefd[0], F_GETFL);
112
113    // 设置非阻塞性读,作为循环结束标志
114    fcntl(pipefd[0], F_SETFL, flags | O_NONBLOCK);
115    while (!flag) {
116        n = read(pipefd[0], str, BUF_MAX_SIZE);
117        if (n == -1) {
118            flag = 1;
119        } else {
120            printf("%dB read\n", n);
121        }
122    }
123    sem_post(write_mutex);
124
125    // 等待子进程二、三写入完毕
126    sem_wait(read_mutex1);
127    sem_wait(read_mutex2);
128    n = read(pipefd[0], str, BUF_MAX_SIZE);
129    printf("%dB read\n", n);
130    for (i = 0; i < n; i++) {
131        printf("%c", str[i]);
132    }
133
134    sem_close(write_mutex);
135    sem_close(read_mutex1);
136    sem_close(read_mutex2);
137    sem_unlink("pipe_test_wm");
138    sem_unlink("pipe_test_rm_1");
139    sem_unlink("pipe_test_rm_2");
140    return 0;
141}

这里使用了三个信号量,分别是 write_mutexread_mutex1read_mutex2,简单分析一下子进程和父进程之间的关系可以明白:

  • 子进程一先将 64K 的数据写入管道,父进程才能第一时间将数据全部读取出来(来自一进程的数据)
  • 父进程将子进程一的数据读取之后,子进程二、三才能写入数据
  • 子进程二、三将数据写入后,父进程随后才能读取第二批数据(来自二、三进程的数据)

关系大致如下图所示:

img

子进程写入数据1父进程读取数据1 利用 wait(0) 限制了先后关系,父进程必须接收到子进程结束之后返回的 0,才能继续运行,否则阻塞。

write_mutex 限制了父进程先读取数据,然后子进程二、三写入数据,read_mutex1read_mutex2 分别限制了子进程二、三写入数据 2,3 和父进程读取数据 2,3 先后关系,只有子进程二、三均完成后,父进程才允许读取管道。

子进程一使用了非阻塞性写,子进程二、三均为阻塞性写,父进程为非阻塞性读。

非阻塞写和非阻塞读的目的在于,阻塞写时,管道满了之后进程被阻塞,无法设置终止条件从而结束写,读也是一样,管道空了之后进程被阻塞,无法设置终止条件从而结束读。

进一步解释程序,除去复制了 fork 之前的程序运行空间,子进程一运行了第一个 if 中的代码块:

 1if (pid == 0) {
 2    int count = 0;
 3    close(pipefd[0]);
 4    int flags = fcntl(pipefd[1], F_GETFL);
 5    fcntl(pipefd[1], F_SETFL, flags | O_NONBLOCK);
 6    while (!flag) {
 7        n = write(pipefd[1], buf, BUF_MAX_SIZE);
 8        if (n == -1) {
 9            flag = 1;
10        } else {
11            count++;
12            printf("children 1 write %dB\n", n);
13        }
14    }
15    printf("space = %dKB\n", (count * BUF_MAX_SIZE) / 1024);
16    exit(0);
17    }

每次写入 8K 数据,直到管道满无法继续写入,write 函数返回 -1,循环终止,计数并打印出总数据大小 —— 即管道容量。


第二个子进程则运行了第二个 if 后的代码块:

1if (pid == 0) {
2    sem_wait(write_mutex);
3    close(pipefd[0]);
4    n = write(pipefd[1], "This is the second children.\n", 29);
5    printf("children 2 write %dB\n", n);
6    sem_post(write_mutex);
7    sem_post(read_mutex1);
8    exit(0);
9}

写入 This is the second children.\n


第三个子进程则运行了第三个 if 后的代码块:

1if (pid == 0) {
2    sem_wait(write_mutex);
3    close(pipefd[0]);
4    n = write(pipefd[1], "This is the third children.\n", 28);
5    printf("children 3 write %dB\n", n);
6    sem_post(write_mutex);
7    sem_post(read_mutex2);
8    exit(0);
9}

写入 This is the third children.\n


父进程运行了:

 1{
 2    wait(0);
 3    close(pipefd[1]);
 4    int flags = fcntl(pipefd[0], F_GETFL);
 5    fcntl(pipefd[0], F_SETFL, flags | O_NONBLOCK);
 6    while (!flag) {
 7        n = read(pipefd[0], str, BUF_MAX_SIZE);
 8        if (n == -1) {
 9            flag = 1;
10        } else {
11            printf("%dB read\n", n);
12        }
13    }
14    sem_post(write_mutex);
15
16    sem_wait(read_mutex1);
17    sem_wait(read_mutex2);
18    n = read(pipefd[0], str, BUF_MAX_SIZE);
19    printf("%dB read\n", n);
20    for (i = 0; i < n; i++) {
21        printf("%c", str[i]);
22    }
23}

line 2line 14 用于读取子进程一写入的数据,否则子进程二、三无法继续写入,读空管道后结束循环,释放信号量,子进程二、三继续运行。

line 16line 22 用于读取子进程二、三写入的数据,并打印到终端上,BUF_MAX_SIZE 是想要读取的数据大小,read 返回了实际读取的数据大小。

实验结果

实验结果如下:

 1$ gcc Desktop/Untitled-1.c -o main -pthread
 2$ ./main
 3children 1 write 8192B
 4children 1 write 8192B
 5children 1 write 8192B
 6children 1 write 8192B
 7children 1 write 8192B
 8children 1 write 8192B
 9children 1 write 8192B
10children 1 write 8192B
11space = 64KB
128192B read
138192B read
148192B read
158192B read
168192B read
178192B read
188192B read
198192B read
20children 2 write 29B
21children 3 write 28B
2257B read
23This is the second children.
24This is the third children.

相关阅读