www.pudn.com > pthread_examples.rar > workcrew.c
#include#include #include "control.h" #include "queue.h" #include "dbug.h" /* the work_queue holds tasks for the various threads to complete. */ /*这个工作队列保存数据给不同的线程使用*/ struct work_queue { data_control control; queue work; } wq; /* I added a job number to the work node. Normally, the work node would contain additional data that needed to be processed. */ typedef struct work_node { struct node *next; int jobnum; } wnode; /* the cleanup queue holds stopped threads. Before a thread terminates, it adds itself to this list. Since the main thread is waiting for changes in this list, it will then wake up and clean up the newly terminated thread. */ /*该清除队列用于主函数清理结束线程*/ struct cleanup_queue { data_control control; queue cleanup; } cq; /* I added a thread number (for debugging/instructional purposes) and a thread id to the cleanup node. The cleanup node gets passed to the new thread on startup, and just before the thread stops, it attaches the cleanup node to the cleanup queue. The main thread monitors the cleanup queue and is the one that performs the necessary cleanup. */ typedef struct cleanup_node { struct node *next; int threadnum; pthread_t tid; } cnode; /*线程处理函数*/ /*如果一个线程创建时不是以detach方式,则线程退出的时候不会自动释放所占用的资源,需要主线程来释放资源*/ void *threadfunc(void *myarg) { wnode *mywork; /*工作节点*/ cnode *mynode; /*清除节点*/ mynode = (cnode *) myarg; pthread_mutex_lock(&wq.control.mutex); /*锁住工作节点的互斥锁*/ /*如果control.active不等于1,则不能使用队列,直接退出线程*/ while (wq.control.active) /*检查工作队列是否有效,是否可以使用*/ { while ((wq.work.head == NULL) && wq.control.active) { pthread_cond_wait(&wq.control.cond, &wq.control.mutex); } if (!wq.control.active) { break; } //we got something! mywork = (wnode *) queue_get(&wq.work); pthread_mutex_unlock(&wq.control.mutex); //perform processing... printf("Thread number %d processing job %d\n",mynode->threadnum,mywork->jobnum); free(mywork); pthread_mutex_lock(&wq.control.mutex); /*该部分用来继续得到队列节点,使用条件变量和互斥变量*/ } pthread_mutex_unlock(&wq.control.mutex); /*退出线程,将该节点加入清除队列中,以供主线程清除退出线程资源*/ pthread_mutex_lock(&cq.control.mutex); queue_put(&cq.cleanup,(node *) mynode); pthread_mutex_unlock(&cq.control.mutex); pthread_cond_signal(&cq.control.cond); printf("thread %d shutting down...\n",mynode->threadnum); return NULL; } #define NUM_WORKERS 4 int numthreads; /*加入线程,以供主线程调用*/ void join_threads(void) { cnode *curnode; printf("joining threads...\n"); /*如果还有工作线程在执行,则会一直执行下去*/ while (numthreads) { pthread_mutex_lock(&cq.control.mutex); /*锁住清除队列的互斥锁*/ /* below, we sleep until there really is a new cleanup node. This takes care of any false wakeups... even if we break out of pthread_cond_wait(), we don't make any assumptions that the condition we were waiting for is true. */ while (cq.cleanup.head == NULL) { /*如果清除队列为空,则主清理线程阻塞在这里,等到条件变量满足条件*/ pthread_cond_wait(&cq.control.cond, &cq.control.mutex); } /* at this point, we hold the mutex and there is an item in the list that we need to process. First, we remove the node from the queue. Then, we call pthread_join() on the tid stored in the node. When pthread_join() returns, we have cleaned up after a thread. Only then do we free() the node, decrement the number of additional threads we need to wait for and repeat the entire process, if necessary */ /*将工作节点取出,然后使用pthread_join加入等待*/ curnode = (cnode *) queue_get(&cq.cleanup); pthread_mutex_unlock(&cq.control.mutex); pthread_join(curnode->tid, NULL); printf("joined with thread %d\n",curnode->threadnum); free(curnode); numthreads--; } } /*创建了NUM_WORKERS个线程*/ int create_threads(void) { int x; cnode *curnode; /*分配清除节点,以及初始化相应的结构*/ for (x=0; x threadnum = x; /*设置线程数*/ if (pthread_create(&curnode->tid, NULL, threadfunc, (void *) curnode)) { return 1; } printf("created thread %d\n",x); numthreads++; } return 0; } void initialize_structs(void) { numthreads = 0; /*初始化控制数据结构*/ if (control_init(&wq.control)) { dabort(); } queue_init(&wq.work); /*初始化清除数据结构*/ if (control_init(&cq.control)) { control_destroy(&wq.control); dabort(); } queue_init(&wq.work); /*queue_init(&cq.work); */ control_activate(&wq.control); } void cleanup_structs(void) { control_destroy(&cq.control); control_destroy(&wq.control); } int main(void) { int x; wnode *mywork; initialize_structs(); /*在该函数中已经将工作队列激活*/ /* CREATION */ if (create_threads()) { printf("Error starting threads... cleaning up.\n"); join_threads(); dabort(); } pthread_mutex_lock(&wq.control.mutex); /*主函数往工作队列中加入16000个工作节点*/ for (x=0; x<16000; x++) { mywork = (wnode *)malloc(sizeof(wnode)); if (!mywork) { printf("ouch! can't malloc!\n"); break; } mywork->jobnum = x; queue_put(&wq.work,(node *) mywork); } pthread_mutex_unlock(&wq.control.mutex); /*广播通知等待工作节点的线程,表示条件变量已经发生改变*/ pthread_cond_broadcast(&wq.control.cond); printf("sleeping...\n"); sleep(2); /*主线程睡眠两秒钟*/ printf("deactivating work queue...\n"); /*之后将工作队列不激活,迫使其他线程退出*/ control_deactivate(&wq.control); /* CLEANUP*/ /*退出之后,主线程做清理工作*/ join_threads(); cleanup_structs(); }