| rev |
line source |
|
seanhalle@0
|
1 /*
|
|
seanhalle@0
|
2 *
|
|
seanhalle@0
|
3 */
|
|
seanhalle@0
|
4
|
|
seanhalle@0
|
5 #include "main.h"
|
|
seanhalle@0
|
6 #include <pthread.h>
|
|
seanhalle@0
|
7
|
|
seanhalle@0
|
8 /*
|
|
seanhalle@0
|
9 * Consumer.
|
|
seanhalle@0
|
10 *
|
|
seanhalle@0
|
11 * Birth function for thread that performs the consumer behavior
|
|
seanhalle@0
|
12 *
|
|
seanhalle@0
|
13 *Here's the protocol:
|
|
seanhalle@1
|
14 * Reads a production from the shared Q.
|
|
seanhalle@1
|
15 * If empty, yields and tries again.
|
|
seanhalle@1
|
16 * When has a production from every producer, broadcasts next iter to producers
|
|
seanhalle@1
|
17 * When has all the tuples, end
|
|
seanhalle@0
|
18 */
|
|
seanhalle@0
|
19 void*
|
|
seanhalle@0
|
20 consumer_birthFn( void* _params )
|
|
seanhalle@0
|
21 {
|
|
seanhalle@1
|
22 int numProducts;
|
|
seanhalle@1
|
23 void *production; //dummy ptr
|
|
seanhalle@0
|
24
|
|
seanhalle@0
|
25 ConsumerParams* params = (ConsumerParams *)_params;
|
|
seanhalle@0
|
26
|
|
seanhalle@0
|
27
|
|
seanhalle@0
|
28 /*The consumer does two loops.
|
|
seanhalle@0
|
29 * The outside loop counts the number of tuples created.
|
|
seanhalle@0
|
30 * The inside loop collects the products for one tuple.
|
|
seanhalle@0
|
31 *
|
|
seanhalle@0
|
32 *Protocol:
|
|
seanhalle@0
|
33 * increment tupleIter
|
|
seanhalle@0
|
34 * wake producers for next tuple
|
|
seanhalle@1
|
35 * Reads a production from the shared Q.
|
|
seanhalle@1
|
36 * If empty, yields and tries again.
|
|
seanhalle@1
|
37 * if more productions in tuple, repeat
|
|
seanhalle@1
|
38 * if have more tuples, repeat
|
|
seanhalle@1
|
39 * end thread
|
|
seanhalle@0
|
40 */
|
|
seanhalle@0
|
41 while( tupleIter < params->numTuplesToCreate )
|
|
seanhalle@0
|
42 {
|
|
seanhalle@1
|
43 if( tupleIter % 1000 == 0 ) DEBUG__printf1("tuples produced: %d\n", tupleIter);
|
|
seanhalle@1
|
44
|
|
seanhalle@1
|
45 // wake producers for next iter
|
|
seanhalle@0
|
46 DEBUG__printf("consumer broadcast for next iter\n");
|
|
seanhalle@0
|
47 pthread_mutex_lock(&tupleIterLock);
|
|
seanhalle@1
|
48 tupleIter += 1; // increment tupleIter (global shared)
|
|
seanhalle@0
|
49 pthread_cond_broadcast( &tupleIterCond );
|
|
seanhalle@0
|
50 pthread_mutex_unlock(&tupleIterLock);
|
|
seanhalle@0
|
51
|
|
seanhalle@0
|
52 for( numProducts = 0; numProducts < params->numProducers; numProducts++ )
|
|
seanhalle@0
|
53 {
|
|
seanhalle@1
|
54 // read a production from the shared Q.
|
|
seanhalle@1
|
55 production = NULL;
|
|
seanhalle@1
|
56 while( production == NULL )
|
|
seanhalle@1
|
57 { pthread_mutex_lock( &queueAccessLock );
|
|
seanhalle@1
|
58 production = readPrivQ( commQ );
|
|
seanhalle@1
|
59 pthread_mutex_unlock( &queueAccessLock );
|
|
seanhalle@1
|
60 // If empty, yields and tries again.
|
|
seanhalle@1
|
61 if( production == NULL) sched_yield();
|
|
seanhalle@0
|
62 }
|
|
seanhalle@0
|
63 DEBUG__printf1( "consumer got production %d\n", currProductionNum );
|
|
seanhalle@1
|
64 } //if more productions for current tuple, repeat
|
|
seanhalle@0
|
65
|
|
seanhalle@0
|
66 // have all productions for current tuple, so add tuple to output
|
|
seanhalle@0
|
67 //add tuple to output and malloc new tuple array -- overhead meas, do nothing
|
|
seanhalle@0
|
68
|
|
seanhalle@0
|
69 } // if have all tuples are going to produce, end
|
|
seanhalle@0
|
70
|
|
seanhalle@0
|
71 //Shutdown consumer thread
|
|
seanhalle@0
|
72 pthread_exit(NULL);
|
|
seanhalle@0
|
73
|
|
seanhalle@0
|
74 }
|
|
seanhalle@0
|
75
|