comparison src/Application/consumer.c @ 1:88db7b62b961

Working queue based version
author Sean Halle <seanhalle@yahoo.com>
date Wed, 10 Jul 2013 14:17:04 -0700
parents 9cf9b2091eeb
children
comparison
equal deleted inserted replaced
0:41753310be14 1:52416ae1b81d
9 * Consumer. 9 * Consumer.
10 * 10 *
11 * Birth function for thread that performs the consumer behavior 11 * Birth function for thread that performs the consumer behavior
12 * 12 *
13 *Here's the protocol: 13 *Here's the protocol:
14 *Consumer is born waiting for some producer to send it a production. 14 * Reads a production from the shared Q.
15 *When it wakes, it reads the variables used for communication, and packages 15 * If empty, yields and tries again.
16 * the information into a tuple it is constructing. 16 * When it has a production from every producer, broadcasts next iter to producers
17 *Then it wakes the producer, who is waiting to be sure that send was received. 17 * When it has all the tuples, ends
18 *If the tuple is not yet complete, it loops back for another production.
19 *When tuple complete, it adds that tuple to the output
20 *If that's the last tuple, it ends itself
21 *If not, then wakes all the producers, who go to the next iteration.
22 *Then loops back to wait for some producer to send it a production
23 */ 18 */
24 void* 19 void*
25 consumer_birthFn( void* _params ) 20 consumer_birthFn( void* _params )
26 { 21 {
27 int lastSeenProductionNum, numProducts; 22 int numProducts;
23 void *production; //dummy ptr
28 24
29 ConsumerParams* params = (ConsumerParams *)_params; 25 ConsumerParams* params = (ConsumerParams *)_params;
30 26
31 27
32 /*The consumer does two loops. 28 /*The consumer does two loops.
34 * The inside loop collects the products for one tuple. 30 * The inside loop collects the products for one tuple.
35 * 31 *
36 *Protocol: 32 *Protocol:
37 * increment tupleIter 33 * increment tupleIter
38 * wake producers for next tuple 34 * wake producers for next tuple
39 * wait on production ready 35 * Reads a production from the shared Q.
40 * read comm vars and add msg to current tuple 36 * If empty, yields and tries again.
41 * increment ACK count 37 * if more productions in tuple, repeat
42 * wake producer (who is waiting for ack) 38 * if have more tuples, repeat
43 * if more productions for current tuple, repeat 39 * end thread
44 * have all productions for current tuple, so add tuple to output
45 * if have all tuples that are going to be produced, end
46 * else more, so repeat
47 */ 40 */
48 while( tupleIter < params->numTuplesToCreate ) 41 while( tupleIter < params->numTuplesToCreate )
49 { 42 {
50 // increment tupleIter (global shared) 43 if( tupleIter % 1000 == 0 ) DEBUG__printf1("tuples produced: %d\n", tupleIter);
51 // wake producers for next iter (don't need cond lock? -- teeter totter) 44
45 // wake producers for next iter
52 DEBUG__printf("consumer broadcast for next iter\n"); 46 DEBUG__printf("consumer broadcast for next iter\n");
53 pthread_mutex_lock(&tupleIterLock); 47 pthread_mutex_lock(&tupleIterLock);
54 tupleIter += 1; 48 tupleIter += 1; // increment tupleIter (global shared)
55 pthread_cond_broadcast( &tupleIterCond ); 49 pthread_cond_broadcast( &tupleIterCond );
56 pthread_mutex_unlock(&tupleIterLock); 50 pthread_mutex_unlock(&tupleIterLock);
57
58 if( tupleIter % 1000 == 0 ) DEBUG__printf1("tuples produced: %d\n", tupleIter);
59 51
60 for( numProducts = 0; numProducts < params->numProducers; numProducts++ ) 52 for( numProducts = 0; numProducts < params->numProducers; numProducts++ )
61 { 53 {
62 //wait on productionReadyCond (suspend until there is a production) 54 // read a production from the shared Q.
63 pthread_mutex_lock( &productionReadyLock ); 55 production = NULL;
64 while( lastSeenProductionNum == currProductionNum ) //new tuple sets to 0 -- no 0 producerID 56 while( production == NULL )
65 { 57 { pthread_mutex_lock( &queueAccessLock );
66 pthread_cond_wait( &productionReadyCond, &productionReadyLock ); 58 production = readPrivQ( commQ );
59 pthread_mutex_unlock( &queueAccessLock );
60 // If empty, yields and tries again.
61 if( production == NULL) sched_yield();
67 } 62 }
68 DEBUG__printf1( "consumer got production %d\n", currProductionNum ); 63 DEBUG__printf1( "consumer got production %d\n", currProductionNum );
69 lastSeenProductionNum = currProductionNum; 64 } //if more productions for current tuple, repeat
70 pthread_mutex_unlock( &productionReadyLock );
71
72 //Read comm vars and add msg to current tuple
73 //add production to tuple -- overhead meas, do nothing
74
75 // increment ACK count
76 pthread_mutex_lock( &consumerReceivedAckLock );
77 currConsumerReceivedACKNum += 1;//make different than last time prod saw
78
79 // wake producer (who is waiting to be sure that send was received)
80 DEBUG__printf1("consumer broadcast ACK %d\n", currConsumerReceivedACKNum);
81 pthread_cond_broadcast( &consumerReceivedAckCond );
82 pthread_mutex_unlock( &consumerReceivedAckLock );
83 } //if more productions for current tuple, repeat
84 65
85 // have all productions for current tuple, so add tuple to output 66 // have all productions for current tuple, so add tuple to output
86 //add tuple to output and malloc new tuple array -- overhead meas, do nothing 67 //add tuple to output and malloc new tuple array -- overhead meas, do nothing
87 68
88 } // if have all tuples are going to produce, end 69 } // if have all tuples are going to produce, end