Catégorie : Administration réseau     Tags : , ,      

    INTRO

    (Je suis à la bourre, vite un Droliprane 500mg !!!)
    Ca va être un article style Agence tous risques. Tu prends tes vieilles bagouzes, des colliers qui brillent, tu te fais une crête ou alors tu te mets à fumer le cigare en balançant des vannes à deux francs.
    Après, on prend notre caisse à outils et on va bricoler des trucs pour que ça aille plus vite, pour que ça soit plus résistant, hop hop un p’tit bout de tôle par là, hop hop on refait l’avant avec un bélier, et voilà, on a un char d’assaut fait maison et qui en fera trembler plus d’un...

    CA, c’est l’Agence tous risques, tout le monde connaît. Bah, kqueue(2)/kevent(2), c’est pareil !
    De quoi on va parler ? Un petit peu de socket programming mon bon monsieur, de comment on peut faire des choses belles, sexy, brillantes, fluffy (TOMATE !!!) et qui sentent bon, tout en restant un bon poilu.
    On va parler (un tout p’tit peu) de select(2)/poll(2) (qui est le standard POSIX) et de kqueue(2) qui est spécifique à BSD, mais offre beaucoup plus que ce que poll(2) ne peut offrir.
    Tous les systèmes *IX massivement déployés offrent des sous-systèmes équivalents, par exemple Linux avec epoll ou Solaris avec /dev/poll et enfin BSD avec kqueue(2)/kevent(2).
    Comme BSD, c’est BIEN, on va se concentrer sur kqueue(2)/kevent(2).
    Ami lecteur, des connaissances en programmation réseau de base te seront nécessaires, au moins avoir fait un serveur TCP qui gère quelques clients ou quelque chose du genre.

    FILE-MOI LA TAULE LOOPING!!!

    Si vous avez déjà bricolé, (ou si vous avez modifié des bagnoles comme Barracuda), vous connaissez certainement select(2)/poll(2), appels bien pratiques qui vous permettent de multiplexer des I/O (file descriptors) en fonction d’événements bien définis :

    • données à ECRIRE sur un FD (POLLOUT) ;
    • données à LIRE sur un FD (POLLIN) ;
    • données prioritaires à LIRE sur un FD (POLLPRI) ;
    • etc.

    Voilà comment c’est défini (on va prendre poll(2), c’est le moins courant des deux) :

    int
    poll(struct pollfd *fds, nfds_t nfds, int timeout);

    poll(2) va parcourir un tableau de struct pollfd :

    struct pollfd {
        int    fd;       /* file descriptor */
        short  events;   /* events to look for */
        short  revents;  /* events returned */
    };

    dont vous lui fournissez la taille (nfds). Il va alors vous dire :
    "hep, hep, y a un truc qui se passe parmi les événements que t’as définis".
    Alors après, on va dans son gentil tableau, on le parcourt entièrement et on vérifie chacun des éléments jusqu’à trouver celui dont poll(2) nous parle (poll(2) est bloquant).
    C’est génial, il ne nous reste plus qu’à lire les données qui arrivent sur notre file descriptor et à traiter l’info en fonction de ce pour quoi votre serveur est fait.
    Je vais filer un petit exemple pour montrer comment ça marche. Mon exemple est un mignon serveur qui écoute sur un port TCP et attend qu’un client se connecte. Il rajoute le file descriptor associé au client dans le tableau de struct pollfd (fonction push_fd()), comme ça, hop, le nouveau client est pris en compte par poll(2) et donc pas besoin de forker ou blah je ne sais pas quoi. On multiplexe les N clients tranquillement et, plus il y a de clients, plus select(2)/poll(2) risquent d’être lents, avec 5 ou 6, pas de souci, mais avec 5000, c’est plus la même histoire, n’est-ce pas ?
    Bref, plus simple, jette donc un coup d’œil au code de notre gruikpoll.c :

    #include <stdio.h>
    #include <stdlib.h>
    #include <unistd.h>
    #include <string.h>
    #include <netinet/in.h>
    #include <sys/types.h>
    #include <sys/socket.h>
    #include <poll.h>
    #define TCP_PORT 3133
    #define TCP_BACKLOG 5
    #define TIMEOUT 5000
    typedef struct client_connection {
      int fd;                       /* client file descriptor */
      struct sockaddr_in rside;     /* remote side */
      unsigned int rside_len;       /* remote side struct len */
    } cl_t;
    void push_fd(struct pollfd **fds, int *nfds, int fd,
                 short events)
    {
      fprintf(stderr,
              "adding 1 socket to the pool (nfds: %d)!\n",
              *nfds);
      /* no socket pushed */
      if ((!*fds) && (*nfds == 0)) {
        (*fds) =
            (struct pollfd *) calloc(1, sizeof (struct pollfd));
        if (!(*fds)) {
          perror("calloc()");
          exit(1);
        }
        (*fds)[(*nfds)].fd = fd;
        (*fds)[(*nfds)].events = events;
        (*nfds)++;
      } else {
        (*fds) =
            (struct pollfd *) realloc((*fds),
                                      (((*nfds) +
                                        1) *
                                       sizeof (struct pollfd)));
        (*fds)[(*nfds)].fd = fd;
        (*fds)[(*nfds)].events = events;
        (*nfds)++;
      }
      return;
    }
    int main(int argc, char **argv)
    {
      int rc, sd, fd, i, rfd;
      int yrc;
      struct sockaddr_in me;
      struct sockaddr_in remote;
      socklen_t remote_len;
      struct pollfd *fds = NULL;
      unsigned int nfds = 0;
      char buffer[512];
      fd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
      if (fd < 0) {
        perror("socket(PF_INET, SOCKET_STREAM, TCP)");
        exit(1);
      }
      me.sin_family = PF_INET;
      me.sin_port = htons(TCP_PORT);
      me.sin_addr.s_addr = INADDR_ANY;
      rc = bind(fd, (struct sockaddr *) &me,
                sizeof (struct sockaddr));
      if (rc < 0) {
        perror("bind()");
        exit(1);
      }
      rc = listen(fd, TCP_BACKLOG);
      if (rc < 0) {
        perror("listen()");
        exit(1);
      }
      /* push the accept socket */
      push_fd(&fds, &nfds, fd, POLLIN);
      fprintf(stderr, "listening on port %d\n", TCP_PORT);
      /* fd # 0 is the accept() socket  :) */
      while (1) {
        rc = poll(fds, nfds, TIMEOUT);
        switch (rc) {
        case -1:
          perror("poll()");
          exit(1);
          break;
        case 0:
          fprintf(stderr, "timeout\n");
          break;
        default:
          for (i = 0; i < nfds; i++) {
            if (fds[i].revents & POLLIN) {
              /* the accept socket */
              if (i == 0) {
                /* accept case */
                rfd =
                    accept(fds[i].fd,
                           (struct sockaddr *) &remote,
                           &remote_len);
                fprintf(stderr, "accepted new client from %s\n",
                        inet_ntoa(remote.sin_addr));
                push_fd(&fds, &nfds, rfd, POLLIN);
                break;
              } else {
                memset(buffer, 0, sizeof (buffer));
                if (getpeername
                    (fds[i].fd, (struct sockaddr *) &remote,
                     &remote_len) < 0) {
                  perror("getpeername()");
                  exit(1);
                }
                yrc = read(fds[i].fd, buffer, sizeof (buffer));
                if (yrc == 0) {
                  fprintf(stderr,
                          "DISCONNECT %d @ %s (KILLING ALL TOO "
                          "LAZY AND TOO ANNOYING TO DO IT "
                          "THE CORRECT WAY)\n",
                          fds[i].fd,
                          inet_ntoa(remote.sin_addr));
                  close(fds[i].fd);
                  exit(1);
                }
                if (buffer[yrc - 2] == 0x0d)
                /* we remove the \x0d from telnet client */
                  buffer[yrc - 2] = '\x00';
                if (buffer[yrc - 1] == 0x0a)
                /* we remove the \x0a sent by telnet client */
                  buffer[yrc - 1] = '\x00';
                fprintf(stderr,
                        "client[%d @ %s] read[%d bytes]: '%s'\n",
                        fds[i].fd, inet_ntoa(remote.sin_addr),
                        yrc, buffer);
              }
            }
          }
          break;
        }
      }
      return 0;
    }

    Voilà ce que ça donne lorsqu’on lance le serveur :

    rival@kamehouse:~/dev/soul/linuxmag/avril_kqueue_bsd $ ./poll
    adding 1 socket to the pool (nfds: 0)!
    listening on port 3133
    timeout
    timeout
    accepted new client from 127.0.0.1
    adding 1 socket to the pool (nfds: 1)!
    client[4 @ 127.0.0.1] read[8 bytes]: 'prout'
    client[4 @ 127.0.0.1] read[6 bytes]: 'toto'
    client[4 @ 127.0.0.1] read[6 bytes]: 'fuck'
    timeout
    accepted new client from 127.0.0.1
    adding 1 socket to the pool (nfds: 2)!
    client[5 @ 127.0.0.1] read[8 bytes]: 'dslkds'
    client[5 @ 127.0.0.1] read[6 bytes]: 'ldks'
    timeout

    Et on connecte un client en envoyant quelques données :

    rival@kamehouse:~ $ telnet localhost 3133
    Trying ::1...
    telnet: connect to address ::1: Connection refused
    Trying 127.0.0.1...
    Connected to localhost.
    Escape character is '^]'.
    prout
    toto
    fuck

    On connecte un second client et, pareil, on envoie un peu de données :

    rival@kamehouse:~ $ telnet 127.0.0.1 3133
    Trying 127.0.0.1...
    Connected to localhost.
    Escape character is '^]'.
    dslkds
    ldks

    Il faut aussi gérer la déconnexion du client en enlevant gentiment du tableau de struct pollfd le fd qui était associé au client qui vient de se déconnecter. Je n’ai pas mis le code (relou+feign et j’avais déjà fait une pile toute simple pour rajouter les clients), ça se fait avec des listes chaînées, des tables de hash sur l’IP, etc.
    Y a mille et une manière de le faire, mais, l’important, c’est à VOUS de le faire.
    Voilà comment c’était fait pendant ces dernières années, jusqu’à l’arrivée de kqueue(2)/kevent(2).
    "Ce poteau électrique fera très bien l’affaire comme bélier sur la camionnette", (FUTE, de l’Agence tous
    risques).

    ON FAIT QUOI MAINTENANT au 21e siècle !?

    Avec kqueue(2)/kevent(2), la vie est beaucoup, beaucoup plus simple, tellement plus simple que c’est un bonheur de travailler avec et plus besoin de faire du multithread à tout va.
    "kqueue(2)/kevent(2) nous ont toujours sorti d’affaire", dixit Barracuda.
    Kqueue(2)/kevent(2), c’est une queue d’événements. En gros, on crée une queue (tout se passe dans le kernel) et on lui rajoute ou on lui enlève des événements. Ce qu’il a de terrible, c’est qu’il n’est pas nécessaire de connaître la queue, ni de parcourir un tableau, ni rien : kevent va gentiment ne retourner que le nombre d’événements à gérer que l’on veut dans une structure toute prête qui contiendra les infos qu’il faut pour continuer sans se prendre la tête.
    Genre après le kevent(2), t’as direct le file descriptor où lire/écrire, la taille des données dans le socket buffer. Tu peux même associer des données à toi avec ces événements. De quoi attacher tout ça à une structure context par client connecté. Dans notre exemple, je balade l’adresse du client avec.
    Voilà kqueue(2)/kevent(2) :

    int
    kqueue(void);
    int
    kevent(int kq, const struct kevent *changelist, size_t nchanges,
    struct kevent *eventlist, size_t nevents,
    const struct timespec *timeout);
    EV_SET(&kev, ident, filter, flags, fflags, data, udata);

    La structure pour définir un événement est la suivante :

    struct kevent {
    uintptr_t ident; /* identifier for this event */
    uint32_t filter; /* filter for event */
    uint32_t flags; /* action flags for kqueue */
    uint32_t fflags; /* filter flag value */
    int64_t data; /* filter data value */
    intptr_t udata; /* opaque user data identifier */
    };

    Chacun de ces éléments peut avoir une signification différente en fonction du type d’événement défini ou reçu. Oui, c’est la même structure qui sert à recevoir et à définir les événements qui sont remontés par kqueue(2).
    Alors, on reprend notre exemple de serveur TCP de tout à l’heure et on lui fait manger un bol de kqueue(2)/kevent(2).
    Ca donne ça (merci Looping pour le code) dans gruikqueue.c :

    #include <stdio.h>
    #include <stdlib.h>
    #include <unistd.h>
    #include <string.h>
    #include <netinet/in.h>
    #include <sys/types.h>
    #include <sys/socket.h>
    #include <sys/event.h>
    #define TCP_PORT 3133
    #define TCP_BACKLOG 1500
    #define TIMEOUT 5000
    int main(int argc, char **argv)
    {
      int rc, sd, fd, i, rfd;
      int yrc;
      struct sockaddr_in me;
      struct sockaddr_in *remote = NULL;
      socklen_t remote_len = sizeof (struct sockaddr_in);
      int kq;
      struct kevent ke[1500];       /* handle 1500 events */
      struct kevent re;             /* one event a time */
      char buffer[512];
      fd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
      if (fd < 0) {
        perror("socket(PF_INET, SOCKET_STREAM, TCP)");
        exit(1);
      }
      /* cleanup */
      memset(ke, 0, sizeof (ke));
      memset(&me, 0, sizeof (me));
      /* me myself and i */
      me.sin_family = PF_INET;
      me.sin_port = htons(TCP_PORT);
      me.sin_addr.s_addr = INADDR_ANY;
      /* on bind un port TCP tout simple */
      rc = bind(fd, (struct sockaddr *) &me,
                sizeof (struct sockaddr));
      if (rc < 0) {
        perror("bind()");
        exit(1);
      }
      /* c'est parti mon kiki!!! */
      rc = listen(fd, TCP_BACKLOG);
      if (rc < 0) {
        perror("listen()");
        exit(1);
      }
      fprintf(stderr, "listening on port %d\n", TCP_PORT);
      /* cree la queue d'events */
      kq = kqueue();
      if (kq < 0) {
        perror("kqueue()");
        exit(1);
      }
      /*
       *  sd (# 0) is the accept() socket  :)
       *  on cree le premier event de la queue
       *  qui sera l'event d'acceptation d'un nouveau client TCP
       */
      EV_SET(ke, fd, EVFILT_READ, EV_ADD, 0, TCP_BACKLOG, 0);
      rc = kevent(kq, ke, 1, NULL, 0, NULL);
        if (rc < 0) {
        perror("kevent(init)");
        exit(1);
      }
      while (1) {
        /*
         * allez on POLL!!!!
         */
        rc = kevent(kq, NULL, 0, &re, 1, NULL);
        switch (rc) {
        case -1:
          /*
           * casse kqueue, casse!!
           */
          perror("kevent()");
          exit(1);
          break;
        case 0:
          /*
           * on utilise pas mais on mets le case pour le
           * pendant avec poll()
           */
          fprintf(stderr, "timeout\n");
          break;
        default:
          /*
           * c'est la socket qui accept() les nouveaux
           * clients!!!! ??!?!?
           */
          if (re.ident == fd) {
            remote =
                (struct sockaddr_in *) calloc(1,
                                              sizeof (struct
                                                      sockaddr_in));
            rfd =
                accept(re.ident, (struct sockaddr *) remote,
                       &remote_len);
            if (rfd < 0) {
              perror("accept()");
              exit(1);
            }
            fprintf(stderr, "accepted new client from %s\n",
                    inet_ntoa(remote->sin_addr));
            /*
             * on rajoute un event pour ce client avec son addresse
             * dans les user data
             */
            memset(&re, 0, sizeof (re));
            EV_SET(&re, rfd, EVFILT_READ, EV_ADD, 0, 0,
                   (intptr_t) remote);
            rc = kevent(kq, &re, 1, NULL, 0, NULL);
            break;
          } else {
            memset(buffer, 0, sizeof (buffer));
            remote = (struct sockaddr_in *) re.udata;
            fprintf(stderr, "client sent %d bytes\n", re.data);
            if (re.flags & EV_EOF) {
              /* not mandatory */
              EV_SET(&re, re.ident, EVFILT_READ, EV_DELETE, 0,
                     0, (intptr_t) NULL);
              kevent(kq, &re, 1, NULL, 0, NULL);
              fprintf(stderr, "DISCONNECT %d @ %s\n", re.ident,
                      inet_ntoa(remote->sin_addr));
              close((int) re.ident);
              free(remote);
              break;
            }
                    yrc = read(re.ident, buffer, re.data);
            if (buffer[yrc - 2] == 0x0d)
            /* we remove the \x0d from telnet client */
              buffer[yrc - 2] = '\x00';
            if (buffer[yrc - 1] == 0x0a)
            /* we remove the \x0a sent by telnet client */
              buffer[yrc - 1] = '\x00';
            fprintf(stderr,
                    "client[%d @ %s] read[%d bytes]: '%s'\n",
                    re.ident, inet_ntoa(remote->sin_addr), yrc,
                    buffer);
          }
          break;
        }
      }
      return 0;
    }

    Décortiquons un peu :

     [...]
    /* cree la queue d'events */
    kq = kqueue();
    if (kq < 0)
    {
            perror("kqueue()");
            exit(1);
    }
    /*
     *  sd (# 0) is the accept() socket  :)
     *  on cree le premier event de la queue
     *  qui sera l'event d'acceptation d'un nouveau client TCP
     */
    EV_SET(ke, fd, EVFILT_READ, EV_ADD, 0, TCP_BACKLOG, 0);
    rc = kevent(kq, ke, 1, NULL, 0, NULL);
    if (rc < 0)
    {
            perror("kevent(init)");
            exit(1);
    }
    [...]

    On initialise la queue d’événements (kq = kqueue()). On lui décrit directement un premier événement (EV_SET()) qui est "s’il y a quelque chose à lire sur le FD accept() préviens-moi !"... puis, on l’ajoute à la queue (kevent(...)), simple non ?

    Ensuite, on poll gentiment les événements en précisant qu’on ne veut en remonter qu’un seul à la fois pour le stocker dans la struct kevent re et pas remonter des pools entiers d’événements (ce qui est aussi possible avec kqueue(2)/kevent(2)) :

    [...]
    /*
    * allez on POLL!!!!
    */
    rc = kevent(kq, NULL, 0, &re, 1, NULL);
    switch (rc) {
    [...]

    Le switch avec son "default" case définit donc le moment où kevent(2) nous dit : "hep, hep, j’ai un truc que t’as défini qui est arrivé, je te l’ai mis dans ta struct re".
    Ohhhhhhh, miracle, la struct re est déjà remplie, avec comme contenu :

    • re.ident == File descriptor à lire ou écrire (lire dans notre cas) ;
    • re.data == Taille des données dans le socket buffer ; on connaît la taille avant de read(2) (!) ;
    • re.udata == Les données utilisateur qu’on aura disposées au moment de créer l’événement et qui seront de nouveau à notre disposition.

    ClaAAaaaaAAaasse non ? Si.
    Bon on reprend, on a un premier cas, celui où accept(2) reçoit un nouveau client :

    [...]
    remote = (struct sockaddr_in *)
      calloc(1, sizeof(struct sockaddr_in));
    rfd = accept(re.ident,
      (struct sockaddr *)remote, &remote_len);
    if (rfd < 0)
    {
          perror("accept()");
          exit(1);
    }
    fprintf(stderr, "accepted new client from %s\n",
      inet_ntoa(remote->sin_addr));
    /*
     * on rajoute un event pour ce client avec son adresse
     * dans les user data
     */
    memset(&re, 0, sizeof(re));
    EV_SET(&re, rfd, EVFILT_READ, EV_ADD, 0, 0, (intptr_t)remote);
    rc = kevent(kq, &re, 1, NULL, 0, NULL);
    [...]

    On récupère l’adresse de notre client dans accept(2), puis le nouveau file descriptor et encore un MIRACLE. On rajoute gentiment notre nouveau file descriptor avec son lot de données à lui (struct sockaddr_in * remote) fraîchement alloué dans la queue d’événements en précisant que s’il y a quelque chose à lire, dis-le moi !

    Alors précipitons-nous dans la partie où un client nous parle :
    (Il est tout attentif là Barracuda, ahhh ça te TROUE LE KQUEUE(2) HEIN !!!)

    [...]
    memset(buffer, 0, sizeof(buffer));
    remote = (struct sockaddr_in *)re.udata;
    fprintf(stderr, "client sent %d bytes\n", re.data);
    if (re.flags & EV_EOF)
    {
            /* not mandatory */
            EV_SET(&re, re.ident, EVFILT_READ,
                EV_DELETE, 0, 0, (intptr_t)NULL);
            kevent(kq, &re, 1, NULL, 0, NULL);
            fprintf(stderr, "DISCONNECT %d @ %s\n",
                 re.ident, inet_ntoa(remote->sin_addr));
            close((int)re.ident);
            free(remote);
            break;
    }
    yrc = read(re.ident, buffer, re.data);
    [...]

    Là, idem, simplicité, félicité, clarté, c’est beau.

    On connaît la taille des données à lire et en attente dans le noyau (re.data). On peut aussi vérifier que ce n’est pas simplement une déconnexion du client (re.flags & EV_EOF), auquel cas on efface l’événement associé au client de la queue (encore plus simple, il suffit de fermer le descriptor et il sera automatiquement éliminé de la queue cf. kqueue(2)), on ferme le file descriptor, on libère la mémoire associée aux données du client, hop c’est tout bon).
    Sinon, on read(2) simplement la taille (re.data) dans notre buffer (alloué par nos soins ou fixe si inférieur à une certaine taille) le file descriptor (re.ident) que l’événement nous a retourné si poliment.
    Résultat sur le serveur :

    rival@kamehouse:~/dev/soul/linuxmag/avril_kqueue_bsd $ ./kqueue
    listening on port 3133
    accepted new client from 127.0.0.1
    client sent 7 bytes
    client[5 @ 127.0.0.1] read[7 bytes]: 'prout'
    accepted new client from 127.0.0.1
    client sent 7 bytes
    client[6 @ 127.0.0.1] read[7 bytes]: 'totot'
    client sent 7 bytes
    client[6 @ 127.0.0.1] read[7 bytes]: 'titit'
    client sent 6 bytes
    client[6 @ 127.0.0.1] read[6 bytes]: 'tata'
    client sent 7 bytes
    client[5 @ 127.0.0.1] read[7 bytes]: 'gitan'
    client sent 8 bytes
    client[5 @ 127.0.0.1] read[8 bytes]: 'tgitan'

    client #1 :

    rival@kamehouse:~ $ telnet localhost 3133
    Trying ::1...
    telnet: connect to address ::1: Connection refused
    Trying 127.0.0.1...
    Connected to localhost.
    Escape character is '^]'.
    prout
    gitan
    tgitan

    client #2 :

    rival@kamehouse:~ $ telnet 127.0.0.1 3133
    Trying 127.0.0.1...
    Connected to localhost.
    Escape character is '^]'.
    totot
    titit
    tata

    On n’a rien à faire d’autre, rien à gérer, pas de liste, rien, tout est fait en interne dans le noyau. En plus de nous faire gagner en simplicité, il faut aussi parler de la scalabilité de kqueue(1) qui arrive à faire (si mes souvenirs sont bons, mes pilules !) du O(1) quel que soit le nombre d’événements concurrents, ce qui revient à dire que, quel que soit le nombre de clients connectés, les perfs seront les mêmes, ninaianiaaaarrrrr NIArrrrkrk NIARRKRKKKKK!!!!

    Conclusion

    Dans le reste du sexy de kqueue(2)/kevent(2), je vous invite à lire la page de manuel de celui-ci : en plus d’y gérer les sockets, les vnodes, il est également capable de mater l’état des processus, si ceux-ci fork(2), si un processus fait appel à execve(), si un fichier est effacé ou modifié en real-time, les filtres sont assez classieux et il y a de quoi s’amuser :).
    Voila, c’est aussi ça l’Agence tous risques, c’est pas que des boulons et des tournevis, mais c’est aussi du code poilu, du code full girly, du bon code, du code qui l’est bon comme la saucisse de George !! Hmmm la saucisse de George...

    Références

    Remerciements
    tomate (patience & viande hachée !)
    GCU/Ma maison, mon jardin, là où j’ai grandi, le foyer Sonnacotra des coders.
    le CICR pour la soupe chaude.

    Posté par (La rédaction) | Signature : Eric Auge (GCU) | Article paru dans Creative Commons License

    Laissez une réponse

    Vous devez avoir ouvert une session pour écrire un commentaire.


    • Il y a actuellement

    • 627 articles/billets en ligne.