「OS」HDU-OS-Lab3-Linux进程管理(三)消息队列

实验三的知识点是进程通信,进程通信的方式多种多样,既包括锁机制、信号量机制在内的低级通信方式,低级在于其交换的信息量少且效率较低,又包括共享服务器、消息传递系统、管道通信以及客户-服务器系统通信在内的高级通信方式,本实验是实验三的第三个部分,介绍了利用消息队列通信机制实现两个线程间通信的方法。

源码地址:

https://github.com/leslievan/Operator_System/tree/master/Operator_System_Lab3/Operator_System_Exp3_3

基本介绍

消息缓冲队列通信机制是消息传递系统通信中直接通信方式的一种具体实现,基本思想为:通常由系统一管理一组用于通信的空闲消息缓冲区;某进程要发送消息时,首先在自己的地址空间中设置一个发送区,并把欲发送的消息填入其中形成消息,再申请一个消息缓冲区,把数据从发送区复制到消息缓冲区中,然后再把该消息缓冲区直接发送到接收进程的消息队列里;接收进程从自己的消息队列上取下消息缓冲区,并将其中的数据复制到自己的消息接收区中,最后释放消息缓冲区。

实验内容

利用Linux的消息队列通信机制实现两个线程间的通信

编写程序创建三个线程:sender1线程、sender2线程和receive线程,三个线程的功能描述如下:

  1. sender1线程:允许函数sender1(),它创建一个消息队列,然后等待用户通过终端输入一串字符,并将这串字符通过消息队列发送给receiver线程;可循环发送多个消息,直到用户输入“exit”为止,表示它不再发送消息,最后向receiver线程发送消息“end1”,并且等待 receiver的应答,等到应答消息后,接收到的应答信息显示在终端屏幕上,结束线程的运行。
  2. sender2线程:运行函数sender2(),共享sender1创建的消息队列,等待用户通过终端输入一串字符,并将这串字符通过消息队列发送给receiver线程;可循环发送多个消息,直到用户输入“exit”为止,表示它不再发送消息,最后向receiver线程发送消息“end2“,并且等待 receiver的应答,等到应答消息后将接收到的应答信息显示在终端屏幕上,结束线程的运行。
  3. Receiver线程:运行函数receive(),它通过消息队列接收来自sender1sender2两个线程的消息,将消息显示在终端屏幕上,当收到内容为“end1”的消息时,就向sender1发送一个应答消息“over1”;当收到内容为“end2”的消息时,就向sender2发送一个应答消息“over2”;消息接收完成后删除消息队列,结束线程的运行。选择合适的信号量机制实现三个线程之间的同步与互斥。

该实验主要难点在于消息队列的相关操作函数和对于三个线程之间的同步互斥关系。

消息队列

消息队列需要自定义一个消息缓冲区,这里设计一个只包含两个成员变量的结构体作为消息缓冲区:

1struct msg_st {
2    long int message_type;
3    char buffer[MAX_SIZE + 1];
4};

其中message_type为消息种类,buffer用来储存消息的数据段,最大可存储MAX_SIZE大小,+1操作为了给结尾留出\0

消息队列的相关操作有:

  1. int msgget(key_t key, int msgflg);获得消息队列的特征标识符。 函数需要两个参数,keymsgflgkey是该消息队列的全局唯一标识符,通过该标识符可以定位消息队列,对其进行相关操作。 msgflg为权限控制,决定对消息队列进行的操作,一般使用0666 | IPC_CREAT,熟悉文件系统的可以知道,0666表示文件所有者、同组用户和其他用户对于该消息队列的权限均为可读可写,后面对常量IPC_CREAT进行的位运算作用是“若该队列未被创建则创建它”。(对于该函数可以简单理解为创建消息队列)
  2. ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);消息队列的接收操作。 函数需要五个参数,msqidmsgpmsgszmsgtypmsgflgmsqid是函数msgget的返回值,用于表示对哪一个消息队列进行操作。 msgp是接收消息的指针,指向消息结构体msg_stmsgsz是接收消息的大小,这里可以看作结构体msg_st中数据段的大小。 msgtyp是接收消息的类别,函数可以接收指定类别的消息,默认为0,忽视类别,接收队首消息,正值和负值有不同含义,详情查看 附录 msgflg同函数msgget中的msgflg,这里可以直接使用0。 函数的返回值为实际接收到的消息字节数。
  3. int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);消息队列的发送操作。 函数需要四个参数,msqidmsgpmsgszmsgflg。 参数含义可参考函数msgrcv
  4. int msgctl(int msqid, int cmd, struct msqid_ds *buf);消息队列的控制操作。 函数需要三个参数,msqidcmdbufmsqid同上。 cmd对消息队列的操作进行选择,需要用到的是IPC_RMID,用于移除msqid指向的消息队列,详情查看 附录 buf为消息队列进行操作的参数,这里不需用到,详情请查看 附录

这部分书上应该更详细,可以看书。

 1// demo.c
 2
 3int mq;
 4struct msg_st buf;
 5ssize_t bytes_read, bytes_write;
 6int MAX_SIZE = 128;
 7
 8/* Create a queue / Get a queue 
 9 * int msgget(key_t key, int msgflg)
10 */
11mq = msgget((key_t) QUEUE_ID, 0666 | IPC_CREAT);
12
13/* Get message from queue, place the message to buf
14 * ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg)
15 */
16bytes_read = msgrcv(mq, (void *) &buf, MAX_SIZE, 0, 0);
17/* Get specific type message, such like 1 */
18bytes_read = msgrcv(mq, (void *) &buf, MAX_SIZE, 1, 0);
19
20/* Send message to queue with message in buf
21 * int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg)
22 */
23bytes_read = msgrcv(mq, (void *) &buf, MAX_SIZE, 0, 0);
24/* Send specific type message, suck like 1 */
25bytes_read = msgrcv(mq, (void *) &buf, MAX_SIZE, 1, 0);
26
27/* Remove(Destroy) the queue
28 * int msgctl(int msqid, int cmd, struct msqid_ds *buf)
29 */
30msgctl(mq, IPC_RMID, &t);

同步互斥关系

senderreceiver之间的进程同步比较简单,临界资源为消息队列。有:

  1. receiver接收消息,sender发送消息,receiversender存在同步关系,使用full=0empty=1进行约束;
  2. sender之间存在互斥关系,两个发送线程不能同时工作 ,使用w_mutex=1进行约束;
  3. receiver等待发送进程结束后,返回应答,sender收到应答后进行输出,receiversender存在同步关系,使用over=0进行约束;
  4. 这里对于终端输出也进行了约束,使用rcv_dpsnd_dp进行约束,可以忽视这部分的处理,这一部分只是为了美观。

代码实现

发送线程

 1sender1void *sender1() {
 2    int mq;
 3    struct msg_st buf;
 4    ssize_t bytes_read;
 5
 6    /* open the mail queue */
 7    mq = msgget((key_t) QUEUE_ID, 0666 | IPC_CREAT);
 8    CHECK((key_t) -1 != mq);
 9
10    do {
11        P(w_mutex);		/* 进入互斥区 */
12        P(snd_dp);		/* 这部分只是为了美观 */
13        printf("sender1> ");
14        V(rcv_dp);
15        fflush(stdout);
16        
17        fgets(buf.buffer, BUFSIZ, stdin);	/* 接收终端输入,置入buf.buffer */
18        buf.message_type = snd_to_rcv1;		/* 设置消息类别,接收线程据此判断消息来源 */
19        
20        /* send the message */
21        P(empty);		/* 使用empty和full进行同步 */
22        CHECK(0 <= msgsnd(mq, (void*)&buf, MAX_SIZE, 0));	/* Check用于检测参数是否成立,不成立则报错,用于Debug,可忽视其实现,作为宏定义,定义在头文件中 */
23        V(full);
24        V(w_mutex);
25        /* 换行为了使线程随机切换,勿随意删除 */
26        printf("\n");
27     } while (strncmp(buf.buffer, MSG_STOP, strlen(MSG_STOP)));	/* Terminate when receive 'exit' */
28
29    /* wait for response */
30    P(over);
31    bytes_read = msgrcv(mq, (void *) &buf, MAX_SIZE, rcv_to_snd1, 0);
32    CHECK(bytes_read >= 0);
33    printf("%s", buf.buffer);
34    printf("--------------------------------------------\n");
35    V(snd_dp);
36    pthread_exit(NULL);
37}

接收线程

 1receivervoid *receiver() {
 2    struct msg_st buf, over1, over2;
 3    int mq, must_stop = 2;
 4    struct msqid_ds t;
 5    
 6    /* 定义两个结束信号 */
 7    over1.message_type = 3;
 8    strcpy(over1.buffer, "over1\n");
 9    over2.message_type = 4;
10    strcpy(over2.buffer, "over2\n");
11
12    /* open the mail queue */
13    mq = msgget((key_t) QUEUE_ID, 0666 | IPC_CREAT);
14    CHECK((key_t) -1 != mq);
15
16    do {
17        ssize_t bytes_read, bytes_write;
18        /* receive the message */
19        P(full);
20        bytes_read = msgrcv(mq, (void *) &buf, MAX_SIZE, 0, 0);
21        V(empty);
22        
23        CHECK(bytes_read >= 0);
24        /* 接收到"exit"信号 */
25        if (!strncmp(buf.buffer, MSG_STOP, strlen(MSG_STOP))) {
26            if (buf.message_type == 1) {
27                /* 写入并发送退出消息 */
28                bytes_write = msgsnd(mq, (void *) &over1, MAX_SIZE, 0);
29                CHECK(bytes_write >= 0);
30                V(over);
31                must_stop--;
32            } else if (buf.message_type == 2) {
33                /* 写入并发送退出消息 */
34                bytes_write = msgsnd(mq, (void *) &over2, MAX_SIZE, 0);
35                CHECK(bytes_write >= 0);
36                V(over);
37                must_stop--;
38            }
39        } else {
40            /* 正常处理消息 */
41            P(rcv_dp);
42            printf("Received%d: %s", buf.message_type, buf.buffer);
43            printf("--------------------------------------------\n");
44            V(snd_dp);
45        }
46    } while (must_stop); /* Terminate when no thread */
47
48
49    /* cleanup */
50    P(snd_dp);
51    CHECK(!msgctl(mq, IPC_RMID, &t));
52    pthread_exit(NULL);
53}

实验结果

实验结果如下:

Result

附录

msgrcv

 1MSGRCV(3P)                   POSIX Programmer's Manual                   MSGRCV(3P)
 2
 3PROLOG
 4       This manual page is part of the POSIX Programmer's Manual.  The Linux imple‐
 5       mentation of this interface may differ (consult the corresponding Linux man‐
 6       ual  page for details of Linux behavior), or the interface may not be imple‐
 7       mented on Linux.
 8
 9NAME
10       msgrcv — XSI message receive operation
11
12SYNOPSIS
13       #include 
14
15       ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp,
16           int msgflg);
17
18DESCRIPTION
19       The msgrcv() function operates on XSI message queues (see the  Base  Defini‐
20       tions volume of POSIX.1‐2008, Section 3.225, Message Queue).  It is unspeci‐
21       fied whether this function interoperates with the realtime interprocess com‐
22       munication facilities defined in Section 2.8, Realtime.
23
24       The  msgrcv()  function  shall read a message from the queue associated with
25       the message queue identifier specified by msqid and place it in the user-de‐
26       fined buffer pointed to by msgp.
27
28       The application shall ensure that the argument msgp points to a user-defined
29       buffer that contains first a field of type long specifying the type  of  the
30       message,  and  then a data portion that holds the data bytes of the message.
31       The structure below is an example of what  this  user-defined  buffer  might
32       look like:
33
34           struct mymsg {
35               long    mtype;     /* Message type. */
36               char    mtext[1];  /* Message text. */
37           }
38
39       The  structure  member  mtype is the received message's type as specified by
40       the sending process.
41
42       The structure member mtext is the text of the message.
43
44       The argument msgsz specifies the size in bytes of mtext.  The received  mes‐
45       sage  shall  be truncated to msgsz bytes if it is larger than msgsz and (ms‐
46       gflg & MSG_NOERROR) is non-zero.  The truncated part of the message shall be
47       lost  and  no  indication  of  the  truncation shall be given to the calling
48       process.
49
50       If the value of msgsz is greater than {SSIZE_MAX}, the result is implementa‐
51       tion-defined.
52
53       The argument msgtyp specifies the type of message requested as follows:
54
55        *  If msgtyp is 0, the first message on the queue shall be received.
56
57        *  If  msgtyp  is greater than 0, the first message of type msgtyp shall be
58           received.
59
60        *  If msgtyp is less than 0, the first message of the lowest type  that  is
61           less than or equal to the absolute value of msgtyp shall be received.
62
63       The argument msgflg specifies the action to be taken if a message of the de‐
64       sired type is not on the queue. These are as follows:
65
66        *  If (msgflg & IPC_NOWAIT) is non-zero, the calling  thread  shall  return
67           immediately with a return value of −1 and errno set to [ENOMSG].
68
69        *  If  (msgflg  & IPC_NOWAIT) is 0, the calling thread shall suspend execu‐
70           tion until one of the following occurs:
71
72           --  A message of the desired type is placed on the queue.
73
74           --  The message queue identifier msqid is removed from the system;  when
75               this occurs, errno shall be set to [EIDRM] and −1 shall be returned.
76
77           --  The  calling  thread receives a signal that is to be caught; in this
78               case a message is not received and the calling thread resumes execu‐
79               tion in the manner prescribed in sigaction().
80
81       Upon  successful completion, the following actions are taken with respect to
82       the data structure associated with msqid:
83
84        *  msg_qnum shall be decremented by 1.
85
86        *  msg_lrpid shall be set to the process ID of the calling process.
87
88        *  msg_rtime shall be set to the current  time,  as  described  in  Section
89           2.7.1, IPC General Description.
90
91RETURN VALUE
92       Upon  successful completion, msgrcv() shall return a value equal to the num‐
93       ber of bytes actually placed into the buffer mtext.  Otherwise,  no  message
94       shall be received, msgrcv() shall return −1, and errno shall be set to indi‐
95       cate the error.

msgctl

 1MSGCTL(3P)                   POSIX Programmer's Manual                   MSGCTL(3P)
 2
 3PROLOG
 4       This manual page is part of the POSIX Programmer's Manual.  The Linux imple‐
 5       mentation of this interface may differ (consult the corresponding Linux man‐
 6       ual  page for details of Linux behavior), or the interface may not be imple‐
 7       mented on Linux.
 8
 9NAME
10       msgctl — XSI message control operations
11
12SYNOPSIS
13       #include 
14
15       int msgctl(int msqid, int cmd, struct msqid_ds *buf);
16
17DESCRIPTION
18       The msgctl() function operates on XSI message queues (see the  Base  Defini‐
19       tions volume of POSIX.1‐2008, Section 3.225, Message Queue).  It is unspeci‐
20       fied whether this function interoperates with the realtime interprocess com‐
21       munication facilities defined in Section 2.8, Realtime.
22
23       The  msgctl() function shall provide message control operations as specified
24       by cmd.  The following values for cmd, and the  message  control  operations
25       they specify, are:
26
27       IPC_STAT    Place  the  current  value  of  each member of the msqid_ds data
28                   structure associated with msqid into the structure pointed to by
29                   buf.  The contents of this structure are defined in .
30
31       IPC_SET     Set  the  value  of  the  following members of the msqid_ds data
32                   structure associated with msqid to the corresponding value found
33                   in the structure pointed to by buf:
34
35                       msg_perm.uid
36                       msg_perm.gid
37                       msg_perm.mode
38                       msg_qbytes
39
40                   Also,  the msg_ctime timestamp shall be set to the current time,
41                   as described in Section 2.7.1, IPC General Description.
42
43                   IPC_SET can only be executed by a process with appropriate priv‐
44                   ileges  or  that  has an effective user ID equal to the value of
45                   msg_perm.cuid or msg_perm.uid in the msqid_ds data structure as‐
46                   sociated with msqid.  Only a process with appropriate privileges
47                   can raise the value of msg_qbytes.
48
49       IPC_RMID    Remove the message queue identifier specified by msqid from  the
50                   system and destroy the message queue and msqid_ds data structure
51                   associated with it. IPC_RMD can only be executed  by  a  process
52                   with appropriate privileges or one that has an effective user ID
53                   equal to the value  of  msg_perm.cuid  or  msg_perm.uid  in  the
54                   msqid_ds data structure associated with msqid.
55
56RETURN VALUE
57       Upon successful completion, msgctl() shall return 0; otherwise, it shall re‐
58       turn −1 and set errno to indicate the error.