seanhalle@0: /* seanhalle@0: * seanhalle@0: */ seanhalle@0: seanhalle@0: #include "main.h" seanhalle@0: #include seanhalle@0: seanhalle@0: /* seanhalle@0: * Consumer. seanhalle@0: * seanhalle@0: * Birth function for thread that performs the consumer behavior seanhalle@0: * seanhalle@0: *Here's the protocol: seanhalle@1: * Reads a production from the shared Q. seanhalle@1: * If empty, yields and tries again. seanhalle@1: * When has a production from every producer, broadcasts next iter to producers seanhalle@1: * When has all the tuples, end seanhalle@0: */ seanhalle@0: void* seanhalle@0: consumer_birthFn( void* _params ) seanhalle@0: { seanhalle@1: int numProducts; seanhalle@1: void *production; //dummy ptr seanhalle@0: seanhalle@0: ConsumerParams* params = (ConsumerParams *)_params; seanhalle@0: seanhalle@0: seanhalle@0: /*The consumer does two loops. seanhalle@0: * The outside loop counts the number of tuples created. seanhalle@0: * The inside loop collects the products for one tuple. seanhalle@0: * seanhalle@0: *Protocol: seanhalle@0: * increment tupleIter seanhalle@0: * wake producers for next tuple seanhalle@1: * Reads a production from the shared Q. seanhalle@1: * If empty, yields and tries again. seanhalle@1: * if more productions in tuple, repeat seanhalle@1: * if have more tuples, repeat seanhalle@1: * end thread seanhalle@0: */ seanhalle@0: while( tupleIter < params->numTuplesToCreate ) seanhalle@0: { seanhalle@1: if( tupleIter % 1000 == 0 ) DEBUG__printf1("tuples produced: %d\n", tupleIter); seanhalle@1: seanhalle@1: // wake producers for next iter seanhalle@0: DEBUG__printf("consumer broadcast for next iter\n"); seanhalle@0: pthread_mutex_lock(&tupleIterLock); seanhalle@1: tupleIter += 1; // increment tupleIter (global shared) seanhalle@0: pthread_cond_broadcast( &tupleIterCond ); seanhalle@0: pthread_mutex_unlock(&tupleIterLock); seanhalle@0: seanhalle@0: for( numProducts = 0; numProducts < params->numProducers; numProducts++ ) seanhalle@0: { seanhalle@1: // read a production from the shared Q. seanhalle@1: production = NULL; seanhalle@1: while( production == NULL ) seanhalle@1: { pthread_mutex_lock( &queueAccessLock ); seanhalle@1: production = readPrivQ( commQ ); seanhalle@1: pthread_mutex_unlock( &queueAccessLock ); seanhalle@1: // If empty, yields and tries again. seanhalle@1: if( production == NULL) sched_yield(); seanhalle@0: } seanhalle@0: DEBUG__printf1( "consumer got production %d\n", currProductionNum ); seanhalle@1: } //if more productions for current tuple, repeat seanhalle@0: seanhalle@0: // have all productions for current tuple, so add tuple to output seanhalle@0: //add tuple to output and malloc new tuple array -- overhead meas, do nothing seanhalle@0: seanhalle@0: } // if have all tuples are going to produce, end seanhalle@0: seanhalle@0: //Shutdown consumer thread seanhalle@0: pthread_exit(NULL); seanhalle@0: seanhalle@0: } seanhalle@0: