HDDS-186. Create under replicated queue. Contributed by Ajay Kumar.

This commit is contained in:
Xiaoyu Yao 2018-06-27 13:35:30 -07:00
parent 56a4cdb980
commit e9ec3d78f5
5 changed files with 362 additions and 0 deletions

View File

@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.replication;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;
/**
* Priority queue to handle under-replicated and over replicated containers
* in ozone. ReplicationManager will consume these messages and decide
* accordingly.
*/
public class ReplicationQueue {
private final Queue<ReplicationRequest> queue;
ReplicationQueue() {
queue = new PriorityQueue<>();
}
public synchronized boolean add(ReplicationRequest repObj) {
if (this.queue.contains(repObj)) {
// Remove the earlier message and insert this one
this.queue.remove(repObj);
}
return this.queue.add(repObj);
}
public synchronized boolean remove(ReplicationRequest repObj) {
return queue.remove(repObj);
}
/**
* Retrieves, but does not remove, the head of this queue,
* or returns {@code null} if this queue is empty.
*
* @return the head of this queue, or {@code null} if this queue is empty
*/
public synchronized ReplicationRequest peek() {
return queue.peek();
}
/**
* Retrieves and removes the head of this queue,
* or returns {@code null} if this queue is empty.
*
* @return the head of this queue, or {@code null} if this queue is empty
*/
public synchronized ReplicationRequest poll() {
return queue.poll();
}
public synchronized boolean removeAll(List<ReplicationRequest> repObjs) {
return queue.removeAll(repObjs);
}
public int size() {
return queue.size();
}
}

View File

@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.replication;
import java.io.Serializable;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
/**
* Wrapper class for hdds replication queue. Implements its natural
* ordering for priority queue.
*/
public class ReplicationRequest implements Comparable<ReplicationRequest>,
Serializable {
private final long containerId;
private final short replicationCount;
private final short expecReplicationCount;
private final long timestamp;
public ReplicationRequest(long containerId, short replicationCount,
long timestamp, short expecReplicationCount) {
this.containerId = containerId;
this.replicationCount = replicationCount;
this.timestamp = timestamp;
this.expecReplicationCount = expecReplicationCount;
}
/**
* Compares this object with the specified object for order. Returns a
* negative integer, zero, or a positive integer as this object is less
* than, equal to, or greater than the specified object.
* @param o the object to be compared.
* @return a negative integer, zero, or a positive integer as this object
* is less than, equal to, or greater than the specified object.
* @throws NullPointerException if the specified object is null
* @throws ClassCastException if the specified object's type prevents it
* from being compared to this object.
*/
@Override
public int compareTo(ReplicationRequest o) {
if (o == null) {
return 1;
}
if (this == o) {
return 0;
}
int retVal = Integer
.compare(getReplicationCount() - getExpecReplicationCount(),
o.getReplicationCount() - o.getExpecReplicationCount());
if (retVal != 0) {
return retVal;
}
return Long.compare(getTimestamp(), o.getTimestamp());
}
@Override
public int hashCode() {
return new HashCodeBuilder(91, 1011)
.append(getContainerId())
.toHashCode();
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ReplicationRequest that = (ReplicationRequest) o;
return new EqualsBuilder().append(getContainerId(), that.getContainerId())
.isEquals();
}
public long getContainerId() {
return containerId;
}
public short getReplicationCount() {
return replicationCount;
}
public long getTimestamp() {
return timestamp;
}
public short getExpecReplicationCount() {
return expecReplicationCount;
}
}

View File

@ -0,0 +1,23 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.replication;
/**
* Ozone Container replicaton related classes.
*/

View File

@ -0,0 +1,134 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.replication;
import java.util.Random;
import java.util.UUID;
import org.apache.hadoop.util.Time;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
* Test class for ReplicationQueue.
*/
public class TestReplicationQueue {
private ReplicationQueue replicationQueue;
private Random random;
@Before
public void setUp() {
replicationQueue = new ReplicationQueue();
random = new Random();
}
@Test
public void testDuplicateAddOp() {
long contId = random.nextLong();
String nodeId = UUID.randomUUID().toString();
ReplicationRequest obj1, obj2, obj3;
long time = Time.monotonicNow();
obj1 = new ReplicationRequest(contId, (short) 2, time, (short) 3);
obj2 = new ReplicationRequest(contId, (short) 2, time + 1, (short) 3);
obj3 = new ReplicationRequest(contId, (short) 1, time+2, (short) 3);
replicationQueue.add(obj1);
replicationQueue.add(obj2);
replicationQueue.add(obj3);
Assert.assertEquals("Should add only 1 msg as second one is duplicate",
1, replicationQueue.size());
ReplicationRequest temp = replicationQueue.poll();
Assert.assertEquals(temp, obj3);
}
@Test
public void testPollOp() {
long contId = random.nextLong();
String nodeId = UUID.randomUUID().toString();
ReplicationRequest msg1, msg2, msg3, msg4, msg5;
msg1 = new ReplicationRequest(contId, (short) 1, Time.monotonicNow(),
(short) 3);
long time = Time.monotonicNow();
msg2 = new ReplicationRequest(contId + 1, (short) 4, time, (short) 3);
msg3 = new ReplicationRequest(contId + 2, (short) 0, time, (short) 3);
msg4 = new ReplicationRequest(contId, (short) 2, time, (short) 3);
// Replication message for same container but different nodeId
msg5 = new ReplicationRequest(contId + 1, (short) 2, time, (short) 3);
replicationQueue.add(msg1);
replicationQueue.add(msg2);
replicationQueue.add(msg3);
replicationQueue.add(msg4);
replicationQueue.add(msg5);
Assert.assertEquals("Should have 3 objects",
3, replicationQueue.size());
// Since Priority queue orders messages according to replication count,
// message with lowest replication should be first
ReplicationRequest temp;
temp = replicationQueue.poll();
Assert.assertEquals("Should have 2 objects",
2, replicationQueue.size());
Assert.assertEquals(temp, msg3);
temp = replicationQueue.poll();
Assert.assertEquals("Should have 1 objects",
1, replicationQueue.size());
Assert.assertEquals(temp, msg5);
// Message 2 should be ordered before message 5 as both have same replication
// number but message 2 has earlier timestamp.
temp = replicationQueue.poll();
Assert.assertEquals("Should have 0 objects",
replicationQueue.size(), 0);
Assert.assertEquals(temp, msg4);
}
@Test
public void testRemoveOp() {
long contId = random.nextLong();
String nodeId = UUID.randomUUID().toString();
ReplicationRequest obj1, obj2, obj3;
obj1 = new ReplicationRequest(contId, (short) 1, Time.monotonicNow(),
(short) 3);
obj2 = new ReplicationRequest(contId + 1, (short) 2, Time.monotonicNow(),
(short) 3);
obj3 = new ReplicationRequest(contId + 2, (short) 3, Time.monotonicNow(),
(short) 3);
replicationQueue.add(obj1);
replicationQueue.add(obj2);
replicationQueue.add(obj3);
Assert.assertEquals("Should have 3 objects",
3, replicationQueue.size());
replicationQueue.remove(obj3);
Assert.assertEquals("Should have 2 objects",
2, replicationQueue.size());
replicationQueue.remove(obj2);
Assert.assertEquals("Should have 1 objects",
1, replicationQueue.size());
replicationQueue.remove(obj1);
Assert.assertEquals("Should have 0 objects",
0, replicationQueue.size());
}
}

View File

@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
/**
* SCM Testing and Mocking Utils.
*/
package org.apache.hadoop.ozone.container.replication;
// Test classes for Replication functionality.