Subject: Developers list for StarPU
- From: Ravi Teja Mullapudi <raviteja.mullapudi@gmail.com>
- To: Nathalie Furmento <nathalie.furmento@labri.fr>
- Cc: starpu-devel@lists.gforge.inria.fr, Roshan D <roshan18@gmail.com>, Uday Reddy <udayreddy@gmail.com>
- Subject: Re: [Starpu-devel] How to use starpu_mpi_task_insert
- Date: Sat, 30 Mar 2013 21:13:39 +0530
- List-archive: <http://lists.gforge.inria.fr/pipermail/starpu-devel>
- List-id: "Developers list. For discussion of new features, code changes, etc." <starpu-devel.lists.gforge.inria.fr>
Thanks for the quick reply. I am using version 1.0.5, the latest release. I did try to set the mpi_tag, but I ran into other problems; maybe I did not do it right. I will try again and let you know. Also, the sample in the documentation section that shows starpu_mpi_task_insert usage does not clearly state that a tag needs to be set for the data handle. It would be nice if that could be documented.
On Sat, Mar 30, 2013 at 7:40 PM, Nathalie Furmento <nathalie.furmento@labri.fr> wrote:
Hi,
You are missing a call to starpu_data_set_tag to tell StarPU-MPI which tag should be used when sending or receiving a piece of data. The tag must be unique. A possible solution would be to add the following line:
starpu_data_set_tag(data_handles[i*num_blocks + j], i*num_blocks + j);
after the call to starpu_data_set_rank.
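To illustrate the uniqueness requirement, here is a minimal sketch in plain C (no StarPU calls) that checks that the row-major index i*num_blocks + j gives every block of the grid a distinct tag. The helper names block_tag and tags_are_unique are hypothetical and exist only for this illustration.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

/* Hypothetical helper (not part of StarPU): row-major tag for block (i, j). */
static int block_tag(int i, int j, int num_blocks)
{
    return i * num_blocks + j;
}

/* Returns true if every block of a num_blocks x num_blocks grid gets a
 * distinct tag in the range [0, num_blocks * num_blocks). */
static bool tags_are_unique(int num_blocks)
{
    assert(num_blocks > 0);

    int total = num_blocks * num_blocks;
    bool *seen = calloc((size_t)total, sizeof *seen);
    bool ok = (seen != NULL);

    for (int i = 0; ok && i < num_blocks; i++) {
        for (int j = 0; ok && j < num_blocks; j++) {
            int tag = block_tag(i, j, num_blocks);
            if (tag < 0 || tag >= total || seen[tag])
                ok = false;      /* out of range or already used */
            else
                seen[tag] = true;
        }
    }

    free(seen);
    return ok;
}
```

With such a helper, the registration loop would pass block_tag(i, j, num_blocks) as the second argument of starpu_data_set_tag, right after starpu_data_set_rank.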
Could you please tell us which version of StarPU you are using? The error message you are getting needs to be fixed: it is misleading, as it mentions a test on the tag value and a call to the set_rank function.
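As an illustration of why the message misleads, the following sketch separates the two checks so that each failure names the call that is actually missing. It is not StarPU's code: struct handle_mpi_info, the enum and missing_mpi_setup are hypothetical names, and the -1 "unset" default is an assumption based on the `mpi_tag >= 0` test in the assertion.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for the per-handle MPI metadata;
 * this is NOT StarPU's real data structure. */
struct handle_mpi_info {
    int rank; /* owner rank, assumed -1 when never set */
    int tag;  /* MPI tag, assumed -1 when never set */
};

enum mpi_setup_status {
    SETUP_OK,           /* both rank and tag were set */
    SETUP_MISSING_RANK, /* starpu_data_set_rank was not called */
    SETUP_MISSING_TAG   /* starpu_data_set_tag was not called */
};

/* Reports which of the two setup calls is missing, checking rank first. */
static enum mpi_setup_status missing_mpi_setup(const struct handle_mpi_info *info)
{
    assert(info != NULL);
    if (info->rank < 0)
        return SETUP_MISSING_RANK;
    if (info->tag < 0)
        return SETUP_MISSING_TAG;
    return SETUP_OK;
}
```

In the situation reported in this thread, the rank is set but the tag is not, so a check of this shape would report the missing tag rather than point at set_rank.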
Thanks,
Nathalie
On Mar 30, 19:19, Ravi Teja Mullapudi wrote:
> Hi,
>
> I am trying to use starpu_mpi_task_insert to avoid doing MPI communication
> explicitly and to let StarPU handle it. I am trying to do a Floyd-Warshall
> implementation. The following is the outline of the code.
>
> Task Creation
> create_dist_tasks(int matrix_size, int block_size, starpu_data_handle_t *
> data_handles)
> {
> int i,j,k;
> int ret;
> int num_blocks = matrix_size/block_size;
>
> /* Create non-entry tasks */
> for(k = 0; k < matrix_size; k++) {
> for(i = 0; i < num_blocks ; i++) {
> for(j = 0; j < num_blocks; j++) {
> starpu_mpi_insert_task(MPI_COMM_WORLD, &dist_cl,
> STARPU_VALUE, &k,
> STARPU_VALUE, &i,
> STARPU_VALUE, &j,
> STARPU_RW, data_handles[i*num_blocks
> +j],
> STARPU_R, data_handles[i*num_blocks
> +(k/block_size)],
> STARPU_R,
> data_handles[(k/block_size)*num_blocks + j],
> STARPU_EXECUTE_ON_DATA,
> data_handles[i*num_blocks +j],
> 0);
> }
> }
> }
> return 0;
> }
>
> Registering Data
> for(i = 0; i < num_blocks; i++) {
> for (j = 0; j < num_blocks; j++) {
> int mpi_rank = data_distribution(i, j, num_nodes);
> if (mpi_rank == my_rank)
> {
> /* Owning data */
> starpu_vector_data_register(&data_handles[i*num_blocks +
> j], 0,
> (uintptr_t)(tiles[i*num_blocks + j]), block_size
> *block_size, sizeof(double));
> }
> else
> {
> /* This can probably be further optimized */
> starpu_vector_data_register(&data_handles[i*num_blocks +
> j], -1,
> (uintptr_t)(tiles[i*num_blocks + j]), block_size
> *block_size, sizeof(double));
> }
>
> /* Marking where to find what data */
> starpu_data_set_rank(data_handles[i*num_blocks + j], mpi_rank);
> }
> }
>
> Codelet
>
> void dist_task(void *buffers[], void *cl_arg)
> {
> int k, i, j, block_size;
> starpu_codelet_unpack_args(cl_arg, &k, &i, &j);
>
> /* Get block size */
> block_size = STARPU_VECTOR_GET_NX(buffers[0]);
>
> double *block_ij = (double *)STARPU_VECTOR_GET_PTR(buffers[0]);
> double *block_ik = (double *)STARPU_VECTOR_GET_PTR(buffers[1]);
> double *block_kj = (double *)STARPU_VECTOR_GET_PTR(buffers[2]);
>
> for (int p = 0; p < block_size; p++){
> for (int q = 0; q < block_size; q++) {
> for (int r = 0; r < block_size; r++) {
> block_ij[q*block_size +r] = min(block_ik[q*block_size + p]
> + block_kj[p*block_size + r], block_ij[q*block_size +r]);
> }
> }
> }
> }
>
> The problem with the above approach is I hit the following assertion
> starpu_mpi_insert_task: Assertion `mpi_tag >= 0 && "StarPU needs to be told
> the MPI rank of this data, using starpu_data_set_rank"' failed.
>
> I do not clearly understand how the mpi_tag must be specified. I was under
> the assumption that the above is all that is required for starpu to
> handle the communication automatically. I understand that I can rewrite
> using starpu_mpi_send and starpu_mpi_recv but I do not explicitly want to
> specify the communication. I am basically evaluating StarPU vs Intel CnC in
> a distributed setting. Attaching the implementation for a complete context.
>
> Any help or inputs are appreciated.
>
> Thanks,
> Ravi Teja.
> #include <stdint.h>
> #include <inttypes.h>
> #include <starpu.h>
> #include <starpu_mpi.h>
> #include <unistd.h>
> #include "../utils/utils.h"
> #include "floyd_starpu.h"
>
> #define FPRINTF(ofile, fmt, args ...) do { if (!getenv("STARPU_SSILENT")) {fprintf(ofile, fmt, ##args); }} while(0)
> #define TAG(iter, i, j) ((starpu_tag_t) ( ((uint64_t)(iter)<<47) | ((uint64_t)(j)<<11) | (i)) )
>
> struct params
> {
> int k;
> int i;
> int j;
> // This is redundant for every task, but I do not like globals and secondly this
> // is the way to for a distributed memory setting. CnC has context where the
> // parameters for the task graph can be stored. It can manage this redundancy much
> // better.
> double *matrix;
> int matrix_size;
> int block_size;
> };
>
> static struct starpu_perfmodel mult_perf_model = {
> .type = STARPU_HISTORY_BASED,
> .symbol = "mult_perf_model"
> };
>
> static void express_deps(int k, int i, int j, int block_size)
> {
> if (k > 0)
> starpu_tag_declare_deps(TAG(k, i, j), 3, TAG(k-1, i, k/block_size), TAG(k-1, k/block_size, j), TAG(k-1, i, j));
> }
>
> static void tag_cleanup(int matrix_size, int block_size)
> {
> int i,j,k;
> int num_blocks = matrix_size/block_size;
> for (k = 0; k < matrix_size; k++)
> {
> for (i = 0; i < num_blocks ; i++)
> {
> for(j = 0; j < num_blocks; j++)
> {
> starpu_tag_remove(TAG(k, i, j));
> }
> }
> }
> }
>
> /* Every implementation of a codelet must have this prototype, the first
> * argument (buffers) describes the buffers/streams that are managed by the
> * DSM; the second arguments references read-only data that is passed as an
> * argument of the codelet (task->cl_arg). Here, "buffers" is unused as there
> * are no data input/output managed by the DSM (cl.nbuffers = 0) */
>
> void shared_mem_task(void *buffers[], void *cl_arg)
> {
> struct params *params = (struct params *) cl_arg;
> int i, j, k, size, block_size;
> k = params->k;
> double* path_distance_matrix = params->matrix;
> size = params->matrix_size;
> block_size = params->block_size;
> //printf("Task Scheduled(%d %d %d) TAG=%" PRIu64 "\n", params->k, params->i, params->j, TAG(k ,i, j));
> for(i = params->i * block_size; i < (params->i + 1) * block_size; i++)
> {
> for(j = params->j * block_size; j < (params->j + 1) * block_size; j++)
> {
> path_distance_matrix[i*size + j] = min(path_distance_matrix[i*size + k] + path_distance_matrix[k*size + j], path_distance_matrix[i*size + j]);
> }
> }
> }
>
> struct starpu_codelet shared_mem_cl =
> {
> .where = STARPU_CPU,
> .cpu_funcs = { shared_mem_task, NULL },
> .nbuffers = 0,
> .model = &mult_perf_model,
> .name = "Floyd"
> };
>
> static int create_shared_mem_tasks(int matrix_size, int block_size, struct params* params)
> {
> /* Create a new task */
> int i,j,k;
> int ret;
> int num_blocks = matrix_size/block_size;
> /* Create non-entry tasks */
> for(k = 0; k < matrix_size; k++)
> {
> for(i = 0; i < num_blocks ; i++)
> {
> for(j = 0; j < num_blocks; j++)
> {
> struct starpu_task *task = starpu_task_create();
> task->callback_func = NULL;
> task->cl = &shared_mem_cl;
> task->cl_arg = &params[k*num_blocks*num_blocks + i*num_blocks + j];
> task->cl_arg_size = sizeof(struct params);
>
> task->use_tag = 1;
> task->tag_id = TAG(k, i, j);
> //printf("Task(%d %d %d) TAG=%" PRIu64 "\n", k, i, j, task->tag_id);
>
> express_deps(k, i, j, block_size);
>
> ret = starpu_task_submit(task);
> if (ret == -ENODEV) return 77;
> STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_submit");
> }
> }
> }
> return 0;
> }
>
> int floyd_shared_mem_starpu(double* path_distance_matrix, int matrix_size, int block_size)
> {
> int num_blocks, k, i, j, ret;
> num_blocks = matrix_size/block_size;
> // Creating parameters for tasks
> struct params *params = (struct params*) malloc (sizeof(struct params) * matrix_size * (num_blocks) * (num_blocks));
> for( k = 0; k < matrix_size; k++)
> {
> for( i = 0; i < num_blocks; i++)
> {
> for( j = 0; j < num_blocks; j++)
> {
> struct params* p = &params[(k*num_blocks*num_blocks) + (i*num_blocks) + j];
> p->k = k; p->i = i; p->j = j;
> p->matrix = path_distance_matrix;
> p->matrix_size = matrix_size;
> p->block_size = block_size;
> }
> }
> }
>
> /* initialize StarPU : passing a NULL argument means that we use
> * default configuration for the scheduling policies and the number of
> * processors/accelerators */
> ret = starpu_init(NULL);
> if (ret == -ENODEV)
> return 77;
> STARPU_CHECK_RETURN_VALUE(ret, "starpu_init");
>
> /* Create tasks*/
> ret = create_shared_mem_tasks(matrix_size, block_size, params);
> if (ret == 0)
> starpu_task_wait_for_all();
>
> tag_cleanup(matrix_size, block_size);
>
> /* terminate StarPU: statistics and other debug outputs are not
> * guaranteed to be generated unless this function is called. Once it
> * is called, it is not possible to submit tasks anymore, and the user
> * is responsible for making sure all tasks have already been executed:
> * calling starpu_shutdown() before the termination of all the tasks
> * results in an undefined behaviour */
> starpu_shutdown();
> free(params);
> return ret;
> }
>
> void dist_task(void *buffers[], void *cl_arg)
> {
> int k, i, j, block_size;
> starpu_codelet_unpack_args(cl_arg, &k, &i, &j);
> //printf("Task Scheduled(%d %d %d)\n", k, i, j);
>
> /* Get block size */
> block_size = STARPU_VECTOR_GET_NX(buffers[0]);
>
> double *block_ij = (double *)STARPU_VECTOR_GET_PTR(buffers[0]);
> double *block_ik = (double *)STARPU_VECTOR_GET_PTR(buffers[1]);
> double *block_kj = (double *)STARPU_VECTOR_GET_PTR(buffers[2]);
>
> for (int p = 0; p < block_size; p++){
> for (int q = 0; q < block_size; q++) {
> for (int r = 0; r < block_size; r++) {
> block_ij[q*block_size +r] = min(block_ik[q*block_size + p] + block_kj[p*block_size + r], block_ij[q*block_size +r]);
> }
> }
> }
> }
>
> struct starpu_codelet dist_cl =
> {
> .where = STARPU_CPU,
> .cpu_funcs = { dist_task, NULL },
> .nbuffers = 3,
> .model = &mult_perf_model,
> .name = "Floyd Dist"
> };
>
> static int create_dist_tasks(int matrix_size, int block_size, starpu_data_handle_t * data_handles)
> {
> int i,j,k;
> int ret;
> int num_blocks = matrix_size/block_size;
>
> /* Create non-entry tasks */
> for(k = 0; k < matrix_size; k++) {
> for(i = 0; i < num_blocks ; i++) {
> for(j = 0; j < num_blocks; j++) {
> starpu_mpi_insert_task(MPI_COMM_WORLD, &dist_cl,
> STARPU_VALUE, &k,
> STARPU_VALUE, &i,
> STARPU_VALUE, &j,
> STARPU_RW, data_handles[i*num_blocks +j],
> STARPU_R, data_handles[i*num_blocks +(k/block_size)],
> STARPU_R, data_handles[(k/block_size)*num_blocks + j],
> 0);
> }
> }
> }
> return 0;
> }
>
> /* Returns the MPI node number where data is */
> int data_distribution(int x, int y, int num_nodes) {
>
> /* Block distrib */
> //return ((int)(x / sqrt(num_nodes) + (y / sqrt(num_nodes)) * sqrt(num_nodes))) % num_nodes;
>
> // /* Other examples useful for other kinds of computations */
> // /* / distrib */
> // return (x+y) % num_nodes;
>
> // /* Block cyclic distrib */
> // unsigned side = sqrt(num_nodes);
> // return x % side + (y % side) * size;
> return 0;
> }
>
> int floyd_dist_starpu(double* path_distance_matrix, int matrix_size, int block_size)
> {
> int num_blocks, k, i, j, ret, my_rank, num_nodes;
> num_blocks = matrix_size/block_size;
>
> /* Initialize StarPU : passing a NULL argument means that we use
> * default configuration for the scheduling policies and the number of
> * processors/accelerators */
> ret = starpu_init(NULL);
> if (ret == -ENODEV)
> return 77;
> STARPU_CHECK_RETURN_VALUE(ret, "starpu_init");
>
> /* Initializing StarPU MPI */
> ret = starpu_mpi_initialize_extended(&my_rank, &num_nodes);
> if (ret == -ENODEV)
> return 77;
> STARPU_CHECK_RETURN_VALUE(ret, "starpu_mpi_initialize_extended");
>
>
> starpu_data_handle_t* data_handles = (starpu_data_handle_t*) malloc(sizeof(starpu_data_handle_t) * num_blocks * num_blocks);
> double** tiles = (double**) malloc(sizeof(double*) * num_blocks * num_blocks);
>
> /* Allocate memory for the tile */
> for(i = 0; i < num_blocks; i++) {
> for(j = 0; j < num_blocks; j++) {
> tiles[i*num_blocks + j] = (double*) malloc (sizeof(double) * block_size * block_size);
> double * tile = tiles[i*num_blocks + j];
> for(int org_i = i*block_size,t_i = 0; t_i < block_size; org_i++,t_i++) {
> for(int org_j = j*block_size,t_j = 0; t_j < block_size; org_j++,t_j++) {
> tile[t_i*block_size + t_j] = path_distance_matrix[org_i*matrix_size + org_j];
> }
> }
> }
> }
>
> for(i = 0; i < num_blocks; i++) {
> for (j = 0; j < num_blocks; j++) {
> int mpi_rank = data_distribution(i, j, num_nodes);
> if (mpi_rank == my_rank)
> {
> /* Owning data */
> starpu_vector_data_register(&data_handles[i*num_blocks + j], 0,
> (uintptr_t)(tiles[i*num_blocks + j]), block_size *block_size, sizeof(double));
> }
> else
> {
> /* This can probably be further optimized */
> starpu_vector_data_register(&data_handles[i*num_blocks + j], -1,
> (uintptr_t)(tiles[i*num_blocks + j]), block_size *block_size, sizeof(double));
> }
>
> /* Marking where to find what data */
> starpu_data_set_rank(data_handles[i*num_blocks + j], mpi_rank);
> }
> }
>
> /* Create tasks*/
> ret = create_dist_tasks(matrix_size, block_size, data_handles);
> if (ret == 0)
> starpu_task_wait_for_all();
>
> for(i = 0; i < num_blocks; i++) {
> for (j = 0; j < num_blocks; j++) {
> starpu_data_unregister(data_handles[i*num_blocks + j]);
> }
> }
>
> /* Collecting all the data at rank 0*/
>
> /* terminate StarPU: statistics and other debug outputs are not
> * guaranteed to be generated unless this function is called. Once it
> * is called, it is not possible to submit tasks anymore, and the user
> * is responsible for making sure all tasks have already been executed:
> * calling starpu_shutdown() before the termination of all the tasks
> * results in an undefined behaviour */
> starpu_mpi_shutdown();
> starpu_shutdown();
> for (i = 0; i < num_blocks; i++) {
> for (j = 0; j < num_blocks; j++) {
> free(tiles[i*num_blocks + j]);
> }
> }
> free(tiles);
> free(data_handles);
> return ret;
> }
> _______________________________________________
> Starpu-devel mailing list
> Starpu-devel@lists.gforge.inria.fr
> http://lists.gforge.inria.fr/cgi-bin/mailman/listinfo/starpu-devel
--
----------------------------------------------------------------------------
Dr Nathalie Furmento CNRS R&D Engineer
Centre de Recherche INRIA Bordeaux Sud-Ouest http://www.labri.fr/~furmento/
200 avenue de la vieille Tour Tel: +33 (0)5 24 57 41 20
33405 Talence cedex, FRANCE Fax: +33 (0)5 24 57 40 41
----------------------------------------------------------------------------