/ ProjectPlugins / CodexClient / CodexNode.cs
CodexNode.cs
  1  using CodexClient.Hooks;
  2  using FileUtils;
  3  using Logging;
  4  using Utils;
  5  
  6  namespace CodexClient
  7  {
  8      public partial interface ICodexNode : IHasEthAddress, IHasMetricsScrapeTarget
  9      {
 10          string GetName();
 11          string GetImageName();
 12          string GetPeerId();
 13          DebugInfo GetDebugInfo(bool log = false);
 14          void SetLogLevel(string logLevel);
 15          string GetSpr();
 16          DebugPeer GetDebugPeer(string peerId);
 17          ContentId UploadFile(TrackedFile file);
 18          ContentId UploadFile(TrackedFile file, string contentType, string contentDisposition);
 19          TrackedFile? DownloadContent(ContentId contentId, string fileLabel = "");
 20          TrackedFile? DownloadContent(ContentId contentId, TimeSpan timeout, string fileLabel = "");
 21          LocalDataset DownloadStreamless(ContentId cid);
 22          /// <summary>
 23          /// TODO: This will monitor the quota-used of the node until 'size' bytes are added. That's a very bad way
 24          /// to track the streamless download progress. Replace it once we have a good API for this.
 25          /// </summary>
 26          LocalDataset DownloadStreamlessWait(ContentId cid, ByteSize size);
 27          LocalDataset DownloadManifestOnly(ContentId cid);
 28          LocalDatasetList LocalFiles();
 29          CodexSpace Space();
 30          void ConnectToPeer(ICodexNode node);
 31          DebugInfoVersion Version { get; }
 32          IMarketplaceAccess Marketplace { get; }
 33          ITransferSpeeds TransferSpeeds { get; }
 34          EthAccount EthAccount { get; }
 35          StoragePurchase? GetPurchaseStatus(string purchaseId);
 36  
 37          Address GetDiscoveryEndpoint();
 38          Address GetApiEndpoint();
 39          Address GetListenEndpoint();
 40  
 41          /// <summary>
 42          /// Warning! The node is not usable after this.
 43          /// TODO: Replace with delete-blocks debug call once available in Codex.
 44          /// </summary>
 45          void DeleteDataDirFolder();
 46          void Stop(bool waitTillStopped);
 47          IDownloadedLog DownloadLog(string additionalName = "");
 48          bool HasCrashed();
 49      }
 50  
 51      public class CodexNode : ICodexNode
 52      {
 53          private const string UploadFailedMessage = "Unable to store block";
 54          private readonly ILog log;
 55          private readonly ICodexNodeHooks hooks;
 56          private readonly TransferSpeeds transferSpeeds;
 57          private string peerId = string.Empty;
 58          private string nodeId = string.Empty;
 59          private readonly CodexAccess codexAccess;
 60          private readonly IFileManager fileManager;
 61  
 62          public CodexNode(ILog log, CodexAccess codexAccess, IFileManager fileManager, IMarketplaceAccess marketplaceAccess, ICodexNodeHooks hooks)
 63          {
 64              this.codexAccess = codexAccess;
 65              this.fileManager = fileManager;
 66              Marketplace = marketplaceAccess;
 67              this.hooks = hooks;
 68              Version = new DebugInfoVersion();
 69              transferSpeeds = new TransferSpeeds();
 70  
 71              this.log = new LogPrefixer(log, $"{GetName()} ");
 72          }
 73  
 74          public void Awake()
 75          {
 76              hooks.OnNodeStarting(codexAccess.GetStartUtc(), codexAccess.GetImageName(), codexAccess.GetEthAccount());
 77          }
 78  
 79          public void Initialize()
 80          {
 81              // This is the moment we first connect to a codex node. Sometimes, Kubernetes takes a while to spin up the
 82              // container. So we'll adding a custom, generous retry here.
 83              var kubeSpinupRetry = new Retry("CodexNode_Initialize",
 84                  maxTimeout: TimeSpan.FromMinutes(10.0),
 85                  sleepAfterFail: TimeSpan.FromSeconds(10.0),
 86                  onFail: f => { },
 87                  failFast: false);
 88  
 89              kubeSpinupRetry.Run(InitializePeerNodeId);
 90  
 91              InitializeLogReplacements();
 92  
 93              hooks.OnNodeStarted(this, peerId, nodeId);
 94          }
 95  
 96          public IMarketplaceAccess Marketplace { get; }
 97          public DebugInfoVersion Version { get; private set; }
 98          public ITransferSpeeds TransferSpeeds { get => transferSpeeds; }
 99  
100          public StoragePurchase? GetPurchaseStatus(string purchaseId)
101          {
102              return codexAccess.GetPurchaseStatus(purchaseId);
103          }
104  
105          public EthAddress EthAddress 
106          {
107              get
108              {
109                  EnsureMarketplace();
110                  return codexAccess.GetEthAccount()!.EthAddress;
111              }
112          }
113  
114          public EthAccount EthAccount
115          {
116              get
117              {
118                  EnsureMarketplace();
119                  return codexAccess.GetEthAccount()!;
120              }
121          }
122  
123          public string GetName()
124          {
125              return codexAccess.GetName();
126          }
127  
128          public string GetImageName()
129          {
130              return codexAccess.GetImageName();
131          }
132  
133          public string GetPeerId()
134          {
135              return peerId;
136          }
137  
138          public DebugInfo GetDebugInfo(bool log = false)
139          {
140              var debugInfo = codexAccess.GetDebugInfo();
141              if (log)
142              {
143                  var known = string.Join(",", debugInfo.Table.Nodes.Select(n => n.PeerId));
144                  Log($"Got DebugInfo with id: {debugInfo.Id}. This node knows: [{known}]");
145              }
146              return debugInfo;
147          }
148  
149          public void SetLogLevel(string logLevel)
150          {
151              codexAccess.SetLogLevel(logLevel);
152          }
153  
154          public string GetSpr()
155          {
156              return codexAccess.GetSpr();
157          }
158  
159          public DebugPeer GetDebugPeer(string peerId)
160          {
161              return codexAccess.GetDebugPeer(peerId);
162          }
163  
164          public ContentId UploadFile(TrackedFile file)
165          {
166              return UploadFile(file, "application/octet-stream", $"attachment; filename=\"{Path.GetFileName(file.Filename)}\"");
167          }
168  
169          public ContentId UploadFile(TrackedFile file, string contentType, string contentDisposition)
170          {
171              using var fileStream = File.OpenRead(file.Filename);
172              var uniqueId = Guid.NewGuid().ToString();
173              var size = file.GetFilesize();
174  
175              hooks.OnFileUploading(uniqueId, size);
176  
177              var input = new UploadInput(contentType, contentDisposition, fileStream);
178              var logMessage = $"Uploading file {file.Describe()} with contentType: '{input.ContentType}' and disposition: '{input.ContentDisposition}'...";
179              var measurement = Stopwatch.Measure(log, logMessage, () =>
180              {
181                  return codexAccess.UploadFile(input);
182              });
183  
184              var response = measurement.Value;
185              transferSpeeds.AddUploadSample(size, measurement.Duration);
186  
187              if (string.IsNullOrEmpty(response)) FrameworkAssert.Fail("Received empty response.");
188              if (response.StartsWith(UploadFailedMessage)) FrameworkAssert.Fail("Node failed to store block.");
189  
190              Log($"Uploaded file {file.Describe()}. Received contentId: '{response}'.");
191  
192              var cid = new ContentId(response);
193              hooks.OnFileUploaded(uniqueId, size, cid);
194              return cid;
195          }
196  
197          public TrackedFile? DownloadContent(ContentId contentId, string fileLabel = "")
198          {
199              return DownloadContent(contentId, TimeSpan.FromMinutes(10.0), fileLabel);
200          }
201  
202          public TrackedFile? DownloadContent(ContentId contentId, TimeSpan timeout, string fileLabel = "")
203          {
204              var file = fileManager.CreateEmptyFile(fileLabel);
205              hooks.OnFileDownloading(contentId);
206              Log($"Downloading '{contentId}'...");
207  
208              var logMessage = $"Downloaded '{contentId}' to '{file.Filename}'";
209              var measurement = Stopwatch.Measure(log, logMessage, () => DownloadToFile(contentId.Id, file, timeout));
210  
211              var size = file.GetFilesize();
212              transferSpeeds.AddDownloadSample(size, measurement);
213              hooks.OnFileDownloaded(size, contentId);
214  
215              return file;
216          }
217  
218          public LocalDataset DownloadStreamless(ContentId cid)
219          {
220              Log($"Downloading streamless '{cid}' (no-wait)");
221              return codexAccess.DownloadStreamless(cid);
222          }
223  
224          public LocalDataset DownloadStreamlessWait(ContentId cid, ByteSize size)
225          {
226              Log($"Downloading streamless '{cid}' (wait till finished)");
227  
228              var sw = Stopwatch.Measure(log, nameof(DownloadStreamlessWait), () =>
229              {
230                  var startSpace = Space();
231                  var result = codexAccess.DownloadStreamless(cid);
232                  WaitUntilQuotaUsedIncreased(startSpace, size);
233                  return result;
234              });
235  
236              return sw.Value;
237          }
238  
239          public LocalDataset DownloadManifestOnly(ContentId cid)
240          {
241              Log($"Downloading manifest-only '{cid}'");
242              return codexAccess.DownloadManifestOnly(cid);
243          }
244  
245          public LocalDatasetList LocalFiles()
246          {
247              return codexAccess.LocalFiles();
248          }
249  
250          public CodexSpace Space()
251          {
252              return codexAccess.Space();
253          }
254  
255          public void ConnectToPeer(ICodexNode node)
256          {
257              var peer = (CodexNode)node;
258  
259              Log($"Connecting to peer {peer.GetName()}...");
260              var peerInfo = node.GetDebugInfo();
261              codexAccess.ConnectToPeer(peerInfo.Id, GetPeerMultiAddresses(peer, peerInfo));
262  
263              Log($"Successfully connected to peer {peer.GetName()}.");
264          }
265  
266          public void DeleteDataDirFolder()
267          {
268              codexAccess.DeleteDataDirFolder();
269          }
270  
271          public void Stop(bool waitTillStopped)
272          {
273              Log("Stopping...");
274              hooks.OnNodeStopping();
275              codexAccess.Stop(waitTillStopped);
276          }
277  
278          public IDownloadedLog DownloadLog(string additionalName = "")
279          {
280              return codexAccess.DownloadLog(additionalName);
281          }
282  
283          public Address GetDiscoveryEndpoint()
284          {
285              return codexAccess.GetDiscoveryEndpoint();
286          }
287  
288          public Address GetApiEndpoint()
289          {
290              return codexAccess.GetApiEndpoint();
291          }
292  
293          public Address GetListenEndpoint()
294          {
295              return codexAccess.GetListenEndpoint();
296          }
297  
298          public Address GetMetricsScrapeTarget()
299          {
300              var address = codexAccess.GetMetricsEndpoint();
301              if (address == null) throw new Exception("Metrics ScrapeTarget accessed, but node was not started with EnableMetrics()");
302              return address;
303          }
304  
305          public bool HasCrashed()
306          {
307              return codexAccess.HasCrashed();
308          }
309  
310          public override string ToString()
311          {
312              return $"CodexNode:{GetName()}";
313          }
314  
315          private void InitializePeerNodeId()
316          {
317              var debugInfo = codexAccess.GetDebugInfo();
318              if (!debugInfo.Version.IsValid())
319              {
320                  throw new Exception($"Invalid version information received from Codex node {GetName()}: {debugInfo.Version}");
321              }
322  
323              peerId = debugInfo.Id;
324              nodeId = debugInfo.Table.LocalNode.NodeId;
325              Version = debugInfo.Version;
326          }
327  
328          private void InitializeLogReplacements()
329          {
330              var nodeName = GetName();
331  
332              log.AddStringReplace(peerId, nodeName);
333              log.AddStringReplace(CodexUtils.ToShortId(peerId), nodeName);
334              log.AddStringReplace(nodeId, nodeName);
335              log.AddStringReplace(CodexUtils.ToShortId(nodeId), nodeName);
336  
337              var ethAccount = codexAccess.GetEthAccount();
338              if (ethAccount != null)
339              {
340                  var addr = ethAccount.EthAddress.ToString();
341                  log.AddStringReplace(addr, nodeName);
342                  log.AddStringReplace(addr.ToLowerInvariant(), nodeName);
343              }
344          }
345  
346          private string[] GetPeerMultiAddresses(CodexNode peer, DebugInfo peerInfo)
347          {
348              var peerId = peer.GetDiscoveryEndpoint().Host
349                  .Replace("http://", "")
350                  .Replace("https://", "");
351  
352              return peerInfo.Addrs.Select(a => a
353                  .Replace("0.0.0.0", peerId))
354                  .ToArray();
355          }
356  
357          private void DownloadToFile(string contentId, TrackedFile file, TimeSpan timeout)
358          {
359              using var fileStream = File.OpenWrite(file.Filename);
360              try
361              {
362                  // Type of stream generated by openAPI client does not support timeouts.
363                  // So we use a task and cancellation token to track our timeout manually.
364  
365                  var start = DateTime.UtcNow;
366                  var cts = new CancellationTokenSource();
367                  var downloadTask = Task.Run(() =>
368                  {
369                      using var downloadStream = codexAccess.DownloadFile(contentId);
370                      downloadStream.CopyTo(fileStream);
371                  }, cts.Token);
372                  
373                  while (DateTime.UtcNow - start < timeout)
374                  {
375                      if (downloadTask.IsFaulted) throw downloadTask.Exception;
376                      if (downloadTask.IsCompletedSuccessfully) return;
377                      Thread.Sleep(100);
378                  }
379  
380                  cts.Cancel();
381                  throw new TimeoutException($"Download of '{contentId}' timed out after {Time.FormatDuration(timeout)}");
382              }
383              catch (Exception ex)
384              {
385                  Log($"Failed to download file '{contentId}': {ex}");
386                  throw;
387              }
388          }
389  
390          public void WaitUntilQuotaUsedIncreased(CodexSpace startSpace, ByteSize expectedIncreaseOfQuotaUsed)
391          {
392              WaitUntilQuotaUsedIncreased(startSpace, expectedIncreaseOfQuotaUsed, TimeSpan.FromMinutes(30));
393          }
394  
395          public void WaitUntilQuotaUsedIncreased(
396              CodexSpace startSpace,
397              ByteSize expectedIncreaseOfQuotaUsed,
398              TimeSpan maxTimeout)
399          {
400              Log($"Waiting until quotaUsed " +
401                  $"(start: {startSpace.QuotaUsedBytes}) " +
402                  $"increases by {expectedIncreaseOfQuotaUsed} " +
403                  $"to reach {startSpace.QuotaUsedBytes + expectedIncreaseOfQuotaUsed.SizeInBytes}");
404  
405              var retry = new Retry($"Checking local space for quotaUsed increase of {expectedIncreaseOfQuotaUsed}",
406              maxTimeout: maxTimeout,
407              sleepAfterFail: TimeSpan.FromSeconds(10),
408              onFail: f => { },
409              failFast: false);
410  
411              retry.Run(() =>
412              {
413                  var space = Space();
414                  var increase = space.QuotaUsedBytes - startSpace.QuotaUsedBytes;
415  
416                  if (increase < expectedIncreaseOfQuotaUsed.SizeInBytes)
417                      throw new Exception($"Expected quota-used not reached. " +
418                          $"Expected increase: {expectedIncreaseOfQuotaUsed.SizeInBytes} " +
419                          $"Actual increase: {increase} " +
420                          $"Actual used: {space.QuotaUsedBytes}");
421              });
422          }
423  
424          private void EnsureMarketplace()
425          {
426              if (codexAccess.GetEthAccount() == null) throw new Exception("Marketplace is not enabled for this Codex node. Please start it with the option '.EnableMarketplace(...)' to enable it.");
427          }
428  
429          private void Log(string msg)
430          {
431              log.Log(msg);
432          }
433      }
434  }