# Copyright 2025 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Unit tests for WorkloadInformer.

Covers construction defaults, full resync, cache upserts, watch-event
handling, and the start/stop thread lifecycle.
"""

import time
from unittest.mock import MagicMock

from opensandbox_server.services.k8s.informer import WorkloadInformer


def _make_informer(**kwargs) -> WorkloadInformer:
    """Return a WorkloadInformer with a mocked list_fn (watch disabled).

    Args:
        **kwargs: Forwarded to the WorkloadInformer constructor. An optional
            ``list_fn`` entry replaces the default mock, which returns an
            empty list response.
    """
    list_fn = kwargs.pop(
        "list_fn",
        MagicMock(return_value={"items": [], "metadata": {}}),
    )
    return WorkloadInformer(list_fn=list_fn, enable_watch=False, **kwargs)


def _list_response(*names: str) -> dict:
    """Build a fake CustomObjects list API response.

    Args:
        *names: Object names; one item is generated per name, each with
            resourceVersion "1".

    Returns:
        A dict with list-level resourceVersion "42" and the generated items.
    """
    return {
        "metadata": {"resourceVersion": "42"},
        "items": [{"metadata": {"name": n, "resourceVersion": "1"}} for n in names],
    }


class TestWorkloadInformerInit:
    """Construction and property defaults."""

    def test_has_synced_is_false_before_start(self):
        """has_synced starts as False before the first list completes."""
        informer = _make_informer()
        assert informer.has_synced is False

    def test_get_returns_none_before_sync(self):
        """get() returns None before the cache is populated."""
        informer = _make_informer()
        assert informer.get("anything") is None

    def test_resync_and_watch_params_stored(self):
        """Constructor stores resync and watch timeout parameters."""
        informer = _make_informer(resync_period_seconds=120, watch_timeout_seconds=30)
        assert informer.resync_period_seconds == 120
        assert informer.watch_timeout_seconds == 30

    def test_custom_thread_name_is_stored(self):
        """thread_name parameter is stored on the informer."""
        informer = _make_informer(thread_name="informer-foos-default")
        assert informer._thread_name == "informer-foos-default"

    def test_default_thread_name(self):
        """Default thread_name is 'workload-informer' when not specified."""
        informer = _make_informer()
        assert informer._thread_name == "workload-informer"


class TestWorkloadInformerFullResync:
    """_full_resync populates the cache correctly."""

    def test_full_resync_populates_cache(self):
        """After _full_resync, objects from list_fn are accessible via get()."""
        list_fn = MagicMock(return_value=_list_response("alpha", "beta"))
        informer = _make_informer(list_fn=list_fn)
        informer._full_resync()

        assert informer.get("alpha") is not None
        assert informer.get("beta") is not None
        assert informer.get("gamma") is None

    def test_full_resync_sets_has_synced(self):
        """_full_resync marks the informer as synced."""
        list_fn = MagicMock(return_value=_list_response("x"))
        informer = _make_informer(list_fn=list_fn)
        informer._full_resync()
        assert informer.has_synced is True

    def test_full_resync_stores_resource_version(self):
        """_full_resync saves the resourceVersion from the list metadata."""
        list_fn = MagicMock(return_value=_list_response("x"))
        informer = _make_informer(list_fn=list_fn)
        informer._full_resync()
        assert informer._resource_version == "42"

    def test_full_resync_replaces_stale_cache(self):
        """A second _full_resync replaces the previous cache contents."""
        list_fn = MagicMock(return_value=_list_response("old"))
        informer = _make_informer(list_fn=list_fn)
        informer._full_resync()
        assert informer.get("old") is not None

        # Swap the mocked API response so the next resync sees a different set.
        list_fn.return_value = _list_response("new")
        informer._full_resync()
        assert informer.get("old") is None
        assert informer.get("new") is not None


class TestWorkloadInformerUpdateCache:
    """update_cache upserts objects into the cache."""

    def test_update_cache_adds_new_object(self):
        """update_cache makes a previously missing object retrievable."""
        informer = _make_informer()
        obj = {"metadata": {"name": "foo", "resourceVersion": "5"}}
        informer.update_cache(obj)
        assert informer.get("foo") == obj

    def test_update_cache_overwrites_existing_object(self):
        """update_cache replaces the cached version of an object."""
        informer = _make_informer()
        informer.update_cache({"metadata": {"name": "foo", "resourceVersion": "1"}})
        updated = {"metadata": {"name": "foo", "resourceVersion": "2"}}
        informer.update_cache(updated)
        assert informer.get("foo") == updated

    def test_update_cache_ignores_object_without_name(self):
        """update_cache silently ignores objects that lack a metadata.name."""
        informer = _make_informer()
        informer.update_cache({"metadata": {}})
        # Cache remains empty — no exception raised
        assert informer._cache == {}

    def test_update_cache_updates_resource_version(self):
        """update_cache advances _resource_version from object metadata."""
        informer = _make_informer()
        informer.update_cache({"metadata": {"name": "foo", "resourceVersion": "99"}})
        assert informer._resource_version == "99"

    def test_update_cache_does_not_downgrade_resource_version(self):
        """update_cache never rolls back _resource_version to an older value."""
        informer = _make_informer()
        informer._resource_version = "200"
        informer.update_cache({"metadata": {"name": "foo", "resourceVersion": "100"}})
        assert informer._resource_version == "200"

    def test_update_cache_advances_resource_version_when_newer(self):
        """update_cache advances _resource_version when the incoming value is strictly newer."""
        informer = _make_informer()
        informer._resource_version = "50"
        informer.update_cache({"metadata": {"name": "foo", "resourceVersion": "99"}})
        assert informer._resource_version == "99"


class TestWorkloadInformerHandleEvent:
    """_handle_event applies watch events to the cache."""

    def test_handle_added_event_inserts_object(self):
        """ADDED event inserts the object into the cache."""
        informer = _make_informer()
        obj = {"metadata": {"name": "bar", "resourceVersion": "10"}}
        informer._handle_event({"type": "ADDED", "object": obj})
        assert informer.get("bar") == obj

    def test_handle_modified_event_replaces_object(self):
        """MODIFIED event replaces the cached object."""
        informer = _make_informer()
        informer._cache["bar"] = {"metadata": {"name": "bar", "resourceVersion": "1"}}
        updated = {"metadata": {"name": "bar", "resourceVersion": "2"}}
        informer._handle_event({"type": "MODIFIED", "object": updated})
        assert informer.get("bar") == updated

    def test_handle_deleted_event_removes_object(self):
        """DELETED event removes the object from the cache."""
        informer = _make_informer()
        informer._cache["bar"] = {"metadata": {"name": "bar"}}
        informer._handle_event({"type": "DELETED", "object": {"metadata": {"name": "bar"}}})
        assert informer.get("bar") is None

    def test_handle_event_ignores_none_object(self):
        """Events with a None object are silently ignored."""
        informer = _make_informer()
        informer._handle_event({"type": "ADDED", "object": None})
        assert informer._cache == {}

    def test_handle_event_ignores_object_without_name(self):
        """Events whose object has no metadata.name are silently ignored."""
        informer = _make_informer()
        informer._handle_event({"type": "ADDED", "object": {"metadata": {}}})
        assert informer._cache == {}

    def test_handle_event_converts_non_dict_object(self):
        """Non-dict objects are converted via to_dict() before caching."""
        informer = _make_informer()
        # Stand-in for an SDK model object that exposes to_dict().
        sdk_obj = MagicMock()
        sdk_obj.to_dict.return_value = {"metadata": {"name": "sdk-obj", "resourceVersion": "3"}}
        informer._handle_event({"type": "ADDED", "object": sdk_obj})
        assert informer.get("sdk-obj") is not None

    def test_handle_event_updates_resource_version(self):
        """_handle_event advances _resource_version from the object metadata."""
        informer = _make_informer()
        informer._handle_event({
            "type": "ADDED",
            "object": {"metadata": {"name": "foo", "resourceVersion": "77"}},
        })
        assert informer._resource_version == "77"

    def test_handle_event_does_not_downgrade_resource_version(self):
        """_handle_event never rolls back _resource_version to an older value."""
        informer = _make_informer()
        informer._resource_version = "200"
        informer._handle_event({
            "type": "MODIFIED",
            "object": {"metadata": {"name": "foo", "resourceVersion": "50"}},
        })
        assert informer._resource_version == "200"


class TestWorkloadInformerStartStop:
    """start/stop thread lifecycle."""

    def test_start_launches_daemon_thread(self):
        """start() spawns a background thread that is alive."""
        informer = _make_informer(resync_period_seconds=9999)
        informer.start()
        # Always stop the informer, even when an assertion fails, so a failing
        # test does not leave a background thread running.
        try:
            assert informer._thread is not None
            assert informer._thread.is_alive()
            # NOTE(review): the test name mentions a daemon thread, but the
            # daemon flag itself is not asserted here — confirm against
            # WorkloadInformer.start() before adding that check.
        finally:
            informer.stop()

    def test_start_is_idempotent(self):
        """Calling start() twice does not create a second thread."""
        informer = _make_informer(resync_period_seconds=9999)
        informer.start()
        try:
            first_thread = informer._thread
            informer.start()
            assert informer._thread is first_thread
        finally:
            informer.stop()

    def test_stop_signals_stop_event(self):
        """stop() sets the internal stop event."""
        informer = _make_informer()
        informer.stop()
        assert informer._stop_event.is_set()

    def test_poll_mode_resets_has_synced_after_wait(self):
        """In poll mode (enable_watch=False), list_fn is invoked repeatedly.

        The intent is that _has_synced is reset after each wait so the cache
        is refreshed on the next loop iteration; this test observes that
        indirectly by counting list_fn calls.
        """
        # NOTE(review): _has_synced itself is never inspected — only the
        # number of list_fn invocations is checked.
        call_count = 0

        def list_fn():
            nonlocal call_count
            call_count += 1
            return {"items": [], "metadata": {"resourceVersion": str(call_count)}}

        informer = _make_informer(
            list_fn=list_fn,
            resync_period_seconds=0,  # no wait, loop immediately
        )
        informer.start()
        try:
            # Give the thread time to execute at least two full loops
            deadline = time.monotonic() + 2.0
            while call_count < 2 and time.monotonic() < deadline:
                time.sleep(0.01)
        finally:
            informer.stop()

        assert call_count >= 2, "list_fn should be called more than once in poll mode"