SERVER-120931: Add GraphQL-based Batching and S3 Cache for Deduplication to Merge Queue Metrics Script (#48960)

GitOrigin-RevId: 336f435b787ce52554caf4f20057e4297243a80d
This commit is contained in:
Zack Winter 2026-03-04 13:27:32 -08:00 committed by MongoDB Bot
parent af77af452f
commit daa98e188f
3 changed files with 484 additions and 41 deletions

View File

@ -547,5 +547,9 @@ py_binary(
"opentelemetry-exporter-otlp-proto-http",
group = "testing",
),
dependency(
"boto3",
group = "aws",
),
],
)

View File

@ -1,15 +1,23 @@
import argparse
import json
import os
import subprocess
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from pathlib import Path
from statistics import quantiles
from zoneinfo import ZoneInfo
import requests
# Optional boto3 import for S3 cache storage
try:
import boto3
from botocore.exceptions import ClientError, NoCredentialsError
BOTO3_AVAILABLE = True
except ImportError:
BOTO3_AVAILABLE = False
# Optional OTEL imports for Honeycomb integration
try:
from opentelemetry import trace
@ -23,12 +31,226 @@ except ImportError:
OTEL_AVAILABLE = False
EST = ZoneInfo("America/New_York")
CACHE_FILE = Path.home() / ".github_merge_queue_metrics.json"
# Honeycomb OTEL endpoint
HONEYCOMB_OTEL_ENDPOINT = "https://api.honeycomb.io/v1/traces"
def is_sso_profile_configured(profile):
    """Report whether the given AWS CLI profile has an SSO start URL set.

    Runs ``aws configure get sso_start_url --profile <profile>`` and treats a
    zero exit status with non-blank output as "configured".

    Args:
        profile: AWS profile name to inspect.

    Returns:
        True if the profile has SSO configuration, False otherwise (including
        when the AWS CLI is missing or the lookup times out).
    """
    lookup_cmd = ["aws", "configure", "get", "sso_start_url", "--profile", profile]
    try:
        lookup = subprocess.run(lookup_cmd, capture_output=True, text=True, timeout=10)
    except (subprocess.TimeoutExpired, FileNotFoundError):
        return False

    if lookup.returncode != 0:
        return False
    # A successful lookup that prints nothing still means "not configured".
    return bool(lookup.stdout.strip())
def configure_sso_profile(profile):
    """Interactively set up an AWS SSO profile via ``aws configure sso``.

    Prints a short explanation of what the user will be asked for, then hands
    the terminal to the AWS CLI for up to ten minutes.

    Args:
        profile: AWS profile name to configure.

    Returns:
        True if the AWS CLI exited with status 0, False otherwise (including
        when the CLI is missing or the configuration times out).
    """
    guidance = (
        f"AWS SSO profile '{profile}' is not configured.",
        "Running 'aws configure sso' to set up the profile...",
        "You will need to provide:",
        " - SSO start URL (e.g., https://your-org.awsapps.com/start)",
        " - SSO region",
        " - Account and role to use",
        "",
    )
    for line in guidance:
        print(line)

    try:
        # Output is not captured: the command is interactive.
        outcome = subprocess.run(
            ["aws", "configure", "sso", "--profile", profile],
            timeout=600,  # 10 minutes for interactive configuration
        )
    except subprocess.TimeoutExpired:
        print("AWS SSO configuration timed out.")
        return False
    except FileNotFoundError:
        print("AWS CLI not found. Please install the AWS CLI.")
        return False
    return outcome.returncode == 0
def are_aws_credentials_valid(profile):
    """Probe whether the given AWS profile currently has usable credentials.

    Uses ``aws sts get-caller-identity`` as the probe; only its exit status is
    inspected.

    Args:
        profile: AWS profile name to check.

    Returns:
        True if the probe exits with status 0, False otherwise (including when
        the AWS CLI is missing or the probe times out).
    """
    probe_cmd = ["aws", "sts", "get-caller-identity", "--profile", profile]
    try:
        probe = subprocess.run(probe_cmd, capture_output=True, text=True, timeout=30)
    except (subprocess.TimeoutExpired, FileNotFoundError):
        return False
    return probe.returncode == 0
def ensure_aws_auth(profile=None):
    """Make sure the AWS CLI is authenticated for ``profile``.

    The steps are, in order: accept already-valid credentials, configure the
    SSO profile if it has not been set up yet, then run an interactive
    ``aws sso login``.

    Args:
        profile: AWS profile name to use for SSO login.

    Returns:
        True if authentication is successful, False otherwise (no profile
        given, profile configuration failed, login failed or timed out, or the
        AWS CLI is not installed).
    """
    if not profile:
        print("No AWS profile specified. Use --aws-profile to specify an SSO profile.")
        return False

    try:
        # Nothing to do when the current credentials still work.
        if are_aws_credentials_valid(profile):
            print(f"AWS credentials for profile '{profile}' are valid.")
            return True

        # An unconfigured profile has to be set up before a login can succeed.
        needs_setup = not is_sso_profile_configured(profile)
        if needs_setup and not configure_sso_profile(profile):
            print("AWS SSO profile configuration failed.")
            return False

        print(f"AWS credentials expired or missing. Logging in with profile '{profile}'...")
        login = subprocess.run(
            ["aws", "sso", "login", "--profile", profile],
            timeout=300,  # 5 minutes for interactive login
        )
    except subprocess.TimeoutExpired:
        print("AWS authentication timed out.")
        return False
    except FileNotFoundError:
        print("AWS CLI not found. Please install the AWS CLI.")
        return False
    return login.returncode == 0
def get_s3_client(region="us-east-1", profile=None, aws_key=None, aws_secret=None):
    """Build a boto3 S3 client from explicit credentials or a named profile.

    Args:
        region: AWS region.
        profile: AWS profile name to use (ignored if aws_key and aws_secret are provided).
        aws_key: AWS access key ID (optional, overrides profile).
        aws_secret: AWS secret access key (optional, overrides profile).

    Returns:
        boto3 S3 client or None if boto3 is not available.
    """
    if not BOTO3_AVAILABLE:
        return None

    # Explicit credentials win only when BOTH halves are present; otherwise
    # fall back to profile-based authentication.
    session_kwargs = (
        {"aws_access_key_id": aws_key, "aws_secret_access_key": aws_secret}
        if aws_key and aws_secret
        else {"profile_name": profile}
    )
    return boto3.session.Session(**session_kwargs).client("s3", region_name=region)
def download_cache_from_s3(
    s3_bucket, s3_key, region="us-east-1", profile=None, aws_key=None, aws_secret=None
):
    """Download and decode the JSON cache file stored in S3.

    Args:
        s3_bucket: S3 bucket name.
        s3_key: S3 key for the cache file.
        region: AWS region.
        profile: AWS profile name to use (ignored if aws_key and aws_secret are provided).
        aws_key: AWS access key ID (optional, overrides profile).
        aws_secret: AWS secret access key (optional, overrides profile).

    Returns:
        The cache dict.

    Raises:
        RuntimeError: If boto3 is unavailable, the S3 client cannot be created,
            the object cannot be fetched (missing credentials, missing key,
            access denied, or any other S3 client error), or the object's
            content is not a UTF-8 encoded JSON object.
    """
    if not BOTO3_AVAILABLE:
        raise RuntimeError("boto3 is not available. Install it to use S3 cache storage.")

    s3_client = get_s3_client(region, profile, aws_key, aws_secret)
    if s3_client is None:
        raise RuntimeError("Failed to create S3 client.")

    try:
        response = s3_client.get_object(Bucket=s3_bucket, Key=s3_key)
        content = response["Body"].read().decode("utf-8")
    except NoCredentialsError as e:
        raise RuntimeError("No AWS credentials available for S3 download.") from e
    except ClientError as e:
        error_code = e.response.get("Error", {}).get("Code", "")
        if error_code == "NoSuchKey":
            raise RuntimeError(f"Cache file not found in S3: s3://{s3_bucket}/{s3_key}") from e
        elif error_code == "AccessDenied":
            raise RuntimeError(f"Access denied to S3 cache: s3://{s3_bucket}/{s3_key}") from e
        else:
            raise RuntimeError(f"Error downloading cache from S3: {e}") from e
    except UnicodeDecodeError as e:
        raise RuntimeError(
            f"Cache file is not valid UTF-8: s3://{s3_bucket}/{s3_key}"
        ) from e

    # A corrupt cache must surface as the documented RuntimeError rather than
    # as a raw JSONDecodeError from deep inside the json module.
    try:
        cache = json.loads(content)
    except json.JSONDecodeError as e:
        raise RuntimeError(
            f"Cache file is not valid JSON: s3://{s3_bucket}/{s3_key}: {e}"
        ) from e

    # The function promises a dict; reject null/list/scalar payloads here
    # instead of handing the caller something it cannot use as a mapping.
    if not isinstance(cache, dict):
        raise RuntimeError(
            f"Cache file does not contain a JSON object: s3://{s3_bucket}/{s3_key}"
        )

    print(f"Downloaded cache from s3://{s3_bucket}/{s3_key}")
    return cache
def upload_cache_to_s3(
    cache, s3_bucket, s3_key, region="us-east-1", profile=None, aws_key=None, aws_secret=None
):
    """Serialize the cache as indented JSON and store it in S3.

    Failures are reported on stdout and signalled through the return value
    rather than raised.

    Args:
        cache: Cache dict to upload.
        s3_bucket: S3 bucket name.
        s3_key: S3 key for the cache file.
        region: AWS region.
        profile: AWS profile name to use (ignored if aws_key and aws_secret are provided).
        aws_key: AWS access key ID (optional, overrides profile).
        aws_secret: AWS secret access key (optional, overrides profile).

    Returns:
        True if successful, False otherwise.
    """
    if not BOTO3_AVAILABLE:
        print("boto3 is not available. Install it to use S3 cache storage.")
        return False

    try:
        client = get_s3_client(region, profile, aws_key, aws_secret)
        if client is None:
            return False

        payload = json.dumps(cache, indent=2).encode("utf-8")
        client.put_object(
            Bucket=s3_bucket,
            Key=s3_key,
            Body=payload,
            ContentType="application/json",
        )
        print(f"Uploaded cache to s3://{s3_bucket}/{s3_key}")
        return True
    except NoCredentialsError:
        print("No AWS credentials available for S3 upload.")
        return False
    except ClientError as upload_error:
        print(f"Error uploading cache to S3: {upload_error}")
        return False
def setup_otel_tracer(honeycomb_api_key, honeycomb_dataset):
"""Set up OpenTelemetry tracer with Honeycomb HTTP exporter."""
if not OTEL_AVAILABLE:
@ -91,6 +313,7 @@ def export_pr_metrics_to_honeycomb(tracer, results, repo_owner, repo_name):
span.set_attribute("pr.merged_day_of_week", merged_at_est.strftime("%A"))
span.set_attribute("pr.merged_hour", merged_at_est.hour)
span.set_attribute("pr.is_weekend", merged_at_est.weekday() >= 5)
span.set_attribute("pr.metric_version", 2)
# End the span at the actual merge time
span.end(end_time=int(merged_at.timestamp() * 1e9))
@ -106,23 +329,6 @@ def shutdown_otel():
provider.shutdown()
def load_cache():
    """Read the on-disk cache, returning an empty dict when it is unusable.

    Returns:
        The decoded JSON content of CACHE_FILE, or ``{}`` if the file does not
        exist, cannot be read, or does not contain valid JSON.
    """
    if not CACHE_FILE.exists():
        return {}

    try:
        with CACHE_FILE.open("r") as cache_fh:
            return json.load(cache_fh)
    except (json.JSONDecodeError, IOError):
        # An unreadable or corrupt cache is treated the same as no cache.
        return {}
def save_cache(cache):
    """Write the cache to CACHE_FILE as indented JSON, replacing its content.

    Args:
        cache: JSON-serializable cache object to persist.
    """
    with CACHE_FILE.open("w") as cache_fh:
        json.dump(cache, cache_fh, indent=2)
def get_cache_key(repo_owner, repo_name, pull_number):
    """Build the cache key for a PR in the form ``owner/repo/number``.

    Args:
        repo_owner: Repository owner.
        repo_name: Repository name.
        pull_number: Pull request number.

    Returns:
        The three values joined with ``/``.
    """
    return "{}/{}/{}".format(repo_owner, repo_name, pull_number)
@ -206,6 +412,170 @@ def fetch_pull_request_metrics(pull_number, repo_owner, repo_name, headers):
return ("error", pull_number, str(e))
# GraphQL query template for batch fetching PRs with timeline events.
# The single %s placeholder is filled (with the % operator) by the
# concatenated per-PR selections built from GRAPHQL_PR_FRAGMENT, so one request
# can carry many pull requests of the same repository.
GRAPHQL_BATCH_QUERY = """
query($owner: String!, $repo: String!) {
repository(owner: $owner, name: $repo) {
%s
}
}
"""
# Per-PR selection inserted into GRAPHQL_BATCH_QUERY. Both %d placeholders
# receive the same PR number: the first forms the response alias ("pr<number>")
# used to look the PR up in the response, the second is the pullRequest argument.
# NOTE(review): timelineItems asks for `first: 50` matching events while the
# parser picks the LAST event of each type; presumably a PR with more than 50
# such events would lose its latest ones - confirm whether `last: 50` is intended.
GRAPHQL_PR_FRAGMENT = """
pr%d: pullRequest(number: %d) {
number
title
baseRefName
mergedAt
timelineItems(first: 50, itemTypes: [ADDED_TO_MERGE_QUEUE_EVENT, MERGED_EVENT, REMOVED_FROM_MERGE_QUEUE_EVENT]) {
nodes {
__typename
... on AddedToMergeQueueEvent { createdAt }
... on MergedEvent { createdAt }
... on RemovedFromMergeQueueEvent { createdAt }
}
}
}
"""
# URL that the batched GraphQL queries are POSTed to.
GRAPHQL_ENDPOINT = "https://api.github.com/graphql"
def fetch_prs_batch_graphql(pr_numbers, repo_owner, repo_name, headers, batch_size=50):
    """Fetch multiple PRs in batches using GraphQL.

    Each batch is sent as one request containing one aliased ``pullRequest``
    selection per PR number. A GraphQL response may carry partial data next to
    an ``errors`` list (for example when one number in the batch does not
    resolve to a pull request); PRs that were returned are still parsed rather
    than failing the whole batch.

    Args:
        pr_numbers: List of PR numbers to fetch
        repo_owner: Repository owner
        repo_name: Repository name
        headers: HTTP headers including authorization
        batch_size: Number of PRs per GraphQL request (max ~100 due to query complexity)

    Returns:
        List with one entry per requested PR, in request order. Each entry is
        a result tuple in the same format as fetch_pull_request_metrics, or
        None when there is nothing to report for that PR.

    Raises:
        ValueError: If batch_size is less than 1.
    """
    # A zero batch size would divide by zero below and a negative one would
    # silently produce no batches (and therefore no results) at all.
    if batch_size < 1:
        raise ValueError(f"batch_size must be at least 1, got {batch_size}")

    pr_numbers = list(pr_numbers)
    results = []
    total_batches = (len(pr_numbers) + batch_size - 1) // batch_size

    for batch_idx in range(total_batches):
        start_idx = batch_idx * batch_size
        batch = pr_numbers[start_idx : start_idx + batch_size]

        # Build the GraphQL query with all PRs in this batch
        pr_fragments = "\n".join(GRAPHQL_PR_FRAGMENT % (pr_num, pr_num) for pr_num in batch)
        query = GRAPHQL_BATCH_QUERY % pr_fragments

        try:
            response = requests.post(
                GRAPHQL_ENDPOINT,
                headers=headers,
                json={"query": query, "variables": {"owner": repo_owner, "repo": repo_name}},
                timeout=60,
            )
            response.raise_for_status()
            data = response.json()

            errors = data.get("errors") or []
            # "data" may be present but null, so guard before drilling down.
            repo_data = (data.get("data") or {}).get("repository")

            if errors:
                print(f"GraphQL errors in batch {batch_idx + 1}: {errors}")

            if repo_data is None:
                # Nothing usable came back: report every PR of the batch as failed.
                reason = (
                    f"GraphQL error: {errors}"
                    if errors
                    else "GraphQL response contained no repository data"
                )
                results.extend(("error", pr_num, reason) for pr_num in batch)
                continue

            batch_results = _parse_graphql_batch_response(repo_data, batch)
            if errors:
                # Partial response: keep what was fetched, flag what was not.
                batch_results = _overlay_graphql_errors(batch_results, batch, repo_data, errors)
            results.extend(batch_results)

            print(
                f"Fetched batch {batch_idx + 1}/{total_batches} "
                f"({len(batch)} PRs in 1 request)"
            )

        except Exception as e:
            # Network/HTTP/decoding failures affect the whole request, so every
            # PR of the batch is reported as an error and the next batch is tried.
            print(f"Error fetching batch {batch_idx + 1}: {e}")
            for pr_num in batch:
                results.append(("error", pr_num, str(e)))

    return results


def _overlay_graphql_errors(batch_results, batch, repo_data, errors):
    """Turn unresolved PRs of a partial GraphQL response into error tuples.

    A PR whose alias is null in ``repo_data`` stays as parsed (None) only when
    an error of type ``NOT_FOUND`` names that alias, i.e. the number is simply
    not a pull request. Any other null PR becomes an ``("error", ...)`` tuple
    so a failed fetch is not silently dropped.

    Args:
        batch_results: Parsed results, one per entry of ``batch``
        batch: PR numbers of this batch, in request order
        repo_data: The 'repository' object from the GraphQL response
        errors: The 'errors' list from the GraphQL response

    Returns:
        New list of results, same length and order as ``batch_results``.
    """
    not_found = set()
    messages = {}
    for err in errors:
        if not isinstance(err, dict):
            continue
        path = err.get("path") or []
        # Only errors located at repository.<alias> can be tied to one PR.
        if len(path) < 2 or path[0] != "repository":
            continue
        alias = path[1]
        if err.get("type") == "NOT_FOUND":
            not_found.add(alias)
        else:
            messages.setdefault(alias, err.get("message") or str(err))

    overlaid = []
    for pr_num, result in zip(batch, batch_results):
        alias = f"pr{pr_num}"
        if repo_data.get(alias) is not None or alias in not_found:
            overlaid.append(result)
        else:
            reason = messages.get(alias, errors)
            overlaid.append(("error", pr_num, f"GraphQL error: {reason}"))
    return overlaid
def _parse_graphql_batch_response(repo_data, pr_numbers):
"""Parse GraphQL response for a batch of PRs.
Args:
repo_data: The 'repository' object from the GraphQL response
pr_numbers: List of PR numbers in this batch
Returns:
List of result tuples
"""
results = []
for pr_num in pr_numbers:
pr_key = f"pr{pr_num}"
pr_data = repo_data.get(pr_key)
if pr_data is None:
# PR doesn't exist or was deleted
results.append(None)
continue
try:
result = _parse_single_pr_graphql(pr_data)
results.append(result)
except Exception as e:
results.append(("error", pr_num, str(e)))
return results
def _parse_single_pr_graphql(pr_data):
"""Parse a single PR from GraphQL response.
Args:
pr_data: The PR object from GraphQL response
Returns:
Result tuple matching fetch_pull_request_metrics format
"""
pr_number = pr_data["number"]
pr_title = pr_data.get("title", "")
base_ref = pr_data.get("baseRefName", "")
# Check if PR targets master
if base_ref != "master":
return ("skip", pr_number, "does not target master branch")
timeline_items = pr_data.get("timelineItems", {}).get("nodes", [])
# Extract events by type
added_events = [
item for item in timeline_items if item.get("__typename") == "AddedToMergeQueueEvent"
]
merged_events = [item for item in timeline_items if item.get("__typename") == "MergedEvent"]
removed_events = [
item for item in timeline_items if item.get("__typename") == "RemovedFromMergeQueueEvent"
]
start_event = added_events[-1] if added_events else None
end_event = merged_events[-1] if merged_events else None
if end_event is not None and start_event is not None:
started_at = parse_iso_timestamp(start_event["createdAt"])
merged_at = parse_iso_timestamp(end_event["createdAt"])
time_difference = merged_at - started_at
return ("result", pr_number, started_at, merged_at, time_difference, pr_title)
# Check if PR was added to merge queue but removed (not merged)
if start_event is not None and end_event is None and removed_events:
removed_event = removed_events[-1]
added_at = parse_iso_timestamp(start_event["createdAt"])
removed_at = parse_iso_timestamp(removed_event["createdAt"])
return ("removed", pr_number, added_at, removed_at, pr_title)
return None
def main():
# Parse command line arguments
parser = argparse.ArgumentParser(description="Get pull request merge time")
@ -237,6 +607,32 @@ def main():
default=os.environ.get("HONEYCOMB_DATASET", "merge-queue-metrics"),
help="Honeycomb dataset name (default: HONEYCOMB_DATASET env var or 'merge-queue-metrics')",
)
# Honour S3_CACHE_BUCKET like the sibling options honour their env vars; the
# help text already advertised it, but the default used to be hard-coded.
parser.add_argument(
    "--s3-cache-bucket",
    default=os.environ.get("S3_CACHE_BUCKET", "mdb-build-private"),
    help="S3 bucket for shared cache storage (default: S3_CACHE_BUCKET env var or 'mdb-build-private')",
)
parser.add_argument(
"--s3-cache-key",
default=os.environ.get("S3_CACHE_KEY", "data_store/merge_queue/cache.json"),
help="S3 key for cache file (default: data_store/merge_queue/cache.json)",
)
parser.add_argument(
"--aws-region",
default=os.environ.get("AWS_REGION", "us-east-1"),
help="AWS region for S3 (default: us-east-1)",
)
parser.add_argument(
"--aws-profile",
default=os.environ.get("AWS_PROFILE", "mongodb-dev"),
help="AWS profile name for SSO authentication (default: mongodb-dev)",
)
parser.add_argument(
"--graphql-batch-size",
type=int,
default=50,
help="Number of PRs to fetch per GraphQL request (default: 50, max ~100)",
)
args = parser.parse_args()
if not args.token:
@ -260,12 +656,37 @@ def main():
latest_pr = get_latest_pull_request_number(repo_owner, repo_name, headers)
print(f"Latest PR: {latest_pr}")
# Load cache
cache = load_cache()
# Load cache from S3
# Check for explicit AWS credentials from environment variables
aws_key_build = os.environ.get("aws_key_build")
aws_secret_build = os.environ.get("aws_secret_build")
use_explicit_aws_creds = bool(aws_key_build and aws_secret_build)
if use_explicit_aws_creds:
# Use explicit AWS credentials from environment variables
print("Using AWS credentials from aws_key_build/aws_secret_build environment variables")
cache = download_cache_from_s3(
args.s3_cache_bucket,
args.s3_cache_key,
args.aws_region,
aws_key=aws_key_build,
aws_secret=aws_secret_build,
)
elif ensure_aws_auth(profile=args.aws_profile):
# Fall back to profile-based authentication
cache = download_cache_from_s3(
args.s3_cache_bucket, args.s3_cache_key, args.aws_region, profile=args.aws_profile
)
else:
raise RuntimeError(
"AWS authentication failed. Either set aws_key_build and aws_secret_build "
"environment variables, or ensure AWS CLI is configured with --aws-profile."
)
cache_hits = 0
cache_misses = 0
results = []
results = [] # All results (cached + new) for statistics
new_results = [] # Only newly fetched results for Honeycomb export
removed_results = []
pull_numbers = range(latest_pr - args.count, latest_pr + 1)
@ -293,16 +714,14 @@ def main():
f"PRs to fetch: {len(prs_to_fetch)}"
)
with ThreadPoolExecutor(max_workers=args.max_workers) as executor:
futures = {
executor.submit(
fetch_pull_request_metrics, pull_number, repo_owner, repo_name, headers
): pull_number
for pull_number in prs_to_fetch
}
# Use GraphQL batch fetching for efficiency (~99% fewer API requests)
if prs_to_fetch:
print(f"Using GraphQL batch fetching (batch size: {args.graphql_batch_size})")
batch_results = fetch_prs_batch_graphql(
prs_to_fetch, repo_owner, repo_name, headers, batch_size=args.graphql_batch_size
)
for future in as_completed(futures):
result = future.result()
for result in batch_results:
if result is None:
continue
if result[0] == "skip":
@ -314,7 +733,9 @@ def main():
print(f"Error fetching PR {result[1]}: {result[2]}")
elif result[0] == "result":
_, pull_number, started_at, merged_at, time_difference, pr_title = result
results.append((pull_number, started_at, merged_at, time_difference))
pr_result = (pull_number, started_at, merged_at, time_difference)
results.append(pr_result)
new_results.append(pr_result) # Track for Honeycomb export (dedup)
print(f"{pull_number}, {started_at}, {time_difference}")
# Cache the result
cache_key = get_cache_key(repo_owner, repo_name, pull_number)
@ -327,8 +748,24 @@ def main():
_, pull_number, added_at, removed_at, pr_title = result
removed_results.append((pull_number, added_at, removed_at, pr_title))
# Save cache
save_cache(cache)
# Save cache to S3
if use_explicit_aws_creds:
upload_cache_to_s3(
cache,
args.s3_cache_bucket,
args.s3_cache_key,
args.aws_region,
aws_key=aws_key_build,
aws_secret=aws_secret_build,
)
else:
upload_cache_to_s3(
cache,
args.s3_cache_bucket,
args.s3_cache_key,
args.aws_region,
profile=args.aws_profile,
)
print(f"Cache updated: {cache_misses} new entries added")
# Sort by merge date
@ -418,10 +855,12 @@ def main():
else:
print("No PRs were removed from the merge queue in this range.")
# Export to Honeycomb if tracer is configured
if tracer and results:
export_pr_metrics_to_honeycomb(tracer, results, repo_owner, repo_name)
# Export only NEW results to Honeycomb (skip cached PRs to avoid duplicates)
if tracer and new_results:
export_pr_metrics_to_honeycomb(tracer, new_results, repo_owner, repo_name)
shutdown_otel()
elif tracer and not new_results:
print("\nNo new PRs to export to Honeycomb (all were cached).")
if __name__ == "__main__":

View File

@ -2402,8 +2402,8 @@ tasks:
- func: "get engflow creds"
- func: "bazel run"
vars:
target: //buildscripts:github_merge_queue_metrics -- --owner=10gen --repo=mms
env: MERGE_QUEUE_ANALYTICS_GITHUB_TOKEN=${MERGE_QUEUE_ANALYTICS_GITHUB_TOKEN} HONEYCOMB_API_KEY=${MERGE_QUEUE_ANALYTICS_HONEYCOMB_API_KEY} HONEYCOMB_DATASET=Zack-Merge-Queue
target: //buildscripts:github_merge_queue_metrics -- --owner=10gen --repo=mongo
env: MERGE_QUEUE_ANALYTICS_GITHUB_TOKEN=${MERGE_QUEUE_ANALYTICS_GITHUB_TOKEN} HONEYCOMB_API_KEY=${MERGE_QUEUE_ANALYTICS_HONEYCOMB_API_KEY} HONEYCOMB_DATASET=Zack-Merge-Queue aws_key_build=${aws_key_build} aws_secret_build=${aws_secret_build}
- name: merge_queue_metrics_mms
tags: ["assigned_to_jira_team_devprod_correctness", "auxiliary"]
@ -2421,4 +2421,4 @@ tasks:
- func: "bazel run"
vars:
target: //buildscripts:github_merge_queue_metrics -- --owner=10gen --repo=mms
env: MERGE_QUEUE_ANALYTICS_GITHUB_TOKEN=${MERGE_QUEUE_ANALYTICS_GITHUB_TOKEN} HONEYCOMB_API_KEY=${MERGE_QUEUE_ANALYTICS_HONEYCOMB_API_KEY} HONEYCOMB_DATASET=Zack-Merge-Queue
env: MERGE_QUEUE_ANALYTICS_GITHUB_TOKEN=${MERGE_QUEUE_ANALYTICS_GITHUB_TOKEN} HONEYCOMB_API_KEY=${MERGE_QUEUE_ANALYTICS_HONEYCOMB_API_KEY} HONEYCOMB_DATASET=Zack-Merge-Queue aws_key_build=${aws_key_build} aws_secret_build=${aws_secret_build}