// CodexNode.cs
using CodexClient.Hooks;
using FileUtils;
using Logging;
using Utils;

namespace CodexClient
{
    /// <summary>
    /// Handle for interacting with a single Codex node: querying debug information,
    /// uploading and downloading content, connecting peers and controlling the node's lifetime.
    /// </summary>
    public partial interface ICodexNode : IHasEthAddress, IHasMetricsScrapeTarget
    {
        string GetName();
        string GetImageName();
        string GetPeerId();
        DebugInfo GetDebugInfo(bool log = false);
        void SetLogLevel(string logLevel);
        string GetSpr();
        DebugPeer GetDebugPeer(string peerId);
        ContentId UploadFile(TrackedFile file);
        ContentId UploadFile(TrackedFile file, string contentType, string contentDisposition);
        TrackedFile? DownloadContent(ContentId contentId, string fileLabel = "");
        TrackedFile? DownloadContent(ContentId contentId, TimeSpan timeout, string fileLabel = "");
        LocalDataset DownloadStreamless(ContentId cid);
        /// <summary>
        /// TODO: This will monitor the quota-used of the node until 'size' bytes are added. That's a very bad way
        /// to track the streamless download progress. Replace it once we have a good API for this.
        /// </summary>
        LocalDataset DownloadStreamlessWait(ContentId cid, ByteSize size);
        LocalDataset DownloadManifestOnly(ContentId cid);
        LocalDatasetList LocalFiles();
        CodexSpace Space();
        void ConnectToPeer(ICodexNode node);
        DebugInfoVersion Version { get; }
        IMarketplaceAccess Marketplace { get; }
        ITransferSpeeds TransferSpeeds { get; }
        EthAccount EthAccount { get; }
        StoragePurchase? GetPurchaseStatus(string purchaseId);

        Address GetDiscoveryEndpoint();
        Address GetApiEndpoint();
        Address GetListenEndpoint();

        /// <summary>
        /// Warning! The node is not usable after this.
        /// TODO: Replace with delete-blocks debug call once available in Codex.
        /// </summary>
        void DeleteDataDirFolder();
        void Stop(bool waitTillStopped);
        IDownloadedLog DownloadLog(string additionalName = "");
        bool HasCrashed();
    }

    /// <summary>
    /// Default <see cref="ICodexNode"/> implementation. Most members delegate to the
    /// <see cref="CodexAccess"/> passed to the constructor; on top of that this class reports
    /// events to <see cref="ICodexNodeHooks"/>, records transfer speeds and writes log messages.
    /// </summary>
    public class CodexNode : ICodexNode
    {
        private const string UploadFailedMessage = "Unable to store block";

        // Chunk size used when copying a download to disk (same as the default of Stream.CopyTo).
        private const int DownloadBufferSize = 81920;

        private readonly ILog log;
        private readonly ICodexNodeHooks hooks;
        private readonly TransferSpeeds transferSpeeds;
        private string peerId = string.Empty;
        private string nodeId = string.Empty;
        private readonly CodexAccess codexAccess;
        private readonly IFileManager fileManager;

        /// <summary>
        /// Creates the node wrapper. No request is sent to the node here;
        /// <see cref="Initialize"/> performs the first debug-info call.
        /// </summary>
        public CodexNode(ILog log, CodexAccess codexAccess, IFileManager fileManager, IMarketplaceAccess marketplaceAccess, ICodexNodeHooks hooks)
        {
            this.codexAccess = codexAccess;
            this.fileManager = fileManager;
            Marketplace = marketplaceAccess;
            this.hooks = hooks;
            Version = new DebugInfoVersion();
            transferSpeeds = new TransferSpeeds();

            // GetName() reads codexAccess, so this must come after the assignment above.
            this.log = new LogPrefixer(log, $"{GetName()} ");
        }

        /// <summary>
        /// Notifies the hooks that the node is starting, passing its start time, image name and eth account.
        /// </summary>
        public void Awake()
        {
            hooks.OnNodeStarting(codexAccess.GetStartUtc(), codexAccess.GetImageName(), codexAccess.GetEthAccount());
        }

        /// <summary>
        /// Fetches the peer id, node id and version from the node (with retries), registers the
        /// log replacements and notifies the hooks that the node has started.
        /// </summary>
        public void Initialize()
        {
            // This is the moment we first connect to a codex node. Sometimes, Kubernetes takes a while to spin up the
            // container. So we're adding a custom, generous retry here.
            var kubeSpinupRetry = new Retry("CodexNode_Initialize",
                maxTimeout: TimeSpan.FromMinutes(10.0),
                sleepAfterFail: TimeSpan.FromSeconds(10.0),
                onFail: f => { },
                failFast: false);

            kubeSpinupRetry.Run(InitializePeerNodeId);

            InitializeLogReplacements();

            hooks.OnNodeStarted(this, peerId, nodeId);
        }

        public IMarketplaceAccess Marketplace { get; }
        public DebugInfoVersion Version { get; private set; }
        public ITransferSpeeds TransferSpeeds { get => transferSpeeds; }

        public StoragePurchase? GetPurchaseStatus(string purchaseId)
        {
            return codexAccess.GetPurchaseStatus(purchaseId);
        }

        /// <summary>
        /// Eth address of this node's account. Throws when the node has no eth account.
        /// </summary>
        public EthAddress EthAddress
        {
            get
            {
                return GetEthAccountOrThrow().EthAddress;
            }
        }

        /// <summary>
        /// Eth account of this node. Throws when the node has no eth account.
        /// </summary>
        public EthAccount EthAccount
        {
            get
            {
                return GetEthAccountOrThrow();
            }
        }

        public string GetName()
        {
            return codexAccess.GetName();
        }

        public string GetImageName()
        {
            return codexAccess.GetImageName();
        }

        /// <summary>
        /// Returns the peer id captured by <see cref="Initialize"/>; an empty string before that.
        /// </summary>
        public string GetPeerId()
        {
            return peerId;
        }

        /// <summary>
        /// Fetches the node's debug info.
        /// </summary>
        /// <param name="log">When true, also logs the node's id and the peer ids in its table.</param>
        public DebugInfo GetDebugInfo(bool log = false)
        {
            var debugInfo = codexAccess.GetDebugInfo();
            if (log)
            {
                var known = string.Join(",", debugInfo.Table.Nodes.Select(n => n.PeerId));
                Log($"Got DebugInfo with id: {debugInfo.Id}. This node knows: [{known}]");
            }
            return debugInfo;
        }

        public void SetLogLevel(string logLevel)
        {
            codexAccess.SetLogLevel(logLevel);
        }

        public string GetSpr()
        {
            return codexAccess.GetSpr();
        }

        public DebugPeer GetDebugPeer(string peerId)
        {
            return codexAccess.GetDebugPeer(peerId);
        }

        /// <summary>
        /// Uploads the file as "application/octet-stream" with an attachment content-disposition
        /// carrying the file's name.
        /// </summary>
        public ContentId UploadFile(TrackedFile file)
        {
            return UploadFile(file, "application/octet-stream", $"attachment; filename=\"{Path.GetFileName(file.Filename)}\"");
        }

        /// <summary>
        /// Uploads the file with the given content type and disposition, records the upload speed
        /// and notifies the hooks before and after the upload.
        /// </summary>
        /// <returns>The content id built from the node's response.</returns>
        public ContentId UploadFile(TrackedFile file, string contentType, string contentDisposition)
        {
            using var fileStream = File.OpenRead(file.Filename);
            var uniqueId = Guid.NewGuid().ToString();
            var size = file.GetFilesize();

            hooks.OnFileUploading(uniqueId, size);

            var input = new UploadInput(contentType, contentDisposition, fileStream);
            var logMessage = $"Uploading file {file.Describe()} with contentType: '{input.ContentType}' and disposition: '{input.ContentDisposition}'...";
            var measurement = Stopwatch.Measure(log, logMessage, () =>
            {
                return codexAccess.UploadFile(input);
            });

            var response = measurement.Value;
            transferSpeeds.AddUploadSample(size, measurement.Duration);

            if (string.IsNullOrEmpty(response)) FrameworkAssert.Fail("Received empty response.");
            // Ordinal: this matches a fixed marker text, so culture rules must not apply.
            if (response.StartsWith(UploadFailedMessage, StringComparison.Ordinal)) FrameworkAssert.Fail("Node failed to store block.");

            Log($"Uploaded file {file.Describe()}. Received contentId: '{response}'.");

            var cid = new ContentId(response);
            hooks.OnFileUploaded(uniqueId, size, cid);
            return cid;
        }

        /// <summary>
        /// Downloads the content with a timeout of 10 minutes.
        /// </summary>
        public TrackedFile? DownloadContent(ContentId contentId, string fileLabel = "")
        {
            return DownloadContent(contentId, TimeSpan.FromMinutes(10.0), fileLabel);
        }

        /// <summary>
        /// Downloads the content into a newly created file, records the download speed and notifies the hooks.
        /// </summary>
        /// <returns>The file created for the download. A failed or timed-out download throws instead.</returns>
        public TrackedFile? DownloadContent(ContentId contentId, TimeSpan timeout, string fileLabel = "")
        {
            var file = fileManager.CreateEmptyFile(fileLabel);
            hooks.OnFileDownloading(contentId);
            Log($"Downloading '{contentId}'...");

            var logMessage = $"Downloaded '{contentId}' to '{file.Filename}'";
            var measurement = Stopwatch.Measure(log, logMessage, () => DownloadToFile(contentId.Id, file, timeout));

            var size = file.GetFilesize();
            transferSpeeds.AddDownloadSample(size, measurement);
            hooks.OnFileDownloaded(size, contentId);

            return file;
        }

        public LocalDataset DownloadStreamless(ContentId cid)
        {
            Log($"Downloading streamless '{cid}' (no-wait)");
            return codexAccess.DownloadStreamless(cid);
        }

        /// <summary>
        /// Starts a streamless download and blocks until the node's quota-used has grown by
        /// at least <paramref name="size"/> compared to the value read just before the download.
        /// </summary>
        public LocalDataset DownloadStreamlessWait(ContentId cid, ByteSize size)
        {
            Log($"Downloading streamless '{cid}' (wait till finished)");

            var sw = Stopwatch.Measure(log, nameof(DownloadStreamlessWait), () =>
            {
                var startSpace = Space();
                var result = codexAccess.DownloadStreamless(cid);
                WaitUntilQuotaUsedIncreased(startSpace, size);
                return result;
            });

            return sw.Value;
        }

        public LocalDataset DownloadManifestOnly(ContentId cid)
        {
            Log($"Downloading manifest-only '{cid}'");
            return codexAccess.DownloadManifestOnly(cid);
        }

        public LocalDatasetList LocalFiles()
        {
            return codexAccess.LocalFiles();
        }

        public CodexSpace Space()
        {
            return codexAccess.Space();
        }

        /// <summary>
        /// Connects this node to the given peer, using the peer's debug-info id and addresses.
        /// </summary>
        /// <param name="node">The peer; must be a <see cref="CodexNode"/> (it is cast).</param>
        public void ConnectToPeer(ICodexNode node)
        {
            var peer = (CodexNode)node;

            Log($"Connecting to peer {peer.GetName()}...");
            var peerInfo = peer.GetDebugInfo();
            codexAccess.ConnectToPeer(peerInfo.Id, GetPeerMultiAddresses(peer, peerInfo));

            Log($"Successfully connected to peer {peer.GetName()}.");
        }

        public void DeleteDataDirFolder()
        {
            codexAccess.DeleteDataDirFolder();
        }

        /// <summary>
        /// Notifies the hooks that the node is stopping, then stops it.
        /// </summary>
        public void Stop(bool waitTillStopped)
        {
            Log("Stopping...");
            hooks.OnNodeStopping();
            codexAccess.Stop(waitTillStopped);
        }

        public IDownloadedLog DownloadLog(string additionalName = "")
        {
            return codexAccess.DownloadLog(additionalName);
        }

        public Address GetDiscoveryEndpoint()
        {
            return codexAccess.GetDiscoveryEndpoint();
        }

        public Address GetApiEndpoint()
        {
            return codexAccess.GetApiEndpoint();
        }

        public Address GetListenEndpoint()
        {
            return codexAccess.GetListenEndpoint();
        }

        /// <summary>
        /// Returns the node's metrics endpoint. Throws when the node has no metrics endpoint.
        /// </summary>
        public Address GetMetricsScrapeTarget()
        {
            var address = codexAccess.GetMetricsEndpoint();
            if (address == null) throw new Exception("Metrics ScrapeTarget accessed, but node was not started with EnableMetrics()");
            return address;
        }

        public bool HasCrashed()
        {
            return codexAccess.HasCrashed();
        }

        public override string ToString()
        {
            return $"CodexNode:{GetName()}";
        }

        /// <summary>
        /// Reads the debug info and stores peer id, node id and version.
        /// Throws when the reported version is not valid, which lets the caller's retry try again.
        /// </summary>
        private void InitializePeerNodeId()
        {
            var debugInfo = codexAccess.GetDebugInfo();
            if (!debugInfo.Version.IsValid())
            {
                throw new Exception($"Invalid version information received from Codex node {GetName()}: {debugInfo.Version}");
            }

            peerId = debugInfo.Id;
            nodeId = debugInfo.Table.LocalNode.NodeId;
            Version = debugInfo.Version;
        }

        /// <summary>
        /// Registers log string replacements that map this node's peer id, node id (full and short form)
        /// and, when present, its eth address (as-is and lower-cased) to the node's name.
        /// </summary>
        private void InitializeLogReplacements()
        {
            var nodeName = GetName();

            log.AddStringReplace(peerId, nodeName);
            log.AddStringReplace(CodexUtils.ToShortId(peerId), nodeName);
            log.AddStringReplace(nodeId, nodeName);
            log.AddStringReplace(CodexUtils.ToShortId(nodeId), nodeName);

            var ethAccount = codexAccess.GetEthAccount();
            if (ethAccount != null)
            {
                var addr = ethAccount.EthAddress.ToString();
                log.AddStringReplace(addr, nodeName);
                log.AddStringReplace(addr.ToLowerInvariant(), nodeName);
            }
        }

        /// <summary>
        /// Returns the peer's advertised addresses with every "0.0.0.0" replaced by the host
        /// of the peer's discovery endpoint (scheme prefix removed).
        /// </summary>
        private string[] GetPeerMultiAddresses(CodexNode peer, DebugInfo peerInfo)
        {
            var peerHost = peer.GetDiscoveryEndpoint().Host
                .Replace("http://", "")
                .Replace("https://", "");

            return peerInfo.Addrs
                .Select(a => a.Replace("0.0.0.0", peerHost))
                .ToArray();
        }

        /// <summary>
        /// Downloads the content into the given file, giving up after <paramref name="timeout"/>.
        /// </summary>
        /// <exception cref="TimeoutException">The download did not finish within the timeout.</exception>
        /// <exception cref="AggregateException">The download task failed.</exception>
        private void DownloadToFile(string contentId, TrackedFile file, TimeSpan timeout)
        {
            using var fileStream = File.OpenWrite(file.Filename);
            try
            {
                // Type of stream generated by openAPI client does not support timeouts.
                // So we use a task and cancellation token to track our timeout manually.

                var start = DateTime.UtcNow;
                using var cts = new CancellationTokenSource();
                var cancellation = cts.Token;
                var downloadTask = Task.Run(() =>
                {
                    using var downloadStream = codexAccess.DownloadFile(contentId);
                    // The token given to Task.Run only prevents a not-yet-started task from running.
                    // To stop a copy that is already in progress, the copy itself checks the token.
                    CopyUntilCancelled(downloadStream, fileStream, cancellation);
                }, cancellation);

                while (!downloadTask.IsCompleted && DateTime.UtcNow - start < timeout)
                {
                    Thread.Sleep(100);
                }

                // Checked after the loop as well, so a download that finishes (or fails) during
                // the final sleep is not misreported as a timeout.
                // Exception is non-null whenever IsFaulted is true.
                if (downloadTask.IsFaulted) throw downloadTask.Exception!;
                if (downloadTask.IsCompletedSuccessfully) return;

                cts.Cancel();
                throw new TimeoutException($"Download of '{contentId}' timed out after {Time.FormatDuration(timeout)}");
            }
            catch (Exception ex)
            {
                Log($"Failed to download file '{contentId}': {ex}");
                throw;
            }
        }

        /// <summary>
        /// Copies <paramref name="source"/> to <paramref name="destination"/> in chunks and stops with an
        /// <see cref="OperationCanceledException"/> once cancellation is requested, before writing the next chunk.
        /// A read that is already blocking is not interrupted.
        /// </summary>
        private static void CopyUntilCancelled(Stream source, Stream destination, CancellationToken cancellation)
        {
            var buffer = new byte[DownloadBufferSize];
            int bytesRead;
            while ((bytesRead = source.Read(buffer, 0, buffer.Length)) > 0)
            {
                cancellation.ThrowIfCancellationRequested();
                destination.Write(buffer, 0, bytesRead);
            }
        }

        /// <summary>
        /// Waits up to 30 minutes for quota-used to grow by the expected amount.
        /// </summary>
        public void WaitUntilQuotaUsedIncreased(CodexSpace startSpace, ByteSize expectedIncreaseOfQuotaUsed)
        {
            WaitUntilQuotaUsedIncreased(startSpace, expectedIncreaseOfQuotaUsed, TimeSpan.FromMinutes(30));
        }

        /// <summary>
        /// Polls the node's space (sleeping 10 seconds after each failed check) until quota-used has grown by
        /// at least <paramref name="expectedIncreaseOfQuotaUsed"/> relative to <paramref name="startSpace"/>,
        /// or until <paramref name="maxTimeout"/> is exceeded.
        /// </summary>
        public void WaitUntilQuotaUsedIncreased(
            CodexSpace startSpace,
            ByteSize expectedIncreaseOfQuotaUsed,
            TimeSpan maxTimeout)
        {
            Log($"Waiting until quotaUsed " +
                $"(start: {startSpace.QuotaUsedBytes}) " +
                $"increases by {expectedIncreaseOfQuotaUsed} " +
                $"to reach {startSpace.QuotaUsedBytes + expectedIncreaseOfQuotaUsed.SizeInBytes}");

            var retry = new Retry($"Checking local space for quotaUsed increase of {expectedIncreaseOfQuotaUsed}",
                maxTimeout: maxTimeout,
                sleepAfterFail: TimeSpan.FromSeconds(10),
                onFail: f => { },
                failFast: false);

            retry.Run(() =>
            {
                var space = Space();
                var increase = space.QuotaUsedBytes - startSpace.QuotaUsedBytes;

                // Throwing marks this attempt as failed so the retry checks again.
                if (increase < expectedIncreaseOfQuotaUsed.SizeInBytes)
                    throw new Exception($"Expected quota-used not reached. " +
                        $"Expected increase: {expectedIncreaseOfQuotaUsed.SizeInBytes} " +
                        $"Actual increase: {increase} " +
                        $"Actual used: {space.QuotaUsedBytes}");
            });
        }

        /// <summary>
        /// Returns the node's eth account, or throws when the node has none.
        /// </summary>
        private EthAccount GetEthAccountOrThrow()
        {
            var account = codexAccess.GetEthAccount();
            if (account == null) throw new Exception("Marketplace is not enabled for this Codex node. Please start it with the option '.EnableMarketplace(...)' to enable it.");
            return account;
        }

        private void Log(string msg)
        {
            log.Log(msg);
        }
    }
}