#include "test.h"

#include <atomic>
#include <cstdint>
#include <cstring>

using namespace std;

#define NTHREADS 4

// Per-thread arguments handed to run(); slot i belongs to consumer thread i.
struct Targ targs[NTHREADS];

// Size argument passed to every SHMQueue constructed by the consumers.
size_t qsize = 16;

// Shutdown flag: set by the SIGINT handler and polled by every consumer
// thread. It is atomic because it is written asynchronously from a signal
// handler and read concurrently by several threads; a plain bool would be a
// data race.
std::atomic<bool> stop(false);

// SIGINT handler: only requests shutdown; the consumers notice the flag the
// next time they evaluate their loop condition.
void sigint_handler(int /*sig*/)
{
    stop = true;
}

// Consumer thread entry point.
//
// arg    - pointer to the struct Targ for this thread; its `key` is passed to
//          the SHMQueue constructor and printed with every item.
// return - number of items popped, encoded in the returned void* (decode with
//          a cast through intptr_t).
//
// The loop ends when `stop` is set or when pop_timeout() returns false.
void* run(void *arg)
{
    struct Targ *targ = static_cast<struct Targ *>(arg);

    // Alternative queue back ends, kept for reference:
    //   SArrayLockFreeQueue *queue = QFactory::createArrayLockFreeQueue(targ->key, 10);
    //   LockFreeQueue *queue = QueueFactory::createQueue(targ->key, qsize);
    //
    // Automatic storage: the queue is destroyed on every exit path without an
    // explicit delete.
    SHMQueue queue(targ->key, qsize);

    struct Item item;
    // NOTE(review): presumably a 5-second wait per pop; the exact semantics
    // are defined by SHMQueue::pop_timeout -- confirm.
    struct timespec timeout = {5, 0};

    int consumed = 0;
    while (!stop && queue.pop_timeout(item, &timeout)) {
        printf("consumer(%d) 出队: {%d, %d}\n", targ->key, item.pic, item.info);
        consumed++;
    }

    // Go through intptr_t so the int -> pointer conversion is well defined on
    // platforms where pointers are wider than int.
    return reinterpret_cast<void *>(static_cast<intptr_t>(consumed));
}

// Starts NTHREADS consumer threads (queue keys 0..NTHREADS-1), waits for all
// of them, reports how many items each one read, then calls mm_destroy().
int main(int argc, char *argv[])
{
    (void)argc;
    (void)argv;

    signal(SIGINT, sigint_handler);

    pthread_t tids[NTHREADS];
    bool started[NTHREADS];

    for (int i = 0; i < NTHREADS; i++) {
        targs[i].key = i;
        // pthread functions return the error number instead of setting errno,
        // so report failures with strerror(rc) rather than perror().
        const int rc = pthread_create(&tids[i], NULL, run, &targs[i]);
        started[i] = (rc == 0);
        if (!started[i]) {
            fprintf(stderr, "consumer pthread_create: %s\n", strerror(rc));
        }
    }

    for (int i = 0; i < NTHREADS; i++) {
        if (!started[i]) {
            // tids[i] was never initialised; joining it would be undefined.
            continue;
        }
        void *result = NULL;
        const int rc = pthread_join(tids[i], &result);
        if (rc != 0) {
            fprintf(stderr, "consumer pthread_join: %s\n", strerror(rc));
        } else {
            fprintf(stderr, "cosumer(%d) 读取了 %ld 条数据\n", i,
                    static_cast<long>(reinterpret_cast<intptr_t>(result)));
        }
    }

    // NOTE(review): presumably releases the shared-memory state used by the
    // queues; defined outside this file -- confirm.
    mm_destroy();
    cerr << "cosumer quit" << endl;
    return EXIT_SUCCESS;
}