StreamSource.cpp
1 #include "StreamSource.h" 2 #include <string> 3 #include "misc.h" 4 #include "SecCFRelease.h" 5 6 using namespace std; 7 8 CFStringRef gStreamSourceName = CFSTR("StreamSource"); 9 10 const CFIndex kMaximumSize = 2048; 11 12 StreamSource::StreamSource(CFReadStreamRef input, Transform* transform, CFStringRef name) 13 : Source(gStreamSourceName, transform, name), 14 mReadStream(input), 15 mReading(dispatch_group_create()) 16 { 17 dispatch_group_enter(mReading); 18 CFRetainSafe(mReadStream); 19 } 20 21 void StreamSource::BackgroundActivate() 22 { 23 CFIndex result = 0; 24 25 do 26 { 27 // make a buffer big enough to handle the object 28 // NOTE: allocating this on the stack and letting CFDataCreate copy it is _faster_ then malloc and CFDataCreateWithBytes(..., kCFAllactorMalloc) by a fair margin. At least for 2K chunks. Retest if changing the size. 29 UInt8 buffer[kMaximumSize]; 30 31 result = CFReadStreamRead(mReadStream, buffer, kMaximumSize); 32 33 if (result > 0) // was data returned? 34 { 35 // make the data and send it to the transform 36 CFDataRef data = CFDataCreate(NULL, buffer, result); 37 38 CFErrorRef error = mDestination->SetAttribute(mDestinationName, data); 39 40 CFReleaseNull(data); 41 42 if (error != NULL) // we have a problem, there was probably an abort on the chain 43 { 44 return; // quiesce the source 45 } 46 } 47 } while (result > 0); 48 49 if (result < 0) 50 { 51 // we got an error! 52 CFErrorRef error = CFReadStreamCopyError(mReadStream); 53 mDestination->SetAttribute(mDestinationName, error); 54 if (error) 55 { 56 // NOTE: CF doesn't always tell us about this error. Arguably it could be better to 57 // "invent" a generic error, but it is a hard argument that we want to crash in CFReleaseNull(NULL)... 58 CFReleaseNull(error); 59 } 60 } 61 else 62 { 63 // send an EOS 64 mDestination->SetAttribute(mDestinationName, NULL); // end of stream 65 } 66 } 67 68 void StreamSource::DoActivate() 69 { 70 CFRetainSafe(mDestination->GetCFObject()); 71 dispatch_group_async(mReading, dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_LOW, 0), ^{ 72 this->BackgroundActivate(); 73 CFReleaseSafe(mDestination->GetCFObject()); 74 }); 75 dispatch_group_leave(mReading); 76 } 77 78 void StreamSource::Finalize() 79 { 80 dispatch_group_notify(mReading, dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH, 0), ^{ 81 delete this; 82 }); 83 } 84 85 StreamSource::~StreamSource() 86 { 87 CFReleaseNull(mReadStream); 88 mReadStream = NULL; 89 dispatch_release(mReading); 90 mReading = NULL; 91 } 92 93 94 Boolean StreamSource::Equal(const CoreFoundationObject* object) 95 { 96 // not equal if we are not the same object 97 if (Source::Equal(object)) 98 { 99 const StreamSource* ss = (StreamSource*) object; 100 return CFEqual(ss->mReadStream, mReadStream); 101 } 102 103 return false; 104 } 105 106 107 108 CFTypeRef StreamSource::Make(CFReadStreamRef input, Transform* transform, CFStringRef name) 109 { 110 return CoreFoundationHolder::MakeHolder(gInternalCFObjectName, new StreamSource(input, transform, name)); 111 } 112 113 114 115 string StreamSource::DebugDescription() 116 { 117 string result = Source::DebugDescription() + ": Stream "; 118 119 char buffer[256]; 120 snprintf(buffer, sizeof(buffer), "(mReadStream = %p)", mReadStream); 121 122 result += buffer; 123 124 return result; 125 }