From 1879dc4b595e1209d2c7ea159fb6e37287edd520 Mon Sep 17 00:00:00 2001 From: ridiculousfish Date: Wed, 30 Jan 2013 02:22:38 -0800 Subject: [PATCH] Initial set of changes working to make fish robust against running out of file descriptors --- builtin_set.cpp | 3 +- env_universal.cpp | 3 +- exec.cpp | 168 +++++++++++++++++++++++++++++++--------------- io.cpp | 19 +++++- io.h | 7 +- parser.cpp | 5 +- parser.h | 4 +- proc.cpp | 33 ++++++--- proc.h | 8 +-- reader.cpp | 14 ++-- 10 files changed, 181 insertions(+), 83 deletions(-) diff --git a/builtin_set.cpp b/builtin_set.cpp index c916d6864..454f35e71 100644 --- a/builtin_set.cpp +++ b/builtin_set.cpp @@ -65,7 +65,7 @@ static int my_env_set(const wchar_t *key, const wcstring_list_t &val, int scope) if (is_path_variable(key)) { /* Fix for https://github.com/fish-shell/fish-shell/issues/199 . Return success if any path setting succeeds. */ - bool any_success = false, any_error = false; + bool any_success = false; for (i=0; i< val.size() ; i++) { @@ -93,7 +93,6 @@ static int my_env_set(const wchar_t *key, const wcstring_list_t &val, int scope) } else { - any_error = true; const wchar_t *colon; append_format(stderr_buffer, _(BUILTIN_SET_PATH_ERROR), L"set", dir, key); colon = wcschr(dir, L':'); diff --git a/env_universal.cpp b/env_universal.cpp index 2ef1c7c56..f39a608fa 100644 --- a/env_universal.cpp +++ b/env_universal.cpp @@ -80,7 +80,7 @@ static int is_dead() static int try_get_socket_once(void) { - int s, len; + int s; wchar_t *wdir; wchar_t *wuname; @@ -128,7 +128,6 @@ static int try_get_socket_once(void) struct sockaddr_un local = {}; local.sun_family = AF_UNIX; strncpy(local.sun_path, name.c_str(), (sizeof local.sun_path) - 1); - len = sizeof(local); if (connect(s, (struct sockaddr *)&local, sizeof local) == -1) { diff --git a/exec.cpp b/exec.cpp index 2704db683..e9841ed07 100644 --- a/exec.cpp +++ b/exec.cpp @@ -124,13 +124,14 @@ void exec_close(int fd) int exec_pipe(int fd[2]) { + ASSERT_IS_MAIN_THREAD(); + int res; - while ((res=pipe(fd))) { if (errno != EINTR) { - wperror(L"pipe"); + // caller will call wperror return res; } } @@ -148,6 +149,17 @@ int exec_pipe(int fd[2]) return res; } +void print_open_fds(void) +{ + for (size_t i=0; i < open_fds.size(); ++i) + { + if (open_fds.at(i)) + { + fprintf(stderr, "fd %lu\n", i); + } + } +} + /** Check if the specified fd is used as a part of a pipeline in the specidied set of IO redirections. @@ -526,9 +538,7 @@ static bool can_use_posix_spawn_for_job(const job_t *job, const process_t *proce void exec(parser_t &parser, job_t *j) { - process_t *p; pid_t pid = 0; - int mypipe[2]; sigset_t chldset; shared_ptr io_buffer; @@ -559,10 +569,10 @@ void exec(parser_t &parser, job_t *j) j->io.insert(j->io.begin(), parser.block_io.begin(), parser.block_io.end()); } - const io_buffer_t *input_redirect = 0; + const io_buffer_t *input_redirect = NULL; for (size_t idx = 0; idx < j->io.size(); idx++) { - shared_ptr &io = j->io.at(idx); + const shared_ptr &io = j->io.at(idx); if ((io->io_mode == IO_BUFFER)) { @@ -611,12 +621,13 @@ void exec(parser_t &parser, job_t *j) } } - + + // This is a pipe that the "current" process in our loop below reads from + // Only pipe_read->pipe_fd[0] is used shared_ptr pipe_read(new io_pipe_t(0, true)); - pipe_read->pipe_fd[0] = pipe_read->pipe_fd[1] = -1; - + + // This is the pipe that the "current" process in our loop below writes to shared_ptr pipe_write(new io_pipe_t(1, false)); - pipe_write->pipe_fd[0] = pipe_write->pipe_fd[1] = -1; j->io.push_back(pipe_write); @@ -634,7 +645,7 @@ void exec(parser_t &parser, job_t *j) if (job_get_flag(j, JOB_CONTROL)) { - for (p=j->first_process; p; p = p->next) + for (const process_t *p = j->first_process; p; p = p->next) { if (p->type != EXTERNAL) { @@ -676,7 +687,10 @@ void exec(parser_t &parser, job_t *j) set_child_group(j, &keepalive, 0); } } - + + /* Make a set of file descriptors that we will need to close */ + std::set fds_to_close; + /* This loop loops over every process_t in the job, starting it as appropriate. This turns out to be rather complex, since a @@ -685,10 +699,10 @@ void exec(parser_t &parser, job_t *j) The loop also has to handle pipelining between the jobs. */ - for (p=j->first_process; p; p = p->next) + for (process_t *p=j->first_process; p; p = p->next) { const bool p_wants_pipe = (p->next != NULL); - mypipe[1]=-1; + int mypipe[2] = {-1, -1}; pipe_write->fd = p->pipe_write_fd; pipe_read->fd = p->pipe_read_fd; @@ -714,6 +728,7 @@ void exec(parser_t &parser, job_t *j) if (p == j->first_process->next) { + /* We are the first process that could possibly read from a pipe (aka the second process), so add the pipe read redireciton */ j->io.push_back(pipe_read); } @@ -726,9 +741,14 @@ void exec(parser_t &parser, job_t *j) debug(1, PIPE_ERROR); wperror(L"pipe"); exec_error = true; + job_mark_process_as_failed(j, p); break; } - + + fds_to_close.insert(mypipe[0]); + fds_to_close.insert(mypipe[1]); + + // This tells the redirection about the fds, but the redirection does not close them memcpy(pipe_write->pipe_fd, mypipe, sizeof(int)*2); } else @@ -746,7 +766,6 @@ void exec(parser_t &parser, job_t *j) { case INTERNAL_FUNCTION: { - wchar_t * def=0; int shadows; @@ -757,20 +776,15 @@ void exec(parser_t &parser, job_t *j) */ signal_unblock(); - wcstring orig_def; - function_get_definition(p->argv0(), &orig_def); - - // function_get_named_arguments may trigger autoload, which deallocates the orig_def. - // We should make function_get_definition return a wcstring (but how to handle NULL...) - if (! orig_def.empty()) - def = wcsdup(orig_def.c_str()); + wcstring def; + bool function_exists = function_get_definition(p->argv0(), &def); wcstring_list_t named_arguments = function_get_named_arguments(p->argv0()); shadows = function_get_shadows(p->argv0()); signal_block(); - if (def == NULL) + if (! function_exists) { debug(0, _(L"Unknown function '%ls'"), p->argv0()); break; @@ -778,7 +792,6 @@ void exec(parser_t &parser, job_t *j) function_block_t *newv = new function_block_t(p, p->argv0(), shadows); parser.push_block(newv); - /* set_argv might trigger an event handler, hence we need to unblock @@ -792,15 +805,26 @@ void exec(parser_t &parser, job_t *j) if (p->next) { + // Be careful to handle failure, e.g. too many open fds io_buffer.reset(io_buffer_t::create(0)); - j->io.push_back(io_buffer); + if (io_buffer.get() == NULL) + { + exec_error = true; + job_mark_process_as_failed(j, p); + } + else + { + j->io.push_back(io_buffer); + } + } + + if (! exec_error) + { + internal_exec_helper(parser, def.c_str(), TOP, j->io); } - - internal_exec_helper(parser, def, TOP, j->io); parser.allow_function(); parser.pop_block(); - free(def); break; } @@ -810,10 +834,21 @@ void exec(parser_t &parser, job_t *j) if (p->next) { io_buffer.reset(io_buffer_t::create(0)); - j->io.push_back(io_buffer); + if (io_buffer.get() == NULL) + { + exec_error = true; + job_mark_process_as_failed(j, p); + } + else + { + j->io.push_back(io_buffer); + } + } + + if (! exec_error) + { + internal_exec_helper(parser, p->argv0(), TOP, j->io); } - - internal_exec_helper(parser, p->argv0(), TOP, j->io); break; } @@ -821,7 +856,6 @@ void exec(parser_t &parser, job_t *j) case INTERNAL_BUILTIN: { int builtin_stdin=0; - int fg; int close_stdin=0; /* @@ -933,7 +967,7 @@ void exec(parser_t &parser, job_t *j) builtin_out_redirect = has_fd(j->io, 1); builtin_err_redirect = has_fd(j->io, 2); - fg = job_get_flag(j, JOB_FOREGROUND); + const int fg = job_get_flag(j, JOB_FOREGROUND); job_set_flag(j, JOB_FOREGROUND, 0); signal_unblock(); @@ -1300,6 +1334,11 @@ void exec(parser_t &parser, job_t *j) safe_launch_process _never_ returns... */ } + else if (pid < 0) + { + job_mark_process_as_failed(j, p); + exec_error = true; + } } @@ -1325,24 +1364,29 @@ void exec(parser_t &parser, job_t *j) previous process_t */ if (pipe_read->pipe_fd[0] >= 0) + { exec_close(pipe_read->pipe_fd[0]); + fds_to_close.erase(pipe_read->pipe_fd[0]); + } /* Set up the pipe the next process uses to read from the current process_t */ if (p_wants_pipe) + { + /* The next process will read from this pipe */ + assert(p->next != NULL); pipe_read->pipe_fd[0] = mypipe[0]; - /* - If there is a next process in the pipeline, close the - output end of the current pipe (the surrent child - subprocess already has a copy of the pipe - this makes sure - we don't leak file descriptors either in the shell or in - the children). - */ - if (p->next) - { + /* + If there is a next process in the pipeline, close the + output end of the current pipe (the current child + subprocess already has a copy of the pipe - this makes sure + we don't leak file descriptors either in the shell or in + the children). + */ exec_close(mypipe[1]); + fds_to_close.erase(mypipe[1]); } } @@ -1354,6 +1398,11 @@ void exec(parser_t &parser, job_t *j) { kill(keepalive.pid, SIGKILL); } + + for (std::set::iterator foo = fds_to_close.begin(); foo != fds_to_close.end(); ++foo) + { + fprintf(stderr, "-Malingering %d\n", *foo); + } signal_unblock(); @@ -1368,9 +1417,14 @@ void exec(parser_t &parser, job_t *j) proc_last_bg_pid = j->pgid; } - if (!exec_error) + if (! exec_error) { - job_continue(j, 0); + job_continue(j, false); + } + else + { + /* Mark the errored job as not in the foreground. I can't fully justify whether this is the right place, but it prevents sanity_lose from complaining. */ + job_set_flag(j, JOB_FOREGROUND, 0); } } @@ -1401,27 +1455,35 @@ static int exec_subshell_internal(const wcstring &cmd, wcstring_list_t *lst) is_subshell=1; - const shared_ptr io_buffer(io_buffer_t::create(0)); - prev_status = proc_get_last_status(); - parser_t &parser = parser_t::principal_parser(); - if (parser.eval(cmd, io_chain_t(io_buffer), SUBST)) + const shared_ptr io_buffer(io_buffer_t::create(0)); + + // IO buffer creation may fail (e.g. if we have too many open files to make a pipe), so this may be null + if (io_buffer.get() == NULL) { status = -1; } else { - status = proc_get_last_status(); + parser_t &parser = parser_t::principal_parser(); + if (parser.eval(cmd, io_chain_t(io_buffer), SUBST)) + { + status = -1; + } + else + { + status = proc_get_last_status(); + } + + io_buffer->read(); } - io_buffer->read(); - proc_set_last_status(prev_status); is_subshell = prev_subshell; - if (lst != NULL) + if (lst != NULL && io_buffer.get() != NULL) { const char *begin = io_buffer->out_buffer_ptr(); const char *end = begin + io_buffer->out_buffer_size(); diff --git a/io.cpp b/io.cpp index f6947cb21..f1d664f8f 100644 --- a/io.cpp +++ b/io.cpp @@ -156,6 +156,10 @@ io_buffer_t *io_buffer_t::create(bool is_input) delete buffer_redirect; buffer_redirect = NULL; } + else + { + //fprintf(stderr, "Created pipes {%d, %d} for %p\n", buffer_redirect->pipe_fd[0], buffer_redirect->pipe_fd[1], buffer_redirect); + } return buffer_redirect; } @@ -163,16 +167,20 @@ io_buffer_t *io_buffer_t::create(bool is_input) io_buffer_t::~io_buffer_t() { + //fprintf(stderr, "Deallocating pipes {%d, %d} for %p\n", this->pipe_fd[0], this->pipe_fd[1], this); /** If this is an input buffer, then io_read_buffer will not have been called, and we need to close the output fd as well. */ - if (is_input) + if (is_input && pipe_fd[1] >= 0) { exec_close(pipe_fd[1]); } - exec_close(pipe_fd[0]); + if (pipe_fd[0] >= 0) + { + exec_close(pipe_fd[0]); + } /* Dont free fd for writing. This should already be free'd before @@ -193,6 +201,13 @@ void io_chain_t::remove(const shared_ptr &element) } } +void io_chain_t::push_back(const shared_ptr &element) +{ + // Ensure we never push back NULL + assert(element.get() != NULL); + std::vector >:: push_back(element); +} + void io_remove(io_chain_t &list, const shared_ptr &element) { list.remove(element); diff --git a/io.h b/io.h index 2a1999c9c..b007bbf65 100644 --- a/io.h +++ b/io.h @@ -95,22 +95,22 @@ class io_pipe_t : public io_data_t protected: io_pipe_t(io_mode_t m, int f, bool i): io_data_t(m, f), - pipe_fd(), is_input(i) { + pipe_fd[0] = pipe_fd[1] = -1; } public: int pipe_fd[2]; - bool is_input; + const bool is_input; virtual void print() const; io_pipe_t(int f, bool i): io_data_t(IO_PIPE, f), - pipe_fd(), is_input(i) { + pipe_fd[0] = pipe_fd[1] = -1; } }; @@ -178,6 +178,7 @@ public: io_chain_t(const shared_ptr &); void remove(const shared_ptr &element); + void push_back(const shared_ptr &element); shared_ptr get_io_for_fd(int fd) const; shared_ptr get_io_for_fd(int fd); diff --git a/parser.cpp b/parser.cpp index ca524b8db..c0ff3a7dd 100644 --- a/parser.cpp +++ b/parser.cpp @@ -1202,11 +1202,14 @@ bool parser_t::job_remove(job_t *j) void parser_t::job_promote(job_t *job) { + signal_block(); + job_list_t::iterator loc = std::find(my_job_list.begin(), my_job_list.end(), job); assert(loc != my_job_list.end()); /* Move the job to the beginning */ my_job_list.splice(my_job_list.begin(), my_job_list, loc); + signal_unblock(); } job_t *parser_t::job_get(job_id_t id) @@ -3755,7 +3758,7 @@ event_block_t::event_block_t(const event_t &evt) : { } -function_block_t::function_block_t(process_t *p, const wcstring &n, bool shadows) : +function_block_t::function_block_t(const process_t *p, const wcstring &n, bool shadows) : block_t(shadows ? FUNCTION_CALL : FUNCTION_CALL_NO_SHADOW), process(p), name(n) diff --git a/parser.h b/parser.h index 7677754b3..c2ad6c9b7 100644 --- a/parser.h +++ b/parser.h @@ -164,9 +164,9 @@ struct event_block_t : public block_t struct function_block_t : public block_t { - process_t *process; + const process_t *process; wcstring name; - function_block_t(process_t *p, const wcstring &n, bool shadows); + function_block_t(const process_t *p, const wcstring &n, bool shadows); }; struct source_block_t : public block_t diff --git a/proc.cpp b/proc.cpp index 1ed63b5eb..fe9c3d41f 100644 --- a/proc.cpp +++ b/proc.cpp @@ -117,6 +117,15 @@ job_iterator_t::job_iterator_t() : job_list(&parser_t::principal_parser().job_li this->reset(); } +void print_jobs(void) +{ + job_iterator_t jobs; + job_t *j; + while ((j = jobs.next())) + { + printf("%p -> %ls -> (foreground %d, complete %d, stopped %d, constructed %d)\n", j, j->command_wcstr(), job_get_flag(j, JOB_FOREGROUND), job_is_completed(j), job_is_stopped(j), job_get_flag(j, JOB_CONSTRUCTED)); + } +} int is_interactive_session=0; int is_subshell=0; @@ -305,17 +314,21 @@ int job_is_completed(const job_t *j) } -void job_set_flag(job_t *j, int flag, int set) +void job_set_flag(job_t *j, unsigned int flag, int set) { if (set) + { j->flags |= flag; + } else - j->flags = j->flags & ((unsigned int)(-1) ^ flag); + { + j->flags &= ~flag; + } } -int job_get_flag(const job_t *j, int flag) +int job_get_flag(const job_t *j, unsigned int flag) { - return j->flags&flag?1:0; + return !! (j->flags & flag); } int job_signal(job_t *j, int signal) @@ -396,7 +409,10 @@ static void mark_process_status(const job_t *j, void job_mark_process_as_failed(const job_t *job, process_t *p) { /* The given process failed to even lift off (e.g. posix_spawn failed) and so doesn't have a valid pid. Mark it as dead. */ - p->completed = 1; + for (process_t *cursor = p; p != NULL; p = p->next) + { + cursor->completed = 1; + } } /** @@ -630,7 +646,6 @@ int job_reap(bool interactive) { job_t *j = jnext; jnext = jobs.next(); - process_t *p; /* If we are reaping only jobs who do not need status messages @@ -642,7 +657,7 @@ int job_reap(bool interactive) continue; } - for (p=j->first_process; p; p=p->next) + for (process_t *p = j->first_process; p; p=p->next) { int s; if (!p->completed) @@ -902,7 +917,7 @@ static int select_try(job_t *j) */ static void read_try(job_t *j) { - io_buffer_t *buff=NULL; + io_buffer_t *buff = NULL; /* Find the last buffer, which is the one we want to read from @@ -1030,7 +1045,7 @@ static int terminal_return_from_job(job_t *j) return 1; } -void job_continue(job_t *j, int cont) +void job_continue(job_t *j, bool cont) { /* Put job first in the job list diff --git a/proc.h b/proc.h index a97ccbd92..2d09b7518 100644 --- a/proc.h +++ b/proc.h @@ -141,9 +141,9 @@ private: public: process_t(); - ~process_t(); + /** Type of process. Can be one of \c EXTERNAL, \c INTERNAL_BUILTIN, \c INTERNAL_FUNCTION, \c INTERNAL_BLOCK, @@ -453,12 +453,12 @@ extern int no_exec; /** Add the specified flag to the bitset of flags for the specified job */ -void job_set_flag(job_t *j, int flag, int set); +void job_set_flag(job_t *j, unsigned int flag, int set); /** Returns one if the specified flag is set in the specified job, 0 otherwise. */ -int job_get_flag(const job_t *j, int flag); +int job_get_flag(const job_t *j, unsigned int flag); /** Sets the status of the last process to exit @@ -515,7 +515,7 @@ int job_is_completed(const job_t *j); \param j The job \param cont Whether the function should wait for the job to complete before returning */ -void job_continue(job_t *j, int cont); +void job_continue(job_t *j, bool cont); /** Notify the user about stopped or terminated jobs. Delete terminated diff --git a/reader.cpp b/reader.cpp index ae8c9b224..d0312c2d3 100644 --- a/reader.cpp +++ b/reader.cpp @@ -1048,6 +1048,13 @@ static void run_pager(const wcstring &prefix, int is_quoted, const std::vector in(io_buffer_t::create(true)); + shared_ptr out(io_buffer_t::create(false)); + + // The above may fail e.g. if we have too many open fds + if (in.get() == NULL || out.get() == NULL) + return; wchar_t *escaped_separator; int has_case_sensitive=0; @@ -1066,7 +1073,6 @@ static void run_pager(const wcstring &prefix, int is_quoted, const std::vector in(io_buffer_t::create(true)); in->fd = 3; escaped_separator = escape(COMPLETE_SEP_STR, 1); @@ -1133,11 +1139,9 @@ static void run_pager(const wcstring &prefix, int is_quoted, const std::vectorout_buffer_append(foo, strlen(foo)); free(foo); - term_donate(); - - shared_ptr out(io_buffer_t::create(false)); out->fd = 4; - + + term_donate(); parser_t &parser = parser_t::principal_parser(); io_chain_t io_chain; io_chain.push_back(out);