Mercurial > cgi-bin > hgwebdir.cgi > PR > Applications > pthread > pthread__k_tuple__async
comparison src/Application/consumer.c @ 1:88db7b62b961
Working queue based version
| author | Sean Halle <seanhalle@yahoo.com> |
|---|---|
| date | Wed, 10 Jul 2013 14:17:04 -0700 |
| parents | 9cf9b2091eeb |
| children |
comparison
equal
deleted
inserted
replaced
| 0:41753310be14 | 1:52416ae1b81d |
|---|---|
| 9 * Consumer. | 9 * Consumer. |
| 10 * | 10 * |
| 11 * Birth function for thread that performs the consumer behavior | 11 * Birth function for thread that performs the consumer behavior |
| 12 * | 12 * |
| 13 *Here's the protocol: | 13 *Here's the protocol: |
| 14 *Consumer is born waiting for some producer to send it a production. | 14 * Reads a production from the shared Q. |
| 15 *When it wakes, it reads the variables used for communication, and packages | 15 * If empty, yields and tries again. |
| 16 * the information into a tuple it is constructing. | 16 * When has a production from every producer, broadcasts next iter to producers |
| 17 *Then it wakes the producer, who is waiting to be sure that send was received. | 17 * When has all the tuples, end |
| 18 *If the tuple is not yet complete, it loops back for another production. | |
| 19 *When tuple complete, it adds that tuple to the output | |
| 20 *If that's the last tuple, it ends itself | |
| 21 *If not, then wakes all the producers, who go to the next iteration. | |
| 22 *Then loops back to wait for some producer to send it a production | |
| 23 */ | 18 */ |
| 24 void* | 19 void* |
| 25 consumer_birthFn( void* _params ) | 20 consumer_birthFn( void* _params ) |
| 26 { | 21 { |
| 27 int lastSeenProductionNum, numProducts; | 22 int numProducts; |
| | 23 void *production; //dummy ptr |
| 28 | 24 |
| 29 ConsumerParams* params = (ConsumerParams *)_params; | 25 ConsumerParams* params = (ConsumerParams *)_params; |
| 30 | 26 |
| 31 | 27 |
| 32 /*The consumer does two loops. | 28 /*The consumer does two loops. |
| 34 * The inside loop collects the products for one tuple. | 30 * The inside loop collects the products for one tuple. |
| 35 * | 31 * |
| 36 *Protocol: | 32 *Protocol: |
| 37 * increment tupleIter | 33 * increment tupleIter |
| 38 * wake producers for next tuple | 34 * wake producers for next tuple |
| 39 * wait on production ready | 35 * Reads a production from the shared Q. |
| 40 * read comm vars and add msg to current tuple | 36 * If empty, yields and tries again. |
| 41 * increment ACK count | 37 * if more productions in tuple, repeat |
| 42 * wake producer (who is waiting for ack) | 38 * if have more tuples, repeat |
| 43 * if more productions for current tuple, repeat | 39 * end thread |
| 44 * have all productions for current tuple, so add tuple to output | |
| 45 * if have all tuples are going to produce, end | |
| 46 * else more, so repeat | |
| 47 */ | 40 */ |
| 48 while( tupleIter < params->numTuplesToCreate ) | 41 while( tupleIter < params->numTuplesToCreate ) |
| 49 { | 42 { |
| 50 // increment tupleIter (global shared) | 43 if( tupleIter % 1000 == 0 ) DEBUG__printf1("tuples produced: %d\n", tupleIter); |
| 51 // wake producers for next iter (don't need cond lock? -- teeter totter) | 44 |
| | 45 // wake producers for next iter |
| 52 DEBUG__printf("consumer broadcast for next iter\n"); | 46 DEBUG__printf("consumer broadcast for next iter\n"); |
| 53 pthread_mutex_lock(&tupleIterLock); | 47 pthread_mutex_lock(&tupleIterLock); |
| 54 tupleIter += 1; | 48 tupleIter += 1; // increment tupleIter (global shared) |
| 55 pthread_cond_broadcast( &tupleIterCond ); | 49 pthread_cond_broadcast( &tupleIterCond ); |
| 56 pthread_mutex_unlock(&tupleIterLock); | 50 pthread_mutex_unlock(&tupleIterLock); |
| 57 | |
| 58 if( tupleIter % 1000 == 0 ) DEBUG__printf1("tuples produced: %d\n", tupleIter); | |
| 59 | 51 |
| 60 for( numProducts = 0; numProducts < params->numProducers; numProducts++ ) | 52 for( numProducts = 0; numProducts < params->numProducers; numProducts++ ) |
| 61 { | 53 { |
| 62 //wait on productionReadyCond (suspend until there is a production) | 54 // read a production from the shared Q. |
| 63 pthread_mutex_lock( &productionReadyLock ); | 55 production = NULL; |
| 64 while( lastSeenProductionNum == currProductionNum ) //new tuple sets to 0 -- no 0 producerID | 56 while( production == NULL ) |
| 65 { | 57 { pthread_mutex_lock( &queueAccessLock ); |
| 66 pthread_cond_wait( &productionReadyCond, &productionReadyLock ); | 58 production = readPrivQ( commQ ); |
| | 59 pthread_mutex_unlock( &queueAccessLock ); |
| | 60 // If empty, yields and tries again. |
| | 61 if( production == NULL) sched_yield(); |
| 67 } | 62 } |
| 68 DEBUG__printf1( "consumer got production %d\n", currProductionNum ); | 63 DEBUG__printf1( "consumer got production %d\n", currProductionNum ); |
| 69 lastSeenProductionNum = currProductionNum; | 64 } //if more productions for current tuple, repeat |
| 70 pthread_mutex_unlock( &productionReadyLock ); | |
| 71 | |
| 72 //Read comm vars and add msg to current tuple | |
| 73 //add production to tuple -- overhead meas, do nothing | |
| 74 | |
| 75 // increment ACK count | |
| 76 pthread_mutex_lock( &consumerReceivedAckLock ); | |
| 77 currConsumerReceivedACKNum += 1;//make different than last time prod saw | |
| 78 | |
| 79 // wake producer (who is waiting to be sure that send was received) | |
| 80 DEBUG__printf1("consumer broadcast ACK %d\n", currConsumerReceivedACKNum); | |
| 81 pthread_cond_broadcast( &consumerReceivedAckCond ); | |
| 82 pthread_mutex_unlock( &consumerReceivedAckLock ); | |
| 83 } //if more productions for current tuple, repeat | |
| 84 | 65 |
| 85 // have all productions for current tuple, so add tuple to output | 66 // have all productions for current tuple, so add tuple to output |
| 86 //add tuple to output and malloc new tuple array -- overhead meas, do nothing | 67 //add tuple to output and malloc new tuple array -- overhead meas, do nothing |
| 87 | 68 |
| 88 } // if have all tuples are going to produce, end | 69 } // if have all tuples are going to produce, end |
