HDDS-25. Simple async event processing for SCM.

Contributed by Elek, Marton.
This commit is contained in:
Anu Engineer 2018-05-11 11:35:21 -07:00
parent 1f10a36021
commit ba12e8805e
10 changed files with 753 additions and 0 deletions

View File

@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.server.events;
/**
* Identifier of an async event.
*
* @param <PAYLOAD> THe message payload type of this event.
*/
public interface Event<PAYLOAD> {
/**
* The type of the event payload. Payload contains all the required data
* to process the event.
*
*/
Class<PAYLOAD> getPayloadType();
/**
* The human readable name of the event.
*
* Used for display in thread names
* and monitoring.
*
*/
String getName();
}

View File

@ -0,0 +1,68 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.server.events;
/**
* Executors defined the way how an EventHandler should be called.
* <p>
* Executors are used only by the EventQueue and they do the thread separation
* between the caller and the EventHandler.
* <p>
* Executors should guarantee that only one thread is executing one
* EventHandler at the same time.
*
* @param <PAYLOAD> the payload type of the event.
*/
public interface EventExecutor<PAYLOAD> extends AutoCloseable {
/**
* Process an event payload.
*
* @param handler the handler to process the payload
* @param eventPayload to be processed.
* @param publisher to send response/other message forward to the chain.
*/
void onMessage(EventHandler<PAYLOAD> handler,
PAYLOAD eventPayload,
EventPublisher
publisher);
/**
* Return the number of the failed events.
*/
long failedEvents();
/**
* Return the number of the processed events.
*/
long successfulEvents();
/**
* Return the number of the not-yet processed events.
*/
long queuedEvents();
/**
* The human readable name for the event executor.
* <p>
* Used in monitoring and logging.
*
*/
String getName();
}

View File

@ -0,0 +1,33 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.server.events;
/**
* Processor to react on an event.
*
* EventExecutors should guarantee that the implementations are called only
* from one thread.
*
* @param <PAYLOAD>
*/
@FunctionalInterface
public interface EventHandler<PAYLOAD> {
void onMessage(PAYLOAD payload, EventPublisher publisher);
}

View File

@ -0,0 +1,28 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.server.events;
/**
* Client interface to send a new event.
*/
public interface EventPublisher {
<PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void
fireEvent(EVENT_TYPE event, PAYLOAD payload);
}

View File

@ -0,0 +1,213 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.server.events;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* Simple async event processing utility.
* <p>
* Event queue handles a collection of event handlers and routes the incoming
* events to one (or more) event handler.
*/
public class EventQueue implements EventPublisher, AutoCloseable {
private static final Logger LOG =
LoggerFactory.getLogger(EventQueue.class);
private final Map<Event, Map<EventExecutor, List<EventHandler>>> executors =
new HashMap<>();
private final AtomicLong queuedCount = new AtomicLong(0);
private final AtomicLong eventCount = new AtomicLong(0);
public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void addHandler(
EVENT_TYPE event, EventHandler<PAYLOAD> handler) {
this.addHandler(event, new SingleThreadExecutor<>(
event.getName()), handler);
}
public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void addHandler(
EVENT_TYPE event,
EventExecutor<PAYLOAD> executor,
EventHandler<PAYLOAD> handler) {
executors.putIfAbsent(event, new HashMap<>());
executors.get(event).putIfAbsent(executor, new ArrayList<>());
executors.get(event)
.get(executor)
.add(handler);
}
/**
* Creates one executor with multiple event handlers.
*/
public void addHandlerGroup(String name, HandlerForEvent<?>...
eventsAndHandlers) {
SingleThreadExecutor sharedExecutor =
new SingleThreadExecutor(name);
for (HandlerForEvent handlerForEvent : eventsAndHandlers) {
addHandler(handlerForEvent.event, sharedExecutor,
handlerForEvent.handler);
}
}
/**
* Route an event with payload to the right listener(s).
*
* @param event The event identifier
* @param payload The payload of the event.
* @throws IllegalArgumentException If there is no EventHandler for
* the specific event.
*/
public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void fireEvent(
EVENT_TYPE event, PAYLOAD payload) {
Map<EventExecutor, List<EventHandler>> eventExecutorListMap =
this.executors.get(event);
eventCount.incrementAndGet();
if (eventExecutorListMap != null) {
for (Map.Entry<EventExecutor, List<EventHandler>> executorAndHandlers :
eventExecutorListMap.entrySet()) {
for (EventHandler handler : executorAndHandlers.getValue()) {
queuedCount.incrementAndGet();
executorAndHandlers.getKey()
.onMessage(handler, payload, this);
}
}
} else {
throw new IllegalArgumentException(
"No event handler registered for event " + event);
}
}
/**
* This is just for unit testing, don't use it for production code.
* <p>
* It waits for all messages to be processed. If one event handler invokes an
* other one, the later one also should be finished.
* <p>
* Long counter overflow is not handled, therefore it's safe only for unit
* testing.
* <p>
* This method is just eventually consistent. In some cases it could return
* even if there are new messages in some of the handler. But in a simple
* case (one message) it will return only if the message is processed and
* all the dependent messages (messages which are sent by current handlers)
* are processed.
*
* @param timeout Timeout in seconds to wait for the processing.
*/
@VisibleForTesting
public void processAll(long timeout) {
long currentTime = Time.now();
while (true) {
long processed = 0;
Stream<EventExecutor> allExecutor = this.executors.values().stream()
.flatMap(handlerMap -> handlerMap.keySet().stream());
boolean allIdle =
allExecutor.allMatch(executor -> executor.queuedEvents() == executor
.successfulEvents() + executor.failedEvents());
if (allIdle) {
return;
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (Time.now() > currentTime + timeout) {
throw new AssertionError(
"Messages are not processed in the given timeframe. Queued: "
+ queuedCount.get() + " Processed: " + processed);
}
}
}
public void close() {
Set<EventExecutor> allExecutors = this.executors.values().stream()
.flatMap(handlerMap -> handlerMap.keySet().stream())
.collect(Collectors.toSet());
allExecutors.forEach(executor -> {
try {
executor.close();
} catch (Exception ex) {
LOG.error("Can't close the executor " + executor.getName(), ex);
}
});
}
/**
* Event identifier together with the handler.
*
* @param <PAYLOAD>
*/
public static class HandlerForEvent<PAYLOAD> {
private final Event<PAYLOAD> event;
private final EventHandler<PAYLOAD> handler;
public HandlerForEvent(
Event<PAYLOAD> event,
EventHandler<PAYLOAD> handler) {
this.event = event;
this.handler = handler;
}
public Event<PAYLOAD> getEvent() {
return event;
}
public EventHandler<PAYLOAD> getHandler() {
return handler;
}
}
}

View File

@ -0,0 +1,103 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.server.events;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/**
* Simple EventExecutor to call all the event handler one-by-one.
*
* @param <T>
*/
public class SingleThreadExecutor<T> implements EventExecutor<T> {
public static final String THREAD_NAME_PREFIX = "EventQueue";
private static final Logger LOG =
LoggerFactory.getLogger(SingleThreadExecutor.class);
private final String name;
private final ThreadPoolExecutor executor;
private final AtomicLong queuedCount = new AtomicLong(0);
private final AtomicLong successfulCount = new AtomicLong(0);
private final AtomicLong failedCount = new AtomicLong(0);
public SingleThreadExecutor(String name) {
this.name = name;
LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
executor =
new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, workQueue,
runnable -> {
Thread thread = new Thread(runnable);
thread.setName(THREAD_NAME_PREFIX + "-" + name);
return thread;
});
}
@Override
public void onMessage(EventHandler<T> handler, T message, EventPublisher
publisher) {
queuedCount.incrementAndGet();
executor.execute(() -> {
try {
handler.onMessage(message, publisher);
successfulCount.incrementAndGet();
} catch (Exception ex) {
LOG.error("Error on execution message {}", message, ex);
failedCount.incrementAndGet();
}
});
}
@Override
public long failedEvents() {
return failedCount.get();
}
@Override
public long successfulEvents() {
return successfulCount.get();
}
@Override
public long queuedEvents() {
return queuedCount.get();
}
@Override
public void close() {
executor.shutdown();
}
@Override
public String getName() {
return name;
}
}

View File

@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.server.events;
/**
* Basic event implementation to implement custom events.
*
* @param <T>
*/
public class TypedEvent<T> implements Event<T> {
private final Class<T> payloadType;
private final String name;
public TypedEvent(Class<T> payloadType, String name) {
this.payloadType = payloadType;
this.name = name;
}
public TypedEvent(Class<T> payloadType) {
this.payloadType = payloadType;
this.name = payloadType.getSimpleName();
}
@Override
public Class<T> getPayloadType() {
return payloadType;
}
@Override
public String getName() {
return name;
}
}

View File

@ -0,0 +1,23 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.server.events;
/**
* Simple event queue implementation for hdds/ozone components.
*/

View File

@ -0,0 +1,113 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.server.events;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.Set;
import java.util.stream.Collectors;
/**
* Testing the basic functionality of the event queue.
*/
public class TestEventQueue {
private static final Event<Long> EVENT1 =
new TypedEvent<>(Long.class, "SCM_EVENT1");
private static final Event<Long> EVENT2 =
new TypedEvent<>(Long.class, "SCM_EVENT2");
private static final Event<Long> EVENT3 =
new TypedEvent<>(Long.class, "SCM_EVENT3");
private static final Event<Long> EVENT4 =
new TypedEvent<>(Long.class, "SCM_EVENT4");
private EventQueue queue;
@Before
public void startEventQueue() {
queue = new EventQueue();
}
@After
public void stopEventQueue() {
queue.close();
}
@Test
public void simpleEvent() {
final long[] result = new long[2];
queue.addHandler(EVENT1, (payload, publisher) -> result[0] = payload);
queue.fireEvent(EVENT1, 11L);
queue.processAll(1000);
Assert.assertEquals(11, result[0]);
}
@Test
public void multipleSubscriber() {
final long[] result = new long[2];
queue.addHandler(EVENT2, (payload, publisher) -> result[0] = payload);
queue.addHandler(EVENT2, (payload, publisher) -> result[1] = payload);
queue.fireEvent(EVENT2, 23L);
queue.processAll(1000);
Assert.assertEquals(23, result[0]);
Assert.assertEquals(23, result[1]);
}
@Test
public void handlerGroup() {
final long[] result = new long[2];
queue.addHandlerGroup(
"group",
new EventQueue.HandlerForEvent<>(EVENT3, (payload, publisher) ->
result[0] = payload),
new EventQueue.HandlerForEvent<>(EVENT4, (payload, publisher) ->
result[1] = payload)
);
queue.fireEvent(EVENT3, 23L);
queue.fireEvent(EVENT4, 42L);
queue.processAll(1000);
Assert.assertEquals(23, result[0]);
Assert.assertEquals(42, result[1]);
Set<String> eventQueueThreadNames =
Thread.getAllStackTraces().keySet()
.stream()
.filter(t -> t.getName().startsWith(SingleThreadExecutor
.THREAD_NAME_PREFIX))
.map(Thread::getName)
.collect(Collectors.toSet());
System.out.println(eventQueueThreadNames);
Assert.assertEquals(1, eventQueueThreadNames.size());
}
}

View File

@ -0,0 +1,79 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.server.events;
import org.junit.Test;
/**
* More realistic event test with sending event from one listener.
*/
public class TestEventQueueChain {
private static final Event<FailedNode> DECOMMISSION =
new TypedEvent<>(FailedNode.class);
private static final Event<FailedNode> DECOMMISSION_START =
new TypedEvent<>(FailedNode.class);
@Test
public void simpleEvent() {
EventQueue queue = new EventQueue();
queue.addHandler(DECOMMISSION, new PipelineManager());
queue.addHandler(DECOMMISSION_START, new NodeWatcher());
queue.fireEvent(DECOMMISSION, new FailedNode("node1"));
queue.processAll(5000);
}
static class FailedNode {
private final String nodeId;
FailedNode(String nodeId) {
this.nodeId = nodeId;
}
String getNodeId() {
return nodeId;
}
}
private static class PipelineManager implements EventHandler<FailedNode> {
@Override
public void onMessage(FailedNode message, EventPublisher publisher) {
System.out.println(
"Closing pipelines for all pipelines including node: " + message
.getNodeId());
publisher.fireEvent(DECOMMISSION_START, message);
}
}
private static class NodeWatcher implements EventHandler<FailedNode> {
@Override
public void onMessage(FailedNode message, EventPublisher publisher) {
System.out.println("Clear timer");
}
}
}