「OS」HDU-OS-Lab3-Linux进程管理(三)消息队列
实验三的知识点是进程通信,进程通信的方式多种多样,既包括锁机制、信号量机制在内的低级通信方式,低级在于其交换的信息量少且效率较低,又包括共享服务器、消息传递系统、管道通信以及客户-服务器系统通信在内的高级通信方式,本实验是实验三的第三个部分,介绍了利用消息队列通信机制实现两个线程间通信的方法。
源码地址:
https://github.com/leslievan/Operator_System/tree/master/Operator_System_Lab3/Operator_System_Exp3_3
基本介绍
消息缓冲队列通信机制是消息传递系统通信中直接通信方式的一种具体实现,基本思想为:通常由系统一管理一组用于通信的空闲消息缓冲区;某进程要发送消息时,首先在自己的地址空间中设置一个发送区,并把欲发送的消息填入其中形成消息,再申请一个消息缓冲区,把数据从发送区复制到消息缓冲区中,然后再把该消息缓冲区直接发送到接收进程的消息队列里;接收进程从自己的消息队列上取下消息缓冲区,并将其中的数据复制到自己的消息接收区中,最后释放消息缓冲区。
实验内容
利用Linux的消息队列通信机制实现两个线程间的通信
编写程序创建三个线程:
sender1
线程、sender2
线程和receive
线程,三个线程的功能描述如下:
sender1
线程:允许函数sender1()
,它创建一个消息队列,然后等待用户通过终端输入一串字符,并将这串字符通过消息队列发送给receiver
线程;可循环发送多个消息,直到用户输入“exit”为止,表示它不再发送消息,最后向receiver
线程发送消息“end1”,并且等待 receiver的应答,等到应答消息后,接收到的应答信息显示在终端屏幕上,结束线程的运行。sender2
线程:运行函数sender2()
,共享sender1
创建的消息队列,等待用户通过终端输入一串字符,并将这串字符通过消息队列发送给receiver
线程;可循环发送多个消息,直到用户输入“exit”为止,表示它不再发送消息,最后向receiver
线程发送消息“end2“,并且等待receiver
的应答,等到应答消息后将接收到的应答信息显示在终端屏幕上,结束线程的运行。Receiver
线程:运行函数receive()
,它通过消息队列接收来自sender1
和sender2
两个线程的消息,将消息显示在终端屏幕上,当收到内容为“end1”的消息时,就向sender1
发送一个应答消息“over1”;当收到内容为“end2”的消息时,就向sender2
发送一个应答消息“over2”;消息接收完成后删除消息队列,结束线程的运行。选择合适的信号量机制实现三个线程之间的同步与互斥。
该实验主要难点在于消息队列的相关操作函数和对于三个线程之间的同步互斥关系。
消息队列
消息队列需要自定义一个消息缓冲区,这里设计一个只包含两个成员变量的结构体作为消息缓冲区:
1struct msg_st {
2 long int message_type;
3 char buffer[MAX_SIZE + 1];
4};
其中message_type
为消息种类,buffer
用来储存消息的数据段,最大可存储MAX_SIZE
大小,+1
操作为了给结尾留出\0
。
消息队列的相关操作有:
int msgget(key_t key, int msgflg);
获得消息队列的特征标识符。 函数需要两个参数,key
和msgflg
。key
是该消息队列的全局唯一标识符,通过该标识符可以定位消息队列,对其进行相关操作。msgflg
为权限控制,决定对消息队列进行的操作,一般使用0666 | IPC_CREAT
,熟悉文件系统的可以知道,0666
表示文件所有者、同组用户和其他用户对于该消息队列的权限均为可读可写,后面对常量IPC_CREAT
进行的位运算作用是“若该队列未被创建则创建它”。(对于该函数可以简单理解为创建消息队列)ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);
消息队列的接收操作。 函数需要五个参数,msqid
,msgp
,msgsz
,msgtyp
和msgflg
。msqid
是函数msgget
的返回值,用于表示对哪一个消息队列进行操作。msgp
是接收消息的指针,指向消息结构体msg_st
。msgsz
是接收消息的大小,这里可以看作结构体msg_st
中数据段的大小。msgtyp
是接收消息的类别,函数可以接收指定类别的消息,默认为0,忽视类别,接收队首消息,正值和负值有不同含义,详情查看 附录 。msgflg
同函数msgget
中的msgflg
,这里可以直接使用0
。 函数的返回值为实际接收到的消息字节数。int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
消息队列的发送操作。 函数需要四个参数,msqid
,msgp
,msgsz
和msgflg
。 参数含义可参考函数msgrcv
。int msgctl(int msqid, int cmd, struct msqid_ds *buf);
消息队列的控制操作。 函数需要三个参数,msqid
,cmd
和buf
。msqid
同上。cmd
对消息队列的操作进行选择,需要用到的是IPC_RMID
,用于移除msqid
指向的消息队列,详情查看 附录 。buf
为消息队列进行操作的参数,这里不需用到,详情请查看 附录 。
这部分书上应该更详细,可以看书。
1// demo.c
2
3int mq;
4struct msg_st buf;
5ssize_t bytes_read, bytes_write;
6int MAX_SIZE = 128;
7
8/* Create a queue / Get a queue
9 * int msgget(key_t key, int msgflg)
10 */
11mq = msgget((key_t) QUEUE_ID, 0666 | IPC_CREAT);
12
13/* Get message from queue, place the message to buf
14 * ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg)
15 */
16bytes_read = msgrcv(mq, (void *) &buf, MAX_SIZE, 0, 0);
17/* Get specific type message, such like 1 */
18bytes_read = msgrcv(mq, (void *) &buf, MAX_SIZE, 1, 0);
19
20/* Send message to queue with message in buf
21 * int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg)
22 */
23bytes_read = msgrcv(mq, (void *) &buf, MAX_SIZE, 0, 0);
24/* Send specific type message, suck like 1 */
25bytes_read = msgrcv(mq, (void *) &buf, MAX_SIZE, 1, 0);
26
27/* Remove(Destroy) the queue
28 * int msgctl(int msqid, int cmd, struct msqid_ds *buf)
29 */
30msgctl(mq, IPC_RMID, &t);
同步互斥关系
sender
和receiver
之间的进程同步比较简单,临界资源为消息队列。有:
receiver
接收消息,sender
发送消息,receiver
和sender
存在同步关系,使用full=0
和empty=1
进行约束;sender
之间存在互斥关系,两个发送线程不能同时工作 ,使用w_mutex=1
进行约束;receiver
等待发送进程结束后,返回应答,sender
收到应答后进行输出,receiver
和sender
存在同步关系,使用over=0
进行约束;- 这里对于终端输出也进行了约束,使用
rcv_dp
和snd_dp
进行约束,可以忽视这部分的处理,这一部分只是为了美观。
代码实现
发送线程
1sender1void *sender1() {
2 int mq;
3 struct msg_st buf;
4 ssize_t bytes_read;
5
6 /* open the mail queue */
7 mq = msgget((key_t) QUEUE_ID, 0666 | IPC_CREAT);
8 CHECK((key_t) -1 != mq);
9
10 do {
11 P(w_mutex); /* 进入互斥区 */
12 P(snd_dp); /* 这部分只是为了美观 */
13 printf("sender1> ");
14 V(rcv_dp);
15 fflush(stdout);
16
17 fgets(buf.buffer, BUFSIZ, stdin); /* 接收终端输入,置入buf.buffer */
18 buf.message_type = snd_to_rcv1; /* 设置消息类别,接收线程据此判断消息来源 */
19
20 /* send the message */
21 P(empty); /* 使用empty和full进行同步 */
22 CHECK(0 <= msgsnd(mq, (void*)&buf, MAX_SIZE, 0)); /* Check用于检测参数是否成立,不成立则报错,用于Debug,可忽视其实现,作为宏定义,定义在头文件中 */
23 V(full);
24 V(w_mutex);
25 /* 换行为了使线程随机切换,勿随意删除 */
26 printf("\n");
27 } while (strncmp(buf.buffer, MSG_STOP, strlen(MSG_STOP))); /* Terminate when receive 'exit' */
28
29 /* wait for response */
30 P(over);
31 bytes_read = msgrcv(mq, (void *) &buf, MAX_SIZE, rcv_to_snd1, 0);
32 CHECK(bytes_read >= 0);
33 printf("%s", buf.buffer);
34 printf("--------------------------------------------\n");
35 V(snd_dp);
36 pthread_exit(NULL);
37}
接收线程
1receivervoid *receiver() {
2 struct msg_st buf, over1, over2;
3 int mq, must_stop = 2;
4 struct msqid_ds t;
5
6 /* 定义两个结束信号 */
7 over1.message_type = 3;
8 strcpy(over1.buffer, "over1\n");
9 over2.message_type = 4;
10 strcpy(over2.buffer, "over2\n");
11
12 /* open the mail queue */
13 mq = msgget((key_t) QUEUE_ID, 0666 | IPC_CREAT);
14 CHECK((key_t) -1 != mq);
15
16 do {
17 ssize_t bytes_read, bytes_write;
18 /* receive the message */
19 P(full);
20 bytes_read = msgrcv(mq, (void *) &buf, MAX_SIZE, 0, 0);
21 V(empty);
22
23 CHECK(bytes_read >= 0);
24 /* 接收到"exit"信号 */
25 if (!strncmp(buf.buffer, MSG_STOP, strlen(MSG_STOP))) {
26 if (buf.message_type == 1) {
27 /* 写入并发送退出消息 */
28 bytes_write = msgsnd(mq, (void *) &over1, MAX_SIZE, 0);
29 CHECK(bytes_write >= 0);
30 V(over);
31 must_stop--;
32 } else if (buf.message_type == 2) {
33 /* 写入并发送退出消息 */
34 bytes_write = msgsnd(mq, (void *) &over2, MAX_SIZE, 0);
35 CHECK(bytes_write >= 0);
36 V(over);
37 must_stop--;
38 }
39 } else {
40 /* 正常处理消息 */
41 P(rcv_dp);
42 printf("Received%d: %s", buf.message_type, buf.buffer);
43 printf("--------------------------------------------\n");
44 V(snd_dp);
45 }
46 } while (must_stop); /* Terminate when no thread */
47
48
49 /* cleanup */
50 P(snd_dp);
51 CHECK(!msgctl(mq, IPC_RMID, &t));
52 pthread_exit(NULL);
53}
实验结果
实验结果如下:
附录
msgrcv
1MSGRCV(3P) POSIX Programmer's Manual MSGRCV(3P)
2
3PROLOG
4 This manual page is part of the POSIX Programmer's Manual. The Linux imple‐
5 mentation of this interface may differ (consult the corresponding Linux man‐
6 ual page for details of Linux behavior), or the interface may not be imple‐
7 mented on Linux.
8
9NAME
10 msgrcv — XSI message receive operation
11
12SYNOPSIS
13 #include
14
15 ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp,
16 int msgflg);
17
18DESCRIPTION
19 The msgrcv() function operates on XSI message queues (see the Base Defini‐
20 tions volume of POSIX.1‐2008, Section 3.225, Message Queue). It is unspeci‐
21 fied whether this function interoperates with the realtime interprocess com‐
22 munication facilities defined in Section 2.8, Realtime.
23
24 The msgrcv() function shall read a message from the queue associated with
25 the message queue identifier specified by msqid and place it in the user-de‐
26 fined buffer pointed to by msgp.
27
28 The application shall ensure that the argument msgp points to a user-defined
29 buffer that contains first a field of type long specifying the type of the
30 message, and then a data portion that holds the data bytes of the message.
31 The structure below is an example of what this user-defined buffer might
32 look like:
33
34 struct mymsg {
35 long mtype; /* Message type. */
36 char mtext[1]; /* Message text. */
37 }
38
39 The structure member mtype is the received message's type as specified by
40 the sending process.
41
42 The structure member mtext is the text of the message.
43
44 The argument msgsz specifies the size in bytes of mtext. The received mes‐
45 sage shall be truncated to msgsz bytes if it is larger than msgsz and (ms‐
46 gflg & MSG_NOERROR) is non-zero. The truncated part of the message shall be
47 lost and no indication of the truncation shall be given to the calling
48 process.
49
50 If the value of msgsz is greater than {SSIZE_MAX}, the result is implementa‐
51 tion-defined.
52
53 The argument msgtyp specifies the type of message requested as follows:
54
55 * If msgtyp is 0, the first message on the queue shall be received.
56
57 * If msgtyp is greater than 0, the first message of type msgtyp shall be
58 received.
59
60 * If msgtyp is less than 0, the first message of the lowest type that is
61 less than or equal to the absolute value of msgtyp shall be received.
62
63 The argument msgflg specifies the action to be taken if a message of the de‐
64 sired type is not on the queue. These are as follows:
65
66 * If (msgflg & IPC_NOWAIT) is non-zero, the calling thread shall return
67 immediately with a return value of −1 and errno set to [ENOMSG].
68
69 * If (msgflg & IPC_NOWAIT) is 0, the calling thread shall suspend execu‐
70 tion until one of the following occurs:
71
72 -- A message of the desired type is placed on the queue.
73
74 -- The message queue identifier msqid is removed from the system; when
75 this occurs, errno shall be set to [EIDRM] and −1 shall be returned.
76
77 -- The calling thread receives a signal that is to be caught; in this
78 case a message is not received and the calling thread resumes execu‐
79 tion in the manner prescribed in sigaction().
80
81 Upon successful completion, the following actions are taken with respect to
82 the data structure associated with msqid:
83
84 * msg_qnum shall be decremented by 1.
85
86 * msg_lrpid shall be set to the process ID of the calling process.
87
88 * msg_rtime shall be set to the current time, as described in Section
89 2.7.1, IPC General Description.
90
91RETURN VALUE
92 Upon successful completion, msgrcv() shall return a value equal to the num‐
93 ber of bytes actually placed into the buffer mtext. Otherwise, no message
94 shall be received, msgrcv() shall return −1, and errno shall be set to indi‐
95 cate the error.
msgctl
1MSGCTL(3P) POSIX Programmer's Manual MSGCTL(3P)
2
3PROLOG
4 This manual page is part of the POSIX Programmer's Manual. The Linux imple‐
5 mentation of this interface may differ (consult the corresponding Linux man‐
6 ual page for details of Linux behavior), or the interface may not be imple‐
7 mented on Linux.
8
9NAME
10 msgctl — XSI message control operations
11
12SYNOPSIS
13 #include
14
15 int msgctl(int msqid, int cmd, struct msqid_ds *buf);
16
17DESCRIPTION
18 The msgctl() function operates on XSI message queues (see the Base Defini‐
19 tions volume of POSIX.1‐2008, Section 3.225, Message Queue). It is unspeci‐
20 fied whether this function interoperates with the realtime interprocess com‐
21 munication facilities defined in Section 2.8, Realtime.
22
23 The msgctl() function shall provide message control operations as specified
24 by cmd. The following values for cmd, and the message control operations
25 they specify, are:
26
27 IPC_STAT Place the current value of each member of the msqid_ds data
28 structure associated with msqid into the structure pointed to by
29 buf. The contents of this structure are defined in .
30
31 IPC_SET Set the value of the following members of the msqid_ds data
32 structure associated with msqid to the corresponding value found
33 in the structure pointed to by buf:
34
35 msg_perm.uid
36 msg_perm.gid
37 msg_perm.mode
38 msg_qbytes
39
40 Also, the msg_ctime timestamp shall be set to the current time,
41 as described in Section 2.7.1, IPC General Description.
42
43 IPC_SET can only be executed by a process with appropriate priv‐
44 ileges or that has an effective user ID equal to the value of
45 msg_perm.cuid or msg_perm.uid in the msqid_ds data structure as‐
46 sociated with msqid. Only a process with appropriate privileges
47 can raise the value of msg_qbytes.
48
49 IPC_RMID Remove the message queue identifier specified by msqid from the
50 system and destroy the message queue and msqid_ds data structure
51 associated with it. IPC_RMD can only be executed by a process
52 with appropriate privileges or one that has an effective user ID
53 equal to the value of msg_perm.cuid or msg_perm.uid in the
54 msqid_ds data structure associated with msqid.
55
56RETURN VALUE
57 Upon successful completion, msgctl() shall return 0; otherwise, it shall re‐
58 turn −1 and set errno to indicate the error.