# source.py
from abc import ABC
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple
import logging
import requests
import time
from datetime import datetime, timedelta

from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http import HttpStream, HttpSubStream
from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator

logger = logging.getLogger("airbyte")

# Fields copied verbatim from each element of the corresponding analytics responses.
LOCATION_KEYS = ["id", "rank", "name", "downloads_total", "downloads_percent"]
DOWNLOADS_KEY = ["interval", "downloads_total", "downloads_percent"]
TECH_KEY = ["rank", "name", "downloads_total", "downloads_percent"]


def _collection(data: Mapping[str, Any], name: str) -> List[Any]:
    """Return the list stored under ``name`` in a response body, or ``[]`` if absent/null."""
    return data.get(name) or []


def _nested_get(obj: Any, *keys: str) -> Any:
    """Follow ``keys`` through nested dicts, returning None as soon as a level is missing.

    Replaces chained ``a.get(x).get(y)`` calls, which raise AttributeError when an
    intermediate object is absent from the JSON.
    """
    for key in keys:
        if not isinstance(obj, dict):
            return None
        obj = obj.get(key)
    return obj


# Basic full refresh stream
class SimplecastFectherStream(HttpStream):
    """Base stream for the Simplecast REST API, paginated with ``limit``/``offset`` params."""

    url_base = "https://api.simplecast.com/"
    primary_key = None
    # Pause (seconds) before each follow-up page request; presumably to stay under
    # the API's rate limits. Override on a subclass to tune.
    page_delay_seconds: float = 2

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        """Build ``limit``/``offset`` params for the next page, or None on the last page.

        Reads the response's ``pages`` object; a page is fetched only while
        ``pages.next`` is truthy.
        """
        pages = response.json().get("pages")
        if not pages or not pages.get("next"):
            return None
        time.sleep(self.page_delay_seconds)
        limit = pages.get("limit")
        # NOTE(review): assumes pages.current is the 1-based index of the page just
        # returned, so limit * current is the offset of the next one — confirm.
        return {"limit": limit, "offset": limit * pages.get("current")}

    def request_params(
        self,
        stream_state: Optional[Mapping[str, Any]],
        stream_slice: Optional[Mapping[str, Any]] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> MutableMapping[str, Any]:
        """Return the pagination params for follow-up pages; no extra params on the first page."""
        # Always return a fresh mapping (never None), matching the declared return type
        # and the CDK default, and never hand out the token object itself.
        return dict(next_page_token) if next_page_token else {}


class Podcast(SimplecastFectherStream):
    """Podcasts returned by the ``podcasts`` endpoint."""

    primary_key = "id"

    @property
    def use_cache(self) -> bool:
        # Podcasts is the parent of several substreams built in streams(); caching
        # presumably lets the CDK reuse the responses instead of re-requesting them.
        return True

    def path(
        self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
    ) -> str:
        return "podcasts"

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        """Yield one flattened record per element of the response ``collection``.

        Nested fields (episode count, account owner name) become None when the
        enclosing object is missing, instead of raising.
        """
        data = response.json()
        logger.debug("Response: %s", data)
        for elt in _collection(data, "collection"):
            yield {
                "id": elt.get("id"),
                "title": elt.get("title"),
                "status": elt.get("status"),
                "href": elt.get("href"),
                "episode_count": _nested_get(elt, "episodes", "count"),
                "account_id": elt.get("account_id"),
                "account_owner_name": _nested_get(elt, "account", "owner", "name"),
            }


class Episode(HttpSubStream, SimplecastFectherStream):
    """Episodes of each parent podcast, from ``podcasts/{podcast_id}/episodes``."""

    primary_key = "id"

    @property
    def use_cache(self) -> bool:
        # Episode is itself the parent of EpisodeDownload in streams().
        return True

    def path(
        self,
        stream_state: Mapping[str, Any] = None,
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None
    ) -> str:
        podcast_id = stream_slice.get("parent").get("id")
        return f"podcasts/{podcast_id}/episodes"

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        """Yield one record per element of the response ``collection``."""
        data = response.json()
        logger.debug("Response: %s", data)
        for elt in _collection(data, "collection"):
            yield {
                "id": elt.get("id"),
                "title": elt.get("title"),
                "status": elt.get("status"),
                "published_at": elt.get("published_at"),
                "updated_at": elt.get("updated_at"),
                "scheduled_for": elt.get("scheduled_for"),
                "season": elt.get("season"),
                "number": elt.get("number"),
                "description": elt.get("description"),
                "token": elt.get("token"),
                "type": elt.get("type"),
            }


class EpisodeDownload(HttpSubStream, SimplecastFectherStream):
    """Per-interval download counts for each parent episode, from ``analytics/downloads``."""

    primary_key = None

    def __init__(self, sync_mode: bool = False, **kwargs):
        """:param sync_mode: True fetches the whole history; False only from yesterday's date."""
        super().__init__(**kwargs)
        self.sync_mode = sync_mode

    def path(
        self,
        stream_state: Mapping[str, Any] = None,
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None
    ) -> str:
        episode_id = stream_slice.get("parent").get("id")
        start_date = ""
        if not self.sync_mode:
            # Incremental-style window: only downloads since yesterday (local clock).
            # NOTE(review): datetime.now() is naive local time; confirm whether the API expects UTC dates.
            start_date = f"&start_date={(datetime.now() - timedelta(days=1)).date().isoformat()}"
        return f"analytics/downloads?episode={episode_id}{start_date}"

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        """Yield one record per ``by_interval`` element, tagged with the response ``id`` as ``episode_id``."""
        data = response.json()
        episode = data.get("id")
        for elt in _collection(data, "by_interval"):
            analytic = {key: elt.get(key) for key in DOWNLOADS_KEY}
            analytic["episode_id"] = episode
            yield analytic


class AnalyticSubStream(HttpSubStream, SimplecastFectherStream, ABC):
    """Base for podcast-level analytics streams (``analytics/{endpoint}?podcast={id}``).

    Subclasses choose the endpoint, the response list to read, and which keys to keep.
    """

    primary_key = None

    def __init__(self, endpoint: str, keys_dict: List[str], collection_name: str, **kwargs):
        """
        :param endpoint: path segment after ``analytics/``.
        :param keys_dict: keys copied from each collection element (despite the name, a list).
        :param collection_name: key of the response list holding the records.
        """
        super().__init__(**kwargs)
        self.endpoint = endpoint
        # Materialized so the keys can be iterated once per record.
        self.keys_dict = list(keys_dict)
        self.collection_name = collection_name

    def path(
        self,
        stream_state: Mapping[str, Any] = None,
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None
    ) -> str:
        podcast_id = stream_slice.get("parent").get("id")
        return f"analytics/{self.endpoint}?podcast={podcast_id}"

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        """Default parser: read the ``collection_name`` list of the JSON response.

        Each element is reduced to the ``keys_dict`` fields. If the object mapping is
        not a simple key mapping, this method has to be overridden.
        """
        data = response.json()
        logger.debug("Response: %s", data)
        for elt in _collection(data, self.collection_name):
            logger.debug("Elt %s", elt)
            yield {key: elt.get(key) for key in self.keys_dict}


class PodcastListeningLocation(AnalyticSubStream):
    """Listening locations per podcast (``countries`` list of ``analytics/location``)."""

    primary_key = None

    def __init__(self, **kwargs):
        super().__init__(endpoint="location", keys_dict=LOCATION_KEYS, collection_name="countries", **kwargs)


class PodcastListeningDevice(AnalyticSubStream):
    """Listening device classes per podcast."""

    def __init__(self, **kwargs):
        super().__init__(endpoint="technology/device_class", keys_dict=TECH_KEY, collection_name="collection", **kwargs)


class PodcastListeningMethod(AnalyticSubStream):
    """Listening methods per podcast."""

    def __init__(self, **kwargs):
        super().__init__(endpoint="technology/listening_methods", keys_dict=TECH_KEY, collection_name="collection", **kwargs)


# Source
class SourceSimplecastFecther(AbstractSource):
    """Airbyte source exposing Simplecast podcasts, episodes and analytics."""

    def check_connection(self, logger, config) -> Tuple[bool, Any]:
        # NOTE(review): no validation is performed; every config (including a bad
        # api_key) is reported as valid. Consider reading one Podcast record here.
        return True, None

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        """Build all streams sharing one token authenticator built from ``config["api_key"]``."""
        auth = TokenAuthenticator(token=config["api_key"])
        podcasts = Podcast(authenticator=auth)
        episodes = Episode(authenticator=auth, parent=podcasts)
        return [
            podcasts,
            PodcastListeningLocation(authenticator=auth, parent=podcasts),
            PodcastListeningDevice(authenticator=auth, parent=podcasts),
            PodcastListeningMethod(authenticator=auth, parent=podcasts),
            episodes,
            EpisodeDownload(authenticator=auth, parent=episodes, sync_mode=config.get("full_download_sync", False)),
        ]