- From: Ravi Teja Mullapudi <raviteja.mullapudi@gmail.com>
- To: starpu-devel@lists.gforge.inria.fr
- Cc: Roshan D <roshan18@gmail.com>, Uday Reddy <udayreddy@gmail.com>
- Subject: [Starpu-devel] How to use starpu_mpi_task_insert
- Date: Sat, 30 Mar 2013 19:19:29 +0530
- List-archive: <http://lists.gforge.inria.fr/pipermail/starpu-devel>
- List-id: "Developers list. For discussion of new features, code changes, etc." <starpu-devel.lists.gforge.inria.fr>
Hi,
I am trying to use starpu_mpi_task_insert so that I do not have to do the MPI communication explicitly and can let StarPU handle it. I am working on a Floyd-Warshall implementation. The following is an outline of the code.
Task Creation
create_dist_tasks(int matrix_size, int block_size, starpu_data_handle_t *data_handles)
{
    int i, j, k;
    int ret;
    int num_blocks = matrix_size/block_size;
    /* Create non-entry tasks */
    for (k = 0; k < matrix_size; k++) {
        for (i = 0; i < num_blocks; i++) {
            for (j = 0; j < num_blocks; j++) {
                starpu_mpi_insert_task(MPI_COMM_WORLD, &dist_cl,
                                       STARPU_VALUE, &k,
                                       STARPU_VALUE, &i,
                                       STARPU_VALUE, &j,
                                       STARPU_RW, data_handles[i*num_blocks + j],
                                       STARPU_R, data_handles[i*num_blocks + (k/block_size)],
                                       STARPU_R, data_handles[(k/block_size)*num_blocks + j],
                                       STARPU_EXECUTE_ON_DATA, data_handles[i*num_blocks + j],
                                       0);
            }
        }
    }
    return 0;
}
Registering Data
for (i = 0; i < num_blocks; i++) {
    for (j = 0; j < num_blocks; j++) {
        int mpi_rank = data_distribution(i, j, num_nodes);
        if (mpi_rank == my_rank)
        {
            /* Owning data */
            starpu_vector_data_register(&data_handles[i*num_blocks + j], 0,
                                        (uintptr_t)(tiles[i*num_blocks + j]), block_size*block_size, sizeof(double));
        }
        else
        {
            /* This can probably be further optimized */
            starpu_vector_data_register(&data_handles[i*num_blocks + j], -1,
                                        (uintptr_t)(tiles[i*num_blocks + j]), block_size*block_size, sizeof(double));
        }
        /* Marking where to find what data */
        starpu_data_set_rank(data_handles[i*num_blocks + j], mpi_rank);
    }
}
Codelet
void dist_task(void *buffers[], void *cl_arg)
{
    int k, i, j, block_size;
    starpu_codelet_unpack_args(cl_arg, &k, &i, &j);
    /* Get block size */
    block_size = STARPU_VECTOR_GET_NX(buffers[0]);
    double *block_ij = (double *)STARPU_VECTOR_GET_PTR(buffers[0]);
    double *block_ik = (double *)STARPU_VECTOR_GET_PTR(buffers[1]);
    double *block_kj = (double *)STARPU_VECTOR_GET_PTR(buffers[2]);
    for (int p = 0; p < block_size; p++) {
        for (int q = 0; q < block_size; q++) {
            for (int r = 0; r < block_size; r++) {
                block_ij[q*block_size + r] = min(block_ik[q*block_size + p] + block_kj[p*block_size + r],
                                                 block_ij[q*block_size + r]);
            }
        }
    }
}
The problem with the above approach is that I hit the following assertion:
starpu_mpi_insert_task: Assertion `mpi_tag >= 0 && "StarPU needs to be told the MPI rank of this data, using starpu_data_set_rank"' failed.
I do not clearly understand how the mpi_tag must be specified. I was under the assumption that the above is all that is required for StarPU to handle the communication automatically. I understand that I could rewrite this using starpu_mpi_send and starpu_mpi_recv, but I do not want to specify the communication explicitly. I am basically evaluating StarPU against Intel CnC in a distributed setting. I am attaching the implementation for complete context.
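To make the question concrete, my current guess (which I have not verified, so please correct me) is that each handle also needs an MPI tag in addition to its owner rank, set right after registration, roughly like this:

    /* Guess only: give every handle a unique tag in addition to its rank,
     * assuming starpu_data_set_tag is the right call for this. */
    starpu_data_set_rank(data_handles[i*num_blocks + j], mpi_rank);
    starpu_data_set_tag(data_handles[i*num_blocks + j], i*num_blocks + j);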
Any help or inputs are appreciated.
Thanks,
Ravi Teja.
Attached implementation:

#include <stdint.h>
#include <inttypes.h>
#include <starpu.h>
#include <starpu_mpi.h>
#include <unistd.h>
#include "../utils/utils.h"
#include "floyd_starpu.h"

#define FPRINTF(ofile, fmt, args ...) do { if (!getenv("STARPU_SSILENT")) {fprintf(ofile, fmt, ##args); }} while(0)
#define TAG(iter, i, j) ((starpu_tag_t) ( ((uint64_t)(iter)<<47) | ((uint64_t)(j)<<11) | (i)) )

struct params
{
    int k;
    int i;
    int j;
    // This is redundant for every task, but I do not like globals and secondly this
    // is the way to go for a distributed memory setting. CnC has a context where the
    // parameters for the task graph can be stored. It can manage this redundancy much
    // better.
    double *matrix;
    int matrix_size;
    int block_size;
};

static struct starpu_perfmodel mult_perf_model =
{
    .type = STARPU_HISTORY_BASED,
    .symbol = "mult_perf_model"
};

static void express_deps(int k, int i, int j, int block_size)
{
    if (k > 0)
        starpu_tag_declare_deps(TAG(k, i, j), 3,
                                TAG(k-1, i, k/block_size),
                                TAG(k-1, k/block_size, j),
                                TAG(k-1, i, j));
}

static void tag_cleanup(int matrix_size, int block_size)
{
    int i, j, k;
    int num_blocks = matrix_size/block_size;
    for (k = 0; k < matrix_size; k++) {
        for (i = 0; i < num_blocks; i++) {
            for (j = 0; j < num_blocks; j++) {
                starpu_tag_remove(TAG(k, i, j));
            }
        }
    }
}

/* Every implementation of a codelet must have this prototype, the first
 * argument (buffers) describes the buffers/streams that are managed by the
 * DSM; the second argument references read-only data that is passed as an
 * argument of the codelet (task->cl_arg). Here, "buffers" is unused as there
 * are no data input/output managed by the DSM (cl.nbuffers = 0) */
void shared_mem_task(void *buffers[], void *cl_arg)
{
    struct params *params = (struct params *) cl_arg;
    int i, j, k, size, block_size;
    k = params->k;
    double *path_distance_matrix = params->matrix;
    size = params->matrix_size;
    block_size = params->block_size;
    //printf("Task Scheduled(%d %d %d) TAG=%" PRIu64 "\n", params->k, params->i, params->j, TAG(k, i, j));
    for (i = params->i * block_size; i < (params->i + 1) * block_size; i++) {
        for (j = params->j * block_size; j < (params->j + 1) * block_size; j++) {
            path_distance_matrix[i*size + j] = min(path_distance_matrix[i*size + k] + path_distance_matrix[k*size + j],
                                                   path_distance_matrix[i*size + j]);
        }
    }
}

struct starpu_codelet shared_mem_cl =
{
    .where = STARPU_CPU,
    .cpu_funcs = { shared_mem_task, NULL },
    .nbuffers = 0,
    .model = &mult_perf_model,
    .name = "Floyd"
};

static int create_shared_mem_tasks(int matrix_size, int block_size, struct params *params)
{
    int i, j, k;
    int ret;
    int num_blocks = matrix_size/block_size;
    /* Create non-entry tasks */
    for (k = 0; k < matrix_size; k++) {
        for (i = 0; i < num_blocks; i++) {
            for (j = 0; j < num_blocks; j++) {
                /* Create a new task */
                struct starpu_task *task = starpu_task_create();
                task->callback_func = NULL;
                task->cl = &shared_mem_cl;
                task->cl_arg = &params[k*num_blocks*num_blocks + i*num_blocks + j];
                task->cl_arg_size = sizeof(struct params);
                task->use_tag = 1;
                task->tag_id = TAG(k, i, j);
                //printf("Task(%d %d %d) TAG=%" PRIu64 "\n", k, i, j, task->tag_id);
                express_deps(k, i, j, block_size);
                ret = starpu_task_submit(task);
                if (ret == -ENODEV) return 77;
                STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_submit");
            }
        }
    }
    return 0;
}

int floyd_shared_mem_starpu(double *path_distance_matrix, int matrix_size, int block_size)
{
    int num_blocks, k, i, j, ret;
    num_blocks = matrix_size/block_size;
    // Creating parameters for tasks
    struct params *params = (struct params *) malloc(sizeof(struct params) * matrix_size *
                                                     (num_blocks) * (num_blocks));
    for (k = 0; k < matrix_size; k++) {
        for (i = 0; i < num_blocks; i++) {
            for (j = 0; j < num_blocks; j++) {
                struct params *p = &params[(k*num_blocks*num_blocks) + (i*num_blocks) + j];
                p->k = k;
                p->i = i;
                p->j = j;
                p->matrix = path_distance_matrix;
                p->matrix_size = matrix_size;
                p->block_size = block_size;
            }
        }
    }
    /* initialize StarPU : passing a NULL argument means that we use
     * default configuration for the scheduling policies and the number of
     * processors/accelerators */
    ret = starpu_init(NULL);
    if (ret == -ENODEV) return 77;
    STARPU_CHECK_RETURN_VALUE(ret, "starpu_init");
    /* Create tasks */
    ret = create_shared_mem_tasks(matrix_size, block_size, params);
    if (ret == 0) starpu_task_wait_for_all();
    tag_cleanup(matrix_size, block_size);
    /* terminate StarPU: statistics and other debug outputs are not
     * guaranteed to be generated unless this function is called. Once it
     * is called, it is not possible to submit tasks anymore, and the user
     * is responsible for making sure all tasks have already been executed:
     * calling starpu_shutdown() before the termination of all the tasks
     * results in an undefined behaviour */
    starpu_shutdown();
    free(params);
    return ret;
}

void dist_task(void *buffers[], void *cl_arg)
{
    int k, i, j, block_size;
    starpu_codelet_unpack_args(cl_arg, &k, &i, &j);
    /* Get block size */
    block_size = STARPU_VECTOR_GET_NX(buffers[0]);
    double *block_ij = (double *)STARPU_VECTOR_GET_PTR(buffers[0]);
    double *block_ik = (double *)STARPU_VECTOR_GET_PTR(buffers[1]);
    double *block_kj = (double *)STARPU_VECTOR_GET_PTR(buffers[2]);
    //printf("Task Scheduled(%d %d %d)\n", k, i, j);
    for (int p = 0; p < block_size; p++) {
        for (int q = 0; q < block_size; q++) {
            for (int r = 0; r < block_size; r++) {
                block_ij[q*block_size + r] = min(block_ik[q*block_size + p] + block_kj[p*block_size + r],
                                                 block_ij[q*block_size + r]);
            }
        }
    }
}

struct starpu_codelet dist_cl =
{
    .where = STARPU_CPU,
    .cpu_funcs = { dist_task, NULL },
    .nbuffers = 3,
    .model = &mult_perf_model,
    .name = "Floyd Dist"
};

static int create_dist_tasks(int matrix_size, int block_size, starpu_data_handle_t *data_handles)
{
    int i, j, k;
    int ret;
    int num_blocks = matrix_size/block_size;
    /* Create non-entry tasks */
    for (k = 0; k < matrix_size; k++) {
        for (i = 0; i < num_blocks; i++) {
            for (j = 0; j < num_blocks; j++) {
                starpu_mpi_insert_task(MPI_COMM_WORLD, &dist_cl,
                                       STARPU_VALUE, &k,
                                       STARPU_VALUE, &i,
                                       STARPU_VALUE, &j,
                                       STARPU_RW, data_handles[i*num_blocks + j],
                                       STARPU_R, data_handles[i*num_blocks + (k/block_size)],
                                       STARPU_R, data_handles[(k/block_size)*num_blocks + j],
                                       0);
            }
        }
    }
    return 0;
}

/* Returns the MPI node number where data is */
int data_distribution(int x, int y, int num_nodes)
{
    /* Block distrib */
    //return ((int)(x / sqrt(num_nodes) + (y / sqrt(num_nodes)) * sqrt(num_nodes))) % num_nodes;
    // /* Other examples useful for other kinds of computations */
    // /* / distrib */
    // return (x+y) % num_nodes;
    // /* Block cyclic distrib */
    // unsigned side = sqrt(num_nodes);
    // return x % side + (y % side) * size;
    return 0;
}

int floyd_dist_starpu(double *path_distance_matrix, int matrix_size, int block_size)
{
    int num_blocks, k, i, j, ret, my_rank, num_nodes;
    num_blocks = matrix_size/block_size;
    /* Initialize StarPU : passing a NULL argument means that we use
     * default configuration for the scheduling policies and the number of
     * processors/accelerators */
    ret = starpu_init(NULL);
    if (ret == -ENODEV) return 77;
    STARPU_CHECK_RETURN_VALUE(ret, "starpu_init");
    /* Initializing StarPU MPI */
    ret = starpu_mpi_initialize_extended(&my_rank, &num_nodes);
    if (ret == -ENODEV) return 77;
    STARPU_CHECK_RETURN_VALUE(ret, "starpu_init");
    starpu_data_handle_t *data_handles = (starpu_data_handle_t *) malloc(sizeof(starpu_data_handle_t) * num_blocks * num_blocks);
    double **tiles = (double **) malloc(sizeof(double *) * num_blocks * num_blocks);
    for (i = 0; i < num_blocks; i++) {
        for (j = 0; j < num_blocks; j++) {
            /* Allocate memory for the tile */
            tiles[i*num_blocks + j] = (double *) malloc(sizeof(double) * block_size * block_size);
            double *tile = tiles[i*num_blocks + j];
            for (int org_i = i*block_size, t_i = 0; t_i < block_size; org_i++, t_i++) {
                for (int org_j = j*block_size, t_j = 0; t_j < block_size; org_j++, t_j++) {
                    tile[t_i*block_size + t_j] = path_distance_matrix[org_i*matrix_size + org_j];
                }
            }
        }
    }
    for (i = 0; i < num_blocks; i++) {
        for (j = 0; j < num_blocks; j++) {
            int mpi_rank = data_distribution(i, j, num_nodes);
            if (mpi_rank == my_rank)
            {
                /* Owning data */
                starpu_vector_data_register(&data_handles[i*num_blocks + j], 0,
                                            (uintptr_t)(tiles[i*num_blocks + j]), block_size*block_size, sizeof(double));
            }
            else
            {
                /* This can probably be further optimized */
                starpu_vector_data_register(&data_handles[i*num_blocks + j], -1,
                                            (uintptr_t)(tiles[i*num_blocks + j]), block_size*block_size, sizeof(double));
            }
            /* Marking where to find what data */
            starpu_data_set_rank(data_handles[i*num_blocks + j], mpi_rank);
        }
    }
    /* Create tasks */
    ret = create_dist_tasks(matrix_size, block_size, data_handles);
    if (ret == 0) starpu_task_wait_for_all();
    for (i = 0; i < num_blocks; i++) {
        for (j = 0; j < num_blocks; j++) {
            starpu_data_unregister(data_handles[i*num_blocks + j]);
        }
    }
    /* Collecting all the data at rank 0 */
    /* terminate StarPU: statistics and other debug outputs are not
     * guaranteed to be generated unless this function is called. Once it
     * is called, it is not possible to submit tasks anymore, and the user
     * is responsible for making sure all tasks have already been executed:
     * calling starpu_shutdown() before the termination of all the tasks
     * results in an undefined behaviour */
    starpu_mpi_shutdown();
    starpu_shutdown();
    for (i = 0; i < num_blocks; i++) {
        for (j = 0; j < num_blocks; j++) {
            free(tiles[i*num_blocks + j]);
        }
    }
    free(tiles);
    free(data_handles);
    return ret;
}