/ server / tests / k8s / test_informer.py
test_informer.py
  1  # Copyright 2025 Alibaba Group Holding Ltd.
  2  #
  3  # Licensed under the Apache License, Version 2.0 (the "License");
  4  # you may not use this file except in compliance with the License.
  5  # You may obtain a copy of the License at
  6  #
  7  #     http://www.apache.org/licenses/LICENSE-2.0
  8  #
  9  # Unless required by applicable law or agreed to in writing, software
 10  # distributed under the License is distributed on an "AS IS" BASIS,
 11  # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 12  # See the License for the specific language governing permissions and
 13  # limitations under the License.
 14  
 15  import time
 16  from unittest.mock import MagicMock
 17  
 18  from opensandbox_server.services.k8s.informer import WorkloadInformer
 19  
 20  
 21  def _make_informer(**kwargs) -> WorkloadInformer:
 22      """Return a WorkloadInformer with a mocked list_fn (watch disabled)."""
 23      list_fn = kwargs.pop("list_fn", MagicMock(return_value={"items": [], "metadata": {}}))
 24      return WorkloadInformer(list_fn=list_fn, enable_watch=False, **kwargs)
 25  
 26  
 27  def _list_response(*names: str) -> dict:
 28      """Build a fake CustomObjects list API response."""
 29      return {
 30          "metadata": {"resourceVersion": "42"},
 31          "items": [{"metadata": {"name": n, "resourceVersion": "1"}} for n in names],
 32      }
 33  
 34  
 35  class TestWorkloadInformerInit:
 36      """Construction and property defaults."""
 37  
 38      def test_has_synced_is_false_before_start(self):
 39          """has_synced starts as False before the first list completes."""
 40          informer = _make_informer()
 41          assert informer.has_synced is False
 42  
 43      def test_get_returns_none_before_sync(self):
 44          """get() returns None before the cache is populated."""
 45          informer = _make_informer()
 46          assert informer.get("anything") is None
 47  
 48      def test_resync_and_watch_params_stored(self):
 49          """Constructor stores resync and watch timeout parameters."""
 50          informer = _make_informer(resync_period_seconds=120, watch_timeout_seconds=30)
 51          assert informer.resync_period_seconds == 120
 52          assert informer.watch_timeout_seconds == 30
 53  
 54      def test_custom_thread_name_is_stored(self):
 55          """thread_name parameter is stored and used when start() is called."""
 56          informer = _make_informer(thread_name="informer-foos-default")
 57          assert informer._thread_name == "informer-foos-default"
 58  
 59      def test_default_thread_name(self):
 60          """Default thread_name is 'workload-informer' when not specified."""
 61          informer = _make_informer()
 62          assert informer._thread_name == "workload-informer"
 63  
 64  
 65  class TestWorkloadInformerFullResync:
 66      """_full_resync populates the cache correctly."""
 67  
 68      def test_full_resync_populates_cache(self):
 69          """After _full_resync, objects from list_fn are accessible via get()."""
 70          list_fn = MagicMock(return_value=_list_response("alpha", "beta"))
 71          informer = _make_informer(list_fn=list_fn)
 72          informer._full_resync()
 73  
 74          assert informer.get("alpha") is not None
 75          assert informer.get("beta") is not None
 76          assert informer.get("gamma") is None
 77  
 78      def test_full_resync_sets_has_synced(self):
 79          """_full_resync marks the informer as synced."""
 80          list_fn = MagicMock(return_value=_list_response("x"))
 81          informer = _make_informer(list_fn=list_fn)
 82          informer._full_resync()
 83          assert informer.has_synced is True
 84  
 85      def test_full_resync_stores_resource_version(self):
 86          """_full_resync saves the resourceVersion from the list metadata."""
 87          list_fn = MagicMock(return_value=_list_response("x"))
 88          informer = _make_informer(list_fn=list_fn)
 89          informer._full_resync()
 90          assert informer._resource_version == "42"
 91  
 92      def test_full_resync_replaces_stale_cache(self):
 93          """A second _full_resync replaces the previous cache contents."""
 94          list_fn = MagicMock(return_value=_list_response("old"))
 95          informer = _make_informer(list_fn=list_fn)
 96          informer._full_resync()
 97          assert informer.get("old") is not None
 98  
 99          list_fn.return_value = _list_response("new")
100          informer._full_resync()
101          assert informer.get("old") is None
102          assert informer.get("new") is not None
103  
104  
105  class TestWorkloadInformerUpdateCache:
106      """update_cache upserts objects into the cache."""
107  
108      def test_update_cache_adds_new_object(self):
109          """update_cache makes a previously missing object retrievable."""
110          informer = _make_informer()
111          obj = {"metadata": {"name": "foo", "resourceVersion": "5"}}
112          informer.update_cache(obj)
113          assert informer.get("foo") == obj
114  
115      def test_update_cache_overwrites_existing_object(self):
116          """update_cache replaces the cached version of an object."""
117          informer = _make_informer()
118          informer.update_cache({"metadata": {"name": "foo", "resourceVersion": "1"}})
119          updated = {"metadata": {"name": "foo", "resourceVersion": "2"}}
120          informer.update_cache(updated)
121          assert informer.get("foo") == updated
122  
123      def test_update_cache_ignores_object_without_name(self):
124          """update_cache silently ignores objects that lack a metadata.name."""
125          informer = _make_informer()
126          informer.update_cache({"metadata": {}})
127          # Cache remains empty — no exception raised
128          assert informer._cache == {}
129  
130      def test_update_cache_updates_resource_version(self):
131          """update_cache advances _resource_version from object metadata."""
132          informer = _make_informer()
133          informer.update_cache({"metadata": {"name": "foo", "resourceVersion": "99"}})
134          assert informer._resource_version == "99"
135  
136      def test_update_cache_does_not_downgrade_resource_version(self):
137          """update_cache never rolls back _resource_version to an older value."""
138          informer = _make_informer()
139          informer._resource_version = "200"
140          informer.update_cache({"metadata": {"name": "foo", "resourceVersion": "100"}})
141          assert informer._resource_version == "200"
142  
143      def test_update_cache_advances_resource_version_when_newer(self):
144          """update_cache advances _resource_version when the incoming value is strictly newer."""
145          informer = _make_informer()
146          informer._resource_version = "50"
147          informer.update_cache({"metadata": {"name": "foo", "resourceVersion": "99"}})
148          assert informer._resource_version == "99"
149  
150  
151  class TestWorkloadInformerHandleEvent:
152      """_handle_event applies watch events to the cache."""
153  
154      def test_handle_added_event_inserts_object(self):
155          """ADDED event inserts the object into the cache."""
156          informer = _make_informer()
157          obj = {"metadata": {"name": "bar", "resourceVersion": "10"}}
158          informer._handle_event({"type": "ADDED", "object": obj})
159          assert informer.get("bar") == obj
160  
161      def test_handle_modified_event_replaces_object(self):
162          """MODIFIED event replaces the cached object."""
163          informer = _make_informer()
164          informer._cache["bar"] = {"metadata": {"name": "bar", "resourceVersion": "1"}}
165          updated = {"metadata": {"name": "bar", "resourceVersion": "2"}}
166          informer._handle_event({"type": "MODIFIED", "object": updated})
167          assert informer.get("bar") == updated
168  
169      def test_handle_deleted_event_removes_object(self):
170          """DELETED event removes the object from the cache."""
171          informer = _make_informer()
172          informer._cache["bar"] = {"metadata": {"name": "bar"}}
173          informer._handle_event({"type": "DELETED", "object": {"metadata": {"name": "bar"}}})
174          assert informer.get("bar") is None
175  
176      def test_handle_event_ignores_none_object(self):
177          """Events with a None object are silently ignored."""
178          informer = _make_informer()
179          informer._handle_event({"type": "ADDED", "object": None})
180          assert informer._cache == {}
181  
182      def test_handle_event_ignores_object_without_name(self):
183          """Events whose object has no metadata.name are silently ignored."""
184          informer = _make_informer()
185          informer._handle_event({"type": "ADDED", "object": {"metadata": {}}})
186          assert informer._cache == {}
187  
188      def test_handle_event_converts_non_dict_object(self):
189          """Non-dict objects are converted via to_dict() before caching."""
190          informer = _make_informer()
191          sdk_obj = MagicMock()
192          sdk_obj.to_dict.return_value = {"metadata": {"name": "sdk-obj", "resourceVersion": "3"}}
193          informer._handle_event({"type": "ADDED", "object": sdk_obj})
194          assert informer.get("sdk-obj") is not None
195  
196      def test_handle_event_updates_resource_version(self):
197          """_handle_event advances _resource_version from the object metadata."""
198          informer = _make_informer()
199          informer._handle_event({
200              "type": "ADDED",
201              "object": {"metadata": {"name": "foo", "resourceVersion": "77"}},
202          })
203          assert informer._resource_version == "77"
204  
205      def test_handle_event_does_not_downgrade_resource_version(self):
206          """_handle_event never rolls back _resource_version to an older value."""
207          informer = _make_informer()
208          informer._resource_version = "200"
209          informer._handle_event({
210              "type": "MODIFIED",
211              "object": {"metadata": {"name": "foo", "resourceVersion": "50"}},
212          })
213          assert informer._resource_version == "200"
214  
215  
216  class TestWorkloadInformerStartStop:
217      """start/stop thread lifecycle."""
218  
219      def test_start_launches_daemon_thread(self):
220          """start() spawns a daemon thread that is alive."""
221          list_fn = MagicMock(return_value={"items": [], "metadata": {}})
222          informer = WorkloadInformer(list_fn=list_fn, enable_watch=False,
223                                      resync_period_seconds=9999)
224          informer.start()
225          assert informer._thread is not None
226          assert informer._thread.is_alive()
227          informer.stop()
228  
229      def test_start_is_idempotent(self):
230          """Calling start() twice does not create a second thread."""
231          list_fn = MagicMock(return_value={"items": [], "metadata": {}})
232          informer = WorkloadInformer(list_fn=list_fn, enable_watch=False,
233                                      resync_period_seconds=9999)
234          informer.start()
235          first_thread = informer._thread
236          informer.start()
237          assert informer._thread is first_thread
238          informer.stop()
239  
240      def test_stop_signals_stop_event(self):
241          """stop() sets the internal stop event."""
242          informer = _make_informer()
243          informer.stop()
244          assert informer._stop_event.is_set()
245  
246      def test_poll_mode_resets_has_synced_after_wait(self):
247          """In poll mode (enable_watch=False), _has_synced is reset after each wait so the
248          cache is refreshed on the next loop iteration."""
249          call_count = 0
250  
251          def list_fn():
252              nonlocal call_count
253              call_count += 1
254              return {"items": [], "metadata": {"resourceVersion": str(call_count)}}
255  
256          informer = WorkloadInformer(
257              list_fn=list_fn,
258              enable_watch=False,
259              resync_period_seconds=0,  # no wait, loop immediately
260          )
261          informer.start()
262  
263          # Give the thread time to execute at least two full loops
264          deadline = time.monotonic() + 2.0
265          while call_count < 2 and time.monotonic() < deadline:
266              time.sleep(0.01)
267  
268          informer.stop()
269          assert call_count >= 2, "list_fn should be called more than once in poll mode"