博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
经过两个多月的攻关,终于搞定了live555多线程并稳定压测通过
阅读量:4587 次
发布时间:2019-06-09

本文共 6289 字,大约阅读时间需要 20 分钟。

live555已经发展了十几年了,不得不钦佩作者坚持不懈的奉献和国外的开源生态环境,live555可以说是大部分的安防从业者的入门之选,尤其是在嵌入式或者Linux系统上,其应用还是蛮广泛的,主要是其兼容性和稳定性;

但是随着live555十几年的不断迭代,很多开发者反复向作者Ross提到的多线程和IPv6的功能,作者也一直都没有去尝试,可能是这样会对live555的架构产生比较大的改动和影响,作者为了稳妥,选择了小改动、稳定、逐步迭代的方式, 虽然是性能稳定,但支持的路数有限,不能多线程工作始终是个坎; 网上找到几篇live555多线程的博客, 基本上大同小异,就是创建独立的UsageEnvironment和TaskSchedule, 由独立的线程分工协作; 本人也是这个思路,创建多个工作线程,每个工作线程内创建UsageEnvironment和TaskSchedule,然后各自开启EventLoop;

今天我们抛砖引玉,先大概聊一下主体思路,在后续的博客中将尽力完整地汇总这些思路和开发的过程:

目标

将live555修改为多线程, 每个通道对应一个工作线程,由工作线程对该通道进行独立处理;

大体修改点

修改支持多线程, 主要涉及到以下类的修改

  • GenericMediaServer
  • RTSPServer

GenericMediaServer.cpp

在GenericMediaServer的构造函数中, 创建工作线程个数,即最大支持的通道数;

GenericMediaServer::GenericMediaServer(UsageEnvironment& env, int ourSocketV4, int ourSocketV6, Port ourPort,             unsigned reclamationSeconds)  : Medium(env),    fServerSocket4(ourSocketV4), fServerSocket6(ourSocketV6),     fServerPort(ourPort), fReclamationSeconds(reclamationSeconds),    fServerMediaSessions(HashTable::create(STRING_HASH_KEYS)),    fClientConnections(HashTable::create(ONE_WORD_HASH_KEYS)),    fClientSessions(HashTable::create(STRING_HASH_KEYS)) {  ignoreSigPipeOnSocket(fServerSocket4); // so that clients on the same host that are killed don't also kill us  ignoreSigPipeOnSocket(fServerSocket6); // so that clients on the same host that are killed don't also kill us#ifdef LIVE_MULTI_THREAD_ENABLE  InitMutex(&mutexClientConnection);  memset(&multiThreadCore, 0x00, sizeof(MultiThread_CORE_T));  multiThreadCore.threadNum = MAX_DEFAULT_MULTI_THREAD_NUM;  multiThreadCore.threadTask = new LIVE_THREAD_TASK_T[multiThreadCore.threadNum];  memset(&multiThreadCore.threadTask[0], 0x00, sizeof(LIVE_THREAD_TASK_T) * multiThreadCore.threadNum);  for (int i=0; i

接受客户端连接

按原有流程接受客户端连接;

分配客户端请求

在收到客户端发送的DESCRIBE命令后,

在通道列表中找出空闲的通道,将该客户端关联到该通道, 然后从主线程中移除该socket, 由工作线程接管该socket的操作;
后续有客户端如访问已经存在的通道,则主线程会将该请求直接分配给对应的工作线程处理;

注意: 主线程的工作到此结束,不要执行lookupServerMediaSession的操作;

在工作线程中, 接管客户端的socket后, 马上执行lookupServerMediaSession, 在该函数中,将后缀回调给上层调用程序, 由上层调用程序判断是否存在该通道,如不存在则返回失败,如存在则向前端取流,然后填充媒体信息返回成功, 库内部则创建相应的mediasession, 再回应客户端, 后续的则完成整个rtsp流程的交互;

注意: 创建MediaSession时,必须将工作线程中的UsageEnvironment传进去, 不能使用主线程中的envir();

int RTSPServer::RTSPClientConnection::handleCmd_DESCRIBE(UsageEnvironment *pEnv, char const* urlPreSuffix, char const* urlSuffix, char const* fullRequestStr, LIVE_THREAD_TASK_T **pThreadTask) {    int handleCmdRet = 0;  ServerMediaSession* session = NULL;  char* sdpDescription = NULL;  char* rtspURL = NULL;  do {    char urlTotalSuffix[2*RTSP_PARAM_STRING_MAX];        // enough space for urlPreSuffix/urlSuffix'\0'    urlTotalSuffix[0] = '\0';    if (urlPreSuffix[0] != '\0') {      strcat(urlTotalSuffix, urlPreSuffix);      strcat(urlTotalSuffix, "/");    }    strcat(urlTotalSuffix, urlSuffix);    if (!authenticationOK("DESCRIBE", urlTotalSuffix, fullRequestStr)) break;    // We should really check that the request contains an "Accept:" #####    // for "application/sdp", because that's what we're sending back #####    _TRACE(TRACE_LOG_DEBUG, "handleCmd_DESCRIBE  socket[%d]\n", this->fOurSocket);    #ifdef LIVE_MULTI_THREAD_ENABLE    //如果当前是主线程,则进入到查找通道流程    if (pEnv->GetEnvirId() == 1000)    {        fOurServer.LockClientConnection();  //Lock        UsageEnvironment  *pChEnv = fOurServer.GetEnvBySuffix(urlSuffix, this, pThreadTask);        if (NULL == pChEnv)        {            fOurServer.UnlockClientConnection();        //Unlock            handleCmdRet = -1;            this->assignSink = False;            this->pEnv = NULL;            handleCmd_notFound();            break;        }        else        {            _TRACE(TRACE_LOG_DEBUG, "将socket[%d] 关联到[%s]\n", this->fOurSocket, pChEnv->GetEnvirName());            //将socket从主线程移到工作线程中            UsageEnvironment  *pMainEnv = &envir();            envir().taskScheduler().disableBackgroundHandling(fOurSocket);            fOurServer.UnlockClientConnection();        //Unlock            return 1000;        }        break;    }    #endif    // Begin by looking up the "ServerMediaSession" object for the specified "urlTotalSuffix":    //在工作线程中执行 lookupServerMediaSession    session = fOurServer.lookupServerMediaSession(pEnv, 1, this, urlTotalSuffix);    if (session == NULL) {        //pChEnv->taskScheduler().disableBackgroundHandling(fOurSocket);        _TRACE(TRACE_LOG_DEBUG, "socket[%d] 在[%s]中, 源未就绪[%s]\n", this->fOurSocket, pEnv->GetEnvirName(), urlTotalSuffix);        this->assignSink = False;        this->pEnv = NULL;        handleCmdRet = -1;        //envir().taskScheduler().disableBackgroundHandling(fOurSocket);        //fOurServer.ResetEnvBySuffix(urlSuffix, this);        handleCmd_notFound();        break;    }    session->incrementReferenceCount();    // Then, assemble a SDP description for this session:    sdpDescription = session->generateSDPDescription(fOurIPVer);    if (sdpDescription == NULL) {      // This usually means that a file name that was specified for a      // "ServerMediaSubsession" does not exist.      setRTSPResponse("404 File Not Found, Or In Incorrect Format");      break;    }    unsigned sdpDescriptionSize = strlen(sdpDescription);    // Also, generate our RTSP URL, for the "Content-Base:" header    // (which is necessary to ensure that the correct URL gets used in subsequent "SETUP" requests).    rtspURL = fOurRTSPServer.rtspURL(session, fOurIPVer, fClientInputSocket);    snprintf((char*)fResponseBuffer, sizeof fResponseBuffer,         "RTSP/1.0 200 OK\r\nCSeq: %s\r\n"         "%s"         "Content-Base: %s/\r\n"         "Content-Type: application/sdp\r\n"         "Content-Length: %d\r\n\r\n"         "%s",         fCurrentCSeq,         dateHeader(),         rtspURL,         sdpDescriptionSize,         sdpDescription);  } while (0);  if (session != NULL) {    // Decrement its reference count, now that we're done using it:    session->decrementReferenceCount();    if (session->referenceCount() == 0 && session->deleteWhenUnreferenced()) {      fOurServer.removeServerMediaSession(pEnv, session, True);    }    session->SetStreamStatus(1);        //置标志,让后续访问该通道的客户端可以得到迅速响应  }  delete[] sdpDescription;  delete[] rtspURL;  return handleCmdRet;}

历经2个多月,终于将多线程问题搞定. 在此记录一下, 欢迎探讨;

live555技术交流

邮件:289042893@qq.com

live555技术交流群:475947825

转载于:https://www.cnblogs.com/babosa/p/8993582.html

你可能感兴趣的文章
javascript变量类型转换(简单记几个)
查看>>
python过滤 Kubernetes api数据
查看>>
量子测量
查看>>
教你配置安全的ProFTPD服务器(上)
查看>>
bzoj 1226: [SDOI2009]学校食堂Dining
查看>>
spm3构建多入口项目
查看>>
ios开发随笔第二天 简单动画的实现
查看>>
Python基础——5模块
查看>>
单链表是否相交
查看>>
iPhone X Web 设计
查看>>
文件描述符和文件指针的相互转换
查看>>
Parallels Destop软件配置
查看>>
HDU 5699 货物运输 二分
查看>>
Country Road
查看>>
Java:基本语法
查看>>
HTTP头的Expires与Cache-control
查看>>
(8)盒子的定位
查看>>
ReactJS学习系列课程(React ref的使用)
查看>>
ASIHTTPRequest 详解 例子
查看>>
小L 的二叉树(洛谷 U4727)
查看>>