diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java index 78f4e88c37..c21c383b9e 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java @@ -246,7 +246,7 @@ public void restartStorageContainerManager() @Override public void restartOzoneManager() throws IOException { ozoneManager.stop(); - ozoneManager.start(); + ozoneManager.restart(); } @Override diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java index 8db443e1e5..7fde94b4bf 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java @@ -44,15 +44,14 @@ import org.apache.hadoop.hdds.server.ServiceRuntimeInfoImpl; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.io.Text; -import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ipc.Client; import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ozone.OzoneSecurityUtil; import org.apache.hadoop.ozone.om.helpers.S3SecretValue; import org.apache.hadoop.ozone.security.OzoneSecurityException; import org.apache.hadoop.ozone.security.OzoneTokenIdentifier; -import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.util.MBeans; import org.apache.hadoop.net.NetUtils; @@ -100,6 +99,7 @@ import org.apache.hadoop.ozone.security.acl.RequestContext; import org.apache.hadoop.ozone.security.OzoneBlockTokenSecretManager; 
import org.apache.hadoop.ozone.security.OzoneDelegationTokenSecretManager; +import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; @@ -220,6 +220,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl private JvmPauseMonitor jvmPauseMonitor; private final SecurityConfig secConfig; private final S3SecretManager s3SecretManager; + private volatile boolean isOmRpcServerRunning = false; private OzoneManager(OzoneConfiguration conf) throws IOException { Preconditions.checkNotNull(conf); @@ -246,38 +247,12 @@ private OzoneManager(OzoneConfiguration conf) throws IOException { scmContainerClient = null; scmBlockClient = null; } - InetSocketAddress omNodeRpcAddr = getOmAddress(configuration); - int handlerCount = configuration.getInt(OZONE_OM_HANDLER_COUNT_KEY, - OZONE_OM_HANDLER_COUNT_DEFAULT); - // This is a temporary check. Once fully implemented, all OM state change - // should go through Ratis - either standalone (for non-HA) or replicated - // (for HA). 
- boolean omRatisEnabled = configuration.getBoolean( - OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY, - OMConfigKeys.OZONE_OM_RATIS_ENABLE_DEFAULT); - if (omRatisEnabled) { - omRatisServer = OzoneManagerRatisServer.newOMRatisServer(this, omId, - omNodeRpcAddr.getAddress(), configuration); - omRatisServer.start(); - - LOG.info("OzoneManager Ratis server started at port {}", - omRatisServer.getServerPort()); - - omRatisClient = OzoneManagerRatisClient.newOzoneManagerRatisClient( - omId, omRatisServer.getRaftGroup(), configuration); - omRatisClient.connect(); - } else { - omRatisServer = null; - omRatisClient = null; - } RPC.setProtocolEngine(configuration, OzoneManagerProtocolPB.class, ProtobufRpcEngine.class); - BlockingService omService = newReflectiveBlockingService( - new OzoneManagerProtocolServerSideTranslatorPB( - this, omRatisClient, omRatisEnabled)); + omRpcAddressTxt = new Text(OmUtils.getOmRpcAddress(configuration)); secConfig = new SecurityConfig(configuration); if (secConfig.isGrpcBlockTokenEnabled()) { @@ -286,10 +261,8 @@ private OzoneManager(OzoneConfiguration conf) throws IOException { if(secConfig.isSecurityEnabled()){ delegationTokenMgr = createDelegationTokenSecretManager(configuration); } - - omRpcServer = startRpcServer(configuration, omNodeRpcAddr, - OzoneManagerProtocolPB.class, omService, - handlerCount); + InetSocketAddress omNodeRpcAddr = getOmAddress(configuration); + omRpcServer = getRpcServer(conf); omRpcAddress = updateRPCListenAddress(configuration, OZONE_OM_ADDRESS_KEY, omNodeRpcAddr, omRpcServer); metadataManager = new OmMetadataManagerImpl(configuration); @@ -848,6 +821,53 @@ public void start() throws IOException { keyManager.start(configuration); omRpcServer.start(); + isOmRpcServerRunning = true; + try { + httpServer = new OzoneManagerHttpServer(configuration, this); + httpServer.start(); + } catch (Exception ex) { + // Allow OM to start as Http Server failure is not fatal. 
+ LOG.error("OM HttpServer failed to start.", ex); + } + registerMXBean(); + setStartTime(); + } + + /** + * Restarts the service. This method re-initializes the rpc server. + */ + public void restart() throws IOException { + LOG.info(buildRpcServerStartMessage("OzoneManager RPC server", + omRpcAddress)); + + + DefaultMetricsSystem.initialize("OzoneManager"); + + metadataManager.start(configuration); + startSecretManagerIfNecessary(); + + // Set metrics and start metrics background thread + metrics.setNumVolumes(metadataManager.countRowsInTable(metadataManager + .getVolumeTable())); + metrics.setNumBuckets(metadataManager.countRowsInTable(metadataManager + .getBucketTable())); + + if (getMetricsStorageFile().exists()) { + OmMetricsInfo metricsInfo = READER.readValue(getMetricsStorageFile()); + metrics.setNumKeys(metricsInfo.getNumKeys()); + } + + // Schedule save metrics + long period = configuration.getTimeDuration(OZONE_OM_METRICS_SAVE_INTERVAL, + OZONE_OM_METRICS_SAVE_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS); + scheduleOMMetricsWriteTask = new ScheduleOMMetricsWriteTask(); + metricsTimer = new Timer(); + metricsTimer.schedule(scheduleOMMetricsWriteTask, 0, period); + + keyManager.start(configuration); + omRpcServer = getRpcServer(configuration); + omRpcServer.start(); + isOmRpcServerRunning = true; try { httpServer = new OzoneManagerHttpServer(configuration, this); httpServer.start(); @@ -864,6 +884,51 @@ public void start() throws IOException { setStartTime(); } + /** + * Creates a new instance of rpc server. If an earlier instance is already + * running then returns the same. + */ + private RPC.Server getRpcServer(OzoneConfiguration conf) throws IOException { + if (isOmRpcServerRunning) { + return omRpcServer; + } + + InetSocketAddress omNodeRpcAddr = getOmAddress(configuration); + // This is a temporary check. Once fully implemented, all OM state change + // should go through Ratis - either standalone (for non-HA) or replicated + // (for HA).
+ boolean omRatisEnabled = configuration.getBoolean( + OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY, + OMConfigKeys.OZONE_OM_RATIS_ENABLE_DEFAULT); + if (omRatisEnabled) { + omRatisServer = OzoneManagerRatisServer.newOMRatisServer(this, omId, + omNodeRpcAddr.getAddress(), configuration); + omRatisServer.start(); + + LOG.info("OzoneManager Ratis server started at port {}", + omRatisServer.getServerPort()); + + omRatisClient = OzoneManagerRatisClient.newOzoneManagerRatisClient( + omId, omRatisServer.getRaftGroup(), configuration); + omRatisClient.connect(); + } else { + omRatisServer = null; + omRatisClient = null; + } + + final int handlerCount = conf.getInt(OZONE_OM_HANDLER_COUNT_KEY, + OZONE_OM_HANDLER_COUNT_DEFAULT); + RPC.setProtocolEngine(configuration, OzoneManagerProtocolPB.class, + ProtobufRpcEngine.class); + + BlockingService omService = newReflectiveBlockingService( + new OzoneManagerProtocolServerSideTranslatorPB(this, omRatisClient, + omRatisEnabled)); + return startRpcServer(configuration, omNodeRpcAddr, + OzoneManagerProtocolPB.class, omService, + handlerCount); + } + /** * Stop service. */ @@ -879,6 +944,7 @@ public void stop() { if (omRatisServer != null) { omRatisServer.stop(); } + isOmRpcServerRunning = false; keyManager.stop(); stopSecretManager(); httpServer.stop();