groupcache.cpp
1 /* 2 EIBD eib bus access and management daemon 3 Copyright (C) 2005-2011 Martin Koegler <mkoegler@auto.tuwien.ac.at> 4 5 This program is free software; you can redistribute it and/or modify 6 it under the terms of the GNU General Public License as published by 7 the Free Software Foundation; either version 2 of the License, or 8 (at your option) any later version. 9 10 This program is distributed in the hope that it will be useful, 11 but WITHOUT ANY WARRANTY; without even the implied warranty of 12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13 GNU General Public License for more details. 14 15 You should have received a copy of the GNU General Public License 16 along with this program; if not, write to the Free Software 17 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 18 */ 19 20 #include "groupcache.h" 21 22 #include "apdu.h" 23 #include "tpdu.h" 24 25 GroupCache::GroupCache (const LinkConnectPtr& c, IniSectionPtr& s) 26 : Driver(c,s) 27 { 28 t->setAuxName("G"); 29 TRACEPRINTF (t, 4, "GroupCacheInit"); 30 enable = 0; 31 remtrigger.set<GroupCache, &GroupCache::remtrigger_cb>(this); 32 addr = c->router.addr; 33 c->is_local = true; 34 } 35 36 GroupCache::~GroupCache () 37 { 38 remtrigger.stop(); 39 R_ITER(i,reader) 40 { 41 (*i)->stop(false); 42 delete *i; 43 } 44 TRACEPRINTF (t, 4, "GroupCacheDestroy"); 45 Clear (); 46 } 47 48 bool 49 GroupCache::setup() 50 { 51 if (!Driver::setup()) 52 return false; 53 remtrigger.start(); 54 this->maxsize = cfg->value("max-size", 0xFFFF); 55 return true; 56 } 57 58 void 59 GroupCache::start() 60 { 61 enable = true; 62 Driver::start(); 63 } 64 65 void 66 GroupCache::stop(bool err) 67 { 68 enable = false; 69 Driver::stop(err); 70 } 71 72 void 73 GroupCache::send_L_Data (LDataPtr lpdu) 74 { 75 if (enable) 76 { 77 TPDUPtr tpdu = TPDU::fromPacket (lpdu->address_type, lpdu->destination_address, lpdu->lsdu, t); 78 if (tpdu->getType () == T_Data_Group) 79 { 80 T_Data_Group_PDU *tpdu1 = (T_Data_Group_PDU *) &*tpdu; 81 if (tpdu1->tsdu.size() >= 2 && !(tpdu1->tsdu[0] & 0x3) && 82 ((tpdu1->tsdu[1] & 0xC0) == 0x40 || (tpdu1->tsdu[1] & 0xC0) == 0x80)) // response or write 83 { 84 CacheMap::iterator ci = cache.find (lpdu->destination_address); 85 CacheMap::value_type *c; 86 if (ci == cache.end()) 87 { 88 while (cache_seq.size() >= maxsize) 89 { 90 SeqMap::iterator si = cache_seq.begin(); 91 cache.erase(si->second); 92 cache_seq.erase(si); 93 } 94 c = &(*cache.emplace(lpdu->destination_address, GroupCacheEntry(lpdu->destination_address)).first); 95 } 96 else 97 { 98 c = &(*ci); 99 cache_seq.erase(c->second.seq); 100 } 101 c->second.src = lpdu->source_address; 102 c->second.data = tpdu1->tsdu; 103 c->second.recvtime = time (0); 104 c->second.seq = seq++; 105 cache_seq.emplace(c->second.seq,c->first); 106 updated(c->second); 107 } 108 } 109 } 110 send_Next(); 111 } 112 113 bool 114 GroupCache::Start () 115 { 116 TRACEPRINTF (t, 4, "GroupCacheEnable"); 117 enable = 1; 118 return true; 119 } 120 121 void 122 GroupCache::Clear () 123 { 124 TRACEPRINTF (t, 4, "GroupCacheClear"); 125 cache.clear(); 126 } 127 128 void 129 GroupCache::Stop () 130 { 131 Clear (); 132 TRACEPRINTF (t, 4, "GroupCacheStop"); 133 enable = 0; 134 } 135 136 void 137 GroupCache::remove (eibaddr_t ga) 138 { 139 CacheMap::iterator f = cache.find(ga); 140 if (f != cache.end()) 141 { 142 cache_seq.erase(f->second.seq); 143 cache.erase(f); 144 } 145 } 146 147 GroupCacheReader::GroupCacheReader(GroupCache *gc) 148 { 149 this->gc = gc; 150 gc->add(this); 151 } 152 153 GroupCacheReader::~GroupCacheReader() 154 { 155 } 156 157 void 158 GroupCacheReader::stop(bool err) 159 { 160 if (stopped) 161 return; 162 stopped = true; 163 gc->remove(this); 164 } 165 166 void 167 GroupCache::add (GroupCacheReader * entry) 168 { 169 reader.push_back(entry); 170 } 171 172 void 173 GroupCache::updated(GroupCacheEntry &c) 174 { 175 // do this in reverse so that the update handler can safely remove itself 176 R_ITER(i,reader) 177 (*i)->updated(c); 178 } 179 180 void 181 GroupCache::remove (GroupCacheReader *) 182 { 183 remtrigger.send(); 184 } 185 186 void 187 GroupCache::remtrigger_cb(ev::async &, int) 188 { 189 // erase() doesn't do reverse iterators 190 //R_ITER(i,reader) 191 unsigned int i = reader.size(); 192 while(i--) 193 { 194 GroupCacheReader *r = reader[i]; 195 if (!r->stopped) 196 continue; 197 delete r; 198 reader.erase(reader.begin()+i); 199 } 200 } 201 202 class GCReader : protected GroupCacheReader 203 { 204 GCReadCallback cb; 205 ClientConnPtr cc; 206 eibaddr_t addr; 207 uint16_t age; 208 ev::timer timeout; 209 public: 210 GCReader(GroupCache *gc, eibaddr_t addr, int Timeout, uint16_t age, 211 GCReadCallback cb, ClientConnPtr cc) : GroupCacheReader(gc) 212 { 213 this->cb = cb; 214 this->cc = cc; 215 this->addr = addr; 216 this->age = age; 217 timeout.set<GCReader,&GCReader::timeout_cb>(this); 218 timeout.start(Timeout/1000.0, 0); 219 } 220 virtual ~GCReader() 221 { 222 if (stopped) 223 return; 224 timeout.stop(); 225 GroupCacheReader::stop(false); 226 } 227 private: 228 void updated(GroupCacheEntry &c) 229 { 230 if (stopped) 231 return; 232 if (c.dst != addr) 233 return; 234 235 TRACEPRINTF (gc->t, 4, "GroupCache found: %s", 236 FormatEIBAddr (c.src).c_str()); 237 cb(c,false,cc); 238 stop(false); 239 } 240 241 void timeout_cb(ev::timer &, int) 242 { 243 if (stopped) 244 return; 245 246 GroupCacheEntry f(addr); 247 TRACEPRINTF (gc->t, 4, "GroupCache reread timeout"); 248 cb(f,false,cc); 249 stop(false); 250 } 251 }; 252 253 void 254 GroupCache::Read (eibaddr_t addr, unsigned Timeout, uint16_t age, 255 GCReadCallback cb, ClientConnPtr cc) 256 { 257 TRACEPRINTF (t, 4, "GroupCacheRead %s %d %d", 258 FormatGroupAddr (addr).c_str(), Timeout, age); 259 260 if (!enable) 261 { 262 GroupCacheEntry f(0); 263 TRACEPRINTF (t, 4, "GroupCache not enabled"); 264 cb(f, Timeout == 0, cc); 265 return; 266 } 267 268 CacheMap::iterator c = cache.find (addr); 269 if (c != cache.end() && age && c->second.recvtime + age < time (0)) 270 c = cache.end(); 271 if (c != cache.end()) 272 { 273 TRACEPRINTF (t, 4, "GroupCache found: %s", 274 FormatEIBAddr (c->second.src).c_str()); 275 cb(c->second, Timeout == 0, cc); 276 return; 277 } 278 279 if (!Timeout) 280 { 281 GroupCacheEntry f(addr); 282 TRACEPRINTF (t, 4, "GroupCache no entry"); 283 cb(f, true, cc); 284 return; 285 } 286 287 // No data fond. Send a Read request. 288 A_GroupValue_Read_PDU apdu; 289 T_Data_Group_PDU tpdu; 290 LDataPtr lpdu; 291 292 new GCReader(this,addr,Timeout,age, cb,cc); 293 294 tpdu.tsdu = apdu.ToPacket (); 295 lpdu = LDataPtr(new L_Data_PDU ()); 296 lpdu->lsdu = tpdu.ToPacket (); 297 lpdu->source_address = 0; 298 lpdu->destination_address = addr; 299 lpdu->address_type = GroupAddress; 300 recv_L_Data (std::move(lpdu)); 301 } 302 303 class GCTracker : protected GroupCacheReader 304 { 305 GCLastCallback cb; 306 ClientConnPtr cc; 307 ev::timer timeout; 308 std::vector < eibaddr_t > a; 309 uint32_t start; 310 public: 311 bool stopped = false; 312 313 GCTracker(GroupCache *gc, uint32_t start, int Timeout, 314 GCLastCallback cb, ClientConnPtr cc) : GroupCacheReader(gc) 315 { 316 this->cb = cb; 317 this->cc = cc; 318 this->start = start; 319 timeout.set<GCTracker,&GCTracker::timeout_cb>(this); 320 timeout.start(Timeout,0); 321 if (start != gc->seq) 322 handler(); 323 } 324 virtual ~GCTracker() 325 { 326 a.clear(); 327 } 328 void stop(bool err) 329 { 330 if (stopped) 331 return; 332 timeout.stop(); 333 GroupCacheReader::stop(err); 334 } 335 private: 336 void updated(GroupCacheEntry &) 337 { 338 if (stopped) 339 return; 340 341 handler(); 342 } 343 344 void timeout_cb(ev::timer &, int) 345 { 346 if (stopped) 347 return; 348 if (handler()) 349 return; 350 cb(a,gc->seq,cc); 351 stop(true); 352 } 353 354 bool handler() 355 { 356 TRACEPRINTF (gc->t, 8, "LastUpdates start: x%x pos: x%x", start, gc->seq); 357 SeqMap::const_iterator sa = gc->cache_seq.begin(); 358 SeqMap::const_iterator sb = gc->cache_seq.end(); 359 while (sb != sa && (--sb)->first >= start) 360 { 361 a.push_back (sb->second); 362 if (sb->first == start) 363 break; 364 } 365 cb(a,gc->seq,cc); 366 stop(false); 367 return true; 368 } 369 }; 370 371 void 372 GroupCache::LastUpdates (uint16_t start, uint8_t Timeout, 373 GCLastCallback cb, ClientConnPtr cc) 374 { 375 uint32_t st; 376 // counter wraparound 377 st = (seq&~0xFFFF) + start; 378 if (st > seq) 379 st -= 0x10000; 380 new GCTracker(this, st, Timeout, cb,cc); 381 } 382 383 void 384 GroupCache::LastUpdates2 (uint32_t start, uint8_t Timeout, 385 GCLastCallback cb, ClientConnPtr cc) 386 { 387 new GCTracker(this, start, Timeout, cb,cc); 388 } 389