source.py
  1  from abc import ABC
  2  from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple
  3  import logging
  4  import requests
  5  import time
  6  from datetime import datetime, timedelta
  7  
  8  from airbyte_cdk.sources import AbstractSource
  9  from airbyte_cdk.sources.streams import Stream
 10  from airbyte_cdk.sources.streams.http import HttpStream, HttpSubStream
 11  from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator
 12  
 13  logger = logging.getLogger("airbyte")
 14  
 15  LOCATION_KEYS=["id", "rank", "name", "downloads_total", "downloads_percent"]
 16  DOWNLOADS_KEY=["interval", "downloads_total", "downloads_percent"]
 17  TECH_KEY=["rank", "name", "downloads_total", "downloads_percent"]
 18  
 19  # Basic full refresh stream
 20  class SimplecastFectherStream(HttpStream):
 21      url_base = "https://api.simplecast.com/"
 22      primary_key = None
 23  
 24  
 25      def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
 26          pages = response.json().get('pages')
 27          if pages and pages.get('next'):
 28              time.sleep(2)
 29              return {
 30                  'limit': pages.get('limit'),
 31                  'offset': pages.get('limit')* pages.get('current')
 32              }
 33  
 34      def request_params(
 35          self,
 36          stream_state: Optional[Mapping[str, Any]],
 37          stream_slice: Optional[Mapping[str, Any]] = None,
 38          next_page_token: Optional[Mapping[str, Any]] = None,
 39      ) -> MutableMapping[str, Any]:
 40          if next_page_token:
 41              return next_page_token
 42  
 43  
 44  class Podcast(SimplecastFectherStream):
 45  
 46      primary_key = "id"
 47  
 48  
 49      @property
 50      def use_cache(self) -> bool:
 51          return True
 52  
 53      def path(
 54          self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
 55      ) -> str:
 56          return "podcasts"
 57  
 58      def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
 59          data=response.json()
 60          logger.debug("Response: %s", data)
 61          for elt in data.get('collection'):
 62              podcast={
 63                  "id": elt.get("id"),
 64                  "title": elt.get("title"),
 65                  "status": elt.get("status"),
 66                  "href": elt.get("href"),
 67                  "episode_count": elt.get("episodes").get("count"),
 68                  "account_id": elt.get("account_id"),
 69                  "account_owner_name": elt.get("account").get("owner").get("name")
 70              }
 71              yield podcast
 72  
 73  class Episode(HttpSubStream, SimplecastFectherStream):
 74      primary_key="id"
 75  
 76      @property
 77      def use_cache(self) -> bool:
 78          return True
 79  
 80      def path(
 81          self,
 82          stream_state: Mapping[str, Any] = None,
 83          stream_slice: Mapping[str, Any] = None,
 84          next_page_token: Mapping[str, Any] = None
 85      ) -> str:
 86          podcast_id=stream_slice.get("parent").get("id")
 87  
 88          return f"podcasts/{podcast_id}/episodes"
 89  
 90      def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
 91          data=response.json()
 92          logger.debug("Response: %s", data)
 93          for elt in data.get('collection'):
 94              episode={
 95                  "id": elt.get("id"),
 96                  "title": elt.get("title"),
 97                  "status": elt.get("status"),
 98                  "published_at": elt.get("published_at"),
 99                  "updated_at": elt.get("updated_at"),
100                  "scheduled_for": elt.get("scheduled_for"),
101                  "season": elt.get('season'),
102                  "number": elt.get("number"),
103                  "description": elt.get("description"),
104                  "token": elt.get("token"),
105                  "type": elt.get("type")
106              }
107              yield episode
108  
109  
class EpisodeDownload(HttpSubStream, SimplecastFectherStream):
    """Download counts per interval for each parent episode (``analytics/downloads``)."""

    primary_key = None

    def __init__(self, sync_mode: bool = False, **kwargs):
        """
        :param sync_mode: when True, request the full download history. When
            False, only request data from yesterday's (local) date onward.
        :param kwargs: forwarded to the parent stream constructors
            (e.g. ``authenticator``, ``parent``).
        """
        super().__init__(**kwargs)
        self.sync_mode = sync_mode

    def path(
        self,
        stream_state: Mapping[str, Any] = None,
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None
    ) -> str:
        """Build the downloads path (with query string) for the slice's episode."""
        episode_id = stream_slice.get("parent").get("id")
        start_date = ""
        if not self.sync_mode:
            # Incremental-style fetch: only data since yesterday. Uses the local
            # date; NOTE(review): confirm whether the API expects a UTC date.
            start_date = f"&start_date={(datetime.now() - timedelta(days=1)).date().isoformat()}"
        return f"analytics/downloads?episode={episode_id}{start_date}"

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        """Yield one record per ``by_interval`` entry, tagged with the episode id.

        A missing or null ``by_interval`` yields nothing instead of raising
        ``TypeError``.
        """
        data = response.json()
        episode = data.get('id')
        for elt in data.get("by_interval") or []:
            analytic = {key: elt.get(key) for key in DOWNLOADS_KEY}
            analytic['episode_id'] = episode
            yield analytic
136  
137  
138  
class AnalyticSubStream(HttpSubStream, SimplecastFectherStream, ABC):
    """Per-podcast analytics stream reading ``analytics/<endpoint>?podcast=<id>``.

    Subclasses supply the endpoint, the keys copied from each item, and the
    name of the response field holding the items.
    """

    primary_key = None

    def __init__(self, endpoint: str, keys_dict: Iterable[str], collection_name: str, **kwargs):
        """
        :param endpoint: path under ``analytics/`` to query.
        :param keys_dict: keys copied from every collection item into the
            output record (despite its name, any iterable of key names works).
        :param collection_name: name of the response field holding the items.
        :param kwargs: forwarded to the parent stream constructors
            (e.g. ``authenticator``, ``parent``).
        """
        super().__init__(**kwargs)
        self.endpoint = endpoint
        # Materialize once: the keys are iterated for every item, so a
        # one-shot iterator would otherwise produce empty records after the first.
        self.keys_dict = list(keys_dict)
        self.collection_name = collection_name

    def path(
        self,
        stream_state: Mapping[str, Any] = None,
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None
    ) -> str:
        """Build the analytics path for the podcast in ``stream_slice``."""
        podcast_id = stream_slice.get("parent").get("id")

        return f"analytics/{self.endpoint}?podcast={podcast_id}"

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        """Yield records built from the JSON list stored under ``collection_name``.

        Default implementation: each item is reduced to the keys in
        ``keys_dict``. If an item's mapping is not a simple key copy, override
        this method. A missing or null collection yields nothing.
        """
        data = response.json()
        logger.debug("Response: %s", data)
        for elt in data.get(self.collection_name) or []:
            logger.debug("Elt %s", elt)
            yield {key: elt.get(key) for key in self.keys_dict}
169  
class PodcastListeningLocation(AnalyticSubStream):
    """Per-podcast listener locations from ``analytics/location`` (``countries`` list)."""

    primary_key = None

    def __init__(self, **kwargs):
        super().__init__(
            collection_name="countries",
            endpoint="location",
            keys_dict=LOCATION_KEYS,
            **kwargs,
        )
175  
class PodcastListeningDevice(AnalyticSubStream):
    """Per-podcast downloads by device class (``analytics/technology/device_class``)."""

    def __init__(self, **kwargs):
        super().__init__(
            collection_name="collection",
            endpoint="technology/device_class",
            keys_dict=TECH_KEY,
            **kwargs,
        )
180  
class PodcastListeningMethod(AnalyticSubStream):
    """Per-podcast downloads by listening method (``analytics/technology/listening_methods``)."""

    def __init__(self, **kwargs):
        super().__init__(
            collection_name="collection",
            endpoint="technology/listening_methods",
            keys_dict=TECH_KEY,
            **kwargs,
        )
185  
# Source
class SourceSimplecastFecther(AbstractSource):
    """Airbyte source exposing Simplecast podcast, episode and analytics streams."""

    def check_connection(self, logger, config) -> Tuple[bool, Any]:
        """Verify that the configured API key can list podcasts.

        :param logger: logger supplied by the CDK (unused here).
        :param config: connector configuration; must contain ``api_key``.
        :return: ``(True, None)`` when the API accepts the request, otherwise
            ``(False, <error message>)``.
        """
        api_key = config.get("api_key")
        if not api_key:
            return False, "Missing required configuration field 'api_key'."
        auth = TokenAuthenticator(token=api_key)
        try:
            # Same endpoint the Podcast stream reads, so success here means the
            # key can actually be used for syncing.
            response = requests.get(
                f"{SimplecastFectherStream.url_base}podcasts",
                headers=auth.get_auth_header(),
                timeout=30,
            )
            response.raise_for_status()
        except requests.exceptions.RequestException as error:
            return False, f"Unable to reach the Simplecast API: {error}"
        return True, None

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        """Build all streams; sub-streams share the podcast/episode parents.

        ``full_download_sync`` (default False) controls whether EpisodeDownload
        fetches the whole history or only data since yesterday.
        """
        auth = TokenAuthenticator(token=config["api_key"])
        podcasts = Podcast(authenticator=auth)
        episodes = Episode(authenticator=auth, parent=podcasts)
        return [
            podcasts,
            PodcastListeningLocation(authenticator=auth, parent=podcasts),
            PodcastListeningDevice(authenticator=auth, parent=podcasts),
            PodcastListeningMethod(authenticator=auth, parent=podcasts),
            episodes,
            EpisodeDownload(authenticator=auth, parent=episodes, sync_mode=config.get("full_download_sync", False)),
        ]