SERVER-126154 Use s3.put with pre-signed visibility for core dumps (#53434)
GitOrigin-RevId: a9167f37dd5c732170f51d224ad15df13847dee8
This commit is contained in:
parent
a987e53fda
commit
721e9e6c5a
@ -2,7 +2,6 @@ import argparse
|
||||
import concurrent.futures
|
||||
import glob
|
||||
import gzip
|
||||
import json
|
||||
import os
|
||||
import shutil
|
||||
import subprocess
|
||||
@ -10,25 +9,8 @@ import sys
|
||||
import time
|
||||
from typing import Optional
|
||||
|
||||
import boto3
|
||||
import requests
|
||||
|
||||
from buildscripts.util.read_config import read_config_file
|
||||
|
||||
|
||||
def process_file(
|
||||
file: str,
|
||||
aws_secret: str,
|
||||
aws_key: str,
|
||||
project: str,
|
||||
variant: str,
|
||||
version_id: str,
|
||||
revision: int,
|
||||
task_name: str,
|
||||
file_number: int,
|
||||
upload_name: str,
|
||||
start_time: int,
|
||||
) -> Optional[dict[str, str]]:
|
||||
def process_file(file: str, start_time: float) -> Optional[str]:
|
||||
print(f"{file} started compressing at {time.time() - start_time}")
|
||||
compressed_file = f"{file}.gz"
|
||||
with open(file, "rb") as f_in:
|
||||
@ -36,64 +18,12 @@ def process_file(
|
||||
shutil.copyfileobj(f_in, f_out)
|
||||
|
||||
print(f"{file} finished compressing at {time.time() - start_time}")
|
||||
|
||||
s3_client = boto3.client("s3", aws_access_key_id=aws_key, aws_secret_access_key=aws_secret)
|
||||
basename = os.path.basename(compressed_file)
|
||||
object_path = (
|
||||
f"{project}/{variant}/{version_id}/{task_name}-{revision}-{file_number}/{basename}"
|
||||
)
|
||||
extra_args = {"ContentType": "application/gzip", "ACL": "public-read"}
|
||||
try:
|
||||
s3_client.upload_file(compressed_file, "mciuploads", object_path, ExtraArgs=extra_args)
|
||||
except Exception as ex:
|
||||
print(f"ERROR: failed to upload file to s3 {file}", file=sys.stderr)
|
||||
print(ex, file=sys.stderr)
|
||||
return None
|
||||
|
||||
url = f"https://mciuploads.s3.amazonaws.com/{object_path}"
|
||||
name = f"{upload_name} {file_number} ({basename})"
|
||||
|
||||
# Sanity check to ensure the url exists
|
||||
r = requests.head(url)
|
||||
if r.status_code != 200:
|
||||
print(
|
||||
f"ERROR: Could not verify that {compressed_file} was uploaded to {url}", file=sys.stderr
|
||||
)
|
||||
return None
|
||||
|
||||
print(f"{compressed_file} uploaded at {time.time() - start_time} to {url}")
|
||||
|
||||
# The information needed for attach.artifacts
|
||||
task_artifact = {
|
||||
"name": name,
|
||||
"link": url,
|
||||
"visibility": "public",
|
||||
}
|
||||
|
||||
return task_artifact
|
||||
return compressed_file
|
||||
|
||||
|
||||
def main(output_file: str, patterns: list[str], display_name: str, expansions_file: str) -> int:
|
||||
if not output_file.endswith(".json"):
|
||||
print("WARN: filename input should end with `.json`", file=sys.stderr)
|
||||
|
||||
expansions = read_config_file(expansions_file)
|
||||
|
||||
aws_access_key = expansions.get("aws_key_new", None)
|
||||
aws_secret_key = expansions.get("aws_secret", None)
|
||||
|
||||
if not aws_access_key or not aws_secret_key:
|
||||
print("ERROR: AWS credentials not found in expansions", file=sys.stderr)
|
||||
return 1
|
||||
|
||||
project = expansions.get("project")
|
||||
build_variant = expansions.get("build_variant")
|
||||
version_id = expansions.get("version_id")
|
||||
revision = expansions.get("revision")
|
||||
task_name = expansions.get("task_name")
|
||||
|
||||
def main(patterns: list[str]) -> int:
|
||||
start_time = time.time()
|
||||
files = set()
|
||||
files: set[str] = set()
|
||||
|
||||
for pattern in patterns:
|
||||
glob_results = glob.glob(pattern)
|
||||
@ -123,43 +53,24 @@ def main(output_file: str, patterns: list[str], display_name: str, expansions_fi
|
||||
except Exception as e:
|
||||
print(f"ERROR: Could not resolve symlink {path}: {e}", file=sys.stderr)
|
||||
|
||||
files = list(files)
|
||||
file_list = list(files)
|
||||
|
||||
if not files:
|
||||
if not file_list:
|
||||
print("No files found for the input, exiting early")
|
||||
return 0
|
||||
|
||||
uploads = []
|
||||
cores = os.cpu_count()
|
||||
with concurrent.futures.ThreadPoolExecutor(max_workers=cores) as executor:
|
||||
futures = []
|
||||
for i, path in enumerate(files):
|
||||
file_number = i + 1
|
||||
futures.append(
|
||||
executor.submit(
|
||||
process_file,
|
||||
file=path,
|
||||
aws_secret=aws_secret_key,
|
||||
aws_key=aws_access_key,
|
||||
project=project,
|
||||
variant=build_variant,
|
||||
version_id=version_id,
|
||||
revision=revision,
|
||||
task_name=task_name,
|
||||
file_number=file_number,
|
||||
upload_name=display_name,
|
||||
start_time=start_time,
|
||||
)
|
||||
futures = [
|
||||
executor.submit(
|
||||
process_file,
|
||||
file=path,
|
||||
start_time=start_time,
|
||||
)
|
||||
|
||||
for path in file_list
|
||||
]
|
||||
for future in concurrent.futures.as_completed(futures):
|
||||
result = future.result()
|
||||
if result:
|
||||
uploads.append(result)
|
||||
|
||||
if uploads:
|
||||
with open(output_file, "w") as file:
|
||||
json.dump(uploads, file, indent=4)
|
||||
future.result()
|
||||
|
||||
return 0
|
||||
|
||||
@ -167,15 +78,10 @@ def main(output_file: str, patterns: list[str], display_name: str, expansions_fi
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser(
|
||||
prog="FastArchiver",
|
||||
description="This improves archiving times of a large amount of big files in evergreen "
|
||||
"by compressing and uploading them asynchronously. "
|
||||
"This also uses pigz, which is a multithreaded implementation of gzip, "
|
||||
"to improve gzipping times.",
|
||||
description="Compresses files in parallel using gzip for subsequent upload via Evergreen's "
|
||||
"s3.put command.",
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
"--output-file", "-f", help="Name of output attach.artifacts file.", required=True
|
||||
)
|
||||
parser.add_argument(
|
||||
"--pattern",
|
||||
"-p",
|
||||
@ -185,15 +91,5 @@ if __name__ == "__main__":
|
||||
default=[],
|
||||
required=True,
|
||||
)
|
||||
parser.add_argument(
|
||||
"--display-name", "-n", help="The display name of the file in evergreen", required=True
|
||||
)
|
||||
parser.add_argument(
|
||||
"--expansions-file",
|
||||
"-e",
|
||||
help="Expansions file to read task info and aws credentials from.",
|
||||
default="../expansions.yml",
|
||||
)
|
||||
|
||||
args = parser.parse_args()
|
||||
exit(main(args.output_file, args.patterns, args.display_name, args.expansions_file))
|
||||
exit(main(args.patterns))
|
||||
|
||||
@ -5,6 +5,7 @@ import glob
|
||||
import gzip
|
||||
import json
|
||||
import os
|
||||
import posixpath
|
||||
import re
|
||||
import shutil
|
||||
import subprocess
|
||||
@ -23,10 +24,7 @@ from retry import retry
|
||||
|
||||
from buildscripts.create_rbe_sysroot import create_rbe_sysroot
|
||||
from buildscripts.resmokelib.hang_analyzer.dumper import Dumper
|
||||
from buildscripts.resmokelib.setup_multiversion.download import (
|
||||
DownloadError,
|
||||
download_from_s3_with_requests,
|
||||
)
|
||||
from buildscripts.resmokelib.setup_multiversion.download import DownloadError
|
||||
from buildscripts.resmokelib.setup_multiversion.setup_multiversion import (
|
||||
SetupMultiversion,
|
||||
_DownloadOptions,
|
||||
@ -37,6 +35,10 @@ from buildscripts.resmokelib.utils.filesystem import build_hygienic_bin_path
|
||||
from buildscripts.resmokelib.utils.otel_thread_pool_executor import OtelThreadPoolExecutor
|
||||
from buildscripts.resmokelib.utils.otel_utils import get_default_current_span
|
||||
from buildscripts.resmokelib.utils.runtime_recorder import compare_start_time
|
||||
from buildscripts.util.download_utils import (
|
||||
download_from_s3_with_requests,
|
||||
extract_s3_bucket_key,
|
||||
)
|
||||
from evergreen.task import Artifact, Task
|
||||
|
||||
_DEBUG_FILE_BASE_NAMES = ["mongo", "mongod", "mongos"]
|
||||
@ -199,7 +201,8 @@ def download_core_dumps(
|
||||
os.makedirs(core_dumps_dir, exist_ok=True)
|
||||
current_span = get_default_current_span()
|
||||
for artifact in core_dump_artifacts:
|
||||
file_name = artifact.url.split("/")[-1]
|
||||
_, key = extract_s3_bucket_key(artifact.url)
|
||||
file_name = posixpath.basename(key)
|
||||
extracted_name, _ = os.path.splitext(file_name)
|
||||
extract_path = os.path.join(core_dumps_dir, extracted_name)
|
||||
|
||||
@ -240,7 +243,7 @@ def download_core_dumps(
|
||||
root_logger.info(f"Downloading core dump: {file_name}")
|
||||
if os.path.exists(file_name):
|
||||
os.remove(file_name)
|
||||
urllib.request.urlretrieve(artifact.url, file_name)
|
||||
download_from_s3_with_requests(artifact.url, file_name, raise_on_error=True)
|
||||
root_logger.info(f"Extracting core dump: {file_name}")
|
||||
if os.path.exists(extract_path):
|
||||
os.remove(extract_path)
|
||||
|
||||
@ -237,7 +237,7 @@ class ResmokeCoreAnalysisTaskGenerator(CoreAnalysisTaskGenerator):
|
||||
dumpers = dumper.get_dumpers(None, None)
|
||||
|
||||
for artifact in task_info.artifacts:
|
||||
regex = re.search(r"Core Dump [0-9]+ \((.*)\.gz\)", artifact.name)
|
||||
regex = re.search(r"Core Dump (.*?)\.gz", artifact.name)
|
||||
if not regex:
|
||||
continue
|
||||
|
||||
|
||||
@ -367,9 +367,9 @@ class TestResmokeCoreAnalysisTaskGenerator(unittest.TestCase):
|
||||
|
||||
# Mock task artifacts
|
||||
mock_artifact1 = MagicMock()
|
||||
mock_artifact1.name = "Core Dump 1 (dump_mongod.12345.core.gz)"
|
||||
mock_artifact1.name = "Core Dump dump_mongod.12345.core.gz"
|
||||
mock_artifact2 = MagicMock()
|
||||
mock_artifact2.name = "Core Dump 2 (dump_mongos.67890.core.gz)"
|
||||
mock_artifact2.name = "Core Dump dump_mongos.67890.core.gz"
|
||||
|
||||
mock_task = MagicMock()
|
||||
mock_task.artifacts = [mock_artifact1, mock_artifact2]
|
||||
|
||||
Loading…
Reference in New Issue
Block a user