DSS Source Code Analyse (09) - RTSPSession::Run
SInt64 RTSPSession::Run(){ // get events EventFlags events = this->GetEvents(); QTSS_Error err = QTSS_NoErr; QTSSModule* theModule = NULL; UInt32 numModules = 0; Assert
SInt64 RTSPSession::Run()
{
// get events
EventFlags events = this->GetEvents();QTSS_Error err = QTSS_NoErr;
QTSSModule* theModule = NULL;
UInt32 numModules = 0;
Assert(fLastRTPSessionIDPtr.Ptr == &fLastRTPSessionID[0]);
// Some callbacks look for this struct in the thread object
// set current thread's fThreadData= &fModuleState
OSThreadDataSetter theSetter(&fModuleState, NULL);//check for a timeout or a kill. If so, just consider the session dead
if ((events & Task::kTimeoutEvent) || (events & Task::kKillEvent))
fLiveSession = false;
while (this->IsLiveSession())
{
// RTSP Session state machine. There are several well defined points in an RTSP request
// where this session may have to return from its run function and wait for a new event.
// Because of this, we need to track our current state and return to it.
switch (fState)
{
case kReadingFirstRequest:
{
if ((err = fInputStream.ReadRequest()) == QTSS_NoErr)
{
// If the RequestStream returns QTSS_NoErr, it means
// that we've read all outstanding data off the socket,
// and still don't have a full request. Wait for more data.
//+rt use the socket that reads the data, may be different now.
// register read event once again for an uncompletely request
fInputSocketP->RequestEvent(EV_RE);return 0;
}
if ((err != QTSS_RequestArrived) && (err != E2BIG))
{
// Any other error implies that the client has gone away. At this point,
// we can't have 2 sockets, so we don't need to do the "half closed" check
// we do below
Assert(err > 0);
Assert(!this->IsLiveSession());
break;
}
// A complete request received, state transfer to kHTTPFilteringRequest
if (err == QTSS_RequestArrived)
fState = kHTTPFilteringRequest;
// If we get an E2BIG, it means our buffer was overfilled.
// In that case, we can just jump into the following state, and
// the code their does a check for this error and returns an error.
// transfer state to kHaveNonTunnelMessage
if (err == E2BIG)fState = kHaveNonTunnelMessage;
}
continue;
case kHTTPFilteringRequest:
{
HTTP_TRACE( "RTSPSession::Run kHTTPFilteringRequest\n" )
// transfer state to kHaveNonTunnelMessage
fState = kHaveNonTunnelMessage; // assume it's not a tunnel setup message
// prefilter will set correct tunnel state if it is.
// if no http tunneled or normal http tunneled, QTSS_NoErr will be returned from PreFilterForHTTPProxyTunnel
QTSS_Error preFilterErr = this->PreFilterForHTTPProxyTunnel();if ( preFilterErr == QTSS_NoErr )
{
HTTP_TRACE( "RTSPSession::Run kHTTPFilteringRequest\n" )
continue;
}
else
{
// pre filter error indicates a tunnelling message that could
// not join to a session.
HTTP_TRACE( "RTSPSession::Run kHTTPFilteringRequest Tunnel protocol ERROR.\n" )
return -1;
}
}
case kWaitingToBindHTTPTunnel:
//flush the GET response, if it's there
err = fOutputStream.Flush();
if (err == EAGAIN)
{
// If we get this error, we are currently flow-controlled and should
// wait for the socket to become writeable again
fSocket.RequestEvent(EV_WR);
}
return 0;
//continue;
case kSocketHasBeenBoundIntoHTTPTunnel:
// DMS - Can this execute either? I don't think so... this one
// we may not need...
// I've been joined, it's time to kill this session.
Assert(!this->IsLiveSession()); // at least the socket should not report connected any longer
HTTP_TRACE( "RTSPSession has died of snarfage.\n" )
break;
case kReadingRequest:
{
// We should lock down the session while reading in data,
// because we can't snarf up a POST while reading.
// we should be only reading an RTSP request here, no HTTP tunnel messages
if ((err = fInputStream.ReadRequest()) == QTSS_NoErr)
{
// If the RequestStream returns QTSS_NoErr, it means
// that we've read all outstanding data off the socket,
// and still don't have a full request. Wait for more data.
//+rt use the socket that reads the data, may be different now.
fInputSocketP->RequestEvent(EV_RE);
return 0;
}
if ((err != QTSS_RequestArrived) && (err != E2BIG) && (err != QTSS_BadArgument))
{
//Any other error implies that the input connection has gone away.
// We should only kill the whole session if we aren't doing HTTP.
// (If we are doing HTTP, the POST connection can go away)
Assert(err > 0);
if (fOutputSocketP->IsConnected())
{
// If we've gotten here, this must be an HTTP session with
// a dead input connection. If that's the case, we should
// clean up immediately so as to not have an open socket
// needlessly lingering around, taking up space.
Assert(fOutputSocketP != fInputSocketP);
Assert(!fInputSocketP->IsConnected());
fInputSocketP->Cleanup();
return 0;
}
else
{
Assert(!this->IsLiveSession());
break;
}
}
fState = kHaveNonTunnelMessage;
// fall thru to kHaveNonTunnelMessage
}
case kHaveNonTunnelMessage:
{
// should only get here when fInputStream has a full message built.
Assert( fInputStream.GetRequestBuffer() );
Assert(fRequest == NULL);
fRequest = NEW RTSPRequest(this);
fRoleParams.rtspRequestParams.inRTSPRequest = fRequest;
fRoleParams.rtspRequestParams.inRTSPHeaders = fRequest->GetHeaderDictionary();
// We have an RTSP request and are about to begin processing. We need to
// make sure that anyone sending interleaved data on this session won't
// be allowed to do so until we are done sending our response
// We also make sure that a POST session can't snarf in while we're
// processing the request.
fReadMutex.Lock();
fSessionMutex.Lock();
// The fOutputStream's fBytesWritten counter is used to
// count the # of bytes for this RTSP response. So, at
// this point, reset it to 0 (we can then just let it increment
// until the next request comes in)
fOutputStream.ResetBytesWritten();
// Check for an overfilled buffer, and return an error.
if (err == E2BIG)
{
(void)QTSSModuleUtils::SendErrorResponse(fRequest, qtssClientBadRequest,
qtssMsgRequestTooLong);
fState = kPostProcessingRequest;
break;
}
// Check for a corrupt base64 error, return an error
if (err == QTSS_BadArgument)
{
(void)QTSSModuleUtils::SendErrorResponse(fRequest, qtssClientBadRequest,
qtssMsgBadBase64);
fState = kPostProcessingRequest;
break;
}
Assert(err == QTSS_RequestArrived);
fState = kFilteringRequest;
// Note that there is no break here. We'd like to continue onto the next
// state at this point. This goes for every case in this case statement
}
case kFilteringRequest:
{
// We received something so auto refresh
// The need to auto refresh is because the api doesn't allow a module to refresh at this point
//
fTimeoutTask.RefreshTimeout();
//
// Before we even do this, check to see if this is a *data* packet,
// in which case this isn't an RTSP request, so we don't need to go
// through any of the remaining steps
if (fInputStream.IsDataPacket()) // can this interfere with MP3?
{
this->HandleIncomingDataPacket();
fState = kCleaningUp;
break;
}
//
// In case a module wants to replace the request
char* theReplacedRequest = NULL;
char* oldReplacedRequest = NULL;
// Setup the filter param block
QTSS_RoleParams theFilterParams;
theFilterParams.rtspFilterParams.inRTSPSession = this;
theFilterParams.rtspFilterParams.inRTSPRequest = fRequest;
theFilterParams.rtspFilterParams.outNewRequest = &theReplacedRequest;
// Invoke filter modules
numModules = QTSServerInterface::GetNumModulesInRole(QTSSModule::kRTSPFilterRole);
for (; (fCurrentModule < numModules) && ((!fRequest->HasResponseBeenSent()) || fModuleState.eventRequested); fCurrentModule++)
{
fModuleState.eventRequested = false;
fModuleState.idleTime = 0;
if (fModuleState.globalLockRequested )
{ fModuleState.globalLockRequested = false;
fModuleState.isGlobalLocked = true;
}
theModule = QTSServerInterface::GetModule(QTSSModule::kRTSPFilterRole, fCurrentModule);
(void)theModule->CallDispatch(QTSS_RTSPFilter_Role, &theFilterParams);
fModuleState.isGlobalLocked = false;
// If this module has requested an event, return and wait for the event to transpire
if (fModuleState.globalLockRequested) // call this request back locked
return this->CallLocked();
if (fModuleState.eventRequested)
{
this->ForceSameThread(); // We are holding mutexes, so we need to force
// the same thread to be used for next Run()
return fModuleState.idleTime; // If the module has requested idle time...
}
//
// Check to see if this module has replaced the request. If so, check
// to see if there is an old replacement that we should delete
if (theReplacedRequest != NULL)
{
if (oldReplacedRequest != NULL)
delete [] oldReplacedRequest;
fRequest->SetVal(qtssRTSPReqFullRequest, theReplacedRequest, ::strlen(theReplacedRequest));
oldReplacedRequest = theReplacedRequest;
theReplacedRequest = NULL;
}
}
fCurrentModule = 0;
if (fRequest->HasResponseBeenSent())
{
fState = kPostProcessingRequest;
break;
}
if (fSentOptionsRequest && this->ParseOptionsResponse())
{
fRoundTripTime = (SInt32) (OS::Milliseconds() - fOptionsRequestSendTime);
//qtss_printf("RTSPSession::Run RTT time = %"_S32BITARG_" msec\n", fRoundTripTime);
fState = kSendingResponse;
break;
}
else
// Otherwise, this is a normal request, so parse it and get the RTPSession.
this->SetupRequest();
// This might happen if there is some syntax or other error,
// or if it is an OPTIONS request
if (fRequest->HasResponseBeenSent())
{
fState = kPostProcessingRequest;
break;
}
fState = kRoutingRequest;
}
case kRoutingRequest:
{
// Invoke router modules
numModules = QTSServerInterface::GetNumModulesInRole(QTSSModule::kRTSPRouteRole);
{
// Manipulation of the RTPSession from the point of view of
// a module is guaranteed to be atomic by the API.
Assert(fRTPSession != NULL);
OSMutexLocker locker(fRTPSession->GetSessionMutex());
for (; (fCurrentModule < numModules) && ((!fRequest->HasResponseBeenSent()) || fModuleState.eventRequested); fCurrentModule++)
{
fModuleState.eventRequested = false;
fModuleState.idleTime = 0;
if (fModuleState.globalLockRequested )
{ fModuleState.globalLockRequested = false;
fModuleState.isGlobalLocked = true;
}
theModule = QTSServerInterface::GetModule(QTSSModule::kRTSPRouteRole, fCurrentModule);
(void)theModule->CallDispatch(QTSS_RTSPRoute_Role, &fRoleParams);
fModuleState.isGlobalLocked = false;
if (fModuleState.globalLockRequested) // call this request back locked
return this->CallLocked();
// If this module has requested an event, return and wait for the event to transpire
if (fModuleState.eventRequested)
{
this->ForceSameThread(); // We are holding mutexes, so we need to force
// the same thread to be used for next Run()
return fModuleState.idleTime; // If the module has requested idle time...
}
}
}
fCurrentModule = 0;
// SetupAuthLocalPath must happen after kRoutingRequest and before kAuthenticatingRequest
// placed here so that if the state is shifted to kPostProcessingRequest from a response being sent
// then the AuthLocalPath will still be set.
fRequest->SetupAuthLocalPath();
if (fRequest->HasResponseBeenSent())
{
fState = kPostProcessingRequest;
break;
}
if(fRequest->SkipAuthorization())
{
// Skip the authentication and authorization states
// The foll. normally gets executed at the end of the authorization state
// Prepare for kPreprocessingRequest state.
fState = kPreprocessingRequest;
if (fRequest->GetMethod() == qtssSetupMethod)
// Make sure to erase the session ID stored in the request at this point.
// If we fail to do so, this same session would be used if another
// SETUP was issued on this same TCP connection.
fLastRTPSessionIDPtr.Len = 0;
else if (fLastRTPSessionIDPtr.Len == 0)
fLastRTPSessionIDPtr.Len = ::strlen(fLastRTPSessionIDPtr.Ptr);
break;
}
else
fState = kAuthenticatingRequest;
}
case kAuthenticatingRequest:
{
Bool16 allowedDefault = QTSServerInterface::GetServer()->GetPrefs()->GetAllowGuestDefault();
Bool16 allowed = allowedDefault; //server pref?
Bool16 hasUser = false;
Bool16 handled = false;
Bool16 wasHandled = false;
StrPtrLenDel prefRealm(QTSServerInterface::GetServer()->GetPrefs()->GetAuthorizationRealm());
if (prefRealm.Ptr != NULL)
{
fRequest->SetValue(qtssRTSPReqURLRealm,0, prefRealm.Ptr, prefRealm.Len, kDontObeyReadOnly);
}
QTSS_RTSPMethod method = fRequest->GetMethod();
if (method != qtssIllegalMethod) do
{ //Set the request action before calling the authentication module
if((method == qtssAnnounceMethod) || ((method == qtssSetupMethod) && fRequest->IsPushRequest()))
{ fRequest->SetAction(qtssActionFlagsWrite);
break;
}
void* theSession = NULL;
UInt32 theLen = sizeof(theSession);
if (QTSS_NoErr == fRTPSession->GetValue(sClientBroadcastSessionAttr, 0, &theSession, &theLen) )
{ fRequest->SetAction(qtssActionFlagsWrite); // an incoming broadcast session
break;
}
fRequest->SetAction(qtssActionFlagsRead);
} while (false);
else
{ Assert(0);
}
if(fRequest->GetAuthScheme() == qtssAuthNone)
{
QTSS_AuthScheme scheme = QTSServerInterface::GetServer()->GetPrefs()->GetAuthScheme();
if( scheme == qtssAuthBasic)
fRequest->SetAuthScheme(qtssAuthBasic);
else if( scheme == qtssAuthDigest)
fRequest->SetAuthScheme(qtssAuthDigest);
if( scheme == qtssAuthDigest)
debug_printf("RTSPSession.cpp:kAuthenticatingRequest scheme == qtssAuthDigest\n");
}
// Setup the authentication param block
QTSS_RoleParams theAuthenticationParams;
theAuthenticationParams.rtspAthnParams.inRTSPRequest = fRequest;
fModuleState.eventRequested = false;
fModuleState.idleTime = 0;
fRequest->SetAllowed(allowed);
fRequest->SetHasUser(hasUser);
fRequest->SetAuthHandled(handled);
StrPtrLen* lastUsedDigestChallengePtr = this->GetValue(qtssRTSPSesLastDigestChallenge);
if (lastUsedDigestChallengePtr != NULL)
(void) fRequest->SetValue(qtssRTSPReqDigestChallenge,(UInt32) 0,(void *) lastUsedDigestChallengePtr->Ptr,lastUsedDigestChallengePtr->Len, QTSSDictionary::kDontObeyReadOnly);
numModules = QTSServerInterface::GetNumModulesInRole(QTSSModule::kRTSPAthnRole);
for (fCurrentModule = 0; (fCurrentModule < numModules) && ((!fRequest->HasResponseBeenSent()) || fModuleState.eventRequested); fCurrentModule++)
{
fRequest->SetAllowed(allowedDefault);
fRequest->SetHasUser(false);
fRequest->SetAuthHandled(false);
debug_printf("RTSPSession.cpp:kAuthenticatingRequest fCurrentModule = %lu numModules=%lu\n", fCurrentModule,numModules);
fModuleState.eventRequested = false;
fModuleState.idleTime = 0;
if (fModuleState.globalLockRequested )
{ fModuleState.globalLockRequested = false;
fModuleState.isGlobalLocked = true;
}
theModule = QTSServerInterface::GetModule(QTSSModule::kRTSPAthnRole, fCurrentModule);
if (NULL == theModule)
continue;
if (__RTSP_AUTH_DEBUG__)
{ theModule->GetValue(qtssModName)->PrintStr("QTSSModule::CallDispatch ENTER module=", "\n");
}
(void)theModule->CallDispatch(QTSS_RTSPAuthenticate_Role, &theAuthenticationParams);
fModuleState.isGlobalLocked = false;
if (fModuleState.globalLockRequested) // call this request back locked
return this->CallLocked();
// If this module has requested an event, return and wait for the event to transpire
if (fModuleState.eventRequested)
{
this->ForceSameThread(); // We are holding mutexes, so we need to force
// the same thread to be used for next Run()
return fModuleState.idleTime; // If the module has requested idle time...
}
allowed = fRequest->GetAllowed();
hasUser = fRequest->GetHasUser();
handled = fRequest->GetAuthHandled();
debug_printf("RTSPSession::Run Role(kAuthenticatingRequest) allowedDefault =%d allowed= %d hasUser = %d handled=%d \n",allowedDefault, allowed,hasUser, handled);
if (handled)
wasHandled = handled;
if (hasUser || handled )
{
debug_printf("RTSPSession.cpp::Run(kAuthenticatingRequest) skipping other modules fCurrentModule = %lu numModules=%lu\n", fCurrentModule,numModules);
break;
}
}
if (!wasHandled) //don't check and possibly fail the user if it the user has already been checked.
this->CheckAuthentication();
fCurrentModule = 0;
if (fRequest->HasResponseBeenSent())
{
fState = kPostProcessingRequest;
break;
}
fState = kAuthorizingRequest;
}
case kAuthorizingRequest:
{
// Invoke authorization modules
numModules = QTSServerInterface::GetNumModulesInRole(QTSSModule::kRTSPAuthRole);
Bool16 allowedDefault = QTSServerInterface::GetServer()->GetPrefs()->GetAllowGuestDefault();
Bool16 allowed = true;
Bool16 hasUser = false;
Bool16 handled = false;
QTSS_Error theErr = QTSS_NoErr;
// Invoke authorization modules
// Manipulation of the RTPSession from the point of view of
// a module is guaranteed to be atomic by the API.
Assert(fRTPSession != NULL);
OSMutexLocker locker(fRTPSession->GetSessionMutex());
fRequest->SetAllowed(allowed);
fRequest->SetHasUser(hasUser);
fRequest->SetAuthHandled(handled);
for (; (fCurrentModule < numModules) && ((!fRequest->HasResponseBeenSent()) || fModuleState.eventRequested); fCurrentModule++)
{
fRequest->SetHasUser(false);
fRequest->SetAuthHandled(false);
debug_printf("RTSPSession.cpp:kAuthorizingRequest BEFORE DISPATCH fCurrentModule = %lu numModules=%lu\n", fCurrentModule,numModules);
fModuleState.eventRequested = false;
fModuleState.idleTime = 0;
if (fModuleState.globalLockRequested )
{ fModuleState.globalLockRequested = false;
fModuleState.isGlobalLocked = true;
}
theModule = QTSServerInterface::GetModule(QTSSModule::kRTSPAuthRole, fCurrentModule);
if (NULL == theModule)
continue;
if (__RTSP_AUTH_DEBUG__)
{ theModule->GetValue(qtssModName)->PrintStr("QTSSModule::CallDispatch ENTER module=", "\n");
}
(void)theModule->CallDispatch(QTSS_RTSPAuthorize_Role, &fRoleParams);
fModuleState.isGlobalLocked = false;
if (fModuleState.globalLockRequested) // call this request back locked
return this->CallLocked();
// If this module has requested an event, return and wait for the event to transpire
if (fModuleState.eventRequested)
{
this->ForceSameThread(); // We are holding mutexes, so we need to force
// the same thread to be used for next Run()
return fModuleState.idleTime; // If the module has requested idle time...
}
// allowed != default means a module has set the result
// handled means a module wants to be the primary for this request
// -- example qtaccess says only allow valid user and allowed default is false. So module says handled, hasUser is false, allowed is false
//
allowed = fRequest->GetAllowed();
hasUser = fRequest->GetHasUser();
handled = fRequest->GetAuthHandled();
debug_printf("RTSPSession::Run Role(kAuthorizingRequest) allowedDefault =%d allowed= %d hasUser = %d handled=%d \n",allowedDefault, allowed,hasUser, handled);
if (!allowed && !handled) //old module break on !allowed
{
debug_printf("RTSPSession.cpp::Run(kAuthorizingRequest) skipping other modules fCurrentModule = %lu numModules=%lu\n", fCurrentModule,numModules);
break;
}
if (!allowed && hasUser && handled) //new module break on !allowed
{
debug_printf("RTSPSession.cpp::Run(kAuthorizingRequest) skipping other modules fCurrentModule = %lu numModules=%lu\n", fCurrentModule,numModules);
break;
}
}
this->SaveRequestAuthorizationParams(fRequest);
if (!allowed)
{
if (false == fRequest->HasResponseBeenSent())
{
QTSS_AuthScheme challengeScheme = fRequest->GetAuthScheme();
if( challengeScheme == qtssAuthDigest)
{ debug_printf("RTSPSession.cpp:kAuthorizingRequest scheme == qtssAuthDigest)\n");
}
else if( challengeScheme == qtssAuthBasic)
{ debug_printf("RTSPSession.cpp:kAuthorizingRequest scheme == qtssAuthBasic)\n");
}
if(challengeScheme == qtssAuthBasic) {
fRTPSession->SetAuthScheme(qtssAuthBasic);
theErr = fRequest->SendBasicChallenge();
}
else if(challengeScheme == qtssAuthDigest) {
fRTPSession->UpdateDigestAuthChallengeParams(false, false, RTSPSessionInterface::kNoQop);
theErr = fRequest->SendDigestChallenge(fRTPSession->GetAuthQop(), fRTPSession->GetAuthNonce(), fRTPSession->GetAuthOpaque());
}
else {
// No authentication scheme is given and the request was not allowed,
// so send a 403: Forbidden message
theErr = fRequest->SendForbiddenResponse();
}
if (QTSS_NoErr != theErr) // We had an error so bail on the request quit the session and post process the request.
{
fRequest->SetResponseKeepAlive(false);
fCurrentModule = 0;
fState = kPostProcessingRequest;
break;
}
}
}
fCurrentModule = 0;
if (fRequest->HasResponseBeenSent())
{
fState = kPostProcessingRequest;
break;
}
// Prepare for kPreprocessingRequest state.
fState = kPreprocessingRequest;
if (fRequest->GetMethod() == qtssSetupMethod)
// Make sure to erase the session ID stored in the request at this point.
// If we fail to do so, this same session would be used if another
// SETUP was issued on this same TCP connection.
fLastRTPSessionIDPtr.Len = 0;
else if (fLastRTPSessionIDPtr.Len == 0)
fLastRTPSessionIDPtr.Len = ::strlen(fLastRTPSessionIDPtr.Ptr);
}
case kPreprocessingRequest:
{
// Invoke preprocessor modules
numModules = QTSServerInterface::GetNumModulesInRole(QTSSModule::kRTSPPreProcessorRole);
{
// Manipulation of the RTPSession from the point of view of
// a module is guarenteed to be atomic by the API.
Assert(fRTPSession != NULL);
OSMutexLocker locker(fRTPSession->GetSessionMutex());
for (; (fCurrentModule < numModules) && ((!fRequest->HasResponseBeenSent()) || fModuleState.eventRequested); fCurrentModule++)
{
fModuleState.eventRequested = false;
fModuleState.idleTime = 0;
if (fModuleState.globalLockRequested )
{ fModuleState.globalLockRequested = false;
fModuleState.isGlobalLocked = true;
}
theModule = QTSServerInterface::GetModule(QTSSModule::kRTSPPreProcessorRole, fCurrentModule);
(void)theModule->CallDispatch(QTSS_RTSPPreProcessor_Role, &fRoleParams);
fModuleState.isGlobalLocked = false;
// The way the API is set up currently, the first module that adds a stream
// to the session is responsible for sending RTP packets for the session.
if (fRTPSession->HasAnRTPStream() && (fRTPSession->GetPacketSendingModule() == NULL))
fRTPSession->SetPacketSendingModule(theModule);
if (fModuleState.globalLockRequested) // call this request back locked
return this->CallLocked();
// If this module has requested an event, return and wait for the event to transpire
if (fModuleState.eventRequested)
{
this->ForceSameThread(); // We are holding mutexes, so we need to force
// the same thread to be used for next Run()
return fModuleState.idleTime; // If the module has requested idle time...
}
}
}
fCurrentModule = 0;
if (fRequest->HasResponseBeenSent())
{
fState = kPostProcessingRequest;
break;
}
fState = kProcessingRequest;
}
case kProcessingRequest:
{
// If no preprocessor sends a response, move onto the request processing module. It
// is ALWAYS supposed to send a response, but if it doesn't, we have a canned error
// to send back.
fModuleState.eventRequested = false;
fModuleState.idleTime = 0;
if (QTSServerInterface::GetNumModulesInRole(QTSSModule::kRTSPRequestRole) > 0)
{
// Manipulation of the RTPSession from the point of view of
// a module is guarenteed to be atomic by the API.
Assert(fRTPSession != NULL);
OSMutexLocker locker(fRTPSession->GetSessionMutex());
if (fModuleState.globalLockRequested )
{ fModuleState.globalLockRequested = false;
fModuleState.isGlobalLocked = true;
}
theModule = QTSServerInterface::GetModule(QTSSModule::kRTSPRequestRole, 0);
(void)theModule->CallDispatch(QTSS_RTSPRequest_Role, &fRoleParams);
fModuleState.isGlobalLocked = false;
// Do the same check as above for the preprocessor
if (fRTPSession->HasAnRTPStream() && fRTPSession->GetPacketSendingModule() == NULL)
fRTPSession->SetPacketSendingModule(theModule);
this->Process3GPPData();
if (fModuleState.globalLockRequested) // call this request back locked
return this->CallLocked();
// If this module has requested an event, return and wait for the event to transpire
if (fModuleState.eventRequested)
{
this->ForceSameThread(); // We are holding mutexes, so we need to force
// the same thread to be used for next Run()
return fModuleState.idleTime; // If the module has requested idle time...
}
}
if (!fRequest->HasResponseBeenSent())
{
// no modules took this one so send back a parameter error
if (fRequest->GetMethod() == qtssSetParameterMethod) // keep session
{
QTSS_RTSPStatusCode statusCode = qtssSuccessOK; //qtssClientParameterNotUnderstood;
fRequest->SetValue(qtssRTSPReqStatusCode, 0, &statusCode, sizeof(statusCode));
fRequest->SendHeader();
}
else
{
QTSSModuleUtils::SendErrorResponse(fRequest, qtssServerInternal, qtssMsgNoModuleForRequest);
}
}
fState = kPostProcessingRequest;
}
case kPostProcessingRequest:
{
// Post process the request *before* sending the response. Therefore, we
// will post process regardless of whether the client actually gets our response
// or not.
//if this is not a keepalive request, we should kill the session NOW
fLiveSession = fRequest->GetResponseKeepAlive();
if (fRTPSession != NULL)
{
// Invoke postprocessor modules only if there is an RTP session. We do NOT want
// postprocessors running when filters or syntax errors have occurred in the request!
numModules = QTSServerInterface::GetNumModulesInRole(QTSSModule::kRTSPPostProcessorRole);
{
// Manipulation of the RTPSession from the point of view of
// a module is guarenteed to be atomic by the API.
OSMutexLocker locker(fRTPSession->GetSessionMutex());
// Make sure the RTPSession contains a copy of the realStatusCode in this request
UInt32 realStatusCode = RTSPProtocol::GetStatusCode(fRequest->GetStatus());
(void) fRTPSession->SetValue(qtssCliRTSPReqRealStatusCode,(UInt32) 0,(void *) &realStatusCode, sizeof(realStatusCode), QTSSDictionary::kDontObeyReadOnly);
// Make sure the RTPSession contains a copy of the qtssRTSPReqRespMsg in this request
StrPtrLen* theRespMsg = fRequest->GetValue(qtssRTSPReqRespMsg);
if (theRespMsg->Len > 0)
(void)fRTPSession->SetValue(qtssCliRTSPReqRespMsg, 0, theRespMsg->Ptr, theRespMsg->Len, QTSSDictionary::kDontObeyReadOnly);
// Set the current RTSP session for this RTP session.
// We do this here because we need to make sure the SessionMutex
// is grabbed while we do this. Only do this if the RTSP session
// is still alive, of course.
if (this->IsLiveSession())
fRTPSession->UpdateRTSPSession(this);
for (; (fCurrentModule < numModules) || (fModuleState.eventRequested) ; fCurrentModule++)
{
fModuleState.eventRequested = false;
fModuleState.idleTime = 0;
if (fModuleState.globalLockRequested )
{ fModuleState.globalLockRequested = false;
fModuleState.isGlobalLocked = true;
}
theModule = QTSServerInterface::GetModule(QTSSModule::kRTSPPostProcessorRole, fCurrentModule);
(void)theModule->CallDispatch(QTSS_RTSPPostProcessor_Role, &fRoleParams);
fModuleState.isGlobalLocked = false;
if (fModuleState.globalLockRequested) // call this request back locked
return this->CallLocked();
// If this module has requested an event, return and wait for the event to transpire
if (fModuleState.eventRequested)
{
this->ForceSameThread(); // We are holding mutexes, so we need to force
// the same thread to be used for next Run()
return fModuleState.idleTime; // If the module has requested idle time...
}
}
}
}
fCurrentModule = 0;
fState = kSendingResponse;
}
case kSendingResponse:
{
// Sending the RTSP response consists of making sure the
// RTSP request output buffer is completely flushed to the socket.
Assert(fRequest != NULL);
// If x-dynamic-rate header is sent with a value of 1, send OPTIONS request
if ((fRequest->GetMethod() == qtssSetupMethod) && (fRequest->GetStatus() == qtssSuccessOK)
&& (fRequest->GetDynamicRateState() == 1) && fRoundTripTimeCalculation)
{
this->SaveOutputStream();
this->ResetOutputStream();
this->SendOptionsRequest();
}
if (fSentOptionsRequest && (fRequest->GetMethod() == qtssIllegalMethod))
{
this->ResetOutputStream();
this->RevertOutputStream();
fSentOptionsRequest = false;
}
err = fOutputStream.Flush();
if (err == EAGAIN)
{
// If we get this error, we are currently flow-controlled and should
// wait for the socket to become writeable again
fSocket.RequestEvent(EV_WR);
this->ForceSameThread(); // We are holding mutexes, so we need to force
// the same thread to be used for next Run()
return 0;
}
else if (err != QTSS_NoErr)
{
// Any other error means that the client has disconnected, right?
Assert(!this->IsLiveSession());
break;
}
fState = kCleaningUp;
}
case kCleaningUp:
{
// Cleaning up consists of making sure we've read all the incoming Request Body
// data off of the socket
if (this->GetRemainingReqBodyLen() > 0)
{
err = this->DumpRequestData();
if (err == EAGAIN)
{
fInputSocketP->RequestEvent(EV_RE);
this->ForceSameThread(); // We are holding mutexes, so we need to force
// the same thread to be used for next Run()
return 0;
}
}
// If we've gotten here, we've flushed all the data. Cleanup,
// and wait for our next request!
this->CleanupRequest();
fState = kReadingRequest;
}
}
}
// Make absolutely sure there are no resources being occupied by the session
// at this point.
this->CleanupRequest();
// Only delete if it is ok to delete!
if (fObjectHolders == 0)
return -1;
// If we are here because of a timeout, but we can't delete because someone
// is holding onto a reference to this session, just reschedule the timeout.
//
// At this point, however, the session is DEAD.
return 0;
}
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)