From 3cd58c798399edcc7876df4358ab8e75bb7740ab Mon Sep 17 00:00:00 2001 From: Ivan Kochin Date: Mon, 20 Apr 2026 15:54:57 +1000 Subject: [PATCH] Import wiredtiger: af950ff8e62d5210b8a5422546202556908f8b14 from branch mongodb-master (#52120) GitOrigin-RevId: 239f330fac6b530f2404a69a73b296e4f98def1c --- src/third_party/wiredtiger/dist/s_string.ok | 2 + src/third_party/wiredtiger/import.data | 2 +- .../wiredtiger/src/block_cache/block_io.c | 6 +- .../wiredtiger/src/btree/bt_vrfy.c | 96 +++++- .../wiredtiger/src/include/extern.h | 4 +- .../src/include/rollback_to_stable.h | 19 ++ .../wiredtiger/src/reconcile/rec_visibility.c | 2 +- .../wiredtiger/src/rollback_to_stable/rts.c | 211 +++++++++++-- .../src/rollback_to_stable/rts_api.c | 16 +- .../src/rollback_to_stable/rts_btree_walk.c | 44 ++- .../src/rollback_to_stable/rts_history.c | 9 +- .../wiredtiger/src/schema/schema_truncate.c | 5 +- src/third_party/wiredtiger/test/evergreen.yml | 3 +- .../wiredtiger/test/suite/hook_disagg.fail | 2 +- .../suite/test_layered_fast_truncate02.py | 223 ++++++++++++++ .../suite/test_layered_fast_truncate03.py | 276 ++++++++++++++++++ .../wiredtiger/test/suite/test_truncate24.py | 7 +- .../test/suite/test_verify_disagg02.py | 96 ++++++ 18 files changed, 957 insertions(+), 66 deletions(-) create mode 100644 src/third_party/wiredtiger/test/suite/test_layered_fast_truncate02.py create mode 100644 src/third_party/wiredtiger/test/suite/test_layered_fast_truncate03.py create mode 100644 src/third_party/wiredtiger/test/suite/test_verify_disagg02.py diff --git a/src/third_party/wiredtiger/dist/s_string.ok b/src/third_party/wiredtiger/dist/s_string.ok index 466562d2129..51915b245ba 100644 --- a/src/third_party/wiredtiger/dist/s_string.ok +++ b/src/third_party/wiredtiger/dist/s_string.ok @@ -57,6 +57,7 @@ BqRv Brueckner Bsearch Btree +Btrees Buf Buildvariants Bxxx @@ -1316,6 +1317,7 @@ munmap mutex mutexes mux +mvcc myconn mytable nTemp diff --git a/src/third_party/wiredtiger/import.data 
b/src/third_party/wiredtiger/import.data index 2dae0e32d6b..01d75ec706d 100644 --- a/src/third_party/wiredtiger/import.data +++ b/src/third_party/wiredtiger/import.data @@ -2,5 +2,5 @@ "vendor": "wiredtiger", "github": "wiredtiger/wiredtiger", "branch": "mongodb-master", - "commit": "c54216fdaa882d8789b01937c812ba7ac4f7b3cb" + "commit": "af950ff8e62d5210b8a5422546202556908f8b14" } diff --git a/src/third_party/wiredtiger/src/block_cache/block_io.c b/src/third_party/wiredtiger/src/block_cache/block_io.c index 1844a983332..24a5ec5c31b 100644 --- a/src/third_party/wiredtiger/src/block_cache/block_io.c +++ b/src/third_party/wiredtiger/src/block_cache/block_io.c @@ -137,7 +137,11 @@ __wt_blkcache_read(WT_SESSION_IMPL *session, WT_ITEM *buf, WT_PAGE_BLOCK_META *b ip = buf; expect_conversion = compressor != NULL || encryptor != NULL; if (expect_conversion) { - WT_RET(__wt_scr_alloc(session, 4 * 1024, &tmp)); + /* + * Pre-size the scratch buffer to the leaf page max to avoid realloc when reading typical + * pages. + */ + WT_RET(__wt_scr_alloc(session, btree->maxleafpage, &tmp)); ip = tmp; } diff --git a/src/third_party/wiredtiger/src/btree/bt_vrfy.c b/src/third_party/wiredtiger/src/btree/bt_vrfy.c index 49e1a9e6644..646bfecc527 100644 --- a/src/third_party/wiredtiger/src/btree/bt_vrfy.c +++ b/src/third_party/wiredtiger/src/btree/bt_vrfy.c @@ -56,6 +56,7 @@ static int __verify_row_int_key_order( WT_SESSION_IMPL *, WT_PAGE *, WT_REF *, uint32_t, WT_VSTUFF *); static int __verify_row_leaf_key_order(WT_SESSION_IMPL *, WT_REF *, WT_VSTUFF *); static int __verify_tree(WT_SESSION_IMPL *, WT_REF *, WT_CELL_UNPACK_ADDR *, WT_VSTUFF *); +static int __verify_unique_btree_ids(WT_SESSION_IMPL *); /* * __verify_config -- @@ -214,6 +215,81 @@ __verify_disagg_accumulate_size( return (0); } +typedef struct { + uint32_t id; + char *uri; +} WT_ID_URI_PAIR; + +/* + * __id_uri_pair_cmp -- + * Comparator for sorting btree ID entries by ID. 
+ */ +static int WT_CDECL +__id_uri_pair_cmp(const void *a, const void *b) +{ + uint32_t ia, ib; + + ia = ((const WT_ID_URI_PAIR *)a)->id; + ib = ((const WT_ID_URI_PAIR *)b)->id; + return (ia < ib ? -1 : (ia == ib ? 0 : 1)); +} + +/* + * __verify_unique_btree_ids -- + * Verify that no two stable constituent files in the local metadata share the same btree ID. + * Only called for .wt_stable files, where the verify session's exclusive lock is on the stable + * file, not the metadata file, so a shared metadata cursor can be opened directly. + */ +static int +__verify_unique_btree_ids(WT_SESSION_IMPL *session) +{ + WT_CONFIG_ITEM id_val; + WT_CURSOR *cursor; + WT_DECL_RET; + WT_ID_URI_PAIR *pairs; + size_t allocated, count, i; + const char *key, *value; + + cursor = NULL; + pairs = NULL; + allocated = count = 0; + + WT_ERR(__wt_metadata_cursor(session, &cursor)); + + while ((ret = cursor->next(cursor)) == 0) { + WT_ERR(cursor->get_key(cursor, &key)); + if (!WT_PREFIX_MATCH(key, "file:") || !WT_SUFFIX_MATCH(key, ".wt_stable")) + continue; + WT_ERR(cursor->get_value(cursor, &value)); + WT_ERR(__wt_config_getones(session, value, "id", &id_val)); + WT_ERR(__wt_realloc_def(session, &allocated, count + 1, &pairs)); + pairs[count].id = (uint32_t)id_val.val; + WT_ERR(__wt_strdup(session, key, &pairs[count].uri)); + ++count; + } + WT_ERR_NOTFOUND_OK(ret, false); + + if (count > 1) { + __wt_qsort(pairs, count, sizeof(WT_ID_URI_PAIR), __id_uri_pair_cmp); + for (i = 0; i < count - 1; ++i) { + if (pairs[i].id != pairs[i + 1].id) + continue; + __wt_verbose_error(session, WT_VERB_VERIFY, + "metadata corruption: btree ID %" PRIu32 " is shared by %s and %s", pairs[i].id, + pairs[i].uri, pairs[i + 1].uri); + ret = WT_ERROR; + } + } + +err: + for (i = 0; i < count; ++i) + __wt_free(session, pairs[i].uri); + __wt_free(session, pairs); + if (cursor != NULL) + WT_TRET(__wt_metadata_cursor_release(session, &cursor)); + return (ret); +} + /* * __wt_verify -- * Verify a file. 
@@ -263,6 +339,14 @@ __wt_verify(WT_SESSION_IMPL *session, const char *cfg[]) if (quit) goto done; + /* + * Check that no two stable constituent files share the same btree ID. Only run for stable + * files, where the verify session's exclusive lock is on the stable file, not the metadata + * file, so a shared metadata cursor can be opened directly on the verify session. + */ + if (WT_SUFFIX_MATCH(name, ".wt_stable")) + WT_ERR(__verify_unique_btree_ids(session)); + /* * Get a list of the checkpoints for this file. Empty objects and ingest tables have no * checkpoints, in which case there's no work to do. @@ -370,16 +454,8 @@ __wt_verify(WT_SESSION_IMPL *session, const char *cfg[]) if (!skip_hs) { __wt_verbose(session, WT_VERB_VERIFY, "%s: verify against history store", name); -#ifndef WT_STANDALONE_BUILD - /* FIXME-WT-16557: Re-enable HS validation at all times. */ - if (__wt_conn_is_disagg(session)) - __wt_verbose(session, WT_VERB_VERIFY, - "%s: skipping verify against history store in disagg", name); - else - WT_TRET(__wt_hs_verify_one(session, btree->id)); -#else - WT_TRET(__wt_hs_verify_one(session, btree->id)); -#endif + WT_TRET_MSG(session, __wt_hs_verify_one(session, btree->id), + "history store verification failed"); } /* * We cannot error out here. 
If we got an error verifying the history store, we need diff --git a/src/third_party/wiredtiger/src/include/extern.h b/src/third_party/wiredtiger/src/include/extern.h index 9eecfaa3b04..49310e7d397 100644 --- a/src/third_party/wiredtiger/src/include/extern.h +++ b/src/third_party/wiredtiger/src/include/extern.h @@ -1926,8 +1926,8 @@ extern void __wti_read_row_time_window( WT_SESSION_IMPL *session, WT_PAGE *page, WT_ROW *rip, WT_TIME_WINDOW *tw); extern void __wti_ref_addr_safe_free(WT_SESSION_IMPL *session, void *p, size_t len); extern void __wti_rts_pop_work(WT_SESSION_IMPL *session, WT_RTS_WORK_UNIT **entryp); -extern void __wti_rts_progress_msg(WT_SESSION_IMPL *session, WT_TIMER *rollback_start, - uint64_t rollback_count, uint64_t max_count, uint64_t *rollback_msg_count, bool walk); +extern void __wti_rts_progress_msg_walk(WT_SESSION_IMPL *session, uint64_t btree_start_clock, + uint64_t *last_report_clock, double npos, uint64_t btree_pages); extern void __wti_rts_work_free(WT_SESSION_IMPL *session, WT_RTS_WORK_UNIT *entry); extern void __wti_schema_destroy_colgroup(WT_SESSION_IMPL *session, WT_COLGROUP **colgroupp); extern void __wti_tiered_get_remove_shared(WT_SESSION_IMPL *session, WT_TIERED_WORK_UNIT **entryp); diff --git a/src/third_party/wiredtiger/src/include/rollback_to_stable.h b/src/third_party/wiredtiger/src/include/rollback_to_stable.h index 96ba14e882a..c0f0287bbd9 100644 --- a/src/third_party/wiredtiger/src/include/rollback_to_stable.h +++ b/src/third_party/wiredtiger/src/include/rollback_to_stable.h @@ -84,6 +84,14 @@ WT_STAT_CONN_DSRC_INCR(session, stat##_dryrun); \ } while (0) +/* RTS phase identifiers for progress reporting. 
*/ +#define WT_RTS_PHASE_INACTIVE 0 +#define WT_RTS_PHASE_METADATA_COUNT 1 +#define WT_RTS_PHASE_BTREE_APPLY 2 +#define WT_RTS_PHASE_QUEUE_DRAIN 3 +#define WT_RTS_PHASE_HS_FINAL_PASS 4 +#define WT_RTS_PHASE_COMPLETE 5 + #define WT_RTS_MAX_WORKERS 10 /* * WT_RTS_WORK_UNIT -- @@ -118,6 +126,17 @@ struct __wt_rollback_to_stable { /* Configuration. */ bool dryrun; + + /* RTS progress tracking. */ + struct { + WT_TIMER start_timer; /* Overall RTS start time. */ + uint64_t total_btrees; /* From metadata count pass (set once). */ + wt_shared uint32_t phase; /* Current RTS phase (WT_RTS_PHASE_*). */ + wt_shared uint64_t overall_last_report; /* CAS-guarded clock of last overall report. */ + wt_shared uint64_t btrees_processed; /* Btrees fully processed (atomic). */ + wt_shared uint64_t btrees_skipped; /* Btrees skipped, no work needed (atomic). */ + wt_shared uint64_t pages_walked; /* Pages walked across all btrees (atomic). */ + } progress; }; /* diff --git a/src/third_party/wiredtiger/src/reconcile/rec_visibility.c b/src/third_party/wiredtiger/src/reconcile/rec_visibility.c index 8425c7724ac..b5af3a25c49 100644 --- a/src/third_party/wiredtiger/src/reconcile/rec_visibility.c +++ b/src/third_party/wiredtiger/src/reconcile/rec_visibility.c @@ -1304,7 +1304,7 @@ __rec_fill_tw_from_upd_select(WT_SESSION_IMPL *session, WT_PAGE *page, WT_CELL_U if (!F_ISSET(S2C(session), WT_CONN_PRESERVE_PREPARED)) continue; - if (upd->prepared_id == WT_PREPARED_ID_NONE) { + if (upd->next->prepared_id == WT_PREPARED_ID_NONE) { write_start_prepare = false; continue; } diff --git a/src/third_party/wiredtiger/src/rollback_to_stable/rts.c b/src/third_party/wiredtiger/src/rollback_to_stable/rts.c index 75c726d1482..6665394c0a0 100644 --- a/src/third_party/wiredtiger/src/rollback_to_stable/rts.c +++ b/src/third_party/wiredtiger/src/rollback_to_stable/rts.c @@ -9,32 +9,161 @@ #include "wt_internal.h" /* - * __wti_rts_progress_msg -- - * Log a verbose message about the progress of the current rollback 
to stable. + * __rts_phase_string -- + * Return a human-readable string for an RTS phase. + */ +static const char * +__rts_phase_string(uint32_t phase) +{ + switch (phase) { + case WT_RTS_PHASE_INACTIVE: + return ("INACTIVE"); + case WT_RTS_PHASE_METADATA_COUNT: + return ("METADATA_COUNT"); + case WT_RTS_PHASE_BTREE_APPLY: + return ("BTREE_APPLY"); + case WT_RTS_PHASE_QUEUE_DRAIN: + return ("QUEUE_DRAIN"); + case WT_RTS_PHASE_HS_FINAL_PASS: + return ("HS_FINAL_PASS"); + case WT_RTS_PHASE_COMPLETE: + return ("COMPLETE"); + default: + return ("UNKNOWN"); + } +} + +/* + * __rts_emit_overall_progress -- + * Emit an overall RTS progress message with counters, percentage, and throughput. + */ +static void +__rts_emit_overall_progress(WT_SESSION_IMPL *session) +{ + WT_ROLLBACK_TO_STABLE *rts; + uint64_t btrees_completed, btrees_processed, btrees_skipped, elapsed_ms, pages_per_sec, + pages_walked, pct, total_btrees; + uint32_t phase; + + rts = S2C(session)->rts; + + btrees_processed = __wt_atomic_load_uint64_relaxed(&rts->progress.btrees_processed); + btrees_skipped = __wt_atomic_load_uint64_relaxed(&rts->progress.btrees_skipped); + pages_walked = __wt_atomic_load_uint64_relaxed(&rts->progress.pages_walked); + phase = __wt_atomic_load_uint32_relaxed(&rts->progress.phase); + total_btrees = rts->progress.total_btrees; + + btrees_completed = btrees_processed + btrees_skipped; + __wt_timer_evaluate_ms(session, &rts->progress.start_timer, &elapsed_ms); + pct = total_btrees > 0 ? (100 * btrees_completed / total_btrees) : 0; + pages_per_sec = elapsed_ms > 0 ? 
(pages_walked * WT_THOUSAND / elapsed_ms) : 0; + + __wt_verbose(session, WT_VERB_RECOVERY_PROGRESS, + "Rollback to stable [%s] overall: running for %" PRIu64 " seconds, %" PRIu64 " of %" PRIu64 + " btrees done (%" PRIu64 "%%), %" PRIu64 " processed, %" PRIu64 " skipped, %" PRIu64 + " total pages walked (%" PRIu64 " pages/sec)", + __rts_phase_string(phase), elapsed_ms / WT_THOUSAND, btrees_completed, total_btrees, pct, + btrees_processed, btrees_skipped, pages_walked, pages_per_sec); + + /* Notify the application via the progress callback. */ + WT_IGNORE_RET(__wt_progress(session, "rollback to stable", btrees_completed)); +} + +/* + * __rts_progress_msg -- + * Log a verbose message about the overall progress of rollback to stable. Called from the + * metadata walk loop in __wti_rts_btree_apply_all. + */ +static void +__rts_progress_msg(WT_SESSION_IMPL *session) +{ + WT_ROLLBACK_TO_STABLE *rts; + uint64_t clock_now, overall_last; + + rts = S2C(session)->rts; + clock_now = __wt_clock(session); + overall_last = __wt_atomic_load_uint64_relaxed(&rts->progress.overall_last_report); + if (__wt_clock_to_nsec(clock_now, overall_last) >= + (uint64_t)WT_BILLION * WT_PROGRESS_MSG_PERIOD && + __wt_atomic_cas_uint64(&rts->progress.overall_last_report, overall_last, clock_now)) + __rts_emit_overall_progress(session); +} + +/* + * __wti_rts_progress_msg_walk -- + * Log a verbose message about the progress of a per-btree page walk during rollback to stable. + * Uses the normalized position (npos) of the current page to compute a percentage within the + * tree and estimate time remaining for this btree. 
*/ void -__wti_rts_progress_msg(WT_SESSION_IMPL *session, WT_TIMER *rollback_start, uint64_t rollback_count, - uint64_t max_count, uint64_t *rollback_msg_count, bool walk) +__wti_rts_progress_msg_walk(WT_SESSION_IMPL *session, uint64_t btree_start_clock, + uint64_t *last_report_clock, double npos, uint64_t btree_pages) { - uint64_t time_diff_ms; + WT_ROLLBACK_TO_STABLE *rts; + uint64_t btree_eta_sec, btree_pct, btree_pages_per_sec, clock_now, elapsed_ns, elapsed_sec, + overall_last; + uint32_t phase; - /* Time since the rollback started. */ - __wt_timer_evaluate_ms(session, rollback_start, &time_diff_ms); + rts = S2C(session)->rts; - if ((time_diff_ms / (WT_THOUSAND * WT_PROGRESS_MSG_PERIOD)) > *rollback_msg_count) { - if (walk) - __wt_verbose(session, WT_VERB_RECOVERY_PROGRESS, - "Rollback to stable has been performing on %s for %" PRIu64 - " milliseconds. For more detailed logging, enable WT_VERB_RTS ", - session->dhandle->name, time_diff_ms); - else - __wt_verbose(session, WT_VERB_RECOVERY_PROGRESS, - "Rollback to stable has been running for %" PRIu64 - " milliseconds and has inspected %" PRIu64 " files of %" PRIu64 - ". For more detailed logging, enable WT_VERB_RTS", - time_diff_ms, rollback_count, max_count); - *rollback_msg_count = time_diff_ms / (WT_THOUSAND * WT_PROGRESS_MSG_PERIOD); + /* Use total btree walk time (not interval) for pages/sec and ETA calculations. */ + clock_now = __wt_clock(session); + elapsed_ns = __wt_clock_to_nsec(clock_now, btree_start_clock); + elapsed_sec = elapsed_ns / WT_BILLION; + + phase = __wt_atomic_load_uint32_relaxed(&rts->progress.phase); + btree_pct = (uint64_t)(npos * 100); + btree_pages_per_sec = elapsed_sec > 0 ? (btree_pages / elapsed_sec) : 0; + + /* Estimate time remaining for this btree based on npos progression. 
*/ + btree_eta_sec = 0; + if (npos > 0.05 && npos < 1.0) + btree_eta_sec = (uint64_t)((1.0 - npos) * (double)elapsed_sec / npos); + + if (btree_eta_sec > 0) + __wt_verbose(session, WT_VERB_RECOVERY_PROGRESS, + "Rollback to stable [%s] btree walk on %s for %" PRIu64 " seconds, %" PRIu64 + "%% through btree, %" PRIu64 " pages walked (%" PRIu64 + " pages/sec)" + ", btree ETA %" PRIu64 " seconds", + __rts_phase_string(phase), session->dhandle->name, elapsed_sec, btree_pct, btree_pages, + btree_pages_per_sec, btree_eta_sec); + else + __wt_verbose(session, WT_VERB_RECOVERY_PROGRESS, + "Rollback to stable [%s] btree walk on %s for %" PRIu64 " seconds, %" PRIu64 + "%% through btree, %" PRIu64 " pages walked (%" PRIu64 " pages/sec)", + __rts_phase_string(phase), session->dhandle->name, elapsed_sec, btree_pct, btree_pages, + btree_pages_per_sec); + + /* + * Emit an overall progress line. Use CAS on overall_last_report so that exactly one thread wins + * per reporting period, regardless of which thread it is. Skip if progress was not initialized + * (e.g., single-file RTS via rollback_to_stable_one). + */ + if (rts->progress.total_btrees > 0) { + overall_last = __wt_atomic_load_uint64_relaxed(&rts->progress.overall_last_report); + if (__wt_clock_to_nsec(clock_now, overall_last) >= + (uint64_t)WT_BILLION * WT_PROGRESS_MSG_PERIOD && + __wt_atomic_cas_uint64(&rts->progress.overall_last_report, overall_last, clock_now)) + __rts_emit_overall_progress(session); } + + *last_report_clock = clock_now; +} + +/* + * __rts_progress_init -- + * Initialize the RTS progress tracking fields. 
+ */ +static void +__rts_progress_init(WT_SESSION_IMPL *session) +{ + WT_ROLLBACK_TO_STABLE *rts; + + rts = S2C(session)->rts; + WT_CLEAR(rts->progress); + __wt_timer_start(session, &rts->progress.start_timer); + __wt_atomic_store_uint64_relaxed(&rts->progress.overall_last_report, __wt_clock(session)); } /* @@ -172,15 +301,17 @@ __wti_rts_btree_apply_all(WT_SESSION_IMPL *session, wt_timestamp_t rollback_time WT_CURSOR *cursor; WT_DECL_RET; WT_RTS_WORK_UNIT *entry; - WT_TIMER timer; - uint64_t max_count, rollback_count, rollback_msg_count; + uint64_t max_count; char ts_string[WT_TS_INT_STRING_SIZE]; - const char *config, *uri; + const char *config, *saved_session_name, *uri; bool have_cursor, rts_threads_started; - __wt_timer_start(session, &timer); - max_count = rollback_count = 0; - rollback_msg_count = 0; + __rts_progress_init(session); + __wt_atomic_store_uint32_relaxed( + &S2C(session)->rts->progress.phase, WT_RTS_PHASE_METADATA_COUNT); + + max_count = 0; + saved_session_name = session->name; rts_threads_started = false; /* @@ -191,13 +322,19 @@ __wti_rts_btree_apply_all(WT_SESSION_IMPL *session, wt_timestamp_t rollback_time have_cursor = true; while ((ret = cursor->next(cursor)) == 0) { WT_ERR(cursor->get_key(cursor, &uri)); - if (WT_BTREE_PREFIX(uri)) + if (WT_BTREE_PREFIX(uri) && !WT_IS_URI_HS(uri) && !WT_IS_URI_METADATA(uri)) ++max_count; } WT_ERR_NOTFOUND_OK(ret, false); WT_ERR(__wt_metadata_cursor_release(session, &cursor)); have_cursor = false; + S2C(session)->rts->progress.total_btrees = max_count; + __wt_verbose(session, WT_VERB_RECOVERY_PROGRESS, + "Rollback to stable found %" PRIu64 " btrees to process", max_count); + + __wt_atomic_store_uint32_relaxed(&S2C(session)->rts->progress.phase, WT_RTS_PHASE_BTREE_APPLY); + WT_ERR(__rts_thread_create(session)); rts_threads_started = true; @@ -207,10 +344,7 @@ __wti_rts_btree_apply_all(WT_SESSION_IMPL *session, wt_timestamp_t rollback_time /* Log a progress message. 
*/ WT_ERR(cursor->get_key(cursor, &uri)); WT_ERR(cursor->get_value(cursor, &config)); - if (WT_BTREE_PREFIX(uri)) - ++rollback_count; - __wti_rts_progress_msg( - session, &timer, rollback_count, max_count, &rollback_msg_count, false); + __rts_progress_msg(session); F_SET(session, WT_SESSION_QUIET_CORRUPT_FILE); ret = __wti_rts_btree_walk_btree_apply(session, uri, config, rollback_timestamp); @@ -226,6 +360,13 @@ __wti_rts_btree_apply_all(WT_SESSION_IMPL *session, wt_timestamp_t rollback_time * workers alone to complete the task. */ if (S2C(session)->rts->threads_num != 0) { + __wt_atomic_store_uint32_relaxed( + &S2C(session)->rts->progress.phase, WT_RTS_PHASE_QUEUE_DRAIN); + __wt_verbose(session, WT_VERB_RECOVERY_PROGRESS, "%s", + "Rollback to stable finished metadata walk, draining worker queue"); + + /* Rename session while joining workers so log messages identify us as a worker. */ + session->name = "rts-main-wk"; while (!TAILQ_EMPTY(&S2C(session)->rts->rtsqh)) { __wti_rts_pop_work(session, &entry); if (entry == NULL) @@ -234,6 +375,7 @@ __wti_rts_btree_apply_all(WT_SESSION_IMPL *session, wt_timestamp_t rollback_time __wti_rts_work_free(session, entry); WT_ERR(ret); } + session->name = saved_session_name; } WT_ERR(__rts_thread_destroy(session)); @@ -249,6 +391,10 @@ __wti_rts_btree_apply_all(WT_SESSION_IMPL *session, wt_timestamp_t rollback_time * doesn't exist. 
*/ if (!F_ISSET(S2C(session), WT_CONN_IN_MEMORY)) { + __wt_atomic_store_uint32_relaxed( + &S2C(session)->rts->progress.phase, WT_RTS_PHASE_HS_FINAL_PASS); + __wt_verbose(session, WT_VERB_RECOVERY_PROGRESS, "%s", + "Rollback to stable beginning history store final pass"); __wt_verbose_level_multi(session, WT_VERB_RECOVERY_RTS(session), WT_VERBOSE_DEBUG_3, WT_RTS_VERB_TAG_HS_TREE_FINAL_PASS "performing final pass of the history store to remove unstable entries with " @@ -256,7 +402,10 @@ __wti_rts_btree_apply_all(WT_SESSION_IMPL *session, wt_timestamp_t rollback_time __wt_timestamp_to_string(rollback_timestamp, ts_string)); WT_ERR(__wti_rts_history_final_pass(session, rollback_timestamp)); } + + __wt_atomic_store_uint32_relaxed(&S2C(session)->rts->progress.phase, WT_RTS_PHASE_COMPLETE); err: + session->name = saved_session_name; if (have_cursor) WT_TRET(__wt_metadata_cursor_release(session, &cursor)); if (rts_threads_started) diff --git a/src/third_party/wiredtiger/src/rollback_to_stable/rts_api.c b/src/third_party/wiredtiger/src/rollback_to_stable/rts_api.c index 68a5dfedfd6..32fb94cfefc 100644 --- a/src/third_party/wiredtiger/src/rollback_to_stable/rts_api.c +++ b/src/third_party/wiredtiger/src/rollback_to_stable/rts_api.c @@ -262,6 +262,7 @@ __rollback_to_stable_finalize(WT_ROLLBACK_TO_STABLE *rts) { rts->dryrun = false; rts->threads_num = 0; + WT_CLEAR(rts->progress); } /* @@ -315,9 +316,20 @@ __rollback_to_stable(WT_SESSION_IMPL *session, const char *cfg[], bool no_ckpt) /* Time since the RTS started. */ __wt_timer_evaluate_ms(session, &timer, &time_diff_ms); + + /* Log a summary of RTS work before the final end message. 
*/ + __wt_verbose(session, WT_VERB_RECOVERY_PROGRESS, + "Rollback to stable summary: total_btrees=%" PRIu64 ", processed=%" PRIu64 + ", skipped=%" PRIu64 ", pages_walked=%" PRIu64 ", elapsed_ms=%" PRIu64, + S2C(session)->rts->progress.total_btrees, + __wt_atomic_load_uint64_relaxed(&S2C(session)->rts->progress.btrees_processed), + __wt_atomic_load_uint64_relaxed(&S2C(session)->rts->progress.btrees_skipped), + __wt_atomic_load_uint64_relaxed(&S2C(session)->rts->progress.pages_walked), time_diff_ms); + __wt_verbose_multi(session, WT_VERB_RECOVERY_RTS(session), - WT_RTS_VERB_TAG_END "finished rollback to stable%s and has ran for %" PRIu64 " milliseconds", - dryrun ? " dryrun" : "", time_diff_ms); + WT_RTS_VERB_TAG_END "finished rollback to stable%s with %" PRIu32 + " worker threads, ran for %" PRIu64 " milliseconds", + dryrun ? " dryrun" : "", threads, time_diff_ms); WT_STAT_CONN_SET(session, txn_rollback_to_stable_running, 0); /* Reset the RTS configuration to default. */ diff --git a/src/third_party/wiredtiger/src/rollback_to_stable/rts_btree_walk.c b/src/third_party/wiredtiger/src/rollback_to_stable/rts_btree_walk.c index 04b61302043..ac7ca6a3351 100644 --- a/src/third_party/wiredtiger/src/rollback_to_stable/rts_btree_walk.c +++ b/src/third_party/wiredtiger/src/rollback_to_stable/rts_btree_walk.c @@ -120,20 +120,41 @@ __rts_btree_walk(WT_SESSION_IMPL *session, wt_timestamp_t rollback_timestamp) { WT_DECL_RET; WT_REF *ref; - WT_TIMER timer; - uint64_t msg_count; + double max_npos, npos; + uint64_t btree_pages, btree_start_clock, clock_now, elapsed_ns, last_report_clock; uint32_t flags; - __wt_timer_start(session, &timer); + btree_start_clock = __wt_clock(session); + last_report_clock = btree_start_clock; flags = WT_READ_NO_EVICT | WT_READ_VISIBLE_ALL | WT_READ_WONT_NEED | WT_READ_SEE_DELETED; - msg_count = 0; + btree_pages = 0; + max_npos = 0.0; /* Walk the tree, marking commits aborted where appropriate. 
*/ ref = NULL; while ((ret = __wt_tree_walk_custom_skip( session, &ref, __rts_btree_walk_page_skip, &rollback_timestamp, flags)) == 0 && ref != NULL) { - __wti_rts_progress_msg(session, &timer, 0, 0, &msg_count, true); + ++btree_pages; + (void)__wt_atomic_add_uint64_relaxed(&S2C(session)->rts->progress.pages_walked, 1); + + /* + * Use the cheap rdtsc-based clock to check if a progress message is due. Only compute npos + * (which walks parent indexes) and emit the message when the period has elapsed. + */ + clock_now = __wt_clock(session); + elapsed_ns = __wt_clock_to_nsec(clock_now, last_report_clock); + if (elapsed_ns >= (uint64_t)WT_BILLION * WT_PROGRESS_MSG_PERIOD) { + npos = __wt_page_npos(session, ref, 0.5, NULL, NULL, 0); + /* + * npos can fluctuate due to unbalanced trees, so track the maximum seen so far to get a + * monotonically increasing progress indicator. + */ + if (npos > max_npos) + max_npos = npos; + __wti_rts_progress_msg_walk( + session, btree_start_clock, &last_report_clock, max_npos, btree_pages); + } if (F_ISSET(ref, WT_REF_FLAG_LEAF)) WT_ERR(__wti_rts_btree_abort_updates(session, ref, rollback_timestamp)); @@ -256,8 +277,11 @@ __rts_btree(WT_SESSION_IMPL *session, const char *uri, wt_timestamp_t rollback_t WT_RTS_VERB_TAG_SKIP_DAMAGE "%s: skipped performing rollback to stable because the file %s", uri, ret == ENOENT ? 
"does not exist" : "is corrupted."); + WT_STAT_CONN_INCR(session, txn_rts_btrees_skipped); + (void)__wt_atomic_add_uint64_relaxed(&S2C(session)->rts->progress.btrees_skipped, 1); ret = 0; - } + } else if (ret == 0) + (void)__wt_atomic_add_uint64_relaxed(&S2C(session)->rts->progress.btrees_processed, 1); return (ret); } @@ -365,6 +389,8 @@ __wti_rts_btree_walk_btree_apply( WT_RTS_VERB_TAG_FILE_SKIP "skipping rollback to stable on file=%s because has never been checkpointed", uri); + WT_STAT_CONN_INCR(session, txn_rts_btrees_skipped); + (void)__wt_atomic_add_uint64_relaxed(&S2C(session)->rts->progress.btrees_skipped, 1); return (0); } @@ -411,8 +437,10 @@ __wti_rts_btree_walk_btree_apply( prepared_updates ? "true" : "false", rollback_txnid, S2C(session)->recovery_ckpt_snap_min, has_txn_updates_gt_than_ckpt_snap ? "true" : "false"); - if (file_skipped) - WT_STAT_CONN_DSRC_INCR(session, txn_rts_btrees_skipped); + if (file_skipped) { + WT_STAT_CONN_INCR(session, txn_rts_btrees_skipped); + (void)__wt_atomic_add_uint64_relaxed(&S2C(session)->rts->progress.btrees_skipped, 1); + } /* * Truncate history store entries for the non-timestamped table. 
diff --git a/src/third_party/wiredtiger/src/rollback_to_stable/rts_history.c b/src/third_party/wiredtiger/src/rollback_to_stable/rts_history.c index b66920c32ad..1e554ea2943 100644 --- a/src/third_party/wiredtiger/src/rollback_to_stable/rts_history.c +++ b/src/third_party/wiredtiger/src/rollback_to_stable/rts_history.c @@ -174,16 +174,23 @@ __wti_rts_history_final_pass(WT_SESSION_IMPL *session, wt_timestamp_t rollback_t */ if ((S2BT(session)->modified || max_durable_ts > rollback_timestamp) && rollback_timestamp != WT_TS_NONE) { + __wt_verbose(session, WT_VERB_RECOVERY_PROGRESS, + "Rollback to stable history store final pass: rolling back with durable_timestamp=%s", + __wt_timestamp_to_string(max_durable_ts, ts_string[0])); __wt_verbose_multi(session, WT_VERB_RECOVERY_RTS(session), WT_RTS_VERB_TAG_HS_TREE_ROLLBACK "tree rolled back with durable_timestamp=%s", __wt_timestamp_to_string(max_durable_ts, ts_string[0])); WT_TRET(__wti_rts_btree_walk_btree(session, rollback_timestamp)); - } else + } else { + __wt_verbose(session, WT_VERB_RECOVERY_PROGRESS, + "Rollback to stable history store final pass: skipped with durable_timestamp=%s", + __wt_timestamp_to_string(max_durable_ts, ts_string[0])); __wt_verbose_multi(session, WT_VERB_RECOVERY_RTS(session), WT_RTS_VERB_TAG_HS_TREE_SKIP "tree skipped with durable_timestamp=%s and stable_timestamp=%s", __wt_timestamp_to_string(max_durable_ts, ts_string[0]), __wt_timestamp_to_string(rollback_timestamp, ts_string[1])); + } /* * Truncate history store entries from the partial backup remove list. 
The list holds all of the diff --git a/src/third_party/wiredtiger/src/schema/schema_truncate.c b/src/third_party/wiredtiger/src/schema/schema_truncate.c index d054014bf0a..58707ee6436 100644 --- a/src/third_party/wiredtiger/src/schema/schema_truncate.c +++ b/src/third_party/wiredtiger/src/schema/schema_truncate.c @@ -68,7 +68,6 @@ __truncate_layered(WT_SESSION_IMPL *session, const char *uri) WT_DECL_RET; start = NULL; - WT_RET(__wt_session_get_dhandle(session, uri, NULL, NULL, WT_DHANDLE_EXCLUSIVE)); WT_STAT_DSRC_INCR(session, cursor_truncate); @@ -79,14 +78,12 @@ __truncate_layered(WT_SESSION_IMPL *session, const char *uri) ret = 0; goto done; } - WT_WITHOUT_DHANDLE(session, ret = __wt_session_range_truncate(session, NULL, start, NULL)); - WT_ERR(ret); + WT_ERR(__wt_session_range_truncate(session, NULL, start, NULL)); done: err: if (start != NULL) WT_TRET(start->close(start)); - WT_TRET(__wt_session_release_dhandle(session)); return (ret); } diff --git a/src/third_party/wiredtiger/test/evergreen.yml b/src/third_party/wiredtiger/test/evergreen.yml index da501a046c6..4e448f38fd5 100644 --- a/src/third_party/wiredtiger/test/evergreen.yml +++ b/src/third_party/wiredtiger/test/evergreen.yml @@ -330,7 +330,8 @@ functions: ${IMPORT_S3_SDK|} \ ${SPINLOCK_TYPE|} \ ${ENABLE_COLORIZE_OUTPUT|-DENABLE_COLORIZE_OUTPUT=0} \ - ${CC_OPTIMIZE_LEVEL|}" + ${CC_OPTIMIZE_LEVEL|} \ + ${WT_DISAGG_FAST_TRUNCATE_BUILD|}" # The RHEL PPC platform does not have ZSTD. Strip it out. 
if [ "${build_variant|}" = "rhel8-ppc" ] && [[ "$DEFINED_EVERGREEN_CONFIG_FLAGS" =~ (\-DHAVE_BUILTIN_EXTENSION_ZSTD=1) ]]; then diff --git a/src/third_party/wiredtiger/test/suite/hook_disagg.fail b/src/third_party/wiredtiger/test/suite/hook_disagg.fail index c900bc07cf0..905caa3b2e3 100644 --- a/src/third_party/wiredtiger/test/suite/hook_disagg.fail +++ b/src/third_party/wiredtiger/test/suite/hook_disagg.fail @@ -38,7 +38,7 @@ test_stat_log02.py test_sweep05 test_timestamp26.py # FIXME: WT-16182 test_truncate01.py # FIXME: WT-15474 -test_truncate24.py # FIXME: WT-15473 + test_util01.py test_util02.py test_util04.py diff --git a/src/third_party/wiredtiger/test/suite/test_layered_fast_truncate02.py b/src/third_party/wiredtiger/test/suite/test_layered_fast_truncate02.py new file mode 100644 index 00000000000..d31fe1f118c --- /dev/null +++ b/src/third_party/wiredtiger/test/suite/test_layered_fast_truncate02.py @@ -0,0 +1,223 @@ +#!/usr/bin/env python3 +# +# Public Domain 2014-present MongoDB, Inc. +# Public Domain 2008-2014 WiredTiger, Inc. +# +# This is free and unencumbered software released into the public domain. +# +# Anyone is free to copy, modify, publish, use, compile, sell, or +# distribute this software, either in source code form or as a compiled +# binary, for any purpose, commercial or non-commercial, and by any +# means. +# +# In jurisdictions that recognize copyright laws, the author or authors +# of this software dedicate any and all copyright interest in the +# software to the public domain. We make this dedication for the benefit +# of the public at large and to the detriment of our heirs and +# successors. We intend this dedication to be an overt act of +# relinquishment in perpetuity of all present and future rights to this +# software under copyright law. 
+# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. +# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR +# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, +# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +# OTHER DEALINGS IN THE SOFTWARE. + +# test_layered_fast_truncate02.py +# Validates visibility and cursor behavior when a follower picks up a +# checkpoint containing fast-truncated pages. + +import wiredtiger, wttest +from helper_disagg import disagg_test_class, gen_disagg_storages +from wtscenario import make_scenarios + +@disagg_test_class +class test_layered_fast_truncate02(wttest.WiredTigerTestCase): + # Shared fixture values: keys 1..nrows all store the same 500-character value, and + # trunc_start..trunc_stop (midpoint trunc_mid) is the key range the tests truncate. + + uri = 'layered:test_layered_fast_truncate02' + nrows = 5000 + value = 'a' * 500 + trunc_start = 1001 + trunc_stop = 4000 + trunc_mid = (trunc_start + trunc_stop) // 2 + + conn_config = 'cache_size=50MB,statistics=(all),disaggregated=(role="leader")' + disagg_storages = gen_disagg_storages('test_layered_fast_truncate02', disagg_only=True) + scenarios = make_scenarios(disagg_storages) + + def setUp(self): + # Skip the test when wiredtiger.disagg_fast_truncate_build() returns 0; otherwise defer to the base setUp. + if wiredtiger.disagg_fast_truncate_build() == 0: + self.skipTest("fast truncate support is not enabled") + super().setUp() + + def leader_checkpoint(self, ts): + # Set stable_timestamp to ts and oldest_timestamp to 1 on the leader connection, then checkpoint. + self.conn.set_timestamp('stable_timestamp=' + self.timestamp_str(ts) + + ',oldest_timestamp=' + self.timestamp_str(1)) + self.session.checkpoint() + + def setup_leader(self): + # Create the table on the leader, write value under keys 1..nrows with commit_timestamp 10, + # and checkpoint with stable timestamp 10. + self.conn.set_timestamp('oldest_timestamp=' + self.timestamp_str(1)) + self.session.create(self.uri, 'key_format=i,value_format=S') + cur = self.session.open_cursor(self.uri) + for i in range(1, self.nrows + 1): + self.session.begin_transaction() + cur[i] = self.value + self.session.commit_transaction('commit_timestamp=' + self.timestamp_str(10)) + cur.close() + self.leader_checkpoint(10) + + # Evict all
pages before truncating so the leader uses page-level fast-delete markers. + evict_cur = self.session.open_cursor(self.uri, None, 'debug=(release_evict)') + self.session.begin_transaction() + for i in range(1, self.nrows + 1): + evict_cur.set_key(i) + evict_cur.search() + evict_cur.reset() + evict_cur.close() + self.session.rollback_transaction() + + def truncate_and_checkpoint(self, trunc_start, trunc_stop, ts): + # Fast-truncate rows [trunc_start, trunc_stop] on the leader and checkpoint. + # The truncate commits at timestamp ts, and leader_checkpoint(ts) then makes ts the stable timestamp. + c_start = self.session.open_cursor(self.uri) + c_start.set_key(trunc_start) + c_stop = self.session.open_cursor(self.uri) + c_stop.set_key(trunc_stop) + self.session.begin_transaction() + self.session.truncate(None, c_start, c_stop, None) + self.session.commit_transaction('commit_timestamp=' + self.timestamp_str(ts)) + c_start.close() + c_stop.close() + self.leader_checkpoint(ts) + + def open_follower(self): + # Open a connection configured with role="follower" (first wiredtiger_open argument is 'follower'), + # create the same table on it, and call disagg_advance_checkpoint(conn, self.conn). + # Returns (conn, sess); the tests close both themselves. + # NOTE(review): disagg_advance_checkpoint is defined outside this file; presumably it moves the + # follower to the leader's latest checkpoint. Confirm. + conn = self.wiredtiger_open( + 'follower', + self.extensionsConfig() + ',create,cache_size=50MB,statistics=(all),disaggregated=(role="follower")') + sess = conn.open_session('') + sess.create(self.uri, 'key_format=i,value_format=S') + self.disagg_advance_checkpoint(conn, self.conn) + return conn, sess + + def search_at(self, sess, key, ts): + # Search for key in a transaction opened with read_timestamp ts. Returns (ret, val): ret is the + # cursor.search() result and val is the row's value when ret is 0, otherwise None. + cur = sess.open_cursor(self.uri) + txn_cfg = ('read_timestamp=' + self.timestamp_str(ts)) + sess.begin_transaction(txn_cfg) + cur.set_key(key) + ret = cur.search() + val = cur.get_value() if ret == 0 else None + sess.rollback_transaction() + cur.close() + return ret, val + + def test_visibility(self): + # At ts=20 (equal to truncation at ts=20): truncated keys return WT_NOTFOUND, boundary and + # exterior keys return their values. At ts=15 (before truncation): all keys are visible. + self.setup_leader() + self.truncate_and_checkpoint(self.trunc_start, self.trunc_stop, 20) + conn, sess = self.open_follower() + + # Truncation is visible: deleted keys are gone, surrounding keys survive.
+ for key in [self.trunc_start, self.trunc_mid, self.trunc_stop]: + ret, _ = self.search_at(sess, key, 20) + self.assertEqual(ret, wiredtiger.WT_NOTFOUND) + + for key in [1, self.trunc_start - 1, self.trunc_stop + 1, self.nrows]: + ret, val = self.search_at(sess, key, 20) + self.assertEqual(ret, 0) + self.assertEqual(val, self.value) + + # Truncation is not visible: all keys exist at a timestamp before the truncation. + for key in [self.trunc_start, self.trunc_mid, self.trunc_stop]: + ret, val = self.search_at(sess, key, 15) + self.assertEqual(ret, 0) + self.assertEqual(val, self.value) + + sess.close() + conn.close() + + def test_pre_truncation_read_sees_all_rows(self): + # Reading at a timestamp before the truncation must still find all rows, including those + # later deleted. Verifies mvcc correctness across the follower checkpoint boundary. + self.setup_leader() + self.truncate_and_checkpoint(self.trunc_start, self.trunc_stop, 20) + conn, sess = self.open_follower() + + for key in [self.trunc_start, self.trunc_mid, self.trunc_stop]: + ret, val = self.search_at(sess, key, 10) + self.assertEqual(ret, 0) + self.assertEqual(val, self.value) + + cur = sess.open_cursor(self.uri) + sess.begin_transaction('read_timestamp=' + self.timestamp_str(10)) + count = 0 + while cur.next() == 0: + count += 1 + sess.rollback_transaction() + cur.close() + self.assertEqual(count, self.nrows) + + sess.close() + conn.close() + + def test_cursor_scanning(self): + # Forward and backward scans must skip the entire truncated range without visiting any + # deleted key. search_near on a deleted key must land outside the range. + self.setup_leader() + self.truncate_and_checkpoint(self.trunc_start, self.trunc_stop, 20) + conn, sess = self.open_follower() + + expected = self.nrows - (self.trunc_stop - self.trunc_start + 1) + trunc_range = range(self.trunc_start, self.trunc_stop + 1) + + # Forward scan: verify no deleted key is visited and the gap is jumped correctly.
+ cur = sess.open_cursor(self.uri) + sess.begin_transaction('read_timestamp=' + self.timestamp_str(25)) + count, prev_key, first_after_gap = 0, 0, None + # first_after_gap records the key returned immediately after the last key below trunc_start. + while cur.next() == 0: + key = cur.get_key() + self.assertNotIn(key, trunc_range) + if prev_key == self.trunc_start - 1 and first_after_gap is None: + first_after_gap = key + prev_key = key + count += 1 + sess.rollback_transaction() + cur.close() + self.assertEqual(count, expected) + self.assertEqual(first_after_gap, self.trunc_stop + 1) + + # Backward scan: same row count, gap jumped in reverse. + cur = sess.open_cursor(self.uri) + sess.begin_transaction('read_timestamp=' + self.timestamp_str(25)) + count, prev_key, first_before_gap = 0, self.nrows + 1, None + # first_before_gap records the key returned immediately after the first key above trunc_stop. + while cur.prev() == 0: + key = cur.get_key() + self.assertNotIn(key, trunc_range) + if prev_key == self.trunc_stop + 1 and first_before_gap is None: + first_before_gap = key + prev_key = key + count += 1 + sess.rollback_transaction() + cur.close() + self.assertEqual(count, expected) + self.assertEqual(first_before_gap, self.trunc_start - 1) + + # search_near on a deleted key must land on the nearest live boundary key. + cur = sess.open_cursor(self.uri) + sess.begin_transaction('read_timestamp=' + self.timestamp_str(25)) + cur.set_key(self.trunc_mid) + cmp = cur.search_near() + self.assertIn(cmp, (-1, 1)) + landed = cur.get_key() + self.assertNotIn(landed, trunc_range) + self.assertEqual(landed, self.trunc_start - 1 if cmp == -1 else self.trunc_stop + 1) + sess.rollback_transaction() + cur.close() + + sess.close() + conn.close() + +if __name__ == '__main__': + wttest.run() diff --git a/src/third_party/wiredtiger/test/suite/test_layered_fast_truncate03.py b/src/third_party/wiredtiger/test/suite/test_layered_fast_truncate03.py new file mode 100644 index 00000000000..11d828f7ef1 --- /dev/null +++ b/src/third_party/wiredtiger/test/suite/test_layered_fast_truncate03.py @@ -0,0 +1,276 @@ +#!/usr/bin/env python3 +# +# Public Domain 2014-present MongoDB, Inc.
+# Public Domain 2008-2014 WiredTiger, Inc. +# +# This is free and unencumbered software released into the public domain. +# +# Anyone is free to copy, modify, publish, use, compile, sell, or +# distribute this software, either in source code form or as a compiled +# binary, for any purpose, commercial or non-commercial, and by any +# means. +# +# In jurisdictions that recognize copyright laws, the author or authors +# of this software dedicate any and all copyright interest in the +# software to the public domain. We make this dedication for the benefit +# of the public at large and to the detriment of our heirs and +# successors. We intend this dedication to be an overt act of +# relinquishment in perpetuity of all present and future rights to this +# software under copyright law. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. +# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR +# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, +# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +# OTHER DEALINGS IN THE SOFTWARE. + +# test_layered_fast_truncate03.py +# Tests that a follower correctly handles pages that were fast-truncated on the +# leader: stable pages must never be dirtied, and deleted state must survive +# eviction and reopen. 
+ +import wiredtiger, wttest +from helper_disagg import disagg_test_class, gen_disagg_storages +from wtscenario import make_scenarios +from wiredtiger import stat + +@disagg_test_class +class test_layered_fast_truncate03(wttest.WiredTigerTestCase): + # Shared fixture values: keys 1..nrows all store the same 500-character value, and + # trunc_start..trunc_stop is the key range the tests truncate. + + uri = 'layered:test_layered_fast_truncate03' + nrows = 5000 + value = 'a' * 500 + trunc_start = 1001 + trunc_stop = 4000 + + conn_config = 'cache_size=50MB,statistics=(all),disaggregated=(role="leader")' + disagg_storages = gen_disagg_storages('test_layered_fast_truncate03', disagg_only=True) + scenarios = make_scenarios(disagg_storages) + + def setUp(self): + # Skip the test when wiredtiger.disagg_fast_truncate_build() returns 0; otherwise defer to the base setUp. + if wiredtiger.disagg_fast_truncate_build() == 0: + self.skipTest("fast truncate support is not enabled") + super().setUp() + + def get_stat(self, conn, stat_key): + # Read the 'statistics:' cursor entry for stat_key through a temporary session on conn and return + # its element [2]. NOTE(review): presumably the numeric value of the statistic; confirm. + s = conn.open_session('') + val = s.open_cursor('statistics:')[stat_key][2] + s.close() + return val + + def leader_checkpoint(self, ts): + # Set stable_timestamp to ts and oldest_timestamp to 1 on the leader connection, then checkpoint. + self.conn.set_timestamp('stable_timestamp=' + self.timestamp_str(ts) + + ',oldest_timestamp=' + self.timestamp_str(1)) + self.session.checkpoint() + + def setup_leader(self, extra_cfg=''): + # extra_cfg is appended verbatim to the create config, so callers supply their own leading comma. + # Writes value under keys 1..nrows with commit_timestamp 10, then checkpoints with stable timestamp 10. + self.conn.set_timestamp('oldest_timestamp=' + self.timestamp_str(1)) + self.session.create(self.uri, 'key_format=i,value_format=S' + extra_cfg) + cur = self.session.open_cursor(self.uri) + for i in range(1, self.nrows + 1): + self.session.begin_transaction() + cur[i] = self.value + self.session.commit_transaction('commit_timestamp=' + self.timestamp_str(10)) + cur.close() + self.leader_checkpoint(10) + + # Evict all pages before truncating so the leader uses page-level fast-delete markers.
+ evict_cur = self.session.open_cursor(self.uri, None, 'debug=(release_evict)') + self.session.begin_transaction() + for i in range(1, self.nrows + 1): + evict_cur.set_key(i) + evict_cur.search() + evict_cur.reset() + evict_cur.close() + self.session.rollback_transaction() + + def truncate_and_checkpoint(self, trunc_start, trunc_stop, ts): + # Fast-truncate rows [trunc_start, trunc_stop] on the leader and checkpoint. + # The truncate commits at timestamp ts, and leader_checkpoint(ts) then makes ts the stable timestamp. + c_start = self.session.open_cursor(self.uri) + c_start.set_key(trunc_start) + c_stop = self.session.open_cursor(self.uri) + c_stop.set_key(trunc_stop) + self.session.begin_transaction() + self.session.truncate(None, c_start, c_stop, None) + self.session.commit_transaction('commit_timestamp=' + self.timestamp_str(ts)) + c_start.close() + c_stop.close() + self.leader_checkpoint(ts) + + def open_follower(self): + # Open a connection configured with role="follower" (first wiredtiger_open argument is 'follower'), + # create the same table on it, and call disagg_advance_checkpoint(conn, self.conn). + # Returns (conn, sess); the tests close both themselves. + # NOTE(review): disagg_advance_checkpoint is defined outside this file; presumably it moves the + # follower to the leader's latest checkpoint. Confirm. + conn = self.wiredtiger_open( + 'follower', + self.extensionsConfig() + ',create,cache_size=50MB,statistics=(all),disaggregated=(role="follower")') + sess = conn.open_session('') + sess.create(self.uri, 'key_format=i,value_format=S') + self.disagg_advance_checkpoint(conn, self.conn) + return conn, sess + + def advance_follower(self, conn): + # Checkpoint the leader with stable timestamp 20 (hard-coded here), then call + # disagg_advance_checkpoint for the follower connection conn. + self.leader_checkpoint(20) + self.disagg_advance_checkpoint(conn, self.conn) + + def evict_range(self, sess, start, stop, step=1): + # Search every step-th key from start to stop (inclusive) with a debug=(release_evict) cursor, inside + # a transaction reading at timestamp 10. NOTE(review): presumably this evicts the touched pages; confirm. + evict_cur = sess.open_cursor(self.uri, None, 'debug=(release_evict)') + sess.begin_transaction('read_timestamp=' + self.timestamp_str(10)) + for i in range(start, stop + 1, step): + evict_cur.set_key(i) + evict_cur.search() + evict_cur.reset() + evict_cur.close() + sess.rollback_transaction() + + def search_at(self, sess, key, ts): + # Search for key in a transaction opened with read_timestamp ts. Returns (ret, val): ret is the + # cursor.search() result and val is the row's value when ret is 0, otherwise None. + cur = sess.open_cursor(self.uri) + txn_cfg = ('read_timestamp=' + self.timestamp_str(ts)) + sess.begin_transaction(txn_cfg) + cur.set_key(key) + ret = cur.search() + val = cur.get_value() if ret == 0 else None + sess.rollback_transaction() + cur.close() + return ret, val + + def test_no_dirty_on_read(self): + # Reading fast-truncated pages on the
follower must never dirty them. Verifies this holds + # across a full load-evict-reload cycle for both single and bulk page reads. + self.setup_leader() + self.truncate_and_checkpoint(self.trunc_start, self.trunc_stop, 20) + conn, sess = self.open_follower() + sample = list(range(self.trunc_start, self.trunc_stop + 1, 10)) + dirty_before = self.get_stat(conn, stat.conn.cache_pages_dirty) + + # Initial read: no deleted key found, no pages dirtied. + cur = sess.open_cursor(self.uri) + sess.begin_transaction('read_timestamp=' + self.timestamp_str(25)) + for key in sample: + cur.set_key(key) + self.assertEqual(cur.search(), wiredtiger.WT_NOTFOUND) + sess.rollback_transaction() + cur.close() + self.assertEqual(self.get_stat(conn, stat.conn.cache_pages_dirty), dirty_before) + + self.evict_range(sess, self.trunc_start, self.trunc_stop) + + # After eviction: reloaded pages still show no keys and remain clean. + cur = sess.open_cursor(self.uri) + sess.begin_transaction('read_timestamp=' + self.timestamp_str(25)) + for key in sample: + cur.set_key(key) + self.assertEqual(cur.search(), wiredtiger.WT_NOTFOUND) + sess.rollback_transaction() + cur.close() + self.assertEqual(self.get_stat(conn, stat.conn.cache_pages_dirty), dirty_before) + + self.evict_range(sess, self.trunc_start, self.trunc_stop) + sess.close() + conn.close() + + def test_page_split_with_ingest_writes(self): + # With small pages the truncated range spans many leaf pages. After ingest writes + # restore a subset of truncated keys, those keys must be visible while the rest + # remain deleted. + self.setup_leader(',leaf_page_max=4096') + self.truncate_and_checkpoint(self.trunc_start, self.trunc_stop, 20) + conn, sess = self.open_follower() + sample = list(range(self.trunc_start, self.trunc_stop + 1, 10)) + dirty_before = self.get_stat(conn, stat.conn.cache_pages_dirty) + + # No pages dirtied by reading the truncated range across many small leaf pages.
+ cur = sess.open_cursor(self.uri) + sess.begin_transaction('read_timestamp=' + self.timestamp_str(25)) + for key in sample: + cur.set_key(key) + self.assertEqual(cur.search(), wiredtiger.WT_NOTFOUND) + sess.rollback_transaction() + cur.close() + self.assertEqual(self.get_stat(conn, stat.conn.cache_pages_dirty), dirty_before) + + self.evict_range(sess, self.trunc_start, self.trunc_stop) + self.advance_follower(conn) + + # Write a subset of truncated keys to ingest. + ingest_keys = set(sample[::3]) + cur = sess.open_cursor(self.uri) + sess.begin_transaction() + for key in ingest_keys: + cur.set_key(key) + cur.set_value(f'ingest_{key}') + self.assertEqual(cur.insert(), 0) + sess.commit_transaction('commit_timestamp=' + self.timestamp_str(30)) + cur.close() + + # At ts=30: ingest keys are found; unwritten truncated keys remain deleted. + cur = sess.open_cursor(self.uri) + sess.begin_transaction('read_timestamp=' + self.timestamp_str(30)) + for key in ingest_keys: + cur.set_key(key) + self.assertEqual(cur.search(), 0) + self.assertEqual(cur.get_value(), f'ingest_{key}') + for key in set(sample) - ingest_keys: + cur.set_key(key) + self.assertEqual(cur.search(), wiredtiger.WT_NOTFOUND) + sess.rollback_transaction() + cur.close() + + # At ts=25 (before the ingest write), all truncated keys must still be deleted. + cur = sess.open_cursor(self.uri) + sess.begin_transaction('read_timestamp=' + self.timestamp_str(25)) + for key in ingest_keys: + cur.set_key(key) + self.assertEqual(cur.search(), wiredtiger.WT_NOTFOUND) + sess.rollback_transaction() + cur.close() + + sess.close() + conn.close() + + def test_state_preserved_on_reopen(self): + # Closing and reopening the follower connection must not lose the deleted state. + # The same checkpoint must still show truncated keys as WT_NOTFOUND after a cold start.
+ self.setup_leader() + self.truncate_and_checkpoint(self.trunc_start, self.trunc_stop, 20) + + truncated_keys = [self.trunc_start, self.trunc_start + 100, self.trunc_stop] + non_truncated_keys = [1, self.trunc_start - 1, self.trunc_stop + 1, self.nrows] + + def verify(sess): + # Read at timestamp 25: truncated keys must be WT_NOTFOUND, the others must return the original value. + for key in truncated_keys: + ret, _ = self.search_at(sess, key, 25) + self.assertEqual(ret, wiredtiger.WT_NOTFOUND) + for key in non_truncated_keys: + ret, val = self.search_at(sess, key, 25) + self.assertEqual(ret, 0) + self.assertEqual(val, self.value) + + for _ in range(2): + conn, sess = self.open_follower() + verify(sess) + sess.close() + conn.close() + + def test_instantiation_not_globally_visible(self): + # Reading a deleted page at a timestamp before the truncation forces it to load from disk. + # The key must be found, cache_read_deleted must increment, and the page must not be dirtied. + self.setup_leader() + self.truncate_and_checkpoint(self.trunc_start, self.trunc_stop, 20) + conn, sess = self.open_follower() + + dirty_before = self.get_stat(conn, stat.conn.cache_pages_dirty) + rd_before = self.get_stat(conn, stat.conn.cache_read_deleted) + + # Pre-truncation read forces page load: key found, read_deleted increments, page stays clean.
+ ret, val = self.search_at(sess, self.trunc_start + 100, 10) + self.assertEqual(ret, 0) + self.assertEqual(val, self.value) + self.assertGreater(self.get_stat(conn, stat.conn.cache_read_deleted), rd_before) + self.assertEqual(self.get_stat(conn, stat.conn.cache_pages_dirty), dirty_before) + self.evict_range(sess, self.trunc_start + 100, self.trunc_start + 100) + + sess.close() + conn.close() + +if __name__ == '__main__': + wttest.run() diff --git a/src/third_party/wiredtiger/test/suite/test_truncate24.py b/src/third_party/wiredtiger/test/suite/test_truncate24.py index d0390c24f3c..64a45db994c 100644 --- a/src/third_party/wiredtiger/test/suite/test_truncate24.py +++ b/src/third_party/wiredtiger/test/suite/test_truncate24.py @@ -80,9 +80,10 @@ class test_truncate24(wttest.WiredTigerTestCase): self.session.commit_transaction('commit_timestamp=' + self.timestamp_str(20)) # Check stats to make sure we fast-deleted at least one page. - stat_cursor = self.session.open_cursor('statistics:', None, None) - fastdelete_pages = stat_cursor[wiredtiger.stat.conn.rec_page_delete_fast][2] - self.assertGreater(fastdelete_pages, 0) + if (wiredtiger.disagg_fast_truncate_build() == 1): + stat_cursor = self.session.open_cursor('statistics:', None, None) + fastdelete_pages = stat_cursor[wiredtiger.stat.conn.rec_page_delete_fast][2] + self.assertGreater(fastdelete_pages, 0) # Verify the data. for i in range(1, 100000): diff --git a/src/third_party/wiredtiger/test/suite/test_verify_disagg02.py b/src/third_party/wiredtiger/test/suite/test_verify_disagg02.py new file mode 100644 index 00000000000..49478651ecb --- /dev/null +++ b/src/third_party/wiredtiger/test/suite/test_verify_disagg02.py @@ -0,0 +1,96 @@ +#!/usr/bin/env python3 +# +# Public Domain 2014-present MongoDB, Inc. +# Public Domain 2008-2014 WiredTiger, Inc. +# +# This is free and unencumbered software released into the public domain.
+# +# Anyone is free to copy, modify, publish, use, compile, sell, or +# distribute this software, either in source code form or as a compiled +# binary, for any purpose, commercial or non-commercial, and by any +# means. +# +# In jurisdictions that recognize copyright laws, the author or authors +# of this software dedicate any and all copyright interest in the +# software to the public domain. We make this dedication for the benefit +# of the public at large and to the detriment of our heirs and +# successors. We intend this dedication to be an overt act of +# relinquishment in perpetuity of all present and future rights to this +# software under copyright law. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. +# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR +# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, +# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +# OTHER DEALINGS IN THE SOFTWARE. + +import re, wiredtiger, wttest +from helper_disagg import disagg_test_class, gen_disagg_storages +from wtscenario import make_scenarios + +# test_verify_disagg02.py +# Verify that duplicate btree IDs among stable files are detected. + +@disagg_test_class +class test_verify_disagg02(wttest.WiredTigerTestCase): + # conn_config_follower is the role config appended when the test opens its follower connection; + # table_cfg and uri describe the layered table the test creates and verifies. + disagg_storages = gen_disagg_storages('test_verify_disagg02', disagg_only=True) + scenarios = make_scenarios(disagg_storages) + + conn_config = 'disaggregated=(role="leader")' + conn_config_follower = 'disaggregated=(role="follower")' + + table_cfg = 'key_format=S,value_format=S,block_manager=disagg' + uri = 'layered:test_verify_disagg02' + + def test_verify_duplicate_btree_ids(self): + """ + Inject a fake stable file entry with a duplicate btree ID into a follower's local + metadata, then verify the layered table.
The unique btree IDs check in verify should + detect the duplicate and return an error. + """ + # Create a layered table on the leader with data, then checkpoint. + self.session.create(self.uri, self.table_cfg) + cursor = self.session.open_cursor(self.uri, None, None) + cursor['key'] = 'value' + cursor.close() + self.session.checkpoint() + + # Create a follower and advance it to pick up the checkpoint. + conn_follow = self.wiredtiger_open('follower', self.extensionsConfig() + ',create,' + + self.conn_config_follower) + session_follow = conn_follow.open_session('') + self.disagg_advance_checkpoint(conn_follow) + + # Read the stable file's config from the follower's metadata to get its btree ID. + md_cursor = session_follow.open_cursor('metadata:', None, None) + md_cursor.set_key('file:test_verify_disagg02.wt_stable') + self.assertEqual(md_cursor.search(), 0) + victim_config = md_cursor.get_value() + md_cursor.close() + + # Confirm we got a valid config with an ID. + self.assertRegex(victim_config, r',id=\d+') + + # Inject a fake entry with the same btree ID into the follower's local metadata. + # The fake key reuses victim_config unchanged, so both entries carry the same id= value. + raw_cursor = session_follow.open_cursor('file:WiredTiger.wt', None, None) + raw_cursor.set_key('file:fake_duplicate.wt_stable') + raw_cursor.set_value(victim_config) + raw_cursor.insert() + raw_cursor.close() + + # Verify the layered table. Our check detects the duplicate btree ID. + self.assertRaisesException(wiredtiger.WiredTigerError, + lambda: session_follow.verify(self.uri), '/WT_ERROR/') + # NOTE(review): presumably these tell the harness to tolerate the messages the failed verify prints + # to stderr; the helper is defined outside this file, so confirm. + self.ignoreStderrPatternIfExists('metadata corruption') + self.ignoreStderrPatternIfExists('stable table verification failed') + + # Remove the fake entry so teardown verification passes. + raw_cursor = session_follow.open_cursor('file:WiredTiger.wt', None, None) + raw_cursor.set_key('file:fake_duplicate.wt_stable') + raw_cursor.remove() + raw_cursor.close() + + session_follow.close() + conn_follow.close()