diff --git a/CMakeLists.txt b/CMakeLists.txt
index 9f3597ba3..3913ab668 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -128,7 +128,8 @@ set(FISH_SRCS
     src/proc.cpp src/reader.cpp src/redirection.cpp src/sanity.cpp src/screen.cpp
     src/signal.cpp src/termsize.cpp src/timer.cpp src/tinyexpr.cpp
     src/tokenizer.cpp src/topic_monitor.cpp src/trace.cpp src/utf8.cpp src/util.cpp
-    src/wcstringutil.cpp src/wgetopt.cpp src/wildcard.cpp src/wutil.cpp src/fds.cpp
+    src/wait_handle.cpp src/wcstringutil.cpp src/wgetopt.cpp src/wildcard.cpp 
+    src/wutil.cpp src/fds.cpp
 )
 
 # Header files are just globbed.
diff --git a/src/builtin_wait.cpp b/src/builtin_wait.cpp
index 1cbf72b59..b7ac4fb2b 100644
--- a/src/builtin_wait.cpp
+++ b/src/builtin_wait.cpp
@@ -11,6 +11,7 @@
 #include "parser.h"
 #include "proc.h"
 #include "signal.h"
+#include "wait_handle.h"
 #include "wgetopt.h"
 #include "wutil.h"
 
@@ -20,10 +21,11 @@ static bool can_wait_on_job(const std::shared_ptr<job_t> &j) {
 }
 
 /// \return true if a wait handle matches a pid or a process name. Exactly one should be passed.
+/// For convenience, this returns false if the wait handle is null.
 static bool wait_handle_matches(pid_t pid, const wchar_t *proc_name, const wait_handle_ref_t &wh) {
     assert((pid > 0 || proc_name) && "Must specify either pid or proc_name");
-    return (pid > 0 && contains(wh->pids, pid)) ||
-           (proc_name && contains(wh->proc_base_names, proc_name));
+    if (!wh) return false;
+    return (pid > 0 && pid == wh->pid) || (proc_name && proc_name == wh->base_name);
 }
 
 /// Walk the list of jobs, looking for a process with \p pid (if nonzero) or \p proc_name (if not
@@ -34,8 +36,9 @@ static bool find_wait_handles(pid_t pid, const wchar_t *proc_name, const parser_
     assert((pid > 0 || proc_name) && "Must specify either pid or proc_name");
 
     // Has a job already completed?
+    // TODO: we can avoid traversing this list if searching by pid.
     bool matched = false;
-    for (const auto &wh : parser.get_recorded_wait_handles()) {
+    for (const auto &wh : parser.get_wait_handles().get_list()) {
         if (wait_handle_matches(pid, proc_name, wh)) {
             handles->push_back(wh);
             matched = true;
@@ -44,18 +47,13 @@ static bool find_wait_handles(pid_t pid, const wchar_t *proc_name, const parser_
 
     // Is there a running job match?
     for (const auto &j : parser.jobs()) {
-        if (can_wait_on_job(j) && wait_handle_matches(pid, proc_name, j->get_wait_handle())) {
-            handles->push_back(j->get_wait_handle());
-            matched = true;
-        }
-    }
-
-    if (!matched) {
-        // Maybe we could have matched, but a job was stopped or otherwise unwaitable.
-        for (const auto &j : parser.jobs()) {
-            if (wait_handle_matches(pid, proc_name, j->get_wait_handle())) {
+        // We want to set 'matched' to true if we could have matched, even if the job was stopped.
+        bool provide_handle = can_wait_on_job(j);
+        for (const auto &proc : j->processes) {
+            auto wh = proc->get_wait_handle();
+            if (wait_handle_matches(pid, proc_name, wh)) {
                 matched = true;
-                break;
+                if (provide_handle) handles->push_back(std::move(wh));
             }
         }
     }
@@ -65,11 +63,17 @@ static bool find_wait_handles(pid_t pid, const wchar_t *proc_name, const parser_
 /// \return all wait handles for all jobs, current and already completed (!).
 static std::vector<wait_handle_ref_t> get_all_wait_handles(const parser_t &parser) {
     std::vector<wait_handle_ref_t> result;
-    const auto &whs = parser.get_recorded_wait_handles();
+    // Get wait handles for reaped jobs.
+    const auto &whs = parser.get_wait_handles().get_list();
     result.insert(result.end(), whs.begin(), whs.end());
+
+    // Get wait handles for running jobs.
     for (const auto &j : parser.jobs()) {
-        if (can_wait_on_job(j)) {
-            result.push_back(j->get_wait_handle());
+        if (!can_wait_on_job(j)) continue;
+        for (const auto &proc : j->processes) {
+            if (auto wh = proc->get_wait_handle()) {
+                result.push_back(std::move(wh));
+            }
         }
     }
     return result;
@@ -91,7 +95,7 @@ static int wait_for_completion(parser_t &parser, const std::vector<wait_handle_r
             // Remove completed wait handles (at most 1 if any_flag is set).
             for (const auto &wh : whs) {
                 if (is_completed(wh)) {
-                    parser.wait_handle_remove(wh);
+                    parser.get_wait_handles().remove(wh);
                     if (any_flag) break;
                 }
             }
diff --git a/src/exec.cpp b/src/exec.cpp
index 60357a71d..53e688199 100644
--- a/src/exec.cpp
+++ b/src/exec.cpp
@@ -930,6 +930,9 @@ static launch_result_t exec_process_in_job(parser_t &parser, process_t *p,
                 launch_result_t::failed) {
                 return launch_result_t::failed;
             }
+            // It's possible (though unlikely) that this is a background process which recycled a
+            // pid from another, previous background process. Forget any such old process.
+            parser.get_wait_handles().remove_by_pid(p->pid);
             break;
         }
 
diff --git a/src/fish_tests.cpp b/src/fish_tests.cpp
index c168bc46b..2b46986f2 100644
--- a/src/fish_tests.cpp
+++ b/src/fish_tests.cpp
@@ -3375,6 +3375,42 @@ static void test_1_completion(wcstring line, const wcstring &completion, complet
     do_test(cursor_pos == out_cursor_pos);
 }
 
+static void test_wait_handles() {
+    say(L"Testing wait handles");
+    constexpr size_t limit = 4;
+    wait_handle_store_t whs(limit);
+    do_test(whs.size() == 0);
+
+    // Null handles ignored.
+    whs.add(wait_handle_ref_t{});
+    do_test(whs.size() == 0);
+    do_test(whs.get_by_pid(5) == nullptr);
+
+    // Duplicate pids drop oldest.
+    whs.add(std::make_shared<wait_handle_t>(5, L"first"));
+    whs.add(std::make_shared<wait_handle_t>(5, L"second"));
+    do_test(whs.size() == 1);
+    do_test(whs.get_by_pid(5)->base_name == L"second");
+
+    whs.remove_by_pid(123);
+    do_test(whs.size() == 1);
+    whs.remove_by_pid(5);
+    do_test(whs.size() == 0);
+
+    // Test evicting oldest.
+    whs.add(std::make_shared<wait_handle_t>(1, L"1"));
+    whs.add(std::make_shared<wait_handle_t>(2, L"2"));
+    whs.add(std::make_shared<wait_handle_t>(3, L"3"));
+    whs.add(std::make_shared<wait_handle_t>(4, L"4"));
+    whs.add(std::make_shared<wait_handle_t>(5, L"5"));
+    do_test(whs.size() == 4);
+    auto start = whs.get_list().begin();
+    do_test(std::next(start, 0)->get()->base_name == L"5");
+    do_test(std::next(start, 1)->get()->base_name == L"4");
+    do_test(std::next(start, 2)->get()->base_name == L"3");
+    do_test(std::next(start, 3)->get()->base_name == L"2");
+}
+
 static void test_completion_insertions() {
 #define TEST_1_COMPLETION(a, b, c, d, e) test_1_completion(a, b, c, d, e, __LINE__)
     say(L"Testing completion insertions");
@@ -6612,6 +6648,7 @@ int main(int argc, char **argv) {
     if (should_test_function("universal")) test_universal_formats();
     if (should_test_function("universal")) test_universal_ok_to_save();
     if (should_test_function("notifiers")) test_universal_notifiers();
+    if (should_test_function("wait_handles")) test_wait_handles();
     if (should_test_function("completion_insertions")) test_completion_insertions();
     if (should_test_function("autosuggestion_ignores")) test_autosuggestion_ignores();
     if (should_test_function("autosuggestion_combining")) test_autosuggestion_combining();
diff --git a/src/parser.cpp b/src/parser.cpp
index 4aaf4ecbc..a4ee68509 100644
--- a/src/parser.cpp
+++ b/src/parser.cpp
@@ -553,32 +553,6 @@ void parser_t::job_add(shared_ptr<job_t> job) {
     job_list.push_front(std::move(job));
 }
 
-void parser_t::save_wait_handle_for_completed_job(job_t *job) {
-    assert(job && job->is_completed() && "Job null or not completed");
-    // Are we a background job with an external process?
-    if (!job->is_foreground() && job->has_external_proc()) {
-        rec_wait_handles.push_front(job->get_wait_handle(true /* create */));
-
-        // Limit how many background jobs we will remember.
-        // This is CHILD_MAX (controlled by _SC_CHILD_MAX) but we just hard code it.
-        // 1024 is zsh's fallback.
-        while (rec_wait_handles.size() > 1024) rec_wait_handles.pop_back();
-    }
-
-    // Mark the job as complete in its wait handle (but don't create it just for this).
-    if (auto wh = job->get_wait_handle(false /* create */)) {
-        wh->completed = true;
-    }
-}
-
-void parser_t::wait_handle_remove(const wait_handle_ref_t &handle) {
-    // Note the handle may not be found, if we exceeded our wait handle limit.
-    auto iter = std::find(rec_wait_handles.begin(), rec_wait_handles.end(), handle);
-    if (iter != rec_wait_handles.end()) {
-        rec_wait_handles.erase(iter);
-    }
-}
-
 void parser_t::job_promote(job_t *job) {
     job_list_t::iterator loc;
     for (loc = job_list.begin(); loc != job_list.end(); ++loc) {
diff --git a/src/parser.h b/src/parser.h
index 6a910c210..f7822e751 100644
--- a/src/parser.h
+++ b/src/parser.h
@@ -20,6 +20,7 @@
 #include "parse_tree.h"
 #include "proc.h"
 #include "util.h"
+#include "wait_handle.h"
 
 class io_chain_t;
 
@@ -252,9 +253,9 @@ class parser_t : public std::enable_shared_from_this<parser_t> {
     /// The jobs associated with this parser.
     job_list_t job_list;
 
-    /// The list of recorded wait-handles. These are jobs that finished in the background, and have
+    /// Our store of recorded wait-handles. These are jobs that finished in the background, and have
     /// been reaped, but may still be wait'ed on.
-    std::deque<wait_handle_ref_t> rec_wait_handles;
+    wait_handle_store_t wait_handles;
 
     /// The list of blocks. This is a deque because we give out raw pointers to callers, who hold
     /// them across manipulating this stack.
@@ -367,10 +368,9 @@ class parser_t : public std::enable_shared_from_this<parser_t> {
     library_data_t &libdata() { return library_data; }
     const library_data_t &libdata() const { return library_data; }
 
-    /// Access the list of wait handles for jobs that have finished in the background.
-    const std::deque<wait_handle_ref_t> &get_recorded_wait_handles() const {
-        return rec_wait_handles;
-    }
+    /// Get our wait handle store.
+    wait_handle_store_t &get_wait_handles() { return wait_handles; }
+    const wait_handle_store_t &get_wait_handles() const { return wait_handles; }
 
     /// Get and set the last proc statuses.
     int get_last_status() const { return vars().get_last_status(); }
@@ -408,13 +408,6 @@ class parser_t : public std::enable_shared_from_this<parser_t> {
     /// Returns the job with the given pid.
     job_t *job_get_from_pid(pid_t pid) const;
 
-    /// Given that a job has completed, check if it may be wait'ed on; if so add it to our list of
-    /// wait handles.
-    void save_wait_handle_for_completed_job(job_t *job);
-
-    /// Remove a wait handle, if present in the list.
-    void wait_handle_remove(const wait_handle_ref_t &handle);
-
     /// Returns a new profile item if profiling is active. The caller should fill it in.
     /// The parser_t will deallocate it.
     /// If profiling is not active, this returns nullptr.
diff --git a/src/proc.cpp b/src/proc.cpp
index bace61f47..d67f02ca6 100644
--- a/src/proc.cpp
+++ b/src/proc.cpp
@@ -31,6 +31,7 @@
 #endif
 #include <sys/time.h>  // IWYU pragma: keep
 #include <sys/types.h>
+#include <sys/wait.h>
 
 #include <algorithm>  // IWYU pragma: keep
 #include <memory>
@@ -189,21 +190,6 @@ maybe_t<statuses_t> job_t::get_statuses() const {
     return st;
 }
 
-wait_handle_ref_t job_t::get_wait_handle(bool create) {
-    if (!wait_handle && create) {
-        wait_handle = std::make_shared<wait_handle_t>();
-        for (const auto &proc : processes) {
-            // Only external processes may be wait'ed upon.
-            if (proc->type != process_type_t::external) continue;
-            if (proc->pid > 0) {
-                wait_handle->pids.push_back(proc->pid);
-            }
-            wait_handle->proc_base_names.push_back(wbasename(proc->actual_cmd));
-        }
-    }
-    return wait_handle;
-}
-
 void internal_proc_t::mark_exited(proc_status_t status) {
     assert(!exited() && "Process is already exited");
     status_.store(status, std::memory_order_relaxed);
@@ -309,6 +295,16 @@ bool process_t::is_internal() const {
     return true;
 }
 
+wait_handle_ref_t process_t::get_wait_handle(bool create) {
+    if (type != process_type_t::external || pid <= 0) {
+        return nullptr;
+    }
+    if (!wait_handle_ && create) {
+        wait_handle_ = std::make_shared<wait_handle_t>(this->pid, wbasename(this->actual_cmd));
+    }
+    return wait_handle_;
+}
+
 static uint64_t next_internal_job_id() {
     static std::atomic<uint64_t> s_next{};
     return ++s_next;
@@ -610,6 +606,26 @@ static bool job_wants_message(const shared_ptr<job_t> &j) {
     return true;
 }
 
+/// Given that a job has completed, check if it may be wait'ed on; if so add it to the wait handle
+/// store. Then mark all wait handles as complete.
+static void save_wait_handle_for_completed_job(const shared_ptr<job_t> &job,
+                                               wait_handle_store_t &store) {
+    assert(job && job->is_completed() && "Job null or not completed");
+    // Are we a background job?
+    if (!job->is_foreground()) {
+        for (auto &proc : job->processes) {
+            store.add(proc->get_wait_handle(true));
+        }
+    }
+
+    // Mark all wait handles as complete (but don't create just for this).
+    for (auto &proc : job->processes) {
+        if (wait_handle_ref_t wh = proc->get_wait_handle(false /* create */)) {
+            wh->completed = true;
+        }
+    }
+}
+
 /// Remove completed jobs from the job list, printing status messages as appropriate.
 /// \return whether something was printed.
 static bool process_clean_after_marking(parser_t &parser, bool allow_interactive) {
@@ -688,7 +704,7 @@ static bool process_clean_after_marking(parser_t &parser, bool allow_interactive
         const shared_ptr<job_t> &j = *iter;
         if (should_process_job(j) && j->is_completed()) {
             // If this job finished in the background, we have to remember to wait on it.
-            parser.save_wait_handle_for_completed_job(j.get());
+            save_wait_handle_for_completed_job(j, parser.get_wait_handles());
             iter = jobs.erase(iter);
         } else {
             ++iter;
diff --git a/src/proc.h b/src/proc.h
index 368eab73a..2e7aae43b 100644
--- a/src/proc.h
+++ b/src/proc.h
@@ -8,7 +8,6 @@
 #include <signal.h>
 #include <stddef.h>
 #include <sys/time.h>  // IWYU pragma: keep
-#include <sys/wait.h>
 #include <unistd.h>
 
 #include <deque>
@@ -21,6 +20,7 @@
 #include "io.h"
 #include "parse_tree.h"
 #include "topic_monitor.h"
+#include "wait_handle.h"
 
 /// Types of processes.
 enum class process_type_t {
@@ -248,6 +248,10 @@ class process_t {
     /// \return whether this process type is internal (block, function, or builtin).
     bool is_internal() const;
 
+    /// \return the wait handle for the process, creating it if \p create is set.
+    /// This will return nullptr if the process does not have a pid (i.e. is not external).
+    wait_handle_ref_t get_wait_handle(bool create = true);
+
     /// Actual command to pass to exec in case of process_type_t::external or process_type_t::exec.
     wcstring actual_cmd;
 
@@ -285,6 +289,9 @@ class process_t {
    private:
     wcstring_list_t argv_;
     redirection_spec_list_t proc_redirection_specs_;
+
+    // The wait handle. This is constructed lazily, and cached.
+    wait_handle_ref_t wait_handle_{};
 };
 
 using process_ptr_t = std::unique_ptr<process_t>;
@@ -297,21 +304,6 @@ using job_id_t = int;
 /// Every job has a unique positive value for this.
 using internal_job_id_t = uint64_t;
 
-/// The bits of a job necessary to support 'wait'.
-/// This may outlive the job.
-struct wait_handle_t {
-    /// The list of pids of the processes in this job.
-    std::vector<pid_t> pids{};
-
-    /// The list of "base names" of the processes from the job.
-    /// For example if the job is "/bin/sleep" then this will be 'sleep'.
-    wcstring_list_t proc_base_names{};
-
-    /// Set to true when the job is completed.
-    bool completed{false};
-};
-using wait_handle_ref_t = std::shared_ptr<wait_handle_t>;
-
 /// A struct representing a job. A job is a pipeline of one or more processes.
 class job_t {
    public:
@@ -394,10 +386,6 @@ class job_t {
     // This is never null and not changed after construction.
     job_group_ref_t group{};
 
-    // The wait handle. This is constructed lazily, and cached.
-    // Do not access this directly, use the get_wait_handle() function below.
-    wait_handle_ref_t wait_handle{};
-
     /// \return the pgid for the job, based on the job group.
     /// This may be none if the job consists of just internal fish functions or builtins.
     /// This may also be fish itself.
@@ -491,9 +479,6 @@ class job_t {
 
     /// \returns the statuses for this job.
     maybe_t<statuses_t> get_statuses() const;
-
-    /// \return the wait handle for the job, creating it if \p create is set.
-    wait_handle_ref_t get_wait_handle(bool create = true);
 };
 
 /// Whether this shell is attached to a tty.
diff --git a/src/wait_handle.cpp b/src/wait_handle.cpp
new file mode 100644
index 000000000..2e01ad4d6
--- /dev/null
+++ b/src/wait_handle.cpp
@@ -0,0 +1,47 @@
+#include "config.h"
+
+#include "wait_handle.h"
+
+#include "flog.h"
+
+wait_handle_store_t::wait_handle_store_t(size_t limit) : limit_(limit) {}
+
+void wait_handle_store_t::add(wait_handle_ref_t wh) {
+    if (!wh || wh->pid <= 0) return;
+    pid_t pid = wh->pid;
+
+    remove_by_pid(wh->pid);
+    handles_.push_front(std::move(wh));
+    handle_map_[pid] = std::begin(handles_);
+
+    // Remove oldest until we reach our limit.
+    while (handles_.size() > limit_) {
+        handle_map_.erase(handles_.back()->pid);
+        handles_.pop_back();
+    }
+}
+
+void wait_handle_store_t::remove(const wait_handle_ref_t &wh) {
+    // Note: this differs from remove_by_pid because we verify that the handle is the same.
+    if (!wh) return;
+    auto iter = handle_map_.find(wh->pid);
+    if (iter != handle_map_.end() && *iter->second == wh) {
+        // Note this may deallocate the wait handle, leaving it dangling.
+        handles_.erase(iter->second);
+        handle_map_.erase(iter);
+    }
+}
+
+void wait_handle_store_t::remove_by_pid(pid_t pid) {
+    auto iter = handle_map_.find(pid);
+    if (iter != handle_map_.end()) {
+        handles_.erase(iter->second);
+        handle_map_.erase(iter);
+    }
+}
+
+wait_handle_ref_t wait_handle_store_t::get_by_pid(pid_t pid) const {
+    auto iter = handle_map_.find(pid);
+    if (iter == handle_map_.end()) return nullptr;
+    return *iter->second;
+}
diff --git a/src/wait_handle.h b/src/wait_handle.h
new file mode 100644
index 000000000..e1afe613d
--- /dev/null
+++ b/src/wait_handle.h
@@ -0,0 +1,85 @@
+// Support for handling pids that are no longer fish jobs.
+// This includes pids that have been disowned ("forgotten") and background jobs which have finished,
+// but may be wait'ed.
+#ifndef FISH_WAIT_HANDLE_H
+#define FISH_WAIT_HANDLE_H
+
+#include "config.h"  // IWYU pragma: keep
+
+#include <unistd.h>
+
+#include <list>
+#include <unordered_map>
+#include <vector>
+
+#include "common.h"
+
+/// The bits of a job necessary to support 'wait' and '--on-process-exit'.
+/// This may outlive the job.
+struct wait_handle_t {
+    /// Construct from a pid and base name.
+    wait_handle_t(pid_t pid, wcstring name) : pid(pid), base_name(std::move(name)) {}
+
+    /// The pid of this process.
+    pid_t pid{};
+
+    /// The "base name" of this process.
+    /// For example if the process is "/bin/sleep" then this will be 'sleep'.
+    wcstring base_name{};
+
+    /// Set to true when the process is completed.
+    bool completed{false};
+};
+using wait_handle_ref_t = std::shared_ptr<wait_handle_t>;
+
+/// Support for storing a list of wait handles, with a max limit set at initialization.
+/// Note this class is not safe for concurrent access.
+class wait_handle_store_t {
+   public:
+    // Our wait handles are arranged in a linked list for its iterator invalidation semantics: we
+    // may remove one without needing to update the map from pid -> handle.
+    using wait_handle_list_t = std::list<wait_handle_ref_t>;
+
+    /// Construct with a max limit on the number of handles we will remember.
+    /// The default is 1024, which is zsh's default.
+    explicit wait_handle_store_t(size_t limit = 1024);
+
+    /// Add a wait handle to the store. This may remove the oldest handle, if our limit is exceeded.
+    /// It may also remove any existing handle with that pid.
+    /// For convenience, this does nothing if wh is null.
+    void add(wait_handle_ref_t wh);
+
+    /// \return the wait handle for a pid, or nullptr if there is none.
+    /// This is a fast lookup.
+    wait_handle_ref_t get_by_pid(pid_t pid) const;
+
+    /// Remove a given wait handle, if present in this store.
+    void remove(const wait_handle_ref_t &wh);
+
+    /// Remove the wait handle for a pid, if present in this store.
+    void remove_by_pid(pid_t pid);
+
+    /// Get the list of all wait handles.
+    const wait_handle_list_t &get_list() const { return handles_; }
+
+    /// Convenience to return the size, for testing.
+    size_t size() const { return handles_.size(); }
+
+    /// No copying allowed.
+    wait_handle_store_t(const wait_handle_store_t &) = delete;
+    void operator=(const wait_handle_store_t &) = delete;
+
+   private:
+    using list_node_t = typename wait_handle_list_t::iterator;
+
+    /// The list of all wait handles. New ones come on the front, the last one is oldest.
+    wait_handle_list_t handles_{};
+
+    /// Map from pid to the wait handle's position in the list.
+    std::unordered_map<pid_t, list_node_t> handle_map_{};
+
+    /// Max supported wait handles.
+    const size_t limit_;
+};
+
+#endif