# utils.py
import requests
import jwt
from datetime import datetime
from typing import Optional
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.backends import default_backend
import json
import csv
import os
import time
import gzip
import shutil
import tempfile
import logging

logger = logging.getLogger("airbyte")

# Timeout (seconds) applied to every App Store Connect HTTP call so that a
# stalled connection cannot hang the sync indefinitely.
REQUEST_TIMEOUT = 60


def create_jwt_token(key_id: str, issuer_id: str, private_key: str) -> Optional[str]:
    """Create a signed ES256 JWT for the App Store Connect API.

    Args:
        key_id: API key ID, placed in the token's "kid" header.
        issuer_id: Issuer ID from App Store Connect (the "iss" claim).
        private_key: PEM-encoded private key text.

    Returns:
        The encoded JWT string, or None if the key could not be loaded or
        the token could not be signed.
    """
    now = int(datetime.now().timestamp())

    payload = {
        "iss": issuer_id,
        "exp": now + 1200,  # 20 minutes expiration
        "aud": "appstoreconnect-v1",
        "iat": now,
        "nbf": now,
    }

    try:
        key = serialization.load_pem_private_key(
            private_key.encode(),
            password=None,
            backend=default_backend(),
        )
    except Exception as e:
        logger.error(f"Error loading private key: {e}")
        return None

    # Create the token
    try:
        return jwt.encode(
            payload=payload,
            key=key,
            algorithm="ES256",
            headers={"kid": key_id, "alg": "ES256"},
        )
    except Exception as e:
        logger.error(f"Error creating token: {e}")
        return None


def _auth_headers(config) -> Optional[dict]:
    """Build request headers with a fresh bearer token.

    Returns None when token creation fails, so callers can bail out early.
    """
    token = create_jwt_token(config["key_id"], config["issuer_id"], config["private_key"])
    if not token:
        return None
    return {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }


def get_report_instances(report_id: str, config) -> Optional[dict]:
    """Get instances for a specific report.

    Args:
        report_id: App Store Connect analytics report ID.
        config: Connector config containing "key_id", "issuer_id",
            and "private_key".

    Returns:
        Parsed JSON response dict, or None on auth failure or a non-200
        status code.
    """
    headers = _auth_headers(config)
    if headers is None:
        return None

    # Make the request
    response = requests.get(
        f"https://api.appstoreconnect.apple.com/v1/analyticsReports/{report_id}/instances",
        headers=headers,
        timeout=REQUEST_TIMEOUT,
    )

    if response.status_code == 200:
        return response.json()

    logger.error(f"Error for report {report_id}: {response.status_code}")
    logger.error(response.text)
    return None


def decompress_gzip_to_csv(gzip_path: str, csv_path: str) -> bool:
    """Decompress a .gz file to a .csv file.

    The source .gz file is deleted after a successful extraction; on failure
    it is left in place for inspection.

    Returns:
        True on success, False on any error.
    """
    try:
        with gzip.open(gzip_path, 'rb') as f_in, open(csv_path, 'wb') as f_out:
            shutil.copyfileobj(f_in, f_out)
        # Remove the gzip file after successful extraction
        os.remove(gzip_path)
        return True
    except Exception as e:
        logger.error(f"Error decompressing file: {e}")
        return False


def download_report_instance(
    instance_id: str,
    report_name: str,
    processing_date: str,
    downloaded_files,
    config,
) -> Optional[str]:
    """Download a specific report instance and save as .csv instead of .csv.gz.

    Fetches the segment URL for the instance, downloads the gzipped payload to
    a temp file, decompresses it to a second temp file with a .csv suffix, and
    records the logical file name in ``downloaded_files``.

    Args:
        instance_id: Analytics report instance ID.
        report_name: Human-readable report name (sanitized for the file name).
        processing_date: Date string embedded in the logical file name.
        downloaded_files: Mutable set; the logical csv file name is added on success.
        config: Connector config with API credentials.

    Returns:
        Path of the decompressed temporary CSV file, or None on any failure.
    """
    headers = _auth_headers(config)
    if headers is None:
        return None

    response = requests.get(
        f"https://api.appstoreconnect.apple.com/v1/analyticsReportInstances/{instance_id}/segments",
        headers=headers,
        timeout=REQUEST_TIMEOUT,
    )

    if response.status_code != 200:
        logger.error(f"Error: {response.status_code}")
        logger.error(response.text)
        return None

    data = response.json()

    # Get the URL from the response
    if not data.get("data"):
        logger.error("No data found in the response")
        return None

    file_url = data["data"][0]["attributes"]["url"]

    # Create filename using report name and processing date; keep only
    # characters that are safe in file names.
    safe_report_name = "".join(
        c for c in report_name if c.isalnum() or c in (' ', '-', '_')
    ).strip()
    csv_file_name = f"{safe_report_name}_{processing_date}.csv"

    # Download the file (which will be gzipped from the API)
    logger.info(f"Downloading file from: {file_url}")
    file_response = requests.get(file_url, timeout=REQUEST_TIMEOUT)

    if file_response.status_code != 200:
        logger.error(f"Error downloading file: {file_response.status_code}")
        logger.error(file_response.text)
        return None

    # Persist the gzipped payload, then decompress it next to it.
    with tempfile.NamedTemporaryFile(mode='wb', suffix='.gz', delete=False) as gz_temp_file:
        gz_file_path = gz_temp_file.name
        gz_temp_file.write(file_response.content)

    with tempfile.NamedTemporaryFile(mode='wb', suffix='.csv', delete=False) as csv_temp_file:
        csv_file_path = csv_temp_file.name

    # NOTE: decompress_gzip_to_csv deletes the .gz file itself on success, so
    # no extra cleanup is needed here. (The previous version called os.remove
    # on it a second time, which always failed and logged a spurious warning.)
    if decompress_gzip_to_csv(gz_file_path, csv_file_path):
        logger.info(f"File downloaded and decompressed successfully to: {csv_file_path}")
        downloaded_files.add(csv_file_name)
        return csv_file_path

    logger.error("Failed to decompress file to CSV")
    # Best-effort cleanup of the temp files left behind by a failed decompress.
    for leftover in (gz_file_path, csv_file_path):
        try:
            os.remove(leftover)
        except OSError:
            pass
    return None


def read_and_filter_report(file_path: str, expected_date: str) -> list:
    """
    Read a report file, filter by date, and return cleaned records.

    Auto-detects tab- vs comma-delimited format from the first line, keeps
    only rows whose 'Date' column equals ``expected_date``, strips whitespace
    from keys and values (empty values become None), and stamps each record
    with the source file path and a processing timestamp.

    Args:
        file_path: Path of the decompressed CSV/TSV report.
        expected_date: Date string records must match; falsy values filter
            out every record.

    Returns:
        List of cleaned record dicts; empty list if the file cannot be read.
    """
    records = []
    total_records = 0
    filtered_records = 0

    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            # Read the first line to determine the format
            first_line = f.readline().strip()
            f.seek(0)  # Reset file pointer

            # Determine if it's tab-delimited or comma-delimited
            if '\t' in first_line:
                logger.info("Detected tab-delimited format")
                reader = csv.DictReader(f, delimiter='\t')
            else:
                logger.info("Detected comma-delimited format")
                reader = csv.DictReader(f)

            # Process each record
            for record in reader:
                total_records += 1

                # Check if the record's Date matches the expected date
                record_date = record.get('Date', '').strip()

                # Skip if we couldn't parse the date or if the record date doesn't match
                if not expected_date or record_date != expected_date:
                    logger.debug(
                        f"Skipping record with date {record_date} "
                        f"(does not match expected date {expected_date})"
                    )
                    continue

                filtered_records += 1

                # Clean and normalize the data: strip whitespace from keys and
                # values; empty-string values are normalized to None.
                cleaned_record = {}
                for key, value in record.items():
                    clean_key = key.strip()
                    if value is not None:
                        clean_value = value.strip()
                        if clean_value == '':
                            clean_value = None
                    else:
                        clean_value = None
                    cleaned_record[clean_key] = clean_value

                # Add metadata
                cleaned_record["_ab_source_file_url"] = file_path
                cleaned_record["processed_date"] = datetime.now().isoformat()

                records.append(cleaned_record)

        logger.info(
            f"Processed {total_records} total records, filtered to "
            f"{filtered_records} records matching expected date {expected_date}"
        )
        return records

    except Exception as e:
        logger.error(f"Error processing file {file_path}: {str(e)}", exc_info=True)
        return []