POSIX多线程的使用方式中, 有一种很重要的方式-----流水线(亦称为“管道”)方式,“数据元素”流串行地被一组线程按顺序执行。它的使用架构可参考下图:

                                                      

       以面向对象的思想去理解,整个流水线,可以理解为一个数据传输的管道;该管道中的每一个工作线程,可以理解为一个整个流水线的一个工作阶段stage,

这些工作线程之间的合作是一环扣一环的。靠输入口越近的工作线程,是时序较早的工作阶段stage,它的工作成果会影响下一个工作线程阶段(stage)的工作结果,即下个阶段依赖于上一个阶段的输出,上一个阶段的输出成为本阶段的输入。这也是pipeline的一个共有特点!

     那么,这个管道如何来创建的呢?

     第一步,我们进行管道和工作流程阶段的定义。

 这个管道可以理解为一个单链表,链表的结点为每个工作阶段(工作线程被封装在这个阶段中)。

请看代码:

/*工作阶段的结构定义*/
typedef struct stage_tag {
    pthread_mutex_t     mutex;          /* Protect data */
    pthread_cond_t      avail;          /* Data available */
    pthread_cond_t      ready;          /* Ready for data */
    int                 data_ready;     /* Data present */
    long                data;           /* Data to process */
    pthread_t           thread;         /* Thread for stage */
    struct stage_tag    *next;          /* Next stage */
} stage_t;

/*
 * 管道的结构定义,工作阶段是它的一个链表结点 */
typedef struct pipe_tag {
    pthread_mutex_t     mutex;          /* Mutex to protect pipe */
    stage_t             *head;          /* First stage */
    stage_t             *tail;          /* Final stage */
    int                 stagesCount;         /* Number of stages */
    int                 active;         /* Active data elements */
} pipe_t;


  第二步,我们进行管道的创建。

     前面的分析,我们知道,工作管道其实是所有工作阶段的组合。(这里,组合的意思是管道是一个链表。这个链表由结点组成.结点即我们的每一个工作阶段--stage).

      我们假设工作阶段的总数是事先知道的(例如:20)。那么创建一个管道的过程,其实就是依次创建20个结点,并建立这些结点之间的数据关系---链表。

      这里,我们采用从头至尾的顺序方法建立。它的实质是:采用尾插法建立一个单链表。

示例代码如下:

int pipe_create (pipe_t *pipe, int stageCount)
{
    int pipe_index;
    stage_t **link = &pipe->head, *new_stage, *stage;
    int status;

    status = pthread_mutex_init (&pipe->mutex, NULL);
    pipe->stageCount= stageCount; /*需要创建的结点总数*/
    pipe->active = 0;

    for (pipe_index = 0; pipe_index <= stageCount; pipe_index++) {
        new_stage = (stage_t*)malloc (sizeof (stage_t));/*为结点分配内存空间*/
        status = pthread_mutex_init (&new_stage->mutex, NULL);
        status = pthread_cond_init (&new_stage->ready, NULL);
        new_stage->data_ready = 0;/*初始化工作阶段stage的数据成员,上2行代码也是做这样的工作*/
        *link = new_stage;/*当前创建的结点为尾结点。如果是第二个结点以后的结点,该句代码的作用是把新创建的结点插入到链表的尾部*/
        link = &new_stage->next;/*建立结点之间的联系.link的作用实际是维护一个尾指针*/
    }

    *link = (stage_t*)NULL;     /* Terminate list */
    pipe->tail = new_stage;     /* Record the tail */

    /*
     * Create the threads for the pipe stages only after all
     * the data is initialized (including all links). Note
     * that the last stage doesn't get a thread, it's just
     * a receptacle for the final pipeline value.
     *
     * At this point, proper cleanup on an error would take up
     * more space than worthwhile in a "simple example", so
     * instead of cancelling and detaching all the threads
     * already created, plus the synchronization object and
     * memory cleanup done for earlier errors, it will simply
     * abort.
     */
    for (   stage = pipe->head;
            stage->next != NULL;
            stage = stage->next) {
        status = pthread_create (
            &stage->thread, NULL, pipe_stage, (void*)stage);
        if (status != 0)
            err_abort (status, "Create pipe stage");
    }
    return 0;
}


 

 

Logo

更多推荐