侧边栏壁纸
  • 累计撰写 32 篇文章
  • 累计创建 55 个标签
  • 累计收到 2 条评论

目 录CONTENT

文章目录

Linux IPC:Message Queue(消息队列)

Testerfans
2022-05-26 / 0 评论 / 25 点赞 / 2,906 阅读 / 9,322 字 / 正在检测是否收录...
温馨提示:
本文最后更新于 2022-06-29,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

本文演示 CentOS 7.6:Linux testerfans 3.10.0-1160.45.1.el7.x86_64

前言

前面我们介绍了(无名)管道命名管道共享内存来实现相关和无关进程进程之间的通信。本章开始我们将继续介绍消息队列。

使用消息队列可以使用以下两种方式进行:

  • 一个进程写入,一个或者多个进程进行读取 。
  • 一个进程写入多种不同的数据包,多个进程按照消息类型进行读取。

消息队列的特点

  • 消息队列是面向记录的,其中的消息具有特定的格式以及特定的优先级。
  • 消息队列独立于发送与接收进程。进程终止时,消息队列及其内容并不会被删除。
  • 消息队列可以实现消息的随机查询,消息不一定要以先进先出的次序读取,也可以按消息的类型读取。

系统调用

消息队列可以实现两个或者多个进程之间的通信,并且读进程可以按照消息类型进行读取。在看实例之前让我先了解一下使用消息队列的系统调用和流程是怎样的。

  • 第 1 步- 创建消息队列或连接到已经存在的消息队列 (msgget())。
  • 第 2 步- 向消息队列 写入(msgsnd())。
  • 第 3 步- 从消息队列中读取 (msgrcv())。
  • 第 4 步- 对消息队列执行控制操作 (msgctl())。

msgget()

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int msgget(key_t key, int msgflg);

msgget() 系统调用返回与参数key值关联的 System V 消息队列标识符。

传参

  • key:消息队列标识,key可以是任意值,也可以是从库函数 ftok() 派生的值。如果 key 的值为 IPC_PRIVATE 或 key 不是 IPC_PRIVATE,则创建一个新的消息队列,不存在参数key指定的消息队列,则在msgflg 中指定为IPC_CREAT。
  • msgflg:是权限标志,它的作用与open函数的mode参数一样。

如果 msgflg 同时指定了 IPC_CREAT 和 IPC_EXCL 并且指定key的消息队列已经存在,则 msgget() 将失败,并将 errno 设置为 EEXIST。 (这类似于组合 O_CREAT | O_EXCL 对 open(2) 的影响。)。

返回

  • 调用成功时返回一个消息队列标识。
  • -1:失败。

错误返回

错误标识 描述
EACCES 调用进程没有消息队列的写权限,也没有CAP_IPC_OWNER功能。
EEXIST key 指定的消息队列已存在,msgflg 同时指定了 IPC_CREAT 和 IPC_EXCL。
ENOENT key指定的消息队列不存在,并且 msgflg 未指定 IPC_CREAT。
ENOMEM 创建消息队列,但系统没有足够的内存来存储新的消息队列数据结构。
ENOSPC 创建消息队列,但超出消息队列 (MSGMNI) 的最大数量的系统限制。

msgsnd()

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int msgsnd(int msgid, const void *msgp, size_t msgsz, int msgflg)

msgsnd() 和 msgrcv() 系统调用分别用于向 System V 消息队列发送消息和从 System V 消息队列接收消息。调用进程必须对消息队列具有写权限才能发送消息,并具有读权限才能接收消息。

传参

  • msgid:消息队列ID,用来识别消息队列。在 msgget() 成功时收到标识符值。
  • msgp:指向调用者定义的结构的指针,结构体为msgbuf

msgp 参数(msgrcv()同)是指向调用者定义的结构的指针,在Linux 的系统库linux/msg.h 中,它是这样定义的:

struct msgbuf
{
    long msgtype;   	  /* mtype 字段必须具有严格的正整数值。接收进程可以使用该值进行消息选择(参见下面的 msgrcv() */
    char msgtext[1024];   /* mtext 字段是一个数组(或其他结构),其大小由非负整数值 msgsz 指定。允许长度为零的消息*/
}
  • msgsz:发送消息的大小(消息应以空字符结尾)。
  • msgflg:表示某些标志,例如 IPC_NOWAIT(当队列中没有消息时立即返回)或 MSG_NOERROR(截断消息文本,如果超过 msgsz 字节)。

返回

  • 0:成功。
  • -1:失败。

错误标识

错误标识 描述
EACCES 调用进程没有消息队列的写权限,也没有CAP_IPC_OWNER能力。
EAGAIN 由于队列的 msg_qbytes 限制,消息无法发送,并且在 msgflg 中指定了 IPC_NOWAIT。
EFAULT msgp 指向的地址不可访问。
EIDRM 消息队列已经被删除。
EINTR 访问进程捕获一个消息队列已满的情况下休眠的信号。
EINVAL 无效的 msqid 值,或非正的 mtype 值,或无效的 msgsz 值(小于 0 或大于系统值 MSGMAX)。
ENOMEM 系统没有足够的内存来复制 msgp 指向的消息。

msgrcv()

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int msgrcv(int msgid, const void *msgp, size_t msgsz, long msgtyp, int msgflg)

上述系统调用从System V消息队列内检索消息。

传参

  • msgid:消息队列,用来识别消息队列。在 msgget() 成功时收到标识符值。
  • msgp:指向调用者定义的结构的指针,结构体为msgbuf
  • msgsz:接收消息的大小(消息应以空字符结尾)。
  • msgtyp:消息类型。
    • msgtyp = 0,读取队列中收到的第一条消息。
    • msgtyp > 0,则读取队列中类型为 msgtyp 的第一条消息,除非在 msgflg 中指 定了 MSG_EXCEPT,在这种情况下,将读取队列中类型不等于 msgtyp 的第一条消息(例如msgtype 为 10,则仅读取类型 10 的第一条消息,即使队列开头可能有其他类型)。
    • msgtype < 0,则将读取队列中最低类型小于或等于 msgtyp 绝对值的第一条消息(例如 msgtyp 为 -5,则它读取类型小于等于 5 的第一条消息,即消息类型从 1 到5)。
  • msgflg:表示某些标志,例如 IPC_NOWAIT(当队列中没有消息时立即返回)或 MSG_NOERROR(截断消息文本,如果超过 msgsz 字节)。
    返回
  • msgrcv() 返回实际复制到 mtext 数组中的字节数。
  • -1:失败。

错误返回

错误标识 描述
E2BIG 消息文本长度大于 msgsz 并且 msgflg 中未指定 MSG_NOERROR。
EACCES 调用进程没有消息队列的读权限,也没有CAP_IPC_OWNER能力。
EAGAIN 队列中没有可用的消息,并且在 msgflg 中指定了 IPC_NOWAIT。
EFAULT msgp 指向的地址不可访问。
EIDRM 当进程休眠以接收消息时,消息队列被删除。
EINVAL msgqid 无效,或 msgsz 小于 0。
ENOMSG 在 msgflg 中指定了 IPC_NOWAIT,并且消息队列中不存在所请求类型的消息。

msgctl()

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int msgctl(int msgid, int cmd, struct msqid_ds *buf)

上述系统调用对 System V 消息队列执行控制操作。

传参

  • msgid:消息队列标识符,用来识别消息队列。在 msgget() 成功时收到标识符值。
  • cmd:对消息队列执行所需控制操作的命令。
CMD 描述
IPC_STAT 将与 msqid 关联的内核数据结构中的信息复制到 buf 指向的 msqid_ds 结构中。调用者必须对消息队列具有读取权限。
IPC_SET 设置结构 buf 指向的用户 ID、所有者的组 ID、权限等。
IPC_RMID 立即移除消息队列,唤醒所有等待的读写器进程(返回错误,errno 设置为 EIDRM)。调用进程必须具有适当的权限,或者它的有效用户 ID 必须是消息队列的创建者或所有者的用户 ID。
IPC_INFO 返回有关 buf 指向的结构中的消息队列限制和参数的信息,该结构的类型为 struct msginfo。
MSG_INFO 返回一个包含与 IPC_INFO 相同信息的 msginfo 结构,除了以下字段返回有关消息队列消耗的系统资源的信息: msgpool 字段返回系统上当前存在的消息队列数;
msgmap 字段返回系统上所有队列中的消息总数;
msgtql 字段返回系统上所有队列中所有消息的总字节数。
MSG_STAT 返回与 IPC_STAT 相同的 msqid_ds 结构。但是,msqid 参数不是队列标识符,而是内核内部数组的索引,该数组维护有关系统上所有消息队列的信息。
  • buf:一个指向名为 struct msqid_ds 的消息队列结构的指针。此结构的值将根据 cmd 用于 set 或 get。

msqid_ds 结构它在 linux/msg.h 中定义如下:

/* one msqid structure for each queue on the system */
struct msqid_ds {
         struct ipc_perm msg_perm;     /* 所有者关系和权限,参考ipc_perm 结构 */
         time_t          msg_stime;    /* 最后一次 msgsnd(2) 系统调用的时间 */
         time_t          msg_rtime;    /* 最后一次 msgrcv(2) 系统调用的时间 */
         time_t          msg_ctime;    /* 队列创建时间或上次 msgctl() 进行IPC_SET 操作时间 */
         unsigned long   __msg_cbytes; /* 当前在消息队列上的所有消息中的字节数。这是一个在 POSIX 中未指定的非标准Linux 扩展 */
         msgqnum_t       msg_qnum;     /* 当前在消息队列中的消息数 */
         msglen_t        msg_qbytes;   /* 消息队列允许的消息文本的最大字节数 */
         pid_t           msg_lspid;    /* 执行最后一次 msgsnd(2) 系统调用的进程 PID */
         pid_t           msg_lrpid;    /* 执行最后一个 msgrcv(2) 系统调用的进程 PID */
};

内核将 IPC 对象的权限信息存储在ipc_perm类型的结构中。例如,在上述消息队列的内部结构中,msg_perm 成员就是这种类型。在linux/ipc.h中为我们声明如下:

struct ipc_perm {
	key_t          __key;    /* 对象的键值,调用shmget返回的键值*/
	uid_t          uid;      /* 所有者的有效用户ID */
	gid_t          gid;      /* 所有者的有效组ID */
	uid_t          cuid;     /* 创建者的有效用户ID*/
	gid_t          cgid;     /* 创建者的有效组ID*/
	unsigned short mode;     /* Permissions + SHM_DEST和SHM_LOCKED标志*/
	unsigned short __seq;    /* 对象的序列号 */
	};

通过调用msgctl方法获取有关系统范围的消息队列限制和参数的信息。如果定义了 _GNU_SOURCE 功能测试宏,则此结构是 msginfo 类型(因此,需要强制转换),在 <sys/msg.h> 中为我们声明如下:

struct msginfo {
			int msgpool; /* 用于保存消息数据的缓冲池大小(以千字节为单位);在内核中未使用 */
    	int msgmap;  /* 消息映射中的最大条目数;在内核中未使用 */
    	int msgmax;  /* 单个消息中可写入的最大字节数 */
    	int msgmnb;  /* 可写入队列的最大字节数;用于在队列创建期间初始化 msg_qbytes (msgget(2)) */
    	int msgmni;  /* 最大消息队列数 */
    	int msgssz;  /* 消息段大小;内核中未使用 */
    	int msgtql;  /* 系统中所有队列的最大消息数;在内核中未使用 */
    	unsigned short int msgseg;/* 最大段数;内核中未使用 */
};

返回

  • 此调用根据传递的命令返回值。IPC_INFO 和 SHM_INFO 或 SHM_STAT 成功后返回共享内存段的索引或标识符,或 0 用于其他操作。
  • -1:失败。

错误返回

错误标识 描述
EACCES 参数cmd等于IPC_STAT或MSG_STAT,但是调用进程没有消息队列msqid的读权限,也没有CAP_IPC_OWNER能力。
EFAULT 参数 cmd 的值为 IPC_SET 或 IPC_STAT,但 buf 指向的地址不可访问。
EIDRM 消息队列被删除。
EINVAL cmd 或 msqid 的值无效。或者:对于 MSG_STAT 操作,在 msqid 中指定的索引值引用了当前未使用的数组槽。
EPERM 参数 cmd 的值为 IPC_SET 或 IPC_RMID,但调用进程的有效用户 ID 不是消息队列的创建者(在 msg_perm.cuid 中找到)或所有者(在 msg_perm.uid 中找到),并且进程没有特权(Linux:它没有 CAP_SYS_ADMIN 功能)。

示例程序 - 进程间通过消息队列进行通信

我们设计两个程序,一个是写入进程,一个是读取进程。写入进程向消息队列中写入,读取进程从消息队列中读取。

示意图

步骤

  • 第1步:创建两个进程,msgq_send.c用来发消息,msgq_recv.c用来收消息。
  • 第2步:调用ftok() 函数使用msgq.txt文件创建key值。
  • 第3步:发送进程执行如下过程:
    • 读取用户输入。
    • 如果存在空行进行删除。
    • 重复该过程直到使用CTRL + D结束。
    • 收到结束消息发送“end”消息,结束进程。
  • 第4步:接收进程执行如下过程:
    • 从队列中读取消息。
    • 显示输出。
    • 如果接收到的消息是“end”,则结束进程并退出。

为简化起见,我们没有使用此示例的消息类型。

源代码
msgq_send.c

/* Filename: msgq_send.c */
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

#define PERMS 0644
struct my_msgbuf {
   long mtype;
   char mtext[200];
};

int main(void) {
   struct my_msgbuf buf;
   int msqid;
   int len;
   key_t key;
   system("touch msgq.txt");
   // 使用上述创建的msgq.txt文件,调用ftok()方法获取key值.
   if ((key = ftok("msgq.txt", 'B')) == -1) {
      perror("ftok");
      exit(1);
   }
   // 创建消息队列
   if ((msqid = msgget(key, PERMS | IPC_CREAT)) == -1) {
      perror("msgget");
      exit(1);
   }
   printf("message queue: ready to send messages.\n");
   printf("Enter lines of text, ^D to quit:\n");
   buf.mtype = 1; /* we don't really care in this case */
   // 获取用户输入消息
   while(fgets(buf.mtext, sizeof buf.mtext, stdin) != NULL) {
      len = strlen(buf.mtext);
      /* 如果存在新行,进行移除 */
      if (buf.mtext[len-1] == '\n') buf.mtext[len-1] = '\0';
      if (msgsnd(msqid, &buf, len+1, 0) == -1) /* +1 for '\0' */
      perror("msgsnd");
   }
   // 发送"end"消息.
   strcpy(buf.mtext, "end");
   len = strlen(buf.mtext);
   if (msgsnd(msqid, &buf, len+1, 0) == -1) /* +1 for '\0' */
   perror("msgsnd");
   // 移除消息队列
   if (msgctl(msqid, IPC_RMID, NULL) == -1) {
      perror("msgctl");
      exit(1);
   }
   printf("message queue: done sending messages.\n");
   return 0;
}

msgq_recv.c

/* Filename: msgq_recv.c */
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

#define PERMS 0644
struct my_msgbuf {
   long mtype;
   char mtext[200];
};

int main(void) {
   struct my_msgbuf buf;
   int msqid;
   int toend;
   key_t key;
   // 获取key值
   if ((key = ftok("msgq.txt", 'B')) == -1) {
      perror("ftok");
      exit(1);
   }
   // 连接到已经存在的消息队列.
   if ((msqid = msgget(key, PERMS)) == -1) { /* connect to the queue */
      perror("msgget");
      exit(1);
   }
   printf("message queue: ready to receive messages.\n");

   // 使用msgrcv()方法获取send服务写入的数据并进行打印.
   for(;;) {
      if (msgrcv(msqid, &buf, sizeof(buf.mtext), 0, 0) == -1) {
         perror("msgrcv");
         exit(1);
      }
      printf("recvd: \"%s\"\n", buf.mtext);
      // 确认消息是否为"end",如果是end则跳出循环.
      toend = strcmp(buf.mtext,"end");
      if (toend == 0)
      break;
   }
   printf("message queue: done receiving messages.\n");
   // 删除文件.
   system("rm msgq.txt");
   return 0;
}

编译

[root@testerfans msg]# gcc msgq_send.c -o msgq_send
[root@testerfans msg]# gcc msgq_recv.c -o msgq_recv

执行/输出
我们打开三个shell,一个作为写,一个作为读,一个使用ipcs -q查看消息队列的创建。

[root@testerfans msg]# ./msgq_send 
message queue: ready to send messages.
Enter lines of text, ^D to quit:
The first message
message queue: done sending messages.
[root@testerfans msg]# ./msgq_recv 
message queue: ready to receive messages.
recvd: "The first message"
recvd: "end"
message queue: done receiving messages.
[root@testerfans ~]# ipcs -q

------ Message Queues --------
key        msqid      owner      perms      used-bytes   messages    
0x42012b10 0          root       644        19           1           

[root@testerfans ~]# ipcs -q

------ Message Queues --------
key        msqid      owner      perms      used-bytes   messages    
0x42012b10 0          root       644        0            0           

[root@testerfans ~]# ipcs -q

------ Message Queues --------
key        msqid      owner      perms      used-bytes   messages 

image-1653986743942

总结

消息队列跟命名管道有不少的相同之处,通过与命名管道一样,消息队列进行通信的进程可以是不相关的进程,同时它们都是通过发送和接收的方式来传递数据的。在命名管道中,发送数据用write,接收数据用read,则在消息队列中,发送数据用msgsnd,接收数据用msgrcv。而且它们对每个数据都有一个最大长度的限制。

与命名管道相比,消息队列的优势在于:

  1. 消息队列也可以独立于发送和接收进程而存在,消除了在同步命名管道的打开和关闭时可能产生的困难。
  2. 同时通过发送消息还可以避免命名管道的同步和阻塞问题,不需要由进程自己来提供同步方法。
  3. 接收程序可以通过消息类型有选择地接收数据,而不是像命名管道中那样,只能默认地接收。

本章我们了解了消息队列的基本使用,下一章我们继续探索IPC的信号量。


本文参考:
Inter Process Communication - Message Queues
man手册

25

评论区