# _analysis_search_terms.py
1 """Search term analysis mixin.""" 2 3 from __future__ import annotations 4 5 import logging 6 from typing import TYPE_CHECKING, Any 7 8 from mureo.google_ads._analysis_constants import ( 9 _INFORMATIONAL_PATTERNS, 10 _extract_ngrams, 11 _get_comparison_date_ranges, 12 ) 13 14 if TYPE_CHECKING: 15 from google.ads.googleads.client import GoogleAdsClient 16 17 logger = logging.getLogger(__name__) 18 19 20 def _is_informational_term(term_text: str) -> bool: 21 """Determine if a search term matches informational patterns.""" 22 return any(p in term_text for p in _INFORMATIONAL_PATTERNS) 23 24 25 def _build_add_candidate( 26 term_text: str, 27 conversions: float, 28 clicks: int, 29 cost: float, 30 ctr: float, 31 match_type: str, 32 score: int, 33 reason: str, 34 ) -> dict[str, Any]: 35 """Build candidate entry for keyword addition.""" 36 return { 37 "search_term": term_text, 38 "action": "add", 39 "match_type": match_type, 40 "score": score, 41 "reason": reason, 42 "metrics": { 43 "conversions": conversions, 44 "clicks": clicks, 45 "cost": cost, 46 "ctr": round(ctr, 4), 47 }, 48 } 49 50 51 def _build_exclude_candidate( 52 term_text: str, 53 conversions: float, 54 clicks: int, 55 cost: float, 56 ctr: float, 57 match_type: str, 58 score: int, 59 reason: str, 60 ) -> dict[str, Any]: 61 """Build exclusion candidate entry.""" 62 return { 63 "search_term": term_text, 64 "action": "exclude", 65 "match_type": match_type, 66 "score": score, 67 "reason": reason, 68 "metrics": { 69 "conversions": conversions, 70 "clicks": clicks, 71 "cost": cost, 72 "ctr": round(ctr, 4), 73 }, 74 } 75 76 77 class _SearchTermsAnalysisMixin: 78 """Mixin providing search term analysis methods.""" 79 80 # Type declarations for attributes/methods provided by parent class 81 _customer_id: str 82 _client: GoogleAdsClient 83 84 @staticmethod 85 def _validate_id(value: str, field_name: str) -> str: ... 
# type: ignore[empty-body] 86 87 async def get_search_terms_report(self, **kwargs: Any) -> list[dict[str, Any]]: ... # type: ignore[empty-body] 88 async def list_keywords( # type: ignore[empty-body] 89 self, ad_group_id: str | None = None, campaign_id: str | None = None 90 ) -> list[dict[str, Any]]: ... 91 async def list_negative_keywords( # type: ignore[empty-body] 92 self, campaign_id: str 93 ) -> list[dict[str, Any]]: ... 94 95 # Method from PerformanceAnalysisMixin 96 async def _resolve_target_cpa( # type: ignore[empty-body] 97 self, campaign_id: str, explicit: float | None = None 98 ) -> tuple[float | None, str]: ... 99 100 # ================================================================= 101 # Common: Search terms retrieval with previous period / new term routing 102 # ================================================================= 103 104 async def _fetch_terms_with_prev( 105 self, 106 campaign_id: str, 107 period: str, 108 ad_group_id: str | None = None, 109 ) -> tuple[list[dict[str, Any]], set[str]]: 110 """Return current period search terms list and previous period term text set.""" 111 current_range, prev_range = _get_comparison_date_ranges(period) 112 search_terms = await self.get_search_terms_report( 113 campaign_id=campaign_id, ad_group_id=ad_group_id, period=current_range 114 ) 115 prev_terms = await self.get_search_terms_report( 116 campaign_id=campaign_id, ad_group_id=ad_group_id, period=prev_range 117 ) 118 prev_term_set = {t.get("search_term", "").lower() for t in prev_terms} 119 return search_terms, prev_term_set 120 121 @staticmethod 122 def _route_by_newness( 123 entry: dict[str, Any], 124 term_text: str, 125 is_new: bool, 126 main_list: list[dict[str, Any]], 127 watch_list: list[dict[str, Any]], 128 ) -> None: 129 """Route new terms to watch list and existing terms to main list.""" 130 if is_new: 131 entry["reason"] = f"New term (under observation): {entry['reason']}" 132 if "action" in entry: 133 entry["action"] = "watch" 134 
watch_list.append(entry) 135 else: 136 main_list.append(entry) 137 138 # ================================================================= 139 # Search term overlap analysis 140 # ================================================================= 141 142 async def analyze_search_terms( 143 self, 144 campaign_id: str, 145 period: str = "LAST_30_DAYS", 146 ) -> dict[str, Any]: 147 """Analyze search term/keyword overlap, N-gram distribution, and candidates.""" 148 self._validate_id(campaign_id, "campaign_id") 149 150 # Retrieve keywords and search terms 151 keywords = await self.list_keywords(campaign_id=campaign_id) 152 search_terms = await self.get_search_terms_report( 153 campaign_id=campaign_id, period=period 154 ) 155 156 # Set of keyword texts (lowercase) 157 keyword_texts: set[str] = {kw.get("text", "").lower() for kw in keywords} 158 159 # Overlap rate 160 overlap_count = sum( 161 1 for t in search_terms if t.get("search_term", "").lower() in keyword_texts 162 ) 163 overlap_rate = overlap_count / len(search_terms) if search_terms else 0.0 164 165 # N-gram distribution (1-3gram) 166 ngram_agg: dict[int, dict[str, dict[str, float]]] = { 167 1: {}, 168 2: {}, 169 3: {}, 170 } 171 for t in search_terms: 172 text = t.get("search_term", "") 173 m = t.get("metrics", {}) 174 cost = float(m.get("cost", 0)) 175 convs = float(m.get("conversions", 0)) 176 for n in (1, 2, 3): 177 for gram in _extract_ngrams(text, n): 178 agg = ngram_agg[n].setdefault( 179 gram, {"count": 0, "cost": 0.0, "conversions": 0.0} 180 ) 181 agg["count"] += 1 182 agg["cost"] += cost 183 agg["conversions"] += convs 184 185 ngram_distribution: dict[str, list[dict[str, Any]]] = {} 186 label_map = {1: "unigrams", 2: "bigrams", 3: "trigrams"} 187 for n, label in label_map.items(): 188 sorted_grams = sorted( 189 ngram_agg[n].items(), 190 key=lambda x: x[1]["count"], 191 reverse=True, 192 )[:10] 193 ngram_distribution[label] = [ 194 { 195 "text": g, 196 "count": int(v["count"]), 197 "cost": 
round(v["cost"], 0), 198 "conversions": round(v["conversions"], 1), 199 } 200 for g, v in sorted_grams 201 ] 202 203 # Keyword candidates: CV > 0 and not registered 204 keyword_candidates = [ 205 { 206 "search_term": t.get("search_term", ""), 207 "conversions": float(t.get("metrics", {}).get("conversions", 0)), 208 "cost": float(t.get("metrics", {}).get("cost", 0)), 209 "clicks": int(t.get("metrics", {}).get("clicks", 0)), 210 } 211 for t in search_terms 212 if float(t.get("metrics", {}).get("conversions", 0)) > 0 213 and t.get("search_term", "").lower() not in keyword_texts 214 ] 215 216 # Exclusion candidates: has cost, CV=0 (sorted by cost desc, top 20) 217 negative_candidates = sorted( 218 [ 219 { 220 "search_term": t.get("search_term", ""), 221 "cost": float(t.get("metrics", {}).get("cost", 0)), 222 "clicks": int(t.get("metrics", {}).get("clicks", 0)), 223 "impressions": int(t.get("metrics", {}).get("impressions", 0)), 224 } 225 for t in search_terms 226 if float(t.get("metrics", {}).get("cost", 0)) > 0 227 and float(t.get("metrics", {}).get("conversions", 0)) == 0 228 ], 229 key=lambda x: x["cost"], 230 reverse=True, 231 )[:20] 232 233 # Insight generation 234 insights: list[str] = [] 235 if overlap_rate < 0.3: 236 insights.append( 237 f"Overlap rate is {overlap_rate:.0%}, which is low. " 238 "Many search terms are not registered as keywords. " 239 "Consider adding keywords" 240 ) 241 if negative_candidates: 242 total_waste = sum(c["cost"] for c in negative_candidates) 243 insights.append( 244 f"There are {len(negative_candidates)} search terms with cost but no conversions, " 245 f"resulting in ¥{total_waste:,.0f} of wasted cost" 246 ) 247 if keyword_candidates: 248 insights.append( 249 f"There are {len(keyword_candidates)} search terms with conversions that are not registered. 
" 250 "We recommend adding them as keywords" 251 ) 252 253 return { 254 "campaign_id": campaign_id, 255 "period": period, 256 "registered_keywords_count": len(keywords), 257 "search_terms_count": len(search_terms), 258 "overlap_rate": round(overlap_rate, 3), 259 "ngram_distribution": ngram_distribution, 260 "keyword_candidates": keyword_candidates, 261 "negative_candidates": negative_candidates, 262 "insights": insights, 263 } 264 265 # ================================================================= 266 # Automatic negative keyword suggestions 267 # ================================================================= 268 269 async def suggest_negative_keywords( 270 self, 271 campaign_id: str, 272 period: str = "LAST_30_DAYS", 273 target_cpa: float | None = None, 274 use_intent_analysis: bool = True, 275 ad_group_id: str | None = None, 276 **_kwargs: Any, 277 ) -> dict[str, Any]: 278 """Automatically suggest negative keyword candidates.""" 279 self._validate_id(campaign_id, "campaign_id") 280 281 effective_target = target_cpa 282 283 # CPA-based threshold resolution (always use CPA x 1.5) 284 resolved_cpa, cpa_source = await self._resolve_target_cpa( 285 campaign_id, explicit=effective_target 286 ) 287 effective_threshold: float | None = None 288 if resolved_cpa is not None: 289 effective_threshold = resolved_cpa * 1.5 290 291 # Retrieve search terms for current/previous periods (for new term protection) 292 search_terms, prev_term_set = await self._fetch_terms_with_prev( 293 campaign_id, period, ad_group_id=ad_group_id 294 ) 295 296 existing_negatives = await self.list_negative_keywords(campaign_id) 297 298 # Existing negative keyword texts (lowercase) 299 existing_neg_texts: set[str] = { 300 n.get("keyword_text", "").lower() for n in existing_negatives 301 } 302 303 # Filter: >= target CPA x 1.5, CV=0, no overlap with existing negatives 304 suggestions: list[dict[str, Any]] = [] 305 watch_terms: list[dict[str, Any]] = [] 306 total_wasteful_cost: float = 0.0 307 for 
t in search_terms: 308 m = t.get("metrics", {}) 309 cost = float(m.get("cost", 0)) 310 convs = float(m.get("conversions", 0)) 311 term_text = t.get("search_term", "") 312 313 if convs > 0: 314 continue 315 316 is_new = term_text.lower() not in prev_term_set 317 is_informational = _is_informational_term(term_text) 318 319 if not is_informational: 320 if effective_threshold is not None and cost < effective_threshold: 321 if cost > 0: 322 total_wasteful_cost += cost 323 continue 324 if effective_threshold is None: 325 if cost > 0: 326 total_wasteful_cost += cost 327 continue 328 329 if cost > 0: 330 total_wasteful_cost += cost 331 332 if term_text.lower() in existing_neg_texts: 333 continue 334 335 # Recommended match type 336 if is_informational: 337 match_type = "PHRASE" 338 reason = f"Informational intent (0 CV, cost ¥{cost:,.0f})" 339 else: 340 if resolved_cpa is None: 341 raise RuntimeError("resolved_cpa should not be None here") 342 word_count = len(term_text.strip().split()) 343 match_type = "EXACT" if word_count <= 2 else "PHRASE" 344 reason = f"¥{cost:,.0f} cost with 0 CV (exceeds target CPA ¥{resolved_cpa:,.0f} x 1.5)" 345 346 entry = { 347 "search_term": term_text, 348 "cost": cost, 349 "clicks": int(m.get("clicks", 0)), 350 "impressions": int(m.get("impressions", 0)), 351 "ctr": float(m.get("ctr", 0)), 352 "recommended_match_type": match_type, 353 "reason": reason, 354 } 355 self._route_by_newness(entry, term_text, is_new, suggestions, watch_terms) 356 357 # Sort by cost descending 358 suggestions.sort(key=lambda x: x["cost"], reverse=True) 359 watch_terms.sort(key=lambda x: x["cost"], reverse=True) 360 potential_savings = sum(s["cost"] for s in suggestions) 361 362 # Insight generation 363 insights: list[str] = [] 364 if resolved_cpa is not None: 365 insights.append( 366 f"Using target CPA ¥{resolved_cpa:,.0f} ({cpa_source}) x 1.5 = " 367 f"¥{effective_threshold:,.0f} as exclusion threshold" 368 ) 369 else: 370 insights.append( 371 "Could not retrieve 
target CPA; " 372 "threshold-based exclusion was skipped except for informational patterns" 373 ) 374 if suggestions: 375 insights.append( 376 f"There are {len(suggestions)} negative keyword candidates. " 377 f"Adding them could save up to ¥{potential_savings:,.0f}" 378 ) 379 if watch_terms: 380 insights.append( 381 f"There are {len(watch_terms)} new terms. " 382 "They were not present in the previous period; observation is recommended" 383 ) 384 if not suggestions and not watch_terms and total_wasteful_cost == 0: 385 insights.append( 386 "No zero-CV search terms exceeding the exclusion threshold were found" 387 ) 388 389 result: dict[str, Any] = { 390 "campaign_id": campaign_id, 391 "ad_group_id": ad_group_id, 392 "period": period, 393 "target_cpa": resolved_cpa, 394 "target_cpa_source": cpa_source, 395 "existing_negative_count": len(existing_negatives), 396 "suggestions": suggestions, 397 "watch_terms": watch_terms, 398 "total_wasteful_cost": round(total_wasteful_cost, 0), 399 "potential_savings": round(potential_savings, 0), 400 "insights": insights, 401 } 402 403 # Intent analysis (optional) 404 if use_intent_analysis: 405 logger.info( 406 "suggest_negative_keywords: intent analysis start campaign_id=%s", 407 campaign_id, 408 ) 409 intent_additions = await self._suggest_by_intent( 410 campaign_id=campaign_id, 411 search_terms=search_terms, 412 existing_suggestions=suggestions, 413 existing_neg_texts=existing_neg_texts, 414 ) 415 logger.info( 416 "suggest_negative_keywords: intent analysis done campaign_id=%s", 417 campaign_id, 418 ) 419 if intent_additions: 420 result["intent_based_suggestions"] = intent_additions 421 insights.append( 422 f"Intent analysis detected {len(intent_additions)} additional " 423 "exclusion candidates" 424 ) 425 426 return result 427 428 # ================================================================= 429 # Search term review (multi-stage evaluation) 430 # ================================================================= 431 432 
async def review_search_terms( 433 self, 434 campaign_id: str, 435 period: str = "LAST_7_DAYS", 436 target_cpa: float | None = None, 437 use_intent_analysis: bool = True, 438 ad_group_id: str | None = None, 439 ) -> dict[str, Any]: 440 """Review search terms with multi-stage rules and suggest add/exclude candidates.""" 441 self._validate_id(campaign_id, "campaign_id") 442 443 effective_target = target_cpa 444 445 # Resolve target CPA 446 resolved_cpa, cpa_source = await self._resolve_target_cpa( 447 campaign_id, explicit=effective_target 448 ) 449 450 # Retrieve search terms for current/previous periods 451 search_terms, prev_term_set = await self._fetch_terms_with_prev( 452 campaign_id, period, ad_group_id=ad_group_id 453 ) 454 455 keywords = await self.list_keywords( 456 campaign_id=campaign_id, ad_group_id=ad_group_id 457 ) 458 keyword_texts: set[str] = {kw.get("text", "").lower() for kw in keywords} 459 existing_negatives = await self.list_negative_keywords(campaign_id=campaign_id) 460 existing_neg_texts: set[str] = { 461 n.get("keyword_text", "").lower() for n in existing_negatives 462 } 463 464 add_candidates: list[dict[str, Any]] = [] 465 exclude_candidates: list[dict[str, Any]] = [] 466 watch_candidates: list[dict[str, Any]] = [] 467 468 for t in search_terms: 469 self._classify_search_term( 470 t, 471 keyword_texts=keyword_texts, 472 existing_neg_texts=existing_neg_texts, 473 prev_term_set=prev_term_set, 474 resolved_cpa=resolved_cpa, 475 add_candidates=add_candidates, 476 exclude_candidates=exclude_candidates, 477 watch_candidates=watch_candidates, 478 ) 479 480 # Sort by score descending 481 add_candidates.sort(key=lambda x: x["score"], reverse=True) 482 exclude_candidates.sort(key=lambda x: x["score"], reverse=True) 483 watch_candidates.sort(key=lambda x: x["score"], reverse=True) 484 485 # Intent analysis (optional) 486 intent_summary: dict[str, Any] | None = None 487 if use_intent_analysis: 488 logger.info( 489 "review_search_terms: intent analysis 
start campaign_id=%s", campaign_id 490 ) 491 intent_summary = await self._apply_intent_analysis( 492 campaign_id=campaign_id, 493 add_candidates=add_candidates, 494 exclude_candidates=exclude_candidates, 495 watch_candidates=watch_candidates, 496 keyword_texts=keyword_texts, 497 ) 498 logger.info( 499 "review_search_terms: intent analysis done campaign_id=%s", campaign_id 500 ) 501 502 result: dict[str, Any] = { 503 "campaign_id": campaign_id, 504 "ad_group_id": ad_group_id, 505 "period": period, 506 "target_cpa": resolved_cpa, 507 "target_cpa_source": cpa_source, 508 "add_candidates": add_candidates, 509 "exclude_candidates": exclude_candidates, 510 "watch_candidates": watch_candidates, 511 "summary": { 512 "total_search_terms": len(search_terms), 513 "add_count": len(add_candidates), 514 "exclude_count": len(exclude_candidates), 515 "watch_count": len(watch_candidates), 516 }, 517 } 518 if intent_summary is not None: 519 result["intent_analysis"] = intent_summary 520 return result 521 522 def _classify_search_term( 523 self, 524 t: dict[str, Any], 525 *, 526 keyword_texts: set[str], 527 existing_neg_texts: set[str], 528 prev_term_set: set[str], 529 resolved_cpa: float | None, 530 add_candidates: list[dict[str, Any]], 531 exclude_candidates: list[dict[str, Any]], 532 watch_candidates: list[dict[str, Any]], 533 ) -> None: 534 """Classify a single search term using evaluation rules and add to the appropriate list.""" 535 term_text = t.get("search_term", "") 536 m = t.get("metrics", {}) 537 conversions = float(m.get("conversions", 0)) 538 clicks = int(m.get("clicks", 0)) 539 cost = float(m.get("cost", 0)) 540 impressions = int(m.get("impressions", 0)) 541 ctr = clicks / impressions if impressions > 0 else 0.0 542 543 is_registered = term_text.lower() in keyword_texts 544 is_new = term_text.lower() not in prev_term_set 545 546 # Rule 1: CV>=2 & 未登録 → add EXACT (score=90) 547 if conversions >= 2 and not is_registered: 548 add_candidates.append( 549 
_build_add_candidate( 550 term_text, 551 conversions, 552 clicks, 553 cost, 554 ctr, 555 "EXACT", 556 90, 557 f"{conversions:.0f} conversions, keyword not registered", 558 ) 559 ) 560 return 561 562 # Rule 2: CV=1 & CPA<=目標CPA & 未登録 → add EXACT (score=70) 563 if conversions == 1 and not is_registered and resolved_cpa is not None: 564 cpa = cost 565 if cpa <= resolved_cpa: 566 add_candidates.append( 567 _build_add_candidate( 568 term_text, 569 conversions, 570 clicks, 571 cost, 572 ctr, 573 "EXACT", 574 70, 575 f"CV1件、CPA ¥{cpa:,.0f} ≤ 目標CPA ¥{resolved_cpa:,.0f}", 576 ) 577 ) 578 return 579 580 # Rule 3: CV=0 & Click>=20 & CTR>=3% & 未登録 → add PHRASE (score=50) 581 if conversions == 0 and clicks >= 20 and ctr >= 0.03 and not is_registered: 582 add_candidates.append( 583 _build_add_candidate( 584 term_text, 585 conversions, 586 clicks, 587 cost, 588 ctr, 589 "PHRASE", 590 50, 591 f"CTR {ctr:.1%} (high CTR), {clicks} clicks", 592 ) 593 ) 594 return 595 596 # Exclusion candidates: skip if already registered as negative keyword 597 is_already_excluded = term_text.lower() in existing_neg_texts 598 599 # Rule 4: CV=0 & cost >= target CPA × 2 → exclude EXACT (score=80) 600 if ( 601 conversions == 0 602 and resolved_cpa is not None 603 and cost >= resolved_cpa * 2 604 and not is_already_excluded 605 ): 606 entry = _build_exclude_candidate( 607 term_text, 608 conversions, 609 clicks, 610 cost, 611 ctr, 612 "EXACT", 613 80, 614 f"0 conversions, cost ¥{cost:,.0f} >= target CPA x2 (¥{resolved_cpa * 2:,.0f})", 615 ) 616 self._route_by_newness( 617 entry, term_text, is_new, exclude_candidates, watch_candidates 618 ) 619 return 620 621 # Rule 5: CV=0 & Click>=30 & CTR<1% → exclude EXACT (score=60) 622 if conversions == 0 and clicks >= 30 and ctr < 0.01 and not is_already_excluded: 623 entry = _build_exclude_candidate( 624 term_text, 625 conversions, 626 clicks, 627 cost, 628 ctr, 629 "EXACT", 630 60, 631 f"0 conversions, {clicks} clicks with CTR {ctr:.2%} (low CTR)", 632 ) 633 
self._route_by_newness( 634 entry, term_text, is_new, exclude_candidates, watch_candidates 635 ) 636 return 637 638 # Rule 6: Informational pattern & CV=0 -> exclude PHRASE (score=40) 639 if ( 640 conversions == 0 641 and _is_informational_term(term_text) 642 and not is_already_excluded 643 ): 644 entry = _build_exclude_candidate( 645 term_text, 646 conversions, 647 clicks, 648 cost, 649 ctr, 650 "PHRASE", 651 40, 652 "Informational intent search term (0 CV)", 653 ) 654 self._route_by_newness( 655 entry, term_text, is_new, exclude_candidates, watch_candidates 656 ) 657 658 # ================================================================= 659 # Intent-based search term analysis (LLM helper/stub) 660 # ================================================================= 661 662 async def _apply_intent_analysis( 663 self, 664 campaign_id: str, 665 add_candidates: list[dict[str, Any]], 666 exclude_candidates: list[dict[str, Any]], 667 watch_candidates: list[dict[str, Any]], 668 keyword_texts: set[str], 669 ) -> dict[str, Any]: 670 """Stub for LLM intent analysis. LLM dependency removed in mureo-core.""" 671 return { 672 "classified_count": 0, 673 "adjustments": [], 674 "note": "LLM intent analysis is performed on the Managed side", 675 } 676 677 async def _suggest_by_intent( 678 self, 679 campaign_id: str, 680 search_terms: list[dict[str, Any]], 681 existing_suggestions: list[dict[str, Any]], 682 existing_neg_texts: set[str], 683 ) -> list[dict[str, Any]]: 684 """Stub for additional suggestions via LLM intent analysis.""" 685 return [] 686 687 async def _get_strategic_context_for_intent(self, campaign_id: str) -> str | None: 688 """Stub for strategic context retrieval.""" 689 return None