# utils.py
import csv
import gzip
import json
import logging
import os
import shutil
import tempfile
import time
from datetime import datetime
from typing import Optional

import jwt
import requests
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
 14  
# Module-level logger shared by all helpers in this file.
logger = logging.getLogger("airbyte")
 16  
 17  
 18  def create_jwt_token(key_id: str, issuer_id: str, private_key: str) -> str:
 19      now = int(datetime.now().timestamp())
 20      
 21      payload = {
 22          "iss": issuer_id,
 23          "exp": now + 1200,  # 20 minutes expiration
 24          "aud": "appstoreconnect-v1",
 25          "iat": now,
 26          "nbf": now
 27      }
 28      
 29      try:
 30          key = serialization.load_pem_private_key(
 31              private_key.encode(),
 32              password=None,
 33              backend=default_backend()
 34          )
 35      except Exception as e:
 36          print(f"Error loading private key: {e}")
 37          return None
 38  
 39      # Create the token
 40      try:
 41          return jwt.encode(
 42              payload=payload,
 43              key=key,
 44              algorithm="ES256",
 45              headers={"kid": key_id, "alg": "ES256"}
 46          )
 47      except Exception as e:
 48          print(f"Error creating token: {e}")
 49          return None
 50  
 51  def get_report_instances(report_id: str, config):
 52      """Get instances for a specific report."""
 53      token = create_jwt_token(config["key_id"], config["issuer_id"], config["private_key"])
 54      if not token:
 55          return None
 56  
 57      headers = {
 58          "Authorization": f"Bearer {token}",
 59          "Content-Type": "application/json"
 60      }
 61  
 62      # Make the request
 63      response = requests.get(
 64          f"https://api.appstoreconnect.apple.com/v1/analyticsReports/{report_id}/instances",
 65          headers=headers
 66      )
 67  
 68      if response.status_code == 200:
 69          return response.json()
 70      else:
 71          print(f"Error for report {report_id}: {response.status_code}")
 72          print(response.text)
 73          return None
 74  
 75  def decompress_gzip_to_csv(gzip_path, csv_path):
 76      """Decompress a .gz file to a .csv file."""
 77      try:
 78          with gzip.open(gzip_path, 'rb') as f_in:
 79              with open(csv_path, 'wb') as f_out:
 80                  shutil.copyfileobj(f_in, f_out)
 81          # Remove the gzip file after successful extraction
 82          os.remove(gzip_path)
 83          return True
 84      except Exception as e:
 85          print(f"Error decompressing file: {e}")
 86          return False
 87  
 88  def download_report_instance(instance_id: str, report_name: str, processing_date: str, downloaded_files, config):
 89      """Download a specific report instance and save as .csv instead of .csv.gz."""
 90      token = create_jwt_token(config["key_id"], config["issuer_id"], config["private_key"])
 91      if not token:
 92          return None
 93  
 94      headers = {
 95          "Authorization": f"Bearer {token}",
 96          "Content-Type": "application/json"
 97      }
 98  
 99      response = requests.get(
100          f"https://api.appstoreconnect.apple.com/v1/analyticsReportInstances/{instance_id}/segments",
101          headers=headers
102      )
103  
104      if response.status_code == 200:
105          data = response.json()
106          
107          # Get the URL from the response
108          if data.get("data") and len(data["data"]) > 0:
109              file_url = data["data"][0]["attributes"]["url"]
110              
111              # Create filename using report name and processing date
112              safe_report_name = "".join(c for c in report_name if c.isalnum() or c in (' ', '-', '_')).strip()
113              csv_file_name = f"{safe_report_name}_{processing_date}.csv"
114              
115              # Download the file (which will be gzipped from the API)
116              print(f"Downloading file from: {file_url}")
117              file_response = requests.get(file_url)
118              
119              if file_response.status_code == 200:
120                  # Create temporary files
121                  with tempfile.NamedTemporaryFile(mode='wb', suffix='.gz', delete=False) as gz_temp_file:
122                      gz_file_path = gz_temp_file.name
123                      gz_temp_file.write(file_response.content)
124                  
125                  with tempfile.NamedTemporaryFile(mode='wb', suffix='.csv', delete=False) as csv_temp_file:
126                      csv_file_path = csv_temp_file.name
127                  
128                  # Decompress the gzipped file to CSV
129                  if decompress_gzip_to_csv(gz_file_path, csv_file_path):
130                      print(f"File downloaded and decompressed successfully to: {csv_file_path}")
131                      downloaded_files.add(csv_file_name)
132                      # Clean up the gzipped file
133                      try:
134                          os.remove(gz_file_path)
135                      except Exception as e:
136                          print(f"Warning: Could not remove temporary gzip file: {e}")
137                      return csv_file_path
138                  else:
139                      print(f"Failed to decompress file to CSV")
140                      return None
141              else:
142                  print(f"Error downloading file: {file_response.status_code}")
143                  print(file_response.text)
144          else:
145              print("No data found in the response")
146      else:
147          print(f"Error: {response.status_code}")
148          print(response.text)
149      return None
150  
151        
def read_and_filter_report(file_path, expected_date):
    """
    Read a report file, keep only the rows whose Date column equals
    expected_date, and return the cleaned records.
    """
    results = []
    total = 0
    kept = 0

    try:
        with open(file_path, 'r', encoding='utf-8') as handle:
            # Sniff the delimiter from the first line, then rewind so the
            # reader sees the header row again.
            header_line = handle.readline().strip()
            handle.seek(0)

            if '\t' in header_line:
                logger.info("Detected tab-delimited format")
                rows = csv.DictReader(handle, delimiter='\t')
            else:
                logger.info("Detected comma-delimited format")
                rows = csv.DictReader(handle)

            for row in rows:
                total += 1

                row_date = row.get('Date', '').strip()

                # Drop non-matching rows; an empty expected_date drops
                # everything.
                if not expected_date or row_date != expected_date:
                    logger.debug(f"Skipping record with date {row_date} (does not match expected date {expected_date})")
                    continue

                kept += 1

                # Normalize: strip keys and values, map empty values to None.
                cleaned = {
                    key.strip(): ((value.strip() or None) if value is not None else None)
                    for key, value in row.items()
                }

                # Attach provenance metadata.
                cleaned["_ab_source_file_url"] = file_path
                cleaned["processed_date"] = datetime.now().isoformat()

                results.append(cleaned)

        logger.info(f"Processed {total} total records, filtered to {kept} records matching expected date {expected_date}")
        return results

    except Exception as e:
        logger.error(f"Error processing file {file_path}: {str(e)}", exc_info=True)
        return []