Logo Search packages:      
Sourcecode: bayonne version File versions  Download package

info.cpp

// Copyright (C) 2000 Open Source Telecom Corporation.
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 2 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.

#include <server.h>

#ifdef  CCXX_NAMESPACES
namespace ost {
using namespace std;
#endif

// Names of the per-trunk symbols and pointers used by this module.
// They are created in dbInfo::attach() and updated in dbInfo::dispatch(),
// dbInfo::commit() and dbPacket::post().

// Sequence number of the trunk's current request; compared against the
// packet's slot number in dbPacket::post().
#define     DB_SEQUENCE "_dbinfo.sequence"
// Constant holding the configured "broadcast" address.
#define     DB_SERVER   "_dbinfo.server"
// Constant holding the configured "port" value.
#define     DB_PORT           "_dbinfo.port"
// Script line that issued the request; used in dbPacket::post() to detect
// that the trunk is still waiting on that same line.
#define     DB_POINTER  "_dbinfo.pointer"
// Name of the service the request is addressed to.
#define     DB_SERVICE  "_dbinfo.service"
// Symbol handed to Trunk::setSave() while a request is outstanding.
#define     DB_SAVE           "_dbinfo.save"

// One outstanding request datagram.  A dbPacket registers itself in the
// dbinfo.packets slot table when constructed and removes itself from that
// table when destroyed.
class dbPacket : public Script
{
public:
      // expires: overall lifetime of the request; retry: time until the next
      // retransmission (only armed for 'C' packets).
      TimerPort expires, retry;
      // sequence: slot index in dbinfo.packets, also written into the packet
      // text; resend: retransmission counter; len: length of the packet text.
      unsigned sequence, resend, len;
      // Trunk that issued the request and receives the reply in post().
      Trunk *trunk;
      // Private heap copy of the packet text, released by the destructor.
      char *packet;

      // Copies 'packet', stamps the sequence number into both the copy and
      // the caller's buffer, and broadcasts it.  An 'expires' of 0 selects
      // dbinfo.expires.
      dbPacket(Trunk *trunk, timeout_t expires, char *packet);
      ~dbPacket();

      // Periodic service: deletes the packet once expired, otherwise
      // retransmits 'C' packets whose retry timer has run out.
      void outbound(void);
      // Applies the name=value pairs of a reply to the trunk and wakes it.
      void post(char *reply);
};

// The "info" module: sends script requests as UDP datagrams, tracks them in
// a ring of dbPacket slots, and feeds replies back to the issuing trunk.
// A single global instance, dbinfo, is defined together with the class.
class dbInfo : private Module, protected Keydata, public Mutex, public Server, public UDPSocket
{
public:
      // interval: period of the outbound() service pass in run();
      // expires: default packet lifetime (the "expires" key multiplied by 1000);
      // resend: base retransmission delay (the "resend" key).
      timeout_t interval, expires, resend;
      // Ring indices into 'packets' and the number of slots ("buffers" key).
      unsigned head, tail, buffers;
      // Slot table indexed by packet sequence number; NULL marks a free slot.
      dbPacket **packets;
      // Destination address for requests and the local address that is bound.
      struct sockaddr bcast, iface;
      // sockaddr_in views of 'bcast' and 'iface'.
      struct sockaddr_in *bcast_in, *iface_in;

      // Send 'len' bytes of 'msg' to the configured broadcast address.
      void broadcast(char *msg, unsigned len);

protected:

      modtype_t getType(void)
            {return MODULE_GENERIC;};

      char *getName(void)
            {return "info";};

      // Receive loop: services pending packets every 'interval' and handles
      // incoming datagrams.
      void run(void);
      // Walk the ring from head to tail and let each packet service itself.
      void outbound(void);
      // Script entry point; returns NULL on success or "info-no-service".
      char *dispatch(Trunk *trunk);
      // Timeout for the current script line; commit() multiplies the result
      // by 1000 before using it as the packet lifetime.
      unsigned sleep(Trunk *trunk);
      // Build the request packet from the script line's arguments and send it.
      void commit(Trunk *trunk);
      // Create this module's per-trunk symbols.
      void attach(Trunk *trunk);
      // Stop the service thread and release all pending packets.
      void stop(void);

public:
      dbInfo();
} dbinfo;

// Create a request packet for a trunk, register it in the dbinfo slot
// table and send it.
//
// trk  trunk that issued the request.
// exp  lifetime of the request; 0 selects dbinfo.expires.
// pkt  packet text of the form "<type> XXXXX ...".  Characters 2..6 are
//      overwritten, both in the private copy and in the caller's buffer,
//      with the zero-padded slot number assigned to this packet.
//
// The slot taken is dbinfo.tail; any packet still occupying that slot is
// deleted first.
dbPacket::dbPacket(Trunk *trk, timeout_t exp, char *pkt)
{
      char buf[6];

      trunk = trk;
      len = strlen(pkt);
      packet = new char[len + 1];
      strcpy(packet, pkt);

      // Give the retry counter a defined value for every packet type; only
      // 'C' packets arm the retransmission timer.
      resend = 0;
      if(*packet == 'C')
      {
            resend = 1;
            retry.setTimer(dbinfo.resend);
      }
      if(!exp)
            exp = dbinfo.expires;
      expires.setTimer(exp);

      dbinfo.enterMutex();
      sequence = dbinfo.tail;
      // NOTE(review): the destructor invoked here takes the dbinfo mutex
      // again, so this relies on that mutex being recursive - confirm.
      if(dbinfo.packets[sequence])
            delete dbinfo.packets[sequence];
      dbinfo.packets[sequence] = this;
      if(++dbinfo.tail >= dbinfo.buffers)
            dbinfo.tail = 0;

      // 'sequence' is unsigned, so format it as such.
      snprintf(buf, sizeof(buf), "%05u", sequence);

      // Only patch the sequence field when the text is long enough to hold
      // it; a shorter string would be overrun (and lose its terminator).
      if(len >= 7)
      {
            memcpy(packet + 2, buf, 5);
            memcpy(pkt + 2, buf, 5);
      }
      dbinfo.leaveMutex();
      dbinfo.broadcast(packet, len);
}

// Unregister the packet from its slot and release its text copy, all under
// the dbinfo mutex.
dbPacket::~dbPacket()
{
      dbinfo.enterMutex();

      // Free the slot so the sequence number can be reused.
      dbinfo.packets[sequence] = NULL;

      // delete[] accepts a null pointer, so no guard is needed.
      delete[] packet;

      dbinfo.leaveMutex();
}

// Deliver a reply to the trunk that issued this request.
//
// reply  NUL-terminated reply text; everything after the first space that
//        follows the sequence field is parsed, in place, as space-separated
//        name=value pairs.
//
// The reply is ignored unless the trunk is still on the script line that
// issued the request (DB_POINTER) and its DB_SEQUENCE symbol matches this
// packet.  For each pair, the script line's arguments are searched for a
// matching "&name" (value stored in symbol "name") or "=&name" followed by
// a target argument (value stored in that target).  Finally the pending
// state is cleared and a TRUNK_TIMER_EXPIRED event is posted to the trunk.
void dbPacket::post(char *reply)
{
      Script::Line *line;
      TrunkEvent event;
      char *seq, *tok, *val, *kwd, *sym;
      int argc;

      trunk->enterMutex();
      seq = trunk->getSymbol(DB_SEQUENCE);
      if(!seq)
      {
            trunk->leaveMutex();
            return;
      }
      line = trunk->getScript();
      // A missing script line cannot be the one that issued the request, and
      // must not be dereferenced below.
      if(!line || line != trunk->getPointer(DB_POINTER) || (unsigned)atoi(seq) != sequence)
      {
            trunk->leaveMutex();
            return;
      }

      // Skip "<type> NNNNN"; when there is no payload, leave 'reply' NULL so
      // the loop is skipped rather than tokenizing a string literal.
      reply = strchr(reply + 2, ' ');
      if(reply)
            reply = strtok_r(reply, " ", &tok);
      while(reply)
      {
            // Fetch the next token now; 'seq' is reused as the look-ahead.
            seq = strtok_r(NULL, " ", &tok);
            val = strchr(reply, '=');
            if(!val)
            {
                  reply = seq;
                  continue;
            }
            // NOTE(review): 'val' still points at the '=' and 'reply' is not
            // terminated there, so the name comparisons below only match if
            // urlDecode() splits the token itself - confirm against
            // urlDecode().
            val = urlDecode(val);

            argc = 0;
            while(argc < line->argc)
            {
                  kwd = line->args[argc++];
                  if(*kwd == '&')
                  {
                        // "&name": store into the symbol of the same name.
                        if(!stricmp(++kwd, reply))
                        {
                              trunk->setSymbol(kwd, val);
                              break;
                        }
                        continue;
                  }
                  if(*(kwd++) != '=')
                        continue;
                  // "=..." keywords consume the following argument.
                  sym = line->args[argc++];
                  if(*(kwd++) != '&')
                        continue;
                  // "=&name target": store into 'target'.
                  if(!stricmp(kwd, reply))
                  {
                        trunk->setSymbol(sym, val);
                        break;
                  }
            }

            reply = seq;
      }
      trunk->setPointer(DB_POINTER, NULL);
      trunk->setSave(NULL);
      event.id = TRUNK_TIMER_EXPIRED;
      trunk->postEvent(&event);
      trunk->leaveMutex();
}

// Periodic service for one packet: drop it once its lifetime has elapsed,
// otherwise retransmit it if it is an unanswered 'C' packet whose retry
// timer has run out.  At most ten retransmissions are made, each with a
// longer delay than the previous one.
void dbPacket::outbound(void)
{
      // Lifetime over: log and self-destruct (this also frees the slot).
      if(!expires.getTimer())
      {
            slog(Slog::levelDebug) << "info(E): " << packet << endl;
            delete this;
            return;
      }

      // Nothing to do unless this is a 'C' packet that is due for another
      // attempt and has attempts left.
      if(*packet != 'C' || retry.getTimer() || resend > 10)
            return;

      // Back-off factor grows as 2^resend - resend.
      const unsigned backoff = (1 << resend) - resend;
      retry.incTimer(backoff * dbinfo.resend);
      ++resend;

      dbinfo.broadcast(packet, len);
      slog(Slog::levelDebug) << "info(" << resend << "): " << packet << endl;
}

// Load the module configuration from "/bayonne/info", register the module
// (and each name in "services") with the driver, allocate the packet slot
// table, bind the UDP socket to the configured interface on port + 1, and
// prepare the broadcast destination address (configured port).
dbInfo::dbInfo() : Module(), Keydata("/bayonne/info"), Mutex(), Server(keythreads.priNetwork()), UDPSocket()
{
      static Keydata::Define keys[] = {
            {"interval", "500"},
            {"expires", "30"},
            {"resend", "650"},
            {"buffers", "1000"},
            {"broadcast", "127.0.0.1"}, // default delivery target
            {"port", "6128"},        // default port number
            {"interface", "127.0.0.1"}, // interface to bind
            {"cdr", "0"},
            {"services", "cdr"},    // prebound service ids
            {NULL, NULL}};

      char buffer[256];
      char *cp, *tok;
      socklen_t len = sizeof(struct sockaddr_in);
      int count;

      load(keys);
      driver->addModule(this);

      // Register the module under every prebound service name.
      snprintf(buffer, sizeof(buffer), "%s", getLast("services"));
      cp = strtok_r(buffer, " \t,;\n", &tok);
      while(cp)
      {
            driver->addModule(this, cp);
            cp = strtok_r(NULL, " \t,;\n", &tok);
      }
      addSession();

      // The slot index doubles as the five-digit sequence field of a packet,
      // so the table needs at least one slot and at most 100000 (00000-99999).
      count = atoi(getLast("buffers"));
      if(count < 1 || count > 100000)
      {
            slog(Slog::levelWarning) << "info: buffers out of range, clamping" << endl;
            count = (count < 1) ? 1 : 100000;
      }
      buffers = count;

      interval = atoi(getLast("interval"));
      resend = atoi(getLast("resend"));
      expires = atoi(getLast("expires")) * 1000;
      packets = new dbPacket *[buffers];
      memset(packets, 0, buffers * sizeof(dbPacket *));
      head = tail = 0;

      // Start from zeroed addresses so no indeterminate bytes are handed to
      // bind() or sendto().
      memset(&iface, 0, sizeof(iface));
      memset(&bcast, 0, sizeof(bcast));

      // Local endpoint: configured interface, configured port + 1.
      InetHostAddress ifa(getLast("interface"));
      iface_in = (struct sockaddr_in *)&iface;
      iface_in->sin_addr = getaddress(ifa);
      iface_in->sin_port = htons(atoi(getLast("port")) + 1);
      iface_in->sin_family = AF_INET;
      if(bind(so, &iface, len))
            slog(Slog::levelError) << "info: unable to bind interface" << endl;

      if(setBroadcast(true))
            slog(Slog::levelWarning) << "info: broadcast flag failed" << endl;

      // Remote endpoint: configured broadcast address and port.
      InetHostAddress addr(getLast("broadcast"));
      bcast_in = (struct sockaddr_in *)&bcast;
      bcast_in->sin_addr = getaddress(addr);
      bcast_in->sin_port = htons(atoi(getLast("port")));
      bcast_in->sin_family = AF_INET;
}

void dbInfo::stop(void)
{
      slog(Slog::levelInfo) << "info: stopping" << endl;
      terminate();
      while(head != tail)
      {
            if(packets[head])
                  delete packets[head];
            if(++head >= buffers)
                  head = 0;
      }
      delete[] packets;
}

// Send 'len' bytes of 'msg' as one datagram to the configured broadcast
// address.  The result of sendto() is not examined.
void dbInfo::broadcast(char *msg, unsigned len)
{
      const socklen_t addrlen = sizeof(struct sockaddr_in);

      ::sendto(so, msg, len, 0, &bcast, addrlen);
}

// Service thread body.  Loops forever:
//  - every 'interval', runs outbound() to expire/retransmit pending packets;
//  - otherwise waits for an incoming datagram, looks up the packet slot
//    named by its sequence field and acts on the packet type:
//      'A'  the request is finished: delete the pending packet;
//      'P'  mark the pending packet as 'P', which stops retransmission;
//      'C'  send an 'A' reply to the sender, post the reply to the trunk
//           and delete the pending packet.
void dbInfo::run(void)
{
      TimerPort runclock;
      timeout_t pending;
      char packet[1024];
      struct sockaddr_in addr;
      socklen_t alen;
      int fromlen, len;
      unsigned sequence;
      dbPacket *pkt;
      char *cp;

      slog(Slog::levelInfo) << "info: running..." << endl;

      setCancel(cancelImmediate);
      runclock.setTimer(interval);
      for(;;)
      {
            pending = runclock.getTimer();
            if(!pending)
            {
                  runclock.setTimer(interval);
                  outbound();
                  continue;
            }
            if(!isPending(pendingInput, pending))
                  continue;

            // NOTE(review): the purpose of this fixed delay is not evident
            // from this file; kept as is.
            Thread::sleep(100);

            // recvfrom() updates the address length, so reset it every time;
            // leave one byte of the buffer free for the terminator.
            alen = sizeof(addr);
            fromlen = ::recvfrom(so, packet, sizeof(packet) - 1, 0, (struct sockaddr *)&addr, &alen);

            // Anything shorter than "<type> <digit>" cannot carry a sequence
            // number, and parsing it would read past the received data.
            if(fromlen < 3)
            {
                  slog(Slog::levelError) << "info(?): invalid packet" << endl;
                  continue;
            }
            packet[fromlen] = 0;
            sequence = atoi(packet + 2);
            if(sequence >= buffers)
            {
                  slog(Slog::levelError) << "info(!): " << packet << endl;
                  continue;
            }
            enterMutex();
            pkt = packets[sequence];
            if(!pkt)
            {
                  slog(Slog::levelDebug) << "info(I): " << packet << endl;
                  leaveMutex();
                  continue;
            }
            switch(packet[0])
            {
            case 'A':
                  delete pkt;
                  slog(Slog::levelDebug) << "info(R): " << packet << endl;
                  break;
            case 'P':
                  slog(Slog::levelDebug) << "info(R): " << packet << endl;
                  pkt->packet[0] = 'P';
                  break;
            case 'C':
                  slog(Slog::levelDebug) << "info(R): " << packet << endl;
                  packet[0] = 'A';
                  // The reply header ends at the first space after the
                  // sequence field, or at the end of the text when the
                  // packet has no payload.
                  cp = strchr(packet + 2, ' ');
                  if(!cp)
                        cp = packet + strlen(packet);
                  // NOTE(review): this length stops one character short of
                  // the delimiter, i.e. the last sequence digit is not sent
                  // - confirm against the peer's expectations.
                  len = cp - packet - 1;
                  ::sendto(so, packet, len, 0,
                        (struct sockaddr *)&addr, sizeof(sockaddr_in));
                  pkt->packet[0] = 'P';
                  pkt->post(packet);
                  delete pkt;
                  break;
            default:
                  slog(Slog::levelDebug) << "info(?): " << packet << endl;
            }
            leaveMutex();
      }
}

// Service every pending packet once.
//
// First advances 'head' past leading free slots, then walks the ring from
// 'head' up to the value 'tail' had on entry, calling dbPacket::outbound()
// on each occupied slot with the mutex held (a packet may delete itself
// there).
void dbInfo::outbound(void)
{
      unsigned base, top;
      dbPacket *pkt;

      // Snapshot tail; packets queued after this point are handled on the
      // next pass.
      enterMutex();
      base = tail;
      leaveMutex();

      while(head != base)
      {
            if(packets[head])
                  break;

            // Valid indices are 0 .. buffers - 1, so wrap on reaching
            // 'buffers', matching the wrap used when slots are assigned.
            if(++head >= buffers)
                  head = 0;
      }

      top = head;

      while(top != base)
      {
            enterMutex();
            pkt = packets[top];
            if(pkt)
                  pkt->outbound();
            leaveMutex();
            if(++top >= buffers)
                  top = 0;
      }
}

// Set up this module's per-trunk state: clear the pending-request pointer,
// create the sequence, service and save symbols, and publish the configured
// server address and port as constants.
void dbInfo::attach(Trunk *trunk)
{
      // No request is outstanding yet.
      trunk->setPointer(DB_POINTER, NULL);
      // NOTE(review): the integer second arguments presumably give the
      // storage size of the symbol being created - confirm against
      // Trunk::setSymbol.
      trunk->setSymbol(DB_SEQUENCE, 5);
      trunk->setSymbol(DB_SERVICE, 8);
      // Placeholder value until a request assigns a real sequence number.
      trunk->setSymbol(DB_SEQUENCE, "----");
      trunk->setSymbol(DB_SAVE, 32);
      trunk->setConst(DB_SERVER, getLast("broadcast"));
      trunk->setConst(DB_PORT, getLast("port"));
}

// Build and send the request for the trunk's current script line.
//
// The packet text starts with "C XXXXX <service>" and is followed by one
// " name=value" pair per qualifying argument of the script line:
//   %name         -> name=<url-encoded value of symbol "name">
//   =name value   -> name=<url-encoded value>; a value starting with '%' is
//                    first resolved through Trunk::getContent()
//   =&name value  -> skipped here (handled when the reply is posted)
// Output that does not fit into the packet buffer is truncated.
//
// The dbPacket constructor registers the new packet in the slot table and
// writes the assigned sequence number over the "XXXXX" placeholder in
// 'packet', which is then stored in the trunk's DB_SEQUENCE symbol.
void dbInfo::commit(Trunk *trunk)
{
      char packet[512], encode[512];
      Script::Line *line = trunk->getScript();
      int argc = 0;
      char *kwd, *val;
      unsigned plen = 0;

      trunk->setSave(DB_SAVE);

      // Bounded formatting: the service name must never overrun the buffer.
      snprintf(packet, sizeof(packet), "C XXXXX %s", trunk->getSymbol(DB_SERVICE));
      plen = strlen(packet);

      while(argc < line->argc && plen < sizeof(packet))
      {
            kwd = line->args[argc++];

            if(*kwd == '%')
            {
                  ++kwd;
                  val = urlEncode(trunk->getSymbol(kwd), encode, sizeof(encode));
                  snprintf(packet + plen, sizeof(packet) - plen, " %s=%s", kwd, val);
                  plen = strlen(packet);
                  continue;
            }

            if(*kwd != '=')
                  continue;

            // "=keyword" always consumes the following argument as its value.
            val = line->args[argc++];
            if(*val == '%')
                  val = trunk->getContent(val);
            if(*(++kwd) == '&')
                  continue;

            val = urlEncode(val, encode, sizeof(encode));
            snprintf(packet + plen, sizeof(packet) - plen, " %s=%s", kwd, val);
            plen = strlen(packet);
      }

      // The packet owns itself: it is reachable through dbinfo.packets and
      // is deleted when answered, replaced or expired.
      new dbPacket(trunk, sleep(trunk) * 1000, packet);
      trunk->setSymbol(DB_SEQUENCE, packet + 2);

      slog(Slog::levelDebug) << "info(1): " << packet << endl;
}

// Work out the timeout for the trunk's current script line from its member
// string: a member starting with a digit is parsed directly, any other
// member is looked up as a configuration key and its value parsed.
// Returns 0 when there is no member or the key is not configured.
unsigned dbInfo::sleep(Trunk *trunk)
{
      const char *member = trunk->getMember();
      if(!member)
            return 0;

      // Non-numeric members are resolved through the module's key data.
      const bool numeric = (*member >= '0' && *member <= '9');
      const char *text = numeric ? member : getLast(member);

      return text ? atoi(text) : 0;
}

// Script entry point: work out which service the current script line is
// addressed to and, when no delay is configured, send the request at once.
//
// The service is taken, in order of preference, from:
//   - the command name, when its first '-' starts an "-info" suffix
//     (the part before the '-' is the service);
//   - the next script value, when the member starts with a digit;
//   - the member itself, when it names a configured key.
// The delay is the numeric member, or the value of the key it names.
//
// Returns NULL on success, or "info-no-service" when no service could be
// determined.
char *dbInfo::dispatch(Trunk *trunk)
{
      const char *svc = NULL;
      const char *mem = trunk->getMember();
      char *cp;
      long delay = 0;
      Script::Line *line = trunk->getScript();
      char name[65];

      // Reset the per-request state and remember the issuing script line.
      trunk->setSymbol(DB_SEQUENCE, "-1");
      trunk->setSymbol(DB_SERVICE, "none");
      trunk->setPointer(DB_POINTER, trunk->getScript());

      strncpy(name, line->cmd, 64);
      name[64] = 0;
      cp = strchr(name, '-');
      if(cp)
      {
            if(!strnicmp(cp, "-info", 5))
            {
                  *cp = 0;
                  svc = name;
            }
      }
      printf("Name set as %s\n", name);

      if(!mem)
            mem = "-";

      if(*mem >= '0' && *mem <= '9')
      {
            if(!svc)
                  svc =  trunk->getValue(NULL);
      }
      else if(!svc)
      {
            // The member names both the service and the key holding the delay.
            svc = mem;
            mem = getLast(mem);
            if(!mem)
            {
                  svc = NULL;
                  mem = "0";
            }
      }
      else
      {
            mem = getLast(mem);
            if(!mem)
                  mem = "0";
      }

      // 'svc' may still be NULL here; never hand a null pointer to %s.
      printf("SERVICE %s %s\n", svc ? svc : "(null)", mem);

      delay = atoi(mem);
      if(!svc)
            return "info-no-service";

      trunk->setSymbol(DB_SERVICE, svc);

      if(!delay)
            commit(trunk);
      return NULL;
}

#ifdef      CCXX_NAMESPACES
};
#endif

Generated by  Doxygen 1.6.0   Back to index