RInside Version 0.2.12
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
example_server.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2014 Christian Authmann
3  */
4 
#include "common/binarystream.h"
#include "common/constants.h"
#include "datatypes/foo.h"
#include "datatypes/bar.h"

#include <cstdlib>
#include <cstdio>
#include <cstdarg> // va_list for LOG
#include <cerrno>
#include <string.h> // memset()
#include <map>
#include <atomic>
#include <iostream>
#include <fstream>
#include <stdexcept>

#include <chrono> // for sleeping
#include <thread>

#include <time.h>

#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/wait.h>
#include <sys/stat.h>
#include <poll.h>
#include <signal.h>
#include <unistd.h> // getpid(), fork(), close(), unlink(), write(), _exit()
31 
32 /*
33  * This is an example server. It sets up an R environment and a listening socket, then waits
34  * for clients and fork()s.
35  * The actual communication with clients is handled by server/rinsideserver.cpp.
36  * This file only does initialization and process tracking.
37  */
38 
/*
 * Since the server fork()s a lot, we prepend the pid to each logged line.
 *
 * LOG(format, ...) takes printf-style arguments; the trailing newline is added automatically.
 * The complete line is formatted into a local buffer first and then emitted with a single
 * fprintf() call, so lines written by concurrently running processes that share stderr are
 * less likely to be interleaved mid-line. Messages longer than the buffer are truncated.
 *
 * LOG expands to a single function call, so it can be used anywhere a statement is allowed
 * (including an unbraced if/else branch).
 *
 * Not async-signal-safe: do not call it from a signal handler.
 */
static void logWithPid(const char *format, ...) {
	char message[1024];
	va_list args;
	va_start(args, format);
	vsnprintf(message, sizeof(message), format, args);
	va_end(args);
	// pid_t is not guaranteed to be int, so cast explicitly for %d.
	fprintf(stderr, "%d: %s\n", (int) getpid(), message);
}
#define LOG(...) logWithPid(__VA_ARGS__)
43 
/*
 * Wall-clock limit, in seconds, for a single client. When a fork()ed child has been running
 * longer than this (e.g. because its R script is stuck in an infinite loop), the parent
 * process kills it.
 *
 * Ideally the client would choose its own timeout. However, only the fork()ed child talks to
 * the client, while the parent is the one enforcing the limit, so passing a per-client value
 * back to the parent is more involved than it looks.
 */
constexpr int TIMEOUT_SECONDS = 10 * 60; // ten minutes
53 
54 #include <RcppCommon.h>
55 
58 
59 #include <Rcpp.h>
60 #include <RInside.h>
61 #if !defined(RINSIDE_CALLBACKS)
62 #error "RInside was not compiled with RINSIDE_CALLBACKS"
63 #endif
64 #if !defined(RCPP_USING_CXX11)
65 #error "Rcpp didn't detect C++11 support (RCPP_USING_CXX11 is not defined)"
66 #endif
67 
70 
71 
72 /*
73  * The RInsideServer must be included AFTER RInside and all wrappers are included
74  */
75 #include "server/rinsideserver.h"
76 
77 
/*
 * Three-way comparison of two timespec values.
 *
 * Returns -1 if t1 lies before t2, 1 if t1 lies after t2 and 0 if both are identical.
 * The seconds fields decide first; the nanosecond fields are only consulted on a tie.
 */
int cmpTimespec(const struct timespec &t1, const struct timespec &t2) {
	if (t1.tv_sec != t2.tv_sec)
		return (t1.tv_sec < t2.tv_sec) ? -1 : 1;
	if (t1.tv_nsec != t2.tv_nsec)
		return (t1.tv_nsec < t2.tv_nsec) ? -1 : 1;
	return 0;
}
89 
90 
/*
 * Writes the decimal representation of `value` to `buf` (no terminating NUL) and returns the
 * number of characters written, at most 20. Uses no library functions, so it may be called
 * from a signal handler.
 */
static size_t appendDecimalSignalSafe(char *buf, long long value) {
	char digits[20];
	size_t count = 0;
	const bool negative = value < 0;
	// Work on the magnitude as unsigned so that the most negative value does not overflow.
	unsigned long long magnitude = negative ? 0ULL - (unsigned long long) value : (unsigned long long) value;
	do {
		digits[count++] = (char) ('0' + magnitude % 10);
		magnitude /= 10;
	} while (magnitude != 0);

	size_t len = 0;
	if (negative)
		buf[len++] = '-';
	while (count > 0)
		buf[len++] = digits[--count];
	return len;
}

/*
 * Copies the NUL-terminated string `str` (without the NUL) to `buf` and returns its length.
 * Uses no library functions, so it may be called from a signal handler.
 */
static size_t appendStringSignalSafe(char *buf, const char *str) {
	size_t len = 0;
	while (str[len] != '\0') {
		buf[len] = str[len];
		++len;
	}
	return len;
}

/*
 * Handler for the termination signals installed by main(). Handlers survive fork(), so the
 * children run it too. In particular, the parent sends SIGHUP to children that exceed
 * TIMEOUT_SECONDS.
 *
 * The signal can arrive at any point, e.g. while R is inside malloc() or while stdio holds its
 * stream lock. Therefore only async-signal-safe functions are used here:
 * - the log line "<pid>: Caught signal <n>, exiting" is assembled by hand and written with a
 *   single write(2);
 * - the process ends with _exit(2).
 * Calling fprintf() or exit() instead could deadlock, leaving a timed-out child running forever.
 * As a consequence, atexit handlers do not run and unflushed stdio buffers are discarded.
 * The exit status is the signal number.
 */
void signal_handler(int signum) {
	char line[96]; // pid (<=20) + text (16) + signum (<=20) + text (10) fits comfortably
	size_t len = 0;
	len += appendDecimalSignalSafe(line + len, (long long) getpid());
	len += appendStringSignalSafe(line + len, ": Caught signal ");
	len += appendDecimalSignalSafe(line + len, signum);
	len += appendStringSignalSafe(line + len, ", exiting\n");

	// Best effort: if logging fails there is nothing sensible left to do before exiting.
	ssize_t ignored = write(STDERR_FILENO, line, len);
	(void) ignored;

	_exit(signum);
}
95 
96 
97 int main()
98 {
99  // register our custom types with the server
101  RInsideServer::registerType<Foo>();
102  RInsideServer::registerType<Bar>();
103 
104 
105  // Install signal handlers
106  int signals[] = {SIGHUP, SIGINT, 0};
107  for (int i=0;signals[i];i++) {
108  if (signal(signals[i], signal_handler) == SIG_ERR) {
109  perror("Cannot install signal handler");
110  exit(1);
111  }
112  else
113  printf("Signal handler for %d installed\n", signals[i]);
114  }
115  signal(SIGPIPE, SIG_IGN);
116 
117  /*
118  * If R prints anything to the console, we must catch it.
119  * Instead of redirecting stdout (which we might want to use for diagnostics or logging), we
120  * use RInside's callbacks. They're marked experimental and aren't enabled by default, but in our
121  * tests, they worked just fine.
122  */
123  RInsideCallbacks *Rcallbacks = new RInsideCallbacks();
124  // Initialize R environment
125  printf("...loading R\n");
126  RInside R;
127  R.set_callbacks( Rcallbacks );
128 
129  printf("...loading packages\n");
130  try {
131  /*
132  * Loading packages is slow. We want to load all common packages once on
133  * server startup, before the fork()
134  *
135  * For example, sandboxR might be useful to restrict the damage an R script can do.
136  * See https://github.com/rapporter/sandboxR
137  */
138  //R.parseEval("library(\"sandboxR\")");
139  }
140  catch (const std::exception &e) {
141  printf("error loading packages: %s\nR's output:\n%s", e.what(), Rcallbacks->getConsoleOutput().c_str());
142  exit(5);
143  }
144  Rcallbacks->resetConsoleOutput();
145 
146  printf("R is ready\n");
147 
148  // get rid of leftover sockets
149  unlink(ris_socket_address);
150 
151  // create a fresh socket
152  int listen_fd = socket(AF_UNIX, SOCK_STREAM, 0);
153  if (listen_fd < 0) {
154  perror("socket() failed");
155  exit(1);
156  }
157 
158  // bind socket
159  struct sockaddr_un server_addr;
160  memset((void *) &server_addr, 0, sizeof(server_addr));
161  server_addr.sun_family = AF_UNIX;
162  strcpy(server_addr.sun_path, ris_socket_address);
163  if (bind(listen_fd, (sockaddr *) &server_addr, sizeof(server_addr)) < 0) {
164  perror("bind() failed");
165  exit(1);
166  }
167 
168  // adjust this for your own needs..
169  chmod(ris_socket_address, 0777);
170 
171 
172  /*
173  * We need to keep track of all the children to enforce timeouts. This map
174  * contains pids of all child processes and their end times.
175  */
176  std::map<pid_t, timespec> running_clients;
177 
178  printf("Socket started, listening..\n");
179  // Start listening and fork()
180  listen(listen_fd, 5);
181  while (true) {
182  /*
183  * Try to reap all child processes that exited on their own. Not only
184  * will this keep our running_clients map small, it will also allow the
185  * OS to remove any "zombie" processes.
186  */
187  int status;
188  pid_t exited_pid;
189  while ((exited_pid = waitpid(-1, &status, WNOHANG)) > 0) {
190  LOG("Client %d no longer exists", (int) exited_pid);
191  running_clients.erase(exited_pid);
192  }
193  /*
194  * Now check if any children exceeded their timeout. Kill them.
195  */
196  struct timespec current_t;
197  clock_gettime(CLOCK_MONOTONIC, &current_t);
198 
199  for (auto it = running_clients.begin(); it != running_clients.end(); ) {
200  auto timeout_t = it->second;
201  if (cmpTimespec(timeout_t, current_t) < 0) {
202  auto timeouted_pid = it->first;
203  LOG("Client %d gets killed due to timeout", (int) timeouted_pid);
204 
205  /*
206  * We kill the client using SIGHUP. Since we installed a signal handler, and signal handlers
207  * are kept during fork(), this should be enough to end it.
208  * That is, unless an R package removes the signal handler. In that case, we'd need to keep
209  * tracking the process and force a SIGKILL if it refuses to exit.
210  */
211  if (kill(timeouted_pid, SIGHUP) < 0) {
212  perror("kill() failed");
213  ++it;
214  }
215  else {
216  // the postincrement of the iterator is important to avoid using an invalid iterator
217  running_clients.erase(it++);
218  }
219  }
220  else {
221  ++it;
222  }
223 
224  }
225 
226  /*
227  * Wait for new connections.
228  *
229  * We may want to limit the amount of clients running at the same time.
230  */
231  if (running_clients.size() > 10) {
232  std::this_thread::sleep_for(std::chrono::milliseconds(5000));
233  continue;
234  }
235 
236  struct pollfd pollfds[1];
237  pollfds[0].fd = listen_fd;
238  pollfds[0].events = POLLIN;
239 
240  int poll_res = poll(pollfds, /* count = */ 1, /* timeout in ms = */ 5000);
241  if (poll_res < 0) {
242  perror("poll() failed");
243  exit(1);
244  }
245  /*
246  * If no new connection is made within 5 seconds, we repeat the loop and check
247  * for finished or timeouted children again.
248  */
249  if (poll_res == 0)
250  continue;
251  if ((pollfds[0].revents & POLLIN) == 0)
252  continue;
253 
254  struct sockaddr_un client_addr;
255  socklen_t client_addr_len = sizeof(client_addr);
256  int client_fd = accept(listen_fd, (struct sockaddr *) &client_addr, &client_addr_len);
257  if (client_fd < 0) {
258  if (errno == EAGAIN || errno == EWOULDBLOCK)
259  continue;
260  perror("accept() failed");
261  exit(1);
262  }
263  // fork
264  pid_t pid = fork();
265  if (pid < 0) {
266  perror("fork() failed");
267  exit(1);
268  }
269 
270  if (pid == 0) {
271  /*
272  * This is the child process.
273  *
274  * If the child process needs to drop any privileges the server may have had,
275  * this is an excellent time to do so.
276  * Whether it's a chroot, seccomp-bpf or a MAC framework like SELinux or AppArmor.
277  *
278  * Note that neither is an excuse to run the parent process unrestricted; creating
279  * a new restricted user for the server seems wise.
280  */
281  close(listen_fd);
282  LOG("Client starting");
283  auto start_c = clock();
284  struct timespec start_t;
285  clock_gettime(CLOCK_MONOTONIC, &start_t);
286  try {
287  BinaryStream stream(client_fd, client_fd);
288  RInsideServer ris(stream, R, *Rcallbacks);
289  ris.run();
290  }
291  catch (const std::exception &e) {
292  LOG("Exception: %s", e.what());
293  }
294  auto end_c = clock();
295  struct timespec end_t;
296  clock_gettime(CLOCK_MONOTONIC, &end_t);
297 
298  double c = (double) (end_c - start_c) / CLOCKS_PER_SEC;
299  double t = (double) (end_t.tv_sec - start_t.tv_sec) + (double) (end_t.tv_nsec - start_t.tv_nsec) / 1000000000;
300 
301  LOG("Client finished, %.3fs real, %.3fs CPU", t, c);
302 
303  exit(0);
304  }
305  else {
306  // This is the parent process
307  close(client_fd);
308 
309  struct timespec timeout_t;
310  clock_gettime(CLOCK_MONOTONIC, &timeout_t);
311  timeout_t.tv_sec += TIMEOUT_SECONDS;
312  running_clients[pid] = timeout_t;
313  }
314  }
315 }
const int TIMEOUT_SECONDS
std::string getConsoleOutput()
int main()
#define ris_socket_address
Definition: constants.h:8
static void registerDefaultTypes()
void signal_handler(int signum)
int cmpTimespec(const struct timespec &t1, const struct timespec &t2)
#define LOG(...)