Envoy Source Analysis: Startup and New Connection Establishment

Code version: stable/v1.7.1

In the previous article we analyzed the first step of Envoy's startup: the initialization of six modules, the most important of which is the initialization of the server. This article continues the walkthrough from the point where the server instance has been initialized, covering the startup flow and the establishment of new connections.

1. Entry point

The entry point is still int main in main.cc:

/**
 * Basic Site-Specific main()
 *
 * This should be used to do setup tasks specific to a particular site's
 * deployment such as initializing signal handling. It calls main_common
 * after setting up command line options.
 */
int main(int argc, char** argv) {
  std::unique_ptr<Envoy::MainCommon> main_common;

  // Initialize the server's main context under a try/catch loop and simply return EXIT_FAILURE
  // as needed. Whatever code in the initialization path that fails is expected to log an error
  // message so the user can diagnose.
  try {
    main_common = std::make_unique<Envoy::MainCommon>(argc, argv);
  } catch (const Envoy::NoServingException& e) {
    return EXIT_SUCCESS;
  } catch (const Envoy::MalformedArgvException& e) {
    return EXIT_FAILURE;
  } catch (const Envoy::EnvoyException& e) {
    return EXIT_FAILURE;
  }

  // Run the server listener loop outside try/catch blocks, so that unexpected exceptions
  // show up as a core-dumps for easier diagnostics.
  return main_common->run() ? EXIT_SUCCESS : EXIT_FAILURE;
}

Here main_common is an instance of Envoy::MainCommon; in its class definition, run is declared as follows:

class MainCommon {
public:
  MainCommon(int argc, const char* const* argv);
  bool run() { return base_.run(); }

  static std::string hotRestartVersion(uint64_t max_num_stats, uint64_t max_stat_name_len,
                                       bool hot_restart_enabled);

private:
#ifdef ENVOY_HANDLE_SIGNALS
  Envoy::SignalAction handle_sigs;
  Envoy::TerminateHandler log_on_terminate;
#endif

  Envoy::OptionsImpl options_;
  MainCommonBase base_;
};

base_ is an instance of MainCommonBase; the concrete implementation of run is found in main_common.cc:

bool MainCommonBase::run() {
  switch (options_.mode()) {
  case Server::Mode::Serve:
    server_->run();
    return true;
  case Server::Mode::Validate: {
    auto local_address = Network::Utility::getLocalAddress(options_.localAddressIpVersion());
    return Server::validateConfig(options_, local_address, component_factory_);
  }
  case Server::Mode::InitOnly:
    PERF_DUMP();
    return true;
  }
  NOT_REACHED;
}

In Serve mode this executes server_->run(), whose code is:

void InstanceImpl::run() {
  RunHelper helper(*dispatcher_, clusterManager(), restarter_, access_log_manager_, init_manager_,
                   [this]() -> void { startWorkers(); });

  // Run the main dispatch loop waiting to exit.
  ENVOY_LOG(info, "starting main dispatch loop");
  auto watchdog = guard_dog_->createWatchDog(Thread::Thread::currentThreadId());
  watchdog->startWatchdog(*dispatcher_);
  dispatcher_->run(Event::Dispatcher::RunType::Block);
  ENVOY_LOG(info, "main dispatch loop exited");
  guard_dog_->stopWatching(watchdog);
  watchdog.reset();

  terminate();
}

This is the core startup logic of run. The figure below outlines Envoy's main flow from startup to connection establishment, which consists of three stages: starting the workers, loading the listeners, and accepting new connections.

[Figure: Envoy startup and new-connection flow]

2. Starting the workers
 RunHelper helper(*dispatcher_, clusterManager(), restarter_, access_log_manager_, init_manager_,
                   [this]() -> void { startWorkers(); });

In server.cc, the server uses RunHelper to invoke startWorkers(), whose code is:

void InstanceImpl::startWorkers() {
  listener_manager_->startWorkers(*guard_dog_);

  // At this point we are ready to take traffic and all listening ports are up. Notify our parent
  // if applicable that they can stop listening and drain.
  restarter_.drainParentListeners();
  drain_manager_->startParentShutdownSequence();
}

Look at the first line, listener_manager_->startWorkers(*guard_dog_):

void ListenerManagerImpl::startWorkers(GuardDog& guard_dog) {
  ENVOY_LOG(info, "all dependencies initialized. starting workers");
  ASSERT(!workers_started_);
  workers_started_ = true;
  for (const auto& worker : workers_) {
    ASSERT(warming_listeners_.empty());
    for (const auto& listener : active_listeners_) {
      addListenerToWorker(*worker, *listener);
    }
    worker->start(guard_dog);
  }
}

Setting aside the assertions and logging, the core is two nested for loops. The outer loop iterates over workers_, which was initialized earlier (see https://www.jianshu.com/p/204d1631239d); for each worker, the inner loop takes every listener in the active state and adds it to the worker via addListenerToWorker(*worker, *listener). Finally, each worker is started.
Let's see what addListenerToWorker does.

void ListenerManagerImpl::addListenerToWorker(Worker& worker, ListenerImpl& listener) {
  worker.addListener(listener, [this, &listener](bool success) -> void {
    // The add listener completion runs on the worker thread. Post back to the main thread to
    // avoid locking.
    server_.dispatcher().post([this, success, &listener]() -> void {
      // It is theoretically possible for a listener to get added on 1 worker but not the others.
      // The below check with onListenerCreateFailure() is there to ensure we execute the
      // removal/logging/stats at most once on failure. Note also that that drain/removal can race
      // with addition. It's guaranteed that workers process remove after add so this should be
      // fine.
      if (!success && !listener.onListenerCreateFailure()) {
        // TODO(mattklein123): In addition to a critical log and a stat, we should consider adding
        //                     a startup option here to cause the server to exit. I think we
        //                     probably want this at Lyft but I will do it in a follow up.
        ENVOY_LOG(critical, "listener '{}' failed to listen on address '{}' on worker",
                  listener.name(), listener.socket().localAddress()->asString());
        stats_.listener_create_failure_.inc();
        removeListener(listener.name());
      }
      if (success) {
        stats_.listener_create_success_.inc();
      }
    });
  });
}

The worker's addListener method takes two parameters: the listener itself, and a callback to invoke after the add completes, as the comment in its header file shows:

/**
   * Add a listener to the worker.
   * @param listener supplies the listener to add.
   * @param completion supplies the completion to call when the listener has been added (or not) on
   *                   the worker.
   */
  virtual void addListener(Network::ListenerConfig& listener,
                           AddListenerCompletion completion) PURE;

Back in the worker's addListener, shown below, the dispatcher's post method is called with a callback that captures the listener (addListener's own parameter) and the externally supplied completion, so that the listener is added on the worker's own thread:

void WorkerImpl::addListener(Network::ListenerConfig& listener, AddListenerCompletion completion) {
  // All listener additions happen via post. However, we must deal with the case where the listener
  // can not be created on the worker. There is a race condition where 2 processes can successfully
  // bind to an address, but then fail to listen() with EADDRINUSE. During initial startup, we want
  // to surface this.
  dispatcher_->post([this, &listener, completion]() -> void {
    try {
      handler_->addListener(listener);
      hooks_.onWorkerListenerAdded();
      completion(true);
    } catch (const Network::CreateListenerException& e) {
      completion(false);
    }
  });
}

In the body of that posted dispatcher callback, handler_->addListener(listener) is the core of adding the listener:

void ConnectionHandlerImpl::addListener(Network::ListenerConfig& config) {
  ActiveListenerPtr l(new ActiveListener(*this, config));
  listeners_.emplace_back(config.socket().localAddress(), std::move(l));
}

handler_->addListener(listener) is implemented by ConnectionHandlerImpl as shown above. The config parameter is one of the active listeners from the nested loops earlier; it is passed to the ActiveListener constructor to build an ActiveListener, held through a pointer. The ActiveListener constructor looks like this:

ConnectionHandlerImpl::ActiveListener::ActiveListener(ConnectionHandlerImpl& parent,
                                                      Network::ListenerConfig& config)
    : ActiveListener(
          parent,
          parent.dispatcher_.createListener(config.socket(), *this, config.bindToPort(),
                                            config.handOffRestoredDestinationConnections()),
          config) {}

That createListener call is implemented in DispatcherImpl (to keep our bearings: we are analyzing the addListener flow in connection_handler_impl.cc, and before a listener can be added it must be created, which the dispatcher does in ActiveListener's constructor). The implementation is a single statement: return Network::ListenerPtr{new Network::ListenerImpl(*this, socket, cb, bind_to_port, hand_off_restored_destination_connections)}; — it news up a ListenerImpl and wraps it in a Network::ListenerPtr. The created listener is returned to ConnectionHandlerImpl::addListener, where it is finally stored in listeners_, a list defined in connection_handler_impl.h as std::list<std::pair<Network::Address::InstanceConstSharedPtr, ActiveListenerPtr>> listeners_;.

3. Loading listeners & accepting connections

The previous section already covered much of the listener-loading flow (addListener); this section focuses mainly on connection establishment after loading. Let's revisit the Envoy startup and connection-acceptance figure.

[Figure: Envoy startup and new-connection flow]

In sections 1 and 2 we traced the flow from server_->run() down to createListener. Now let's dig into that last statement of section 2 and see how the Listener object is implemented, since it is here that listening for upstream/downstream connections and the callback handling take place.

As noted, the implementation is a single statement: return Network::ListenerPtr{new Network::ListenerImpl(*this, socket, cb, bind_to_port, hand_off_restored_destination_connections)}; — it news up a ListenerImpl, held through a Network::ListenerPtr.

ListenerImpl::ListenerImpl(Event::DispatcherImpl& dispatcher, Socket& socket, ListenerCallbacks& cb,
                           bool bind_to_port, bool hand_off_restored_destination_connections)
    : local_address_(nullptr), cb_(cb),
      hand_off_restored_destination_connections_(hand_off_restored_destination_connections),
      listener_(nullptr) {
  const auto ip = socket.localAddress()->ip();

  // Only use the listen socket's local address for new connections if it is not the all hosts
  // address (e.g., 0.0.0.0 for IPv4).
  if (!(ip && ip->isAnyAddress())) {
    local_address_ = socket.localAddress();
  }

  if (bind_to_port) {
    listener_.reset(
        evconnlistener_new(&dispatcher.base(), listenCallback, this, 0, -1, socket.fd()));

    if (!listener_) {
      throw CreateListenerException(
          fmt::format("cannot listen on socket: {}", socket.localAddress()->asString()));
    }

    if (!Network::Socket::applyOptions(socket.options(), socket, Socket::SocketState::Listening)) {
      throw CreateListenerException(fmt::format(
          "cannot set post-listen socket option on socket: {}", socket.localAddress()->asString()));
    }

    evconnlistener_set_error_cb(listener_.get(), errorCallback);
  }
}

In listener_.reset(evconnlistener_new(&dispatcher.base(), listenCallback, this, 0, -1, socket.fd()));, evconnlistener_new registers interest in new connections on the socket's fd and delivers them through the listenCallback callback. That function is shown below; inside it, listener->cb_.onAccept() is invoked:

void ListenerImpl::listenCallback(evconnlistener*, evutil_socket_t fd, sockaddr* remote_addr,
                                  int remote_addr_len, void* arg) {
  ListenerImpl* listener = static_cast<ListenerImpl*>(arg);
  // Get the local address from the new socket if the listener is listening on IP ANY
  // (e.g., 0.0.0.0 for IPv4) (local_address_ is nullptr in this case).
  const Address::InstanceConstSharedPtr& local_address =
      listener->local_address_ ? listener->local_address_ : listener->getLocalAddress(fd);
  // The accept() call that filled in remote_addr doesn't fill in more than the sa_family field
  // for Unix domain sockets; apparently there isn't a mechanism in the kernel to get the
  // sockaddr_un associated with the client socket when starting from the server socket.
  // We work around this by using our own name for the socket in this case.
  // Pass the 'v6only' parameter as true if the local_address is an IPv6 address. This has no effect
  // if the socket is a v4 socket, but for v6 sockets this will create an IPv4 remote address if an
  // IPv4 local_address was created from an IPv6 mapped IPv4 address.
  const Address::InstanceConstSharedPtr& remote_address =
      (remote_addr->sa_family == AF_UNIX)
          ? Address::peerAddressFromFd(fd)
          : Address::addressFromSockAddr(*reinterpret_cast<const sockaddr_storage*>(remote_addr),
                                         remote_addr_len,
                                         local_address->ip()->version() == Address::IpVersion::v6);
  listener->cb_.onAccept(std::make_unique<AcceptedSocketImpl>(fd, local_address, remote_address),
                         listener->hand_off_restored_destination_connections_);
}

cb_'s onAccept is ConnectionHandlerImpl::ActiveListener::onAccept, implemented as follows:

void ConnectionHandlerImpl::ActiveListener::onAccept(
    Network::ConnectionSocketPtr&& socket, bool hand_off_restored_destination_connections) {
  Network::Address::InstanceConstSharedPtr local_address = socket->localAddress();
  auto active_socket = std::make_unique<ActiveSocket>(*this, std::move(socket),
                                                      hand_off_restored_destination_connections);

  // Create and run the filters
  config_.filterChainFactory().createListenerFilterChain(*active_socket);
  active_socket->continueFilterChain(true);

  // Move active_socket to the sockets_ list if filter iteration needs to continue later.
  // Otherwise we let active_socket be destructed when it goes out of scope.
  if (active_socket->iter_ != active_socket->accept_filters_.end()) {
    active_socket->moveIntoListBack(std::move(active_socket), sockets_);
  }
}

In the code above, config_.filterChainFactory().createListenerFilterChain(*active_socket); creates the listener filter chain, and continueFilterChain() then runs the filters.

Before continuing with how Envoy accepts connections, let's take stock. As mentioned above:

in listener_.reset(evconnlistener_new(&dispatcher.base(), listenCallback, this, 0, -1, socket.fd()));, evconnlistener_new registers interest in new connections on the socket's fd and delivers them through the listenCallback callback.

In other words, when Envoy runs as a sidecar in a pod to proxy a client's traffic, it listens on the corresponding port; when the client issues a request through that port, Envoy's callback fires and the call chain listenCallback -> cb_.onAccept -> createListenerFilterChain + continueFilterChain runs:

  • createListenerFilterChain creates the FilterChain, which is the extension point for filters
  • continueFilterChain runs the filters
Accepting a connection consists of two main parts:
  • createServerConnection, reached inside listener_.newConnection(std::move(socket_)) within continueFilterChain
    createServerConnection establishes the connection between the client side and the Envoy server, and this is where flow control via buffer high/low watermarks is applied. It returns a pointer to a connection object; its constructor, in connection_impl.cc, creates a FileEvent:
  // We never ask for both early close and read at the same time. If we are reading, we want to
  // consume all available data.
  file_event_ = dispatcher_.createFileEvent(
      fd(), [this](uint32_t events) -> void { onFileEvent(events); }, Event::FileTriggerType::Edge,
      Event::FileReadyType::Read | Event::FileReadyType::Write);

Creating the FileEvent constructs a new FileEventImpl, which assigns its events via assignEvents() and then registers them with event_add().

  • createNetworkFilterChain, also reached inside listener_.newConnection(std::move(socket_)) within continueFilterChain
    After createServerConnection, createNetworkFilterChain creates the network filter chain:
  const bool empty_filter_chain = !config_.filterChainFactory().createNetworkFilterChain(
      *new_connection, filter_chain->networkFilterFactories());

Through the FilterFactory callbacks, buildFilterChain() is executed, which ends in filter_manager's initializeReadFilters to initialize the read filters. If the network filter chain is empty, i.e. empty_filter_chain is true, the connection is closed:

  if (empty_filter_chain) {
    ENVOY_CONN_LOG_TO_LOGGER(parent_.logger_, debug, "closing connection: no filters",
                             *new_connection);
    new_connection->close(Network::ConnectionCloseType::NoFlush);
    return;
  }
  • After those two steps, a count is updated: through onNewConnection, the ActiveListener's connection count is incremented by one.
4. Recap

OK, at this point we have walked through everything promised at the start of this article: Envoy's startup, and the establishment of a new connection (the listening implementation) when a client issues a request, as summarized in the figure below.

[Figure: Envoy startup and new-connection flow]

Now, going back to our starting point in main, what did server_->run() do? Recall its implementation:

void InstanceImpl::run() {
  RunHelper helper(*dispatcher_, clusterManager(), restarter_, access_log_manager_, init_manager_,
                   [this]() -> void { startWorkers(); });

  // Run the main dispatch loop waiting to exit.
  ENVOY_LOG(info, "starting main dispatch loop");
  auto watchdog = guard_dog_->createWatchDog(Thread::Thread::currentThreadId());
  watchdog->startWatchdog(*dispatcher_);
  dispatcher_->run(Event::Dispatcher::RunType::Block);
  ENVOY_LOG(info, "main dispatch loop exited");
  guard_dog_->stopWatching(watchdog);
  watchdog.reset();

  terminate();
}
  • The RunHelper starts the workers, adds the listeners, and binds the callbacks and filter chains
  • createWatchDog starts the watchdog to guard against deadlocks
  • dispatcher_->run(Event::Dispatcher::RunType::Block); runs the dispatcher, which then calls libevent's event_base_loop to wait for events; when a new event arrives, it enters the processing flow. We will analyze that part in the next article.