source.py
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import logging
import time
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple

import requests
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http import HttpStream, HttpSubStream

logger = logging.getLogger("airbyte")

# Fields copied from each user payload into the emitted User record.
USER_KEYS = [
    "id", "name", "username", "active", "created_at", "trust_level", "title",
    "time_read", "staged", "days_visited", "posts_read_count", "topics_entered",
    "post_count", "email",
]

# Fields copied from each user_action payload into the emitted record.
USER_ACTION_KEYS = [
    "id", "action_type", "created_at", "acting_user_id", "acting_username",
    "target_user_id", "target_post_id", "target_topic_id", "target_username",
    "post_number", "topic_title", "slug", "category_id",
]

# Action types to filter: LIKE (1), NEW_TOPIC (4), REPLY (5)
ALLOWED_ACTION_TYPES = [1, 4, 5]

# Fields copied from each post payload into the emitted Post record.
POST_KEYS = [
    "id", "name", "username", "raw", "created_at", "post_number", "post_type",
    "post_count", "post_url", "updated_at", "reply_count", "reply_to_post_number",
    "quote_count", "incoming_link_count", "reads", "score", "topic_id",
    "topic_slug", "topic_title", "topic_html_title", "category_id",
]


class DiscourseStream(HttpStream):
    """Common base for the Discourse streams.

    Stores the API credentials and the instance URL (without a trailing
    slash) and sends the credentials as request headers. ``url_base`` is
    empty because every subclass returns a full URL from ``path``.
    """

    url_base = ""
    primary_key = None

    def __init__(self, api_key: str, api_username: str, url: str, **kwargs):
        super().__init__(**kwargs)
        self.api_key = api_key
        self.api_username = api_username
        # Drop one trailing slash so that f"{self.url}/..." never yields "//".
        self.url = url[:-1] if url.endswith("/") else url

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        """Return ``None``: streams are single-page unless a subclass overrides this."""
        return None

    def request_headers(
        self,
        stream_state: Mapping[str, Any],
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None,
    ) -> MutableMapping[str, Any]:
        """Return the ``Api-Key`` / ``Api-Username`` authentication headers."""
        return {"Api-Key": f"{self.api_key}", "Api-Username": f"{self.api_username}"}


class User(DiscourseStream):
    """Users listed by ``/admin/users/list/active.json``, paged with ``page``."""

    primary_key = "id"

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Page counter; advanced once per fully consumed response in
        # parse_response and read back by next_page_token.
        self._next_page = 0

    @property
    def use_cache(self) -> bool:
        return True

    def path(
        self,
        stream_state: Mapping[str, Any] = None,
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None,
    ) -> str:
        return f"{self.url}/admin/users/list/active.json"

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        """Request another page as long as the last response was non-empty.

        NOTE(review): the first request carries no ``page`` parameter and the
        second one sends ``page=1``; confirm against the Discourse admin API
        that this does not fetch the first page twice.
        """
        if response.json():
            return {"page": self._next_page}
        return None

    def request_params(
        self,
        stream_state: Mapping[str, Any],
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None,
    ) -> MutableMapping[str, Any]:
        """Return the ordering parameters plus the page token, if any."""
        params = {"order": "created", "asc": "true"}
        if next_page_token:
            params.update(next_page_token)
        return params

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        """Yield one record per user, restricted to ``USER_KEYS``."""
        for elt in response.json():
            logger.debug("Response %s", elt)
            yield {key: elt.get(key) for key in USER_KEYS}
        # Runs only once the generator is exhausted, i.e. after the whole page
        # has been emitted and before next_page_token reads the counter.
        self._next_page = self._next_page + 1


class UserAction(HttpSubStream):
    """
    Fetches user actions (likes, topic creations, replies) for all users.

    One slice is produced per parent user; ``/user_actions.json`` is paged
    with an ``offset`` parameter and the action types are filtered locally.
    """

    primary_key = "id"
    url_base = ""

    def __init__(self, api_key: str, api_username: str, url: str, parent: User, **kwargs):
        super().__init__(parent=parent, **kwargs)
        self.api_key = api_key
        self.api_username = api_username
        self.url = url[:-1] if url.endswith("/") else url
        # Username of the slice currently being read (used for logging only).
        self._current_slice_key = None
        # Number of actions already received for the current slice.
        self._offset = 0

    def request_headers(
        self,
        stream_state: Mapping[str, Any],
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None,
    ) -> MutableMapping[str, Any]:
        """Return the ``Api-Key`` / ``Api-Username`` authentication headers."""
        return {"Api-Key": f"{self.api_key}", "Api-Username": f"{self.api_username}"}

    def stream_slices(
        self,
        stream_state: Mapping[str, Any] = None,
        **kwargs
    ) -> Iterable[Optional[Mapping[str, Any]]]:
        """
        Override stream_slices to iterate over all users.
        Fetch all actions per user (no filter), then filter locally.
        This reduces API calls by 3x compared to separate calls per action type.
        """
        # Reset parent's pagination state so we get ALL users
        self.parent._next_page = 0

        user_count = 0
        for parent_slice in super().stream_slices(sync_mode=SyncMode.full_refresh):
            user_count += 1
            username = parent_slice.get("parent", {}).get("username")
            # Log the first few users and then every 100th to keep logs short.
            if user_count <= 10 or user_count % 100 == 0:
                logger.info("Generating slice for user %s: %s", user_count, username)

            yield {"parent": parent_slice.get("parent")}

        logger.info("Finished generating slices: %s users", user_count)

    def path(
        self,
        stream_state: Mapping[str, Any] = None,
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None,
    ) -> str:
        return f"{self.url}/user_actions.json"

    def request_params(
        self,
        stream_state: Mapping[str, Any],
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None,
    ) -> MutableMapping[str, Any]:
        """Return ``username`` plus the ``offset`` token for follow-up pages."""
        username = ((stream_slice or {}).get("parent") or {}).get("username")

        if not next_page_token:
            # First page of a slice: always restart the offset. Resetting only
            # when the username changes would keep a stale offset if the same
            # user is read twice in a row, silently skipping records.
            self._offset = 0
            if username != self._current_slice_key:
                logger.info("Requesting user_actions for %s", username)
            self._current_slice_key = username

        # Fetch ALL actions (no filter), we'll filter locally for speed
        params = {"username": username}
        if next_page_token:
            params.update(next_page_token)
        return params

    def backoff_time(self, response: requests.Response) -> Optional[float]:
        """Use the wait_seconds from Discourse rate limit response for optimal backoff.

        Returns ``extras.wait_seconds`` plus one second for a 429 response
        that carries it, otherwise ``None`` so the default backoff applies.
        """
        if response.status_code != 429:
            return None
        try:
            wait_seconds = response.json().get("extras", {}).get("wait_seconds")
            if wait_seconds:
                logger.info("Rate limited, waiting %s seconds as requested by API", wait_seconds)
                return float(wait_seconds) + 1
        except (ValueError, TypeError, AttributeError) as exc:
            # Body is not JSON or does not have the expected shape; this is
            # best-effort, so record it and use the default backoff.
            logger.debug("Could not read wait_seconds from rate-limit response: %s", exc)
        return None  # Fall back to default exponential backoff

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        """Advance the offset by the number of actions received; stop on an empty page."""
        user_actions = response.json().get("user_actions") or []
        if user_actions:
            # Count every received action, not only the ones kept after filtering.
            self._offset += len(user_actions)
            return {"offset": self._offset}
        return None

    def parse_response(
        self,
        response: requests.Response,
        stream_slice: Mapping[str, Any] = None,
        **kwargs
    ) -> Iterable[Mapping]:
        """Yield the actions whose type is in ``ALLOWED_ACTION_TYPES``."""
        data = response.json()
        username = ((stream_slice or {}).get("parent") or {}).get("username")
        logger.debug("Response user_actions for %s: %s", username, data)

        # Filter locally for allowed action types (LIKE=1, NEW_TOPIC=4, REPLY=5)
        for elt in data.get("user_actions") or []:
            if elt.get("action_type") in ALLOWED_ACTION_TYPES:
                yield {key: elt.get(key) for key in USER_ACTION_KEYS}

        # Reduced rate limiting - rely on backoff_time for 429 handling
        time.sleep(0.5)


class Post(DiscourseStream):
    """Posts from ``/posts.json``, paged with the ``before`` parameter."""

    primary_key = "id"
    cursor = "created_at"

    def path(
        self,
        stream_state: Mapping[str, Any] = None,
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None,
    ) -> str:
        return f"{self.url}/posts.json"

    def parse_response(
        self,
        response: requests.Response,
        next_page_token: Mapping[str, Any] = None,
        **kwargs
    ) -> Iterable[Mapping]:
        """Yield one record per post, restricted to ``POST_KEYS`` plus ``base_url``."""
        data: dict = response.json()
        # Tolerate a missing key, consistent with next_page_token below.
        for elt in data.get("latest_posts") or []:
            post = {key: elt.get(key) for key in POST_KEYS}
            post["base_url"] = self.url
            # Make the post URL absolute; leave it untouched when absent.
            if post["post_url"] is not None:
                post["post_url"] = self.url + post["post_url"]
            yield post

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        """Continue before the id of the last post received; stop on an empty page."""
        posts = response.json().get("latest_posts") or []
        return {"before": posts[-1]["id"]} if posts else None

    def request_params(
        self,
        stream_state: Mapping[str, Any],
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None,
    ) -> MutableMapping[str, Any]:
        """Return only the page token, if any."""
        params = {}
        if next_page_token:
            params.update(next_page_token)
        return params


class Topic(DiscourseStream):
    """Topics from ``/latest.json``, paged with ``page``."""

    primary_key = "id"
    # Page counter; advanced in parse_response, read by next_page_token.
    next_page = 0

    # https://docs.discourse.org/#tag/Topics/operation/listLatestTopics
    def path(
        self,
        stream_state: Mapping[str, Any] = None,
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None,
    ) -> str:
        return f"{self.url}/latest.json"

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        """Request another page as long as the last one contained topics."""
        topics_list = (response.json().get("topic_list") or {}).get("topics") or []
        return {"page": self.next_page} if topics_list else None

    def request_params(
        self,
        stream_state: Mapping[str, Any],
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None,
    ) -> MutableMapping[str, Any]:
        """Return the ordering / page-size parameters plus the page token, if any."""
        params = {"order": "created", "ascending": "true", "per_page": "100"}
        if next_page_token:
            params.update(next_page_token)
        return params

    def parse_response(
        self,
        response: requests.Response,
        next_page_token: Mapping[str, Any] = None,
        **kwargs
    ) -> Iterable[Mapping]:
        """Yield every topic of the page unchanged."""
        data = response.json()
        logger.debug("Response latest topics %s", data)
        for elt in (data.get("topic_list") or {}).get("topics") or []:
            yield elt
        # Runs once the page has been fully emitted.
        self.next_page = self.next_page + 1


class Group(DiscourseStream):
    """Groups from ``/groups.json`` (single request, no paging)."""

    primary_key = "id"
    use_cache = True

    # https://docs.discourse.org/#tag/Groups/operation/listGroups
    def path(
        self,
        stream_state: Mapping[str, Any] = None,
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None,
    ) -> str:
        return f"{self.url}/groups.json"

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        """Yield every group of the response unchanged."""
        data = response.json()
        logger.debug("Response groups %s", data)
        for elt in data.get("groups") or []:
            yield elt


class GroupMember(HttpSubStream, Group):
    """Members of each parent group, from ``/groups/{name}/members.json``."""

    primary_key = "id"

    # https://docs.discourse.org/#tag/Groups/operation/listGroupMembers
    def path(
        self,
        stream_state: Mapping[str, Any] = None,
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None,
    ) -> str:
        # The endpoint is addressed by the group's name, not its numeric id.
        group_id = stream_slice.get("parent").get("name")
        return f"{self.url}/groups/{group_id}/members.json"

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        """Yield every member of the response unchanged."""
        data = response.json()
        for elt in data.get("members") or []:
            yield elt


class Tag(DiscourseStream):
    """Tags from ``/tags.json`` (single request, no paging)."""

    primary_key = "id"

    def path(
        self,
        stream_state: Mapping[str, Any] = None,
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None,
    ) -> str:
        return f"{self.url}/tags.json"

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        """Yield every tag of the response unchanged."""
        data = response.json()
        logger.debug("Response tags %s", data)
        for elt in data.get("tags") or []:
            yield elt


class Category(DiscourseStream):
    """Categories from ``/categories.json`` (single request, no paging)."""

    primary_key = "id"

    def path(
        self,
        stream_state: Mapping[str, Any] = None,
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None,
    ) -> str:
        return f"{self.url}/categories.json"

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        """Yield every category of the response unchanged."""
        data = response.json()
        logger.debug("Response categories %s", data)
        for elt in (data.get("category_list") or {}).get("categories") or []:
            yield elt


# Source
class SourceDiscourseFetcher(AbstractSource):
    """Airbyte source exposing the Discourse streams defined in this module."""

    def check_connection(self, logger, config) -> Tuple[bool, Any]:
        """Report success unconditionally; no request is made to the instance."""
        return True, None

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        """Build all streams from the ``api-key``, ``api-username`` and ``url`` config keys."""
        logger.info("Configuring Stream fron %s", config["url"])
        args = {
            "api_key": config["api-key"],
            "api_username": config["api-username"],
            "url": config["url"],
        }
        user = User(**args)
        # Create a SEPARATE User instance for UserActions to use as parent
        # This prevents pagination state conflicts with the main user stream
        user_for_actions = User(**args)
        group = Group(**args)
        return [
            user,
            UserAction(parent=user_for_actions, **args),
            Post(**args),
            Topic(**args),
            group,
            GroupMember(parent=group, **args),
            Tag(**args),
            Category(**args),
        ]