Linux系统编程——条件变量&生产者消费者模型

条件变量

  使用互斥锁来实现线程间数据的共享和通信,互斥锁一个明显的缺点是它只有两种状态:锁定和非锁定。而条件变量通过允许线程阻塞和等待另一个线程发送信号的方法弥补了互斥锁的不足,它常和互斥锁一起使用。使用时,条件变量被用来阻塞一个线程,当条件不满足时,线程往往解开相应的互斥锁并等待条件发生变化。一旦其它的某个线程改变了条件变量,它将通知相应的条件变量唤醒一个或多个正被此条件变量阻塞的线程。这些线程将重新锁定互斥锁并重新测试条件是否满足。一般说来,条件变量被用来进行线承间的同步。

  条件变量是利用线程间共享的全局变量进行同步的一种机制,主要包括两个动作:一个线程等待条件变量的条件成立而挂起;另一个线程使条件成立(给出条件成立信号)。为了防止竞争,条件变量的使用总是和一个互斥锁结合在一起。

  简而言之,条件变量本身不是锁,但它也可以造成线程阻塞,通常与互斥锁配合使用,给多线程提供一个会合的场所。

相关操作

条件变量的数据类型为:pthread_cond_t
常用的函数,以及功能如下:

     函数      描述
  pthread_cond_init  初始化一个条件变量  
  pthread_cond_destroy  销毁一个条件变量  
  pthread_cond_wait  阻塞等待一个条件变量  
  pthread_cond_timewait  限时阻塞等待一个条件变量  
  pthread_cond_signal  唤醒至少一个阻塞在条件变量上的线程  
  pthread_cond_broadcast  唤醒全部阻塞在条件变量上的线程  

头文件: #include <pthread.h>

返回值:

  • 成功:0
  • 失败:返回 errno

初始化条件变量

函数原型: int pthread_cond_init(pthread_cond_t *cond, pthread_condattr_t *cond_attr);

作用: 用来初始化一个条件变量

参数:

  • cond:条件变量的地址
  • attr:条件变量的属性,通常传 NULL ,表示默认属性

可以使用宏 PTHREAD_COND_INITIALIZER 静态初始化条件变量,比如:
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
这种方法等价于使用 NULL 指定的 attr 参数调用 pthread_cond_init() 来完成动态初始化,不同之处在于 PTHREAD_COND_INITIALIZER 宏不进行错误检查。

销毁条件变量

函数原型: int pthread_cond_destroy(pthread_cond_t *cond);

作用: 销毁创建的条件变量,以免浪费系统资源

参数:

  • cond:条件变量的地址

阻塞等待一个条件变量

函数原型: int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);

作用:

  1. 阻塞等待条件变量满足
  2. 解锁已经加锁成功的信号量 (相当于 pthread_mutex_unlock(&mutex))
  3. 当条件满足,函数返回时,重新加锁信号量 (相当于, pthread_mutex_lock(&mutex);)

其中 1.2. 两步为一个原子操作(不可被打断)。

参数:

  • cond:创建的条件变量的地址
  • mutex:创建的互斥锁(加锁状态)

限时阻塞

函数原型: int pthread_cond_timedwait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex, const struct timespec *restrict abstime);

作用: 限时等待一个条件变量

参数:

  • cond:条件变量的地址
  • mutex:互斥锁
  • abstime:等待的绝对时间
//struct timespec 的结构体如下
struct timespec {
    ​time_t tv_sec; /* seconds */ 秒
    long tv_nsec; /* nanosecondes*/ 纳秒
}

  此处 struct timespec 定义的形参 abstime 是个绝对时间。注意,是绝对时间,不是相对时间。什么是绝对时间?2018年10月1日10:10:00,这就是一个绝对时间。什么是相对时间?给洗衣机定时 30 分钟洗衣服,就是一个相对时间,也就是说从当时时间开始计算 30 分钟,诸如此类。

time(NULL) 返回的就是绝对时间。而 alarm(1) 是相对时间,相对当前时间定时 1 秒钟。
adstime所相对的时间是相对于 1970年1月1日00:00:00,也就是 UNIX 计时元年。

//错误写法
/*
struct timespec t = {1, 0};
pthread_cond_timedwait (&cond, &mutex, &t);
*/

//正确写法
time_t cur = time(NULL); //获取当前时间。
struct timespec t;    //定义timespec 结构体变量t
t.tv_sec = cur+1; //定时1秒
pthread_cond_timedwait (&cond, &mutex, &t); //传参                           

唤醒阻塞线程

函数原型:

  • int pthread_cond_signal(pthread_cond_t *cond);
  • int pthread_cond_broadcast(pthread_cond_t *cond);

作用: 上述两个函数都可以完成唤醒因为条件变量而阻塞的线程,区别为

  • singal:至少唤醒一个阻塞在该条件变量上的线程
  • broadcast:唤醒全部阻塞在该题该条件变量上的线程

参数:

  • cond:条件变量的地址

条件变量的优点

  • 相较于mutex而言,条件变量可以减少竞争。
  • 如直接使用mutex,除了生产者、消费者之间要竞争互斥量以外,消费者之间也需要竞争互斥量,但如
    果汇聚(链表)中没有数据,消费者之间竞争互斥锁是无意义的。有了条件变量机制以后,只有生产者完成生产,才会引起消费者之间的竞争。提高了程序效率。

生产者消费者模型

  生产者消费者的过程如下:一个生产者生产数据放入缓冲池,消费者从缓冲池中取数据消费,在这期间,生产者生产数据和消费者消费数据两者不能同时进行,即放的时候不能拿,拿的时候不能放,必须是其中一个操作结束后,另一个才有机会操作。这样生产者和消费者就形成了一种互斥的关系;

而在缓冲池空的时候,消费者必须先等生产者生产之后,他才可以取数据消费;同样,当缓冲池满时,生产者必须等到消费者把数据拿走后,自己才可以继续生产,这种关系我们称之为同步。所以,生产者和消费者之间就形成了同步与互斥的关系

而两个外两种关系就是生产者与生产者之间以及消费者与消费者之间的互斥关系

通过上述的条件变量以及互斥锁实现简单的生产者消费者模型

#include<stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <pthread.h>

struct msg{
    int num;
    struct msg *next;
};
struct msg *head;

pthread_mutex_t mutex; //创建互斥锁
pthread_cond_t cond;   //创建条件变量

void *produce(void *arg){
    while(1){
        struct msg *mp = malloc(sizeof(struct msg)); //生产者生产数据
        mp->num = rand() % 1000 + 1; //随机产生一个 1~1000的数据
    
        pthread_mutex_lock(&mutex); //生产者在向缓冲区写数据时先加锁
        mp->next = head; //头插法,写入数据
        head = mp; 
        printf("------------------Produce:%d\n", head->num);
    
        pthread_mutex_unlock(&mutex); //立即解锁
        pthread_cond_signal(&cond);//唤醒被阻塞的线程
        sleep(rand() % 3); 
    }   
    pthread_exit(NULL);
}

void *consumer(void *arg){
    struct msg *mp;
    while(1){
        pthread_mutex_lock(&mutex); //消费者消费数据时,先加锁
        while(head == NULL){   //循环尝试条件变量是否满足
            pthread_cond_wait(&cond, &mutex);
        }
        mp = head;  //条件满足,取出变量
        head = mp->next;
        printf("---------Consumer:%d\n", mp->num);

        pthread_mutex_unlock(&mutex); //立即解锁
        free(mp);  //释放生产者申请的空间(线程共享)
        sleep(rand()%3);
    }   
    pthread_exit(NULL);
}

int main(){
    srand(time(NULL)); //设置随机化种子

    pthread_t tid, cid;
    pthread_mutex_init(&mutex, NULL); //初始化锁
    pthread_cond_init(&cond, NULL);//初始化条件变量

    pthread_create(&tid, NULL, produce, NULL);
    pthread_create(&cid, NULL, consumer, NULL);

    pthread_join(tid, NULL);
    pthread_join(cid, NULL);

    pthread_mutex_destroy(&mutex); //销毁
    pthread_cond_destroy(&cond);

    return 0;
}