diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcher.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcher.java
new file mode 100644
index 0000000000..19fddde9b4
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcher.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.server.events;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.ozone.lease.Lease;
+import org.apache.hadoop.ozone.lease.LeaseAlreadyExistException;
+import org.apache.hadoop.ozone.lease.LeaseExpiredException;
+import org.apache.hadoop.ozone.lease.LeaseManager;
+import org.apache.hadoop.ozone.lease.LeaseNotFoundException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Event watcher the (re)send a message after timeout.
+ *
+ * Event watcher will send the tracked payload/event after a timeout period
+ * unless a confirmation from the original event (completion event) is arrived.
+ *
+ * @param The type of the events which are tracked.
+ * @param The type of event which could cancel the
+ * tracking.
+ */
+@SuppressWarnings("CheckStyle")
+public abstract class EventWatcher {
+
+ private static final Logger LOG = LoggerFactory.getLogger(EventWatcher.class);
+
+ private final Event startEvent;
+
+ private final Event completionEvent;
+
+ private final LeaseManager leaseManager;
+
+ protected final Map trackedEventsByUUID =
+ new ConcurrentHashMap<>();
+
+ protected final Set trackedEvents = new HashSet<>();
+
+ public EventWatcher(Event startEvent,
+ Event completionEvent,
+ LeaseManager leaseManager) {
+ this.startEvent = startEvent;
+ this.completionEvent = completionEvent;
+ this.leaseManager = leaseManager;
+
+ }
+
+ public void start(EventQueue queue) {
+
+ queue.addHandler(startEvent, this::handleStartEvent);
+
+ queue.addHandler(completionEvent, (completionPayload, publisher) -> {
+ UUID uuid = completionPayload.getUUID();
+ try {
+ handleCompletion(uuid, publisher);
+ } catch (LeaseNotFoundException e) {
+ //It's already done. Too late, we already retried it.
+ //Not a real problem.
+ LOG.warn("Completion event without active lease. UUID={}", uuid);
+ }
+ });
+
+ }
+
+ private synchronized void handleStartEvent(TIMEOUT_PAYLOAD payload,
+ EventPublisher publisher) {
+ UUID identifier = payload.getUUID();
+ trackedEventsByUUID.put(identifier, payload);
+ trackedEvents.add(payload);
+ try {
+ Lease lease = leaseManager.acquire(identifier);
+ try {
+ lease.registerCallBack(() -> {
+ handleTimeout(publisher, identifier);
+ return null;
+ });
+
+ } catch (LeaseExpiredException e) {
+ handleTimeout(publisher, identifier);
+ }
+ } catch (LeaseAlreadyExistException e) {
+ //No problem at all. But timer is not reset.
+ }
+ }
+
+ private synchronized void handleCompletion(UUID uuid,
+ EventPublisher publisher) throws LeaseNotFoundException {
+ leaseManager.release(uuid);
+ TIMEOUT_PAYLOAD payload = trackedEventsByUUID.remove(uuid);
+ trackedEvents.remove(payload);
+ onFinished(publisher, payload);
+ }
+
+ private synchronized void handleTimeout(EventPublisher publisher,
+ UUID identifier) {
+ TIMEOUT_PAYLOAD payload = trackedEventsByUUID.remove(identifier);
+ trackedEvents.remove(payload);
+ onTimeout(publisher, payload);
+ }
+
+
+ /**
+ * Check if a specific payload is in-progress.
+ */
+ public synchronized boolean contains(TIMEOUT_PAYLOAD payload) {
+ return trackedEvents.contains(payload);
+ }
+
+ public synchronized boolean remove(TIMEOUT_PAYLOAD payload) {
+ try {
+ leaseManager.release(payload.getUUID());
+ } catch (LeaseNotFoundException e) {
+ LOG.warn("Completion event without active lease. UUID={}",
+ payload.getUUID());
+ }
+ trackedEventsByUUID.remove(payload.getUUID());
+ return trackedEvents.remove(payload);
+
+ }
+
+ abstract void onTimeout(EventPublisher publisher, TIMEOUT_PAYLOAD payload);
+
+ abstract void onFinished(EventPublisher publisher, TIMEOUT_PAYLOAD payload);
+
+ public List getTimeoutEvents(
+ Predicate super TIMEOUT_PAYLOAD> predicate) {
+ return trackedEventsByUUID.values().stream().filter(predicate)
+ .collect(Collectors.toList());
+ }
+}
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/IdentifiableEventPayload.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/IdentifiableEventPayload.java
new file mode 100644
index 0000000000..e73e30fcde
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/IdentifiableEventPayload.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.server.events;
+
+import java.util.UUID;
+
+/**
+ * Event with an additional unique identifier.
+ *
+ */
+public interface IdentifiableEventPayload {
+
+ UUID getUUID();
+
+}
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/EventHandlerStub.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/EventHandlerStub.java
new file mode 100644
index 0000000000..3f34a70e6e
--- /dev/null
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/EventHandlerStub.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.server.events;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Dummy class for testing to collect all the received events.
+ */
+public class EventHandlerStub implements EventHandler {
+
+ private List receivedEvents = new ArrayList<>();
+
+ @Override
+ public void onMessage(PAYLOAD payload, EventPublisher publisher) {
+ receivedEvents.add(payload);
+ }
+
+ public List getReceivedEvents() {
+ return receivedEvents;
+ }
+}
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventWatcher.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventWatcher.java
new file mode 100644
index 0000000000..1731350cfe
--- /dev/null
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventWatcher.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.server.events;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.UUID;
+
+import org.apache.hadoop.ozone.lease.LeaseManager;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test the basic functionality of event watcher.
+ */
+public class TestEventWatcher {
+
+ private static final TypedEvent WATCH_UNDER_REPLICATED =
+ new TypedEvent<>(UnderreplicatedEvent.class);
+
+ private static final TypedEvent UNDER_REPLICATED =
+ new TypedEvent<>(UnderreplicatedEvent.class);
+
+ private static final TypedEvent
+ REPLICATION_COMPLETED = new TypedEvent<>(ReplicationCompletedEvent.class);
+
+ LeaseManager leaseManager;
+
+ @Before
+ public void startLeaseManager() {
+ leaseManager = new LeaseManager<>(2000l);
+ leaseManager.start();
+ }
+
+ @After
+ public void stopLeaseManager() {
+ leaseManager.shutdown();
+ }
+
+
+ @Test
+ public void testEventHandling() throws InterruptedException {
+
+ EventQueue queue = new EventQueue();
+
+ EventWatcher
+ replicationWatcher = createEventWatcher();
+
+ EventHandlerStub underReplicatedEvents =
+ new EventHandlerStub<>();
+
+ queue.addHandler(UNDER_REPLICATED, underReplicatedEvents);
+
+ replicationWatcher.start(queue);
+
+ UUID uuid1 = UUID.randomUUID();
+ UUID uuid2 = UUID.randomUUID();
+
+ queue.fireEvent(WATCH_UNDER_REPLICATED,
+ new UnderreplicatedEvent(uuid1, "C1"));
+
+ queue.fireEvent(WATCH_UNDER_REPLICATED,
+ new UnderreplicatedEvent(uuid2, "C2"));
+
+ Assert.assertEquals(0, underReplicatedEvents.getReceivedEvents().size());
+
+ Thread.sleep(1000);
+
+ queue.fireEvent(REPLICATION_COMPLETED,
+ new ReplicationCompletedEvent(uuid1, "C2", "D1"));
+
+ Assert.assertEquals(0, underReplicatedEvents.getReceivedEvents().size());
+
+ Thread.sleep(1500);
+
+ queue.processAll(1000L);
+
+ Assert.assertEquals(1, underReplicatedEvents.getReceivedEvents().size());
+ Assert.assertEquals(uuid2,
+ underReplicatedEvents.getReceivedEvents().get(0).UUID);
+
+ }
+
+ @Test
+ public void testInprogressFilter() throws InterruptedException {
+
+ EventQueue queue = new EventQueue();
+
+ EventWatcher
+ replicationWatcher = createEventWatcher();
+
+ EventHandlerStub underReplicatedEvents =
+ new EventHandlerStub<>();
+
+ queue.addHandler(UNDER_REPLICATED, underReplicatedEvents);
+
+ replicationWatcher.start(queue);
+
+ UnderreplicatedEvent event1 =
+ new UnderreplicatedEvent(UUID.randomUUID(), "C1");
+
+ queue.fireEvent(WATCH_UNDER_REPLICATED, event1);
+
+ queue.fireEvent(WATCH_UNDER_REPLICATED,
+ new UnderreplicatedEvent(UUID.randomUUID(), "C2"));
+
+ queue.fireEvent(WATCH_UNDER_REPLICATED,
+ new UnderreplicatedEvent(UUID.randomUUID(), "C1"));
+
+ queue.processAll(1000L);
+ Thread.sleep(1000L);
+ List c1todo = replicationWatcher
+ .getTimeoutEvents(e -> e.containerId.equalsIgnoreCase("C1"));
+
+ Assert.assertEquals(2, c1todo.size());
+ Assert.assertTrue(replicationWatcher.contains(event1));
+ Thread.sleep(1500L);
+
+ c1todo = replicationWatcher
+ .getTimeoutEvents(e -> e.containerId.equalsIgnoreCase("C1"));
+ Assert.assertEquals(0, c1todo.size());
+ Assert.assertFalse(replicationWatcher.contains(event1));
+
+
+ }
+
+ private EventWatcher
+ createEventWatcher() {
+ return new EventWatcher(
+ WATCH_UNDER_REPLICATED, REPLICATION_COMPLETED, leaseManager) {
+
+ @Override
+ void onTimeout(EventPublisher publisher, UnderreplicatedEvent payload) {
+ publisher.fireEvent(UNDER_REPLICATED, payload);
+ }
+
+ @Override
+ void onFinished(EventPublisher publisher, UnderreplicatedEvent payload) {
+ //Good job. We did it.
+ }
+ };
+ }
+
+ private static class ReplicationCompletedEvent
+ implements IdentifiableEventPayload {
+
+ private final UUID UUID;
+
+ private final String containerId;
+
+ private final String datanodeId;
+
+ public ReplicationCompletedEvent(UUID UUID, String containerId,
+ String datanodeId) {
+ this.UUID = UUID;
+ this.containerId = containerId;
+ this.datanodeId = datanodeId;
+ }
+
+ public UUID getUUID() {
+ return UUID;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ReplicationCompletedEvent that = (ReplicationCompletedEvent) o;
+ return Objects.equals(containerId, that.containerId) && Objects
+ .equals(datanodeId, that.datanodeId);
+ }
+
+ @Override
+ public int hashCode() {
+
+ return Objects.hash(containerId, datanodeId);
+ }
+ }
+
+ private static class UnderreplicatedEvent
+
+ implements IdentifiableEventPayload {
+
+ private final UUID UUID;
+
+ private final String containerId;
+
+ public UnderreplicatedEvent(UUID UUID, String containerId) {
+ this.containerId = containerId;
+ this.UUID = UUID;
+ }
+
+ public UUID getUUID() {
+ return UUID;
+ }
+ }
+
+}
\ No newline at end of file