Import wiredtiger: af950ff8e62d5210b8a5422546202556908f8b14 from branch mongodb-master (#52120)

GitOrigin-RevId: 239f330fac6b530f2404a69a73b296e4f98def1c
This commit is contained in:
Ivan Kochin 2026-04-20 15:54:57 +10:00 committed by MongoDB Bot
parent c021021405
commit 3cd58c7983
18 changed files with 957 additions and 66 deletions

View File

@ -57,6 +57,7 @@ BqRv
Brueckner
Bsearch
Btree
Btrees
Buf
Buildvariants
Bxxx
@ -1316,6 +1317,7 @@ munmap
mutex
mutexes
mux
mvcc
myconn
mytable
nTemp

View File

@ -2,5 +2,5 @@
"vendor": "wiredtiger",
"github": "wiredtiger/wiredtiger",
"branch": "mongodb-master",
"commit": "c54216fdaa882d8789b01937c812ba7ac4f7b3cb"
"commit": "af950ff8e62d5210b8a5422546202556908f8b14"
}

View File

@ -137,7 +137,11 @@ __wt_blkcache_read(WT_SESSION_IMPL *session, WT_ITEM *buf, WT_PAGE_BLOCK_META *b
ip = buf;
expect_conversion = compressor != NULL || encryptor != NULL;
if (expect_conversion) {
WT_RET(__wt_scr_alloc(session, 4 * 1024, &tmp));
/*
* Pre-size the scratch buffer to the leaf page max to avoid realloc when reading typical
* pages.
*/
WT_RET(__wt_scr_alloc(session, btree->maxleafpage, &tmp));
ip = tmp;
}

View File

@ -56,6 +56,7 @@ static int __verify_row_int_key_order(
WT_SESSION_IMPL *, WT_PAGE *, WT_REF *, uint32_t, WT_VSTUFF *);
static int __verify_row_leaf_key_order(WT_SESSION_IMPL *, WT_REF *, WT_VSTUFF *);
static int __verify_tree(WT_SESSION_IMPL *, WT_REF *, WT_CELL_UNPACK_ADDR *, WT_VSTUFF *);
static int __verify_unique_btree_ids(WT_SESSION_IMPL *);
/*
* __verify_config --
@ -214,6 +215,81 @@ __verify_disagg_accumulate_size(
return (0);
}
typedef struct {
uint32_t id;
char *uri;
} WT_ID_URI_PAIR;
/*
* __id_uri_pair_cmp --
* Comparator for sorting btree ID entries by ID.
*/
static int WT_CDECL
__id_uri_pair_cmp(const void *a, const void *b)
{
uint32_t ia, ib;
ia = ((const WT_ID_URI_PAIR *)a)->id;
ib = ((const WT_ID_URI_PAIR *)b)->id;
return (ia < ib ? -1 : (ia == ib ? 0 : 1));
}
/*
* __verify_unique_btree_ids --
* Verify that no two stable constituent files in the local metadata share the same btree ID.
* Only called for .wt_stable files, where the verify session's exclusive lock is on the stable
* file not the metadata file so a shared metadata cursor can be opened directly.
*/
static int
__verify_unique_btree_ids(WT_SESSION_IMPL *session)
{
WT_CONFIG_ITEM id_val;
WT_CURSOR *cursor;
WT_DECL_RET;
WT_ID_URI_PAIR *pairs;
size_t allocated, count, i;
const char *key, *value;
cursor = NULL;
pairs = NULL;
allocated = count = 0;
WT_ERR(__wt_metadata_cursor(session, &cursor));
while ((ret = cursor->next(cursor)) == 0) {
WT_ERR(cursor->get_key(cursor, &key));
if (!WT_PREFIX_MATCH(key, "file:") || !WT_SUFFIX_MATCH(key, ".wt_stable"))
continue;
WT_ERR(cursor->get_value(cursor, &value));
WT_ERR(__wt_config_getones(session, value, "id", &id_val));
WT_ERR(__wt_realloc_def(session, &allocated, count + 1, &pairs));
pairs[count].id = (uint32_t)id_val.val;
WT_ERR(__wt_strdup(session, key, &pairs[count].uri));
++count;
}
WT_ERR_NOTFOUND_OK(ret, false);
if (count > 1) {
__wt_qsort(pairs, count, sizeof(WT_ID_URI_PAIR), __id_uri_pair_cmp);
for (i = 0; i < count - 1; ++i) {
if (pairs[i].id != pairs[i + 1].id)
continue;
__wt_verbose_error(session, WT_VERB_VERIFY,
"metadata corruption: btree ID %" PRIu32 " is shared by %s and %s", pairs[i].id,
pairs[i].uri, pairs[i + 1].uri);
ret = WT_ERROR;
}
}
err:
for (i = 0; i < count; ++i)
__wt_free(session, pairs[i].uri);
__wt_free(session, pairs);
if (cursor != NULL)
WT_TRET(__wt_metadata_cursor_release(session, &cursor));
return (ret);
}
/*
* __wt_verify --
* Verify a file.
@ -263,6 +339,14 @@ __wt_verify(WT_SESSION_IMPL *session, const char *cfg[])
if (quit)
goto done;
/*
* Check that no two stable constituent files share the same btree ID. Only run for stable files
* the verify session's exclusive lock is on the stable file, not the metadata file, so a shared
* metadata cursor can be opened directly on the verify session.
*/
if (WT_SUFFIX_MATCH(name, ".wt_stable"))
WT_ERR(__verify_unique_btree_ids(session));
/*
* Get a list of the checkpoints for this file. Empty objects and ingest tables have no
* checkpoints, in which case there's no work to do.
@ -370,16 +454,8 @@ __wt_verify(WT_SESSION_IMPL *session, const char *cfg[])
if (!skip_hs) {
__wt_verbose(session, WT_VERB_VERIFY, "%s: verify against history store", name);
#ifndef WT_STANDALONE_BUILD
/* FIXME-WT-16557: Re-enable HS validation at all times. */
if (__wt_conn_is_disagg(session))
__wt_verbose(session, WT_VERB_VERIFY,
"%s: skipping verify against history store in disagg", name);
else
WT_TRET(__wt_hs_verify_one(session, btree->id));
#else
WT_TRET(__wt_hs_verify_one(session, btree->id));
#endif
WT_TRET_MSG(session, __wt_hs_verify_one(session, btree->id),
"history store verification failed");
}
/*
* We cannot error out here. If we got an error verifying the history store, we need

View File

@ -1926,8 +1926,8 @@ extern void __wti_read_row_time_window(
WT_SESSION_IMPL *session, WT_PAGE *page, WT_ROW *rip, WT_TIME_WINDOW *tw);
extern void __wti_ref_addr_safe_free(WT_SESSION_IMPL *session, void *p, size_t len);
extern void __wti_rts_pop_work(WT_SESSION_IMPL *session, WT_RTS_WORK_UNIT **entryp);
extern void __wti_rts_progress_msg(WT_SESSION_IMPL *session, WT_TIMER *rollback_start,
uint64_t rollback_count, uint64_t max_count, uint64_t *rollback_msg_count, bool walk);
extern void __wti_rts_progress_msg_walk(WT_SESSION_IMPL *session, uint64_t btree_start_clock,
uint64_t *last_report_clock, double npos, uint64_t btree_pages);
extern void __wti_rts_work_free(WT_SESSION_IMPL *session, WT_RTS_WORK_UNIT *entry);
extern void __wti_schema_destroy_colgroup(WT_SESSION_IMPL *session, WT_COLGROUP **colgroupp);
extern void __wti_tiered_get_remove_shared(WT_SESSION_IMPL *session, WT_TIERED_WORK_UNIT **entryp);

View File

@ -84,6 +84,14 @@
WT_STAT_CONN_DSRC_INCR(session, stat##_dryrun); \
} while (0)
/* RTS phase identifiers for progress reporting. */
#define WT_RTS_PHASE_INACTIVE 0
#define WT_RTS_PHASE_METADATA_COUNT 1
#define WT_RTS_PHASE_BTREE_APPLY 2
#define WT_RTS_PHASE_QUEUE_DRAIN 3
#define WT_RTS_PHASE_HS_FINAL_PASS 4
#define WT_RTS_PHASE_COMPLETE 5
#define WT_RTS_MAX_WORKERS 10
/*
* WT_RTS_WORK_UNIT --
@ -118,6 +126,17 @@ struct __wt_rollback_to_stable {
/* Configuration. */
bool dryrun;
/* RTS progress tracking. */
struct {
WT_TIMER start_timer; /* Overall RTS start time. */
uint64_t total_btrees; /* From metadata count pass (set once). */
wt_shared uint32_t phase; /* Current RTS phase (WT_RTS_PHASE_*). */
wt_shared uint64_t overall_last_report; /* CAS-guarded clock of last overall report. */
wt_shared uint64_t btrees_processed; /* Btrees fully processed (atomic). */
wt_shared uint64_t btrees_skipped; /* Btrees skipped, no work needed (atomic). */
wt_shared uint64_t pages_walked; /* Pages walked across all btrees (atomic). */
} progress;
};
/*

View File

@ -1304,7 +1304,7 @@ __rec_fill_tw_from_upd_select(WT_SESSION_IMPL *session, WT_PAGE *page, WT_CELL_U
if (!F_ISSET(S2C(session), WT_CONN_PRESERVE_PREPARED))
continue;
if (upd->prepared_id == WT_PREPARED_ID_NONE) {
if (upd->next->prepared_id == WT_PREPARED_ID_NONE) {
write_start_prepare = false;
continue;
}

View File

@ -9,32 +9,161 @@
#include "wt_internal.h"
/*
* __wti_rts_progress_msg --
* Log a verbose message about the progress of the current rollback to stable.
* __rts_phase_string --
* Return a human-readable string for an RTS phase.
*/
static const char *
__rts_phase_string(uint32_t phase)
{
switch (phase) {
case WT_RTS_PHASE_INACTIVE:
return ("INACTIVE");
case WT_RTS_PHASE_METADATA_COUNT:
return ("METADATA_COUNT");
case WT_RTS_PHASE_BTREE_APPLY:
return ("BTREE_APPLY");
case WT_RTS_PHASE_QUEUE_DRAIN:
return ("QUEUE_DRAIN");
case WT_RTS_PHASE_HS_FINAL_PASS:
return ("HS_FINAL_PASS");
case WT_RTS_PHASE_COMPLETE:
return ("COMPLETE");
default:
return ("UNKNOWN");
}
}
/*
* __rts_emit_overall_progress --
* Emit an overall RTS progress message with counters, percentage, and throughput.
*/
static void
__rts_emit_overall_progress(WT_SESSION_IMPL *session)
{
WT_ROLLBACK_TO_STABLE *rts;
uint64_t btrees_completed, btrees_processed, btrees_skipped, elapsed_ms, pages_per_sec,
pages_walked, pct, total_btrees;
uint32_t phase;
rts = S2C(session)->rts;
btrees_processed = __wt_atomic_load_uint64_relaxed(&rts->progress.btrees_processed);
btrees_skipped = __wt_atomic_load_uint64_relaxed(&rts->progress.btrees_skipped);
pages_walked = __wt_atomic_load_uint64_relaxed(&rts->progress.pages_walked);
phase = __wt_atomic_load_uint32_relaxed(&rts->progress.phase);
total_btrees = rts->progress.total_btrees;
btrees_completed = btrees_processed + btrees_skipped;
__wt_timer_evaluate_ms(session, &rts->progress.start_timer, &elapsed_ms);
pct = total_btrees > 0 ? (100 * btrees_completed / total_btrees) : 0;
pages_per_sec = elapsed_ms > 0 ? (pages_walked * WT_THOUSAND / elapsed_ms) : 0;
__wt_verbose(session, WT_VERB_RECOVERY_PROGRESS,
"Rollback to stable [%s] overall: running for %" PRIu64 " seconds, %" PRIu64 " of %" PRIu64
" btrees done (%" PRIu64 "%%), %" PRIu64 " processed, %" PRIu64 " skipped, %" PRIu64
" total pages walked (%" PRIu64 " pages/sec)",
__rts_phase_string(phase), elapsed_ms / WT_THOUSAND, btrees_completed, total_btrees, pct,
btrees_processed, btrees_skipped, pages_walked, pages_per_sec);
/* Notify the application via the progress callback. */
WT_IGNORE_RET(__wt_progress(session, "rollback to stable", btrees_completed));
}
/*
* __rts_progress_msg --
* Log a verbose message about the overall progress of rollback to stable. Called from the
* metadata walk loop in __wti_rts_btree_apply_all.
*/
static void
__rts_progress_msg(WT_SESSION_IMPL *session)
{
WT_ROLLBACK_TO_STABLE *rts;
uint64_t clock_now, overall_last;
rts = S2C(session)->rts;
clock_now = __wt_clock(session);
overall_last = __wt_atomic_load_uint64_relaxed(&rts->progress.overall_last_report);
if (__wt_clock_to_nsec(clock_now, overall_last) >=
(uint64_t)WT_BILLION * WT_PROGRESS_MSG_PERIOD &&
__wt_atomic_cas_uint64(&rts->progress.overall_last_report, overall_last, clock_now))
__rts_emit_overall_progress(session);
}
/*
* __wti_rts_progress_msg_walk --
* Log a verbose message about the progress of a per-btree page walk during rollback to stable.
* Uses the normalized position (npos) of the current page to compute a percentage within the
* tree and estimate time remaining for this btree.
*/
void
__wti_rts_progress_msg(WT_SESSION_IMPL *session, WT_TIMER *rollback_start, uint64_t rollback_count,
uint64_t max_count, uint64_t *rollback_msg_count, bool walk)
__wti_rts_progress_msg_walk(WT_SESSION_IMPL *session, uint64_t btree_start_clock,
uint64_t *last_report_clock, double npos, uint64_t btree_pages)
{
uint64_t time_diff_ms;
WT_ROLLBACK_TO_STABLE *rts;
uint64_t btree_eta_sec, btree_pct, btree_pages_per_sec, clock_now, elapsed_ns, elapsed_sec,
overall_last;
uint32_t phase;
/* Time since the rollback started. */
__wt_timer_evaluate_ms(session, rollback_start, &time_diff_ms);
rts = S2C(session)->rts;
if ((time_diff_ms / (WT_THOUSAND * WT_PROGRESS_MSG_PERIOD)) > *rollback_msg_count) {
if (walk)
__wt_verbose(session, WT_VERB_RECOVERY_PROGRESS,
"Rollback to stable has been performing on %s for %" PRIu64
" milliseconds. For more detailed logging, enable WT_VERB_RTS ",
session->dhandle->name, time_diff_ms);
else
__wt_verbose(session, WT_VERB_RECOVERY_PROGRESS,
"Rollback to stable has been running for %" PRIu64
" milliseconds and has inspected %" PRIu64 " files of %" PRIu64
". For more detailed logging, enable WT_VERB_RTS",
time_diff_ms, rollback_count, max_count);
*rollback_msg_count = time_diff_ms / (WT_THOUSAND * WT_PROGRESS_MSG_PERIOD);
/* Use total btree walk time (not interval) for pages/sec and ETA calculations. */
clock_now = __wt_clock(session);
elapsed_ns = __wt_clock_to_nsec(clock_now, btree_start_clock);
elapsed_sec = elapsed_ns / WT_BILLION;
phase = __wt_atomic_load_uint32_relaxed(&rts->progress.phase);
btree_pct = (uint64_t)(npos * 100);
btree_pages_per_sec = elapsed_sec > 0 ? (btree_pages / elapsed_sec) : 0;
/* Estimate time remaining for this btree based on npos progression. */
btree_eta_sec = 0;
if (npos > 0.05 && npos < 1.0)
btree_eta_sec = (uint64_t)((1.0 - npos) * (double)elapsed_sec / npos);
if (btree_eta_sec > 0)
__wt_verbose(session, WT_VERB_RECOVERY_PROGRESS,
"Rollback to stable [%s] btree walk on %s for %" PRIu64 " seconds, %" PRIu64
"%% through btree, %" PRIu64 " pages walked (%" PRIu64
" pages/sec)"
", btree ETA %" PRIu64 " seconds",
__rts_phase_string(phase), session->dhandle->name, elapsed_sec, btree_pct, btree_pages,
btree_pages_per_sec, btree_eta_sec);
else
__wt_verbose(session, WT_VERB_RECOVERY_PROGRESS,
"Rollback to stable [%s] btree walk on %s for %" PRIu64 " seconds, %" PRIu64
"%% through btree, %" PRIu64 " pages walked (%" PRIu64 " pages/sec)",
__rts_phase_string(phase), session->dhandle->name, elapsed_sec, btree_pct, btree_pages,
btree_pages_per_sec);
/*
* Emit an overall progress line. Use CAS on overall_last_report so that exactly one thread wins
* per reporting period, regardless of which thread it is. Skip if progress was not initialized
* (e.g., single-file RTS via rollback_to_stable_one).
*/
if (rts->progress.total_btrees > 0) {
overall_last = __wt_atomic_load_uint64_relaxed(&rts->progress.overall_last_report);
if (__wt_clock_to_nsec(clock_now, overall_last) >=
(uint64_t)WT_BILLION * WT_PROGRESS_MSG_PERIOD &&
__wt_atomic_cas_uint64(&rts->progress.overall_last_report, overall_last, clock_now))
__rts_emit_overall_progress(session);
}
*last_report_clock = clock_now;
}
/*
* __rts_progress_init --
* Initialize the RTS progress tracking fields.
*/
static void
__rts_progress_init(WT_SESSION_IMPL *session)
{
WT_ROLLBACK_TO_STABLE *rts;
rts = S2C(session)->rts;
WT_CLEAR(rts->progress);
__wt_timer_start(session, &rts->progress.start_timer);
__wt_atomic_store_uint64_relaxed(&rts->progress.overall_last_report, __wt_clock(session));
}
/*
@ -172,15 +301,17 @@ __wti_rts_btree_apply_all(WT_SESSION_IMPL *session, wt_timestamp_t rollback_time
WT_CURSOR *cursor;
WT_DECL_RET;
WT_RTS_WORK_UNIT *entry;
WT_TIMER timer;
uint64_t max_count, rollback_count, rollback_msg_count;
uint64_t max_count;
char ts_string[WT_TS_INT_STRING_SIZE];
const char *config, *uri;
const char *config, *saved_session_name, *uri;
bool have_cursor, rts_threads_started;
__wt_timer_start(session, &timer);
max_count = rollback_count = 0;
rollback_msg_count = 0;
__rts_progress_init(session);
__wt_atomic_store_uint32_relaxed(
&S2C(session)->rts->progress.phase, WT_RTS_PHASE_METADATA_COUNT);
max_count = 0;
saved_session_name = session->name;
rts_threads_started = false;
/*
@ -191,13 +322,19 @@ __wti_rts_btree_apply_all(WT_SESSION_IMPL *session, wt_timestamp_t rollback_time
have_cursor = true;
while ((ret = cursor->next(cursor)) == 0) {
WT_ERR(cursor->get_key(cursor, &uri));
if (WT_BTREE_PREFIX(uri))
if (WT_BTREE_PREFIX(uri) && !WT_IS_URI_HS(uri) && !WT_IS_URI_METADATA(uri))
++max_count;
}
WT_ERR_NOTFOUND_OK(ret, false);
WT_ERR(__wt_metadata_cursor_release(session, &cursor));
have_cursor = false;
S2C(session)->rts->progress.total_btrees = max_count;
__wt_verbose(session, WT_VERB_RECOVERY_PROGRESS,
"Rollback to stable found %" PRIu64 " btrees to process", max_count);
__wt_atomic_store_uint32_relaxed(&S2C(session)->rts->progress.phase, WT_RTS_PHASE_BTREE_APPLY);
WT_ERR(__rts_thread_create(session));
rts_threads_started = true;
@ -207,10 +344,7 @@ __wti_rts_btree_apply_all(WT_SESSION_IMPL *session, wt_timestamp_t rollback_time
/* Log a progress message. */
WT_ERR(cursor->get_key(cursor, &uri));
WT_ERR(cursor->get_value(cursor, &config));
if (WT_BTREE_PREFIX(uri))
++rollback_count;
__wti_rts_progress_msg(
session, &timer, rollback_count, max_count, &rollback_msg_count, false);
__rts_progress_msg(session);
F_SET(session, WT_SESSION_QUIET_CORRUPT_FILE);
ret = __wti_rts_btree_walk_btree_apply(session, uri, config, rollback_timestamp);
@ -226,6 +360,13 @@ __wti_rts_btree_apply_all(WT_SESSION_IMPL *session, wt_timestamp_t rollback_time
* workers alone to complete the task.
*/
if (S2C(session)->rts->threads_num != 0) {
__wt_atomic_store_uint32_relaxed(
&S2C(session)->rts->progress.phase, WT_RTS_PHASE_QUEUE_DRAIN);
__wt_verbose(session, WT_VERB_RECOVERY_PROGRESS, "%s",
"Rollback to stable finished metadata walk, draining worker queue");
/* Rename session while joining workers so log messages identify us as a worker. */
session->name = "rts-main-wk";
while (!TAILQ_EMPTY(&S2C(session)->rts->rtsqh)) {
__wti_rts_pop_work(session, &entry);
if (entry == NULL)
@ -234,6 +375,7 @@ __wti_rts_btree_apply_all(WT_SESSION_IMPL *session, wt_timestamp_t rollback_time
__wti_rts_work_free(session, entry);
WT_ERR(ret);
}
session->name = saved_session_name;
}
WT_ERR(__rts_thread_destroy(session));
@ -249,6 +391,10 @@ __wti_rts_btree_apply_all(WT_SESSION_IMPL *session, wt_timestamp_t rollback_time
* doesn't exist.
*/
if (!F_ISSET(S2C(session), WT_CONN_IN_MEMORY)) {
__wt_atomic_store_uint32_relaxed(
&S2C(session)->rts->progress.phase, WT_RTS_PHASE_HS_FINAL_PASS);
__wt_verbose(session, WT_VERB_RECOVERY_PROGRESS, "%s",
"Rollback to stable beginning history store final pass");
__wt_verbose_level_multi(session, WT_VERB_RECOVERY_RTS(session), WT_VERBOSE_DEBUG_3,
WT_RTS_VERB_TAG_HS_TREE_FINAL_PASS
"performing final pass of the history store to remove unstable entries with "
@ -256,7 +402,10 @@ __wti_rts_btree_apply_all(WT_SESSION_IMPL *session, wt_timestamp_t rollback_time
__wt_timestamp_to_string(rollback_timestamp, ts_string));
WT_ERR(__wti_rts_history_final_pass(session, rollback_timestamp));
}
__wt_atomic_store_uint32_relaxed(&S2C(session)->rts->progress.phase, WT_RTS_PHASE_COMPLETE);
err:
session->name = saved_session_name;
if (have_cursor)
WT_TRET(__wt_metadata_cursor_release(session, &cursor));
if (rts_threads_started)

View File

@ -262,6 +262,7 @@ __rollback_to_stable_finalize(WT_ROLLBACK_TO_STABLE *rts)
{
rts->dryrun = false;
rts->threads_num = 0;
WT_CLEAR(rts->progress);
}
/*
@ -315,9 +316,20 @@ __rollback_to_stable(WT_SESSION_IMPL *session, const char *cfg[], bool no_ckpt)
/* Time since the RTS started. */
__wt_timer_evaluate_ms(session, &timer, &time_diff_ms);
/* Log a summary of RTS work before the final end message. */
__wt_verbose(session, WT_VERB_RECOVERY_PROGRESS,
"Rollback to stable summary: total_btrees=%" PRIu64 ", processed=%" PRIu64
", skipped=%" PRIu64 ", pages_walked=%" PRIu64 ", elapsed_ms=%" PRIu64,
S2C(session)->rts->progress.total_btrees,
__wt_atomic_load_uint64_relaxed(&S2C(session)->rts->progress.btrees_processed),
__wt_atomic_load_uint64_relaxed(&S2C(session)->rts->progress.btrees_skipped),
__wt_atomic_load_uint64_relaxed(&S2C(session)->rts->progress.pages_walked), time_diff_ms);
__wt_verbose_multi(session, WT_VERB_RECOVERY_RTS(session),
WT_RTS_VERB_TAG_END "finished rollback to stable%s and has ran for %" PRIu64 " milliseconds",
dryrun ? " dryrun" : "", time_diff_ms);
WT_RTS_VERB_TAG_END "finished rollback to stable%s with %" PRIu32
" worker threads, ran for %" PRIu64 " milliseconds",
dryrun ? " dryrun" : "", threads, time_diff_ms);
WT_STAT_CONN_SET(session, txn_rollback_to_stable_running, 0);
/* Reset the RTS configuration to default. */

View File

@ -120,20 +120,41 @@ __rts_btree_walk(WT_SESSION_IMPL *session, wt_timestamp_t rollback_timestamp)
{
WT_DECL_RET;
WT_REF *ref;
WT_TIMER timer;
uint64_t msg_count;
double max_npos, npos;
uint64_t btree_pages, btree_start_clock, clock_now, elapsed_ns, last_report_clock;
uint32_t flags;
__wt_timer_start(session, &timer);
btree_start_clock = __wt_clock(session);
last_report_clock = btree_start_clock;
flags = WT_READ_NO_EVICT | WT_READ_VISIBLE_ALL | WT_READ_WONT_NEED | WT_READ_SEE_DELETED;
msg_count = 0;
btree_pages = 0;
max_npos = 0.0;
/* Walk the tree, marking commits aborted where appropriate. */
ref = NULL;
while ((ret = __wt_tree_walk_custom_skip(
session, &ref, __rts_btree_walk_page_skip, &rollback_timestamp, flags)) == 0 &&
ref != NULL) {
__wti_rts_progress_msg(session, &timer, 0, 0, &msg_count, true);
++btree_pages;
(void)__wt_atomic_add_uint64_relaxed(&S2C(session)->rts->progress.pages_walked, 1);
/*
* Use the cheap rdtsc-based clock to check if a progress message is due. Only compute npos
* (which walks parent indexes) and emit the message when the period has elapsed.
*/
clock_now = __wt_clock(session);
elapsed_ns = __wt_clock_to_nsec(clock_now, last_report_clock);
if (elapsed_ns >= (uint64_t)WT_BILLION * WT_PROGRESS_MSG_PERIOD) {
npos = __wt_page_npos(session, ref, 0.5, NULL, NULL, 0);
/*
* npos can fluctuate due to unbalanced trees, so track the maximum seen so far to get a
* monotonically increasing progress indicator.
*/
if (npos > max_npos)
max_npos = npos;
__wti_rts_progress_msg_walk(
session, btree_start_clock, &last_report_clock, max_npos, btree_pages);
}
if (F_ISSET(ref, WT_REF_FLAG_LEAF))
WT_ERR(__wti_rts_btree_abort_updates(session, ref, rollback_timestamp));
@ -256,8 +277,11 @@ __rts_btree(WT_SESSION_IMPL *session, const char *uri, wt_timestamp_t rollback_t
WT_RTS_VERB_TAG_SKIP_DAMAGE
"%s: skipped performing rollback to stable because the file %s",
uri, ret == ENOENT ? "does not exist" : "is corrupted.");
WT_STAT_CONN_INCR(session, txn_rts_btrees_skipped);
(void)__wt_atomic_add_uint64_relaxed(&S2C(session)->rts->progress.btrees_skipped, 1);
ret = 0;
}
} else if (ret == 0)
(void)__wt_atomic_add_uint64_relaxed(&S2C(session)->rts->progress.btrees_processed, 1);
return (ret);
}
@ -365,6 +389,8 @@ __wti_rts_btree_walk_btree_apply(
WT_RTS_VERB_TAG_FILE_SKIP
"skipping rollback to stable on file=%s because has never been checkpointed",
uri);
WT_STAT_CONN_INCR(session, txn_rts_btrees_skipped);
(void)__wt_atomic_add_uint64_relaxed(&S2C(session)->rts->progress.btrees_skipped, 1);
return (0);
}
@ -411,8 +437,10 @@ __wti_rts_btree_walk_btree_apply(
prepared_updates ? "true" : "false", rollback_txnid, S2C(session)->recovery_ckpt_snap_min,
has_txn_updates_gt_than_ckpt_snap ? "true" : "false");
if (file_skipped)
WT_STAT_CONN_DSRC_INCR(session, txn_rts_btrees_skipped);
if (file_skipped) {
WT_STAT_CONN_INCR(session, txn_rts_btrees_skipped);
(void)__wt_atomic_add_uint64_relaxed(&S2C(session)->rts->progress.btrees_skipped, 1);
}
/*
* Truncate history store entries for the non-timestamped table.

View File

@ -174,16 +174,23 @@ __wti_rts_history_final_pass(WT_SESSION_IMPL *session, wt_timestamp_t rollback_t
*/
if ((S2BT(session)->modified || max_durable_ts > rollback_timestamp) &&
rollback_timestamp != WT_TS_NONE) {
__wt_verbose(session, WT_VERB_RECOVERY_PROGRESS,
"Rollback to stable history store final pass: rolling back with durable_timestamp=%s",
__wt_timestamp_to_string(max_durable_ts, ts_string[0]));
__wt_verbose_multi(session, WT_VERB_RECOVERY_RTS(session),
WT_RTS_VERB_TAG_HS_TREE_ROLLBACK "tree rolled back with durable_timestamp=%s",
__wt_timestamp_to_string(max_durable_ts, ts_string[0]));
WT_TRET(__wti_rts_btree_walk_btree(session, rollback_timestamp));
} else
} else {
__wt_verbose(session, WT_VERB_RECOVERY_PROGRESS,
"Rollback to stable history store final pass: skipped with durable_timestamp=%s",
__wt_timestamp_to_string(max_durable_ts, ts_string[0]));
__wt_verbose_multi(session, WT_VERB_RECOVERY_RTS(session),
WT_RTS_VERB_TAG_HS_TREE_SKIP
"tree skipped with durable_timestamp=%s and stable_timestamp=%s",
__wt_timestamp_to_string(max_durable_ts, ts_string[0]),
__wt_timestamp_to_string(rollback_timestamp, ts_string[1]));
}
/*
* Truncate history store entries from the partial backup remove list. The list holds all of the

View File

@ -68,7 +68,6 @@ __truncate_layered(WT_SESSION_IMPL *session, const char *uri)
WT_DECL_RET;
start = NULL;
WT_RET(__wt_session_get_dhandle(session, uri, NULL, NULL, WT_DHANDLE_EXCLUSIVE));
WT_STAT_DSRC_INCR(session, cursor_truncate);
@ -79,14 +78,12 @@ __truncate_layered(WT_SESSION_IMPL *session, const char *uri)
ret = 0;
goto done;
}
WT_WITHOUT_DHANDLE(session, ret = __wt_session_range_truncate(session, NULL, start, NULL));
WT_ERR(ret);
WT_ERR(__wt_session_range_truncate(session, NULL, start, NULL));
done:
err:
if (start != NULL)
WT_TRET(start->close(start));
WT_TRET(__wt_session_release_dhandle(session));
return (ret);
}

View File

@ -330,7 +330,8 @@ functions:
${IMPORT_S3_SDK|} \
${SPINLOCK_TYPE|} \
${ENABLE_COLORIZE_OUTPUT|-DENABLE_COLORIZE_OUTPUT=0} \
${CC_OPTIMIZE_LEVEL|}"
${CC_OPTIMIZE_LEVEL|} \
${WT_DISAGG_FAST_TRUNCATE_BUILD|}"
# The RHEL PPC platform does not have ZSTD. Strip it out.
if [ "${build_variant|}" = "rhel8-ppc" ] && [[ "$DEFINED_EVERGREEN_CONFIG_FLAGS" =~ (\-DHAVE_BUILTIN_EXTENSION_ZSTD=1) ]]; then

View File

@ -38,7 +38,7 @@ test_stat_log02.py
test_sweep05
test_timestamp26.py # FIXME: WT-16182
test_truncate01.py # FIXME: WT-15474
test_truncate24.py # FIXME: WT-15473
test_util01.py
test_util02.py
test_util04.py

View File

@ -0,0 +1,223 @@
#!/usr/bin/env python3
#
# Public Domain 2014-present MongoDB, Inc.
# Public Domain 2008-2014 WiredTiger, Inc.
#
# This is free and unencumbered software released into the public domain.
#
# Anyone is free to copy, modify, publish, use, compile, sell, or
# distribute this software, either in source code form or as a compiled
# binary, for any purpose, commercial or non-commercial, and by any
# means.
#
# In jurisdictions that recognize copyright laws, the author or authors
# of this software dedicate any and all copyright interest in the
# software to the public domain. We make this dedication for the benefit
# of the public at large and to the detriment of our heirs and
# successors. We intend this dedication to be an overt act of
# relinquishment in perpetuity of all present and future rights to this
# software under copyright law.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS IN THE SOFTWARE.
# test_layered_fast_truncate02.py
# Validates visibility and cursor behavior when a follower picks up a
# checkpoint containing fast-truncated pages.
import wiredtiger, wttest
from helper_disagg import disagg_test_class, gen_disagg_storages
from wtscenario import make_scenarios
@disagg_test_class
class test_layered_fast_truncate02(wttest.WiredTigerTestCase):
uri = 'layered:test_layered_fast_truncate02'
nrows = 5000
value = 'a' * 500
trunc_start = 1001
trunc_stop = 4000
trunc_mid = (trunc_start + trunc_stop) // 2
conn_config = 'cache_size=50MB,statistics=(all),disaggregated=(role="leader")'
disagg_storages = gen_disagg_storages('test_layered_fast_truncate02', disagg_only=True)
scenarios = make_scenarios(disagg_storages)
def setUp(self):
if wiredtiger.disagg_fast_truncate_build() == 0:
self.skipTest("fast truncate support is not enabled")
super().setUp()
def leader_checkpoint(self, ts):
self.conn.set_timestamp('stable_timestamp=' + self.timestamp_str(ts) +
',oldest_timestamp=' + self.timestamp_str(1))
self.session.checkpoint()
def setup_leader(self):
self.conn.set_timestamp('oldest_timestamp=' + self.timestamp_str(1))
self.session.create(self.uri, 'key_format=i,value_format=S')
cur = self.session.open_cursor(self.uri)
for i in range(1, self.nrows + 1):
self.session.begin_transaction()
cur[i] = self.value
self.session.commit_transaction('commit_timestamp=' + self.timestamp_str(10))
cur.close()
self.leader_checkpoint(10)
# Evict all pages before truncating so the leader uses page-level fast-delete markers.
evict_cur = self.session.open_cursor(self.uri, None, 'debug=(release_evict)')
self.session.begin_transaction()
for i in range(1, self.nrows + 1):
evict_cur.set_key(i)
evict_cur.search()
evict_cur.reset()
evict_cur.close()
self.session.rollback_transaction()
def truncate_and_checkpoint(self, trunc_start, trunc_stop, ts):
# Fast-truncate rows [trunc_start, trunc_stop] on the leader and checkpoint.
c_start = self.session.open_cursor(self.uri)
c_start.set_key(trunc_start)
c_stop = self.session.open_cursor(self.uri)
c_stop.set_key(trunc_stop)
self.session.begin_transaction()
self.session.truncate(None, c_start, c_stop, None)
self.session.commit_transaction('commit_timestamp=' + self.timestamp_str(ts))
c_start.close()
c_stop.close()
self.leader_checkpoint(ts)
def open_follower(self):
conn = self.wiredtiger_open(
'follower',
self.extensionsConfig() + ',create,cache_size=50MB,statistics=(all),disaggregated=(role="follower")')
sess = conn.open_session('')
sess.create(self.uri, 'key_format=i,value_format=S')
self.disagg_advance_checkpoint(conn, self.conn)
return conn, sess
def search_at(self, sess, key, ts):
cur = sess.open_cursor(self.uri)
txn_cfg = ('read_timestamp=' + self.timestamp_str(ts))
sess.begin_transaction(txn_cfg)
cur.set_key(key)
ret = cur.search()
val = cur.get_value() if ret == 0 else None
sess.rollback_transaction()
cur.close()
return ret, val
def test_visibility(self):
# At ts=20 (equal to truncation at ts=20): truncated keys return WT_NOTFOUND, boundary and
# exterior keys return their values. At ts=15 (before truncation): all keys are visible.
self.setup_leader()
self.truncate_and_checkpoint(self.trunc_start, self.trunc_stop, 20)
conn, sess = self.open_follower()
# Truncation is visible: deleted keys are gone, surrounding keys survive.
for key in [self.trunc_start, self.trunc_mid, self.trunc_stop]:
ret, _ = self.search_at(sess, key, 20)
self.assertEqual(ret, wiredtiger.WT_NOTFOUND)
for key in [1, self.trunc_start - 1, self.trunc_stop + 1, self.nrows]:
ret, val = self.search_at(sess, key, 20)
self.assertEqual(ret, 0)
self.assertEqual(val, self.value)
# Truncation is not visible: all keys exist at a timestamp before the truncation.
for key in [self.trunc_start, self.trunc_mid, self.trunc_stop]:
ret, val = self.search_at(sess, key, 15)
self.assertEqual(ret, 0)
self.assertEqual(val, self.value)
sess.close()
conn.close()
def test_pre_truncation_read_sees_all_rows(self):
# Reading at a timestamp before the truncation must still find all rows, including those
# later deleted. Verifies mvcc correctness across the follower checkpoint boundary.
self.setup_leader()
self.truncate_and_checkpoint(self.trunc_start, self.trunc_stop, 20)
conn, sess = self.open_follower()
for key in [self.trunc_start, self.trunc_mid, self.trunc_stop]:
ret, val = self.search_at(sess, key, 10)
self.assertEqual(ret, 0)
self.assertEqual(val, self.value)
cur = sess.open_cursor(self.uri)
sess.begin_transaction('read_timestamp=' + self.timestamp_str(10))
count = 0
while cur.next() == 0:
count += 1
sess.rollback_transaction()
cur.close()
self.assertEqual(count, self.nrows)
sess.close()
conn.close()
def test_cursor_scanning(self):
# Forward and backward scans must skip the entire truncated range without visiting any
# deleted key. search_near on a deleted key must land outside the range.
self.setup_leader()
self.truncate_and_checkpoint(self.trunc_start, self.trunc_stop, 20)
conn, sess = self.open_follower()
expected = self.nrows - (self.trunc_stop - self.trunc_start + 1)
trunc_range = range(self.trunc_start, self.trunc_stop + 1)
# Forward scan: verify no deleted key is visited and the gap is jumped correctly.
cur = sess.open_cursor(self.uri)
sess.begin_transaction('read_timestamp=' + self.timestamp_str(25))
count, prev_key, first_after_gap = 0, 0, None
while cur.next() == 0:
key = cur.get_key()
self.assertNotIn(key, trunc_range)
if prev_key == self.trunc_start - 1 and first_after_gap is None:
first_after_gap = key
prev_key = key
count += 1
sess.rollback_transaction()
cur.close()
self.assertEqual(count, expected)
self.assertEqual(first_after_gap, self.trunc_stop + 1)
# Backward scan: same row count, gap jumped in reverse.
cur = sess.open_cursor(self.uri)
sess.begin_transaction('read_timestamp=' + self.timestamp_str(25))
count, prev_key, first_before_gap = 0, self.nrows + 1, None
while cur.prev() == 0:
key = cur.get_key()
self.assertNotIn(key, trunc_range)
if prev_key == self.trunc_stop + 1 and first_before_gap is None:
first_before_gap = key
prev_key = key
count += 1
sess.rollback_transaction()
cur.close()
self.assertEqual(count, expected)
self.assertEqual(first_before_gap, self.trunc_start - 1)
# search_near on a deleted key must land on the nearest live boundary key.
cur = sess.open_cursor(self.uri)
sess.begin_transaction('read_timestamp=' + self.timestamp_str(25))
cur.set_key(self.trunc_mid)
cmp = cur.search_near()
self.assertIn(cmp, (-1, 1))
landed = cur.get_key()
self.assertNotIn(landed, trunc_range)
self.assertEqual(landed, self.trunc_start - 1 if cmp == -1 else self.trunc_stop + 1)
sess.rollback_transaction()
cur.close()
sess.close()
conn.close()
if __name__ == '__main__':
wttest.run()

View File

@ -0,0 +1,276 @@
#!/usr/bin/env python3
#
# Public Domain 2014-present MongoDB, Inc.
# Public Domain 2008-2014 WiredTiger, Inc.
#
# This is free and unencumbered software released into the public domain.
#
# Anyone is free to copy, modify, publish, use, compile, sell, or
# distribute this software, either in source code form or as a compiled
# binary, for any purpose, commercial or non-commercial, and by any
# means.
#
# In jurisdictions that recognize copyright laws, the author or authors
# of this software dedicate any and all copyright interest in the
# software to the public domain. We make this dedication for the benefit
# of the public at large and to the detriment of our heirs and
# successors. We intend this dedication to be an overt act of
# relinquishment in perpetuity of all present and future rights to this
# software under copyright law.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS IN THE SOFTWARE.
# test_layered_fast_truncate03.py
# Tests that a follower correctly handles pages that were fast-truncated on the
# leader: stable pages must never be dirtied, and deleted state must survive
# eviction and reopen.
import wiredtiger, wttest
from helper_disagg import disagg_test_class, gen_disagg_storages
from wtscenario import make_scenarios
from wiredtiger import stat
@disagg_test_class
class test_layered_fast_truncate03(wttest.WiredTigerTestCase):
uri = 'layered:test_layered_fast_truncate03'
nrows = 5000
value = 'a' * 500
trunc_start = 1001
trunc_stop = 4000
conn_config = 'cache_size=50MB,statistics=(all),disaggregated=(role="leader")'
disagg_storages = gen_disagg_storages('test_layered_fast_truncate03', disagg_only=True)
scenarios = make_scenarios(disagg_storages)
def setUp(self):
if wiredtiger.disagg_fast_truncate_build() == 0:
self.skipTest("fast truncate support is not enabled")
super().setUp()
def get_stat(self, conn, stat_key):
s = conn.open_session('')
val = s.open_cursor('statistics:')[stat_key][2]
s.close()
return val
def leader_checkpoint(self, ts):
self.conn.set_timestamp('stable_timestamp=' + self.timestamp_str(ts) +
',oldest_timestamp=' + self.timestamp_str(1))
self.session.checkpoint()
def setup_leader(self, extra_cfg=''):
self.conn.set_timestamp('oldest_timestamp=' + self.timestamp_str(1))
self.session.create(self.uri, 'key_format=i,value_format=S' + extra_cfg)
cur = self.session.open_cursor(self.uri)
for i in range(1, self.nrows + 1):
self.session.begin_transaction()
cur[i] = self.value
self.session.commit_transaction('commit_timestamp=' + self.timestamp_str(10))
cur.close()
self.leader_checkpoint(10)
# Evict all pages before truncating so the leader uses page-level fast-delete markers.
evict_cur = self.session.open_cursor(self.uri, None, 'debug=(release_evict)')
self.session.begin_transaction()
for i in range(1, self.nrows + 1):
evict_cur.set_key(i)
evict_cur.search()
evict_cur.reset()
evict_cur.close()
self.session.rollback_transaction()
def truncate_and_checkpoint(self, trunc_start, trunc_stop, ts):
# Fast-truncate rows [trunc_start, trunc_stop] on the leader and checkpoint.
c_start = self.session.open_cursor(self.uri)
c_start.set_key(trunc_start)
c_stop = self.session.open_cursor(self.uri)
c_stop.set_key(trunc_stop)
self.session.begin_transaction()
self.session.truncate(None, c_start, c_stop, None)
self.session.commit_transaction('commit_timestamp=' + self.timestamp_str(ts))
c_start.close()
c_stop.close()
self.leader_checkpoint(ts)
def open_follower(self):
conn = self.wiredtiger_open(
'follower',
self.extensionsConfig() + ',create,cache_size=50MB,statistics=(all),disaggregated=(role="follower")')
sess = conn.open_session('')
sess.create(self.uri, 'key_format=i,value_format=S')
self.disagg_advance_checkpoint(conn, self.conn)
return conn, sess
def advance_follower(self, conn):
self.leader_checkpoint(20)
self.disagg_advance_checkpoint(conn, self.conn)
def evict_range(self, sess, start, stop, step=1):
evict_cur = sess.open_cursor(self.uri, None, 'debug=(release_evict)')
sess.begin_transaction('read_timestamp=' + self.timestamp_str(10))
for i in range(start, stop + 1, step):
evict_cur.set_key(i)
evict_cur.search()
evict_cur.reset()
evict_cur.close()
sess.rollback_transaction()
def search_at(self, sess, key, ts):
cur = sess.open_cursor(self.uri)
txn_cfg = ('read_timestamp=' + self.timestamp_str(ts))
sess.begin_transaction(txn_cfg)
cur.set_key(key)
ret = cur.search()
val = cur.get_value() if ret == 0 else None
sess.rollback_transaction()
cur.close()
return ret, val
def test_no_dirty_on_read(self):
# Reading fast-truncated pages on the follower must never dirty them. Verifies this holds
# across a full load-evict-reload cycle for both single and bulk page reads.
self.setup_leader()
self.truncate_and_checkpoint(self.trunc_start, self.trunc_stop, 20)
conn, sess = self.open_follower()
sample = list(range(self.trunc_start, self.trunc_stop + 1, 10))
dirty_before = self.get_stat(conn, stat.conn.cache_pages_dirty)
# Initial read: no deleted key found, no pages dirtied.
cur = sess.open_cursor(self.uri)
sess.begin_transaction('read_timestamp=' + self.timestamp_str(25))
for key in sample:
cur.set_key(key)
self.assertEqual(cur.search(), wiredtiger.WT_NOTFOUND)
sess.rollback_transaction()
cur.close()
self.assertEqual(self.get_stat(conn, stat.conn.cache_pages_dirty), dirty_before)
self.evict_range(sess, self.trunc_start, self.trunc_stop)
# After eviction: reloaded pages still show no keys and remain clean.
cur = sess.open_cursor(self.uri)
sess.begin_transaction('read_timestamp=' + self.timestamp_str(25))
for key in sample:
cur.set_key(key)
self.assertEqual(cur.search(), wiredtiger.WT_NOTFOUND)
sess.rollback_transaction()
cur.close()
self.assertEqual(self.get_stat(conn, stat.conn.cache_pages_dirty), dirty_before)
self.evict_range(sess, self.trunc_start, self.trunc_stop)
sess.close()
conn.close()
def test_page_split_with_ingest_writes(self):
# With small pages the truncated range spans many leaf pages. After ingest writes
# restore a subset of truncated keys, those keys must be visible while the rest
# remain deleted.
self.setup_leader(',leaf_page_max=4096')
self.truncate_and_checkpoint(self.trunc_start, self.trunc_stop, 20)
conn, sess = self.open_follower()
sample = list(range(self.trunc_start, self.trunc_stop + 1, 10))
dirty_before = self.get_stat(conn, stat.conn.cache_pages_dirty)
# No pages dirtied by reading the truncated range across many small leaf pages.
cur = sess.open_cursor(self.uri)
sess.begin_transaction('read_timestamp=' + self.timestamp_str(25))
for key in sample:
cur.set_key(key)
self.assertEqual(cur.search(), wiredtiger.WT_NOTFOUND)
sess.rollback_transaction()
cur.close()
self.assertEqual(self.get_stat(conn, stat.conn.cache_pages_dirty), dirty_before)
self.evict_range(sess, self.trunc_start, self.trunc_stop)
self.advance_follower(conn)
# Write a subset of truncated keys to ingest.
ingest_keys = set(sample[::3])
cur = sess.open_cursor(self.uri)
sess.begin_transaction()
for key in ingest_keys:
cur.set_key(key)
cur.set_value(f'ingest_{key}')
self.assertEqual(cur.insert(), 0)
sess.commit_transaction('commit_timestamp=' + self.timestamp_str(30))
cur.close()
# At ts=30: ingest keys are found; unwritten truncated keys remain deleted.
cur = sess.open_cursor(self.uri)
sess.begin_transaction('read_timestamp=' + self.timestamp_str(30))
for key in ingest_keys:
cur.set_key(key)
self.assertEqual(cur.search(), 0)
self.assertEqual(cur.get_value(), f'ingest_{key}')
for key in set(sample) - ingest_keys:
cur.set_key(key)
self.assertEqual(cur.search(), wiredtiger.WT_NOTFOUND)
sess.rollback_transaction()
cur.close()
# At ts=25 (before the ingest write), all truncated keys must still be deleted.
cur = sess.open_cursor(self.uri)
sess.begin_transaction('read_timestamp=' + self.timestamp_str(25))
for key in ingest_keys:
cur.set_key(key)
self.assertEqual(cur.search(), wiredtiger.WT_NOTFOUND)
sess.rollback_transaction()
cur.close()
sess.close()
conn.close()
def test_state_preserved_on_reopen(self):
# Closing and reopening the follower connection must not lose the deleted state.
# The same checkpoint must still show truncated keys as WT_NOTFOUND after a cold start.
self.setup_leader()
self.truncate_and_checkpoint(self.trunc_start, self.trunc_stop, 20)
truncated_keys = [self.trunc_start, self.trunc_start + 100, self.trunc_stop]
non_truncated_keys = [1, self.trunc_start - 1, self.trunc_stop + 1, self.nrows]
def verify(sess):
for key in truncated_keys:
ret, _ = self.search_at(sess, key, 25)
self.assertEqual(ret, wiredtiger.WT_NOTFOUND)
for key in non_truncated_keys:
ret, val = self.search_at(sess, key, 25)
self.assertEqual(ret, 0)
self.assertEqual(val, self.value)
for _ in range(2):
conn, sess = self.open_follower()
verify(sess)
sess.close()
conn.close()
def test_instantiation_not_globally_visible(self):
# Reading a deleted page at a timestamp before the truncation forces it to load from disk.
# The key must be found, cache_read_deleted must increment, and the page must not be dirtied.
self.setup_leader()
self.truncate_and_checkpoint(self.trunc_start, self.trunc_stop, 20)
conn, sess = self.open_follower()
dirty_before = self.get_stat(conn, stat.conn.cache_pages_dirty)
rd_before = self.get_stat(conn, stat.conn.cache_read_deleted)
# Pre-truncation read forces page load: key found, read_deleted increments, page stays clean.
ret, val = self.search_at(sess, self.trunc_start + 100, 10)
self.assertEqual(ret, 0)
self.assertEqual(val, self.value)
self.assertGreater(self.get_stat(conn, stat.conn.cache_read_deleted), rd_before)
self.assertEqual(self.get_stat(conn, stat.conn.cache_pages_dirty), dirty_before)
self.evict_range(sess, self.trunc_start + 100, self.trunc_start + 100)
sess.close()
conn.close()
if __name__ == '__main__':
wttest.run()

View File

@ -80,9 +80,10 @@ class test_truncate24(wttest.WiredTigerTestCase):
self.session.commit_transaction('commit_timestamp=' + self.timestamp_str(20))
# Check stats to make sure we fast-deleted at least one page.
stat_cursor = self.session.open_cursor('statistics:', None, None)
fastdelete_pages = stat_cursor[wiredtiger.stat.conn.rec_page_delete_fast][2]
self.assertGreater(fastdelete_pages, 0)
if (wiredtiger.disagg_fast_truncate_build() == 1):
stat_cursor = self.session.open_cursor('statistics:', None, None)
fastdelete_pages = stat_cursor[wiredtiger.stat.conn.rec_page_delete_fast][2]
self.assertGreater(fastdelete_pages, 0)
# Verify the data.
for i in range(1, 100000):

View File

@ -0,0 +1,96 @@
#!/usr/bin/env python3
#
# Public Domain 2014-present MongoDB, Inc.
# Public Domain 2008-2014 WiredTiger, Inc.
#
# This is free and unencumbered software released into the public domain.
#
# Anyone is free to copy, modify, publish, use, compile, sell, or
# distribute this software, either in source code form or as a compiled
# binary, for any purpose, commercial or non-commercial, and by any
# means.
#
# In jurisdictions that recognize copyright laws, the author or authors
# of this software dedicate any and all copyright interest in the
# software to the public domain. We make this dedication for the benefit
# of the public at large and to the detriment of our heirs and
# successors. We intend this dedication to be an overt act of
# relinquishment in perpetuity of all present and future rights to this
# software under copyright law.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS IN THE SOFTWARE.
import re, wiredtiger, wttest
from helper_disagg import disagg_test_class, gen_disagg_storages
from wtscenario import make_scenarios
# test_verify_disagg02.py
# Verify that duplicate btree IDs among stable files are detected.
@disagg_test_class
class test_verify_disagg02(wttest.WiredTigerTestCase):
disagg_storages = gen_disagg_storages('test_verify_disagg02', disagg_only=True)
scenarios = make_scenarios(disagg_storages)
conn_config = 'disaggregated=(role="leader")'
conn_config_follower = 'disaggregated=(role="follower")'
table_cfg = 'key_format=S,value_format=S,block_manager=disagg'
uri = 'layered:test_verify_disagg02'
def test_verify_duplicate_btree_ids(self):
"""
Inject a fake stable file entry with a duplicate btree ID into a follower's local
metadata, then verify the layered table. The unique btree IDs check in verify should
detect the duplicate and return an error.
"""
# Create a layered table on the leader with data, then checkpoint.
self.session.create(self.uri, self.table_cfg)
cursor = self.session.open_cursor(self.uri, None, None)
cursor['key'] = 'value'
cursor.close()
self.session.checkpoint()
# Create a follower and advance it to pick up the checkpoint.
conn_follow = self.wiredtiger_open('follower', self.extensionsConfig() + ',create,' +
self.conn_config_follower)
session_follow = conn_follow.open_session('')
self.disagg_advance_checkpoint(conn_follow)
# Read the stable file's config from the follower's metadata to get its btree ID.
md_cursor = session_follow.open_cursor('metadata:', None, None)
md_cursor.set_key('file:test_verify_disagg02.wt_stable')
self.assertEqual(md_cursor.search(), 0)
victim_config = md_cursor.get_value()
md_cursor.close()
# Confirm we got a valid config with an ID.
self.assertRegex(victim_config, r',id=\d+')
# Inject a fake entry with the same btree ID into the follower's local metadata.
raw_cursor = session_follow.open_cursor('file:WiredTiger.wt', None, None)
raw_cursor.set_key('file:fake_duplicate.wt_stable')
raw_cursor.set_value(victim_config)
raw_cursor.insert()
raw_cursor.close()
# Verify the layered table. Our check detects the duplicate btree ID.
self.assertRaisesException(wiredtiger.WiredTigerError,
lambda: session_follow.verify(self.uri), '/WT_ERROR/')
self.ignoreStderrPatternIfExists('metadata corruption')
self.ignoreStderrPatternIfExists('stable table verification failed')
# Remove the fake entry so teardown verification passes.
raw_cursor = session_follow.open_cursor('file:WiredTiger.wt', None, None)
raw_cursor.set_key('file:fake_duplicate.wt_stable')
raw_cursor.remove()
raw_cursor.close()
session_follow.close()
conn_follow.close()