WAL (Write-Ahead Log)

// FileBasedWriteAheadLog
// Defines the execution context: a single daemon thread, so async WAL
// housekeeping (such as the log cleanup below) runs strictly serially.
implicit private val executionContext = ExecutionContext.fromExecutorService(
    ThreadUtils.newDaemonSingleThreadExecutor(threadpoolName))
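
ThreadUtils.newDaemonSingleThreadExecutor is a Spark-internal helper. As a minimal
sketch of the same pattern with the plain JDK (the thread name here is made up):
every Future submitted to such a context runs on one daemon thread, so cleanup
tasks can never race each other.

import java.util.concurrent.{Executors, ThreadFactory}
import scala.concurrent.{ExecutionContext, Future}

// Sketch only: a single-daemon-thread ExecutionContext without Spark's helpers.
val factory = new ThreadFactory {
  override def newThread(r: Runnable): Thread = {
    val t = new Thread(r, "wal-housekeeping") // hypothetical thread name
    t.setDaemon(true)
    t
  }
}
implicit val walContext: ExecutionContext =
  ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor(factory))

Future { println("runs on the single WAL thread") } // tasks execute one at a time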


private def getLogWriter(currentTime: Long): FileBasedWriteAheadLogWriter = synchronized {
    if (currentLogWriter == null || currentTime > currentLogWriterStopTime) {
      resetWriter()
      currentLogPath.foreach {
        pastLogs += LogInfo(currentLogWriterStartTime, currentLogWriterStopTime, _)
      }
      currentLogWriterStartTime = currentTime
      currentLogWriterStopTime = currentTime + (rollingIntervalSecs * 1000)
      // The new log file's name encodes its time window.
      val newLogPath = new Path(logDirectory,
        timeToLogFile(currentLogWriterStartTime, currentLogWriterStopTime))
      currentLogPath = Some(newLogPath.toString)
      currentLogWriter = new FileBasedWriteAheadLogWriter(currentLogPath.get, hadoopConf)
    }
    currentLogWriter
  }
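
For reference, the file name simply encodes the writer's validity window. A sketch
of Spark's timeToLogFile helper (reconstructed from memory of the source, so treat
it as illustrative):

// Produces names like "log-1633000000000-1633000060000".
private def timeToLogFile(startTime: Long, stopTime: Long): String = {
  s"log-$startTime-$stopTime"
}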

  // FileBasedWriteAheadLogWriter
  def write(data: ByteBuffer): FileBasedWriteAheadLogSegment = synchronized {
      assertOpen()
      data.rewind() // Rewind to ensure all data in the buffer is retrieved
      val lengthToWrite = data.remaining()
      // The current offset and the length to write form the Segment, the record's handle.
      val segment = new FileBasedWriteAheadLogSegment(path, nextOffset, lengthToWrite)
      // Write the length first...
      stream.writeInt(lengthToWrite)
      if (data.hasArray) {
        stream.write(data.array())
      } else {
        // If the buffer is not backed by an array, we transfer using temp array
        // Note that despite the extra array copy, this should be faster than byte-by-byte copy
        while (data.hasRemaining) {
          val array = new Array[Byte](data.remaining)
          data.get(array)
          // ...then the payload itself.
          stream.write(array)
        }
      }
      flush()
      nextOffset = stream.getPos()
      segment
    }
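
The length prefix is what makes the Segment usable later. A condensed sketch of the
read path (cf. FileBasedWriteAheadLogRandomReader; instream is the underlying
FSDataInputStream, and sanity checks are omitted):

// Seek to the segment's offset, read the length prefix written above,
// then read back exactly that many bytes as one record.
def read(segment: FileBasedWriteAheadLogSegment): ByteBuffer = synchronized {
  instream.seek(segment.offset)
  val length = instream.readInt()
  val buffer = new Array[Byte](length)
  instream.readFully(buffer)
  ByteBuffer.wrap(buffer)
}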

    // Old logs are purged when jobGenerator.clearMetadata runs.
    def clean(threshTime: Long, waitForCompletion: Boolean): Unit = {
        // Filter by time: keep only logs whose window ended before the threshold.
        val oldLogFiles = synchronized { pastLogs.filter { _.endTime < threshTime } }

        def deleteFiles() {
          oldLogFiles.foreach { logInfo =>
            try {
              val path = new Path(logInfo.path)
              val fs = HdfsUtils.getFileSystemForPath(path, hadoopConf)
              fs.delete(path, true)
              synchronized { pastLogs -= logInfo }
              logDebug(s"Cleared log file $logInfo")
            } catch {
              case ex: Exception =>
                logWarning(s"Error clearing write ahead log file $logInfo", ex)
            }
          }
          logInfo(s"Cleared log files in $logDirectory older than $threshTime")
        }
        if (!executionContext.isShutdown) {
          val f = Future { deleteFiles() }
          if (waitForCompletion) {
            import scala.concurrent.duration._
            // Why wait only one second? Await.ready just bounds how long the
            // caller blocks; on timeout the deletion Future keeps running anyway.
            // In Spark itself the call sites all pass waitForCompletion = false.
            Await.ready(f, 1 second)
          }
        }
      }
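
Usage-wise (hedged reconstruction; wal and cutoffMillis are hypothetical names):

// Fire-and-forget, as Spark's own call sites do: deletion is scheduled on the
// single WAL thread and this call returns immediately.
wal.clean(threshTime = cutoffMillis, waitForCompletion = false)

// Blocking variant (mainly useful in tests): the Await.ready above waits at
// most one second; on timeout the caller resumes but deleteFiles() keeps running.
wal.clean(threshTime = cutoffMillis, waitForCompletion = true)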
