HDFS-4349. Add test for reading files from BackupNode. Contributed by Konstantin Shvachko.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1427290 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
df09a85ca9
commit
7b70a688ac
@ -635,6 +635,8 @@ Release 2.0.3-alpha - Unreleased
|
|||||||
HDFS-4347. Avoid infinite waiting checkpoint to complete in TestBackupNode.
|
HDFS-4347. Avoid infinite waiting checkpoint to complete in TestBackupNode.
|
||||||
(Plamen Jeliazkov via shv)
|
(Plamen Jeliazkov via shv)
|
||||||
|
|
||||||
|
HDFS-4349. Add test for reading files from BackupNode. (shv)
|
||||||
|
|
||||||
BREAKDOWN OF HDFS-3077 SUBTASKS
|
BREAKDOWN OF HDFS-3077 SUBTASKS
|
||||||
|
|
||||||
HDFS-3077. Quorum-based protocol for reading and writing edit logs.
|
HDFS-3077. Quorum-based protocol for reading and writing edit logs.
|
||||||
|
@ -21,6 +21,8 @@
|
|||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
import org.apache.hadoop.hdfs.NameNodeProxies;
|
import org.apache.hadoop.hdfs.NameNodeProxies;
|
||||||
import org.apache.hadoop.hdfs.server.common.Storage;
|
import org.apache.hadoop.hdfs.server.common.Storage;
|
||||||
@ -41,6 +43,7 @@
|
|||||||
* int, int, byte[])
|
* int, int, byte[])
|
||||||
*/
|
*/
|
||||||
class EditLogBackupOutputStream extends EditLogOutputStream {
|
class EditLogBackupOutputStream extends EditLogOutputStream {
|
||||||
|
private static Log LOG = LogFactory.getLog(EditLogFileOutputStream.class);
|
||||||
static int DEFAULT_BUFFER_SIZE = 256;
|
static int DEFAULT_BUFFER_SIZE = 256;
|
||||||
|
|
||||||
private final JournalProtocol backupNode; // RPC proxy to backup node
|
private final JournalProtocol backupNode; // RPC proxy to backup node
|
||||||
@ -117,6 +120,11 @@ public void setReadyToFlush() throws IOException {
|
|||||||
protected void flushAndSync(boolean durable) throws IOException {
|
protected void flushAndSync(boolean durable) throws IOException {
|
||||||
assert out.getLength() == 0 : "Output buffer is not empty";
|
assert out.getLength() == 0 : "Output buffer is not empty";
|
||||||
|
|
||||||
|
if (doubleBuf.isFlushed()) {
|
||||||
|
LOG.info("Nothing to flush");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
int numReadyTxns = doubleBuf.countReadyTxns();
|
int numReadyTxns = doubleBuf.countReadyTxns();
|
||||||
long firstTxToFlush = doubleBuf.getFirstReadyTxId();
|
long firstTxToFlush = doubleBuf.getFirstReadyTxId();
|
||||||
|
|
||||||
|
@ -417,11 +417,65 @@ void testCheckpoint(StartupOption op) throws Exception {
|
|||||||
// verify that file2 exists
|
// verify that file2 exists
|
||||||
assertTrue(fileSys.exists(file2));
|
assertTrue(fileSys.exists(file2));
|
||||||
} catch(IOException e) {
|
} catch(IOException e) {
|
||||||
LOG.error("Error in TestBackupNode:", e);
|
LOG.error("Error in TestBackupNode: ", e);
|
||||||
assertTrue(e.getLocalizedMessage(), false);
|
assertTrue(e.getLocalizedMessage(), false);
|
||||||
} finally {
|
} finally {
|
||||||
fileSys.close();
|
fileSys.close();
|
||||||
cluster.shutdown();
|
cluster.shutdown();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Verify that a file can be read both from NameNode and BackupNode.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testCanReadData() throws IOException {
|
||||||
|
Path file1 = new Path("/fileToRead.dat");
|
||||||
|
Configuration conf = new HdfsConfiguration();
|
||||||
|
conf.setBoolean(DFSConfigKeys.DFS_PERSIST_BLOCKS_KEY, true);
|
||||||
|
MiniDFSCluster cluster = null;
|
||||||
|
FileSystem fileSys = null;
|
||||||
|
BackupNode backup = null;
|
||||||
|
try {
|
||||||
|
// Start NameNode and BackupNode
|
||||||
|
cluster = new MiniDFSCluster.Builder(conf)
|
||||||
|
.numDataNodes(0).format(true).build();
|
||||||
|
fileSys = cluster.getFileSystem();
|
||||||
|
long txid = cluster.getNameNodeRpc().getTransactionID();
|
||||||
|
backup = startBackupNode(conf, StartupOption.BACKUP, 1);
|
||||||
|
waitCheckpointDone(cluster, txid);
|
||||||
|
|
||||||
|
// Setup dual NameNode configuration for DataNodes
|
||||||
|
String rpcAddrKeyPreffix =
|
||||||
|
DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + ".bnCluster";
|
||||||
|
String nnAddr = cluster.getNameNode().getNameNodeAddressHostPortString();
|
||||||
|
conf.get(DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY);
|
||||||
|
String bnAddr = backup.getNameNodeAddressHostPortString();
|
||||||
|
conf.set(DFSConfigKeys.DFS_NAMESERVICES, "bnCluster");
|
||||||
|
conf.set(DFSConfigKeys.DFS_NAMESERVICE_ID, "bnCluster");
|
||||||
|
conf.set(DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + ".bnCluster",
|
||||||
|
"nnActive, nnBackup");
|
||||||
|
conf.set(rpcAddrKeyPreffix + ".nnActive", nnAddr);
|
||||||
|
conf.set(rpcAddrKeyPreffix + ".nnBackup", bnAddr);
|
||||||
|
cluster.startDataNodes(conf, 3, true, StartupOption.REGULAR, null);
|
||||||
|
|
||||||
|
DFSTestUtil.createFile(
|
||||||
|
fileSys, file1, fileSize, fileSize, blockSize, (short)3, seed);
|
||||||
|
|
||||||
|
// Read the same file from file systems pointing to NN and BN
|
||||||
|
FileSystem bnFS = FileSystem.get(
|
||||||
|
new Path("hdfs://" + bnAddr).toUri(), conf);
|
||||||
|
String nnData = DFSTestUtil.readFile(fileSys, file1);
|
||||||
|
String bnData = DFSTestUtil.readFile(bnFS, file1);
|
||||||
|
assertEquals("Data read from BackupNode and NameNode is not the same.",
|
||||||
|
nnData, bnData);
|
||||||
|
} catch(IOException e) {
|
||||||
|
LOG.error("Error in TestBackupNode: ", e);
|
||||||
|
assertTrue(e.getLocalizedMessage(), false);
|
||||||
|
} finally {
|
||||||
|
if(fileSys != null) fileSys.close();
|
||||||
|
if(backup != null) backup.stop();
|
||||||
|
if(cluster != null) cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user