zl程序教程

您现在的位置是:首页 >  后端

当前栏目

C语言线程间通信

C语言线程 间通信
2023-06-13 09:11:55 时间
C11 标准为线程间通信提供了条件变量(condition variable)。线程可以使用条件变量,以等待来自另一个线程的通知,通知告知了指定的条件已被满足。例如,这类通知可能代表某些数据已经准备好进行处理。

条件变量由类型为 cnd_t 的对象表示,并配合互斥一起使用。一般过程如下:线程获得互斥,然后测试条件。如果条件不满足,则线程继续等待条件变量(释放互斥),直到另一个线程再次唤醒它,然后该线程再次获得互斥,并再次测试条件,重复上述过程,直到条件满足。

头文件 threads.h 定义了使用条件变量的函数,它们如下所示:


int cnd_init(cnd_t*cond);

初始化 cond 引用的条件变量。


void cnd_destroy(cnd_t*cond);

释放指定条件变量使用的所有资源。


int cnd_signal(cnd_t*cond);

在等待指定条件变量的任意数量的线程中,唤醒其中一个线程。


int cnd_broadcast(cnd_t*cond);

唤醒所有等待指定条件变量的线程。


int cnd_wait(cnd_t*cond,mtx_t*mtx);

阻塞正在调用的线程,并释放指定的互斥。在调用 cnd_wait()之前,线程必须持有互斥。如果另一线程通过发送一个信号解除当前线程的阻塞(也就是说,通过指定同样的条件变量作为参数调用 cond_signal()或 cnd_broadcast()),那么调用 cnd_wait()的线程在 cnd_wait()返回之前会再次获得互斥。


int cnd_timedwait(cnd_t*restrict cond,mtx_t*restrict mtx,const struct timespec*restrict ts);

与 cnd_wait()类似,cnd_timedwait()阻塞调用它们的线程,但仅维持由参数 ts 指定的时间。可以通过调用函数 timespec_get()获得一个 struct timespec 对象,它表示当前时间。

除 cnd_destroy()以外的所有条件变量函数,如果它们引发错误,则返回值 thrd_error,否则返回值 thrd_success。当时间达到限定值时,函数 cnd_timedwait()也会返回值 thrd_timedout。

例 1 与例 2 中的程序展示了在常见的 生产者-消费者 模型中使用条件变量。程序为每个生产者和消费者开启一个新线程。生产者将一个新产品(在我们的示例中,新产品为一个 int 变量)放入一个环形缓冲区中,假设这个缓冲区没有满,然后通知等待的消费者:产品已经准备好。每个消费者从该缓冲区中取出产品,然后将实际情况通知给正在等待的生产者。

在任一特定时间,只有一个线程可以修改环形缓冲器。因此,在函数 bufPut()和 bufGet()间将存在线程同步问题,函数 bufPut()将一个元素插入到缓冲区,函数 buf-Get()将一个元素从缓冲区移除。

有两个条件变量:生产者等待其中一个条件变量,以判断缓冲器是否满了;消费者等待另一个条件变量,以判断缓冲器是否空了。缓冲区的所有必需元素都包括在结构 Buffer 中。函数 bufInit()初始化具有指定大小的 Buffer 对象,而函数 bufDestroy()销毁 Buffer 对象。

【例1】用于 生产者-消费者 模型的环形缓冲区


/* buffer.h

* 用于线程安全缓冲区的所有声明

#include stdbool.h 

#include threads.h 

typedef struct Buffer

 int *data; // 指向数据数组的指针

 size_t size, count; // 元素数量的最大值和当前值

 size_t tip, tail; // tip = 下一个空点的索引

 mtx_t mtx; // 一个互斥

 cnd_t cndPut, cndGet; // 两个条件变量

} Buffer;

bool bufInit( Buffer *bufPtr, size_t size );

void bufDestroy(Buffer *bufPtr);

bool bufPut(Buffer *bufPtr, int data);

bool bufGet(Buffer *bufPtr, int *dataPtr, int sec);

/* -------------------------------------------------------------

* buffer.c

* 定义用于处理Buffer的函数

#include buffer.h 

#include stdlib.h // 为了使用malloc()和free()

bool bufInit( Buffer *bufPtr, size_t size)

 if ((bufPtr- data = malloc( size * sizeof(int))) == NULL)

 return false;

 bufPtr- size = size;

 bufPtr- count = 0;

 bufPtr- tip = bufPtr- tail = 0;

 return mtx_init( bufPtr- mtx, mtx_plain) == thrd_success

 cnd_init( bufPtr- cndPut) == thrd_success

 cnd_init( bufPtr- cndGet) == thrd_success;

void bufDestroy(Buffer *bufPtr)

 cnd_destroy( bufPtr- cndGet );

 cnd_destroy( bufPtr- cndPut );

 mtx_destroy( bufPtr- mtx );

 free( bufPtr- data );

// 在缓冲区中插入一个新元素

bool bufPut(Buffer *bufPtr, int data)

 mtx_lock( bufPtr- mtx );

 while (bufPtr- count == bufPtr- size)

 if (cnd_wait( bufPtr- cndPut, bufPtr- mtx ) != thrd_success)

 return false;

 bufPtr- data[bufPtr- tip] = data;

 bufPtr- tip = (bufPtr- tip + 1) % bufPtr- size;

 ++bufPtr- count;

 mtx_unlock( bufPtr- mtx );

 cnd_signal( bufPtr- cndGet );

 return true;

// 从缓冲区中移除一个元素

// 如果缓冲区是空的,则等待不超过sec秒

bool bufGet(Buffer *bufPtr, int *dataPtr, int sec)

 struct timespec ts;

 timespec_get( ts, TIME_UTC ); // 当前时间

 ts.tv_sec += sec; // + sec秒延时

 mtx_lock( bufPtr- mtx );

 while ( bufPtr- count == 0 )

 if (cnd_timedwait( bufPtr- cndGet,

 bufPtr- mtx, ts) != thrd_success)

 return false;

 *dataPtr = bufPtr- data[bufPtr- tail];

 bufPtr- tail = (bufPtr- tail + 1) % bufPtr- size;

 --bufPtr- count;

 mtx_unlock( bufPtr- mtx );

 cnd_signal( bufPtr- cndPut );

 return true;

}

例 2 中的 main()函数创建了一个缓冲区,并启动了若干个生产者和消费者线程,给予每个线程一个识别号码和一个指向缓冲区的指针。每个生产者线程创建一定数量的 产品 ,然后用一个 return 语句退出。一个消费者线程如果在给定延时期间无法获得产品以进行消费,则直接返回。

【例2】启动生产者和消费者线程


// producer_consumer.c

#include buffer.h 

#include stdio.h 

#include stdlib.h 

#define NP 2 // 生产者的数量

#define NC 3 // 消费者的数量

int producer(void *); // 线程函数

int consumer(void *);

struct Arg { int id; Buffer *bufPtr; }; // 线程函数的参数

_Noreturn void errorExit(const char* msg)

 fprintf(stderr, %s/n , msg); exit(0xff);

int main(void)

 printf( Producer-Consumer Demo/n/n 

 Buffer buf; // 为5个产品创建一个缓冲区

 bufInit( buf, 5 );

 thrd_t prod[NP], cons[NC]; // 线程

 struct Arg prodArg[NP], consArg[NC]; // 线程的参数

 int i = 0, res = 0;

 for ( i = 0; i ++i ) // 启动生产者

 prodArg[i].id = i+1, prodArg[i].bufPtr = buf;

 if (thrd_create( prod[i], producer, prodArg[i] ) != thrd_success)

 errorExit( Thread error. 

 for ( i = 0; i ++i ) // 启动消费者

 consArg[i].id = i+1, consArg[i].bufPtr = buf;

 if ( thrd_create( cons[i], consumer, consArg[i] ) != thrd_success)

 errorExit( Thread error. 

 for ( i = 0; i ++i ) // 等待线程结束

 thrd_join(prod[i], res),

 printf( /nProducer %d ended with result %d./n , prodArg[i].id, res);

 for ( i = 0; i ++i )

 thrd_join(cons[i], res),

 printf( Consumer %d ended with result %d./n , consArg[i].id, res);

 bufDestroy( buf );

 return 0;

int producer(void *arg) // 生产者线程函数

 struct Arg *argPtr = (struct Arg *)arg;

 int id = argPtr- 

 Buffer *bufPtr = argPtr- bufPtr;

 int count = 0;

 for (int i = 0; i ++i)

 int data = 10*id + i;

 if (bufPut( bufPtr, data ))

 printf( Producer %d produced %d/n , id, data), ++count;

 else

 { fprintf( stderr,

 Producer %d: error storing %d/n , id, data);

 return -id;

 return count;

int consumer(void *arg) // 消费者线程函数

 struct Arg *argPtr = (struct Arg *)arg;

 int id = argPtr- 

 Buffer *bufPtr = argPtr- bufPtr;

 int count = 0;

 int data = 0;

 while (bufGet( bufPtr, data, 2 ))

 ++count;

 printf( Consumer %d consumed %d/n , id, data);

 return count;

}

21685.html