CodexAccess.cs
1 using System.Threading; 2 using CodexOpenApi; 3 using Logging; 4 using Newtonsoft.Json; 5 using Utils; 6 using WebUtils; 7 8 namespace CodexClient 9 { 10 public class CodexAccess 11 { 12 private readonly ILog log; 13 private readonly IHttpFactory httpFactory; 14 private readonly IProcessControl processControl; 15 private readonly ICodexInstance instance; 16 private readonly Mapper mapper = new Mapper(); 17 18 public CodexAccess(ILog log, IHttpFactory httpFactory, IProcessControl processControl, ICodexInstance instance) 19 { 20 this.log = log; 21 this.httpFactory = httpFactory; 22 this.processControl = processControl; 23 this.instance = instance; 24 } 25 26 public void Stop(bool waitTillStopped) 27 { 28 processControl.Stop(waitTillStopped); 29 } 30 31 public IDownloadedLog DownloadLog(string additionalName = "") 32 { 33 var file = log.CreateSubfile(GetName() + additionalName); 34 Log($"Downloading logs to '{file.Filename}'"); 35 return processControl.DownloadLog(file); 36 } 37 38 public string GetImageName() 39 { 40 return instance.ImageName; 41 } 42 43 public DateTime GetStartUtc() 44 { 45 return instance.StartUtc; 46 } 47 48 public DebugInfo GetDebugInfo() 49 { 50 return mapper.Map(OnCodex(api => api.GetDebugInfoAsync())); 51 } 52 53 public void SetLogLevel(string logLevel) 54 { 55 try 56 { 57 OnCodex(async api => 58 { 59 await api.SetDebugLogLevelAsync(logLevel); 60 return string.Empty; 61 }); 62 } 63 catch (Exception exc) 64 { 65 log.Error("Failed to set log level: " + exc); 66 } 67 } 68 69 public string GetSpr() 70 { 71 return CrashCheck(() => 72 { 73 var endpoint = GetEndpoint(); 74 var json = endpoint.HttpGetString("spr"); 75 var response = JsonConvert.DeserializeObject<SprResponse>(json); 76 return response!.Spr; 77 }); 78 } 79 80 private class SprResponse 81 { 82 public string Spr { get; set; } = string.Empty; 83 } 84 85 public DebugPeer GetDebugPeer(string peerId) 86 { 87 // Cannot use openAPI: debug/peer endpoint is not specified there. 88 return CrashCheck(() => 89 { 90 var endpoint = GetEndpoint(); 91 var str = endpoint.HttpGetString($"debug/peer/{peerId}"); 92 93 if (str.ToLowerInvariant() == "unable to find peer!") 94 { 95 return new DebugPeer 96 { 97 IsPeerFound = false 98 }; 99 } 100 101 var result = endpoint.Deserialize<DebugPeer>(str); 102 result.IsPeerFound = true; 103 return result; 104 }); 105 } 106 107 public void ConnectToPeer(string peerId, string[] peerMultiAddresses) 108 { 109 OnCodex(api => 110 { 111 Time.Wait(api.ConnectPeerAsync(peerId, peerMultiAddresses)); 112 return Task.FromResult(string.Empty); 113 }); 114 } 115 116 public string UploadFile(UploadInput uploadInput) 117 { 118 return OnCodex(api => api.UploadAsync(uploadInput.ContentType, uploadInput.ContentDisposition, uploadInput.FileStream)); 119 } 120 121 public Stream DownloadFile(string contentId) 122 { 123 var fileResponse = OnCodexNoRetry(api => api.DownloadNetworkStreamAsync(contentId)); 124 if (fileResponse.StatusCode != 200) throw new Exception("Download failed with StatusCode: " + fileResponse.StatusCode); 125 return fileResponse.Stream; 126 } 127 128 public LocalDataset DownloadStreamless(ContentId cid) 129 { 130 var response = OnCodex(api => api.DownloadNetworkAsync(cid.Id)); 131 return mapper.Map(response); 132 } 133 134 public LocalDataset DownloadManifestOnly(ContentId cid) 135 { 136 var response = OnCodex(api => api.DownloadNetworkManifestAsync(cid.Id)); 137 return mapper.Map(response); 138 } 139 140 public LocalDatasetList LocalFiles() 141 { 142 return mapper.Map(OnCodex(api => api.ListDataAsync())); 143 } 144 145 public StorageAvailability SalesAvailability(CreateStorageAvailability request) 146 { 147 var body = mapper.Map(request); 148 var read = OnCodex(api => api.OfferStorageAsync(body)); 149 return mapper.Map(read); 150 } 151 152 public StorageAvailability[] GetAvailabilities() 153 { 154 var collection = OnCodex(api => api.GetAvailabilitiesAsync()); 155 return mapper.Map(collection); 156 } 157 158 public string RequestStorage(StoragePurchaseRequest request) 159 { 160 var body = mapper.Map(request); 161 return OnCodex(api => api.CreateStorageRequestAsync(request.ContentId.Id, body)); 162 } 163 164 public CodexSpace Space() 165 { 166 var space = OnCodex(api => api.SpaceAsync()); 167 return mapper.Map(space); 168 } 169 170 public StoragePurchase? GetPurchaseStatus(string purchaseId) 171 { 172 var purchase = OnCodex(api => api.GetPurchaseAsync(purchaseId)); 173 return mapper.Map(purchase); 174 } 175 176 public string GetName() 177 { 178 return instance.Name; 179 } 180 181 public Address GetDiscoveryEndpoint() 182 { 183 return instance.DiscoveryEndpoint; 184 } 185 186 public Address GetApiEndpoint() 187 { 188 return instance.ApiEndpoint; 189 } 190 191 public Address GetListenEndpoint() 192 { 193 return instance.ListenEndpoint; 194 } 195 196 public bool HasCrashed() 197 { 198 return processControl.HasCrashed(); 199 } 200 201 public Address? GetMetricsEndpoint() 202 { 203 return instance.MetricsEndpoint; 204 } 205 206 public EthAccount? GetEthAccount() 207 { 208 return instance.EthAccount; 209 } 210 211 public void DeleteDataDirFolder() 212 { 213 processControl.DeleteDataDirFolder(); 214 } 215 216 private T OnCodexNoRetry<T>(Func<CodexApiClient, Task<T>> action) 217 { 218 var timeSet = httpFactory.WebCallTimeSet; 219 var noRetry = new Retry(nameof(OnCodexNoRetry), 220 maxTimeout: TimeSpan.FromSeconds(1.0), 221 sleepAfterFail: TimeSpan.FromSeconds(2.0), 222 onFail: f => { }, 223 failFast: true); 224 225 var result = httpFactory.CreateHttp(GetHttpId(), h => CheckContainerCrashed()).OnClient(client => CallCodex(client, action), noRetry); 226 return result; 227 } 228 229 private T OnCodex<T>(Func<CodexApiClient, Task<T>> action) 230 { 231 var result = httpFactory.CreateHttp(GetHttpId(), h => CheckContainerCrashed()).OnClient(client => CallCodex(client, action)); 232 return result; 233 } 234 235 private T CallCodex<T>(HttpClient client, Func<CodexApiClient, Task<T>> action) 236 { 237 var address = GetAddress(); 238 var api = new CodexApiClient(client); 239 api.BaseUrl = $"{address.Host}:{address.Port}/api/codex/v1"; 240 return CrashCheck(() => Time.Wait(action(api))); 241 } 242 243 private T CrashCheck<T>(Func<T> action) 244 { 245 try 246 { 247 return action(); 248 } 249 finally 250 { 251 CheckContainerCrashed(); 252 } 253 } 254 255 private IEndpoint GetEndpoint() 256 { 257 return httpFactory 258 .CreateHttp(GetHttpId(), h => CheckContainerCrashed()) 259 .CreateEndpoint(GetAddress(), "/api/codex/v1/", GetName()); 260 } 261 262 private Address GetAddress() 263 { 264 return instance.ApiEndpoint; 265 } 266 267 private string GetHttpId() 268 { 269 return GetAddress().ToString(); 270 } 271 272 private void CheckContainerCrashed() 273 { 274 if (processControl.HasCrashed()) throw new Exception($"Container {GetName()} has crashed."); 275 } 276 277 private void Throw(Failure failure) 278 { 279 throw failure.Exception; 280 } 281 282 private void Log(string msg) 283 { 284 log.Log($"({GetName()}) {msg}"); 285 } 286 } 287 288 public class UploadInput 289 { 290 public UploadInput(string contentType, string contentDisposition, FileStream fileStream) 291 { 292 ContentType = contentType; 293 ContentDisposition = contentDisposition; 294 FileStream = fileStream; 295 } 296 297 public string ContentType { get; } 298 public string ContentDisposition { get; } 299 public FileStream FileStream { get; } 300 } 301 }