Make history file rewriting be more lock savvy

We now are stingier with taking history file locks - if the lock
is held too long we may just break it. But the current file save
architecture holds the lock for the duration of the save. It also
has some not-quite-right checks that can cause spurious failures in
the history stress test.

Reimplement the history save to retry. Rather than holding the lock,
rewrite the file to a temporary location and then take the lock. If
the history file has changed, start all over.

This is going to be slower under contention, but the advantage is that
the lock is only held for a brief period (stat + rename) rather than
across calls to write().

Some updated logic also fixes spurious failures that were easy to observe
when tsan was enabled. These failures were due to failing to check if the
file at the path was the same file we opened.

The next step is to move the history file saving to a background thread
to reduce the chances of it impacting user's typing.
This commit is contained in:
ridiculousfish 2017-02-06 11:04:07 -08:00
parent f9371899a1
commit 162053ed8d
3 changed files with 252 additions and 171 deletions

View File

@ -25,6 +25,7 @@
#include <wchar.h>
#include <wctype.h>
#include <algorithm>
#include <array>
#include <atomic>
#include <memory>
#include <set>
@ -2705,7 +2706,7 @@ class history_tests_t {
static void test_history_formats(void);
// static void test_history_speed(void);
static void test_history_races(void);
static void test_history_races_pound_on_history();
static void test_history_races_pound_on_history(size_t item_count);
};
static wcstring random_string(void) {
@ -2830,47 +2831,46 @@ static void time_barrier(void) {
} while (time(NULL) == start);
}
static wcstring_list_t generate_history_lines(int pid) {
static wcstring_list_t generate_history_lines(size_t item_count, int pid) {
wcstring_list_t result;
long max = 256;
result.reserve(max);
for (long i = 0; i < max; i++) {
result.push_back(format_string(L"%ld %ld", (long)pid, i));
result.reserve(item_count);
for (unsigned long i = 0; i < item_count; i++) {
result.push_back(format_string(L"%ld %lu", (long)pid, (unsigned long)i));
}
return result;
}
void history_tests_t::test_history_races_pound_on_history() {
void history_tests_t::test_history_races_pound_on_history(size_t item_count) {
// Called in child process to modify history.
std::unique_ptr<history_t> hist = make_unique<history_t>(L"race_test");
hist->chaos_mode = true;
const wcstring_list_t hist_lines = generate_history_lines(getpid());
for (size_t idx = 0; idx < hist_lines.size(); idx++) {
const wcstring &line = hist_lines.at(idx);
hist->add(line);
hist->save();
history_t hist(L"race_test");
hist.chaos_mode = !true;
const wcstring_list_t hist_lines = generate_history_lines(item_count, getpid());
for (const wcstring &line : hist_lines) {
hist.add(line);
hist.save();
}
}
void history_tests_t::test_history_races(void) {
say(L"Testing history race conditions");
// Test concurrent history writing.
// How many concurrent writers we have
constexpr size_t RACE_COUNT = 10;
// How many items each writer makes
constexpr size_t ITEM_COUNT = 256;
// Ensure history is clear.
{
std::unique_ptr<history_t> hist = make_unique<history_t>(L"race_test");
hist->clear();
}
history_t(L"race_test").clear();
// Test concurrent history writing.
#define RACE_COUNT 10
pid_t children[RACE_COUNT];
for (size_t i = 0; i < RACE_COUNT; i++) {
pid_t pid = fork();
if (!pid) {
// Child process.
setup_fork_guards();
test_history_races_pound_on_history();
test_history_races_pound_on_history(ITEM_COUNT);
exit_without_destructors(0);
} else {
// Parent process.
@ -2885,50 +2885,66 @@ void history_tests_t::test_history_races(void) {
}
// Compute the expected lines.
wcstring_list_t expected_lines[RACE_COUNT];
std::array<wcstring_list_t, RACE_COUNT> expected_lines;
for (size_t i = 0; i < RACE_COUNT; i++) {
expected_lines[i] = generate_history_lines(children[i]);
}
// Count total lines.
size_t line_count = 0;
for (size_t i = 0; i < RACE_COUNT; i++) {
line_count += expected_lines[i].size();
expected_lines[i] = generate_history_lines(ITEM_COUNT, children[i]);
}
// Ensure we consider the lines that have been outputted as part of our history.
time_barrier();
// Ensure that we got sane, sorted results.
std::unique_ptr<history_t> hist = make_unique<history_t>(L"race_test");
hist->chaos_mode = true;
history_t hist(L"race_test");
hist.chaos_mode = !true;
// History is enumerated from most recent to least
// Every item should be the last item in some array
size_t hist_idx;
for (hist_idx = 1;; hist_idx++) {
history_item_t item = hist->item_at_index(hist_idx);
for (hist_idx = 1; ; hist_idx++) {
history_item_t item = hist.item_at_index(hist_idx);
if (item.empty()) break;
// The item must be present in one of our 'lines' arrays. If it is present, then every item
// after it is assumed to be missed.
size_t i;
for (i = 0; i < RACE_COUNT; i++) {
wcstring_list_t::iterator where =
std::find(expected_lines[i].begin(), expected_lines[i].end(), item.str());
if (where != expected_lines[i].end()) {
// Delete everything from the found location onwards.
expected_lines[i].resize(where - expected_lines[i].begin());
bool found = false;
for (wcstring_list_t &list : expected_lines) {
auto iter = std::find(list.begin(), list.end(), item.contents);
if (iter != list.end()) {
found = true;
// Break because we found it.
// Remove everything from this item on
auto cursor = list.end();
if (cursor + 1 != list.end()) {
while (--cursor != iter) {
err(L"Item dropped from history: %ls", cursor->c_str());
}
}
list.erase(iter, list.end());
break;
}
}
if (i >= RACE_COUNT) {
err(L"Line '%ls' found in history not found in some array", item.str().c_str());
if (! found) {
err(L"Line '%ls' found in history, but not found in some array", item.str().c_str());
for (wcstring_list_t &list : expected_lines) {
if (! list.empty()) {
fprintf(stderr, "\tRemaining: %ls\n", list.back().c_str());
}
}
}
}
// Every write should add at least one item.
do_test(hist_idx >= RACE_COUNT);
// hist->clear();
// +1 to account for history's 1-based offset
size_t expected_idx = RACE_COUNT * ITEM_COUNT + 1;
if (hist_idx != expected_idx) {
err(L"Expected %lu items, but instead got %lu items", expected_idx, hist_idx);
}
// See if anything is left in the arrays
for (const wcstring_list_t &list : expected_lines) {
for (const wcstring &str : list) {
err(L"Line '%ls' still left in the array", str.c_str());
}
}
hist.clear();
}
void history_tests_t::test_history_merge(void) {

View File

@ -1180,135 +1180,196 @@ void history_t::compact_new_items() {
}
}
// Given the fd of an existing history file, or -1 if none, write
// a new history file to temp_fd. Returns true on success, false
// on error
bool history_t::rewrite_to_temporary_file(int existing_fd, int dst_fd) const {
// This must be called while locked.
ASSERT_IS_LOCKED(lock);
// We are reading FROM existing_fd and writing TO dst_fd
// dst_fd must be valid; existing_fd does not need to be
assert(dst_fd >= 0);
// Make an LRU cache to save only the last N elements.
history_lru_cache_t lru(HISTORY_SAVE_MAX);
// Map in existing items (which may have changed out from underneath us, so don't trust our
// old mmap'd data).
const char *local_mmap_start = NULL;
size_t local_mmap_size = 0;
if (existing_fd >= 0 && map_fd(existing_fd, &local_mmap_start, &local_mmap_size)) {
const history_file_type_t local_mmap_type =
infer_file_type(local_mmap_start, local_mmap_size);
size_t cursor = 0;
for (;;) {
size_t offset = offset_of_next_item(local_mmap_start, local_mmap_size,
local_mmap_type, &cursor, 0);
// If we get back -1, we're done.
if (offset == (size_t)-1) break;
// Try decoding an old item.
const history_item_t old_item = decode_item(local_mmap_start + offset,
local_mmap_size - offset,
local_mmap_type);
if (old_item.empty() || deleted_items.count(old_item.str()) > 0) {
// debug(0, L"Item is deleted : %s\n", old_item.str().c_str());
continue;
}
// Add this old item.
lru.add_item(old_item);
}
munmap((void *)local_mmap_start, local_mmap_size);
}
// Insert any unwritten new items
for (auto iter = new_items.cbegin() + this->first_unwritten_new_item_index;
iter != new_items.cend();
++iter) {
lru.add_item(*iter);
}
// Stable-sort our items by timestamp
// This is because we may have read "old" items with a later timestamp than our "new" items
// This is the essential step that roughly orders items by history
lru.stable_sort([](const history_lru_item_t &item1, const history_lru_item_t &item2){
return item1.timestamp < item2.timestamp;
});
// Write them out.
bool ok = true;
history_output_buffer_t buffer(HISTORY_OUTPUT_BUFFER_SIZE);
for (const auto &key_item : lru) {
const history_lru_item_t &item = key_item.second;
append_yaml_to_buffer(item.text, item.timestamp, item.required_paths, &buffer);
if (buffer.output_size() >= HISTORY_OUTPUT_BUFFER_SIZE) {
ok = buffer.flush_to_fd(dst_fd);
if (! ok) {
debug(2, L"Error %d when writing to temporary history file", errno);
break;
}
}
}
if (ok) {
ok = buffer.flush_to_fd(dst_fd);
if (! ok) {
debug(2, L"Error %d when writing to temporary history file", errno);
}
}
return ok;
}
// Returns the fd of an opened temporary file, or -1 on failure
static int create_temporary_file(const wcstring &name_template, wcstring *out_path) {
int out_fd = -1;
for (size_t attempt = 0; attempt < 10 && out_fd == -1; attempt++) {
char *narrow_str = wcs2str(name_template.c_str());
out_fd = fish_mkstemp_cloexec(narrow_str);
if (out_fd >= 0) {
*out_path = str2wcstring(narrow_str);
}
free(narrow_str);
}
return out_fd;
}
bool history_t::save_internal_via_rewrite() {
// This must be called while locked.
ASSERT_IS_LOCKED(lock);
bool ok = false;
wcstring tmp_name_template = history_filename(name, L".XXXXXX");
if (!tmp_name_template.empty()) {
// Make an LRU cache to save only the last N elements.
history_lru_cache_t lru(HISTORY_SAVE_MAX);
// Insert old items in, from old to new. Merge them with our new items, inserting items with
// earlier timestamps first.
history_item_list_t::const_iterator new_item_iter = new_items.begin();
// Map in existing items (which may have changed out from underneath us, so don't trust our
// old mmap'd data).
const char *local_mmap_start = NULL;
size_t local_mmap_size = 0;
if (map_file(name, &local_mmap_start, &local_mmap_size, NULL)) {
const history_file_type_t local_mmap_type =
infer_file_type(local_mmap_start, local_mmap_size);
size_t cursor = 0;
for (;;) {
size_t offset = offset_of_next_item(local_mmap_start, local_mmap_size,
local_mmap_type, &cursor, 0);
// If we get back -1, we're done.
if (offset == (size_t)-1) break;
// Try decoding an old item.
const history_item_t old_item = decode_item(
local_mmap_start + offset, local_mmap_size - offset, local_mmap_type);
if (old_item.empty() || deleted_items.count(old_item.str()) > 0) {
// debug(0, L"Item is deleted : %s\n",
// old_item.str().c_str());
continue;
}
// The old item may actually be more recent than our new item, if it came from
// another session. Insert all new items at the given index with an earlier
// timestamp.
for (; new_item_iter != new_items.end(); ++new_item_iter) {
if (new_item_iter->timestamp() < old_item.timestamp()) {
// This "new item" is in fact older.
lru.add_item(*new_item_iter);
} else {
// The new item is not older.
break;
}
}
// Now add this old item.
lru.add_item(old_item);
}
munmap((void *)local_mmap_start, local_mmap_size);
}
// Insert any remaining new items.
for (; new_item_iter != new_items.end(); ++new_item_iter) {
lru.add_item(*new_item_iter);
}
signal_block();
// Try to create a CLO_EXEC temporary file, up to 10 times.
// This should almost always succeed on the first try.
int out_fd = -1;
wcstring tmp_name;
for (size_t attempt = 0; attempt < 10 && out_fd == -1; attempt++) {
char *narrow_str = wcs2str(tmp_name_template.c_str());
out_fd = fish_mkstemp_cloexec(narrow_str);
if (out_fd >= 0) {
tmp_name = str2wcstring(narrow_str);
}
free(narrow_str);
}
if (out_fd >= 0) {
// Write them out.
bool errored = false;
history_output_buffer_t buffer;
for (history_lru_cache_t::iterator iter = lru.begin(); iter != lru.end(); ++iter) {
const wcstring &text = (*iter).first;
const history_lru_item_t &item = (*iter).second;
append_yaml_to_buffer(text, item.timestamp, item.required_paths, &buffer);
if (buffer.output_size() >= HISTORY_OUTPUT_BUFFER_SIZE &&
!buffer.flush_to_fd(out_fd)) {
errored = true;
break;
}
}
if (!errored && buffer.flush_to_fd(out_fd)) {
ok = true;
}
if (!ok) {
// This message does not have high enough priority to be shown by default.
debug(2, L"Error when writing history file");
} else {
wcstring new_name = history_filename(name, wcstring());
// Ensure we maintain the ownership and permissions of the original (#2355). If the
// stat fails, we assume (hope) our default permissions are correct. This
// corresponds to e.g. someone running sudo -E as the very first command. If they
// did, it would be tricky to set the permissions correctly. (bash doesn't get this
// case right either).
struct stat sbuf;
if (wstat(new_name, &sbuf) >= 0) { // success
if (fchown(out_fd, sbuf.st_uid, sbuf.st_gid) == -1) {
debug(2, L"Error %d when changing ownership of history file", errno);
}
if (fchmod(out_fd, sbuf.st_mode) == -1) {
debug(2, L"Error %d when changing mode of history file", errno);
}
}
if (wrename(tmp_name, new_name) == -1) {
debug(2, L"Error %d when renaming history file", errno);
}
}
close(out_fd);
}
signal_unblock();
// Make sure we clear all nodes, since this doesn't happen automatically.
lru.evict_all_nodes();
// We want to rewrite the file, while holding the lock for as briefly as possible
// To do this, we speculatively write a file, and then lock and see if our original file changed
// Repeat until we succeed or give up
const wcstring target_name = history_filename(name, wcstring());
const wcstring tmp_name_template = history_filename(name, L".XXXXXX");
if (target_name.empty() || tmp_name_template.empty()) {
return false;
}
if (ok) {
// Make our temporary file
// Remember that we have to close this fd!
wcstring tmp_name;
int tmp_fd = create_temporary_file(tmp_name_template, &tmp_name);
if (tmp_fd < 0) {
return false;
}
bool done = false;
for (int i=0; i < max_save_tries && ! done; i++) {
// Open any target file, but do not lock it right away
int target_fd_before = wopen_cloexec(target_name, O_RDONLY | O_CREAT, history_file_mode);
file_id_t orig_file_id = file_id_for_fd(target_fd_before); // possibly invalid
bool wrote = this->rewrite_to_temporary_file(target_fd_before, tmp_fd);
if (target_fd_before >= 0) {
close(target_fd_before);
}
if (! wrote) {
// Failed to write, no good
break;
}
// The crux! We rewrote the history file; see if the history file changed while we
// were rewriting it. Make an effort to take the lock before checking, to avoid racing.
// If the open fails, then proceed; this may be because there is no current history
file_id_t new_file_id = kInvalidFileID;
int target_fd_after = wopen_cloexec(target_name, O_RDONLY);
if (target_fd_after >= 0) {
// critical to take the lock before checking file IDs,
// and hold it until after we are done replacing
// Also critical to check the file at the path, NOT based on our fd
// It's only OK to replace the file while holding the lock
history_file_lock(target_fd_after, LOCK_EX);
new_file_id = file_id_for_path(target_name);
}
bool can_replace_file = (new_file_id == orig_file_id || new_file_id == kInvalidFileID);
if (! can_replace_file) {
// The file has changed, so we're going to re-read it
// Truncate our tmp_fd so we can reuse it
if (ftruncate(tmp_fd, 0) == -1 || lseek(tmp_fd, 0, SEEK_SET) == -1) {
debug(2, L"Error %d when truncating temporary history file", errno);
}
}
else {
// The file is unchanged, or the new file doesn't exist or we can't read it
// We also attempted to take the lock, so we feel confident in replacing it
// Ensure we maintain the ownership and permissions of the original (#2355). If the
// stat fails, we assume (hope) our default permissions are correct. This
// corresponds to e.g. someone running sudo -E as the very first command. If they
// did, it would be tricky to set the permissions correctly. (bash doesn't get this
// case right either).
struct stat sbuf;
if (target_fd_after >= 0 && fstat(target_fd_after, &sbuf) >= 0) {
if (fchown(tmp_fd, sbuf.st_uid, sbuf.st_gid) == -1) {
debug(2, L"Error %d when changing ownership of history file", errno);
}
if (fchmod(tmp_fd, sbuf.st_mode) == -1) {
debug(2, L"Error %d when changing mode of history file", errno);
}
}
// Slide it into place
if (wrename(tmp_name, target_name) == -1) {
debug(2, L"Error %d when renaming history file", errno);
}
// We did it
done = true;
}
if (target_fd_after >= 0) {
close(target_fd_after);
}
}
// Ensure we never leave the old file around
wunlink(tmp_name);
close(tmp_fd);
if (done) {
// We've saved everything, so we have no more unsaved items.
this->first_unwritten_new_item_index = new_items.size();

View File

@ -178,6 +178,10 @@ class history_t {
// Deletes duplicates in new_items.
void compact_new_items();
// Attempts to rewrite the existing file to a target temporary file
// Returns false on error, true on success
bool rewrite_to_temporary_file(int tmp_fd, int existing_fd) const;
// Saves history by rewriting the file.
bool save_internal_via_rewrite();