POSIX多线程的使用方式中, 有一种很重要的方式-----流水线(亦称为“管道”)方式,“数据元素”流串行地被一组线程按顺序执行。它的使用架构可参考下图:








typedef struct stage_tag {
    pthread_mutex_t     mutex;          /* Protect data */
    pthread_cond_t      avail;          /* Data available */
    pthread_cond_t      ready;          /* Ready for data */
    int                 data_ready;     /* Data present */
    long                data;           /* Data to process */
    pthread_t           thread;         /* Thread for stage */
    struct stage_tag    *next;          /* Next stage */
} stage_t;

 * 管道的结构定义,工作阶段是它的一个链表结点 */
typedef struct pipe_tag {
    pthread_mutex_t     mutex;          /* Mutex to protect pipe */
    stage_t             *head;          /* First stage */
    stage_t             *tail;          /* Final stage */
    int                 stagesCount;         /* Number of stages */
    int                 active;         /* Active data elements */
} pipe_t;






int pipe_create (pipe_t *pipe, int stageCount)
    int pipe_index;
    stage_t **link = &pipe->head, *new_stage, *stage;
    int status;

    status = pthread_mutex_init (&pipe->mutex, NULL);
    pipe->stageCount= stageCount; /*需要创建的结点总数*/
    pipe->active = 0;

    for (pipe_index = 0; pipe_index <= stageCount; pipe_index++) {
        new_stage = (stage_t*)malloc (sizeof (stage_t));/*为结点分配内存空间*/
        status = pthread_mutex_init (&new_stage->mutex, NULL);
        status = pthread_cond_init (&new_stage->ready, NULL);
        new_stage->data_ready = 0;/*初始化工作阶段stage的数据成员,上2行代码也是做这样的工作*/
        *link = new_stage;/*当前创建的结点为尾结点。如果是第二个结点以后的结点,该句代码的作用是把新创建的结点插入到链表的尾部*/
        link = &new_stage->next;/*建立结点之间的联系.link的作用实际是维护一个尾指针*/

    *link = (stage_t*)NULL;     /* Terminate list */
    pipe->tail = new_stage;     /* Record the tail */

     * Create the threads for the pipe stages only after all
     * the data is initialized (including all links). Note
     * that the last stage doesn't get a thread, it's just
     * a receptacle for the final pipeline value.
     * At this point, proper cleanup on an error would take up
     * more space than worthwhile in a "simple example", so
     * instead of cancelling and detaching all the threads
     * already created, plus the synchronization object and
     * memory cleanup done for earlier errors, it will simply
     * abort.
    for (   stage = pipe->head;
            stage->next != NULL;
            stage = stage->next) {
        status = pthread_create (
            &stage->thread, NULL, pipe_stage, (void*)stage);
        if (status != 0)
            err_abort (status, "Create pipe stage");
    return 0;



