diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationQueue.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationQueue.java new file mode 100644 index 0000000000..e0a235122e --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationQueue.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.replication; + +import java.util.List; +import java.util.PriorityQueue; +import java.util.Queue; + +/** + * Priority queue to handle under-replicated and over replicated containers + * in ozone. ReplicationManager will consume these messages and decide + * accordingly. + */ +public class ReplicationQueue { + + private final Queue queue; + + ReplicationQueue() { + queue = new PriorityQueue<>(); + } + + public synchronized boolean add(ReplicationRequest repObj) { + if (this.queue.contains(repObj)) { + // Remove the earlier message and insert this one + this.queue.remove(repObj); + } + return this.queue.add(repObj); + } + + public synchronized boolean remove(ReplicationRequest repObj) { + return queue.remove(repObj); + } + + /** + * Retrieves, but does not remove, the head of this queue, + * or returns {@code null} if this queue is empty. + * + * @return the head of this queue, or {@code null} if this queue is empty + */ + public synchronized ReplicationRequest peek() { + return queue.peek(); + } + + /** + * Retrieves and removes the head of this queue, + * or returns {@code null} if this queue is empty. + * + * @return the head of this queue, or {@code null} if this queue is empty + */ + public synchronized ReplicationRequest poll() { + return queue.poll(); + } + + public synchronized boolean removeAll(List repObjs) { + return queue.removeAll(repObjs); + } + + public int size() { + return queue.size(); + } +} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationRequest.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationRequest.java new file mode 100644 index 0000000000..a6ccce13e0 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationRequest.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.replication; + +import java.io.Serializable; +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; + +/** + * Wrapper class for hdds replication queue. Implements its natural + * ordering for priority queue. + */ +public class ReplicationRequest implements Comparable, + Serializable { + private final long containerId; + private final short replicationCount; + private final short expecReplicationCount; + private final long timestamp; + + public ReplicationRequest(long containerId, short replicationCount, + long timestamp, short expecReplicationCount) { + this.containerId = containerId; + this.replicationCount = replicationCount; + this.timestamp = timestamp; + this.expecReplicationCount = expecReplicationCount; + } + + /** + * Compares this object with the specified object for order. Returns a + * negative integer, zero, or a positive integer as this object is less + * than, equal to, or greater than the specified object. + * @param o the object to be compared. + * @return a negative integer, zero, or a positive integer as this object + * is less than, equal to, or greater than the specified object. + * @throws NullPointerException if the specified object is null + * @throws ClassCastException if the specified object's type prevents it + * from being compared to this object. + */ + @Override + public int compareTo(ReplicationRequest o) { + if (o == null) { + return 1; + } + if (this == o) { + return 0; + } + int retVal = Integer + .compare(getReplicationCount() - getExpecReplicationCount(), + o.getReplicationCount() - o.getExpecReplicationCount()); + if (retVal != 0) { + return retVal; + } + return Long.compare(getTimestamp(), o.getTimestamp()); + } + + @Override + public int hashCode() { + return new HashCodeBuilder(91, 1011) + .append(getContainerId()) + .toHashCode(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ReplicationRequest that = (ReplicationRequest) o; + return new EqualsBuilder().append(getContainerId(), that.getContainerId()) + .isEquals(); + } + + public long getContainerId() { + return containerId; + } + + public short getReplicationCount() { + return replicationCount; + } + + public long getTimestamp() { + return timestamp; + } + + public short getExpecReplicationCount() { + return expecReplicationCount; + } +} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/package-info.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/package-info.java new file mode 100644 index 0000000000..7f335e37c1 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/package-info.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.replication; + +/** + * Ozone Container replicaton related classes. + */ \ No newline at end of file diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationQueue.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationQueue.java new file mode 100644 index 0000000000..6d74c683ee --- /dev/null +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationQueue.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.replication; + +import java.util.Random; +import java.util.UUID; +import org.apache.hadoop.util.Time; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * Test class for ReplicationQueue. + */ +public class TestReplicationQueue { + + private ReplicationQueue replicationQueue; + private Random random; + + @Before + public void setUp() { + replicationQueue = new ReplicationQueue(); + random = new Random(); + } + + @Test + public void testDuplicateAddOp() { + long contId = random.nextLong(); + String nodeId = UUID.randomUUID().toString(); + ReplicationRequest obj1, obj2, obj3; + long time = Time.monotonicNow(); + obj1 = new ReplicationRequest(contId, (short) 2, time, (short) 3); + obj2 = new ReplicationRequest(contId, (short) 2, time + 1, (short) 3); + obj3 = new ReplicationRequest(contId, (short) 1, time+2, (short) 3); + + replicationQueue.add(obj1); + replicationQueue.add(obj2); + replicationQueue.add(obj3); + Assert.assertEquals("Should add only 1 msg as second one is duplicate", + 1, replicationQueue.size()); + ReplicationRequest temp = replicationQueue.poll(); + Assert.assertEquals(temp, obj3); + } + + @Test + public void testPollOp() { + long contId = random.nextLong(); + String nodeId = UUID.randomUUID().toString(); + ReplicationRequest msg1, msg2, msg3, msg4, msg5; + msg1 = new ReplicationRequest(contId, (short) 1, Time.monotonicNow(), + (short) 3); + long time = Time.monotonicNow(); + msg2 = new ReplicationRequest(contId + 1, (short) 4, time, (short) 3); + msg3 = new ReplicationRequest(contId + 2, (short) 0, time, (short) 3); + msg4 = new ReplicationRequest(contId, (short) 2, time, (short) 3); + // Replication message for same container but different nodeId + msg5 = new ReplicationRequest(contId + 1, (short) 2, time, (short) 3); + + replicationQueue.add(msg1); + replicationQueue.add(msg2); + replicationQueue.add(msg3); + replicationQueue.add(msg4); + replicationQueue.add(msg5); + Assert.assertEquals("Should have 3 objects", + 3, replicationQueue.size()); + + // Since Priority queue orders messages according to replication count, + // message with lowest replication should be first + ReplicationRequest temp; + temp = replicationQueue.poll(); + Assert.assertEquals("Should have 2 objects", + 2, replicationQueue.size()); + Assert.assertEquals(temp, msg3); + + temp = replicationQueue.poll(); + Assert.assertEquals("Should have 1 objects", + 1, replicationQueue.size()); + Assert.assertEquals(temp, msg5); + + // Message 2 should be ordered before message 5 as both have same replication + // number but message 2 has earlier timestamp. + temp = replicationQueue.poll(); + Assert.assertEquals("Should have 0 objects", + replicationQueue.size(), 0); + Assert.assertEquals(temp, msg4); + } + + @Test + public void testRemoveOp() { + long contId = random.nextLong(); + String nodeId = UUID.randomUUID().toString(); + ReplicationRequest obj1, obj2, obj3; + obj1 = new ReplicationRequest(contId, (short) 1, Time.monotonicNow(), + (short) 3); + obj2 = new ReplicationRequest(contId + 1, (short) 2, Time.monotonicNow(), + (short) 3); + obj3 = new ReplicationRequest(contId + 2, (short) 3, Time.monotonicNow(), + (short) 3); + + replicationQueue.add(obj1); + replicationQueue.add(obj2); + replicationQueue.add(obj3); + Assert.assertEquals("Should have 3 objects", + 3, replicationQueue.size()); + + replicationQueue.remove(obj3); + Assert.assertEquals("Should have 2 objects", + 2, replicationQueue.size()); + + replicationQueue.remove(obj2); + Assert.assertEquals("Should have 1 objects", + 1, replicationQueue.size()); + + replicationQueue.remove(obj1); + Assert.assertEquals("Should have 0 objects", + 0, replicationQueue.size()); + } + +} diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/package-info.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/package-info.java new file mode 100644 index 0000000000..5b1fd0f43a --- /dev/null +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +/** + * SCM Testing and Mocking Utils. + */ +package org.apache.hadoop.ozone.container.replication; +// Test classes for Replication functionality. \ No newline at end of file