RInside Version 0.2.12
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
rinsideserver.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2014 Christian Authmann
3  */
4 
5 #define LOG(...) {fprintf(stderr, "%d: ", getpid());fprintf(stderr, __VA_ARGS__);fprintf(stderr, "\n");}
6 
7 #include <RInside.h>
8 
9 #include "rinsideserver.h"
10 #include "internalfunction_clone.h"
11 
12 #include <stdexcept>
13 #include <fstream>
14 
15 
16 // Two helper functions.
17 static void replace_all(std::string &str, const std::string &search, const std::string &replace) {
18  size_t start_pos = 0;
19  while ((start_pos = str.find(search, start_pos)) != std::string::npos) {
20  str.replace(start_pos, search.length(), replace);
21  start_pos += replace.length();
22  }
23 };
24 
25 static std::string read_file_as_string(const std::string &filename) {
26  std::ifstream in(filename, std::ios::in | std::ios::binary);
27  if (in) {
28  std::string contents;
29  in.seekg(0, std::ios::end);
30  contents.resize(in.tellg());
31  in.seekg(0, std::ios::beg);
32  in.read(&contents[0], contents.size());
33  in.close();
34  return contents;
35  }
36  throw std::runtime_error("Could not read file");
37 }
38 
39 
40 std::map<int32_t, std::function<SEXP(BinaryStream &)> > RInsideServer::registry_sexp_from_stream;
41 std::map<int32_t, std::function<void(RInsideServer &, SEXP, bool)> > RInsideServer::registry_sexp_to_stream;
42 
43 
44 
46  : stream(std::move(stream)), R(R), Rcallbacks(Rcallbacks), can_send_reply(false) {
47 
48 }
50 
51 }
52 
53 /*
54  * Just a shorthand for the repetitive error handling that follows.
55  * The CMD_TRY must only start after all relevant input has been read, i.e. when the server would be allowed to send a reply.
56  * Connection errors are always considered fatal and will cause the server to stop.
57  */
58 #define CMD_TRY try {
59 #define CMD_CATCH } catch (const BinaryStream::stream_exception) { throw; } catch (const std::exception &e) { std::string s = e.what(); LOG("Command failed: %s", s.c_str()); sendReply(RIS_REPLY_ERROR); stream.write(s); }
60 
61 
62 
64  auto magic = stream.read<int>();
65  if (magic != RIS_MAGIC_NUMBER)
66  throw std::runtime_error("Client sent the wrong magic number");
67 
68  while (true) {
69  auto cmd = stream.read<char>();
71  //LOG("Requested command: %d", cmd);
72 
73  if (cmd == RIS_CMD_EXIT) {
74  LOG("Exiting because the client requested it");
75  return;
76  }
77  else if (cmd == RIS_CMD_SETVALUE) {
78  auto name = stream.read<std::string>();
79  LOG("Setting value for %s", name.c_str());
80  auto sexp = sexp_from_stream();
81  CMD_TRY
82  R[name] = sexp;
84  CMD_CATCH
85  }
86  else if (cmd == RIS_CMD_GETVALUE) {
87  auto name = stream.read<std::string>();
88  LOG("Returning value for %s", name.c_str());
89  auto type = stream.read<int32_t>();
90 
91  CMD_TRY
92  auto sexp = R[name];
93  sexp_to_stream(sexp, type, true);
94  CMD_CATCH
95  }
96  else if (cmd == RIS_CMD_SETCALLBACK) {
97  auto name = stream.read<std::string>();
98  LOG("Setting callback for %s", name.c_str());
99  auto callback_id = stream.read<uint32_t>();
100  auto result_type = stream.read<uint32_t>();
101  auto paramcount = stream.read<size_t>();
102 
103  std::vector<int32_t> param_types;
104  param_types.reserve(paramcount+1);
105  param_types.push_back(result_type);
106  for (size_t i=0;i<paramcount;i++) {
107  auto type = stream.read<int32_t>();
108  param_types.push_back(type);
109  }
110 
111  CMD_TRY
112  R[name] = Rcpp::InternalFunctionForRInsideServer(*this, callback_id, param_types);
114  CMD_CATCH
115  LOG("Callback %s initialized", name.c_str());
116  }
117  else if (cmd == RIS_CMD_RUN) {
118  auto source = stream.read<std::string>();
119  // R on linux doesn't deal well with windows \r\n line endings, so we replace those
120  replace_all(source, "\r\n", "\n");
121 
122  auto type = stream.read<int32_t>();
123 
124  CMD_TRY
125  std::string delimiter = "\n\n";
126  size_t start = 0;
127  size_t end = 0;
128  while (true) {
129  end = source.find(delimiter, start);
130  if (end == std::string::npos)
131  break;
132  std::string line = source.substr(start, end-start);
133  start = end+delimiter.length();
134  LOG("src: %s", line.c_str());
135  R.parseEvalQ(line);
136  }
137  std::string lastline = source.substr(start);
138  LOG("src: %s", lastline.c_str());
139  auto result = R.parseEval(lastline);
140  LOG("Trying to return the result of the R code as a value with typeid %d", type);
141  if (type == 0)
143  else {
144  sexp_to_stream(result, type, true);
145  }
146  CMD_CATCH
147  }
148  else if (cmd == RIS_CMD_GETCONSOLE) {
149  LOG("Returning console output");
150  std::string output = Rcallbacks.getConsoleOutput();
153  stream.write(output);
154  }
155  else if (cmd == RIS_CMD_INITPLOT) {
156  LOG("Initializing plot");
157  auto width = stream.read<uint32_t>();
158  auto height = stream.read<uint32_t>();
159 
160  CMD_TRY
161  R.parseEval("rserver_plot_tempfile = tempfile(\"rs_plot\", fileext=\".png\")");
162  R.parseEval("png(rserver_plot_tempfile, width=" + std::to_string(width) + ", height=" + std::to_string(height)+", bg=\"transparent\")");
164  CMD_CATCH
165  }
166  else if (cmd == RIS_CMD_GETPLOT) {
167  LOG("Returning plot");
168  CMD_TRY
169  R.parseEval("dev.off()");
170  std::string filename = Rcpp::as<std::string>(R["rserver_plot_tempfile"]);
171  std::string output = read_file_as_string(filename);
172  std::remove(filename.c_str());
174  stream.write(output);
175  CMD_CATCH
176  }
177  else
178  throw std::runtime_error("Client sent unknown command");
179  }
180 }
181 
183  auto type = stream.read<int32_t>();
184 
185  if (registry_sexp_from_stream.count(type) < 1) {
186  LOG("unknown type in sexp_from_stream: %d", type);
187  throw std::runtime_error("Unknown datatype in sexp_from_stream");
188  }
189 
190  return registry_sexp_from_stream[type](stream);
191 }
192 
193 void RInsideServer::sexp_to_stream(SEXP sexp, int32_t type, bool include_reply) {
194  if (registry_sexp_to_stream.count(type) < 1) {
195  LOG("unknown type in sexp_to_stream: %d", type);
196  throw std::runtime_error("Unknown datatype in sexp_to_stream");
197  }
198 
199  registry_sexp_to_stream[type](*this, sexp, include_reply);
200 }
201 
203  // TODO: Rcpp does not natively wrap chars.
204  //registerType<int8_t>();
205  //registerType<uint8_t>();
206  registerType<int16_t>();
207  registerType<uint16_t>();
208  registerType<int32_t>();
209  registerType<uint32_t>();
210  registerType<int64_t>();
211  registerType<uint64_t>();
212  registerType<float>();
213  registerType<double>();
214  registerType<std::string>();
215 
216  //registerType<std::vector<int8_t>>();
217  //registerType<std::vector<uint8_t>>();
218  registerType<std::vector<int16_t>>();
219  registerType<std::vector<uint16_t>>();
220  registerType<std::vector<int32_t>>();
221  registerType<std::vector<uint32_t>>();
222  registerType<std::vector<int64_t>>();
223  registerType<std::vector<uint64_t>>();
224  registerType<std::vector<float>>();
225  registerType<std::vector<double>>();
226  registerType<std::vector<std::string>>();
227 
228 }
const char RIS_CMD_GETCONSOLE
Definition: constants.h:15
InternalFunctionForRInsideServer_Impl< PreserveStorage > InternalFunctionForRInsideServer
const char RIS_REPLY_OK
Definition: constants.h:21
const uint32_t RIS_MAGIC_NUMBER
Definition: constants.h:10
BinaryStream stream
Definition: rinsideserver.h:30
const char RIS_CMD_RUN
Definition: constants.h:14
const char RIS_CMD_INITPLOT
Definition: constants.h:16
RInsideCallbacks & Rcallbacks
Definition: rinsideserver.h:32
#define CMD_TRY
RInsideServer(BinaryStream &stream, RInside &R, RInsideCallbacks &Rcallbacks)
#define CMD_CATCH
SEXP sexp_from_stream()
void parseEvalQ(const std::string &line)
Definition: RInside.cpp:366
const char RIS_CMD_GETPLOT
Definition: constants.h:17
std::string getConsoleOutput()
void allowSendReply()
Definition: rinsideserver.h:36
void write(const char *buffer, size_t len)
const char RIS_CMD_SETVALUE
Definition: constants.h:11
static std::string read_file_as_string(const std::string &filename)
int parseEval(const std::string &line, SEXP &ans)
Definition: RInside.cpp:308
static void registerDefaultTypes()
void sexp_to_stream(SEXP, int32_t type, bool include_reply=false)
static std::map< int32_t, std::function< SEXP(BinaryStream &)> > registry_sexp_from_stream
Definition: rinsideserver.h:38
static void replace_all(std::string &str, const std::string &search, const std::string &replace)
const char RIS_CMD_EXIT
Definition: constants.h:18
const char RIS_REPLY_VALUE
Definition: constants.h:23
const char RIS_CMD_GETVALUE
Definition: constants.h:12
#define LOG(...)
static std::map< int32_t, std::function< void(RInsideServer &, SEXP, bool)> > registry_sexp_to_stream
Definition: rinsideserver.h:39
void sendReply(char reply)
Definition: rinsideserver.h:35
size_t read(char *buffer, size_t len)
RInside & R
Definition: rinsideserver.h:31
const char RIS_CMD_SETCALLBACK
Definition: constants.h:13