toeplitz.c
#include "toeplitz.h"

//r1.2 - Frederic Dauvergne (APC)
//This file contains the main part of the Toeplitz algebra module, including
//the elementary product routines (using FFT) and the initialization routines.
//It also contains the MPI version of the Toeplitz matrix product with global
//row-wise order distribution of the data.
//
//todo:
//- add non-blocking communication in stmm, as is done for the stbmm routine
//- scmm_direct does not need the nfft parameter


//=========================================================================
//Global parameters

int VERBOSE;

//Used only to know the process rank for printing when VERBOSE mode is on
int PRINT_RANK = -1;


//=========================================================================

int print_error_message(int error_number, char const *file, int line)
{
  char *str_mess;
  str_mess = (char *) malloc(256 * sizeof(char));  //256 bytes to leave room for long file names
  if(error_number == 1)
    sprintf(str_mess, "Error on line %d of %s. Toeplitz band width > vector size\n", line, file);
  else if(error_number == 2)
    sprintf(str_mess, "Error on line %d of %s. Bad allocation.\n", line, file);
  else if(error_number == 3)
    sprintf(str_mess, "Error on line %d of %s. Error at fftw multithread initialization.\n", line, file);
  else  //error_number == 7 or any unrecognized code
    sprintf(str_mess, "Error on line %d of %s.\n", line, file);
  fprintf(stderr, "%s", str_mess);
  printf("%s", str_mess);
  free(str_mess);  //avoid leaking the message buffer
  return error_number;
}


//=========================================================================

int define_blocksize(int n, int lambda, int bs_flag, int fixed_bs)
{
  int bs;       //computed optimal block size
  int min_bs;   //minimum block size used for the computation
  int min_pow2; //minimum power-of-two exponent used for the block size computation

  //debugging overrides (kept for reference):
//  bs_flag = 5;
//  fixed_bs = pow(2,15); //2^14 winner because smaller block than 2^15 (same speed)

  if (bs_flag==1) {
    bs = fixed_bs;

  }
  else if (bs_flag==2) { //this formula needs to be checked - there seems to be a problem
    min_bs = 2*lambda;   //when bs = 2*lambda there is not enough data left in the middle
    min_pow2 = (int) ceil( log(min_bs)/log(2) );
    bs = pow(2, min_pow2);
    if (bs > n)      //avoid a block size much bigger than the matrix; happens mostly
      bs = min_bs;   //when the matrix is small compared to its bandwidth

  }
  else if (bs_flag==3) {
    min_bs = 3*lambda;
    min_pow2 = (int) ceil( log(min_bs)/log(2) );
    bs = pow(2, min_pow2);
    if (bs > n)      //avoid a block size much bigger than the matrix; happens mostly
      bs = min_bs;   //when the matrix is small compared to its bandwidth

  }
  else if (bs_flag==4 || bs_flag==0) {
    min_bs = 4*lambda;
    min_pow2 = (int) ceil( log(min_bs)/log(2) );
    bs = pow(2, min_pow2);
    if (bs > n)      //avoid a block size much bigger than the matrix; happens mostly
      bs = min_bs;   //when the matrix is small compared to its bandwidth

  }
  else if (bs_flag==5) {
    //Different formula to compute the optimal block size
    bs = 1;
    while(bs < 2*(lambda-1)*log(bs+1) && bs < n) {
      bs = bs*2; }

  }
  else if (bs_flag==6) { //same as bs_flag==5 but with a constraint on the minimal size
                         //and the number of subblocks

    min_bs = 4*lambda;
    min_pow2 = (int) ceil( log(min_bs)/log(2) );

    min_pow2 = max(min_pow2, 14); //impose a minimum block size of 2^14 (the exponent, not pow(2,14))
    //This is based on empirical estimation and can be justified
    //by the speed benchmarks of FFTW3 (see the official FFTW website).
    bs = pow(2, min_pow2);

    if (bs > n)      //avoid a block size much bigger than the matrix; happens mostly
      bs = min_bs;   //when the matrix is small compared to its bandwidth

    //test if there are enough subblocks for the sliding-window algorithm:
//    int nbloc_bs = ceil( (1.0*n)/(bs-2*distcorrmin));
//    if (nbloc_bs<8)  //empirical condition to avoid a small number of subblocks
//      bs = 0;        //switch to the no-sliding-windows algorithm

  }
  else {
    printf("Error. Wrong value for bs_flag. Set to auto mode.\n");
    min_bs = 4*lambda;
    min_pow2 = (int) ceil( log(min_bs)/log(2) );
    bs = pow(2, min_pow2);
    if (bs > n)      //avoid a block size much bigger than the matrix; happens mostly
      bs = min_bs;   //when the matrix is small compared to its bandwidth
  }


  if(PRINT_RANK==0 && VERBOSE>1)
    printf("Computed optimal blocksize is %d (with lambda = %d)\n", bs, lambda);

  return bs;
}

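/* Example (editor's sketch, not part of the original source): for a Toeplitz kernel
 * of half-bandwidth lambda = 100 applied to a vector of n = 1000000 samples, the
 * default strategy (bs_flag = 0) picks the next power of two above 4*lambda:
 *
 *   int bs = define_blocksize(1000000, 100, 0, 0);
 *   // min_bs = 400, so bs = 2^ceil(log2(400)) = 512
 *
 * With bs_flag = 1 the caller forces bs = fixed_bs instead.
 */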

//=========================================================================

int define_nfft(int n_thread, int flag_nfft, int fixed_nfft)
{
  int nfft;

  if (flag_nfft==0)
    nfft = NFFT_DEFAULT;
  else if (flag_nfft==1)
    nfft = fixed_nfft;
  else if (flag_nfft==2)
    nfft = n_thread;
  else {
    printf("Error. Wrong value for flag_nfft. Set to auto mode.\n");
    nfft = NFFT_DEFAULT;
  }

  return nfft;
}


//=========================================================================

int tpltz_init(int n, int lambda, int *nfft, int *blocksize, fftw_complex **T_fft, double *T, fftw_complex **V_fft, double **V_rfft, fftw_plan *plan_f, fftw_plan *plan_b, Flag flag_stgy)
{
  int n_thread;
  double t1, t2;

  //Set the VERBOSE global variable
  VERBOSE = flag_stgy.flag_verbose;


  //initialize block size
  *blocksize = define_blocksize(n, lambda, flag_stgy.flag_bs, flag_stgy.fixed_bs);


  //if (bs==0)
  //  flag_stgy.flag_bs = 9999;  //switch to the no-sliding-windows algorithm


//#pragma omp parallel
//{ n_thread = omp_get_num_threads(); }

//  if ((NB_OMPTHREADS <= n_thread) && (NB_OMPTHREADS != 0))
//    omp_set_num_threads(NB_OMPTHREADS);

  n_thread = omp_get_max_threads();


  //initialize nfft
  *nfft = define_nfft(n_thread, flag_stgy.flag_nfft, flag_stgy.fixed_nfft);  //*nfft=n_thread;


  if(PRINT_RANK==0 && VERBOSE>0 && VERBOSE_FIRSTINIT==1) {
    printf("Using %d threads\n", n_thread);
    printf("nfft = %d\n", *nfft);
  }

  //initialize fftw plan allocation flag
  int fftw_flag = flag_stgy.flag_fftw;  //FFTW_FLAG;

  //initialize fftw for omp threads
#ifdef fftw_MULTITHREADING
  fftw_init_omp_threads(n_thread);
#endif

  //initialize fftw array and plan for T (and make it circulant first)
//  t1 = MPI_Wtime();
  circ_init_fftw(T, (*blocksize), lambda, T_fft);
//  t2 = MPI_Wtime();

//  if (PRINT_RANK==0 && VERBOSE>0)
//    printf("time circ_init_fftw=%f\n", t2-t1);

  //initialize fftw arrays and plans for V
//  t1 = MPI_Wtime();
  rhs_init_fftw(nfft, (*blocksize), V_fft, V_rfft, plan_f, plan_b, fftw_flag);
//  t2 = MPI_Wtime();

//  if (PRINT_RANK==0 && VERBOSE>0)
//    printf("time rhs_init_fftw=%f\n", t2-t1);

  if(PRINT_RANK==0 && VERBOSE>1)
    printf("Initialization finished successfully\n");

  return 0;
}


//=========================================================================

int fftw_init_omp_threads(int fftw_n_thread)
{
  int status;

  //initialize fftw omp threads
  status = fftw_init_threads();
  if (status==0)
    return print_error_message (3, __FILE__, __LINE__);

  //set the number of FFTW threads
  fftw_plan_with_nthreads(fftw_n_thread);

  if(PRINT_RANK==0 && VERBOSE>1 && VERBOSE_FIRSTINIT==1)
    printf("Using multithreaded FFTW with %d threads\n", fftw_n_thread);

  return 0;
}


//=========================================================================

int rhs_init_fftw(int *nfft, int fft_size, fftw_complex **V_fft, double **V_rfft, fftw_plan *plan_f, fftw_plan *plan_b, int fftw_flag)
{
  //allocate fftw arrays and plans for V
  *V_fft  = (fftw_complex*) fftw_malloc((*nfft)*(fft_size/2+1) * sizeof(fftw_complex));
  *V_rfft = (double*) fftw_malloc((*nfft)*fft_size * sizeof(double));
  if (*V_fft==0 || *V_rfft==0)
    return print_error_message (2, __FILE__, __LINE__);

  *plan_f = fftw_plan_many_dft_r2c(1, &fft_size, (*nfft), *V_rfft, &fft_size, 1, fft_size, *V_fft, NULL, 1, fft_size/2+1, fftw_flag);
  *plan_b = fftw_plan_many_dft_c2r(1, &fft_size, (*nfft), *V_fft, NULL, 1, fft_size/2+1, *V_rfft, &fft_size, 1, fft_size, fftw_flag);


  return 0;
}

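/* Layout note (editor's sketch, not part of the original source): the two plans above
 * use FFTW's advanced ("many") interface to batch nfft transforms of length fft_size.
 * The k-th real vector starts at offset k*fft_size in V_rfft and its half-complex
 * spectrum at offset k*(fft_size/2+1) in V_fft. For instance, with nfft = 4 and
 * fft_size = 1024, a caller holding the allocated arrays can address the third pair as
 *
 *   double       *v2 = V_rfft + 2*1024;          // third real input vector (k = 2)
 *   fftw_complex *f2 = V_fft  + 2*(1024/2 + 1);  // its spectrum after fftw_execute(plan_f)
 */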

//=========================================================================

int circ_init_fftw(double *T, int fft_size, int lambda, fftw_complex **T_fft)
{
  //routine variables
  int i;
  int circ_fftw_flag = FFTW_ESTIMATE;

  //allocation for T_fft
  *T_fft = (fftw_complex*) fftw_malloc( (fft_size/2+1) * sizeof(fftw_complex) );
  if (*T_fft==0)
    return print_error_message (2, __FILE__, __LINE__);
  double *T_circ = (double*) (*T_fft);

  //in-place fft
  fftw_plan plan_f_T;
  plan_f_T = fftw_plan_dft_r2c_1d( fft_size, T_circ, *T_fft, circ_fftw_flag );

  //make T circulant
#pragma omp parallel for
  for(i=0; i<fft_size+2; i++)
    T_circ[i] = 0.0;

  T_circ[0] = T[0];
  for(i=1; i<lambda; i++) {
    T_circ[i] = T[i];
    T_circ[fft_size-i] = T[i];
  }

  fftw_execute(plan_f_T);
  fftw_destroy_plan(plan_f_T);

  return 0;
}

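/* Example (editor's sketch, not part of the original source): circulant embedding of a
 * band-Toeplitz kernel with lambda = 3 and fft_size = 8. The symmetric band is wrapped
 * around the end of the vector before the in-place r2c transform:
 *
 *   double T[3] = {t0, t1, t2};                  // t0, t1, t2 are placeholders
 *   // T_circ = { t0, t1, t2, 0, 0, 0, t2, t1 }
 *
 * Multiplying by this circulant matrix reproduces the Toeplitz product away from the
 * block edges, which the calling routines handle through overlaps.
 */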

//=========================================================================

int tpltz_cleanup(fftw_complex **T_fft, fftw_complex **V_fft, double **V_rfft, fftw_plan *plan_f, fftw_plan *plan_b)
{
  fftw_destroy_plan(*plan_f);
  fftw_destroy_plan(*plan_b);
  fftw_free(*T_fft);
  fftw_free(*V_fft);
  fftw_free(*V_rfft);
#ifdef fftw_MULTITHREADING
  fftw_cleanup_threads();
#endif
  fftw_cleanup();

  return 0;  //the function is declared int, so return a status explicitly
}

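/* Example (editor's sketch, not part of the original source): typical lifecycle of the
 * FFT helpers for an n-long data matrix V with m columns and a Toeplitz kernel T of
 * half-bandwidth lambda, with flag_stgy a Flag structure set up beforehand by the caller:
 *
 *   int nfft, blocksize;
 *   fftw_complex *T_fft, *V_fft;
 *   double *V_rfft;
 *   fftw_plan plan_f, plan_b;
 *
 *   tpltz_init(n, lambda, &nfft, &blocksize, &T_fft, T, &V_fft, &V_rfft, &plan_f, &plan_b, flag_stgy);
 *   stmm_core(&V, n, m, T, T_fft, blocksize, lambda, V_fft, V_rfft, nfft, plan_f, plan_b, 0, flag_stgy.flag_nofft);
 *   tpltz_cleanup(&T_fft, &V_fft, &V_rfft, &plan_f, &plan_b);
 *
 * V is overwritten with the product T*V.
 */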

//=========================================================================

int copy_block(int ninrow, int nincol, double *Vin, int noutrow, int noutcol, double *Vout, int inrow, int incol, int nblockrow, int nblockcol, int outrow, int outcol, double norm, int set_zero_flag)
{
  int i, j, p, offsetIn, offsetOut;

  //do some size checks first
  if( (nblockcol > nincol) || (nblockrow > ninrow) || (nblockcol > noutcol) || (nblockrow > noutrow)) {
    printf("Error in routine copy_block. Bad size setup.\n");
    return print_error_message(7, __FILE__, __LINE__);
  }

  if(set_zero_flag) {
#pragma omp parallel for //private(i) num_threads(NB_OMPTHREADS_CPBLOCK)
    for(i=0; i<noutrow*noutcol; i++)  //memset would also work, but this loop keeps the zeroing threaded
      Vout[i] = 0.0;
  }

  offsetIn = ninrow*incol+inrow;
  offsetOut = noutrow*outcol+outrow;

//#pragma omp parallel for private(i,j,p) num_threads(NB_OMPTHREADS_CPBLOCK)
  for(i=0; i<nblockcol*nblockrow; i++) {  //copy the block
    j = i/nblockrow;
    p = i%nblockrow;
    Vout[offsetOut+j*noutrow+p] = Vin[offsetIn+j*ninrow+p]*norm;
  }

  return 0;
}

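/* Example (editor's sketch, not part of the original source): copy_block works on
 * column-major matrices. To copy a 3x2 block starting at row 1, column 0 of a 5x4 input
 * matrix Vin into the top-left corner of a 6x2 output matrix Vout, scaling by 0.5 and
 * zeroing Vout first:
 *
 *   copy_block(5, 4, Vin, 6, 2, Vout, 1, 0, 3, 2, 0, 0, 0.5, 1);
 *   // element Vin[1 + 5*c + r] maps to 0.5*Vin[...] at Vout[6*c + r], r = 0..2, c = 0..1
 */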


//=========================================================================

int scmm_direct(int fft_size, int nfft, fftw_complex *C_fft, int ncol, double *V_rfft, double **CV, fftw_complex *V_fft, fftw_plan plan_f_V, fftw_plan plan_b_CV)
{
  //routine variables
  int sizeT = fft_size/2+1;
  int i, idx;

  //perform the forward FFTs
  fftw_execute(plan_f_V);  //input in V_rfft; output in V_fft

//  printf("ncol=%d, fft_size=%d, sizeT=%d\n", ncol, fft_size, sizeT);

//double t1, t2;
//  t1 = MPI_Wtime();

#pragma omp parallel for private(idx) //num_threads(nfft)
  for(i=0; i<ncol*sizeT; i++) {
    idx = i%sizeT;
    double v_re = V_fft[i][0];  //keep the original real part: the imaginary update must not
                                //use the already-overwritten value (harmless when C_fft is real)
    V_fft[i][0] = C_fft[idx][0]*v_re - C_fft[idx][1]*V_fft[i][1];
    V_fft[i][1] = C_fft[idx][0]*V_fft[i][1] + C_fft[idx][1]*v_re;
  }

//  t2 = MPI_Wtime();
//  printf("Computation time : %lf s.\n", t2-t1);


// This version is wrong (the flattened index should be i = icol*sizeT + idx):
/*
int icol;
double t1, t2;
  t1 = MPI_Wtime();
#pragma omp parallel for private(i, idx)
  for(icol=0; icol<ncol; icol++) {
    for(idx=0; idx<sizeT; idx++) {
      i = icol*idx;
      V_fft[i][0] = C_fft[idx][0]*V_fft[i][0]-C_fft[idx][1]*V_fft[i][1];
      V_fft[i][1] = C_fft[idx][0]*V_fft[i][1]+C_fft[idx][1]*V_fft[i][0];
    }}
  t2 = MPI_Wtime();
*/
//  printf("Computation time : %lf s.\n", t2-t1);


  //perform the backward FFTs
  fftw_execute(plan_b_CV);  //input in V_fft; output in V_rfft

  return 0;
}

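/* Note (editor's addition, not part of the original source): scmm_direct computes the
 * circulant product C*V column by column as IFFT( FFT(c) .* FFT(v) ), where c is the
 * first column of C (already transformed into C_fft) and .* is the element-wise complex
 * product performed by the loop above. Because T is embedded symmetrically in
 * circ_init_fftw, C_fft is real-valued up to rounding. The backward FFTW transform is
 * unnormalized, so the caller rescales the result by 1/fft_size (the 1.0/blocksize
 * factor in scmm_basic).
 */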

//=========================================================================

int scmm_basic(double **V, int blocksize, int m, fftw_complex *C_fft, double **CV, fftw_complex *V_fft, double *V_rfft, int nfft, fftw_plan plan_f_V, fftw_plan plan_b_CV)
{
  //routine variables
  int i, k;  //loop indices
  int nloop = (int) ceil((1.0*m)/nfft);  //number of subblocks

  //loop over sets of columns
  int ncol = min(nfft, m);  //number of columns copied from the data to the working matrix;
                            //equals the number of simultaneous FFTs


#pragma omp parallel for //num_threads(NB_OMPTHREADS_BASIC) //schedule(dynamic,1)
  for(i=0; i<blocksize*ncol; i++)
    V_rfft[i] = 0.0;  //memset would also work, but this loop keeps the zeroing threaded


//bug fixed: conflict between num_threads and nfft
//#pragma omp parallel for schedule(dynamic,1) num_threads(8) //num_threads(nfft)
  for(k=0; k<nloop; k++) {  //main loop over the sets of columns
    if (k==nloop-1)  //on the last iteration ncol may be smaller than nfft
      ncol = m-(nloop-1)*nfft;

    //init fftw matrices:
    //extract a block of ncol full-length columns from the data matrix and embed it in a
    //bigger matrix, padding each column with lambda zeros. Note that all columns are
    //zero padded thanks to the zero-initialization above.

    copy_block(blocksize, m, (*V), blocksize, ncol, V_rfft, 0, k*nfft, blocksize, ncol, 0, 0, 1.0, 0);
    //note: all nfft vectors are ALWAYS transformed below in a single go; if ncol < nfft
    //the extra work is wasted.

    scmm_direct(blocksize, nfft, C_fft, ncol, V_rfft, CV, V_fft, plan_f_V, plan_b_CV);
    //note: the parameter CV is not really used

    //extract the relevant part from the result
    copy_block(blocksize, ncol, V_rfft, blocksize, m, (*CV), 0, 0, blocksize, ncol, 0, k*nfft, 1.0/((double) blocksize), 0);

  }  //end of loop over the column sets


  return 0;
}

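/* Example (editor's sketch, not part of the original source): batching arithmetic for
 * m = 10 data columns processed with nfft = 4 simultaneous FFTs:
 *
 *   nloop = ceil(10/4.0) = 3
 *   // k = 0: columns 0-3, k = 1: columns 4-7, k = 2: ncol = 10 - 2*4 = 2 (columns 8-9),
 *   // but the last batch still transforms all 4 stacked vectors, two of which are unused.
 */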

//=========================================================================

int stmm_core(double **V, int n, int m, double *T, fftw_complex *T_fft, int blocksize, int lambda, fftw_complex *V_fft, double *V_rfft, int nfft, fftw_plan plan_f, fftw_plan plan_b, int flag_offset, int flag_nofft)
{

  double t1, t2;

  t1 = MPI_Wtime();

  //debugging override (kept for reference):
//  flag_offset = 1;

  //routine variables
  int status;
  int i, j, k, p;  //loop indices
  int currentsize;
  int distcorrmin = lambda-1;

  int blocksize_eff = blocksize-2*distcorrmin;  //useful part of each block once the overlaps are removed
  int nbloc;  //number of subblocks of the slide/overlap algorithm

  if (flag_offset==1)
    nbloc = ceil((1.0*(n-2*distcorrmin))/blocksize_eff);
  else
    nbloc = ceil( (1.0*n)/blocksize_eff);  //we need n because of reshaping

  if(PRINT_RANK==0 && VERBOSE>0)
    printf("nbloc=%d, n=%d, m=%d, blocksize=%d, blocksize_eff=%d\n", nbloc, n, m, blocksize, blocksize_eff);

  double *V_bloc, *TV_bloc;
  V_bloc = (double *) calloc(blocksize*m, sizeof(double));
  TV_bloc = (double *) calloc(blocksize*m, sizeof(double));
  if((V_bloc==0)||(TV_bloc==0))
    return print_error_message(2, __FILE__, __LINE__);

  int offset = 0;
  if (flag_offset==1)
    offset = distcorrmin;

  int iV = 0;        //"-distcorrmin+offset"; //first index in V
  int iTV = offset;  //first index in TV

  //"k=0";
  //first subblock treated separately as it requires some padding. Prepare the block of the
  //data vector with the overlaps on both sides
  currentsize = min( blocksize-distcorrmin+offset, n-iV);
  //note: if flag_offset=0, pad the first distcorrmin elements with zeros (for the first subblock only)
  //      and if flag_offset=1 there is no zero padding
  copy_block( n, m, *V, blocksize, m, V_bloc, 0, 0, currentsize, m, distcorrmin-offset, 0, 1.0, 0);

  //do the block computation
  if (flag_nofft==1)
    status = stmm_simple_basic(&V_bloc, blocksize, m, T, lambda, &TV_bloc);
  else
    status = scmm_basic(&V_bloc, blocksize, m, T_fft, &TV_bloc, V_fft, V_rfft, nfft, plan_f, plan_b);

  if (status!=0) {
    free(V_bloc);
    free(TV_bloc);
    printf("Error in stmm_core.\n");
    return print_error_message(7, __FILE__, __LINE__);
  }


  //copy the next chunk of the data matrix **before** overwriting the input, because of the overlaps!
  iV = blocksize_eff-distcorrmin+offset;

  if(nbloc > 1) {
    currentsize = min( blocksize, n-iV);  //not to overshoot

    int flag_reset = (currentsize!=blocksize);  //with flag_reset=1, always zero the block first
    copy_block( n, m, *V, blocksize, m, V_bloc, iV, 0, currentsize, m, 0, 0, 1.0, flag_reset);
  }

  //and now store the output back in V
  currentsize = min( blocksize_eff, n-iTV);  //to trim the extra rows
  copy_block( blocksize, m, TV_bloc, n, m, *V, distcorrmin, 0, currentsize, m, iTV, 0, 1.0, 0);


  iTV += blocksize_eff;
  //now continue with all the other subblocks
  for(k=1; k<nbloc; k++) {

    //do the block computation
    if (flag_nofft==1)
      status = stmm_simple_basic(&V_bloc, blocksize, m, T, lambda, &TV_bloc);
    else
      status = scmm_basic(&V_bloc, blocksize, m, T_fft, &TV_bloc, V_fft, V_rfft, nfft, plan_f, plan_b);

    if (status!=0) break;


    iV += blocksize_eff;
    //copy first the next subblock to process
    if(k != nbloc-1) {
      currentsize = min(blocksize, n-iV);  //not to overshoot

      int flag_resetk = (currentsize!=blocksize);  //with flag_reset=1, always zero the block first
      copy_block( n, m, *V, blocksize, m, V_bloc, iV, 0, currentsize, m, 0, 0, 1.0, flag_resetk);
    }

    //and then store the output in V
    currentsize = min( blocksize_eff, n-iTV);  //not to overshoot
    copy_block( blocksize, m, TV_bloc, n, m, *V, distcorrmin, 0, currentsize, m, iTV, 0, 1.0, 0);
    iTV += blocksize_eff;

  }  //end of the block computations


  free(V_bloc);
  free(TV_bloc);


  t2 = MPI_Wtime();

  if (PRINT_RANK==0 && VERBOSE>0)
    printf("time stmm_core=%f\n", t2-t1);

  return status;
}

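/* Example (editor's sketch, not part of the original source): sliding-window arithmetic
 * for n = 100000 samples, lambda = 100 and blocksize = 4096:
 *
 *   distcorrmin   = 99
 *   blocksize_eff = 4096 - 2*99 = 3898     // useful samples produced per block
 *   nbloc         = ceil(100000/3898.0) = 26
 *
 * Each block of 4096 samples overlaps its neighbours by distcorrmin samples on each side,
 * so the circulant (FFT) product is exact on the central 3898 samples copied back into V.
 */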

//=========================================================================

int stmm_main(double **V, int n, int m, int id0, int l, double *T, fftw_complex *T_fft, int lambda, fftw_complex *V_fft, double *V_rfft, fftw_plan plan_f, fftw_plan plan_b, int blocksize, int nfft, Flag flag_stgy)
{

  //routine variables
  int i, j, k, p;  //loop indices
  int distcorrmin = lambda-1;
  int flag_prod_strategy_nofft = 0;  //0: use ffts; 1: no ffts
  int flag_shortcut_m_eff_eq_1 = 1;
  int flag_shortcut_nbcol_eq_1 = 1;
  int flag_nfullcol_in_middle = 0;   //in the case where m=1 it can be good to call stmm_core directly too
  int flag_optim_offset_for_nfft = 0;
  int flag_no_rshp = flag_stgy.flag_no_rshp;  //0;
  int flag_nofft = flag_stgy.flag_nofft;      //1;

  int m_eff = (id0+l-1)/n - id0/n + 1;  //number of columns
  int nfullcol;
  int nloop_middle;  //change it to the number of full columns to improve memory

  FILE *file;
  file = stdout;

  if (l<distcorrmin)  //test to avoid communication errors
    return print_error_message (1, __FILE__, __LINE__);


  //shortcut for m==1 (if flag_shortcut_m_eff_eq_1==1 && nfft==1) or when no reshaping is requested
  if ((m_eff==1 && flag_shortcut_m_eff_eq_1==1 && nfft==1) || (flag_no_rshp==1 && id0==0 && l==n*m)) {

    int flag_offset = 0;

//    if (flag_prod_strategy_nofft==1)  //needs T as input to work
//      stmm_simple_core(V, n, m, T, blocksize, lambda, nfft, flag_offset);
//    else
    int nr = min(l, n);
    stmm_core(V, nr, m_eff, T, T_fft, blocksize, lambda, V_fft, V_rfft, nfft, plan_f, plan_b, flag_offset, flag_nofft);


    return 0;
  }  //end of the shortcut for m==1


  //the middle
  int m_middle;

  //define the splitting for the product computation
  nfullcol = max(0, (l-(n-id0%n)%n-(id0+l)%n)/n );  //number of full columns in the input data

  if (flag_nfullcol_in_middle==1)
    nloop_middle = ceil(1.0*(nfullcol)/nfft);
  else
    nloop_middle = (nfullcol)/nfft;

  if (flag_nfullcol_in_middle==1)
    m_middle = nfullcol;
  else
    m_middle = nfft*nloop_middle;


  int vmiddle_size = n*m_middle;


  if(PRINT_RANK==0 && VERBOSE>2)
    printf("nloop_middle=%d , m_middle=%d\n", nloop_middle, m_middle);


  //compute the middle if needed
  if (nloop_middle>0) {
    double *Vmiddle;
    int offset_middle = (n-id0%n)%n;
    Vmiddle = (*V)+offset_middle;

    int flag_offset = 0;
    stmm_core(&Vmiddle, n, m_middle, T, T_fft, blocksize, lambda, V_fft, V_rfft, nfft, plan_f, plan_b, flag_offset, flag_nofft);

  }  //end (nloop_middle>0)


  //edges (first + last columns + the extra column from the Euclidean division)
  int v1edge_size = min(l, (n-id0%n)%n);
  int v2edge_size = max( l-(v1edge_size+vmiddle_size), 0);
  int vedge_size = v1edge_size+v2edge_size;

  //compute the edges if needed
  if (vedge_size>0) {

    int m_v1edge, m_v2edge;
    m_v1edge = (v1edge_size>0)*1;  //m_v1edge is 1 or 0, it cannot be more
    m_v2edge = m-(m_v1edge+m_middle);
    int nbcol = m_v1edge+m_v2edge;
    int *nocol;
    nocol = (int *) calloc(nbcol, sizeof(int));

    //define the columns for the edge computation
    if (m_v1edge==1)
      nocol[0] = 0;
    for(i=(m_v1edge); i<nbcol; i++)
      nocol[i] = m_middle+i;

    if(PRINT_RANK==0 && VERBOSE>2)
      printf("nbcol=%d , m_v1edge=%d , m_v2edge=%d\n", nbcol, m_v1edge, m_v2edge);

    //shortcut for nbcol==1
    if (nbcol==1 && nfft==1 && flag_shortcut_nbcol_eq_1==1) {
      //this is the case where no reshaping is needed; it is equivalent to flag_format_rshp==0
      double *Vedge;
      int offset_edge = n*nocol[0];  //works because all the previous columns are necessarily full
      Vedge = (*V)+offset_edge;
      int flag_offset = 0;
      stmm_core(&Vedge, vedge_size, nbcol, T, T_fft, blocksize, lambda, V_fft, V_rfft, nfft, plan_f, plan_b, flag_offset, flag_nofft);

    }
    else {  //general case to compute the edges

      double *Vin;
      Vin = (*V);

      //sizes for the different kinds of reshaping
      int lconc = vedge_size;  //another way to compute it: lconc = n*nbcol - (nocol[0]==0)*(id0%n) - (nocol[nbcol-1]==(m-1))*(n-(id0+l)%n);
      int v1_size = lconc+(distcorrmin)*(nbcol-1);
      int fft_size = ceil(1.0*v1_size/nfft)+2*distcorrmin;

      int flag_format_rshp = (nfft>1)*2 + (nfft==1 && nbcol>1)*1 + (nfft==1 && nbcol==1)*0;
      int nrshp, mrshp, lrshp;

      define_rshp_size(flag_format_rshp, fft_size, nfft, v1_size, vedge_size, &nrshp, &mrshp, &lrshp);

      //allocate Vrshp for the computation
      double *Vrshp;
      Vrshp = (double *) calloc(lrshp, sizeof(double));
      double *Vout;
      Vout = (*V);

      if(PRINT_RANK==0 && VERBOSE>2) {
        fprintf(file, "nrshp=%d , mrshp=%d , lrshp=%d\n", nrshp, mrshp, lrshp);
        fprintf(file, "flag_format_rshp=%d\n", flag_format_rshp);
      }

      build_reshape(Vin, nocol, nbcol, lconc, n, m, id0, l, lambda, nfft, Vrshp, nrshp, mrshp, lrshp, flag_format_rshp);

      int flag_offset;
      if (flag_format_rshp==2 && flag_optim_offset_for_nfft==1)
        flag_offset = 1;
      else
        flag_offset = 0;

      //compute the product on Vrshp
      stmm_core(&Vrshp, nrshp, mrshp, T, T_fft, blocksize, lambda, V_fft, V_rfft, nfft, plan_f, plan_b, flag_offset, flag_nofft);

      extract_result(Vout, nocol, nbcol, lconc, n, m, id0, l, lambda, nfft, Vrshp, nrshp, mrshp, lrshp, flag_format_rshp);

      free(Vrshp);  //release the reshaping buffer

    }  //end of the general case to compute the edges

    free(nocol);  //release the edge-column index list

  }  //end (vedge_size>0)


  return 0;
}

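/* Example (editor's sketch, not part of the original source): decomposition used by
 * stmm_main for n = 100 (rows per column), m = 4, id0 = 150, l = 230, taking nfft = 1
 * for simplicity. The local data covers the end of column 1, all of column 2 and part
 * of column 3:
 *
 *   v1edge_size  = (n - id0%n)%n = 50        // tail of the first, truncated column
 *   nfullcol     = (230 - 50 - 80)/100 = 1   // one full column (column 2), the "middle"
 *   vmiddle_size = 100
 *   v2edge_size  = 230 - (50 + 100) = 80     // head of the last, truncated column
 *
 * The middle is processed directly with stmm_core; the two truncated edge columns are
 * gathered (build_reshape), multiplied, and scattered back (extract_result).
 */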

//=========================================================================
#ifdef W_MPI


int mpi_stmm(double **V, int n, int m, int id0, int l, double *T, int lambda, Flag flag_stgy, MPI_Comm comm)
{

  //mpi variables
  int rank;  //process rank
  int size;  //number of processes
  MPI_Status status;
  MPI_Comm_rank(comm, &rank);
  MPI_Comm_size(comm, &size);


  //routine variables
  int i, j, k;          //some indices
  int idf = id0+l;      //first index of the scattered V for rank "rank + 1"
  int cfirst = id0/n;   //first column index
  int clast = idf/n;    //last column index
  int clast_r = (idf-1)/n;
  int m_eff = clast_r - cfirst + 1;
  double *V1, *Lambda;

  //MPI communication conditions
  //communication is needed only when columns are truncated
  int right = rank + 1;
  int left = rank - 1;
  int v1_size = l + 2*lambda;  //size including the communicated overlaps
  if (rank==0 || cfirst*n==id0) {  //no left comm
    v1_size -= lambda;
    left = MPI_PROC_NULL;
  }
  if (rank==(size-1) || clast*n==idf) {  //no right comm
    v1_size -= lambda;
    right = MPI_PROC_NULL;
  }

  //init data to send
  Lambda = (double *) malloc(2*lambda * sizeof(double));
  if (Lambda==0)
    return print_error_message (2, __FILE__, __LINE__);

  for(i=0; i<lambda; i++) {
    Lambda[i] = (*V)[i];
    Lambda[i+lambda] = (*V)[i+l-lambda];
  }

  if(PRINT_RANK==0 && VERBOSE>2)
    printf("[rank %d] Left comm with %d | Right comm with %d\n", rank, left, right);

  //send and receive data
  MPI_Sendrecv_replace(Lambda, lambda, MPI_DOUBLE, left, MPI_USER_TAG, right, MPI_USER_TAG, comm, &status);           //1st comm
  MPI_Sendrecv_replace((Lambda+lambda), lambda, MPI_DOUBLE, right, MPI_USER_TAG, left, MPI_USER_TAG, comm, &status);  //2nd comm


  if (l<lambda) {  //checked only after the sendrecv to avoid communication problems for the other processes
    free(Lambda);
    return print_error_message (1, __FILE__, __LINE__);
  }

  //copy the received data
  if(left==MPI_PROC_NULL && right==MPI_PROC_NULL)  // 0--0 : nothing to do
    V1 = *V;
  else if(left==MPI_PROC_NULL) {  // 0--1 : realloc
    *V = realloc(*V, v1_size * sizeof(double));
    if(*V == NULL)
      return print_error_message (2, __FILE__, __LINE__);
    V1 = *V;
  }
  else  // 1--1 or 1--0 : new allocation
    V1 = (double *) malloc(v1_size * sizeof(double));

  if (left!=MPI_PROC_NULL) {
    for(i=0; i<lambda; i++)
      V1[i] = Lambda[i+lambda];
    id0 -= lambda;
  }
  if (right!=MPI_PROC_NULL) {
    for(i=0; i<lambda; i++)
      V1[i+v1_size-lambda] = Lambda[i];
  }

  //copy the input matrix V
  int offset = 0;
  if (left!=MPI_PROC_NULL) {
    offset = lambda;
#pragma omp parallel for
    for(i=offset; i<l+offset; i++)
      V1[i] = (*V)[i-offset];
  }

  free(Lambda);  //the overlaps are now stored in V1

  fftw_complex *V_fft, *T_fft;
  double *V_rfft;
  fftw_plan plan_f, plan_b;


  //compute the matrix product
  int nfft, blocksize;

  tpltz_init(v1_size, lambda, &nfft, &blocksize, &T_fft, T, &V_fft, &V_rfft, &plan_f, &plan_b, flag_stgy);

  if(PRINT_RANK==0 && VERBOSE>1)
    printf("[rank %d] Before middle-level call : blocksize=%d, nfft=%d\n", rank, blocksize, nfft);

  stmm_main(&V1, n, m, id0, v1_size, T, T_fft, lambda, V_fft, V_rfft, plan_f, plan_b, blocksize, nfft, flag_stgy);


  tpltz_cleanup(&T_fft, &V_fft, &V_rfft, &plan_f, &plan_b);

  //copy the output matrix TV
  offset = 0;
  if (left!=MPI_PROC_NULL)
    offset = lambda;
  if(left==MPI_PROC_NULL && right==MPI_PROC_NULL)  // 0--0
    *V = V1;
  else if(left==MPI_PROC_NULL) {  // 0--1
    V1 = realloc(V1, l * sizeof(double));
    if(V1 == NULL)
      return print_error_message (2, __FILE__, __LINE__);
    *V = V1;
  }
  else {  // 1--0 or 1--1
#pragma omp parallel for
    for(i=offset; i<l+offset; i++)
      (*V)[i-offset] = V1[i];
  }

  if(left!=MPI_PROC_NULL)
    free(V1);

  return 0;
}

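/* Example (editor's sketch, not part of the original source): each process holds a
 * contiguous, row-wise chunk of the global column-major data matrix (n rows, m columns).
 * A process owning l consecutive samples starting at global index id0 applies the
 * symmetric band-Toeplitz matrix (first row T, half-bandwidth lambda) in place with
 *
 *   // V_local is hypothetical: a malloc'ed buffer of length l (mpi_stmm may realloc it)
 *   mpi_stmm(&V_local, n, m, id0, l, T, lambda, flag_stgy, MPI_COMM_WORLD);
 *
 * The routine exchanges lambda-long overlaps with the neighbouring ranks, runs stmm_main
 * on the extended chunk, and returns the product in V_local.
 */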
#endif
