hadoop_book/yarn/yarn_event.md
2023-11-26 20:25:45 +08:00

3.1 KiB
Raw Blame History

简介

Yarn采用了基于事件驱动的并发模型

  • 所有状态机都实现了EventHandler接口很多服务类名通常带有Service后缀也实现了该接口它们都是事件处理器
  • 需要异步处理的事件由中央异步调度器类名通常带有Dispatcher后缀统一接收/派发,需要同步处理的事件直接交给相应的事件处理器。

pic

某些事件处理器不仅处理事件,也会向中央异步调度器发送事件。

事件处理器定义

事件处理器定义如下:

@SuppressWarnings("rawtypes")
@Public
@Evolving
public interface EventHandler<T extends Event> {

  void handle(T event);

}

只有一个handler函数如参是事件

中央处理器AsyncDispatcher

AsyncDispatcher 实现了接口DispatcherDispatcher中定义了事件Dispatcher的接口。主要提供两个功能

  • 注册不同类型的事件。
  • 获取事件处理器用来派发事件等待异步执行真正的EventHandler。
@Public
@Evolving
public interface Dispatcher {

  EventHandler<Event> getEventHandler();

  void register(Class<? extends Enum> eventType, EventHandler handler);

}

AsyncDispatcher实现了Dispatcher接口也扩展了AbstractService表明AsyncDispatcher也是一个服务 是一个典型的生产者消费这模型。

public class AsyncDispatcher extends AbstractService implements Dispatcher {
 ...
}

事件处理器的注册

事件注册就是将事件写入到eventDispatchers里面eventDispatchers的定义Map<Class<? extends Enum>, EventHandler> eventDispatchers 键是事件类型value是事件的处理器。

对于同一事件类型注册多次handler处理函数时将使用MultiListenerHandler代替MultiListenerHandler里面保存了多个handler调用handler函数时 会依次调用每个handler。

public void register(Class<? extends Enum> eventType,
      EventHandler handler) {
    /* check to see if we have a listener registered */
    EventHandler<Event> registeredHandler = (EventHandler<Event>) eventDispatchers.get(eventType);
    LOG.info("Registering " + eventType + " for " + handler.getClass());
    if (registeredHandler == null) {
      eventDispatchers.put(eventType, handler);
    } else if (!(registeredHandler instanceof MultiListenerHandler)){
      /* for multiple listeners of an event add the multiple listener handler */
      MultiListenerHandler multiHandler = new MultiListenerHandler();
      multiHandler.addHandler(registeredHandler);
      multiHandler.addHandler(handler);
      eventDispatchers.put(eventType, multiHandler);
    } else {
      /* already a multilistener, just add to it */
      MultiListenerHandler multiHandler
      = (MultiListenerHandler) registeredHandler;
      multiHandler.addHandler(handler);
    }
  }

事件处理

AsyncDispatcher#getEventHandler()是异步派发的关键:


https://monkeysayhi.github.io/2018/11/20/%E6%BA%90%E7%A0%81%7CYarn%E7%9A%84%E4%BA%8B%E4%BB%B6%E9%A9%B1%E5%8A%A8%E6%A8%A1%E5%9E%8B%E4%B8%8E%E7%8A%B6%E6%80%81%E6%9C%BA/