// Producer test driver: starts NTHREADS producer threads, each of which opens
// its own SHMQueue (keyed by thread index) and pushes a sequence of Items
// into it until its range is exhausted, a push fails, or SIGINT is received.

#include "test.h"

#include <atomic>
#include <cerrno>
#include <csignal>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <iostream>

#include <pthread.h>
#include <unistd.h>

#define NTHREADS 4

// Per-thread arguments; entry i is handed to producer thread i.
struct Targ targs[NTHREADS];

// Queue size passed to every SHMQueue constructor.
size_t qsize = 16;

// Set by the SIGINT handler and polled by every producer thread. It is atomic
// because it is written from a signal handler and read concurrently by
// several threads; a plain bool would be a data race.
std::atomic<bool> stop{false};

/// SIGINT handler: announces itself on stderr and requests shutdown.
///
/// Only async-signal-safe operations are used here (write(2) and a store to
/// an atomic flag); iostreams must not be called from a signal handler.
/// errno is saved and restored so the interrupted code never sees a value
/// clobbered by write().
void sigint_handler(int sig)
{
    (void)sig;
    const int saved_errno = errno;

    static const char msg[] = "sigint_handler\n";
    const ssize_t ignored = write(STDERR_FILENO, msg, sizeof(msg) - 1);
    (void)ignored;

    stop.store(true);
    errno = saved_errno;
}

/// Producer thread entry point.
///
/// @param arg  Pointer to this thread's Targ. `key` selects the queue;
///             `start` is the first value to push; `end` is the exclusive
///             upper bound, or -1 for "no bound".
/// @return     The number of items successfully pushed, carried in the
///             void* (read it back via intptr_t).
void *run(void *arg)
{
    struct Targ *targ = static_cast<struct Targ *>(arg);

    err_msg(0, "productor key = %d\n", targ->key);

    // Automatic storage: the queue is released on every exit path.
    SHMQueue queue(targ->key, qsize);

    const int end = targ->end;
    // Value-initialize so fields not assigned below are not indeterminate.
    Item item{};
    long pushed = 0;

    for (int i = targ->start; !stop.load() && (end == -1 || i < end); ++i) {
        // Fill the item BEFORE pushing, so that the value enqueued is the
        // value logged and no value is pushed twice or skipped.
        item.pic = i;
        item.info = i;
        if (!queue.push(item)) {
            break;
        }
        printf("productor(%d) 入队:{%d, %d}\n", targ->key, item.pic, item.info);
        ++pushed;
    }

    // Go through intptr_t: a direct int -> pointer cast is a size mismatch
    // on LP64 platforms.
    return reinterpret_cast<void *>(static_cast<intptr_t>(pushed));
}

/// Installs the SIGINT handler, runs NTHREADS producers to completion,
/// reports how many items each one wrote, then calls mm_destroy().
int main(int argc, char *argv[])
{
    (void)argc;
    (void)argv;

    signal(SIGINT, sigint_handler);

    pthread_t tids[NTHREADS];
    bool created[NTHREADS] = {false};

    for (int i = 0; i < NTHREADS; i++) {
        targs[i].key = i;
        targs[i].start = 0;
        targs[i].end = 100000;

        // pthread_* functions return the error number; they do not set errno.
        const int rc = pthread_create(&tids[i], NULL, run, static_cast<void *>(&targs[i]));
        if (rc != 0) {
            fprintf(stderr, "productor pthread_create: %s\n", strerror(rc));
        } else {
            created[i] = true;
        }
    }

    for (int i = 0; i < NTHREADS; i++) {
        if (!created[i]) {
            // Never join a thread id that was not successfully created.
            continue;
        }

        void *res = NULL;
        const int rc = pthread_join(tids[i], &res);
        if (rc != 0) {
            fprintf(stderr, "productor pthread_join: %s\n", strerror(rc));
        } else {
            fprintf(stderr, "productor(%d) 写入 %ld 条数据\n", i,
                    static_cast<long>(reinterpret_cast<intptr_t>(res)));
        }
    }

    mm_destroy();
    std::cerr << "productor quit" << std::endl;
    return EXIT_SUCCESS;
}