zl程序教程

您现在的位置是:首页 >  其它

当前栏目

生产者消费者问题--进阶

-- 进阶 消费者 生产者 问题
2023-09-14 09:08:33 时间

生产者消费者问题(英语:Producer-consumer problem),也称有限缓冲问题(英语:Bounded-buffer problem),是一个多线程同步问题的经典案例。该问题描述了两个共享固定大小缓冲区的线程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据

一种实现如下:

#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
#include <string.h>

#define MAX 5 //缓冲区的的大小
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

typedef struct{
    char buffer[MAX];
    int count;
}Buffer;

Buffer share = {"", 0};
char ch = 'A';

void *producer(void *arg)
{
    printf("Producer : starting \n");
    while(ch != 'K')
    {
        pthread_mutex_lock(&mutex);
        if(share.count != MAX)
        {
             share.buffer[share.count++] = ch++;
             printf("Producer: put char[%c]\n", ch-1);
             if(share.count == MAX)
             {
                 printf("Producer: signaling full\n");
                 pthread_cond_signal(&cond);//若果缓存满了发送信号
             }
        }
        pthread_mutex_unlock(&mutex);
    }
    sleep(1);
    printf("Produce: Exiting \n");
    pthread_exit(NULL);
}

void *consumer(void *junk)
{
    int i;
    printf("Consumer : starting\n");
    while (ch != 'K')
    {
        pthread_mutex_lock(&mutex);
        printf("\t Consumer : Waiting\n");
        while(share.count != MAX){
            pthread_cond_wait(&cond, &mutex);  //条件不成立释放锁.
            printf("Consumer wating for FULL signal\n");
        }
        printf("Consumer : getting buffer :: ");
        for(i = 0; share.buffer[i] && share.count;++i, share.count--)
            putchar(share.buffer[i]);
        putchar('\n');
        pthread_mutex_unlock(&mutex);
    }
}

int main()
{
    pthread_t read, write;
    pthread_create(&read, NULL, (void *) consumer, NULL);
    pthread_create(&write, NULL, (void *)producer, NULL);

    pthread_join(read, NULL);
    pthread_join(write, NULL);
    return 0;
}
View Code

修改consumer的代码:

void *consumer(void *junk)
{
    int i;
    printf("Consumer : starting\n");
    while (ch != 'K')
    {
        pthread_mutex_lock(&mutex);
        printf("\t Consumer : Waiting\n");
        //while(share.count != MAX){
            pthread_cond_wait(&cond, &mutex);  //条件不成立释放锁.
            printf("Consumer wating for FULL signal\n");
        //}
        printf("Consumer : getting buffer :: ");
        for(i = 0; share.buffer[i] && share.count;++i, share.count--)
            putchar(share.buffer[i]);
        putchar('\n');
        pthread_mutex_unlock(&mutex);
    }
}

编译运行会有两种结果:

一种是:

Consumer : starting
         Consumer : Waiting
Producer : starting 
Producer: put char[A]
Producer: put char[B]
Producer: put char[C]
Producer: put char[D]
Producer: put char[E]
Producer: signaling full
Consumer wating for FULL signal
Consumer : getting buffer :: ABCDE
         Consumer : Waiting
Producer: put char[F]
Producer: put char[G]
Producer: put char[H]
Producer: put char[I]
Producer: put char[J]
Producer: signaling full
Consumer wating for FULL signal
Consumer : getting buffer :: FGHIJ
Produce: Exiting 

另一种是:

Producer : starting 
Producer: put char[A]
Producer: put char[B]
Producer: put char[C]
Producer: put char[D]
Producer: put char[E]
Producer: signaling full
Consumer : starting
     Consumer : Waiting

可以看出来第二种是先执行了生产者,生产者填充满buffer之后,发送条件消息,但是此时consumer还没有执行,也并没有等待条件

然后生产者释放锁,接着消费者获取锁,然后等待条件,由于缓冲已经满,

if(share.count == MAX)

生产者不会进入发送消息的代码,所以消费者一直等待条件.

而上面一种的结果是因为,消费这先执行了,进入等待条件,所以没有这个问题,但是问题在于我们不能保证pthread_cond_wait()一定是先与pthread_cond_signal()执行的.

但是通过加while()代码,就不会出现等待条件变量的问题,可以直接执行下面的代码.

 

这个生产者消费者的解决方案是,同时只有一个生产者和消费者,是1:1的关系,生产者需填满缓冲区,才让消费者来取数据.下面一种实现是只要缓冲区不为空,消费者就来取数据,为空,就等待,缓冲区只要不满,生产者就生产数据,否则等待,支持n:n的关系.

修改上面的代码:

#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
#include <string.h>

#define P_COUNT 5   //producer NO.
#define C_COUNT 5   //NO.

#define MAX 5 //缓冲区的的大小

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; //锁住缓冲区
/*队列满的时候,阻塞生产这线程,队列空时阻塞消费者线程*/
pthread_cond_t notFull = PTHREAD_COND_INITIALIZER; //
pthread_cond_t notEmpty = PTHREAD_COND_INITIALIZER;

typedef struct{
    char buffer[MAX];
    int count;
}Buffer;

Buffer share = {"", 0};
char ch = 'A';

void *producer(void *arg)
{
    int id = *(int *)arg;
    printf("[%d] Producer : starting \n", id);
    //while(ch != 'K')
    while(1)
    {
        pthread_mutex_lock(&mutex);
        while(share.count == MAX){
            pthread_cond_wait(&notFull, &mutex);
            printf("[%d] producer wating for not full signal\n", id);
        }
        share.buffer[share.count++] = ch; //字符不能无限加超出范围了
        printf("[%d] Producer: put [%d] char[%c]\n", id, share.count-1, ch );
        pthread_cond_signal(&notEmpty);
        pthread_mutex_unlock(&mutex);
        sleep(1);
    }
    sleep(1);
    printf("Produce: Exiting \n");
}

void *consumer(void *junk)
{
    int id = *(int *)junk;
    printf("\t[%d] Consumer : starting\n", id);
    //while (ch != 'K')
    while (1)
    {
        pthread_mutex_lock(&mutex);
        printf("\t [%d] Consumer : Waiting\n", id);
        while(share.count == 0){
            pthread_cond_wait(&notEmpty, &mutex);  //条件不成立释放锁.
            printf("\t[%d] Consumer wating for not empty signal\n", id);
        }
         //消费者取数据应该怎么取呢,这里是从数组的最右边开始取,也就是先取最新的数据
        printf("\t[%d] Consumer : getting buffer[%d] :: [%c] \n", id, share.count, share.buffer[share.count-1]);
        share.count--;
        pthread_cond_signal(&notFull);
        pthread_mutex_unlock(&mutex);
        sleep(1);
    }
}

int main()
{
    int i;
    pthread_t t_read[C_COUNT], t_write[P_COUNT];
    int *pId=(int *)malloc(sizeof(int)*P_COUNT);
    int *cId=(int *)malloc(sizeof(int)*C_COUNT);
    for(i = 0; i < P_COUNT; ++i){
        pId[i] = i;
        pthread_create(&t_write[i], NULL, (void *)producer, (void *)&pId[i]);
    }
    for(i = 0; i < C_COUNT; ++i){
        cId[i] = i;
        pthread_create(&t_read[i], NULL, (void *) consumer, (void *)&cId[i]);
    }

    for(i = 0; i < P_COUNT; ++i){
        pthread_join(t_read[i], NULL);
    }
    for(i = 0; i < C_COUNT; ++i){
        pthread_join(t_write[i], NULL);
    }

    pthread_mutex_destroy(&mutex);
    pthread_cond_destroy(&notFull);
    pthread_cond_destroy(&notEmpty);
    return 0;
}
View Code

这里用到等待队列来实现同步,下面这种是采用信号量的方式来实现同步

#include <unistd.h>
#include <sys/types.h>
#include <pthread.h>
#include <semaphore.h>

#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>

#define ERR_EXIT(m) \
        do \
        { \
                perror(m); \
                exit(EXIT_FAILURE); \
        } while(0)

#define CONSUMERS_COUNT 1
#define PRODUCERS_COUNT 1
#define BUFFSIZE 10

int g_buffer[BUFFSIZE];

unsigned short in = 0;
unsigned short out = 0;
unsigned short produce_id = 0; //产品数不断累加
unsigned short consume_id = 0;

sem_t g_sem_full;
sem_t g_sem_empty;
pthread_mutex_t g_mutex;

pthread_t g_thread[CONSUMERS_COUNT + PRODUCERS_COUNT];

void *consume(void *arg)
{
    int i;
    int num = *(int*)arg;
    while (1)
    {
        printf("%d wait buffer not empty\n", num);
        sem_wait(&g_sem_empty);
        pthread_mutex_lock(&g_mutex);

        for (i = 0; i < BUFFSIZE; i++)
        {
            printf("%02d ", i);
            if (g_buffer[i] == -1)
                printf("%s", "null");
            else
                printf("%d", g_buffer[i]);

            if (i == out)
                printf("\t<--consume");

            printf("\n");
        }
        consume_id = g_buffer[out];
        printf("%d begin consume product %d\n", num, consume_id);
        g_buffer[out] = -1;
        out = (out + 1) % BUFFSIZE;
        printf("out == %d \n", out);
        printf("%d end consume product %d\n", num, consume_id);
        pthread_mutex_unlock(&g_mutex);
        sem_post(&g_sem_full);
        sleep(1);
    }
    return NULL;
}

void *produce(void *arg)
{
    int num = *(int*)arg;
    int i;
    while (1)
    {
        printf("%d wait buffer not full\n", num);
        sem_wait(&g_sem_full);
        pthread_mutex_lock(&g_mutex);
        for (i = 0; i < BUFFSIZE; i++)
        {
            printf("%02d ", i);
            if (g_buffer[i] == -1)
                printf("%s", "null");
            else
                printf("%d", g_buffer[i]);

            if (i == in)
                printf("\t<--produce");

            printf("\n");
        }

        printf("%d begin produce product %d\n", num, produce_id);
        g_buffer[in] = produce_id;
        in = (in + 1) % BUFFSIZE;
        printf("in == %d \n", in);
        printf("%d end produce product %d\n", num, produce_id++);
        pthread_mutex_unlock(&g_mutex);
        sem_post(&g_sem_empty);
        sleep(5);
    }
    return NULL;
}

int main(void)
{
    int i;
    for (i = 0; i < BUFFSIZE; i++)
        g_buffer[i] = -1;

    sem_init(&g_sem_full, 0, BUFFSIZE);
    sem_init(&g_sem_empty, 0, 0);

    pthread_mutex_init(&g_mutex, NULL);


    for (i = 0; i < CONSUMERS_COUNT; i++)
        pthread_create(&g_thread[i], NULL, consume, (void *)&i);

    for (i = 0; i < PRODUCERS_COUNT; i++)
        pthread_create(&g_thread[CONSUMERS_COUNT + i], NULL, produce, (void *)&i);

    for (i = 0; i < CONSUMERS_COUNT + PRODUCERS_COUNT; i++)
        pthread_join(g_thread[i], NULL);

    sem_destroy(&g_sem_full);
    sem_destroy(&g_sem_empty);
    pthread_mutex_destroy(&g_mutex);

    return 0;
}
View Code

consumer每次消费完会sleep(1), producer每次生产完会sleep(5),消费者等待的时间更长,故消费者会经常阻塞在sem_wait(&g_sem_empty) 上面,因为缓冲区经常为空,可以将PRODUCTORS_COUNT 改成5,即有5个生产者线程和1个消费者线程,而且生产者睡眠时间还是消费者的5倍,从动态输出可以看出,基本上就动态平衡了,即5个生产者一下子生产了5份东西,消费者1s消费1份,刚好在生产者继续生产前消费完.

修改代码:

#define CONSUMERS_COUNT 1
#define PRODUCERS_COUNT 5

使用Posix信号量可模拟互斥量和条件变量,而且通常更有优势。

当函数sem_wait()和sem_post()用于线程内时,两个调用间的区域就是所要保护的临界区代码;当用于线程间时,则与条件变量等效。

此外,信号量还可用作资源计数器,即初始化信号量的值作为某个资源当前可用的数量,使用时递减释放时递增。这样,原先一些保存队列状态的变量都不再需要。

最后,内核会记录信号的存在,不会将信号丢失;而唤醒条件变量时若没有线程在等待该条件变量,信号将被丢失。

 

有时候缓冲区也可以是没有限制大小的,修改第一份程序为缓冲区不限制大小的代码:

#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
#include <string.h>

#define P_COUNT 1   //producer NO.
#define C_COUNT 2   //NO.

#define MAX 5 //缓冲区的的大小

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

typedef struct{
    char buffer[MAX];
    int count;
}Buffer;

Buffer share = {"", 0};
char ch = 'A';

void *producer(void *arg)
{
    printf("Producer : starting \n");
    while(1)
    {
        pthread_mutex_lock(&mutex);
        share.count++;
        printf("Producer: put charc count[%d]\n", share.count);
        pthread_cond_signal(&cond);//若果缓存满了发送信号
        pthread_mutex_unlock(&mutex);
        sleep(1);
    }
    sleep(1);
    printf("Produce: Exiting \n");
    pthread_exit(NULL);
}

void *consumer(void *junk)
{
    printf("Consumer : starting\n");
    while (1)
    {
        pthread_mutex_lock(&mutex);
        printf("\t Consumer : Waiting\n");
        while(share.count == 0){
            pthread_cond_wait(&cond, &mutex);
        }
        share.count--;
        printf("Consumer : getting buffer count[%d] \n", share.count);
        pthread_mutex_unlock(&mutex);
        sleep(1);
    }
}

int main()
{
    int i;
    pthread_t t_read[C_COUNT], t_write[P_COUNT];
    for(i = 0; i < P_COUNT; ++i){
        pthread_create(&t_write[i], NULL, (void *)producer, NULL);
    }
    for(i = 0; i < C_COUNT; ++i){
        pthread_create(&t_read[i], NULL, (void *) consumer, NULL);
    }

    for(i = 0; i < P_COUNT; ++i){
        pthread_join(t_read[i], NULL);
    }
    for(i = 0; i < C_COUNT; ++i){
        pthread_join(t_write[i], NULL);
    }

    pthread_mutex_destroy(&mutex);
    pthread_cond_destroy(&cond);
    return 0;
}
View Code