вторник, 23 ноября 2010 г.

Трюк со стековой памятью

Интересный способ оптимизации я недавно обнаружил изучая реализацию системного вызова poll в ядре Linux. Этому системному вызову передаётся массив структур pollfd. Для работы с этим массивом его необходимо скопировать в адресное пространство ядра. Можно выделить для этого память в куче, но доступ к куче относительно медленный. Чаще всего системному вызову poll передаётся не большое количество дескрипторов, поэтому часть массива можно разместить на стеке, а то, что не поместилось - в куче. Для этого используется связный список.
Элемент связного списка описывается структурой:

struct poll_list {
 struct poll_list *next;
 int len;
 struct pollfd entries[0];
};

struct poll_list *next - указатель на следующий элемент списка;
int len - длина массива дескрипторов;
struct pollfd entries[0] - указатель на массив;

Структура pollfd:
struct pollfd {
 int fd;
 short events;
 short revents;
};

Для указателя на массив дескрипторов используется массив нулевой длины. Почему не struct pollfd* entries будет понятно ниже.

Далее имеем вот такой код (несколько упрощённый вариант)

#define POLL_STACK_ALLOC 256

/*Количество элементов массива pollfd, которое поместится на стеке
  Также учитывается размер элемента списка, т.к. он тоже размещается на стеке*/

#define N_STACK_PPS ((sizeof(stack_pps) - sizeof(struct poll_list))  / \
   sizeof(struct pollfd))

#define POLLFD_PER_PAGE  ((PAGE_SIZE-sizeof(struct poll_list)) / sizeof(struct pollfd))


int do_sys_poll(struct pollfd *ufds, unsigned int nfds,
 struct timespec *end_time)
{
 struct poll_wqueues table;
 int err = -EFAULT, fdcount, len, size;

 /*Память под элемент списка и массив pollfd*/
 long stack_pps[POLL_STACK_ALLOC/sizeof(long)];
 /*Указатель на первый элемент списка*/
 struct poll_list *const head = (struct poll_list *)stack_pps;
 /*Указатель для прохода по списку*/
 struct poll_list *walk = head;
 /*Сколько ещё элементов необходимо скопировать*/
 unsigned long todo = nfds;

 /*При перво проходе цикла len содержит количество элементов,
    которые будут размещены на стеке*/
 len = min_t(unsigned int, nfds, N_STACK_PPS);
 for (;;) {
  walk->next = NULL;
  walk->len = len;
  if (!len)
   break;

  /*Копируем данные из пространства пользователя в пространство ядра*/
  if (copy_from_user(walk->entries, ufds + nfds-todo,
     sizeof(struct pollfd) * walk->len))
   goto out_fds;

  /*Сколько ещё осталось скопировать*/
  todo -= walk->len;
  if (!todo)
   break;

  /*Все элементы, которые не поместились на стеке, мы размещаем в куче
     постранично*/

  len = min(todo, POLLFD_PER_PAGE);
  size = sizeof(struct poll_list) + sizeof(struct pollfd) * len;
  walk = walk->next = kmalloc(size, GFP_KERNEL);
  if (!walk) {
   err = -ENOMEM;
   goto out_fds;
  }
 }

 ................................

 return err;
}

Макрос N_STACK_PPS вычисляет количество элементов + размер элемента списка, которые  могут поместиться на стеке. Получается, что в массиве stack_pps первые sizeof(poll_list) байт отводится под элемент списка, а sizeof(stack_pps) - sizeof(poll_list) байт под массив pollfd.
Надо как то ссылаться на массив pollfd из элемента списка. Известно, что массив располагается сразу после структуры poll_list в массиве stack_pps и в выделяемой в куче памяти. Для того, чтобы получить указатель на область сразу за poll_list используется массив нулевой длинны struct pollfd entries[0]. Это эффективней чем указатель struct pollfd* entries, т.к. он съел бы дополнительные байты памяти, и его необходимо было бы инициализировать адресом первого элемента массива.

воскресенье, 21 ноября 2010 г.

Использование epoll

Мультиплексирование ввода-вывода

Существует множество способов написать сервер, обрабатывающий клиентский запросы. Вот некоторые из них:
  • Последовательная обработка каждого запроса;
  • Один процесс/поток на клиента;
  • Мультиплексирование ввода-вывода.
Каждый подход обладает своими достоинствами и недостатками.
Я опишу мультиплексирование ввода вывода.
Принцип его работы в следующем: дескрипторы ввода/вывода помещается в список ожидания, затем процесс блокируется до наступления одного из событий:
Для читающего сокета:
  • Можно выполнить чтение данных. Т.е. количество байт в буфере приёмнике больше или равно значению минимального количества данных (по умолчанию это значение равно 1, но его можно поменять с помощью параметра сокета SO_RCVLOWAT);
  • На противоположном конце соединение закрывается;
  • Сокет является прослушиваемым и число установленных соединений больше нуля.
  • Ошибка сокета;
Для пишущего сокета:
  • Можно выполнить запись данных. Т.е. количество байт в буфере передатчике больше или равно значению минимального количества данных (по умолчанию это значение равно 2048, но его можно поменять с помощью параметра сокета SO_SNDLOWAT);
  • На противоположном конце соединение закрывается. При этом если записать данные в сокет, сгенерируется сигнал SIGPIPE;
  • Ошибка сокета;
Также процесс выходит из состояния блокировки при получении внеполосных данных.

epoll
Системные вызовы группы epoll присущи только Linux. Они появились в ядре версии 2.6 и призваны исправить проблемы системных вызовов poll и select.
Работа с epoll осуществляется в три этапа:
1. Создаём дескриптор epoll, вызвав функцию

int epoll_create(int size);

2. Добавляем дескриптор ввода/вывода в массив ожидания, вызвав функцию

int epoll_ctl(int efd, int op, int fd, struct epool_event* event);

3. Запускаем ожидание готовности, вызвав функцию

int epoll_wait(int efd, struct epoll_event *events, int maxevents, int timeout);

За более подробной информацией по параметрам и возвращающим значениям функций, необходимо обратиться к документации.

Вот небольшой пример кода, использующий epoll:

#include <stdio.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#include <fcntl.h>
#include <strings.h>
#include <string.h>
#include <arpa/inet.h>
#include <errno.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <signal.h>
#include <iostream>

namespace
{

 int setnonblocking(int sock);
 void do_read(int fd);
 void do_write(int fd);
 void process_error(int fd);

}

int main(int argc, char* argv[])
{
 const int MAX_EPOLL_EVENTS = 100;
 const int BACK_LOG = 100;

 if (argc < 2)
 {
  std::cout <<  "Usage: server [port]" << std::endl;
  return 0;
 }

 char* p;
 int serv_port = strtol(argv[1], &p, 10);
 if (*p)
 {
  std::cout << "Invalid port number" << std::endl;
  return -1;
 }

 // Игнорируем сигнал ошибки работы с сокетом
 signal(SIGPIPE, SIG_IGN);

 // Создание дескриптора epoll
 int efd = epoll_create(MAX_EPOLL_EVENTS);

 int listenfd;
 listenfd = socket(AF_INET, SOCK_STREAM, 0);
 if (listenfd < 0)
 {
  perror("Socket creation");
  return -1;
 }

 // Неблокирующий режим
 setnonblocking(listenfd);

 struct sockaddr_in servaddr;
 bzero(&servaddr, sizeof(servaddr));
 servaddr.sin_family = AF_INET;
 servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
 servaddr.sin_port = htons(serv_port);

 if (bind(listenfd, (struct sockaddr*)&servaddr, sizeof(servaddr)) < 0)
 {
  perror("Socket bind");
  return -1;
 }

 if (listen(listenfd, BACK_LOG) < 0)
 {
  perror("Socket listen");
  return -1;
 }

 // Добавляем дескриптор в массив ожидания
 struct epoll_event listenev;
 listenev.events = EPOLLIN | EPOLLPRI | EPOLLET;
 listenev.data.fd = listenfd;
 if (epoll_ctl(efd, EPOLL_CTL_ADD, listenfd, &listenev) < 0)
 {
  perror("Epoll fd add");
  return -1;
 }

 socklen_t client;
 // Массив готовых дескрипторов
 struct epoll_event events[MAX_EPOLL_EVENTS];
 struct epoll_event connev;
 struct sockaddr_in cliaddr;

 int events_cout = 1;

 for (;;)
 {
  // Блокирование до готовности одно или нескольких дескрипторов
  int nfds = epoll_wait(efd, events, MAX_EPOLL_EVENTS, -1);

  for (int n = 0; n < nfds; ++n)
  {
   // Готов слушающий дескриптор
   if (events[n].data.fd == listenfd)
   {
    client = sizeof(cliaddr);
    int connfd = accept(listenfd, (struct sockaddr*) &cliaddr, &client);
    if (connfd < 0)
    {
     perror("accept");
     continue;
    }

    // Недостаточно места в массиве ожидания
    if (events_cout == MAX_EPOLL_EVENTS-1)
    {
     std::cout << "Event array is full" << std::endl;
     close(connfd);
     continue;
    }

    // Добавление клиентского дескриптора в массив ожидания
    setnonblocking(connfd);
    connev.data.fd = connfd;
    connev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLRDHUP;
    if (!epoll_ctl(efd, EPOLL_CTL_ADD, connfd, &connev) < 0)
    {
     perror("Epoll fd add");
     close(connfd);
     continue;
    }

    ++events_cout;
   }
   // Готов клиентский дескриптор
   else
   {
    // Выполням работу с дескриптором
    int fd = events[n].data.fd;

    if (events[n].events & EPOLLIN)
     do_read(fd);

    if (events[n].events & EPOLLOUT)
     do_write(fd);

    if (events[n].events & EPOLLRDHUP)
     process_error(fd);

    // В даннoм примере дескриптор просто закрывается и удаляется из массива ожидания.
    // В зависимости от логики работы можно не удалять дескриптор и подождать следующую порцию данных
    epoll_ctl(efd, EPOLL_CTL_DEL, fd, &connev);
    --events_cout;
    close(fd);
   }
  }
 }

 return 0;
}

namespace
{

 int setnonblocking(int sock)
 {
  int opts;

  opts = fcntl(sock,F_GETFL);
  if (opts < 0)
  {
   perror("fcntl(F_GETFL)");
   return -1;
  }
  opts = (opts | O_NONBLOCK);
  if (fcntl(sock,F_SETFL,opts) < 0)
  {
   perror("fcntl(F_SETFL)");
   return -1;
  }

  return 0;
 }

 void do_read(int fd)
 {
  std::cout << "do_read" << std::endl;
 }

 void do_write(int fd)
 {
  std::cout << "do_write" << std::endl;
 }

 void process_error(int fd)
 {
  std::cout << "process_error" << std::endl;
 }
}