/ mureo / google_ads / _monitoring.py
_monitoring.py
  1  from __future__ import annotations
  2  
  3  import logging
  4  from typing import TYPE_CHECKING, Any
  5  
  6  if TYPE_CHECKING:
  7      from google.ads.googleads.client import GoogleAdsClient
  8  
  9  logger = logging.getLogger(__name__)
 10  
 11  
class _MonitoringMixin:
    """Mixin providing monitoring target evaluation macro tools.

    The evaluation methods defined on this mixin combine several lower-level
    calls (campaign lookup, performance reports, delivery diagnostics, cost
    analysis, search terms, conversion actions) and return a plain dict that
    carries a ``status``, a list of ``issues`` and a human-readable
    ``summary``.

    The lower-level calls are not implemented here; they are expected to be
    supplied by the class this mixin is combined with (see the
    ``TYPE_CHECKING`` declarations below).
    """

    # Type declarations for attributes/methods provided by parent class (GoogleAdsApiClient)
    # Not present at runtime (placed inside TYPE_CHECKING to avoid overriding implementations via MRO)
    if TYPE_CHECKING:
        # Instance attributes declared for type checkers only.
        _customer_id: str
        _client: GoogleAdsClient

        # ID check invoked at the top of every public method of this mixin.
        @staticmethod
        def _validate_id(value: str, field_name: str) -> str: ...
        # Data-access and analysis coroutines awaited by the evaluation methods.
        async def get_campaign(self, campaign_id: str) -> dict[str, Any] | None: ...
        async def get_performance_report(
            self, **kwargs: Any
        ) -> list[dict[str, Any]]: ...
        async def diagnose_campaign_delivery(
            self, campaign_id: str
        ) -> dict[str, Any]: ...
        async def analyze_performance(
            self, campaign_id: str, period: str = "LAST_7_DAYS"
        ) -> dict[str, Any]: ...
        async def investigate_cost_increase(
            self, campaign_id: str
        ) -> dict[str, Any]: ...
        async def get_search_terms_report(
            self, **kwargs: Any
        ) -> list[dict[str, Any]]: ...
        async def list_conversion_actions(self) -> list[dict[str, Any]]: ...
 40  
 41      # =================================================================
 42      # 1. Delivery goal evaluation
 43      # =================================================================
 44  
 45      async def evaluate_delivery_goal(self, campaign_id: str) -> dict[str, Any]:
 46          """Evaluate delivery goal by integrating delivery status and performance."""
 47          self._validate_id(campaign_id, "campaign_id")
 48          issues: list[str] = []
 49          result: dict[str, Any] = {"campaign_id": campaign_id}
 50  
 51          # 1. Campaign basic information
 52          campaign: dict[str, Any] | None = None
 53          try:
 54              campaign = await self.get_campaign(campaign_id)
 55          except Exception:
 56              logger.warning("Failed to retrieve campaign information", exc_info=True)
 57          result["campaign"] = campaign
 58  
 59          # 2. Delivery diagnostics
 60          diagnosis: dict[str, Any] = {}
 61          try:
 62              diagnosis = await self.diagnose_campaign_delivery(campaign_id)
 63          except Exception:
 64              logger.warning("Failed to retrieve delivery diagnostics", exc_info=True)
 65              issues.append("Failed to retrieve delivery diagnostics")
 66          result["diagnosis"] = diagnosis
 67  
 68          # 3. Previous day performance
 69          performance: list[dict[str, Any]] = []
 70          try:
 71              performance = await self.get_performance_report(
 72                  campaign_id=campaign_id, period="YESTERDAY"
 73              )
 74          except Exception:
 75              logger.warning("Failed to retrieve previous day performance", exc_info=True)
 76              issues.append("Failed to retrieve previous day performance")
 77          result["performance"] = performance
 78  
 79          # Extract metrics
 80          metrics = performance[0].get("metrics", {}) if performance else {}
 81          impressions = int(metrics.get("impressions", 0))
 82  
 83          # Status determination
 84          has_issues = bool(diagnosis.get("issues"))
 85          has_warnings = bool(diagnosis.get("warnings"))
 86          campaign_status = (campaign or {}).get("status", "")
 87  
 88          if has_issues:
 89              issues.append("Issues detected in delivery diagnostics")
 90          if campaign_status and campaign_status != "ENABLED":
 91              issues.append(f"Campaign status is {campaign_status}")
 92          if impressions == 0:
 93              issues.append("Yesterday's impressions are 0")
 94  
 95          is_critical = (
 96              has_issues
 97              or (campaign_status and campaign_status != "ENABLED")
 98              or impressions == 0
 99          )
100          is_warning = has_warnings or (
101              not is_critical and impressions > 0 and impressions < 10
102          )
103  
104          if is_critical:
105              status = "critical"
106          elif is_warning:
107              status = "warning"
108              if has_warnings:
109                  issues.append("Warnings detected in delivery diagnostics")
110          else:
111              status = "healthy"
112  
113          result["status"] = status
114          result["issues"] = issues
115  
116          if status in ("critical", "warning"):
117              result["suggested_workflow"] = "delivery_fix"
118  
119          # Summary generation
120          if status == "critical":
121              result["summary"] = (
122                  f"Campaign {campaign_id} has critical delivery issues. "
123                  f"Issues detected: {', '.join(issues)}"
124              )
125          elif status == "warning":
126              result["summary"] = (
127                  f"Campaign {campaign_id} delivery needs attention. "
128                  f"Warnings detected: {', '.join(issues)}"
129              )
130          else:
131              result["summary"] = (
132                  f"Campaign {campaign_id} delivery is operating normally. "
133                  f"Previous day impressions: {impressions:,}"
134              )
135  
136          return result
137  
138      # =================================================================
139      # 2. CPA goal evaluation
140      # =================================================================
141  
142      async def evaluate_cpa_goal(
143          self, campaign_id: str, target_cpa: float
144      ) -> dict[str, Any]:
145          """Evaluate current performance against CPA target."""
146          self._validate_id(campaign_id, "campaign_id")
147          issues: list[str] = []
148          result: dict[str, Any] = {
149              "campaign_id": campaign_id,
150              "target_cpa": target_cpa,
151          }
152  
153          # 1. Last 7 days performance
154          perf: list[dict[str, Any]] = []
155          try:
156              perf = await self.get_performance_report(
157                  campaign_id=campaign_id, period="LAST_7_DAYS"
158              )
159          except Exception:
160              logger.warning("Failed to retrieve performance report", exc_info=True)
161              issues.append("Failed to retrieve performance report")
162  
163          metrics = perf[0].get("metrics", {}) if perf else {}
164          cost = float(metrics.get("cost", 0))
165          conversions = float(metrics.get("conversions", 0))
166  
167          # 2. Calculate CPA
168          if conversions > 0:
169              current_cpa = round(cost / conversions, 1)
170              result["current_cpa"] = current_cpa
171          else:
172              current_cpa = None
173              result["current_cpa"] = None
174              issues.append(
175                  "Cannot calculate CPA because there are 0 conversions in the last 7 days"
176              )
177  
178          # 3. Cost analysis
179          cost_analysis: dict[str, Any] = {}
180          try:
181              cost_analysis = await self.investigate_cost_increase(campaign_id)
182          except Exception:
183              logger.warning("Failed to retrieve cost analysis", exc_info=True)
184              issues.append("Failed to retrieve cost analysis")
185          result["cost_analysis"] = cost_analysis
186  
187          # Wasteful search terms (top 5)
188          wasteful_terms = cost_analysis.get("wasteful_search_terms", [])
189          if isinstance(wasteful_terms, list):
190              result["wasteful_terms"] = wasteful_terms[:5]
191          else:
192              result["wasteful_terms"] = []
193  
194          # 4. Calculate deviation rate and determine status
195          if current_cpa is not None:
196              deviation_pct = round((current_cpa - target_cpa) / target_cpa * 100, 1)
197              result["deviation_pct"] = deviation_pct
198  
199              if current_cpa <= target_cpa:
200                  status = "healthy"
201              elif current_cpa <= target_cpa * 1.2:
202                  status = "warning"
203                  issues.append(
204                      f"CPA exceeds target by {deviation_pct}%"
205                      f" (current: {current_cpa:,.0f} / target: {target_cpa:,.0f})"
206                  )
207              else:
208                  status = "critical"
209                  issues.append(
210                      f"CPA significantly exceeds target ({deviation_pct}% over). "
211                      f"Current: {current_cpa:,.0f} / target: {target_cpa:,.0f}"
212                  )
213          else:
214              # No conversions
215              status = "warning"
216              result["deviation_pct"] = None
217  
218          result["status"] = status
219          result["issues"] = issues
220  
221          if status in ("critical", "warning"):
222              result["suggested_workflow"] = "cpa_optimization"
223  
224          # Summary generation
225          if current_cpa is not None:
226              if status == "healthy":
227                  result["summary"] = (
228                      f"Campaign {campaign_id} CPA is within target. "
229                      f"Current CPA: {current_cpa:,.0f} yen / Target: {target_cpa:,.0f} yen"
230                      f" (deviation: {result['deviation_pct']}%)"
231                  )
232              elif status == "warning":
233                  result["summary"] = (
234                      f"Campaign {campaign_id} CPA slightly exceeds target. "
235                      f"Current CPA: {current_cpa:,.0f} yen / Target: {target_cpa:,.0f} yen"
236                      f" (deviation: {result['deviation_pct']}%). Early action recommended"
237                  )
238              else:
239                  result["summary"] = (
240                      f"Campaign {campaign_id} CPA significantly exceeds target. "
241                      f"Current CPA: {current_cpa:,.0f} yen / Target: {target_cpa:,.0f} yen"
242                      f" (deviation: {result['deviation_pct']}%). Urgent action required"
243                  )
244          else:
245              result["summary"] = (
246                  f"Campaign {campaign_id} has 0 conversions in the last 7 days, so "
247                  f"CPA cannot be evaluated. Checking delivery status and conversion tracking is recommended"
248              )
249  
250          return result
251  
252      # =================================================================
253      # 3. CV goal evaluation
254      # =================================================================
255  
256      async def evaluate_cv_goal(
257          self, campaign_id: str, target_cv_daily: float
258      ) -> dict[str, Any]:
259          """Evaluate current performance against daily CV target."""
260          self._validate_id(campaign_id, "campaign_id")
261          issues: list[str] = []
262          result: dict[str, Any] = {
263              "campaign_id": campaign_id,
264              "target_cv_daily": target_cv_daily,
265          }
266  
267          # 1. Last 7 days performance
268          perf: list[dict[str, Any]] = []
269          try:
270              perf = await self.get_performance_report(
271                  campaign_id=campaign_id, period="LAST_7_DAYS"
272              )
273          except Exception:
274              logger.warning("Failed to retrieve performance report", exc_info=True)
275              issues.append("Failed to retrieve performance report")
276  
277          metrics = perf[0].get("metrics", {}) if perf else {}
278          impressions = int(metrics.get("impressions", 0))
279          clicks = int(metrics.get("clicks", 0))
280          conversions = float(metrics.get("conversions", 0))
281  
282          # 2. Calculate daily average CV
283          daily_cv = round(conversions / 7, 2)
284          result["current_cv_daily"] = daily_cv
285  
286          # 3. Comprehensive performance analysis
287          performance_analysis: dict[str, Any] = {}
288          try:
289              performance_analysis = await self.analyze_performance(campaign_id)
290          except Exception:
291              logger.warning("Failed to retrieve performance analysis", exc_info=True)
292              issues.append("Failed to retrieve performance analysis")
293          result["performance_analysis"] = performance_analysis
294  
295          # 4. Calculate deviation rate
296          if target_cv_daily > 0:
297              deviation_pct = round(
298                  (daily_cv - target_cv_daily) / target_cv_daily * 100, 1
299              )
300          else:
301              deviation_pct = 0.0
302          result["deviation_pct"] = deviation_pct
303  
304          # 5. Status determination
305          if daily_cv >= target_cv_daily:
306              status = "healthy"
307          elif daily_cv >= target_cv_daily * 0.8:
308              status = "warning"
309              issues.append(
310                  f"Daily CV is below target"
311                  f" (current: {daily_cv:.1f}/day / target: {target_cv_daily:.1f}/day)"
312              )
313          else:
314              status = "critical"
315              issues.append(
316                  f"Daily CV is significantly below target"
317                  f" (current: {daily_cv:.1f}/day / target: {target_cv_daily:.1f}/day,"
318                  f" deviation: {deviation_pct}%)"
319              )
320  
321          result["status"] = status
322  
323          # 6. Bottleneck identification
324          analysis_insights = performance_analysis.get("insights", [])
325          impression_issue_in_insights = any(
326              "impression" in insight.lower() for insight in analysis_insights
327          )
328  
329          if impression_issue_in_insights or (clicks > 0 and impressions < clicks * 10):
330              bottleneck = "impression"
331              if status != "healthy":
332                  issues.append("Impression shortage may be the bottleneck")
333          elif impressions > 0 and (clicks / impressions) < 0.02:
334              bottleneck = "ctr"
335              if status != "healthy":
336                  ctr_value = round(clicks / impressions * 100, 2)
337                  issues.append(
338                      f"CTR is low ({ctr_value}%, below industry average of 2%)"
339                  )
340          elif clicks > 0 and (conversions / clicks) < 0.01:
341              bottleneck = "cvr"
342              if status != "healthy":
343                  cvr_value = round(conversions / clicks * 100, 2)
344                  issues.append(f"CVR is low ({cvr_value}%, below 1%)")
345          else:
346              bottleneck = "cvr"
347  
348          result["bottleneck"] = bottleneck
349          result["issues"] = issues
350  
351          if status in ("critical", "warning"):
352              result["suggested_workflow"] = "cv_increase"
353  
354          # Summary generation
355          bottleneck_label = {
356              "impression": "Insufficient impressions",
357              "ctr": "CTR (click-through rate) decline",
358              "cvr": "CVR (conversion rate) decline",
359          }
360          if status == "healthy":
361              result["summary"] = (
362                  f"Campaign {campaign_id} CV count meets the target. "
363                  f"Daily CV: {daily_cv:.1f} / target: {target_cv_daily:.1f}"
364              )
365          elif status == "warning":
366              result["summary"] = (
367                  f"Campaign {campaign_id} CV count is slightly below target. "
368                  f"Daily CV: {daily_cv:.1f} / target: {target_cv_daily:.1f}"
369                  f" (deviation: {deviation_pct}%)."
370                  f" Main bottleneck: {bottleneck_label[bottleneck]}"
371              )
372          else:
373              result["summary"] = (
374                  f"Campaign {campaign_id} CV count is significantly below target. "
375                  f"Daily CV: {daily_cv:.1f} / target: {target_cv_daily:.1f}"
376                  f" (deviation: {deviation_pct}%)."
377                  f" Main bottleneck: {bottleneck_label[bottleneck]}. Urgent action required"
378              )
379  
380          return result
381  
382      # =================================================================
383      # 4. Conversion acquisition improvement diagnosis
384      # =================================================================
385  
386      async def diagnose_zero_conversions(self, campaign_id: str) -> dict[str, Any]:
387          """Diagnose zero-conversion issues. Collect all data needed for LLM improvement strategy planning."""
388          self._validate_id(campaign_id, "campaign_id")
389          issues: list[str] = []
390          result: dict[str, Any] = {"campaign_id": campaign_id}
391  
392          # 1. Campaign basic information
393          campaign: dict[str, Any] | None = None
394          try:
395              campaign = await self.get_campaign(campaign_id)
396          except Exception:
397              logger.warning("Failed to retrieve campaign information", exc_info=True)
398  
399          # 2. CV計測設定
400          cv_actions: list[dict[str, Any]] = []
401          try:
402              cv_actions = await self.list_conversion_actions()
403          except Exception:
404              logger.warning("Failed to retrieve conversion action list", exc_info=True)
405              issues.append("Failed to retrieve conversion action list")
406  
407          total_actions = len(cv_actions)
408          enabled_actions = sum(
409              1 for a in cv_actions if a.get("status", "").upper() == "ENABLED"
410          )
411          has_cv_issue = total_actions == 0 or enabled_actions == 0
412          result["conversion_tracking"] = {
413              "total_actions": total_actions,
414              "enabled_actions": enabled_actions,
415              "has_issue": has_cv_issue,
416              "actions": cv_actions,
417          }
418          if has_cv_issue:
419              issues.append("No active conversion actions are configured")
420  
421          # 3. Bidding x CV alignment check
422          bidding_strategy = (campaign or {}).get("bidding_strategy", "")
423          smart_bidding_types = {
424              "MAXIMIZE_CONVERSIONS",
425              "TARGET_CPA",
426              "TARGET_ROAS",
427              "MAXIMIZE_CONVERSION_VALUE",
428          }
429          is_smart = bidding_strategy.upper() in smart_bidding_types
430          bidding_issue: str | None = None
431          if is_smart and has_cv_issue:
432              bidding_issue = (
433                  f"Smart bidding ({bidding_strategy}) is configured, but "
434                  "no active conversion tracking is configured"
435              )
436              issues.append(bidding_issue)
437          result["bidding_cv_alignment"] = {
438              "strategy": bidding_strategy,
439              "is_smart_bidding": is_smart,
440              "cv_tracking_configured": not has_cv_issue,
441              "issue": bidding_issue,
442          }
443  
444          # 4. Funnel data (last 7 days)
445          perf: list[dict[str, Any]] = []
446          try:
447              perf = await self.get_performance_report(
448                  campaign_id=campaign_id, period="LAST_7_DAYS"
449              )
450          except Exception:
451              logger.warning("Failed to retrieve performance report", exc_info=True)
452              issues.append("Failed to retrieve performance report")
453  
454          metrics = perf[0].get("metrics", {}) if perf else {}
455          impressions = int(metrics.get("impressions", 0))
456          clicks = int(metrics.get("clicks", 0))
457          conversions = float(metrics.get("conversions", 0))
458          cost = float(metrics.get("cost", 0))
459          ctr = round(clicks / impressions * 100, 2) if impressions > 0 else None
460          cvr = round(conversions / clicks * 100, 2) if clicks > 0 else None
461  
462          if impressions == 0:
463              bottleneck = "no_delivery"
464              issues.append("Impressions in the last 7 days are 0")
465          elif clicks == 0:
466              bottleneck = "no_clicks"
467              issues.append("Clicks in the last 7 days are 0")
468          elif conversions == 0:
469              bottleneck = "no_conversions"
470          else:
471              bottleneck = None
472  
473          result["funnel"] = {
474              "period": "LAST_7_DAYS",
475              "impressions": impressions,
476              "clicks": clicks,
477              "conversions": conversions,
478              "cost": cost,
479              "ctr": ctr,
480              "cvr": cvr,
481              "bottleneck": bottleneck,
482          }
483  
484          # 5. Delivery diagnostics
485          diagnosis: dict[str, Any] = {}
486          try:
487              diagnosis = await self.diagnose_campaign_delivery(campaign_id)
488          except Exception:
489              logger.warning("Failed to retrieve delivery diagnostics", exc_info=True)
490              issues.append("Failed to retrieve delivery diagnostics")
491  
492          result["delivery_diagnosis"] = {
493              "issues": diagnosis.get("issues", []),
494              "warnings": diagnosis.get("warnings", []),
495              "recommendations": diagnosis.get("recommendations", []),
496          }
497  
498          # 6. Search term quality (only when clicks > 0)
499          search_term_quality: dict[str, Any] | None = None
500          if clicks > 0:
501              try:
502                  terms = await self.get_search_terms_report(
503                      campaign_id=campaign_id, period="LAST_7_DAYS"
504                  )
505                  zero_cv_terms = [
506                      t
507                      for t in terms
508                      if float(t.get("metrics", {}).get("conversions", 0)) == 0
509                  ]
510                  zero_cv_cost = sum(
511                      float(t.get("metrics", {}).get("cost", 0)) for t in zero_cv_terms
512                  )
513                  # Top 10 zero-CV high-cost items
514                  sorted_wasteful = sorted(
515                      zero_cv_terms,
516                      key=lambda t: float(t.get("metrics", {}).get("cost", 0)),
517                      reverse=True,
518                  )
519                  search_term_quality = {
520                      "total_terms": len(terms),
521                      "zero_cv_terms": len(zero_cv_terms),
522                      "zero_cv_cost": zero_cv_cost,
523                      "top_wasteful_terms": sorted_wasteful[:10],
524                  }
525                  # If zero-CV high-cost terms exceed 50%
526                  if cost > 0 and zero_cv_cost / cost > 0.5:
527                      issues.append(
528                          f"Zero-CV search terms account for "
529                          f"{round(zero_cv_cost / cost * 100, 1)}% of total cost"
530                      )
531              except Exception:
532                  logger.warning("Failed to retrieve search terms report", exc_info=True)
533                  issues.append("Failed to retrieve search terms report")
534          result["search_term_quality"] = search_term_quality
535  
536          # Status determination
537          if (
538              has_cv_issue
539              or bidding_issue
540              or bottleneck == "no_delivery"
541              or bottleneck == "no_clicks"
542          ):
543              status = "critical"
544          elif conversions == 0:
545              status = "warning"
546          else:
547              status = "healthy"
548  
549          result["status"] = status
550          result["issues"] = issues
551  
552          if status != "healthy":
553              result["suggested_workflow"] = "cv_acquisition"
554  
555          # Recommended actions
556          result["recommended_actions"] = self._build_cv_recommendations(
557              has_cv_issue=has_cv_issue,
558              bidding_issue=bidding_issue,
559              bottleneck=bottleneck,
560              search_term_quality=search_term_quality,
561              cost=cost,
562          )
563  
564          # Summary generation
565          if status == "critical":
566              result["summary"] = (
567                  f"Campaign {campaign_id} has not acquired any conversions."
568                  f"Critical issues detected: {', '.join(issues[:3])}"
569              )
570          elif status == "warning":
571              result["summary"] = (
572                  f"Campaign {campaign_id} has 0 conversions."
573                  f"Imp={impressions:,}, Click={clicks:,}, CV=0。"
574                  f"Planning an improvement strategy is recommended"
575              )
576          else:
577              result["summary"] = (
578                  f"Campaign {campaign_id} is generating conversions."
579                  f"Conversions in the last 7 days: {conversions:.1f}"
580              )
581  
582          return result
583  
584      @staticmethod
585      def _build_cv_recommendations(
586          *,
587          has_cv_issue: bool,
588          bidding_issue: str | None,
589          bottleneck: str | None,
590          search_term_quality: dict[str, Any] | None,
591          cost: float,
592      ) -> list[dict[str, Any]]:
593          """Generate prioritized recommended actions for CV improvement."""
594          actions: list[dict[str, Any]] = []
595          priority = 1
596  
597          if has_cv_issue:
598              actions.append(
599                  {
600                      "priority": priority,
601                      "action": "fix_cv_tracking",
602                      "description": "Configure/fix conversion tracking",
603                  }
604              )
605              priority += 1
606  
607          if bidding_issue:
608              actions.append(
609                  {
610                      "priority": priority,
611                      "action": "fix_bidding_strategy",
612                      "description": "Fix alignment between bidding strategy and CV tracking",
613                  }
614              )
615              priority += 1
616  
617          if search_term_quality and search_term_quality["zero_cv_terms"] > 0:
618              actions.append(
619                  {
620                      "priority": priority,
621                      "action": "add_negative_keywords",
622                      "description": "Add negative keywords for zero-CV search terms",
623                  }
624              )
625              priority += 1
626  
627          if bottleneck in ("no_delivery", "no_clicks"):
628              actions.append(
629                  {
630                      "priority": priority,
631                      "action": "fix_delivery",
632                      "description": "Improve delivery and click acquisition",
633                  }
634              )
635              priority += 1
636  
637          # Always suggest candidates
638          actions.append(
639              {
640                  "priority": priority,
641                  "action": "improve_ads_and_keywords",
642                  "description": "Improve ad copy and expand keywords",
643              }
644          )
645          priority += 1
646  
647          actions.append(
648              {
649                  "priority": priority,
650                  "action": "review_landing_page",
651                  "description": "Landing page improvement (text-based advice)",
652              }
653          )
654  
655          return actions