Mercurial > cgi-bin > hgwebdir.cgi > PR > Applications > pthread > pthread__k_tuple__async
view src/Application/consumer.c @ 1:88db7b62b961
Working queue based version
| author | Sean Halle <seanhalle@yahoo.com> |
|---|---|
| date | Wed, 10 Jul 2013 14:17:04 -0700 |
| parents | 9cf9b2091eeb |
| children |
line source
1 /*
2 *
3 */
5 #include "main.h"
6 #include <pthread.h>
8 /*
9 * Consumer.
10 *
11 * Birth function for thread that performs the consumer behavior
12 *
13 *Here's the protocol:
14 * Reads a production from the shared Q.
15 * If empty, yields and tries again.
16 * When has a production from every producer, broadcasts next iter to producers
17 * When has all the tuples, end
18 */
19 void*
20 consumer_birthFn( void* _params )
21 {
22 int numProducts;
23 void *production; //dummy ptr
25 ConsumerParams* params = (ConsumerParams *)_params;
28 /*The consumer does two loops.
29 * The outside loop counts the number of tuples created.
30 * The inside loop collects the products for one tuple.
31 *
32 *Protocol:
33 * increment tupleIter
34 * wake producers for next tuple
35 * Reads a production from the shared Q.
36 * If empty, yields and tries again.
37 * if more productions in tuple, repeat
38 * if have more tuples, repeat
39 * end thread
40 */
41 while( tupleIter < params->numTuplesToCreate )
42 {
43 if( tupleIter % 1000 == 0 ) DEBUG__printf1("tuples produced: %d\n", tupleIter);
45 // wake producers for next iter
46 DEBUG__printf("consumer broadcast for next iter\n");
47 pthread_mutex_lock(&tupleIterLock);
48 tupleIter += 1; // increment tupleIter (global shared)
49 pthread_cond_broadcast( &tupleIterCond );
50 pthread_mutex_unlock(&tupleIterLock);
52 for( numProducts = 0; numProducts < params->numProducers; numProducts++ )
53 {
54 // read a production from the shared Q.
55 production = NULL;
56 while( production == NULL )
57 { pthread_mutex_lock( &queueAccessLock );
58 production = readPrivQ( commQ );
59 pthread_mutex_unlock( &queueAccessLock );
60 // If empty, yields and tries again.
61 if( production == NULL) sched_yield();
62 }
63 DEBUG__printf1( "consumer got production %d\n", currProductionNum );
64 } //if more productions for current tuple, repeat
66 // have all productions for current tuple, so add tuple to output
67 //add tuple to output and malloc new tuple array -- overhead meas, do nothing
69 } // if have all tuples are going to produce, end
71 //Shutdown consumer thread
72 pthread_exit(NULL);
74 }
