RInside Version 0.2.12
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
rinside_mpi_sample4.cpp
Go to the documentation of this file.
1 // -*- mode: C++; c-indent-level: 4; c-basic-offset: 4; tab-width: 8; -*-
2 //
3 // Simple mpi example: Usage of RInside with a Master-Slave Model with worker
4 //
5 // MPI C API version of file contributed by Nicholas Pezolano and Martin Morgan
6 //
7 // Copyright (C) 2010 - 2013 Dirk Eddelbuettel
8 // Copyright (C) 2013 Nicholas Pezolano
9 // Copyright (C) 2013 Martin Morgan
10 //
11 // GPL'ed
12 
13 #include <mpi.h>
14 #include <RInside.h>
15 #include <string>
16 #include <vector>
17 #include <iostream>
18 
19 #define WORKTAG 1
20 #define DIETAG 2
21 
22 /* Local functions */
23 static void master(void);
24 static void slave(RInside &R);
25 static int get_next_work_item(int &work, const int size_work, std::vector<int> &data);
26 static void do_work(int work,int &result,RInside &R);
27 static void initalize(RInside &R);
28 
29 int itr = 0;
30 
31 int main(int argc, char **argv){
32  int myrank;
33  MPI_Init(&argc, &argv);
34  MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
35  RInside R(argc, argv);
36 
37 
38  if (myrank == 0) {
39  master();
40  } else {
41  initalize(R);
42  slave(R);
43  }
44 
45  MPI_Finalize();
46  return 0;
47 }
48 
49 static void initalize(RInside &R){
50  //load the following R library on every R instance
51  std::string R_libs ="suppressMessages(library(random));";
52  R.parseEvalQ(R_libs);
53 }
54 
55 static void master(void){
56  int ntasks, rank;
57  std::vector<int> data;
58  int work;
59  int result;
60  int sum;
61  MPI_Status status;
62 
63  //create some test "data" to pass around
64  for(int i = 0; i< 10; i++){
65  data.push_back(i);
66  }
67 
68  const int size_work = (int)data.size();
69 
70  MPI_Comm_size(MPI_COMM_WORLD, &ntasks);
71 
72  for (rank = 1; rank < ntasks; ++rank) {
73  get_next_work_item(work,size_work,data);
74  MPI_Send(&work,1,MPI_INT,rank, WORKTAG,MPI_COMM_WORLD);
75  }
76 
77  int ret = get_next_work_item(work,size_work,data);
78  while (ret == 0) {
79  MPI_Recv(&result,1,MPI_INT,MPI_ANY_SOURCE,MPI_ANY_TAG,MPI_COMM_WORLD,&status);
80  sum += result;
81  MPI_Send(&work,1,MPI_INT,status.MPI_SOURCE,WORKTAG,MPI_COMM_WORLD);
82 
83  ret = get_next_work_item(work,size_work,data);
84  }
85 
86  for (rank = 1; rank < ntasks; ++rank) {
87  MPI_Recv(&result, 1, MPI_INT, MPI_ANY_SOURCE,
88  MPI_ANY_TAG, MPI_COMM_WORLD, &status);
89  sum += result;
90  }
91 
92  for (rank = 1; rank < ntasks; ++rank) {
93  MPI_Send(0, 0, MPI_INT, rank, DIETAG, MPI_COMM_WORLD);
94  }
95 
96  std::cout << "sum of all iterations = " << sum << std::endl;
97 }
98 
99 static void slave(RInside &R) {
100  int work;
101  int result;
102  MPI_Status status;
103 
104  while (1) {
105 
106  MPI_Recv(&work, 1, MPI_INT, 0, MPI_ANY_TAG,
107  MPI_COMM_WORLD, &status);
108 
109  if (status.MPI_TAG == DIETAG) {
110  return;
111  }
112 
113  do_work(work,result,R);
114 
115  MPI_Send(&result, 1, MPI_INT, 0, 0, MPI_COMM_WORLD);
116  }
117 }
118 
119 
120 static int get_next_work_item(int &work,const int size_work, std::vector<int> &data) {
121  if (itr >= size_work) {
122  return -1;
123  }
124 
125  work = data[itr];
126 
127  itr++;
128  std::cout << "iteration = " << itr << std::endl;
129 
130  return 0;
131 }
132 
133 static void do_work(int work,int &result,RInside &R){
134 
135  //create a random number on every slave iteration
136  R["work"] = work;
137  std::string Rcmd = "work <- sample(1:10, 1)";
138  Rcpp::NumericVector M = R.parseEval(Rcmd);
139 
140  result = M(0);
141 }
static void master(void)
static void do_work(int work, int &result, RInside &R)
void parseEvalQ(const std::string &line)
Definition: RInside.cpp:366
#define WORKTAG
static int get_next_work_item(int &work, const int size_work, std::vector< int > &data)
int parseEval(const std::string &line, SEXP &ans)
Definition: RInside.cpp:308
int main(int argc, char **argv)
static void initalize(RInside &R)
static void slave(RInside &R)
#define DIETAG