24 #include <sys/types.h> 25 #include <sys/socket.h> 42 #define LOG(...) {fprintf(stderr, "%d: ", getpid());fprintf(stderr, __VA_ARGS__);fprintf(stderr, "\n");} 54 #include <RcppCommon.h> 61 #if !defined(RINSIDE_CALLBACKS) 62 #error "RInside was not compiled with RINSIDE_CALLBACKS" 64 #if !defined(RCPP_USING_CXX11) 65 #error "Rcpp didn't detect C++11 support (RCPP_USING_CXX11 is not defined)" 78 int cmpTimespec(
const struct timespec &t1,
const struct timespec &t2) {
79 if (t1.tv_sec < t2.tv_sec)
81 if (t1.tv_sec > t2.tv_sec)
83 if (t1.tv_nsec < t2.tv_nsec)
85 if (t1.tv_nsec > t2.tv_nsec)
92 LOG(
"Caught signal %d, exiting", signum);
101 RInsideServer::registerType<Foo>();
102 RInsideServer::registerType<Bar>();
106 int signals[] = {SIGHUP, SIGINT, 0};
107 for (
int i=0;signals[i];i++) {
109 perror(
"Cannot install signal handler");
113 printf(
"Signal handler for %d installed\n", signals[i]);
115 signal(SIGPIPE, SIG_IGN);
125 printf(
"...loading R\n");
127 R.set_callbacks( Rcallbacks );
129 printf(
"...loading packages\n");
140 catch (
const std::exception &e) {
141 printf(
"error loading packages: %s\nR's output:\n%s", e.what(), Rcallbacks->
getConsoleOutput().c_str());
146 printf(
"R is ready\n");
152 int listen_fd = socket(AF_UNIX, SOCK_STREAM, 0);
154 perror(
"socket() failed");
159 struct sockaddr_un server_addr;
160 memset((
void *) &server_addr, 0,
sizeof(server_addr));
161 server_addr.sun_family = AF_UNIX;
163 if (bind(listen_fd, (sockaddr *) &server_addr,
sizeof(server_addr)) < 0) {
164 perror(
"bind() failed");
176 std::map<pid_t, timespec> running_clients;
178 printf(
"Socket started, listening..\n");
180 listen(listen_fd, 5);
189 while ((exited_pid = waitpid(-1, &status, WNOHANG)) > 0) {
190 LOG(
"Client %d no longer exists", (
int) exited_pid);
191 running_clients.erase(exited_pid);
196 struct timespec current_t;
197 clock_gettime(CLOCK_MONOTONIC, &current_t);
199 for (
auto it = running_clients.begin(); it != running_clients.end(); ) {
200 auto timeout_t = it->second;
202 auto timeouted_pid = it->first;
203 LOG(
"Client %d gets killed due to timeout", (
int) timeouted_pid);
211 if (kill(timeouted_pid, SIGHUP) < 0) {
212 perror(
"kill() failed");
217 running_clients.erase(it++);
231 if (running_clients.size() > 10) {
232 std::this_thread::sleep_for(std::chrono::milliseconds(5000));
236 struct pollfd pollfds[1];
237 pollfds[0].fd = listen_fd;
238 pollfds[0].events = POLLIN;
240 int poll_res = poll(pollfds, 1, 5000);
242 perror(
"poll() failed");
251 if ((pollfds[0].revents & POLLIN) == 0)
254 struct sockaddr_un client_addr;
255 socklen_t client_addr_len =
sizeof(client_addr);
256 int client_fd = accept(listen_fd, (
struct sockaddr *) &client_addr, &client_addr_len);
258 if (errno == EAGAIN || errno == EWOULDBLOCK)
260 perror(
"accept() failed");
266 perror(
"fork() failed");
282 LOG(
"Client starting");
283 auto start_c = clock();
284 struct timespec start_t;
285 clock_gettime(CLOCK_MONOTONIC, &start_t);
291 catch (
const std::exception &e) {
292 LOG(
"Exception: %s", e.what());
294 auto end_c = clock();
295 struct timespec end_t;
296 clock_gettime(CLOCK_MONOTONIC, &end_t);
298 double c = (double) (end_c - start_c) / CLOCKS_PER_SEC;
299 double t = (double) (end_t.tv_sec - start_t.tv_sec) + (double) (end_t.tv_nsec - start_t.tv_nsec) / 1000000000;
301 LOG(
"Client finished, %.3fs real, %.3fs CPU", t, c);
309 struct timespec timeout_t;
310 clock_gettime(CLOCK_MONOTONIC, &timeout_t);
312 running_clients[pid] = timeout_t;
const int TIMEOUT_SECONDS
std::string getConsoleOutput()
#define ris_socket_address
static void registerDefaultTypes()
void signal_handler(int signum)
void resetConsoleOutput()
int cmpTimespec(const struct timespec &t1, const struct timespec &t2)