diff pthreads_kmeans.c @ 1:8e7bdab2840f

VPThread version workinh
author Merten Sach <msach@mailbox.tu-berlin.de>
date Tue, 16 Aug 2011 20:32:55 +0200
parents e69e4c2d612a
children
line diff
     1.1 --- a/pthreads_kmeans.c	Wed Aug 03 19:30:34 2011 +0200
     1.2 +++ b/pthreads_kmeans.c	Tue Aug 16 20:32:55 2011 +0200
     1.3 @@ -24,19 +24,54 @@
     1.4  #include <math.h>
     1.5  #include "kmeans.h"
     1.6  
     1.7 +#include "VPThread_lib/VPThread.h"
     1.8 +
     1.9  #define PREC 300
    1.10  
    1.11 -char __ProgrammName[] = "kmeans";
    1.12 -char __DataSet[255];
    1.13 +struct barrier_t
    1.14 +{
    1.15 +    int counter;
    1.16 +    int nthreads;
    1.17 +    int32 mutex;
    1.18 +    int32 cond;
    1.19 +};
    1.20 +typedef struct barrier_t barrier;
    1.21  
    1.22  extern int nthreads; /* Thread count */
    1.23  double delta; /* Delta is a value between 0 and 1 describing the percentage of objects which changed cluster membership */
    1.24  volatile int finished;
    1.25  
    1.26 -pthread_barrier_t barr;
    1.27 -pthread_mutex_t lock1;
    1.28 +barrier barr;
    1.29 +int32 lock1;
    1.30  pthread_attr_t attr;
    1.31  
    1.32 +void inline barrier_init(barrier *barr, int nthreads, VirtProcr *VProc)
    1.33 +{
    1.34 +    barr->counter = 0;
    1.35 +    barr->nthreads = nthreads;
    1.36 +    barr->mutex   = VPThread__make_mutex(VProc);
    1.37 +    barr->cond    = VPThread__make_cond(barr->mutex, VProc);
    1.38 +}
    1.39 +
    1.40 +void inline barrier_wait(barrier *barr, VirtProcr *VProc)
    1.41 +{
    1.42 +    int i;
    1.43 +    
    1.44 +    VPThread__mutex_lock(barr->mutex, VProc);
    1.45 +    barr->counter++;
    1.46 +    if(barr->counter == barr->nthreads)
    1.47 +    {
    1.48 +        barr->counter = 0;
    1.49 +        for(i=0; i < barr->nthreads; i++)
    1.50 +            VPThread__cond_signal(barr->cond, VProc);
    1.51 +    }
    1.52 +    else
    1.53 +    {
    1.54 +        VPThread__cond_wait(barr->cond, VProc);
    1.55 +    }
    1.56 +    VPThread__mutex_unlock(barr->mutex, VProc);
    1.57 +}
    1.58 +
    1.59  /*
    1.60  *	Struct: input
    1.61  *	-------------
    1.62 @@ -93,7 +128,7 @@
    1.63      return index;
    1.64  }
    1.65  
    1.66 -void work(struct input *x){
    1.67 +void work(struct input *x, VirtProcr *VProc){
    1.68      int tid = x->t;
    1.69      double local_delta=0;
    1.70      int i;
    1.71 @@ -116,30 +151,32 @@
    1.72              x->local_newClusters[tid][index][j] += x->objects[i][j];
    1.73  
    1.74      }
    1.75 -    pthread_mutex_lock(&lock1);
    1.76 +    VPThread__mutex_lock(lock1, VProc);
    1.77      delta +=local_delta;
    1.78 -    pthread_mutex_unlock(&lock1);
    1.79 +    VPThread__mutex_unlock(lock1, VProc);
    1.80  }
    1.81 +
    1.82  /*
    1.83  *	Function: thread function work
    1.84  *	--------------
    1.85  *	Worker function for threading. Work distribution is done so that each thread computers
    1.86  */
    1.87 -void* tfwork(void *ip)
    1.88 +void tfwork(void *ip, VirtProcr *VProc)
    1.89  {
    1.90      struct input *x;
    1.91      x = (struct input *)ip;    
    1.92      
    1.93      for(;;){
    1.94 -        pthread_barrier_wait(&barr);
    1.95 +        barrier_wait(&barr, VProc);
    1.96          if (finished){            
    1.97              break;
    1.98          }
    1.99 -        work(x);
   1.100 -        pthread_barrier_wait(&barr);
   1.101 +        work(x, VProc);
   1.102 +        barrier_wait(&barr, VProc);
   1.103      }
   1.104      
   1.105 -	pthread_exit(NULL);
   1.106 +	//pthread_exit(NULL);
   1.107 +    VPThread__dissipate_thread(VProc);
   1.108  }
   1.109  
   1.110  /*
   1.111 @@ -147,12 +184,12 @@
   1.112  *	--------------------------
   1.113  *	Allocates memory for a 2-dim double array as needed for the algorithm.
   1.114  */
   1.115 -double** create_array_2d_f(int height, int width) {
   1.116 +double** create_array_2d_f(int height, int width, VirtProcr *VProc) {
   1.117  	double** ptr;
   1.118  	int i;
   1.119 -	ptr = calloc(height, sizeof(double*));
   1.120 +	ptr = VPThread__malloc(height * sizeof(double*), VProc);
   1.121  	assert(ptr != NULL);
   1.122 -	ptr[0] = calloc(width * height, sizeof(double));
   1.123 +	ptr[0] = VPThread__malloc(width * height * sizeof(double), VProc);
   1.124  	assert(ptr[0] != NULL);
   1.125  	/* Assign pointers correctly */
   1.126  	for(i = 1; i < height; i++)
   1.127 @@ -165,12 +202,12 @@
   1.128  *	--------------------------
   1.129  *	Allocates memory for a 2-dim integer array as needed for the algorithm.
   1.130  */
   1.131 -int** create_array_2d_i(int height, int width) {
   1.132 +int** create_array_2d_i(int height, int width, VirtProcr *VProc) {
   1.133  	int** ptr;
   1.134  	int i;
   1.135 -	ptr = calloc(height, sizeof(int*));
   1.136 +	ptr = VPThread__malloc(height * sizeof(int*), VProc);
   1.137  	assert(ptr != NULL);
   1.138 -	ptr[0] = calloc(width * height, sizeof(int));
   1.139 +	ptr[0] = VPThread__malloc(width * height * sizeof(int), VProc);
   1.140  	assert(ptr[0] != NULL);
   1.141  	/* Assign pointers correctly */
   1.142  	for(i = 1; i < height; i++)
   1.143 @@ -183,30 +220,34 @@
   1.144  *	-------------------------
   1.145  *	Algorithm main function. Returns a 2D array of cluster centers of size [numClusters][numCoords].
   1.146  */
   1.147 -double** pthreads_kmeans(int is_perform_atomic, 	/* in: */
   1.148 -                   double **objects,           	/* in: [numObjs][numCoords] */
   1.149 -                   int     numCoords,         	/* no. coordinates */
   1.150 -                   int     numObjs,           	/* no. objects */
   1.151 -                   int     numClusters,       	/* no. clusters */
   1.152 -                   double   threshold,         	/* % objects change membership */
   1.153 -                   int    *membership)        	/* out: [numObjs] */
   1.154 +void pthreads_kmeans(void *data, VirtProcr *VProc)
   1.155  {
   1.156 +    struct call_data *cluster_data = (struct call_data*)data;
   1.157 +    //int is_perform_atomic = cluster_data->is_perform_atomic;	/* in: */
   1.158 +    double **objects      = cluster_data->objects;           	/* in: [numObjs][numCoords] */
   1.159 +    int     numCoords     = cluster_data->numCoords;         	/* no. coordinates */
   1.160 +    int     numObjs       = cluster_data->numObjs;           	/* no. objects */
   1.161 +    int     numClusters   = cluster_data->numClusters;     	/* no. clusters */
   1.162 +    double  threshold     = cluster_data->threshold;         	/* % objects change membership */
   1.163 +    int    *membership    = cluster_data->membership;        	/* out: [numObjs] */ 
   1.164  
   1.165 -    int      i, j, k, index, loop = 0, rc;
   1.166 +    int      i, j, k, loop = 0;
   1.167      int     *newClusterSize; /* [numClusters]: no. objects assigned in each
   1.168                                  new cluster */
   1.169 -    double  **clusters;       /* out: [numClusters][numCoords] */
   1.170 +    double  **clusters    = cluster_data->clusters;       /* out: [numClusters][numCoords] */
   1.171      double  **newClusters;    /* [numClusters][numCoords] */
   1.172 -    double   timing;
   1.173 +    //double   timing;
   1.174      int    **local_newClusterSize; /* [nthreads][numClusters] */
   1.175      double ***local_newClusters;    /* [nthreads][numClusters][numCoords] */
   1.176  
   1.177 -	pthread_t *thread;
   1.178 +	VirtProcr **thread;
   1.179  
   1.180  	/* === MEMORY SETUP === */
   1.181  
   1.182  	/* [numClusters] clusters of [numCoords] double coordinates each */
   1.183 -	clusters = create_array_2d_f(numClusters, numCoords);
   1.184 +        //Set pointers
   1.185 +        for(i = 1; i < numClusters; i++)
   1.186 +		clusters[i] = clusters[i-1] + numCoords; 
   1.187  
   1.188      /* Pick first numClusters elements of objects[] as initial cluster centers */
   1.189      for (i=0; i < numClusters; i++)
   1.190 @@ -218,17 +259,17 @@
   1.191  		membership[i] = -1;
   1.192  
   1.193  	/* newClusterSize holds information on the count of members in each cluster */
   1.194 -    newClusterSize = (int*)calloc(numClusters, sizeof(int));
   1.195 +    newClusterSize = (int*)VPThread__malloc(numClusters * sizeof(int), VProc);
   1.196      assert(newClusterSize != NULL);
   1.197  
   1.198  	/* newClusters holds the coordinates of the freshly created clusters */
   1.199 -	newClusters = create_array_2d_f(numClusters, numCoords);
   1.200 -	local_newClusterSize = create_array_2d_i(nthreads, numClusters);
   1.201 +	newClusters = create_array_2d_f(numClusters, numCoords, VProc);
   1.202 +	local_newClusterSize = create_array_2d_i(nthreads, numClusters, VProc);
   1.203  
   1.204      /* local_newClusters is a 3D array */
   1.205 -    local_newClusters    = (double***)malloc(nthreads * sizeof(double**));
   1.206 +    local_newClusters    = (double***)VPThread__malloc(nthreads * sizeof(double**), VProc);
   1.207      assert(local_newClusters != NULL);
   1.208 -    local_newClusters[0] = (double**) malloc(nthreads * numClusters * sizeof(double*));
   1.209 +    local_newClusters[0] = (double**) VPThread__malloc(nthreads * numClusters * sizeof(double*), VProc);
   1.210      assert(local_newClusters[0] != NULL);
   1.211  
   1.212  	/* Set up the pointers */
   1.213 @@ -237,21 +278,18 @@
   1.214  
   1.215      for (i = 0; i < nthreads; i++) {
   1.216          for (j = 0; j < numClusters; j++) {
   1.217 -            local_newClusters[i][j] = (double*)calloc(numCoords, sizeof(double));
   1.218 +            local_newClusters[i][j] = (double*)VPThread__malloc(numCoords * sizeof(double), VProc);
   1.219              assert(local_newClusters[i][j] != NULL);
   1.220          }
   1.221      }
   1.222      /* Perform thread setup */
   1.223 -    thread = (pthread_t*)calloc(nthreads, sizeof(pthread_t));
   1.224 +    thread = (VirtProcr**)VPThread__malloc(nthreads * sizeof(VirtProcr*), VProc);
   1.225  
   1.226 -    printf("nthreads %d\n", nthreads);
   1.227 -    pthread_barrier_init(&barr, NULL, nthreads);
   1.228 -    pthread_attr_init(&attr);
   1.229 -    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
   1.230 -    pthread_mutex_init(&lock1, NULL);
   1.231 +    barrier_init(&barr, nthreads, VProc);
   1.232 +    lock1 = VPThread__make_mutex(VProc);
   1.233      finished=0;
   1.234      
   1.235 -    struct input *ip = malloc(nthreads * sizeof(struct input));
   1.236 +    struct input *ip = VPThread__malloc(nthreads * sizeof(struct input), VProc);
   1.237      /* Provide thread-safe memory locations for each worker */
   1.238      for(i = 0; i < nthreads; i++){
   1.239          ip[i].t = i;
   1.240 @@ -265,11 +303,7 @@
   1.241          ip[i].numCoords=numCoords;
   1.242  
   1.243          if (i>0){
   1.244 -            rc = pthread_create(&thread[i], &attr, tfwork, (void *)&ip[i]);
   1.245 -            if (rc) {
   1.246 -                fprintf(stderr, "ERROR: Return Code For Thread Creation Is %d\n", rc);
   1.247 -                exit(EXIT_FAILURE);
   1.248 -            }
   1.249 +            thread[i] = VPThread__create_thread(tfwork, (void*)&ip[i], VProc);
   1.250          }
   1.251      }
   1.252      
   1.253 @@ -277,10 +311,10 @@
   1.254      
   1.255      do {
   1.256          delta = 0.0;
   1.257 -        pthread_barrier_wait(&barr);
   1.258 -        work(&ip[0]);
   1.259 +        barrier_wait(&barr, VProc);
   1.260 +        work(&ip[0], VProc);
   1.261          
   1.262 -        pthread_barrier_wait(&barr);
   1.263 +        barrier_wait(&barr, VProc);
   1.264  		/* Let the main thread perform the array reduction */
   1.265  		for (i = 0; i < numClusters; i++) {
   1.266  			for (j = 0; j < nthreads; j++) {
   1.267 @@ -306,39 +340,32 @@
   1.268  		delta /= numObjs;
   1.269      } while (loop++ < PREC && delta > threshold);
   1.270      
   1.271 -	// Changing to a fixed number of iterations is for benchmarking reasons. I know it affects the results compared to the original program,
   1.272 +    // Changing to a fixed number of iterations is for benchmarking reasons. I know it affects the results compared to the original program,
   1.273      // but minor double precision floating point inaccuracies caused by threading would otherwise lead to huge differences in computed
   1.274      // iterations, therefore making benchmarking completely unreliable.
   1.275  
   1.276 -    finished=1;    
   1.277 -    pthread_barrier_wait(&barr);
   1.278 +    finished=1;
   1.279 +    barrier_wait(&barr, VProc);
   1.280      
   1.281 -    for(i = 1; i < nthreads; i++) {
   1.282 -        rc = pthread_join(thread[i], NULL);
   1.283 -        if (rc) {
   1.284 -            fprintf(stderr, "ERROR: Return Code For Thread Join Is %d\n", rc);
   1.285 -            exit(EXIT_FAILURE);
   1.286 -        }
   1.287 -    }
   1.288 -    
   1.289 -    free(ip);
   1.290 -    free(thread);
   1.291 -    pthread_barrier_destroy(&barr);
   1.292 -    pthread_mutex_destroy(&lock1);
   1.293 -	pthread_attr_destroy(&attr);
   1.294  
   1.295 -    free(local_newClusterSize[0]);
   1.296 -    free(local_newClusterSize);
   1.297 +    VPThread__free(ip, VProc);
   1.298 +    VPThread__free(thread, VProc);
   1.299 +
   1.300 +    VPThread__free(local_newClusterSize[0], VProc);
   1.301 +    VPThread__free(local_newClusterSize, VProc);
   1.302          
   1.303      for (i = 0; i < nthreads; i++)
   1.304          for (j = 0; j < numClusters; j++)
   1.305 -            free(local_newClusters[i][j]);
   1.306 -    free(local_newClusters[0]);
   1.307 -    free(local_newClusters);
   1.308 +            VPThread__free(local_newClusters[i][j], VProc);
   1.309 +    VPThread__free(local_newClusters[0], VProc);
   1.310 +    VPThread__free(local_newClusters, VProc);
   1.311  
   1.312 -    free(newClusters[0]);
   1.313 -    free(newClusters);
   1.314 -    free(newClusterSize);
   1.315 -    return clusters;
   1.316 +    VPThread__free(newClusters[0], VProc);
   1.317 +    VPThread__free(newClusters, VProc);
   1.318 +    VPThread__free(newClusterSize, VProc);
   1.319 +    
   1.320 +    (cluster_data)->clusters = clusters;
   1.321 +    
   1.322 +    VPThread__dissipate_thread(VProc);
   1.323  }
   1.324