/ Data / src / events.jl
events.jl
  1  @doc """EventTrace structure for managing event data.
  2  
  3  $(FIELDS)
  4  
  5  Represents a collection of events with caching capabilities. It is designed to efficiently handle large datasets by caching event data in memory. The structure includes a ZarrInstance for data storage, a ZArray for data access, a cache for temporary storage, a frequency for event timing, and an index for the next event.
  6  
  7  """
  8  mutable struct EventTrace{I<:ZarrInstance,Z<:ZArray}
  9      const lock::ReentrantLock
 10      const _buf::IOBuffer
 11      const _zi::I
 12      const _arr::Z
 13      const _cache::Vector{Vector{Vector{UInt8}}}
 14      const freq::Period
 15      last_flush::DateTime
 16      function EventTrace(name; freq=Second(1), path=nothing, zi=nothing)
 17          zi_args = isnothing(path) ? () : (path,)
 18          zi = @something zi ZarrInstance(zi_args...)
 19          arr = load_data(zi, string(name); serialized=true, as_z=true)[1]
 20          cache = Matrix{Vector{UInt8}}[]
 21          new{typeof(zi),typeof(arr)}(
 22              ReentrantLock(), IOBuffer(), zi, arr, cache, freq, DateTime(0)
 23          )
 24      end
 25  end
 26  
 27  eventtrace(args...; kwargs...) = EventTrace(args...; kwargs...)
 28  
 29  @nospecialize
 30  function Base.print(io::IO, et::EventTrace)
 31      println(io, "EventTrace")
 32      println(io, "name: ", et._arr.path)
 33      n = size(et._arr, 1)
 34      println(io, "events: ", n)
 35      if size(et._arr, 1) > 0
 36          start = todata(et._arr[begin, 1])
 37          stop = todata(et._arr[end, 1])
 38          println(io, "period: ", start, " .. ", stop)
 39      else
 40          println(io, "period: ", nothing)
 41      end
 42      println(
 43          io,
 44          "last event: ",
 45          if n == 0
 46              nothing
 47          else
 48              last_v = todata(et._arr[end, 2])
 49              if hasfield(typeof(last_v), :tag)
 50                  last_v.tag
 51              end
 52          end,
 53      )
 54  end
 55  
 56  Base.display(s::EventTrace; kwargs...) = print(s)
 57  Base.show(out::IO, ::MIME"text/plain", s::EventTrace; kwargs...) = print(out, s; kwargs...)
 58  Base.show(out::IO, s::EventTrace; kwargs...) = print(out, ":", nameof(s))
 59  
 60  @specialize
 61  
 62  function Base.push!(et::EventTrace, v; event_date=now(), this_date=now(), sync=false)
 63      @lock et.lock begin
 64          this_v = tobytes.(et._buf, [event_date, v])
 65          push!(et._cache, this_v)
 66          if sync || this_date - et.last_flush > et.freq
 67              n_cached = size(et._cache, 1)
 68              if n_cached > 0
 69                  this_size = size(et._arr, 1)
 70                  resize!(et._arr, this_size + n_cached, 2)
 71                  et._arr[(this_size + 1):end, :] .= permutedims(
 72                      hcat(splice!(et._cache, eachindex(et._cache))...)
 73                  )
 74                  et.last_flush = this_date
 75              end
 76          end
 77          v
 78      end
 79  end
 80  
 81  Base.empty!(et::EventTrace) = (empty!(et._arr); resize!(et._arr, 0, 2))
 82  Base.length(et::EventTrace) = size(et._arr, 1)
 83  Base.isempty(et::EventTrace) = isempty(et._arr)
 84  
 85  function trace_tail(et::EventTrace, n=10; as_df=false)
 86      len = length(et)
 87      if iszero(len)
 88          return nothing
 89      end
 90      ans = @lock(et.lock, todata.(et._buf, et._arr[(end - min(len, n) + 1):end, :]))
 91      if as_df
 92          dates = DateTime[]
 93          tags = Symbol[]
 94          data = Any[]
 95          for v in eachrow(ans)
 96              ev = v[2]
 97              push!(dates, v[1])
 98              push!(tags, ev.tag)
 99              push!(data, ev)
100          end
101          DataFrame([dates, tags, data], [:date, :tag, :data]; copycols=false)
102      else
103          ans
104      end
105  end