events.jl
@doc """EventTrace structure for managing event data.

$(FIELDS)

Represents an append-only collection of events backed by a two-column array
(column 1: serialized event date, column 2: serialized event payload).
Events are first appended to an in-memory cache and written to the backing
array in batches.

- `lock`: guards the cache, the serialization buffer and the backing array.
- `_buf`: scratch `IOBuffer` handed to `tobytes` / `todata`.
- `_zi`: the `ZarrInstance` the array was loaded from.
- `_arr`: the backing array returned by `load_data`.
- `_cache`: rows (`[date_bytes, payload_bytes]`) not yet written to `_arr`.
- `freq`: minimum time between two automatic flushes.
- `last_flush`: the `this_date` of the last flush. It starts at `DateTime(0)`,
  so the first `push!` always flushes.

The constructor `EventTrace(name; freq=Second(1), path=nothing, zi=nothing)` uses
`zi` when given, otherwise builds a `ZarrInstance` (from `path` when it is not
`nothing`).
"""
mutable struct EventTrace{I<:ZarrInstance,Z<:ZArray}
    const lock::ReentrantLock
    const _buf::IOBuffer
    const _zi::I
    const _arr::Z
    const _cache::Vector{Vector{Vector{UInt8}}}
    const freq::Period
    last_flush::DateTime
    function EventTrace(name; freq=Second(1), path=nothing, zi=nothing)
        zi_args = isnothing(path) ? () : (path,)
        zi = @something zi ZarrInstance(zi_args...)
        arr = load_data(zi, string(name); serialized=true, as_z=true)[1]
        # Element type matches the `_cache` field exactly: each cached row is a
        # 2-element vector of byte vectors (date, payload).
        cache = Vector{Vector{UInt8}}[]
        new{typeof(zi),typeof(arr)}(
            ReentrantLock(), IOBuffer(), zi, arr, cache, freq, DateTime(0)
        )
    end
end

"""
    eventtrace(args...; kwargs...)

Lowercase alias that forwards everything to the `EventTrace` constructor.
"""
eventtrace(args...; kwargs...) = EventTrace(args...; kwargs...)

@nospecialize
"""
    print(io::IO, et::EventTrace)

Write a multi-line summary of `et`: array path, number of flushed events, the
date of the first and last flushed event, and the `tag` field of the last event
(`nothing` when the trace is empty or the event has no `tag` field).
"""
function Base.print(io::IO, et::EventTrace)
    # Snapshot everything that touches the array under the lock, so a concurrent
    # `push!` / `empty!` cannot resize it between the size check and the reads.
    n, period, last_tag = @lock et.lock begin
        count = size(et._arr, 1)
        if count > 0
            start = todata(et._arr[begin, 1])
            stop = todata(et._arr[end, 1])
            last_v = todata(et._arr[end, 2])
            tag = hasfield(typeof(last_v), :tag) ? last_v.tag : nothing
            (count, (start, stop), tag)
        else
            (count, nothing, nothing)
        end
    end
    println(io, "EventTrace")
    println(io, "name: ", et._arr.path)
    println(io, "events: ", n)
    if isnothing(period)
        println(io, "period: ", nothing)
    else
        println(io, "period: ", period[1], " .. ", period[2])
    end
    println(io, "last event: ", last_tag)
    return nothing
end

Base.display(s::EventTrace; kwargs...) = print(s)
# `print(io, et)` takes no keywords: accept and ignore them here instead of
# forwarding, which raised a `MethodError` whenever a keyword was passed.
Base.show(out::IO, ::MIME"text/plain", s::EventTrace; kwargs...) = print(out, s)
# NOTE(review): `nameof` has no Base method for arbitrary struct instances; this
# relies on a `nameof(::EventTrace)` method defined elsewhere - confirm it exists.
Base.show(out::IO, s::EventTrace; kwargs...) = print(out, ":", nameof(s))

@specialize

# Append every cached row to the backing array and record `this_date` as the
# time of the last flush. Must be called while holding `et.lock`.
function _flush_cache!(et::EventTrace, this_date)
    n_cached = length(et._cache)
    n_cached > 0 || return nothing
    this_size = size(et._arr, 1)
    resize!(et._arr, this_size + n_cached, 2)
    # Drain the cache; each element is one `[date, payload]` row.
    pending = splice!(et._cache, eachindex(et._cache))
    # `reduce(hcat, ...)` builds the same 2 x n matrix as `hcat(pending...)`
    # without splatting an unbounded number of arguments.
    et._arr[(this_size + 1):end, :] .= permutedims(reduce(hcat, pending))
    et.last_flush = this_date
    return nothing
end

"""
    push!(et::EventTrace, v; event_date=now(), this_date=now(), sync=false)

Serialize `event_date` and `v` with `tobytes` and append them to the in-memory
cache of `et`. The cache is written to the backing array when `sync` is `true`
or when more than `et.freq` has elapsed between `et.last_flush` and `this_date`.

Returns `v`.
"""
function Base.push!(et::EventTrace, v; event_date=now(), this_date=now(), sync=false)
    @lock et.lock begin
        this_v = tobytes.(et._buf, [event_date, v])
        push!(et._cache, this_v)
        if sync || this_date - et.last_flush > et.freq
            _flush_cache!(et, this_date)
        end
    end
    return v
end

"""
    empty!(et::EventTrace)

Remove every event from `et`: pending (unflushed) rows are dropped from the
cache and the backing array is emptied and resized to `0 x 2`.
"""
function Base.empty!(et::EventTrace)
    @lock et.lock begin
        # Drop unflushed rows too, otherwise they would reappear in the array on
        # the next flush after the trace was emptied.
        empty!(et._cache)
        empty!(et._arr)
        resize!(et._arr, 0, 2)
    end
end

# Both count flushed events only; rows still in `_cache` are not included.
Base.length(et::EventTrace) = size(et._arr, 1)
Base.isempty(et::EventTrace) = isempty(et._arr)

"""
    trace_tail(et::EventTrace, n=10; as_df=false)

Return the last `n` flushed events of `et`, deserialized with `todata`.

Returns `nothing` when the trace holds no flushed events. Otherwise returns a
two-column matrix (date, event), or, when `as_df` is `true`, a `DataFrame` with
columns `:date`, `:tag` and `:data`. The `DataFrame` form reads `ev.tag` from
every event, so it requires events that have a `tag`.
"""
function trace_tail(et::EventTrace, n=10; as_df=false)
    # Read the length and slice under the same lock acquisition so the array
    # cannot shrink between the two.
    rows = @lock et.lock begin
        len = length(et)
        if iszero(len)
            nothing
        else
            # Clamp so that a negative `n` yields an empty tail.
            k = clamp(n, 0, len)
            todata.(et._buf, et._arr[(end - k + 1):end, :])
        end
    end
    isnothing(rows) && return nothing
    as_df || return rows
    dates = DateTime[]
    tags = Symbol[]
    data = Any[]
    for row in eachrow(rows)
        ev = row[2]
        push!(dates, row[1])
        push!(tags, ev.tag)
        push!(data, ev)
    end
    return DataFrame([dates, tags, data], [:date, :tag, :data]; copycols=false)
end