/ mureo / google_ads / _analysis_search_terms.py
_analysis_search_terms.py
  1  """Search term analysis mixin."""
  2  
  3  from __future__ import annotations
  4  
  5  import logging
  6  from typing import TYPE_CHECKING, Any
  7  
  8  from mureo.google_ads._analysis_constants import (
  9      _INFORMATIONAL_PATTERNS,
 10      _extract_ngrams,
 11      _get_comparison_date_ranges,
 12  )
 13  
 14  if TYPE_CHECKING:
 15      from google.ads.googleads.client import GoogleAdsClient
 16  
 17  logger = logging.getLogger(__name__)
 18  
 19  
 20  def _is_informational_term(term_text: str) -> bool:
 21      """Determine if a search term matches informational patterns."""
 22      return any(p in term_text for p in _INFORMATIONAL_PATTERNS)
 23  
 24  
 25  def _build_add_candidate(
 26      term_text: str,
 27      conversions: float,
 28      clicks: int,
 29      cost: float,
 30      ctr: float,
 31      match_type: str,
 32      score: int,
 33      reason: str,
 34  ) -> dict[str, Any]:
 35      """Build candidate entry for keyword addition."""
 36      return {
 37          "search_term": term_text,
 38          "action": "add",
 39          "match_type": match_type,
 40          "score": score,
 41          "reason": reason,
 42          "metrics": {
 43              "conversions": conversions,
 44              "clicks": clicks,
 45              "cost": cost,
 46              "ctr": round(ctr, 4),
 47          },
 48      }
 49  
 50  
 51  def _build_exclude_candidate(
 52      term_text: str,
 53      conversions: float,
 54      clicks: int,
 55      cost: float,
 56      ctr: float,
 57      match_type: str,
 58      score: int,
 59      reason: str,
 60  ) -> dict[str, Any]:
 61      """Build exclusion candidate entry."""
 62      return {
 63          "search_term": term_text,
 64          "action": "exclude",
 65          "match_type": match_type,
 66          "score": score,
 67          "reason": reason,
 68          "metrics": {
 69              "conversions": conversions,
 70              "clicks": clicks,
 71              "cost": cost,
 72              "ctr": round(ctr, 4),
 73          },
 74      }
 75  
 76  
 77  class _SearchTermsAnalysisMixin:
 78      """Mixin providing search term analysis methods."""
 79  
    # --- Attributes/methods supplied by the concrete parent class. ---
    # These declarations exist only so type checkers can resolve references
    # made by this mixin; the real implementations live in the host class.
    _customer_id: str
    _client: GoogleAdsClient

    @staticmethod
    def _validate_id(value: str, field_name: str) -> str: ...  # type: ignore[empty-body]

    # Raw search-term report rows for a campaign/ad group and period.
    async def get_search_terms_report(self, **kwargs: Any) -> list[dict[str, Any]]: ...  # type: ignore[empty-body]
    # Registered (positive) keywords, optionally scoped to an ad group.
    async def list_keywords(  # type: ignore[empty-body]
        self, ad_group_id: str | None = None, campaign_id: str | None = None
    ) -> list[dict[str, Any]]: ...
    # Negative keywords currently attached to the campaign.
    async def list_negative_keywords(  # type: ignore[empty-body]
        self, campaign_id: str
    ) -> list[dict[str, Any]]: ...

    # Method from PerformanceAnalysisMixin.
    # Returns (target_cpa, source_label); target_cpa is None when unresolvable.
    async def _resolve_target_cpa(  # type: ignore[empty-body]
        self, campaign_id: str, explicit: float | None = None
    ) -> tuple[float | None, str]: ...
 99  
100      # =================================================================
101      # Common: Search terms retrieval with previous period / new term routing
102      # =================================================================
103  
104      async def _fetch_terms_with_prev(
105          self,
106          campaign_id: str,
107          period: str,
108          ad_group_id: str | None = None,
109      ) -> tuple[list[dict[str, Any]], set[str]]:
110          """Return current period search terms list and previous period term text set."""
111          current_range, prev_range = _get_comparison_date_ranges(period)
112          search_terms = await self.get_search_terms_report(
113              campaign_id=campaign_id, ad_group_id=ad_group_id, period=current_range
114          )
115          prev_terms = await self.get_search_terms_report(
116              campaign_id=campaign_id, ad_group_id=ad_group_id, period=prev_range
117          )
118          prev_term_set = {t.get("search_term", "").lower() for t in prev_terms}
119          return search_terms, prev_term_set
120  
121      @staticmethod
122      def _route_by_newness(
123          entry: dict[str, Any],
124          term_text: str,
125          is_new: bool,
126          main_list: list[dict[str, Any]],
127          watch_list: list[dict[str, Any]],
128      ) -> None:
129          """Route new terms to watch list and existing terms to main list."""
130          if is_new:
131              entry["reason"] = f"New term (under observation): {entry['reason']}"
132              if "action" in entry:
133                  entry["action"] = "watch"
134              watch_list.append(entry)
135          else:
136              main_list.append(entry)
137  
138      # =================================================================
139      # Search term overlap analysis
140      # =================================================================
141  
    async def analyze_search_terms(
        self,
        campaign_id: str,
        period: str = "LAST_30_DAYS",
    ) -> dict[str, Any]:
        """Analyze search term/keyword overlap, N-gram distribution, and candidates.

        Args:
            campaign_id: Campaign to analyze (validated via ``_validate_id``).
            period: Google Ads date-range string for the report.

        Returns:
            Dict containing the keyword/search-term overlap rate, the top 10
            unigrams/bigrams/trigrams with aggregated cost and conversions,
            keyword-addition candidates (converting but unregistered terms),
            negative candidates (cost with zero conversions, top 20 by cost),
            and human-readable insight strings.
        """
        self._validate_id(campaign_id, "campaign_id")

        # Retrieve keywords and search terms
        keywords = await self.list_keywords(campaign_id=campaign_id)
        search_terms = await self.get_search_terms_report(
            campaign_id=campaign_id, period=period
        )

        # Set of keyword texts (lowercase) — all matching below is case-insensitive
        keyword_texts: set[str] = {kw.get("text", "").lower() for kw in keywords}

        # Overlap rate: share of search terms already registered as keywords
        overlap_count = sum(
            1 for t in search_terms if t.get("search_term", "").lower() in keyword_texts
        )
        overlap_rate = overlap_count / len(search_terms) if search_terms else 0.0

        # N-gram distribution (1-3gram): per gram, accumulate occurrence count
        # plus the full cost/conversions of every term containing it
        ngram_agg: dict[int, dict[str, dict[str, float]]] = {
            1: {},
            2: {},
            3: {},
        }
        for t in search_terms:
            text = t.get("search_term", "")
            m = t.get("metrics", {})
            cost = float(m.get("cost", 0))
            convs = float(m.get("conversions", 0))
            for n in (1, 2, 3):
                for gram in _extract_ngrams(text, n):
                    agg = ngram_agg[n].setdefault(
                        gram, {"count": 0, "cost": 0.0, "conversions": 0.0}
                    )
                    agg["count"] += 1
                    agg["cost"] += cost
                    agg["conversions"] += convs

        # Keep only the 10 most frequent grams per n-gram size
        ngram_distribution: dict[str, list[dict[str, Any]]] = {}
        label_map = {1: "unigrams", 2: "bigrams", 3: "trigrams"}
        for n, label in label_map.items():
            sorted_grams = sorted(
                ngram_agg[n].items(),
                key=lambda x: x[1]["count"],
                reverse=True,
            )[:10]
            ngram_distribution[label] = [
                {
                    "text": g,
                    "count": int(v["count"]),
                    "cost": round(v["cost"], 0),
                    "conversions": round(v["conversions"], 1),
                }
                for g, v in sorted_grams
            ]

        # Keyword candidates: CV > 0 and not registered
        keyword_candidates = [
            {
                "search_term": t.get("search_term", ""),
                "conversions": float(t.get("metrics", {}).get("conversions", 0)),
                "cost": float(t.get("metrics", {}).get("cost", 0)),
                "clicks": int(t.get("metrics", {}).get("clicks", 0)),
            }
            for t in search_terms
            if float(t.get("metrics", {}).get("conversions", 0)) > 0
            and t.get("search_term", "").lower() not in keyword_texts
        ]

        # Exclusion candidates: has cost, CV=0 (sorted by cost desc, top 20)
        negative_candidates = sorted(
            [
                {
                    "search_term": t.get("search_term", ""),
                    "cost": float(t.get("metrics", {}).get("cost", 0)),
                    "clicks": int(t.get("metrics", {}).get("clicks", 0)),
                    "impressions": int(t.get("metrics", {}).get("impressions", 0)),
                }
                for t in search_terms
                if float(t.get("metrics", {}).get("cost", 0)) > 0
                and float(t.get("metrics", {}).get("conversions", 0)) == 0
            ],
            key=lambda x: x["cost"],
            reverse=True,
        )[:20]

        # Insight generation (0.3 overlap threshold is a heuristic)
        insights: list[str] = []
        if overlap_rate < 0.3:
            insights.append(
                f"Overlap rate is {overlap_rate:.0%}, which is low. "
                "Many search terms are not registered as keywords. "
                "Consider adding keywords"
            )
        if negative_candidates:
            # Waste is summed over the top-20 list only, not all zero-CV terms
            total_waste = sum(c["cost"] for c in negative_candidates)
            insights.append(
                f"There are {len(negative_candidates)} search terms with cost but no conversions, "
                f"resulting in ¥{total_waste:,.0f} of wasted cost"
            )
        if keyword_candidates:
            insights.append(
                f"There are {len(keyword_candidates)} search terms with conversions that are not registered. "
                "We recommend adding them as keywords"
            )

        return {
            "campaign_id": campaign_id,
            "period": period,
            "registered_keywords_count": len(keywords),
            "search_terms_count": len(search_terms),
            "overlap_rate": round(overlap_rate, 3),
            "ngram_distribution": ngram_distribution,
            "keyword_candidates": keyword_candidates,
            "negative_candidates": negative_candidates,
            "insights": insights,
        }
264  
265      # =================================================================
266      # Automatic negative keyword suggestions
267      # =================================================================
268  
    async def suggest_negative_keywords(
        self,
        campaign_id: str,
        period: str = "LAST_30_DAYS",
        target_cpa: float | None = None,
        use_intent_analysis: bool = True,
        ad_group_id: str | None = None,
        **_kwargs: Any,
    ) -> dict[str, Any]:
        """Automatically suggest negative keyword candidates.

        Zero-conversion terms are suggested for exclusion when their cost
        reaches target CPA x 1.5, or regardless of cost when they match an
        informational pattern. Terms absent from the previous period are
        routed to ``watch_terms`` instead of ``suggestions``.

        Args:
            campaign_id: Campaign to analyze (validated via ``_validate_id``).
            period: Google Ads date-range string for the report.
            target_cpa: Explicit target CPA; when None it is resolved via
                ``_resolve_target_cpa``.
            use_intent_analysis: When True, append intent-based suggestions
                (currently a stub in mureo-core).
            ad_group_id: Optional ad-group scope for the reports.
            **_kwargs: Ignored; absorbs extra keyword arguments from callers.

        Returns:
            Dict with suggestions, watch terms, wasteful-cost totals,
            potential savings, and insight strings.
        """
        self._validate_id(campaign_id, "campaign_id")

        effective_target = target_cpa

        # CPA-based threshold resolution (always use CPA x 1.5)
        resolved_cpa, cpa_source = await self._resolve_target_cpa(
            campaign_id, explicit=effective_target
        )
        effective_threshold: float | None = None
        if resolved_cpa is not None:
            effective_threshold = resolved_cpa * 1.5

        # Retrieve search terms for current/previous periods (for new term protection)
        search_terms, prev_term_set = await self._fetch_terms_with_prev(
            campaign_id, period, ad_group_id=ad_group_id
        )

        existing_negatives = await self.list_negative_keywords(campaign_id)

        # Existing negative keyword texts (lowercase)
        existing_neg_texts: set[str] = {
            n.get("keyword_text", "").lower() for n in existing_negatives
        }

        # Filter: >= target CPA x 1.5, CV=0, no overlap with existing negatives
        suggestions: list[dict[str, Any]] = []
        watch_terms: list[dict[str, Any]] = []
        total_wasteful_cost: float = 0.0
        for t in search_terms:
            m = t.get("metrics", {})
            cost = float(m.get("cost", 0))
            convs = float(m.get("conversions", 0))
            term_text = t.get("search_term", "")

            # Converting terms are never exclusion candidates
            if convs > 0:
                continue

            is_new = term_text.lower() not in prev_term_set
            is_informational = _is_informational_term(term_text)

            # Non-informational terms must clear the cost threshold to be
            # suggested. Below-threshold (or threshold-less) zero-CV spend is
            # still tallied as wasteful before skipping. Informational terms
            # bypass the threshold entirely.
            if not is_informational:
                if effective_threshold is not None and cost < effective_threshold:
                    if cost > 0:
                        total_wasteful_cost += cost
                    continue
                if effective_threshold is None:
                    if cost > 0:
                        total_wasteful_cost += cost
                    continue

            # Reaching here: term will be considered; count its spend as waste
            if cost > 0:
                total_wasteful_cost += cost

            # Skip terms already covered by an existing negative keyword
            if term_text.lower() in existing_neg_texts:
                continue

            # Recommended match type
            if is_informational:
                match_type = "PHRASE"
                reason = f"Informational intent (0 CV, cost ¥{cost:,.0f})"
            else:
                # Defensive: threshold-less non-informational terms were
                # skipped above, so resolved_cpa must be set on this path.
                if resolved_cpa is None:
                    raise RuntimeError("resolved_cpa should not be None here")
                # Short terms get EXACT; longer phrases get PHRASE
                word_count = len(term_text.strip().split())
                match_type = "EXACT" if word_count <= 2 else "PHRASE"
                reason = f"¥{cost:,.0f} cost with 0 CV (exceeds target CPA ¥{resolved_cpa:,.0f} x 1.5)"

            entry = {
                "search_term": term_text,
                "cost": cost,
                "clicks": int(m.get("clicks", 0)),
                "impressions": int(m.get("impressions", 0)),
                "ctr": float(m.get("ctr", 0)),
                "recommended_match_type": match_type,
                "reason": reason,
            }
            self._route_by_newness(entry, term_text, is_new, suggestions, watch_terms)

        # Sort by cost descending
        suggestions.sort(key=lambda x: x["cost"], reverse=True)
        watch_terms.sort(key=lambda x: x["cost"], reverse=True)
        # Savings estimate counts confirmed suggestions only, not watch terms
        potential_savings = sum(s["cost"] for s in suggestions)

        # Insight generation
        insights: list[str] = []
        if resolved_cpa is not None:
            insights.append(
                f"Using target CPA ¥{resolved_cpa:,.0f} ({cpa_source}) x 1.5 = "
                f"¥{effective_threshold:,.0f} as exclusion threshold"
            )
        else:
            insights.append(
                "Could not retrieve target CPA; "
                "threshold-based exclusion was skipped except for informational patterns"
            )
        if suggestions:
            insights.append(
                f"There are {len(suggestions)} negative keyword candidates. "
                f"Adding them could save up to ¥{potential_savings:,.0f}"
            )
        if watch_terms:
            insights.append(
                f"There are {len(watch_terms)} new terms. "
                "They were not present in the previous period; observation is recommended"
            )
        if not suggestions and not watch_terms and total_wasteful_cost == 0:
            insights.append(
                "No zero-CV search terms exceeding the exclusion threshold were found"
            )

        result: dict[str, Any] = {
            "campaign_id": campaign_id,
            "ad_group_id": ad_group_id,
            "period": period,
            "target_cpa": resolved_cpa,
            "target_cpa_source": cpa_source,
            "existing_negative_count": len(existing_negatives),
            "suggestions": suggestions,
            "watch_terms": watch_terms,
            "total_wasteful_cost": round(total_wasteful_cost, 0),
            "potential_savings": round(potential_savings, 0),
            "insights": insights,
        }

        # Intent analysis (optional)
        if use_intent_analysis:
            logger.info(
                "suggest_negative_keywords: intent analysis start campaign_id=%s",
                campaign_id,
            )
            intent_additions = await self._suggest_by_intent(
                campaign_id=campaign_id,
                search_terms=search_terms,
                existing_suggestions=suggestions,
                existing_neg_texts=existing_neg_texts,
            )
            logger.info(
                "suggest_negative_keywords: intent analysis done campaign_id=%s",
                campaign_id,
            )
            if intent_additions:
                result["intent_based_suggestions"] = intent_additions
                # `insights` is the same list object stored in `result`, so
                # this append is reflected in result["insights"].
                insights.append(
                    f"Intent analysis detected {len(intent_additions)} additional "
                    "exclusion candidates"
                )

        return result
427  
428      # =================================================================
429      # Search term review (multi-stage evaluation)
430      # =================================================================
431  
    async def review_search_terms(
        self,
        campaign_id: str,
        period: str = "LAST_7_DAYS",
        target_cpa: float | None = None,
        use_intent_analysis: bool = True,
        ad_group_id: str | None = None,
    ) -> dict[str, Any]:
        """Review search terms with multi-stage rules and suggest add/exclude candidates.

        Each term is classified by ``_classify_search_term`` into at most one
        of: add candidates (registerable keywords), exclude candidates
        (negative-keyword suggestions), or watch candidates (new terms under
        observation). Lists are returned sorted by score descending.

        Args:
            campaign_id: Campaign whose search terms are reviewed.
            period: Google Ads date-range string (defaults to last 7 days).
            target_cpa: Explicit target CPA; resolved via
                ``_resolve_target_cpa`` when None.
            use_intent_analysis: When True, run the (stubbed) intent-analysis
                step after rule-based classification.
            ad_group_id: Optional ad-group scope for reports and keywords.

        Returns:
            Dict with candidate lists, a count summary, and optionally an
            "intent_analysis" entry.
        """
        self._validate_id(campaign_id, "campaign_id")

        effective_target = target_cpa

        # Resolve target CPA
        resolved_cpa, cpa_source = await self._resolve_target_cpa(
            campaign_id, explicit=effective_target
        )

        # Retrieve search terms for current/previous periods
        search_terms, prev_term_set = await self._fetch_terms_with_prev(
            campaign_id, period, ad_group_id=ad_group_id
        )

        keywords = await self.list_keywords(
            campaign_id=campaign_id, ad_group_id=ad_group_id
        )
        keyword_texts: set[str] = {kw.get("text", "").lower() for kw in keywords}
        existing_negatives = await self.list_negative_keywords(campaign_id=campaign_id)
        existing_neg_texts: set[str] = {
            n.get("keyword_text", "").lower() for n in existing_negatives
        }

        add_candidates: list[dict[str, Any]] = []
        exclude_candidates: list[dict[str, Any]] = []
        watch_candidates: list[dict[str, Any]] = []

        # Rule cascade routes each term to at most one candidate list
        for t in search_terms:
            self._classify_search_term(
                t,
                keyword_texts=keyword_texts,
                existing_neg_texts=existing_neg_texts,
                prev_term_set=prev_term_set,
                resolved_cpa=resolved_cpa,
                add_candidates=add_candidates,
                exclude_candidates=exclude_candidates,
                watch_candidates=watch_candidates,
            )

        # Sort by score descending
        add_candidates.sort(key=lambda x: x["score"], reverse=True)
        exclude_candidates.sort(key=lambda x: x["score"], reverse=True)
        watch_candidates.sort(key=lambda x: x["score"], reverse=True)

        # Intent analysis (optional; may mutate the candidate lists in place)
        intent_summary: dict[str, Any] | None = None
        if use_intent_analysis:
            logger.info(
                "review_search_terms: intent analysis start campaign_id=%s", campaign_id
            )
            intent_summary = await self._apply_intent_analysis(
                campaign_id=campaign_id,
                add_candidates=add_candidates,
                exclude_candidates=exclude_candidates,
                watch_candidates=watch_candidates,
                keyword_texts=keyword_texts,
            )
            logger.info(
                "review_search_terms: intent analysis done campaign_id=%s", campaign_id
            )

        result: dict[str, Any] = {
            "campaign_id": campaign_id,
            "ad_group_id": ad_group_id,
            "period": period,
            "target_cpa": resolved_cpa,
            "target_cpa_source": cpa_source,
            "add_candidates": add_candidates,
            "exclude_candidates": exclude_candidates,
            "watch_candidates": watch_candidates,
            "summary": {
                "total_search_terms": len(search_terms),
                "add_count": len(add_candidates),
                "exclude_count": len(exclude_candidates),
                "watch_count": len(watch_candidates),
            },
        }
        if intent_summary is not None:
            result["intent_analysis"] = intent_summary
        return result
521  
    def _classify_search_term(
        self,
        t: dict[str, Any],
        *,
        keyword_texts: set[str],
        existing_neg_texts: set[str],
        prev_term_set: set[str],
        resolved_cpa: float | None,
        add_candidates: list[dict[str, Any]],
        exclude_candidates: list[dict[str, Any]],
        watch_candidates: list[dict[str, Any]],
    ) -> None:
        """Classify a single search term using evaluation rules and add to the appropriate list.

        Rules are checked in priority order and the first match wins: add
        rules (1-3) apply to unregistered terms, exclude rules (4-6) to
        zero-conversion terms not already negated. Exclusion entries for
        terms absent in the previous period are diverted to
        ``watch_candidates`` via ``_route_by_newness``. Terms matching no
        rule are left unclassified.
        """
        term_text = t.get("search_term", "")
        m = t.get("metrics", {})
        conversions = float(m.get("conversions", 0))
        clicks = int(m.get("clicks", 0))
        cost = float(m.get("cost", 0))
        impressions = int(m.get("impressions", 0))
        # CTR recomputed from clicks/impressions rather than read from metrics
        ctr = clicks / impressions if impressions > 0 else 0.0

        is_registered = term_text.lower() in keyword_texts
        is_new = term_text.lower() not in prev_term_set

        # Rule 1: CV>=2 & unregistered -> add EXACT (score=90)
        if conversions >= 2 and not is_registered:
            add_candidates.append(
                _build_add_candidate(
                    term_text,
                    conversions,
                    clicks,
                    cost,
                    ctr,
                    "EXACT",
                    90,
                    f"{conversions:.0f} conversions, keyword not registered",
                )
            )
            return

        # Rule 2: CV=1 & CPA<=target CPA & unregistered -> add EXACT (score=70)
        if conversions == 1 and not is_registered and resolved_cpa is not None:
            # With exactly one conversion, CPA equals total cost
            cpa = cost
            if cpa <= resolved_cpa:
                add_candidates.append(
                    _build_add_candidate(
                        term_text,
                        conversions,
                        clicks,
                        cost,
                        ctr,
                        "EXACT",
                        70,
                        f"CV1件、CPA ¥{cpa:,.0f} ≤ 目標CPA ¥{resolved_cpa:,.0f}",
                    )
                )
                return

        # Rule 3: CV=0 & Click>=20 & CTR>=3% & unregistered -> add PHRASE (score=50)
        if conversions == 0 and clicks >= 20 and ctr >= 0.03 and not is_registered:
            add_candidates.append(
                _build_add_candidate(
                    term_text,
                    conversions,
                    clicks,
                    cost,
                    ctr,
                    "PHRASE",
                    50,
                    f"CTR {ctr:.1%} (high CTR), {clicks} clicks",
                )
            )
            return

        # Exclusion candidates: skip if already registered as negative keyword
        is_already_excluded = term_text.lower() in existing_neg_texts

        # Rule 4: CV=0 & cost >= target CPA × 2 → exclude EXACT (score=80)
        if (
            conversions == 0
            and resolved_cpa is not None
            and cost >= resolved_cpa * 2
            and not is_already_excluded
        ):
            entry = _build_exclude_candidate(
                term_text,
                conversions,
                clicks,
                cost,
                ctr,
                "EXACT",
                80,
                f"0 conversions, cost ¥{cost:,.0f} >= target CPA x2 (¥{resolved_cpa * 2:,.0f})",
            )
            self._route_by_newness(
                entry, term_text, is_new, exclude_candidates, watch_candidates
            )
            return

        # Rule 5: CV=0 & Click>=30 & CTR<1% → exclude EXACT (score=60)
        if conversions == 0 and clicks >= 30 and ctr < 0.01 and not is_already_excluded:
            entry = _build_exclude_candidate(
                term_text,
                conversions,
                clicks,
                cost,
                ctr,
                "EXACT",
                60,
                f"0 conversions, {clicks} clicks with CTR {ctr:.2%} (low CTR)",
            )
            self._route_by_newness(
                entry, term_text, is_new, exclude_candidates, watch_candidates
            )
            return

        # Rule 6: Informational pattern & CV=0 -> exclude PHRASE (score=40)
        if (
            conversions == 0
            and _is_informational_term(term_text)
            and not is_already_excluded
        ):
            entry = _build_exclude_candidate(
                term_text,
                conversions,
                clicks,
                cost,
                ctr,
                "PHRASE",
                40,
                "Informational intent search term (0 CV)",
            )
            self._route_by_newness(
                entry, term_text, is_new, exclude_candidates, watch_candidates
            )
657  
658      # =================================================================
659      # Intent-based search term analysis (LLM helper/stub)
660      # =================================================================
661  
662      async def _apply_intent_analysis(
663          self,
664          campaign_id: str,
665          add_candidates: list[dict[str, Any]],
666          exclude_candidates: list[dict[str, Any]],
667          watch_candidates: list[dict[str, Any]],
668          keyword_texts: set[str],
669      ) -> dict[str, Any]:
670          """Stub for LLM intent analysis. LLM dependency removed in mureo-core."""
671          return {
672              "classified_count": 0,
673              "adjustments": [],
674              "note": "LLM intent analysis is performed on the Managed side",
675          }
676  
677      async def _suggest_by_intent(
678          self,
679          campaign_id: str,
680          search_terms: list[dict[str, Any]],
681          existing_suggestions: list[dict[str, Any]],
682          existing_neg_texts: set[str],
683      ) -> list[dict[str, Any]]:
684          """Stub for additional suggestions via LLM intent analysis."""
685          return []
686  
687      async def _get_strategic_context_for_intent(self, campaign_id: str) -> str | None:
688          """Stub for strategic context retrieval."""
689          return None