annotate src/Application/consumer.c @ 1:88db7b62b961

Working queue based version
author Sean Halle <seanhalle@yahoo.com>
date Wed, 10 Jul 2013 14:17:04 -0700
parents 9cf9b2091eeb
children
rev   line source
seanhalle@0 1 /*
seanhalle@0 2 *
seanhalle@0 3 */
seanhalle@0 4
seanhalle@0 5 #include "main.h"
seanhalle@0 6 #include <pthread.h>
seanhalle@0 7
seanhalle@0 8 /*
seanhalle@0 9 * Consumer.
seanhalle@0 10 *
seanhalle@0 11 * Birth function for thread that performs the consumer behavior
seanhalle@0 12 *
seanhalle@0 13 *Here's the protocol:
seanhalle@1 14 * Reads a production from the shared Q.
seanhalle@1 15 * If empty, yields and tries again.
seanhalle@1 16 * When has a production from every producer, broadcasts next iter to producers
seanhalle@1 17 * When has all the tuples, end
seanhalle@0 18 */
seanhalle@0 19 void*
seanhalle@0 20 consumer_birthFn( void* _params )
seanhalle@0 21 {
seanhalle@1 22 int numProducts;
seanhalle@1 23 void *production; //dummy ptr
seanhalle@0 24
seanhalle@0 25 ConsumerParams* params = (ConsumerParams *)_params;
seanhalle@0 26
seanhalle@0 27
seanhalle@0 28 /*The consumer does two loops.
seanhalle@0 29 * The outside loop counts the number of tuples created.
seanhalle@0 30 * The inside loop collects the products for one tuple.
seanhalle@0 31 *
seanhalle@0 32 *Protocol:
seanhalle@0 33 * increment tupleIter
seanhalle@0 34 * wake producers for next tuple
seanhalle@1 35 * Reads a production from the shared Q.
seanhalle@1 36 * If empty, yields and tries again.
seanhalle@1 37 * if more productions in tuple, repeat
seanhalle@1 38 * if have more tuples, repeat
seanhalle@1 39 * end thread
seanhalle@0 40 */
seanhalle@0 41 while( tupleIter < params->numTuplesToCreate )
seanhalle@0 42 {
seanhalle@1 43 if( tupleIter % 1000 == 0 ) DEBUG__printf1("tuples produced: %d\n", tupleIter);
seanhalle@1 44
seanhalle@1 45 // wake producers for next iter
seanhalle@0 46 DEBUG__printf("consumer broadcast for next iter\n");
seanhalle@0 47 pthread_mutex_lock(&tupleIterLock);
seanhalle@1 48 tupleIter += 1; // increment tupleIter (global shared)
seanhalle@0 49 pthread_cond_broadcast( &tupleIterCond );
seanhalle@0 50 pthread_mutex_unlock(&tupleIterLock);
seanhalle@0 51
seanhalle@0 52 for( numProducts = 0; numProducts < params->numProducers; numProducts++ )
seanhalle@0 53 {
seanhalle@1 54 // read a production from the shared Q.
seanhalle@1 55 production = NULL;
seanhalle@1 56 while( production == NULL )
seanhalle@1 57 { pthread_mutex_lock( &queueAccessLock );
seanhalle@1 58 production = readPrivQ( commQ );
seanhalle@1 59 pthread_mutex_unlock( &queueAccessLock );
seanhalle@1 60 // If empty, yields and tries again.
seanhalle@1 61 if( production == NULL) sched_yield();
seanhalle@0 62 }
seanhalle@0 63 DEBUG__printf1( "consumer got production %d\n", currProductionNum );
seanhalle@1 64 } //if more productions for current tuple, repeat
seanhalle@0 65
seanhalle@0 66 // have all productions for current tuple, so add tuple to output
seanhalle@0 67 //add tuple to output and malloc new tuple array -- overhead meas, do nothing
seanhalle@0 68
seanhalle@0 69 } // if have all tuples are going to produce, end
seanhalle@0 70
seanhalle@0 71 //Shutdown consumer thread
seanhalle@0 72 pthread_exit(NULL);
seanhalle@0 73
seanhalle@0 74 }
seanhalle@0 75