/ home / bin / gh-cleanup
gh-cleanup
  1  #!/usr/bin/env python3
  2  """
  3  GitHub Notification Dismissal Tool
  4  
  5  Analyzes GitHub notifications using discussion-based criteria to safely dismiss
  6  resolved/inactive notifications while preserving active ones.
  7  
  8  Usage:
  9      gh-dismiss-notifications [--batch-size N] [--dry-run] [--help]
 10  
 11  Options:
 12      --batch-size N    Number of notifications to analyze per batch (default: 50)
 13      --dry-run        Show what would be dismissed without actually dismissing
 14      --help           Show this help message
 15  """
 16  
 17  import argparse
 18  import json
 19  import subprocess
 20  import sys
 21  import time
 22  from datetime import UTC, datetime
 23  from typing import Any
 24  
 25  
 26  def fetch_all_notifications() -> list[dict[str, Any]]:
 27      """Fetch all notifications using GitHub CLI with pagination"""
 28      try:
 29          cmd = ["gh", "api", "/notifications", "--paginate"]
 30          result = subprocess.run(cmd, capture_output=True, text=True, check=True)
 31          return json.loads(result.stdout)
 32      except subprocess.CalledProcessError as e:
 33          print(f"Error fetching notifications: {e.stderr}", file=sys.stderr)
 34          return []
 35      except json.JSONDecodeError as e:
 36          print(f"Error parsing JSON: {e}", file=sys.stderr)
 37          return []
 38  
 39  
 40  def get_issue_or_pr_details(url: str) -> dict[str, Any] | None:
 41      """Get issue/PR details including state"""
 42      try:
 43          # Extract repo and number from URL
 44          parts = url.replace("https://api.github.com/repos/", "").split("/")
 45          if len(parts) >= 4:
 46              owner, repo, item_type, number = parts[0], parts[1], parts[2], parts[3]
 47  
 48              # Get issue/PR details
 49              cmd = ["gh", "api", f"/repos/{owner}/{repo}/{item_type}/{number}"]
 50              result = subprocess.run(cmd, capture_output=True, text=True, check=True)
 51              return json.loads(result.stdout)
 52      except (subprocess.CalledProcessError, json.JSONDecodeError, IndexError) as e:
 53          print(f"Warning: Could not fetch details for {url}: {e}", file=sys.stderr)
 54          return None
 55  
 56  
 57  def analyze_dismissal_safety(
 58      notification: dict[str, Any], item_data: dict[str, Any] | None
 59  ) -> tuple[bool, str]:
 60      """
 61      Analyze if notification can be safely dismissed based on discussion state.
 62  
 63      Returns (is_safe, reason) tuple where:
 64      - is_safe: boolean indicating if safe to dismiss
 65      - reason: string explaining the decision
 66      """
 67      if not item_data:
 68          return False, "No item data available"
 69  
 70      state = item_data.get("state", "unknown")
 71      reason = notification.get("reason", "unknown")
 72      now = datetime.now(UTC)
 73  
 74      # Already closed/merged items (give a few days buffer for follow-up)
 75      if state in ["closed", "merged"] and item_data.get("closed_at"):
 76          closed_at = datetime.fromisoformat(
 77              item_data["closed_at"].replace("Z", "+00:00")
 78          )
 79          days_since_closed = (now - closed_at).days
 80          if days_since_closed > 3:
 81              return True, f"Closed/merged {days_since_closed} days ago"
 82  
 83      # Draft PRs with no recent activity
 84      if item_data.get("draft") and item_data.get("updated_at"):
 85          updated_at = datetime.fromisoformat(
 86              item_data["updated_at"].replace("Z", "+00:00")
 87          )
 88          days_since_update = (now - updated_at).days
 89          if days_since_update > 30:
 90              return True, f"Draft PR inactive for {days_since_update} days"
 91  
 92      # Old automation notifications (CI, subscriptions, state changes)
 93      if reason in ["ci_activity", "subscribed", "state_change"] and item_data.get(
 94          "updated_at"
 95      ):
 96          updated_at = datetime.fromisoformat(
 97              item_data["updated_at"].replace("Z", "+00:00")
 98          )
 99          days_since_update = (now - updated_at).days
100          if days_since_update > 14:
101              return (
102                  True,
103                  f"Automation notification, inactive for {days_since_update} days",
104              )
105  
106      # Old mentions/team_mentions with no recent activity
107      if reason in ["mention", "team_mention"] and item_data.get("updated_at"):
108          updated_at = datetime.fromisoformat(
109              item_data["updated_at"].replace("Z", "+00:00")
110          )
111          days_since_update = (now - updated_at).days
112          if days_since_update > 21:
113              return True, f"Old {reason}, inactive for {days_since_update} days"
114  
115      # Old comment notifications
116      if reason == "comment" and item_data.get("updated_at"):
117          updated_at = datetime.fromisoformat(
118              item_data["updated_at"].replace("Z", "+00:00")
119          )
120          days_since_update = (now - updated_at).days
121          if days_since_update > 21:
122              return True, f"Old comment, inactive for {days_since_update} days"
123  
124      # Author notifications for closed items
125      if reason == "author" and state in ["closed", "merged"]:
126          return True, f"Author notification for {state} item"
127  
128      return False, "Needs manual review"
129  
130  
131  def dismiss_notification(thread_id: str, dry_run: bool = False) -> bool:
132      """Dismiss a notification by thread ID"""
133      if dry_run:
134          return True
135  
136      try:
137          cmd = [
138              "gh",
139              "api",
140              "--method",
141              "PATCH",
142              f"/notifications/threads/{thread_id}",
143              "--field",
144              "state=read",
145          ]
146          subprocess.run(cmd, check=True, capture_output=True)
147      except subprocess.CalledProcessError as e:
148          print(f"Error dismissing notification {thread_id}: {e.stderr}", file=sys.stderr)
149          return False
150      else:
151          return True
152  
153  
154  def main() -> int:
155      parser = argparse.ArgumentParser(
156          description="Analyze and dismiss resolved GitHub notifications",
157          formatter_class=argparse.RawDescriptionHelpFormatter,
158          epilog=__doc__,
159      )
160      parser.add_argument(
161          "--batch-size",
162          type=int,
163          default=50,
164          help="Number of notifications to analyze per batch (default: 50)",
165      )
166      parser.add_argument(
167          "--dry-run",
168          action="store_true",
169          help="Show what would be dismissed without actually dismissing",
170      )
171  
172      args = parser.parse_args()
173  
174      print("Fetching GitHub notifications...")
175      all_notifications = fetch_all_notifications()
176  
177      if not all_notifications:
178          print("No notifications found or error occurred")
179          return 1
180  
181      print(f"Found {len(all_notifications)} notifications")
182  
183      # Sort by age (oldest first) for systematic processing
184      now = datetime.now(UTC)
185      all_notifications.sort(
186          key=lambda x: datetime.fromisoformat(x["updated_at"].replace("Z", "+00:00"))
187      )
188  
189      # Process in batches
190      batch_to_analyze = all_notifications[: args.batch_size]
191  
192      if not batch_to_analyze:
193          print("No notifications to analyze")
194          return 0
195  
196      safe_to_dismiss = []
197      needs_review = []
198  
199      print(f"\nAnalyzing {len(batch_to_analyze)} oldest notifications...")
200      print("=" * 100)
201  
202      for i, notification in enumerate(batch_to_analyze, 1):
203          updated_at = datetime.fromisoformat(
204              notification["updated_at"].replace("Z", "+00:00")
205          )
206          age_days = (now - updated_at).days
207  
208          repo = notification.get("repository", {}).get("full_name", "unknown")
209          subject_title = notification.get("subject", {}).get("title", "unknown")
210          subject_type = notification.get("subject", {}).get("type", "unknown")
211          reason = notification.get("reason", "unknown")
212          subject_url = notification.get("subject", {}).get("url", "")
213  
214          print(f"\n{i:2d}. {age_days:3d}d | {reason:15s} | {subject_type:12s}")
215          print(f"    šŸ“ {repo}")
216          print(f"    šŸ“ {subject_title[:70]}...")
217  
218          # Get detailed information for PRs and Issues only
219          item_data = None
220          if subject_type in ["PullRequest", "Issue"]:
221              item_data = get_issue_or_pr_details(subject_url)
222  
223          # Analyze if safe to dismiss
224          is_safe, analysis_reason = analyze_dismissal_safety(notification, item_data)
225  
226          if is_safe:
227              safe_to_dismiss.append(notification)
228              print(f"    āœ… SAFE TO DISMISS: {analysis_reason}")
229          else:
230              needs_review.append(notification)
231              print(f"    āš ļø  NEEDS REVIEW: {analysis_reason}")
232  
233          # Rate limiting to be nice to GitHub API
234          time.sleep(0.1)
235  
236      print("\n" + "=" * 100)
237      print("ANALYSIS SUMMARY")
238      print("=" * 100)
239      print(f"Safe to dismiss: {len(safe_to_dismiss)}")
240      print(f"Needs manual review: {len(needs_review)}")
241      print(f"Total analyzed: {len(batch_to_analyze)}")
242      print(f"Remaining notifications: {len(all_notifications) - len(batch_to_analyze)}")
243  
244      # Dismiss safe notifications
245      if safe_to_dismiss:
246          if args.dry_run:
247              print(f"\nšŸ” DRY RUN: Would dismiss {len(safe_to_dismiss)} notifications")
248          else:
249              print(f"\nDismissing {len(safe_to_dismiss)} notifications...")
250  
251          dismissed_count = 0
252          failed_count = 0
253  
254          for notification in safe_to_dismiss:
255              thread_id = notification["id"]
256              repo = notification.get("repository", {}).get("full_name", "unknown")
257              subject = notification.get("subject", {}).get("title", "unknown")
258              reason = notification.get("reason", "unknown")
259  
260              action = "Would dismiss" if args.dry_run else "Dismissing"
261              print(f"{action} ({reason}): {repo} - {subject[:30]}...")
262  
263              if dismiss_notification(thread_id, args.dry_run):
264                  dismissed_count += 1
265              else:
266                  failed_count += 1
267  
268              time.sleep(0.1)
269  
270          if args.dry_run:
271              print(f"\nšŸ” DRY RUN: Would have dismissed {dismissed_count} notifications")
272          else:
273              print(f"\nāœ… Done! Dismissed {dismissed_count} notifications")
274              if failed_count > 0:
275                  print(f"āš ļø  Failed to dismiss {failed_count} notifications")
276      else:
277          print("\nNo notifications were deemed safe to dismiss.")
278  
279      return 0
280  
281  
282  if __name__ == "__main__":
283      sys.exit(main())