/ src / libserver / groupcache.cpp
groupcache.cpp
  1  /*
  2      EIBD eib bus access and management daemon
  3      Copyright (C) 2005-2011 Martin Koegler <mkoegler@auto.tuwien.ac.at>
  4  
  5      This program is free software; you can redistribute it and/or modify
  6      it under the terms of the GNU General Public License as published by
  7      the Free Software Foundation; either version 2 of the License, or
  8      (at your option) any later version.
  9  
 10      This program is distributed in the hope that it will be useful,
 11      but WITHOUT ANY WARRANTY; without even the implied warranty of
 12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 13      GNU General Public License for more details.
 14  
 15      You should have received a copy of the GNU General Public License
 16      along with this program; if not, write to the Free Software
 17      Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 18  */
 19  
 20  #include "groupcache.h"
 21  
 22  #include "apdu.h"
 23  #include "tpdu.h"
 24  
 25  GroupCache::GroupCache (const LinkConnectPtr& c, IniSectionPtr& s)
 26    : Driver(c,s)
 27  {
 28    t->setAuxName("G");
 29    TRACEPRINTF (t, 4, "GroupCacheInit");
 30    enable = 0;
 31    remtrigger.set<GroupCache, &GroupCache::remtrigger_cb>(this);
 32    addr = c->router.addr;
 33    c->is_local = true;
 34  }
 35  
 36  GroupCache::~GroupCache ()
 37  {
 38    remtrigger.stop();
 39    R_ITER(i,reader)
 40    {
 41      (*i)->stop(false);
 42      delete *i;
 43    }
 44    TRACEPRINTF (t, 4, "GroupCacheDestroy");
 45    Clear ();
 46  }
 47  
 48  bool
 49  GroupCache::setup()
 50  {
 51    if (!Driver::setup())
 52      return false;
 53    remtrigger.start();
 54    this->maxsize = cfg->value("max-size", 0xFFFF);
 55    return true;
 56  }
 57  
 58  void
 59  GroupCache::start()
 60  {
 61    enable = true;
 62    Driver::start();
 63  }
 64  
 65  void
 66  GroupCache::stop(bool err)
 67  {
 68    enable = false;
 69    Driver::stop(err);
 70  }
 71  
 72  void
 73  GroupCache::send_L_Data (LDataPtr lpdu)
 74  {
 75    if (enable)
 76      {
 77        TPDUPtr tpdu = TPDU::fromPacket (lpdu->address_type, lpdu->destination_address, lpdu->lsdu, t);
 78        if (tpdu->getType () == T_Data_Group)
 79          {
 80            T_Data_Group_PDU *tpdu1 = (T_Data_Group_PDU *) &*tpdu;
 81            if (tpdu1->tsdu.size() >= 2 && !(tpdu1->tsdu[0] & 0x3) &&
 82                ((tpdu1->tsdu[1] & 0xC0) == 0x40 || (tpdu1->tsdu[1] & 0xC0) == 0x80)) // response or write
 83              {
 84                CacheMap::iterator ci = cache.find (lpdu->destination_address);
 85                CacheMap::value_type *c;
 86                if (ci == cache.end())
 87                  {
 88                    while (cache_seq.size() >= maxsize)
 89                      {
 90                        SeqMap::iterator si = cache_seq.begin();
 91                        cache.erase(si->second);
 92                        cache_seq.erase(si);
 93                      }
 94                    c = &(*cache.emplace(lpdu->destination_address, GroupCacheEntry(lpdu->destination_address)).first);
 95                  }
 96                else
 97                  {
 98                    c = &(*ci);
 99                    cache_seq.erase(c->second.seq);
100                  }
101                c->second.src = lpdu->source_address;
102                c->second.data = tpdu1->tsdu;
103                c->second.recvtime = time (0);
104                c->second.seq = seq++;
105                cache_seq.emplace(c->second.seq,c->first);
106                updated(c->second);
107              }
108          }
109      }
110    send_Next();
111  }
112  
113  bool
114  GroupCache::Start ()
115  {
116    TRACEPRINTF (t, 4, "GroupCacheEnable");
117    enable = 1;
118    return true;
119  }
120  
121  void
122  GroupCache::Clear ()
123  {
124    TRACEPRINTF (t, 4, "GroupCacheClear");
125    cache.clear();
126  }
127  
128  void
129  GroupCache::Stop ()
130  {
131    Clear ();
132    TRACEPRINTF (t, 4, "GroupCacheStop");
133    enable = 0;
134  }
135  
136  void
137  GroupCache::remove (eibaddr_t ga)
138  {
139    CacheMap::iterator f = cache.find(ga);
140    if (f != cache.end())
141      {
142        cache_seq.erase(f->second.seq);
143        cache.erase(f);
144      }
145  }
146  
147  GroupCacheReader::GroupCacheReader(GroupCache *gc)
148  {
149    this->gc = gc;
150    gc->add(this);
151  }
152  
153  GroupCacheReader::~GroupCacheReader()
154  {
155  }
156  
157  void
158  GroupCacheReader::stop(bool err)
159  {
160    if (stopped)
161      return;
162    stopped = true;
163    gc->remove(this);
164  }
165  
166  void
167  GroupCache::add (GroupCacheReader * entry)
168  {
169    reader.push_back(entry);
170  }
171  
172  void
173  GroupCache::updated(GroupCacheEntry &c)
174  {
175    // do this in reverse so that the update handler can safely remove itself
176    R_ITER(i,reader)
177    (*i)->updated(c);
178  }
179  
180  void
181  GroupCache::remove (GroupCacheReader *)
182  {
183    remtrigger.send();
184  }
185  
186  void
187  GroupCache::remtrigger_cb(ev::async &, int)
188  {
189    // erase() doesn't do reverse iterators
190    //R_ITER(i,reader)
191    unsigned int i = reader.size();
192    while(i--)
193      {
194        GroupCacheReader *r = reader[i];
195        if (!r->stopped)
196          continue;
197        delete r;
198        reader.erase(reader.begin()+i);
199      }
200  }
201  
202  class GCReader : protected GroupCacheReader
203  {
204    GCReadCallback cb;
205    ClientConnPtr cc;
206    eibaddr_t addr;
207    uint16_t age;
208    ev::timer timeout;
209  public:
210    GCReader(GroupCache *gc, eibaddr_t addr, int Timeout, uint16_t age,
211             GCReadCallback cb, ClientConnPtr cc) : GroupCacheReader(gc)
212    {
213      this->cb = cb;
214      this->cc = cc;
215      this->addr = addr;
216      this->age = age;
217      timeout.set<GCReader,&GCReader::timeout_cb>(this);
218      timeout.start(Timeout/1000.0, 0);
219    }
220    virtual ~GCReader()
221    {
222      if (stopped)
223        return;
224      timeout.stop();
225      GroupCacheReader::stop(false);
226    }
227  private:
228    void updated(GroupCacheEntry &c)
229    {
230      if (stopped)
231        return;
232      if (c.dst != addr)
233        return;
234  
235      TRACEPRINTF (gc->t, 4, "GroupCache found: %s",
236                   FormatEIBAddr (c.src).c_str());
237      cb(c,false,cc);
238      stop(false);
239    }
240  
241    void timeout_cb(ev::timer &, int)
242    {
243      if (stopped)
244        return;
245  
246      GroupCacheEntry f(addr);
247      TRACEPRINTF (gc->t, 4, "GroupCache reread timeout");
248      cb(f,false,cc);
249      stop(false);
250    }
251  };
252  
253  void
254  GroupCache::Read (eibaddr_t addr, unsigned Timeout, uint16_t age,
255                    GCReadCallback cb, ClientConnPtr cc)
256  {
257    TRACEPRINTF (t, 4, "GroupCacheRead %s %d %d",
258                 FormatGroupAddr (addr).c_str(), Timeout, age);
259  
260    if (!enable)
261      {
262        GroupCacheEntry f(0);
263        TRACEPRINTF (t, 4, "GroupCache not enabled");
264        cb(f, Timeout == 0, cc);
265        return;
266      }
267  
268    CacheMap::iterator c = cache.find (addr);
269    if (c != cache.end() && age && c->second.recvtime + age < time (0))
270      c = cache.end();
271    if (c != cache.end())
272      {
273        TRACEPRINTF (t, 4, "GroupCache found: %s",
274                     FormatEIBAddr (c->second.src).c_str());
275        cb(c->second, Timeout == 0, cc);
276        return;
277      }
278  
279    if (!Timeout)
280      {
281        GroupCacheEntry f(addr);
282        TRACEPRINTF (t, 4, "GroupCache no entry");
283        cb(f, true, cc);
284        return;
285      }
286  
287    // No data fond. Send a Read request.
288    A_GroupValue_Read_PDU apdu;
289    T_Data_Group_PDU tpdu;
290    LDataPtr lpdu;
291  
292    new GCReader(this,addr,Timeout,age, cb,cc);
293  
294    tpdu.tsdu = apdu.ToPacket ();
295    lpdu = LDataPtr(new L_Data_PDU ());
296    lpdu->lsdu = tpdu.ToPacket ();
297    lpdu->source_address = 0;
298    lpdu->destination_address = addr;
299    lpdu->address_type = GroupAddress;
300    recv_L_Data (std::move(lpdu));
301  }
302  
303  class GCTracker : protected GroupCacheReader
304  {
305    GCLastCallback cb;
306    ClientConnPtr cc;
307    ev::timer timeout;
308    std::vector < eibaddr_t > a;
309    uint32_t start;
310  public:
311    bool stopped = false;
312  
313    GCTracker(GroupCache *gc, uint32_t start, int Timeout,
314              GCLastCallback cb, ClientConnPtr cc) : GroupCacheReader(gc)
315    {
316      this->cb = cb;
317      this->cc = cc;
318      this->start = start;
319      timeout.set<GCTracker,&GCTracker::timeout_cb>(this);
320      timeout.start(Timeout,0);
321      if (start != gc->seq)
322        handler();
323    }
324    virtual ~GCTracker()
325    {
326      a.clear();
327    }
328    void stop(bool err)
329    {
330      if (stopped)
331        return;
332      timeout.stop();
333      GroupCacheReader::stop(err);
334    }
335  private:
336    void updated(GroupCacheEntry &)
337    {
338      if (stopped)
339        return;
340  
341      handler();
342    }
343  
344    void timeout_cb(ev::timer &, int)
345    {
346      if (stopped)
347        return;
348      if (handler())
349        return;
350      cb(a,gc->seq,cc);
351      stop(true);
352    }
353  
354    bool handler()
355    {
356      TRACEPRINTF (gc->t, 8, "LastUpdates start: x%x pos: x%x", start, gc->seq);
357      SeqMap::const_iterator sa = gc->cache_seq.begin();
358      SeqMap::const_iterator sb = gc->cache_seq.end();
359      while (sb != sa && (--sb)->first >= start)
360        {
361          a.push_back (sb->second);
362          if (sb->first == start)
363            break;
364        }
365      cb(a,gc->seq,cc);
366      stop(false);
367      return true;
368    }
369  };
370  
371  void
372  GroupCache::LastUpdates (uint16_t start, uint8_t Timeout,
373                           GCLastCallback cb, ClientConnPtr cc)
374  {
375    uint32_t st;
376    // counter wraparound
377    st = (seq&~0xFFFF) + start;
378    if (st > seq)
379      st -= 0x10000;
380    new GCTracker(this, st, Timeout, cb,cc);
381  }
382  
383  void
384  GroupCache::LastUpdates2 (uint32_t start, uint8_t Timeout,
385                            GCLastCallback cb, ClientConnPtr cc)
386  {
387    new GCTracker(this, start, Timeout, cb,cc);
388  }
389