# _monitoring.py
from __future__ import annotations

import logging
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
    from google.ads.googleads.client import GoogleAdsClient

logger = logging.getLogger(__name__)


class _MonitoringMixin:
    """Mixin providing monitoring target evaluation macro tools.

    Every public coroutine gathers data through methods supplied by the host
    class (see the ``TYPE_CHECKING`` stubs below), tolerates failures of the
    individual data sources (they are logged and, in most cases, recorded in
    ``issues``), and returns a plain ``dict`` containing at least
    ``campaign_id``, ``status``, ``issues`` and ``summary``.
    """

    # Type declarations for attributes/methods provided by parent class (GoogleAdsApiClient)
    # Not present at runtime (placed inside TYPE_CHECKING to avoid overriding implementations via MRO)
    if TYPE_CHECKING:
        _customer_id: str
        _client: GoogleAdsClient

        @staticmethod
        def _validate_id(value: str, field_name: str) -> str: ...
        async def get_campaign(self, campaign_id: str) -> dict[str, Any] | None: ...
        async def get_performance_report(
            self, **kwargs: Any
        ) -> list[dict[str, Any]]: ...
        async def diagnose_campaign_delivery(
            self, campaign_id: str
        ) -> dict[str, Any]: ...
        async def analyze_performance(
            self, campaign_id: str, period: str = "LAST_7_DAYS"
        ) -> dict[str, Any]: ...
        async def investigate_cost_increase(
            self, campaign_id: str
        ) -> dict[str, Any]: ...
        async def get_search_terms_report(
            self, **kwargs: Any
        ) -> list[dict[str, Any]]: ...
        async def list_conversion_actions(self) -> list[dict[str, Any]]: ...

    @staticmethod
    def _first_row_metrics(rows: list[dict[str, Any]] | None) -> dict[str, Any]:
        """Return the ``metrics`` mapping of the first report row, or ``{}``.

        An empty/missing report and a missing or non-dict ``metrics`` value
        all yield an empty dict so callers can use ``.get`` unconditionally.
        """
        if not rows:
            return {}
        metrics = rows[0].get("metrics")
        return metrics if isinstance(metrics, dict) else {}

    # =================================================================
    # 1. Delivery goal evaluation
    # =================================================================

    async def evaluate_delivery_goal(self, campaign_id: str) -> dict[str, Any]:
        """Evaluate delivery goal by integrating delivery status and performance.

        Args:
            campaign_id: Campaign identifier; checked with ``_validate_id``.

        Returns:
            A dict with ``campaign_id``, ``campaign``, ``diagnosis``,
            ``performance``, ``status`` (``"critical"``, ``"warning"`` or
            ``"healthy"``), ``issues``, ``summary`` and, when the status is
            not healthy, ``suggested_workflow`` (``"delivery_fix"``).
        """
        self._validate_id(campaign_id, "campaign_id")
        issues: list[str] = []
        result: dict[str, Any] = {"campaign_id": campaign_id}

        # 1. Campaign basic information (a failure is only logged)
        campaign: dict[str, Any] | None = None
        try:
            campaign = await self.get_campaign(campaign_id)
        except Exception:
            logger.warning("Failed to retrieve campaign information", exc_info=True)
        result["campaign"] = campaign

        # 2. Delivery diagnostics
        diagnosis: dict[str, Any] = {}
        try:
            diagnosis = await self.diagnose_campaign_delivery(campaign_id)
        except Exception:
            logger.warning("Failed to retrieve delivery diagnostics", exc_info=True)
            issues.append("Failed to retrieve delivery diagnostics")
        result["diagnosis"] = diagnosis

        # 3. Previous day performance
        performance: list[dict[str, Any]] = []
        try:
            performance = await self.get_performance_report(
                campaign_id=campaign_id, period="YESTERDAY"
            )
        except Exception:
            logger.warning("Failed to retrieve previous day performance", exc_info=True)
            issues.append("Failed to retrieve previous day performance")
        result["performance"] = performance

        # Extract metrics (``or 0`` also covers an explicit None value)
        metrics = self._first_row_metrics(performance)
        impressions = int(metrics.get("impressions") or 0)

        # Status determination
        has_issues = bool(diagnosis.get("issues"))
        has_warnings = bool(diagnosis.get("warnings"))
        campaign_status = (campaign or {}).get("status", "")
        status_not_enabled = bool(campaign_status) and campaign_status != "ENABLED"
        low_impressions = 0 < impressions < 10

        if has_issues:
            issues.append("Issues detected in delivery diagnostics")
        if status_not_enabled:
            issues.append(f"Campaign status is {campaign_status}")
        if impressions == 0:
            issues.append("Yesterday's impressions are 0")

        is_critical = has_issues or status_not_enabled or impressions == 0

        if is_critical:
            status = "critical"
        elif has_warnings or low_impressions:
            status = "warning"
            if has_warnings:
                issues.append("Warnings detected in delivery diagnostics")
            if low_impressions:
                # Record the reason so the warning summary is never empty.
                issues.append(f"Yesterday's impressions are low ({impressions:,})")
        else:
            status = "healthy"

        result["status"] = status
        result["issues"] = issues

        if status in ("critical", "warning"):
            result["suggested_workflow"] = "delivery_fix"

        # Summary generation
        if status == "critical":
            result["summary"] = (
                f"Campaign {campaign_id} has critical delivery issues. "
                f"Issues detected: {', '.join(issues)}"
            )
        elif status == "warning":
            result["summary"] = (
                f"Campaign {campaign_id} delivery needs attention. "
                f"Warnings detected: {', '.join(issues)}"
            )
        else:
            result["summary"] = (
                f"Campaign {campaign_id} delivery is operating normally. "
                f"Previous day impressions: {impressions:,}"
            )

        return result

    # =================================================================
    # 2. CPA goal evaluation
    # =================================================================

    async def evaluate_cpa_goal(
        self, campaign_id: str, target_cpa: float
    ) -> dict[str, Any]:
        """Evaluate current performance against CPA target.

        The current CPA is ``cost / conversions`` over the last 7 days. It is
        ``healthy`` at or below the target, ``warning`` up to 120% of the
        target and ``critical`` above that. With 0 conversions, or with a
        non-positive ``target_cpa`` (for which no deviation can be computed),
        the status is ``warning`` and ``deviation_pct`` is ``None``.

        Args:
            campaign_id: Campaign identifier; checked with ``_validate_id``.
            target_cpa: Target cost per acquisition.

        Returns:
            A dict with ``campaign_id``, ``target_cpa``, ``current_cpa``,
            ``cost_analysis``, ``wasteful_terms`` (at most 5),
            ``deviation_pct``, ``status``, ``issues``, ``summary`` and, when
            not healthy, ``suggested_workflow`` (``"cpa_optimization"``).
        """
        self._validate_id(campaign_id, "campaign_id")
        issues: list[str] = []
        result: dict[str, Any] = {
            "campaign_id": campaign_id,
            "target_cpa": target_cpa,
        }

        # 1. Last 7 days performance
        perf: list[dict[str, Any]] = []
        try:
            perf = await self.get_performance_report(
                campaign_id=campaign_id, period="LAST_7_DAYS"
            )
        except Exception:
            logger.warning("Failed to retrieve performance report", exc_info=True)
            issues.append("Failed to retrieve performance report")

        metrics = self._first_row_metrics(perf)
        cost = float(metrics.get("cost") or 0)
        conversions = float(metrics.get("conversions") or 0)

        # 2. Calculate CPA
        current_cpa: float | None = None
        if conversions > 0:
            current_cpa = round(cost / conversions, 1)
        else:
            issues.append(
                "Cannot calculate CPA because there are 0 conversions in the last 7 days"
            )
        result["current_cpa"] = current_cpa

        # 3. Cost analysis
        cost_analysis: dict[str, Any] = {}
        try:
            cost_analysis = await self.investigate_cost_increase(campaign_id)
        except Exception:
            logger.warning("Failed to retrieve cost analysis", exc_info=True)
            issues.append("Failed to retrieve cost analysis")
        result["cost_analysis"] = cost_analysis

        # Wasteful search terms (top 5)
        wasteful_terms = cost_analysis.get("wasteful_search_terms", [])
        result["wasteful_terms"] = (
            wasteful_terms[:5] if isinstance(wasteful_terms, list) else []
        )

        # 4. Calculate deviation rate and determine status
        deviation_pct: float | None = None
        # A zero target would divide by zero and a negative one would give a
        # meaningless deviation, so both are reported instead of computed.
        target_is_valid = target_cpa > 0

        if current_cpa is None:
            # No conversions
            status = "warning"
        elif not target_is_valid:
            status = "warning"
            issues.append(
                "Target CPA must be greater than 0, so deviation from target"
                " cannot be calculated"
            )
        else:
            deviation_pct = round((current_cpa - target_cpa) / target_cpa * 100, 1)
            if current_cpa <= target_cpa:
                status = "healthy"
            elif current_cpa <= target_cpa * 1.2:
                status = "warning"
                issues.append(
                    f"CPA exceeds target by {deviation_pct}%"
                    f" (current: {current_cpa:,.0f} / target: {target_cpa:,.0f})"
                )
            else:
                status = "critical"
                issues.append(
                    f"CPA significantly exceeds target ({deviation_pct}% over). "
                    f"Current: {current_cpa:,.0f} / target: {target_cpa:,.0f}"
                )
        result["deviation_pct"] = deviation_pct

        result["status"] = status
        result["issues"] = issues

        if status in ("critical", "warning"):
            result["suggested_workflow"] = "cpa_optimization"

        # Summary generation
        if current_cpa is None:
            result["summary"] = (
                f"Campaign {campaign_id} has 0 conversions in the last 7 days, so "
                f"CPA cannot be evaluated. Checking delivery status and conversion tracking is recommended"
            )
        elif not target_is_valid:
            result["summary"] = (
                f"Campaign {campaign_id} CPA cannot be evaluated because the target CPA"
                f" ({target_cpa}) is not a positive value. "
                f"Current CPA: {current_cpa:,.0f} yen"
            )
        elif status == "healthy":
            result["summary"] = (
                f"Campaign {campaign_id} CPA is within target. "
                f"Current CPA: {current_cpa:,.0f} yen / Target: {target_cpa:,.0f} yen"
                f" (deviation: {deviation_pct}%)"
            )
        elif status == "warning":
            result["summary"] = (
                f"Campaign {campaign_id} CPA slightly exceeds target. "
                f"Current CPA: {current_cpa:,.0f} yen / Target: {target_cpa:,.0f} yen"
                f" (deviation: {deviation_pct}%). Early action recommended"
            )
        else:
            result["summary"] = (
                f"Campaign {campaign_id} CPA significantly exceeds target. "
                f"Current CPA: {current_cpa:,.0f} yen / Target: {target_cpa:,.0f} yen"
                f" (deviation: {deviation_pct}%). Urgent action required"
            )

        return result

    # =================================================================
    # 3. CV goal evaluation
    # =================================================================

    async def evaluate_cv_goal(
        self, campaign_id: str, target_cv_daily: float
    ) -> dict[str, Any]:
        """Evaluate current performance against daily CV target.

        The daily average is the last-7-days conversion count divided by 7.
        It is ``healthy`` at or above the target, ``warning`` down to 80% of
        the target and ``critical`` below that. A bottleneck label
        (``"impression"``, ``"ctr"`` or ``"cvr"``) is always reported.

        Args:
            campaign_id: Campaign identifier; checked with ``_validate_id``.
            target_cv_daily: Target number of conversions per day. When it is
                not positive, ``deviation_pct`` is reported as ``0.0``.

        Returns:
            A dict with ``campaign_id``, ``target_cv_daily``,
            ``current_cv_daily``, ``performance_analysis``, ``deviation_pct``,
            ``status``, ``bottleneck``, ``issues``, ``summary`` and, when not
            healthy, ``suggested_workflow`` (``"cv_increase"``).
        """
        self._validate_id(campaign_id, "campaign_id")
        issues: list[str] = []
        result: dict[str, Any] = {
            "campaign_id": campaign_id,
            "target_cv_daily": target_cv_daily,
        }

        # 1. Last 7 days performance
        perf: list[dict[str, Any]] = []
        try:
            perf = await self.get_performance_report(
                campaign_id=campaign_id, period="LAST_7_DAYS"
            )
        except Exception:
            logger.warning("Failed to retrieve performance report", exc_info=True)
            issues.append("Failed to retrieve performance report")

        metrics = self._first_row_metrics(perf)
        impressions = int(metrics.get("impressions") or 0)
        clicks = int(metrics.get("clicks") or 0)
        conversions = float(metrics.get("conversions") or 0)

        # 2. Calculate daily average CV
        daily_cv = round(conversions / 7, 2)
        result["current_cv_daily"] = daily_cv

        # 3. Comprehensive performance analysis
        performance_analysis: dict[str, Any] = {}
        try:
            performance_analysis = await self.analyze_performance(campaign_id)
        except Exception:
            logger.warning("Failed to retrieve performance analysis", exc_info=True)
            issues.append("Failed to retrieve performance analysis")
        result["performance_analysis"] = performance_analysis

        # 4. Calculate deviation rate
        if target_cv_daily > 0:
            deviation_pct = round(
                (daily_cv - target_cv_daily) / target_cv_daily * 100, 1
            )
        else:
            deviation_pct = 0.0
        result["deviation_pct"] = deviation_pct

        # 5. Status determination
        if daily_cv >= target_cv_daily:
            status = "healthy"
        elif daily_cv >= target_cv_daily * 0.8:
            status = "warning"
            issues.append(
                f"Daily CV is below target"
                f" (current: {daily_cv:.1f}/day / target: {target_cv_daily:.1f}/day)"
            )
        else:
            status = "critical"
            issues.append(
                f"Daily CV is significantly below target"
                f" (current: {daily_cv:.1f}/day / target: {target_cv_daily:.1f}/day,"
                f" deviation: {deviation_pct}%)"
            )

        result["status"] = status

        # 6. Bottleneck identification
        # ``insights`` may be missing/None or hold non-string entries; only
        # string entries are scanned for an impression-related hint.
        analysis_insights = performance_analysis.get("insights") or []
        impression_issue_in_insights = any(
            isinstance(insight, str) and "impression" in insight.lower()
            for insight in analysis_insights
        )

        if (
            impressions == 0  # nothing was delivered, so CTR/CVR are undefined
            or impression_issue_in_insights
            or (clicks > 0 and impressions < clicks * 10)
        ):
            bottleneck = "impression"
            if status != "healthy":
                issues.append("Impression shortage may be the bottleneck")
        elif (clicks / impressions) < 0.02:
            bottleneck = "ctr"
            if status != "healthy":
                ctr_value = round(clicks / impressions * 100, 2)
                issues.append(
                    f"CTR is low ({ctr_value}%, below industry average of 2%)"
                )
        elif clicks > 0 and (conversions / clicks) < 0.01:
            bottleneck = "cvr"
            if status != "healthy":
                cvr_value = round(conversions / clicks * 100, 2)
                issues.append(f"CVR is low ({cvr_value}%, below 1%)")
        else:
            bottleneck = "cvr"

        result["bottleneck"] = bottleneck
        result["issues"] = issues

        if status in ("critical", "warning"):
            result["suggested_workflow"] = "cv_increase"

        # Summary generation
        bottleneck_label = {
            "impression": "Insufficient impressions",
            "ctr": "CTR (click-through rate) decline",
            "cvr": "CVR (conversion rate) decline",
        }
        if status == "healthy":
            result["summary"] = (
                f"Campaign {campaign_id} CV count meets the target. "
                f"Daily CV: {daily_cv:.1f} / target: {target_cv_daily:.1f}"
            )
        elif status == "warning":
            result["summary"] = (
                f"Campaign {campaign_id} CV count is slightly below target. "
                f"Daily CV: {daily_cv:.1f} / target: {target_cv_daily:.1f}"
                f" (deviation: {deviation_pct}%)."
                f" Main bottleneck: {bottleneck_label[bottleneck]}"
            )
        else:
            result["summary"] = (
                f"Campaign {campaign_id} CV count is significantly below target. "
                f"Daily CV: {daily_cv:.1f} / target: {target_cv_daily:.1f}"
                f" (deviation: {deviation_pct}%)."
                f" Main bottleneck: {bottleneck_label[bottleneck]}. Urgent action required"
            )

        return result

    # =================================================================
    # 4. Conversion acquisition improvement diagnosis
    # =================================================================

    async def diagnose_zero_conversions(self, campaign_id: str) -> dict[str, Any]:
        """Diagnose zero-conversion issues.

        Collects all data needed for LLM improvement strategy planning:
        conversion tracking setup, bidding/tracking alignment, the 7-day
        funnel, delivery diagnostics and (only when there were clicks)
        search-term quality.

        Args:
            campaign_id: Campaign identifier; checked with ``_validate_id``.

        Returns:
            A dict with ``campaign_id``, ``conversion_tracking``,
            ``bidding_cv_alignment``, ``funnel``, ``delivery_diagnosis``,
            ``search_term_quality`` (``None`` when not collected),
            ``status``, ``issues``, ``recommended_actions``, ``summary`` and,
            when not healthy, ``suggested_workflow`` (``"cv_acquisition"``).
        """
        self._validate_id(campaign_id, "campaign_id")
        issues: list[str] = []
        result: dict[str, Any] = {"campaign_id": campaign_id}

        # 1. Campaign basic information (a failure is only logged)
        campaign: dict[str, Any] | None = None
        try:
            campaign = await self.get_campaign(campaign_id)
        except Exception:
            logger.warning("Failed to retrieve campaign information", exc_info=True)

        # 2. Conversion tracking settings
        cv_actions: list[dict[str, Any]] = []
        try:
            cv_actions = await self.list_conversion_actions()
        except Exception:
            logger.warning("Failed to retrieve conversion action list", exc_info=True)
            issues.append("Failed to retrieve conversion action list")

        total_actions = len(cv_actions)
        # ``str(... or "")`` tolerates a missing or None status value.
        enabled_actions = sum(
            1 for a in cv_actions if str(a.get("status") or "").upper() == "ENABLED"
        )
        has_cv_issue = total_actions == 0 or enabled_actions == 0
        result["conversion_tracking"] = {
            "total_actions": total_actions,
            "enabled_actions": enabled_actions,
            "has_issue": has_cv_issue,
            "actions": cv_actions,
        }
        if has_cv_issue:
            issues.append("No active conversion actions are configured")

        # 3. Bidding x CV alignment check
        bidding_strategy = (campaign or {}).get("bidding_strategy") or ""
        smart_bidding_types = {
            "MAXIMIZE_CONVERSIONS",
            "TARGET_CPA",
            "TARGET_ROAS",
            "MAXIMIZE_CONVERSION_VALUE",
        }
        is_smart = str(bidding_strategy).upper() in smart_bidding_types
        bidding_issue: str | None = None
        if is_smart and has_cv_issue:
            bidding_issue = (
                f"Smart bidding ({bidding_strategy}) is configured, but "
                "no active conversion tracking is configured"
            )
            issues.append(bidding_issue)
        result["bidding_cv_alignment"] = {
            "strategy": bidding_strategy,
            "is_smart_bidding": is_smart,
            "cv_tracking_configured": not has_cv_issue,
            "issue": bidding_issue,
        }

        # 4. Funnel data (last 7 days)
        perf: list[dict[str, Any]] = []
        try:
            perf = await self.get_performance_report(
                campaign_id=campaign_id, period="LAST_7_DAYS"
            )
        except Exception:
            logger.warning("Failed to retrieve performance report", exc_info=True)
            issues.append("Failed to retrieve performance report")

        metrics = self._first_row_metrics(perf)
        impressions = int(metrics.get("impressions") or 0)
        clicks = int(metrics.get("clicks") or 0)
        conversions = float(metrics.get("conversions") or 0)
        cost = float(metrics.get("cost") or 0)
        ctr = round(clicks / impressions * 100, 2) if impressions > 0 else None
        cvr = round(conversions / clicks * 100, 2) if clicks > 0 else None

        bottleneck: str | None
        if impressions == 0:
            bottleneck = "no_delivery"
            issues.append("Impressions in the last 7 days are 0")
        elif clicks == 0:
            bottleneck = "no_clicks"
            issues.append("Clicks in the last 7 days are 0")
        elif conversions == 0:
            bottleneck = "no_conversions"
        else:
            bottleneck = None

        result["funnel"] = {
            "period": "LAST_7_DAYS",
            "impressions": impressions,
            "clicks": clicks,
            "conversions": conversions,
            "cost": cost,
            "ctr": ctr,
            "cvr": cvr,
            "bottleneck": bottleneck,
        }

        # 5. Delivery diagnostics
        diagnosis: dict[str, Any] = {}
        try:
            diagnosis = await self.diagnose_campaign_delivery(campaign_id)
        except Exception:
            logger.warning("Failed to retrieve delivery diagnostics", exc_info=True)
            issues.append("Failed to retrieve delivery diagnostics")

        result["delivery_diagnosis"] = {
            "issues": diagnosis.get("issues", []),
            "warnings": diagnosis.get("warnings", []),
            "recommendations": diagnosis.get("recommendations", []),
        }

        # 6. Search term quality (only when clicks > 0)
        def term_metric(term: dict[str, Any], key: str) -> float:
            # Missing/None metrics (or metric values) count as 0.
            return float((term.get("metrics") or {}).get(key) or 0)

        search_term_quality: dict[str, Any] | None = None
        if clicks > 0:
            # The aggregation stays inside the ``try`` so that malformed rows
            # degrade to a recorded issue instead of aborting the diagnosis.
            try:
                terms = (
                    await self.get_search_terms_report(
                        campaign_id=campaign_id, period="LAST_7_DAYS"
                    )
                ) or []
                zero_cv_terms = [
                    t for t in terms if term_metric(t, "conversions") == 0
                ]
                zero_cv_cost = sum(term_metric(t, "cost") for t in zero_cv_terms)
                # Top 10 zero-CV high-cost items
                sorted_wasteful = sorted(
                    zero_cv_terms,
                    key=lambda t: term_metric(t, "cost"),
                    reverse=True,
                )
                search_term_quality = {
                    "total_terms": len(terms),
                    "zero_cv_terms": len(zero_cv_terms),
                    "zero_cv_cost": zero_cv_cost,
                    "top_wasteful_terms": sorted_wasteful[:10],
                }
                # If zero-CV terms account for more than 50% of total cost
                if cost > 0 and zero_cv_cost / cost > 0.5:
                    issues.append(
                        f"Zero-CV search terms account for "
                        f"{round(zero_cv_cost / cost * 100, 1)}% of total cost"
                    )
            except Exception:
                logger.warning("Failed to retrieve search terms report", exc_info=True)
                issues.append("Failed to retrieve search terms report")
        result["search_term_quality"] = search_term_quality

        # Status determination
        if has_cv_issue or bidding_issue or bottleneck in ("no_delivery", "no_clicks"):
            status = "critical"
        elif conversions == 0:
            status = "warning"
        else:
            status = "healthy"

        result["status"] = status
        result["issues"] = issues

        if status != "healthy":
            result["suggested_workflow"] = "cv_acquisition"

        # Recommended actions
        result["recommended_actions"] = self._build_cv_recommendations(
            has_cv_issue=has_cv_issue,
            bidding_issue=bidding_issue,
            bottleneck=bottleneck,
            search_term_quality=search_term_quality,
            cost=cost,
        )

        # Summary generation (sentences are separated by a single space)
        if status == "critical":
            result["summary"] = (
                f"Campaign {campaign_id} has not acquired any conversions. "
                f"Critical issues detected: {', '.join(issues[:3])}"
            )
        elif status == "warning":
            result["summary"] = (
                f"Campaign {campaign_id} has 0 conversions. "
                f"Imp={impressions:,}, Click={clicks:,}, CV=0. "
                f"Planning an improvement strategy is recommended"
            )
        else:
            result["summary"] = (
                f"Campaign {campaign_id} is generating conversions. "
                f"Conversions in the last 7 days: {conversions:.1f}"
            )

        return result

    @staticmethod
    def _build_cv_recommendations(
        *,
        has_cv_issue: bool,
        bidding_issue: str | None,
        bottleneck: str | None,
        search_term_quality: dict[str, Any] | None,
        cost: float,
    ) -> list[dict[str, Any]]:
        """Generate prioritized recommended actions for CV improvement.

        Args:
            has_cv_issue: Whether conversion tracking has a problem.
            bidding_issue: Bidding/tracking mismatch description, if any.
            bottleneck: Funnel bottleneck label, if any.
            search_term_quality: Search-term summary; its ``zero_cv_terms``
                count decides whether negative keywords are suggested.
            cost: Currently unused; kept for interface compatibility.

        Returns:
            A list of ``{"priority", "action", "description"}`` dicts with
            priorities numbered consecutively from 1. The ad/keyword and
            landing-page suggestions are always included last.
        """
        has_zero_cv_terms = (
            bool(search_term_quality)
            and search_term_quality.get("zero_cv_terms", 0) > 0
        )
        # (applies, action, description) in priority order.
        candidates = [
            (has_cv_issue, "fix_cv_tracking", "Configure/fix conversion tracking"),
            (
                bidding_issue,
                "fix_bidding_strategy",
                "Fix alignment between bidding strategy and CV tracking",
            ),
            (
                has_zero_cv_terms,
                "add_negative_keywords",
                "Add negative keywords for zero-CV search terms",
            ),
            (
                bottleneck in ("no_delivery", "no_clicks"),
                "fix_delivery",
                "Improve delivery and click acquisition",
            ),
            # Always suggest candidates
            (True, "improve_ads_and_keywords", "Improve ad copy and expand keywords"),
            (
                True,
                "review_landing_page",
                "Landing page improvement (text-based advice)",
            ),
        ]
        selected = [
            (action, description)
            for applies, action, description in candidates
            if applies
        ]
        return [
            {"priority": priority, "action": action, "description": description}
            for priority, (action, description) in enumerate(selected, start=1)
        ]