-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathgithub_pr_report.py
More file actions
executable file
·247 lines (198 loc) · 9.48 KB
/
github_pr_report.py
File metadata and controls
executable file
·247 lines (198 loc) · 9.48 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
#!/usr/bin/env python3
"""
GitHub PR Report Generator
This script queries GitHub repositories for PR information and generates
a report with open non-draft PR counts and detailed PR summaries.
"""
import argparse
import json
import os
import sys
import time
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional

import requests
class GitHubPRReporter:
    """Fetch pull requests from the GitHub REST API and render a text report.

    All HTTP traffic goes through one ``requests.Session`` that carries the
    token-based ``Authorization`` header. Progress and error messages are
    printed to stdout while pages are being fetched.
    """

    # Format of the ``created_at`` / ``updated_at`` values this class parses.
    TIMESTAMP_FORMAT = '%Y-%m-%dT%H:%M:%SZ'
    # How often a rate-limited page request is retried before giving up.
    MAX_RATE_LIMIT_RETRIES = 3
    # Seconds to sleep before retrying a rate-limited request.
    RATE_LIMIT_WAIT_SECONDS = 60
    # Titles longer than this are cut and suffixed with "..." in the table.
    MAX_TITLE_LENGTH = 50

    def __init__(self, token: str):
        """Create a reporter that authenticates with *token*."""
        self.token = token
        self.headers = {
            'Authorization': f'token {token}',
            'Accept': 'application/vnd.github.v3+json'
        }
        self.base_url = 'https://api.github.com'
        self.session = requests.Session()
        self.session.headers.update(self.headers)

    @classmethod
    def _parse_timestamp(cls, value: str) -> datetime:
        """Parse an API timestamp string into a naive datetime."""
        return datetime.strptime(value, cls.TIMESTAMP_FORMAT)

    @staticmethod
    def _utc_cutoff(months: int) -> datetime:
        """Return the naive UTC datetime ``months * 30`` days before now.

        The API timestamps end in "Z" (UTC), so the cutoff is computed in UTC
        as well; using local time would skew the window by the UTC offset.
        """
        now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
        return now_utc - timedelta(days=months * 30)

    @staticmethod
    def _count_non_draft(prs: List[Dict[str, Any]]) -> int:
        """Count the PRs whose ``draft`` flag is missing or false."""
        return sum(1 for pr in prs if not pr.get('draft', False))

    def get_repo_prs(self, repo: str, state: str = 'open',
                     cutoff_date: Optional[datetime] = None) -> List[Dict[str, Any]]:
        """Get PRs for a repository with optional date filtering.

        Args:
            repo: Repository in ``owner/repo`` form.
            state: Value sent as the ``state`` query parameter.
            cutoff_date: Only honoured when ``state == 'closed'``: paging
                stops at the first page whose oldest PR was updated before
                this moment, and older PRs on that page are dropped. A naive
                value is compared as-is against the parsed timestamps; an
                aware value is converted to UTC first.

        Returns:
            The PR dictionaries collected so far. On an HTTP error, timeout
            or request failure the pages fetched up to that point are
            returned rather than raising.
        """
        if cutoff_date is not None and cutoff_date.tzinfo is not None:
            # Parsed timestamps are naive, so compare like with like.
            cutoff_date = cutoff_date.astimezone(timezone.utc).replace(tzinfo=None)

        url = f"{self.base_url}/repos/{repo}/pulls"
        params = {'state': state, 'per_page': 100, 'sort': 'updated', 'direction': 'desc'}
        all_prs: List[Dict[str, Any]] = []
        page = 1
        max_pages = 50 if state == 'closed' else 100  # Limit pages for closed PRs
        rate_limit_retries = 0

        print(f"Fetching {state} PRs for {repo}...", end="", flush=True)
        while page <= max_pages:
            params['page'] = page
            try:
                response = self.session.get(url, params=params, timeout=30)
            except requests.exceptions.Timeout:
                print(f"\nTimeout fetching page {page} for {repo}")
                break
            except requests.exceptions.RequestException as e:
                print(f"\nRequest error for {repo}: {e}")
                break

            if response.status_code != 200:
                print(f"\nError fetching PRs for {repo}: {response.status_code}")
                is_rate_limited = response.status_code in (403, 429)
                # Bounded retry: a 403 can also mean "forbidden", which never
                # clears, so retrying forever would hang the script.
                if is_rate_limited and rate_limit_retries < self.MAX_RATE_LIMIT_RETRIES:
                    rate_limit_retries += 1
                    print("Rate limit exceeded. Waiting...")
                    time.sleep(self.RATE_LIMIT_WAIT_SECONDS)
                    continue
                break
            rate_limit_retries = 0

            try:
                prs = response.json()
            except ValueError as e:
                # JSON decode errors are ValueError subclasses.
                print(f"\nRequest error for {repo}: {e}")
                break
            if not prs:
                break

            # If we have a cutoff date, check if we've gone past it.
            if cutoff_date and state == 'closed':
                oldest_pr_date = self._parse_timestamp(prs[-1]['updated_at'])
                if oldest_pr_date < cutoff_date:
                    # Keep only the PRs on this page that are newer than the cutoff.
                    all_prs.extend(
                        pr for pr in prs
                        if self._parse_timestamp(pr['updated_at']) >= cutoff_date
                    )
                    break

            all_prs.extend(prs)
            page += 1
            if page % 5 == 0:
                print(".", end="", flush=True)

        print(f" Found {len(all_prs)} PRs")
        return all_prs

    def count_open_non_draft_prs(self, repo: str) -> int:
        """Count open non-draft PRs for a repository."""
        return self._count_non_draft(self.get_repo_prs(repo, 'open'))

    def _summarize_recent_prs(self, repo: str, prs: List[Dict[str, Any]],
                              cutoff_date: datetime) -> List[Dict[str, Any]]:
        """Build summary dicts for PRs updated at or after *cutoff_date*.

        Draft PRs are included and labelled "draft"; the rest are labelled
        "ready for review". The result is ordered by update time, newest
        first.
        """
        dated = []
        for pr in prs:
            updated = self._parse_timestamp(pr['updated_at'])
            if updated < cutoff_date:
                continue
            created = self._parse_timestamp(pr['created_at'])
            # Tolerate a null or missing ``user`` object instead of crashing.
            author = (pr.get('user') or {}).get('login', 'unknown')
            dated.append((updated, {
                'repo': repo,
                'number': pr['number'],
                'title': pr['title'],
                'author': author,
                'created_date': created.strftime('%Y-%m-%d'),
                'updated_date': updated.strftime('%Y-%m-%d'),
                'status': "draft" if pr.get('draft', False) else "ready for review",
                'url': pr['html_url'],
            }))
        # Sort on the full timestamp; the stable sort keeps ties in input order.
        dated.sort(key=lambda item: item[0], reverse=True)
        return [summary for _, summary in dated]

    def get_pr_summary(self, repo: str, months: int = 3) -> List[Dict[str, Any]]:
        """Get PR summary for PRs modified in the last N months (open PRs only).

        A "month" is counted as 30 days. Each entry has the keys ``repo``,
        ``number``, ``title``, ``author``, ``created_date``, ``updated_date``,
        ``status`` and ``url``.
        """
        open_prs = self.get_repo_prs(repo, 'open')
        return self._summarize_recent_prs(repo, open_prs, self._utc_cutoff(months))

    @staticmethod
    def _format_table(headers: List[str], rows: List[List[str]]) -> List[str]:
        """Render *headers* and *rows* as left-aligned, space-separated lines."""
        col_widths = [len(h) for h in headers]
        for row in rows:
            for i, cell in enumerate(row):
                col_widths[i] = max(col_widths[i], len(cell))

        lines = [
            " ".join(h.ljust(w) for h, w in zip(headers, col_widths)),
            " ".join("-" * w for w in col_widths),
        ]
        lines.extend(
            " ".join(cell.ljust(w) for cell, w in zip(row, col_widths))
            for row in rows
        )
        return lines

    def generate_report(self, repos: List[str], months: int = 3) -> str:
        """Generate the full report.

        Args:
            repos: Repositories in ``owner/repo`` form, reported in this order.
            months: Size of the "recently modified" window, in 30-day months.

        Returns:
            The report text: a tab-separated count of open non-draft PRs per
            repository plus a total, followed by one table per repository
            that has open PRs updated within the window.
        """
        report: List[str] = []

        # Fetch all PRs once per repo to avoid duplicate API calls.
        repo_prs: Dict[str, List[Dict[str, Any]]] = {}
        for repo in repos:
            if repo not in repo_prs:
                repo_prs[repo] = self.get_repo_prs(repo, 'open')

        # Summary table
        report.append("=== Open Non-Draft PR Count Summary ===")
        report.append("Repository\tOpen Non-Draft PRs")
        total_prs = 0
        for repo in repos:
            count = self._count_non_draft(repo_prs[repo])
            total_prs += count
            report.append(f"{repo}\t{count}")
        report.append(f"TOTAL\t{total_prs}")
        report.append("")

        # Detailed PR information
        report.append(f"=== Open PRs Modified in Last {months} Months ===")
        report.append("")
        cutoff_date = self._utc_cutoff(months)
        headers = ["PR#", "Created", "Modified", "Title", "Author", "Status", "URL"]

        # Process repos in order, newest-updated PR first within each repo.
        for repo in repos:
            recent_prs = self._summarize_recent_prs(repo, repo_prs[repo], cutoff_date)
            if not recent_prs:
                continue

            report.append(f"Repository: {repo}")
            report.append("-" * 120)

            rows = []
            for pr in recent_prs:
                title = pr['title']
                if len(title) > self.MAX_TITLE_LENGTH:
                    title = title[:self.MAX_TITLE_LENGTH] + "..."
                rows.append([
                    str(pr['number']),
                    pr['created_date'],
                    pr['updated_date'],
                    title,
                    pr['author'],
                    pr['status'],
                    pr['url'],
                ])
            report.extend(self._format_table(headers, rows))
            report.append("")

        return "\n".join(report)
def main():
    """Parse command-line arguments, build the PR report and emit it.

    The GitHub token comes from ``--token`` or, failing that, the
    ``GITHUB_TOKEN`` environment variable. The report is written to the
    ``--output`` file when given, otherwise printed to stdout.

    Exits with status 1 when no token is available or when report
    generation or writing fails; the error message goes to stderr so it
    never mixes with a report printed on stdout.
    """
    parser = argparse.ArgumentParser(description='Generate GitHub PR report')
    parser.add_argument('repos', nargs='+', help='GitHub repositories in format owner/repo')
    parser.add_argument('--token', help='GitHub personal access token (or set GITHUB_TOKEN env var)')
    parser.add_argument('--output', '-o', help='Output file (default: stdout)')
    args = parser.parse_args()

    # Get GitHub token
    token = args.token or os.getenv('GITHUB_TOKEN')
    if not token:
        print(
            "Error: GitHub token required. Set GITHUB_TOKEN environment variable or use --token flag.",
            file=sys.stderr,
        )
        sys.exit(1)

    reporter = GitHubPRReporter(token)
    try:
        report = reporter.generate_report(args.repos)
        if args.output:
            # PR titles may contain any Unicode text, so do not rely on the
            # platform's default encoding.
            with open(args.output, 'w', encoding='utf-8') as f:
                f.write(report)
            print(f"Report saved to {args.output}")
        else:
            print(report)
    except Exception as e:
        # Top-level boundary: report the failure and exit non-zero.
        print(f"Error generating report: {e}", file=sys.stderr)
        sys.exit(1)
# Run the command-line entry point only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()