Logo Search packages:      
Sourcecode: bayonne version File versions  Download package

thread.cpp

// Copyright (C) 2002 David Kerry.
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 2 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
//
// As a special exception to the GNU General Public License, permission is
// granted for additional uses of the text contained in its release
// of Bayonne as noted here.
//
// This exception is that permission is hereby granted to link Bayonne
// with the Aculab telephony libraries to produce a executable image
// without requiring Aculab's libraries to be supplied in a free software
// license as long as each source file so linked contains this exclusion
// and the unaltered Aculab source files are made available.
//
// This exception does not however invalidate any other reasons why
// the resulting executable file might be covered by the GNU General
// public license or invalidate the licensing requirements of any
// other component or library.
//
// This exception applies only to the code released by OST under the
// name Bayonne.  If you copy code from other releases into a copy of
// Bayonne, as the General Public License permits, the exception does not
// apply to the code that you add in this way.  To avoid misleading
// anyone as to the status of such modified files, you must delete
// this exception notice from them.
//
// If you write modifications of your own to Bayonne, it is your choice
// whether to permit this exception to apply to your modifications.
// If you do not wish that, delete this exception notice, at which
// point the terms of your modification would be covered under the GPL
// as explicitly stated in "COPYING".


// $Id: thread.cpp,v 1.6 2002/12/11 04:38:16 dyfet Exp $
//
// AculabQueueThead:  This worker thread to waits for call level events
//           to be queued up by the main driver thread and then
//           feeds the requests through to the individual trunk
//           objects as required.  This prevents the need for
//           having one thread per trunk, which rapidly gets out
//           of hand with PRI/ISDN multi-port board (ie: 400+
//           threads).  The number of worker threads is configurable
//           and should be tuned for the system at hand.
//
// AculabDSPEventThread: There is one of these threads per system.  It
//           listens for events from the DSP channels and passes it
//           into the main event queue (the one the AculabQueueThread)
//           worker pulls from.  This thread is necessary because the
//           Aculab driver has two levels of event queuing - one for
//           the call and switching layer (call connected/disconnected/
//           etc) and one for the DSP channel layer (dtmf detected/write/
//           read/etc).  Unfortunately, Aculab provided no single
//           entry-point to catch all of these events, thus this
//           multiple thread approach.
//
// AculabAudioThread: There is one instance of this thread per system.  It
//           is responsible for for feeding audio to the Prosody DSP
//           to play and pulling audio during recording for all channels
//           on all ports.
//
// AculabMonitorThread: There is one instance of this thread per system.  It
//           is responsible for monitoring the link-state and layer-1
//           statistics for each port in the system.
//
#include "driver.h"

#ifdef    CCXX_NAMESPACES
using namespace std;
namespace ost {
#endif

int AculabQueueThread::count = 0;

AculabQueueThread::AculabQueueThread(AculabFifo *fifo) :
Mutex(), Thread(keythreads.priService())
{
      id = ++count;
      active = false;
      queue=fifo;
}

AculabQueueThread::~AculabQueueThread()
{
      if(!active)
            return;

      slog(Slog::levelDebug) << "worker thread " << id << " exiting"<<endl;

      active = false;
      terminate();
}

void AculabQueueThread::run(void)
{
      AculabTrunkEvent *aevent;

      slog(Slog::levelInfo) << "worker thread " << id << " running..." << endl;

      active = true;

      while(active) {
            aevent=queue->pullBlock();
            if (aevent == NULL) {
                  continue;
            }
                setCancel(cancelDisabled);
            aevent->trunk->postEvent(& aevent->event);
            delete aevent;
            setCancel(cancelImmediate);
//          slog(levelDebug) << "worker "<< id << " got event"<<endl;
      }
      Thread::sleep(50);
}



AculabDSPEventThread::AculabDSPEventThread(AculabFifo *fifo, AculabTrunk **ix) :
Mutex(), Thread(keythreads.priService())
{
      active = false;
      queue=fifo;
      ixmaps=ix;

      /* Create the any channel recognition event. */
      smd_ev_create(&recogEventParms.event,
                  kSMNullChannelId,
                  kSMEventTypeRecog,
                  kSMAnyChannelEvent);

        /*
         * Register this event with the driver so it knows which event
         * to notify to the application when recognition occurs on a
       * channel nominated to notify the any channel recognition
       * event.
         */
      recogEventParms.channel = kSMNullChannelId;
      recogEventParms.event_type = kSMEventTypeRecog;
      recogEventParms.issue_events = kSMAnyChannelEvent;
      sm_channel_set_event(&recogEventParms);
}

AculabDSPEventThread::~AculabDSPEventThread()
{
      if(!active)
            return;

      slog(Slog::levelDebug) << "DSP event thread exiting"<<endl;

      active = false;

      recogEventParms.issue_events = kSMChannelNoEvent;
      sm_channel_set_event(&recogEventParms);

      smd_ev_free(recogEventParms.event);

      terminate();

}

void AculabDSPEventThread::run(void)
{
      SM_RECOGNISED_IX_PARMS  ixParms;
      AculabTrunkEvent *aevent;
        AculabTrunk *trunk;
      struct pollfd pollfd[1];
      int rc;

      slog(Slog::levelInfo) << "DSP event thread running..." << endl;

      active = true;

      while(active) {

            pollfd[0].fd      = recogEventParms.event.fd;
            pollfd[0].events  = recogEventParms.event.mode;
            pollfd[0].revents = 0;

            rc=poll(&pollfd[0],
                  ((unsigned long) (sizeof(pollfd)/sizeof(struct pollfd))),
                  500);

            if (rc == 0) {
                  continue; /* Timeout */
            }
            else if (rc == -1) {
                  slog(Slog::levelError) << "dsp event thread poll error: "<< strerror(errno) <<endl;
                  /* What to do here??? */
                  Thread::sleep(500);
                  continue;
            }

            if ((pollfd[0].revents & recogEventParms.event.mode) == 0) {
                  continue;
            }

            /* Figure out what channel triggered the event */
            while(1) {
                  ixParms.channel_ix = -1;
                  rc = sm_get_recognised_ix(&ixParms);
                  if ((rc == 0) && (ixParms.channel_ix != -1)) {
                        trunk=ixmaps[ixParms.channel_ix];
                        if (ixParms.type == kSMRecognisedDigit) {
                              aevent=new AculabTrunkEvent();
                              aevent->trunk=trunk;
                              aevent->event.id=TRUNK_DTMF_KEYUP;
                              aevent->event.parm.dtmf.digit =
                                    trunk->getDigit(ixParms.param0);
                              aevent->event.parm.dtmf.duration = 40;
                              queue->push(aevent);
slog(Slog::levelDebug)<<"dsp event thread - got dtmf event ('"<<ixParms.param0<<"'/"<<trunk->getDigit(ixParms.param0)<<")"<<endl;
                        }
                  }
                  else {
                        break;
                  }
            }
      }
//    setCancel(cancelImmediate);
      Thread::sleep(50);
}




AculabAudioThread::AculabAudioThread(AculabFifo *fifo, AculabTrunk **ix) :
Mutex(), Thread(keythreads.priService())
{
      active = false;
      queue=fifo;
      ixmaps=ix;

      /* Create the any channel write event. */
      smd_ev_create(&writeEventParms.event,
                  kSMNullChannelId,
                  kSMEventTypeWriteData,
                  kSMAnyChannelEvent);

        /*
         * Register this event with the driver so it knows which event
         * to notify to the application when notifiable write activity occurs
         * on a channel nominated to notify the any channel write event.
         */
      writeEventParms.channel = kSMNullChannelId;
      writeEventParms.event_type = kSMEventTypeWriteData;
      writeEventParms.issue_events = kSMAnyChannelEvent;
      sm_channel_set_event(&writeEventParms);


      /* Create the any channel read event. */
      smd_ev_create(&readEventParms.event,
                  kSMNullChannelId,
                  kSMEventTypeReadData,
                  kSMAnyChannelEvent);

        /*
         * Register this event with the driver so it knows which event
         * to notify to the application when notifiable write activity occurs
         * on a channel nominated to notify the any channel write event.
         */
      readEventParms.channel = kSMNullChannelId;
      readEventParms.event_type = kSMEventTypeReadData;
      readEventParms.issue_events = kSMAnyChannelEvent;
      sm_channel_set_event(&readEventParms);
}

AculabAudioThread::~AculabAudioThread()
{
      if(!active)
            return;

      slog(Slog::levelDebug) << "audio thread exiting"<<endl;

      active = false;

      writeEventParms.issue_events = kSMChannelNoEvent;
      sm_channel_set_event(&writeEventParms);
      readEventParms.issue_events = kSMChannelNoEvent;
      sm_channel_set_event(&readEventParms);

      smd_ev_free(writeEventParms.event);
      smd_ev_free(readEventParms.event);

      terminate();
}

void AculabAudioThread::run(void)
{
//    SM_RECOGNISED_IX_PARMS  ixParms;
      SM_BESP_STATUS_IX_PARMS statusParms;
      AculabTrunkEvent *aevent;
        AculabTrunk *trunk;
      struct pollfd pollfd[2];
      int rc;
      bool doRead;
      bool doWrite;

      slog(Slog::levelInfo) << "audio event thread running..." << endl;

      active = true;

      while(active) {

            doRead=false;
            doWrite=false;

            pollfd[0].fd      = writeEventParms.event.fd;
            pollfd[0].events  = writeEventParms.event.mode;
            pollfd[0].revents = 0;
            pollfd[1].fd      = readEventParms.event.fd;
            pollfd[1].events  = readEventParms.event.mode;
            pollfd[1].revents = 0;

            /* Wait for something to do */
            rc=poll(&pollfd[0],
                  ((unsigned long) (sizeof(pollfd)/sizeof(struct pollfd))),
                  500);

            if (rc == 0) {
                  continue; /* Timeout */
            }
            else if (rc == -1) {
                  slog(Slog::levelError) << "audio event thread poll error: "<<errno<<endl;
                  /* What to do here??? */
                  Thread::sleep(500);
                  continue;
            }

            /* More data needs to be written to a channel */
            if ((pollfd[0].revents & writeEventParms.event.mode) != 0) {
                  doWrite=true;
            }

            /* More data needs to be read from a channel */
            if ((pollfd[1].revents & readEventParms.event.mode) != 0) {
                  doRead=true;
            }
            else {
                  if (!doWrite) {
                        continue;
                  }
            }

            while(doRead || doWrite) {
                  /*
                   * Instead of asking for what channel triggered the
                   * read/write event, the aculab api can instead tell
                   * us which channel will best benefit from attention.
                   * This should be a better approach to handling the
                   * audio processing (one would hope).
                   */
                  statusParms.channel_ix = -1;
                  if (doWrite) {
                        rc=sm_besp_write_status_ix(&statusParms);
                        if ((rc == 0) && (statusParms.channel_ix != -1)) {
                              trunk=ixmaps[statusParms.channel_ix];

                              switch(trunk->Trunk::flags.dsp) {
                              case DSP_MODE_TONE:
                                    switch(statusParms.status) {
                                    case kSMPlayToneStatusComplete:
                                    /* case kSMPlayCPToneStatusComplete: */
                                          break;
                                    case kSMPlayToneStatusOngoing:
                                    /* case kSMPlayCPToneStatusOngoing: */
                                          break;
                                    default:
                                          break;
                                    }
                                    break;

                              case DSP_MODE_VOICE:
                              default:
                                    switch(statusParms.status) {
                                    /* REPLAY status events */
                                    case kSMReplayStatusComplete:
//                                        slog(Slog::levelDebug)<<"audio thread - EOF on audio file"<<endl;
                                          /* EOF */
                                          aevent=new AculabTrunkEvent();
                                          aevent->trunk=trunk;
                                          aevent->event.id=TRUNK_AUDIO_IDLE;
                                          queue->push(aevent);
                                          break;
                                    case kSMReplayStatusCompleteData:
                                    case kSMReplayStatusUnderrun:
                                    case kSMReplayStatusHasCapacity:
                                          trunk->PlayNext();
                                          break;
                                    case kSMReplayStatusNoCapacity:
                                          break;
                                    default:
                                          break;
                                    }
                                    break;
                              }
                        }
                        else {
                              /* No more channels need servicing */
                              doWrite=false;
                        }
                  }

                  if (doRead) {
                        rc=sm_besp_read_status_ix(&statusParms);
                        if ((rc == 0) && (statusParms.channel_ix != -1)) {
                              trunk=ixmaps[statusParms.channel_ix];

                              switch (statusParms.status) {

                              case kSMRecordStatusComplete:
                                    slog(Slog::levelDebug)<<"audio thread - EOF on record data"<<endl;
                                    /* EOF */
                                    aevent=new AculabTrunkEvent();
                                    aevent->trunk=trunk;
                                    aevent->event.id=TRUNK_AUDIO_IDLE;
                                    queue->push(aevent);
                                    break;
                              case kSMRecordStatusCompleteData:
                              case kSMRecordStatusOverrun:
                              case kSMRecordStatusData:
                                    trunk->RecordNext();
                                    break;
                              case kSMRecordStatusNoData:
                                    break;
                              default:
                                    break;
                              }
                        }
                        else {
                              /* No more channels need servicing */
                              doRead=false;
                        }
                  }

            } /* while(doRead && doWrite) ... */

      } /* while(active) ... */
}


AculabMonitorThread::AculabMonitorThread(void) :
Mutex(), Thread(keythreads.priService())
{
      active = false;
}

AculabMonitorThread::~AculabMonitorThread()
{
      if(!active)
            return;

      slog(Slog::levelDebug) << "l1 monitor thread exiting"<<endl;

      active = false;

      terminate();
}

void AculabMonitorThread::run(void)
{
      struct l1_xstats *l1;
      struct l1_xstats l1_curr;
      struct l1_xstats *l1_old;
      int rc;
      int n;
      int nports;

      slog(Slog::levelInfo) << "l1 monitor thread running..." << endl;

      active = true;

      nports=call_nports();

      l1=new struct l1_xstats[nports];
      memset(l1,0,sizeof(struct l1_xstats) * nports);

      while(active) {
            for (n=0; n < nports; n++) {
                  l1_old=&l1[n];

                  l1_curr.net=n;
                  l1_curr.getset.linestat=0;
                  l1_curr.getset.bipvios=0;
                  l1_curr.getset.faserrs=0;
                  l1_curr.getset.sliperrs=0;
                  rc=call_l1_stats(&l1_curr);

                  /*
                   * Compare current flags with previous
                   * and alert upon change of state.
                   */
                  if (l1_curr.get.nos != l1_old->get.nos) {
                        if (l1_curr.get.nos == 0xff) {
                              slog(Slog::levelCritical) << "L1["<<n<<"]: Lost signal"<<endl;
                        }
                        else {
                              slog(Slog::levelCritical) << "L1["<<n<<"]: Lost signal CLEARED"<<endl;
                        }
                  }
                  if (l1_curr.get.ais != l1_old->get.ais) {
                        if (l1_curr.get.nos == 0xff) {
                              slog(Slog::levelCritical) << "L1["<<n<<"]: Incoming alarm"<<endl;
                        }
                        else {
                              slog(Slog::levelCritical) << "L1["<<n<<"]: Incoming alarm CLEARED"<<endl;
                        }
                  }
                  if (l1_curr.get.los != l1_old->get.los) {
                        if (l1_curr.get.los == 0xff) {
                              slog(Slog::levelCritical) << "L1["<<n<<"]: Lost synchronisation"<<endl;
                        }
                        else {
                              slog(Slog::levelCritical) << "L1["<<n<<"]: Lost synchronisation CLEARED"<<endl;
                        }
                  }
                  if (l1_curr.get.rra != l1_old->get.rra) {
                        if (l1_curr.get.rra == 0xff) {
                              slog(Slog::levelCritical) << "L1["<<n<<"]: Receive remote alarm"<<endl;
                        }
                        else {
                              slog(Slog::levelCritical) << "L1["<<n<<"]: Receive remote alarm CLEARED"<<endl;
                        }
                  }
                  if (l1_curr.get.tra != l1_old->get.tra) {
                        if (l1_curr.get.tra == 0xff) {
                              slog(Slog::levelCritical) << "L1["<<n<<"]: Transmit remote alarm"<<endl;
                        }
                        else {
                              slog(Slog::levelCritical) << "L1["<<n<<"]: Transmit remote alarm CLEARED"<<endl;
                        }
                  }
                  if (l1_curr.get.rma != l1_old->get.rma) {
                        if (l1_curr.get.rma == 0xff) {
                              slog(Slog::levelCritical) << "L1["<<n<<"]: Receive multi-frame alarm"<<endl;
                        }
                        else {
                              slog(Slog::levelCritical) << "L1["<<n<<"]: Receive multi-frame alarm CLEARED"<<endl;
                        }
                  }
                  if (l1_curr.get.tma != l1_old->get.tma) {
                        if (l1_curr.get.tma == 0xff) {
                              slog(Slog::levelCritical) << "L1["<<n<<"]: Transmit multi-frame alarm"<<endl;
                        }
                        else {
                              slog(Slog::levelCritical) << "L1["<<n<<"]: Transmit multi-frame alarm CLEARED"<<endl;
                        }
                  }
                  memcpy(l1_old,&l1_curr,sizeof(struct l1_xstats));
            }
            Thread::sleep(2000);
      }
}

#ifdef    CCXX_NAMESPACES
};
#endif

Generated by  Doxygen 1.6.0   Back to index