Mercurial > cgi-bin > hgwebdir.cgi > PR > Applications > Vthread > Vthread__KMeans__Bench
diff pthreads_kmeans.c @ 1:8e7bdab2840f
VPThread version working
| author | Merten Sach <msach@mailbox.tu-berlin.de> |
|---|---|
| date | Tue, 16 Aug 2011 20:32:55 +0200 |
| parents | e69e4c2d612a |
| children |
line diff
1.1 --- a/pthreads_kmeans.c Wed Aug 03 19:30:34 2011 +0200 1.2 +++ b/pthreads_kmeans.c Tue Aug 16 20:32:55 2011 +0200 1.3 @@ -24,19 +24,54 @@ 1.4 #include <math.h> 1.5 #include "kmeans.h" 1.6 1.7 +#include "VPThread_lib/VPThread.h" 1.8 + 1.9 #define PREC 300 1.10 1.11 -char __ProgrammName[] = "kmeans"; 1.12 -char __DataSet[255]; 1.13 +struct barrier_t 1.14 +{ 1.15 + int counter; 1.16 + int nthreads; 1.17 + int32 mutex; 1.18 + int32 cond; 1.19 +}; 1.20 +typedef struct barrier_t barrier; 1.21 1.22 extern int nthreads; /* Thread count */ 1.23 double delta; /* Delta is a value between 0 and 1 describing the percentage of objects which changed cluster membership */ 1.24 volatile int finished; 1.25 1.26 -pthread_barrier_t barr; 1.27 -pthread_mutex_t lock1; 1.28 +barrier barr; 1.29 +int32 lock1; 1.30 pthread_attr_t attr; 1.31 1.32 +void inline barrier_init(barrier *barr, int nthreads, VirtProcr *VProc) 1.33 +{ 1.34 + barr->counter = 0; 1.35 + barr->nthreads = nthreads; 1.36 + barr->mutex = VPThread__make_mutex(VProc); 1.37 + barr->cond = VPThread__make_cond(barr->mutex, VProc); 1.38 +} 1.39 + 1.40 +void inline barrier_wait(barrier *barr, VirtProcr *VProc) 1.41 +{ 1.42 + int i; 1.43 + 1.44 + VPThread__mutex_lock(barr->mutex, VProc); 1.45 + barr->counter++; 1.46 + if(barr->counter == barr->nthreads) 1.47 + { 1.48 + barr->counter = 0; 1.49 + for(i=0; i < barr->nthreads; i++) 1.50 + VPThread__cond_signal(barr->cond, VProc); 1.51 + } 1.52 + else 1.53 + { 1.54 + VPThread__cond_wait(barr->cond, VProc); 1.55 + } 1.56 + VPThread__mutex_unlock(barr->mutex, VProc); 1.57 +} 1.58 + 1.59 /* 1.60 * Struct: input 1.61 * ------------- 1.62 @@ -93,7 +128,7 @@ 1.63 return index; 1.64 } 1.65 1.66 -void work(struct input *x){ 1.67 +void work(struct input *x, VirtProcr *VProc){ 1.68 int tid = x->t; 1.69 double local_delta=0; 1.70 int i; 1.71 @@ -116,30 +151,32 @@ 1.72 x->local_newClusters[tid][index][j] += x->objects[i][j]; 1.73 1.74 } 1.75 - pthread_mutex_lock(&lock1); 1.76 + 
VPThread__mutex_lock(lock1, VProc); 1.77 delta +=local_delta; 1.78 - pthread_mutex_unlock(&lock1); 1.79 + VPThread__mutex_unlock(lock1, VProc); 1.80 } 1.81 + 1.82 /* 1.83 * Function: thread function work 1.84 * -------------- 1.85 * Worker function for threading. Work distribution is done so that each thread computers 1.86 */ 1.87 -void* tfwork(void *ip) 1.88 +void tfwork(void *ip, VirtProcr *VProc) 1.89 { 1.90 struct input *x; 1.91 x = (struct input *)ip; 1.92 1.93 for(;;){ 1.94 - pthread_barrier_wait(&barr); 1.95 + barrier_wait(&barr, VProc); 1.96 if (finished){ 1.97 break; 1.98 } 1.99 - work(x); 1.100 - pthread_barrier_wait(&barr); 1.101 + work(x, VProc); 1.102 + barrier_wait(&barr, VProc); 1.103 } 1.104 1.105 - pthread_exit(NULL); 1.106 + //pthread_exit(NULL); 1.107 + VPThread__dissipate_thread(VProc); 1.108 } 1.109 1.110 /* 1.111 @@ -147,12 +184,12 @@ 1.112 * -------------------------- 1.113 * Allocates memory for a 2-dim double array as needed for the algorithm. 1.114 */ 1.115 -double** create_array_2d_f(int height, int width) { 1.116 +double** create_array_2d_f(int height, int width, VirtProcr *VProc) { 1.117 double** ptr; 1.118 int i; 1.119 - ptr = calloc(height, sizeof(double*)); 1.120 + ptr = VPThread__malloc(height * sizeof(double*), VProc); 1.121 assert(ptr != NULL); 1.122 - ptr[0] = calloc(width * height, sizeof(double)); 1.123 + ptr[0] = VPThread__malloc(width * height * sizeof(double), VProc); 1.124 assert(ptr[0] != NULL); 1.125 /* Assign pointers correctly */ 1.126 for(i = 1; i < height; i++) 1.127 @@ -165,12 +202,12 @@ 1.128 * -------------------------- 1.129 * Allocates memory for a 2-dim integer array as needed for the algorithm. 
1.130 */ 1.131 -int** create_array_2d_i(int height, int width) { 1.132 +int** create_array_2d_i(int height, int width, VirtProcr *VProc) { 1.133 int** ptr; 1.134 int i; 1.135 - ptr = calloc(height, sizeof(int*)); 1.136 + ptr = VPThread__malloc(height * sizeof(int*), VProc); 1.137 assert(ptr != NULL); 1.138 - ptr[0] = calloc(width * height, sizeof(int)); 1.139 + ptr[0] = VPThread__malloc(width * height * sizeof(int), VProc); 1.140 assert(ptr[0] != NULL); 1.141 /* Assign pointers correctly */ 1.142 for(i = 1; i < height; i++) 1.143 @@ -183,30 +220,34 @@ 1.144 * ------------------------- 1.145 * Algorithm main function. Returns a 2D array of cluster centers of size [numClusters][numCoords]. 1.146 */ 1.147 -double** pthreads_kmeans(int is_perform_atomic, /* in: */ 1.148 - double **objects, /* in: [numObjs][numCoords] */ 1.149 - int numCoords, /* no. coordinates */ 1.150 - int numObjs, /* no. objects */ 1.151 - int numClusters, /* no. clusters */ 1.152 - double threshold, /* % objects change membership */ 1.153 - int *membership) /* out: [numObjs] */ 1.154 +void pthreads_kmeans(void *data, VirtProcr *VProc) 1.155 { 1.156 + struct call_data *cluster_data = (struct call_data*)data; 1.157 + //int is_perform_atomic = cluster_data->is_perform_atomic; /* in: */ 1.158 + double **objects = cluster_data->objects; /* in: [numObjs][numCoords] */ 1.159 + int numCoords = cluster_data->numCoords; /* no. coordinates */ 1.160 + int numObjs = cluster_data->numObjs; /* no. objects */ 1.161 + int numClusters = cluster_data->numClusters; /* no. clusters */ 1.162 + double threshold = cluster_data->threshold; /* % objects change membership */ 1.163 + int *membership = cluster_data->membership; /* out: [numObjs] */ 1.164 1.165 - int i, j, k, index, loop = 0, rc; 1.166 + int i, j, k, loop = 0; 1.167 int *newClusterSize; /* [numClusters]: no. 
objects assigned in each 1.168 new cluster */ 1.169 - double **clusters; /* out: [numClusters][numCoords] */ 1.170 + double **clusters = cluster_data->clusters; /* out: [numClusters][numCoords] */ 1.171 double **newClusters; /* [numClusters][numCoords] */ 1.172 - double timing; 1.173 + //double timing; 1.174 int **local_newClusterSize; /* [nthreads][numClusters] */ 1.175 double ***local_newClusters; /* [nthreads][numClusters][numCoords] */ 1.176 1.177 - pthread_t *thread; 1.178 + VirtProcr **thread; 1.179 1.180 /* === MEMORY SETUP === */ 1.181 1.182 /* [numClusters] clusters of [numCoords] double coordinates each */ 1.183 - clusters = create_array_2d_f(numClusters, numCoords); 1.184 + //Set pointers 1.185 + for(i = 1; i < numClusters; i++) 1.186 + clusters[i] = clusters[i-1] + numCoords; 1.187 1.188 /* Pick first numClusters elements of objects[] as initial cluster centers */ 1.189 for (i=0; i < numClusters; i++) 1.190 @@ -218,17 +259,17 @@ 1.191 membership[i] = -1; 1.192 1.193 /* newClusterSize holds information on the count of members in each cluster */ 1.194 - newClusterSize = (int*)calloc(numClusters, sizeof(int)); 1.195 + newClusterSize = (int*)VPThread__malloc(numClusters * sizeof(int), VProc); 1.196 assert(newClusterSize != NULL); 1.197 1.198 /* newClusters holds the coordinates of the freshly created clusters */ 1.199 - newClusters = create_array_2d_f(numClusters, numCoords); 1.200 - local_newClusterSize = create_array_2d_i(nthreads, numClusters); 1.201 + newClusters = create_array_2d_f(numClusters, numCoords, VProc); 1.202 + local_newClusterSize = create_array_2d_i(nthreads, numClusters, VProc); 1.203 1.204 /* local_newClusters is a 3D array */ 1.205 - local_newClusters = (double***)malloc(nthreads * sizeof(double**)); 1.206 + local_newClusters = (double***)VPThread__malloc(nthreads * sizeof(double**), VProc); 1.207 assert(local_newClusters != NULL); 1.208 - local_newClusters[0] = (double**) malloc(nthreads * numClusters * sizeof(double*)); 1.209 + 
local_newClusters[0] = (double**) VPThread__malloc(nthreads * numClusters * sizeof(double*), VProc); 1.210 assert(local_newClusters[0] != NULL); 1.211 1.212 /* Set up the pointers */ 1.213 @@ -237,21 +278,18 @@ 1.214 1.215 for (i = 0; i < nthreads; i++) { 1.216 for (j = 0; j < numClusters; j++) { 1.217 - local_newClusters[i][j] = (double*)calloc(numCoords, sizeof(double)); 1.218 + local_newClusters[i][j] = (double*)VPThread__malloc(numCoords * sizeof(double), VProc); 1.219 assert(local_newClusters[i][j] != NULL); 1.220 } 1.221 } 1.222 /* Perform thread setup */ 1.223 - thread = (pthread_t*)calloc(nthreads, sizeof(pthread_t)); 1.224 + thread = (VirtProcr**)VPThread__malloc(nthreads * sizeof(VirtProcr*), VProc); 1.225 1.226 - printf("nthreads %d\n", nthreads); 1.227 - pthread_barrier_init(&barr, NULL, nthreads); 1.228 - pthread_attr_init(&attr); 1.229 - pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); 1.230 - pthread_mutex_init(&lock1, NULL); 1.231 + barrier_init(&barr, nthreads, VProc); 1.232 + lock1 = VPThread__make_mutex(VProc); 1.233 finished=0; 1.234 1.235 - struct input *ip = malloc(nthreads * sizeof(struct input)); 1.236 + struct input *ip = VPThread__malloc(nthreads * sizeof(struct input), VProc); 1.237 /* Provide thread-safe memory locations for each worker */ 1.238 for(i = 0; i < nthreads; i++){ 1.239 ip[i].t = i; 1.240 @@ -265,11 +303,7 @@ 1.241 ip[i].numCoords=numCoords; 1.242 1.243 if (i>0){ 1.244 - rc = pthread_create(&thread[i], &attr, tfwork, (void *)&ip[i]); 1.245 - if (rc) { 1.246 - fprintf(stderr, "ERROR: Return Code For Thread Creation Is %d\n", rc); 1.247 - exit(EXIT_FAILURE); 1.248 - } 1.249 + thread[i] = VPThread__create_thread(tfwork, (void*)&ip[i], VProc); 1.250 } 1.251 } 1.252 1.253 @@ -277,10 +311,10 @@ 1.254 1.255 do { 1.256 delta = 0.0; 1.257 - pthread_barrier_wait(&barr); 1.258 - work(&ip[0]); 1.259 + barrier_wait(&barr, VProc); 1.260 + work(&ip[0], VProc); 1.261 1.262 - pthread_barrier_wait(&barr); 1.263 + 
barrier_wait(&barr, VProc); 1.264 /* Let the main thread perform the array reduction */ 1.265 for (i = 0; i < numClusters; i++) { 1.266 for (j = 0; j < nthreads; j++) { 1.267 @@ -306,39 +340,32 @@ 1.268 delta /= numObjs; 1.269 } while (loop++ < PREC && delta > threshold); 1.270 1.271 - // Changing to a fixed number of iterations is for benchmarking reasons. I know it affects the results compared to the original program, 1.272 + // Changing to a fixed number of iterations is for benchmarking reasons. I know it affects the results compared to the original program, 1.273 // but minor double precision floating point inaccuracies caused by threading would otherwise lead to huge differences in computed 1.274 // iterations, therefore making benchmarking completely unreliable. 1.275 1.276 - finished=1; 1.277 - pthread_barrier_wait(&barr); 1.278 + finished=1; 1.279 + barrier_wait(&barr, VProc); 1.280 1.281 - for(i = 1; i < nthreads; i++) { 1.282 - rc = pthread_join(thread[i], NULL); 1.283 - if (rc) { 1.284 - fprintf(stderr, "ERROR: Return Code For Thread Join Is %d\n", rc); 1.285 - exit(EXIT_FAILURE); 1.286 - } 1.287 - } 1.288 - 1.289 - free(ip); 1.290 - free(thread); 1.291 - pthread_barrier_destroy(&barr); 1.292 - pthread_mutex_destroy(&lock1); 1.293 - pthread_attr_destroy(&attr); 1.294 1.295 - free(local_newClusterSize[0]); 1.296 - free(local_newClusterSize); 1.297 + VPThread__free(ip, VProc); 1.298 + VPThread__free(thread, VProc); 1.299 + 1.300 + VPThread__free(local_newClusterSize[0], VProc); 1.301 + VPThread__free(local_newClusterSize, VProc); 1.302 1.303 for (i = 0; i < nthreads; i++) 1.304 for (j = 0; j < numClusters; j++) 1.305 - free(local_newClusters[i][j]); 1.306 - free(local_newClusters[0]); 1.307 - free(local_newClusters); 1.308 + VPThread__free(local_newClusters[i][j], VProc); 1.309 + VPThread__free(local_newClusters[0], VProc); 1.310 + VPThread__free(local_newClusters, VProc); 1.311 1.312 - free(newClusters[0]); 1.313 - free(newClusters); 1.314 - 
free(newClusterSize); 1.315 - return clusters; 1.316 + VPThread__free(newClusters[0], VProc); 1.317 + VPThread__free(newClusters, VProc); 1.318 + VPThread__free(newClusterSize, VProc); 1.319 + 1.320 + (cluster_data)->clusters = clusters; 1.321 + 1.322 + VPThread__dissipate_thread(VProc); 1.323 } 1.324
