source.py
  1  #
  2  # Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  3  #
  4  
  5  from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple
  6  import logging
  7  import time
  8  import requests
  9  from airbyte_cdk.sources import AbstractSource
 10  from airbyte_cdk.sources.streams import Stream
 11  from airbyte_cdk.sources.streams.http import HttpStream, HttpSubStream
 12  from airbyte_cdk.models import SyncMode
 13  
 14  logger = logging.getLogger("airbyte")
 15  
 16  USER_KEYS = [
 17      "id","name","username","active","created_at","trust_level","title","time_read", "staged","days_visited","posts_read_count","topics_entered","post_count", "email"
 18      ]
 19  
 20  USER_ACTION_KEYS = [
 21      "id", "action_type", "created_at", "acting_user_id", "acting_username",
 22      "target_user_id", "target_post_id", "target_topic_id", "target_username",
 23      "post_number", "topic_title", "slug", "category_id"
 24      ]
 25  
 26  # Action types to filter: LIKE (1), NEW_TOPIC (4), REPLY (5)
 27  ALLOWED_ACTION_TYPES = [1, 4, 5]
 28  
 29  POST_KEYS = [
 30      "id","name","username", "raw", "created_at", "post_number", "post_type", "post_count", "post_url", "updated_at", "reply_count", "reply_to_post_number","quote_count","incoming_link_count","reads","score","topic_id", "topic_slug","topic_title","topic_html_title","category_id"
 31      ]
 32  
 33  class DiscourseStream(HttpStream):
 34  
 35      url_base = ""
 36      primary_key = None
 37  
 38      def __init__(self, api_key: str, api_username: str, url: str, **kwargs):
 39          super().__init__(**kwargs)
 40          self.api_key = api_key
 41          self.api_username = api_username
 42          self.url= url[:-1] if url.endswith("/") else url
 43  
 44      def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
 45          return None
 46  
 47      def request_headers(
 48          self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None
 49      ) -> MutableMapping[str, Any]:
 50          return { "Api-Key" : f"{self.api_key}", "Api-Username": f"{self.api_username}"}
 51  
 52  class User(DiscourseStream):
 53      primary_key="id"
 54  
 55      def __init__(self, **kwargs):
 56          super().__init__(**kwargs)
 57          self._next_page = 0  # Instance attribute for pagination
 58  
 59      @property
 60      def use_cache(self) -> bool:
 61          return True
 62  
 63      def path(
 64         self,
 65         stream_state: Mapping[str, Any] = None,
 66         stream_slice: Mapping[str, Any] = None,
 67         next_page_token: Mapping[str, Any] = None
 68      ) -> str:
 69          return f"{self.url}/admin/users/list/active.json"
 70  
 71  
 72      def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
 73          next_page = {"page": self._next_page} if len(response.json()) > 0 else None
 74          return next_page
 75  
 76  
 77      def request_params(self, stream_state, stream_slice = None, next_page_token = None):
 78          params = {
 79              "order": "created",
 80              "asc": "true"
 81          }
 82          if next_page_token:
 83              params.update(next_page_token)
 84          return params
 85  
 86      def parse_response(
 87         self,
 88         response: requests.Response,
 89         **kwargs
 90      ) -> Iterable[Mapping]:
 91          data = response.json()
 92          for elt in data:
 93              logger.debug("Response %s", elt)
 94              user = { key : elt.get(key) for key in USER_KEYS }
 95              yield user
 96          self._next_page = self._next_page + 1
 97  
 98  
 99  class UserAction(HttpSubStream):
100      """
101      Fetches user actions (likes, topic creations, replies) for all users.
102      """
103      primary_key = "id"
104      url_base = ""
105  
106      def __init__(self, api_key: str, api_username: str, url: str, parent: User, **kwargs):
107          super().__init__(parent=parent, **kwargs)
108          self.api_key = api_key
109          self.api_username = api_username
110          self.url = url[:-1] if url.endswith("/") else url
111          self._current_slice_key = None
112          self._offset = 0
113  
114      def request_headers(
115          self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None
116      ) -> MutableMapping[str, Any]:
117          return {"Api-Key": f"{self.api_key}", "Api-Username": f"{self.api_username}"}
118  
119      def stream_slices(
120          self,
121          stream_state: Mapping[str, Any] = None,
122          **kwargs
123      ) -> Iterable[Optional[Mapping[str, Any]]]:
124          """
125          Override stream_slices to iterate over all users.
126          Fetch all actions per user (no filter), then filter locally.
127          This reduces API calls by 3x compared to separate calls per action type.
128          """
129          # Reset parent's pagination state so we get ALL users
130          self.parent._next_page = 0
131  
132          user_count = 0
133          for parent_slice in super().stream_slices(sync_mode=SyncMode.full_refresh):
134              user_count += 1
135              username = parent_slice.get('parent', {}).get('username')
136              if user_count <= 10 or user_count % 100 == 0:
137                  logger.info(f"Generating slice for user {user_count}: {username}")
138  
139              yield {"parent": parent_slice.get('parent')}
140  
141          logger.info(f"Finished generating slices: {user_count} users")
142  
143      def path(
144          self,
145          stream_state: Mapping[str, Any] = None,
146          stream_slice: Mapping[str, Any] = None,
147          next_page_token: Mapping[str, Any] = None
148      ) -> str:
149          return f"{self.url}/user_actions.json"
150  
151      def request_params(self, stream_state, stream_slice=None, next_page_token: Mapping[str, Any] = None):
152          username = stream_slice.get('parent', {}).get('username')
153  
154          # Reset offset when switching to a new user
155          if username != self._current_slice_key:
156              self._current_slice_key = username
157              self._offset = 0
158              logger.info(f"Requesting user_actions for {username}")
159  
160          # Fetch ALL actions (no filter), we'll filter locally for speed
161          params = {"username": username}
162          if next_page_token:
163              params.update(next_page_token)
164          return params
165  
166      def backoff_time(self, response: requests.Response) -> Optional[float]:
167          """Use the wait_seconds from Discourse rate limit response for optimal backoff."""
168          if response.status_code == 429:
169              try:
170                  data = response.json()
171                  wait_seconds = data.get("extras", {}).get("wait_seconds")
172                  if wait_seconds:
173                      logger.info(f"Rate limited, waiting {wait_seconds} seconds as requested by API")
174                      return float(wait_seconds) + 1
175              except Exception:
176                  pass
177          return None  # Fall back to default exponential backoff
178  
179      def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
180          data = response.json()
181          user_actions = data.get("user_actions", [])
182          if len(user_actions) > 0:
183              self._offset += len(user_actions)
184              return {"offset": self._offset}
185          return None
186  
187      def parse_response(
188          self,
189          response: requests.Response,
190          stream_slice: Mapping[str, Any] = None,
191          **kwargs
192      ) -> Iterable[Mapping]:
193          data = response.json()
194          username = stream_slice.get('parent', {}).get('username')
195          logger.debug("Response user_actions for %s: %s", username, data)
196  
197          # Filter locally for allowed action types (LIKE=1, NEW_TOPIC=4, REPLY=5)
198          for elt in data.get("user_actions", []):
199              if elt.get("action_type") in ALLOWED_ACTION_TYPES:
200                  user_action = {key: elt.get(key) for key in USER_ACTION_KEYS}
201                  yield user_action
202  
203          # Reduced rate limiting - rely on backoff_time for 429 handling
204          time.sleep(0.5)
205  
206  
class Post(DiscourseStream):
    """
    Stream of posts read from ``/posts.json``.

    Pages are requested with a ``before`` parameter set to the id of the last
    post of the previous page, until a page contains no posts.
    """

    primary_key = "id"
    # NOTE(review): not read anywhere in this file - confirm whether the
    # framework's cursor field attribute was intended.
    cursor = "created_at"

    def path(
        self,
        stream_state: Mapping[str, Any] = None,
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None
    ) -> str:
        """Return the full URL of the posts endpoint."""
        return f"{self.url}/posts.json"

    def parse_response(
        self,
        response: requests.Response,
        next_page_token: Mapping[str, Any] = None,
        **kwargs
    ) -> Iterable[Mapping]:
        """
        Yield one record per post, restricted to ``POST_KEYS`` plus ``base_url``.

        ``post_url`` is turned into an absolute URL when present; a post
        without it keeps ``None`` instead of raising on the concatenation.
        """
        data: dict = response.json()
        # A missing or null "latest_posts" is treated as an empty page,
        # matching next_page_token().
        for elt in data.get("latest_posts") or []:
            post = {key: elt.get(key) for key in POST_KEYS}
            post["base_url"] = self.url
            relative_url = post["post_url"]
            if relative_url is not None:
                post["post_url"] = self.url + relative_url
            yield post

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        """Return ``{"before": <id of last post>}``, or ``None`` when the page is empty."""
        posts = response.json().get("latest_posts") or []
        if not posts:
            return None
        return {"before": posts[-1]["id"]}

    def request_params(self, stream_state, stream_slice = None, next_page_token: Mapping[str, Any] = None):
        """Return the page token as query parameters (empty on the first page)."""
        params = {}
        if next_page_token:
            params.update(next_page_token)
        return params
242  
class Topic(DiscourseStream):
    """
    Stream of topics read from ``/latest.json``.

    The ``next_page`` counter is returned as the page token and is advanced
    each time a response has been fully parsed.
    """

    primary_key = "id"
    # Page counter; the first increment creates an instance attribute shadowing this default.
    next_page = 0

    # https://docs.discourse.org/#tag/Topics/operation/listLatestTopics
    def path(
        self,
        stream_state: Mapping[str, Any] = None,
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None
    ) -> str:
        """Return the full URL of the latest-topics endpoint."""
        return self.url + "/latest.json"

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        """Return ``{"page": <counter>}`` while the page contains topics, else ``None``."""
        topics = response.json().get("topic_list").get("topics")
        if len(topics) > 0:
            return {"page": self.next_page}
        return None

    def request_params(self, stream_state, stream_slice = None, next_page_token: Mapping[str, Any] = None):
        """Return the fixed ordering/size parameters merged with the page token, if any."""
        query = {"order": "created", "ascending": "true", "per_page": "100"}
        query.update(next_page_token or {})
        return query

    def parse_response(
        self,
        response: requests.Response,
        next_page_token: Mapping[str, Any] = None,
        **kwargs
    ) -> Iterable[Mapping]:
        """Yield every topic of the page unchanged, then advance the page counter."""
        payload = response.json()
        logger.debug("Response latest topics %s", payload)
        yield from payload.get("topic_list").get("topics")
        # Runs only once the generator has been exhausted.
        self.next_page += 1
282  
283  
class Group(DiscourseStream):
    """Stream of groups read from ``/groups.json``; also the parent of ``GroupMember``."""

    primary_key = "id"
    use_cache = True

    # https://docs.discourse.org/#tag/Groups/operation/listGroups
    def path(
        self,
        stream_state: Mapping[str, Any] = None,
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None
    ) -> str:
        """Return the full URL of the groups endpoint."""
        return self.url + "/groups.json"

    def parse_response(
        self,
        response: requests.Response,
        **kwargs
    ) -> Iterable[Mapping]:
        """Yield every entry of the response's ``groups`` list unchanged."""
        payload = response.json()
        logger.debug("Response groups %s", payload)
        yield from payload.get("groups")
306  
class GroupMember(HttpSubStream, Group):
    """Sub-stream of ``Group`` that reads the members of each parent group."""

    primary_key = "id"

    # https://docs.discourse.org/#tag/Groups/operation/listGroupMembers
    def path(
        self,
        stream_state: Mapping[str, Any] = None,
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None
    ) -> str:
        """Return the members URL built from the parent group's ``name`` field."""
        parent_group = stream_slice.get('parent')
        group_name = parent_group.get('name')
        return f"{self.url}/groups/{group_name}/members.json"

    def parse_response(
        self,
        response: requests.Response,
        **kwargs
    ) -> Iterable[Mapping]:
        """Yield every entry of the response's ``members`` list unchanged."""
        yield from response.json().get("members")
327  
328  
class Tag(DiscourseStream):
    """Stream of tags read from ``/tags.json``."""

    primary_key = "id"

    # https://docs.discourse.org/#tag/Tags/operation/listTags
    def path(
        self,
        stream_state: Mapping[str, Any] = None,
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None
    ) -> str:
        """Return the full URL of the tags endpoint."""
        return f"{self.url}/tags.json"

    def parse_response(
        self,
        response: requests.Response,
        **kwargs
    ) -> Iterable[Mapping]:
        """Yield every entry of the response's ``tags`` list unchanged."""
        data = response.json()
        # The message names this stream (it previously said "groups").
        logger.debug("Response tags %s", data)
        for elt in data.get("tags"):
            yield elt
349  
350  
class Category(DiscourseStream):
    """Stream of categories read from ``/categories.json``."""

    primary_key = "id"

    # https://docs.discourse.org/#tag/Categories/operation/listCategories
    def path(
        self,
        stream_state: Mapping[str, Any] = None,
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None
    ) -> str:
        """Return the full URL of the categories endpoint."""
        return f"{self.url}/categories.json"

    def parse_response(
        self,
        response: requests.Response,
        **kwargs
    ) -> Iterable[Mapping]:
        """Yield every entry of ``category_list.categories`` unchanged."""
        data = response.json()
        # The message names this stream (it previously said "groups").
        logger.debug("Response categories %s", data)
        for elt in data.get("category_list").get("categories"):
            yield elt
370  
371  
372  
373  
374  # Source
class SourceDiscourseFetcher(AbstractSource):
    """Source that exposes the Discourse streams defined in this module."""

    def check_connection(self, logger, config) -> Tuple[bool, Any]:
        """
        Report the connection as healthy without contacting the server.

        :param logger: unused; shadows the module-level logger inside this method.
        :param config: unused.
        :return: always ``(True, None)``.
        """
        return True, None

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        """
        Build every stream from the ``api-key``, ``api-username`` and ``url`` config entries.

        :param config: connector configuration; the three keys above are required.
        :return: the list of configured stream instances.
        """
        logger.info("Configuring Stream from %s", config["url"])
        args = {
            "api_key": config['api-key'],
            "api_username": config['api-username'],
            "url": config['url']
        }
        user = User(**args)
        # Create a SEPARATE User instance for UserActions to use as parent
        # This prevents pagination state conflicts with the main user stream
        user_for_actions = User(**args)
        group = Group(**args)
        return [
            user,
            UserAction(parent=user_for_actions, **args),
            Post(**args),
            Topic(**args),
            group,
            GroupMember(parent=group, **args),
            Tag(**args),
            Category(**args)
        ]
400          ]