The Envoy Startup Process

Preface

Envoy is a leading proxy in the service mesh ecosystem and the default data plane integrated with Istio. Source-level analyses of Envoy are rare online, and detailed material is hard to find. This article dives into the Envoy architecture starting from the source code.

1. Initialization

1.1 The main function

The overall entry point is the main function, which constructs a MainCommon and calls its run method.

int main(int argc, char** argv) {
#ifndef __APPLE__
  // absl::Symbolize mostly works without this, but this improves corner case
  // handling, such as running in a chroot jail.
  absl::InitializeSymbolizer(argv[0]);
#endif
  std::unique_ptr<Envoy::MainCommon> main_common;

  // Initialize the server's main context under a try/catch loop and simply return EXIT_FAILURE
  // as needed. Whatever code in the initialization path that fails is expected to log an error
  // message so the user can diagnose.
  try {
    main_common = std::make_unique<Envoy::MainCommon>(argc, argv);
  } catch (const Envoy::NoServingException& e) {
    return EXIT_SUCCESS;
  } catch (const Envoy::MalformedArgvException& e) {
    return EXIT_FAILURE;
  } catch (const Envoy::EnvoyException& e) {
    return EXIT_FAILURE;
  }

  // Run the server listener loop outside try/catch blocks, so that unexpected exceptions
  // show up as core dumps for easier diagnosis.
  return main_common->run() ? EXIT_SUCCESS : EXIT_FAILURE;
}
1.2 MainCommon initialization
// main_common.cc
int main_common(OptionsImpl& options) {
  try {
      // Construct MainCommonBase, which initializes the server Instance
    MainCommonBase main_common(options);
    return main_common.run() ? EXIT_SUCCESS : EXIT_FAILURE;
  } catch (EnvoyException& e) {
    return EXIT_FAILURE;
  }
  return EXIT_SUCCESS;
}

MainCommonBase::MainCommonBase(OptionsImpl& options) : options_(options) {
    ......
    // MainCommon initializes an Instance, i.e. one server instance, so InstanceImpl is constructed here
    server_.reset(new Server::InstanceImpl(
        options_, local_address, default_test_hooks_, *restarter_, *stats_store_, access_log_lock,
        component_factory_, std::make_unique<Runtime::RandomGeneratorImpl>(), *tls_));
    ......
}
1.3 Instance initialization

The Instance then runs its own initialization; in the core initialize function, every ListenerConfig gets registered.

// server.cc
InstanceImpl::InstanceImpl(Options& options, Network::Address::InstanceConstSharedPtr local_address,
                           TestHooks& hooks, HotRestart& restarter, Stats::StoreRoot& store,
                           Thread::BasicLockable& access_log_lock,
                           ComponentFactory& component_factory,
                           Runtime::RandomGeneratorPtr&& random_generator,
                           ThreadLocal::Instance& tls) {
    ......  
    initialize(options, local_address, component_factory);
    ......
}

void InstanceImpl::initialize(Options& options,
                              Network::Address::InstanceConstSharedPtr local_address,
                              ComponentFactory& component_factory) {
    ...
    // Initialize the ListenerManager
    listener_manager_.reset(new ListenerManagerImpl(
      *this, listener_component_factory_, worker_factory_, ProdSystemTimeSource::instance_));

    // Initialize the main config (static listeners, clusters, etc.)
    main_config->initialize(bootstrap_, *this, *cluster_manager_factory_);

    ...
}

void MainImpl::initialize(const envoy::config::bootstrap::v2::Bootstrap& bootstrap,
                          Instance& server,
                          Upstream::ClusterManagerFactory& cluster_manager_factory) {
  ......
  const auto& listeners = bootstrap.static_resources().listeners();
  ENVOY_LOG(info, "loading {} listener(s)", listeners.size());
  // Extract the listener configs from the bootstrap (YAML) config and add them one by one. They end up in the ListenerManager's active_listeners vector, from which the real listeners are created and started in the second phase.
  for (ssize_t i = 0; i < listeners.size(); i++) {
    ENVOY_LOG(debug, "listener #{}:", i);
    server.listenerManager().addOrUpdateListener(listeners[i], "", false);
  }
  ......
}

Note that the ListenerImpl handled by addOrUpdateListener here lives in the Server namespace; its inheritance hierarchy is:

class ListenerImpl : public Network::ListenerConfig,
                     public Configuration::ListenerFactoryContext,
                     public Network::DrainDecision,
                     public Network::FilterChainManager,
                     public Network::FilterChainFactory,
                     public Configuration::TransportSocketFactoryContext,
                     Logger::Loggable<Logger::Id::config> {

This class's main job is to pin down the listener's configuration and define how its read/write filters will be created. The real listener does not exist yet at this point. The factories for the listener's read/write filters are collected by the following call:

 // createNetworkFilterFactoryList initializes the factory class of each filter it iterates over
 addFilterChain(PROTOBUF_GET_WRAPPED_OR_DEFAULT(filter_chain_match, destination_port, 0),
                   destination_ips, server_names, filter_chain_match.transport_protocol(),
                   application_protocols,
                   config_factory.createTransportSocketFactory(*message, *this, server_names),
                   parent_.factory_.createNetworkFilterFactoryList(filter_chain.filters(), *this));
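The deferred-construction pattern above can be sketched in a few lines: at configuration time the listener stores only filter factories, and the factories are invoked once per accepted connection to build fresh filter instances. All names here (ListenerConfigSketch, FilterFactoryCb) are illustrative stand-ins, not Envoy's real types:

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <string>
#include <vector>

// A filter instance; in Envoy this would be a Network::Filter.
struct Filter {
  std::string name;
};

// A factory callback: invoked once per accepted connection.
using FilterFactoryCb = std::function<std::unique_ptr<Filter>()>;

class ListenerConfigSketch {
public:
  // Configuration time: store a factory, not a filter.
  void addFilterFactory(const std::string& name) {
    factories_.push_back([name]() {
      return std::make_unique<Filter>(Filter{name});
    });
  }

  // Connection time: run every factory to build this connection's chain.
  std::vector<std::unique_ptr<Filter>> createFilterChain() const {
    std::vector<std::unique_ptr<Filter>> chain;
    for (const auto& factory : factories_) {
      chain.push_back(factory());
    }
    return chain;
  }

private:
  std::vector<FilterFactoryCb> factories_;
};
```

Storing factories instead of filter objects is what lets every connection get its own stateful filter chain while the listener configuration itself stays immutable.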

2. Startup

2.1 Startup entry point

At the end of the main function, MainCommon's run method is called:

main_common->run()

which in turn calls through to InstanceImpl's run method:

bool MainCommonBase::run() {
  switch (options_.mode()) {
  case Server::Mode::Serve:
    server_->run();
    return true;
  case Server::Mode::Validate: {
    auto local_address = Network::Utility::getLocalAddress(options_.localAddressIpVersion());
    return Server::validateConfig(options_, local_address, component_factory_);
  }
  case Server::Mode::InitOnly:
    PERF_DUMP();
    return true;
  }
  NOT_REACHED_GCOVR_EXCL_LINE;
}

The InstanceImpl#run method then creates the real, network-level listeners (based on the active_listeners array of ListenerConfigs in the ListenerManager mentioned above):

void InstanceImpl::run() {
  // startWorkers kicks off the event loops
  RunHelper helper(*dispatcher_, clusterManager(), restarter_, access_log_manager_, init_manager_,
                   [this]() -> void { startWorkers(); });
  ......
}

void InstanceImpl::startWorkers() {
  listener_manager_->startWorkers(*guard_dog_);
  ......
}
2.2 Binding listeners to workers

Each worker is bound to its own ConnectionHandler, and the handler takes care of binding the listeners. The workers represent Envoy's concurrency: the main processing logic runs on the workers and their associated libevent loops.

void ListenerManagerImpl::startWorkers(GuardDog& guard_dog) {
  ENVOY_LOG(info, "all dependencies initialized. starting workers");
  ASSERT(!workers_started_);
  workers_started_ = true;
  for (const auto& worker : workers_) {
    ASSERT(warming_listeners_.empty());
    for (const auto& listener : active_listeners_) {
      // Every listener is bound to every worker. The workers are the service's concurrent threads.
      addListenerToWorker(*worker, *listener);
    }
    worker->start(guard_dog);
  }
}

void WorkerImpl::addListener(Network::ListenerConfig& listener, AddListenerCompletion completion) {
   ......
   handler_->addListener(listener);
   ......
}
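The fan-out performed by startWorkers and addListenerToWorker can be sketched as follows; WorkerSketch and startWorkersSketch are illustrative stand-ins, not Envoy classes:

```cpp
#include <cassert>
#include <string>
#include <thread>
#include <vector>

// Stand-in for WorkerImpl: holds the listeners bound to it and a thread.
struct WorkerSketch {
  std::vector<std::string> listeners;

  void addListener(const std::string& name) { listeners.push_back(name); }

  void start() {
    // In real Envoy this thread runs the libevent dispatcher loop
    // (dispatcher_->run(...)); here it is an empty placeholder.
    thread_ = std::thread([this] { /* event loop would run here */ });
  }
  void join() { thread_.join(); }

private:
  std::thread thread_;
};

// Mirrors ListenerManagerImpl::startWorkers: every active listener is
// registered on every worker, then each worker thread is started.
std::vector<WorkerSketch>
startWorkersSketch(int concurrency, const std::vector<std::string>& active_listeners) {
  std::vector<WorkerSketch> workers(concurrency);
  for (auto& worker : workers) {
    for (const auto& listener : active_listeners) {
      worker.addListener(listener); // addListenerToWorker(*worker, *listener)
    }
    worker.start();
  }
  for (auto& worker : workers) {
    worker.join();
  }
  return workers;
}
```

The important point is the nesting: the listener loop sits inside the worker loop, so with N workers and M listeners there are N×M bindings, and each worker accepts connections on every listener independently.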
2.3 Listener initialization
void ConnectionHandlerImpl::addListener(Network::ListenerConfig& config) {
  // Create the ActiveListener
  ActiveListenerPtr l(new ActiveListener(*this, config));
  listeners_.emplace_back(config.socket().localAddress(), std::move(l));
}

ConnectionHandlerImpl::ActiveListener::ActiveListener(ConnectionHandlerImpl& parent,
                                                      Network::ListenerConfig& config)
    : ActiveListener(
          parent,
          // The real, network-level listener is created here, during ActiveListener construction.
          parent.dispatcher_.createListener(config.socket(), *this, config.bindToPort(),
                                            config.handOffRestoredDestinationConnections()),
          config) {}

ListenerImpl::ListenerImpl(Event::DispatcherImpl& dispatcher, Socket& socket, ListenerCallbacks& cb,
                           bool bind_to_port, bool hand_off_restored_destination_connections)
    : local_address_(nullptr), cb_(cb),
      hand_off_restored_destination_connections_(hand_off_restored_destination_connections),
    ......
    // libevent's `evconnlistener_new` registers a callback for new-connection events on the listening fd.
    listener_.reset(
        evconnlistener_new(&dispatcher.base(), listenCallback, this, 0, -1, socket.fd()));
    ......
}
2.4 Registering the new-connection callback & creating listener filters
void ListenerImpl::listenCallback(evconnlistener*, evutil_socket_t fd, sockaddr* remote_addr,
                                  int remote_addr_len, void* arg) {
  ......
  // At this point fd is no longer the listening fd but the connfd of the new connection.
  listener->cb_.onAccept(std::make_unique<AcceptedSocketImpl>(fd, local_address, remote_address),
                         listener->hand_off_restored_destination_connections_);
  ......
}

// The callback does two main things:
// 1. build the listener (accept) filters
// 2. build the ServerConnection
void ConnectionHandlerImpl::ActiveListener::onAccept(
    Network::ConnectionSocketPtr&& socket, bool hand_off_restored_destination_connections) {
  ......
  auto active_socket = std::make_unique<ActiveSocket>(*this, std::move(socket),
                                                 hand_off_restored_destination_connections);
  config_.filterChainFactory().createListenerFilterChain(*active_socket);
  active_socket->continueFilterChain(true);
  ......
}

void ConnectionHandlerImpl::ActiveSocket::continueFilterChain(bool success) {
   ......
   listener_.newConnection(std::move(socket_));
   ......  
}
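The continueFilterChain mechanics can be sketched as follows: listener filters run in order, any filter may stop iteration (e.g. a TLS inspector waiting for more bytes) and resume the chain later, and only when the whole chain completes does the socket reach newConnection. All names here are illustrative, not Envoy's real types:

```cpp
#include <cassert>
#include <functional>
#include <vector>

enum class FilterStatus { Continue, StopIteration };

// A listener filter reduced to a callable returning its status.
using ListenerFilterSketch = std::function<FilterStatus()>;

class ActiveSocketSketch {
public:
  explicit ActiveSocketSketch(std::vector<ListenerFilterSketch> filters)
      : filters_(std::move(filters)) {}

  // Mirrors ActiveSocket::continueFilterChain: walk the remaining filters;
  // if one returns StopIteration, pause until it resumes us later.
  void continueFilterChain(bool success) {
    if (!success) {
      return;
    }
    while (next_ < filters_.size()) {
      if (filters_[next_++]() == FilterStatus::StopIteration) {
        return; // the filter will call continueFilterChain(true) again
      }
    }
    new_connection_created_ = true; // listener_.newConnection(std::move(socket_))
  }

  bool newConnectionCreated() const { return new_connection_created_; }

private:
  std::vector<ListenerFilterSketch> filters_;
  size_t next_{0};
  bool new_connection_created_{false};
};
```

This is why onAccept calls continueFilterChain(true) immediately after building the chain: if no filter pauses, the connection is created synchronously in the same call.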
2.5 Connection initialization & creating read/write filters

This is the key part: libevent is again used to watch the connection's read/write events, with epoll in edge-triggered mode. Note also that write_buffer_ uses high/low watermark callbacks as backpressure, so that a flood of outbound data cannot overwhelm the service itself.

void ConnectionHandlerImpl::ActiveListener::newConnection(Network::ConnectionSocketPtr&& socket) {
  ......
  auto transport_socket = filter_chain->transportSocketFactory().createTransportSocket();
  // Create the ServerConnection
  Network::ConnectionPtr new_connection =
      parent_.dispatcher_.createServerConnection(std::move(socket), std::move(transport_socket));
  new_connection->setBufferLimits(config_.perConnectionBufferLimitBytes());
  // Create the real read/write filters
  const bool empty_filter_chain = !config_.filterChainFactory().createNetworkFilterChain(
      *new_connection, filter_chain->networkFilterFactories());
  ......
}

ConnectionImpl::ConnectionImpl(Event::Dispatcher& dispatcher, ConnectionSocketPtr&& socket,
                               TransportSocketPtr&& transport_socket, bool connected)
    : transport_socket_(std::move(transport_socket)), filter_manager_(*this, *this),
      socket_(std::move(socket)), write_buffer_(dispatcher.getWatermarkFactory().create(
                                      [this]() -> void { this->onLowWatermark(); },
                                      [this]() -> void { this->onHighWatermark(); })),
      dispatcher_(dispatcher), id_(next_global_id_++) {

  file_event_ = dispatcher_.createFileEvent(
      fd(), [this](uint32_t events) -> void { onFileEvent(events); }, Event::FileTriggerType::Edge,
      Event::FileReadyType::Read | Event::FileReadyType::Write);
}
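The high/low watermark behavior of write_buffer_ can be sketched as follows (WatermarkBufferSketch is an illustrative stand-in, not Envoy's real buffer class): crossing the high watermark fires one callback, which Envoy uses to stop reading from the peer, and draining back to the low watermark fires the other, which resumes reading:

```cpp
#include <cassert>
#include <cstddef>
#include <functional>

class WatermarkBufferSketch {
public:
  WatermarkBufferSketch(size_t low, size_t high,
                        std::function<void()> below_low_cb,
                        std::function<void()> above_high_cb)
      : low_(low), high_(high),
        below_low_cb_(std::move(below_low_cb)),
        above_high_cb_(std::move(above_high_cb)) {}

  // Data queued for writing; fire the high-watermark callback once per crossing.
  void add(size_t bytes) {
    size_ += bytes;
    if (!above_high_ && size_ > high_) {
      above_high_ = true;
      above_high_cb_();
    }
  }

  // Data flushed to the socket; fire the low-watermark callback once
  // the buffer has drained back down.
  void drain(size_t bytes) {
    size_ = bytes > size_ ? 0 : size_ - bytes;
    if (above_high_ && size_ <= low_) {
      above_high_ = false;
      below_low_cb_();
    }
  }

  size_t size() const { return size_; }

private:
  size_t low_, high_, size_{0};
  bool above_high_{false};
  std::function<void()> below_low_cb_, above_high_cb_;
};
```

The hysteresis between the two thresholds prevents the callbacks from flapping when the buffer size oscillates right around a single limit.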
2.6 Starting the workers

With the connections and listeners initialized, and both the accept (listener) filters and the read/write filters in place, the listener's libevent events are ready; once a request arrives, the connection's read/write events fire as well.

void WorkerImpl::start(GuardDog& guard_dog) {
  ASSERT(!thread_);
  thread_.reset(new Thread::Thread([this, &guard_dog]() -> void { threadRoutine(guard_dog); }));
}

void WorkerImpl::threadRoutine(GuardDog& guard_dog) {
  ......
  dispatcher_->run(Event::Dispatcher::RunType::Block);

  // Cleanup performed after the loop exits
  guard_dog.stopWatching(watchdog);
  handler_.reset();
  tls_.shutdownThread();
  watchdog.reset();
}

void DispatcherImpl::run(RunType type) {
  // Run the libevent loop
  event_base_loop(base_.get(), type == RunType::NonBlock ? EVLOOP_NONBLOCK : 0);
}

3. Conclusion

This chapter analyzed how Envoy starts up and how its core data structures are initialized. In the next chapter we will dig further into the source and analyze how Envoy handles a request once it arrives; after tracing the full flow end to end, we will map out the relationships between the core data structures.