WAL (Write-Ahead Log)

The methods below are the core of Spark Streaming's file-based write-ahead log: getLogWriter rolls over to a fresh log file once per rolling interval, write appends a length-prefixed record and returns a segment handle for reading it back, and clean deletes log files that have aged past a threshold on a background thread.
// A single daemon thread runs all background work (such as old-file deletion
// in clean() below); marked implicit so Future { ... } picks it up, and
// daemon so it never blocks JVM shutdown.
implicit private val executionContext = ExecutionContext.fromExecutorService(
  ThreadUtils.newDaemonSingleThreadExecutor(threadpoolName))
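ThreadUtils.newDaemonSingleThreadExecutor is a small Spark helper; a plain-JDK equivalent looks roughly like the following sketch (the name daemonSingleThreadExecutor is illustrative, not Spark's):

import java.util.concurrent.{Executors, ExecutorService, ThreadFactory}

// Hypothetical stand-in for ThreadUtils.newDaemonSingleThreadExecutor:
// a single named daemon thread backing the executor.
def daemonSingleThreadExecutor(name: String): ExecutorService =
  Executors.newSingleThreadExecutor(new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, name)
      t.setDaemon(true)
      t
    }
  })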
// Return the current log writer, rolling over to a new log file when the
// rolling interval has elapsed (or when no writer exists yet).
private def getLogWriter(currentTime: Long): FileBasedWriteAheadLogWriter = synchronized {
  if (currentLogWriter == null || currentTime > currentLogWriterStopTime) {
    resetWriter()
    // Remember the file we just finished so clean() can delete it later.
    currentLogPath.foreach {
      pastLogs += LogInfo(currentLogWriterStartTime, currentLogWriterStopTime, _)
    }
    currentLogWriterStartTime = currentTime
    currentLogWriterStopTime = currentTime + (rollingIntervalSecs * 1000)
    // The file name encodes the time interval the file covers.
    val newLogPath = new Path(logDirectory,
      timeToLogFile(currentLogWriterStartTime, currentLogWriterStopTime))
    currentLogPath = Some(newLogPath.toString)
    currentLogWriter = new FileBasedWriteAheadLogWriter(currentLogPath.get, hadoopConf)
  }
  currentLogWriter
}
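The rollover above leans on timeToLogFile, defined elsewhere in the class; it simply embeds the covered interval in the file name, along these lines, so that recovery code can later locate the logs for a given time range:

// Sketch of the naming scheme assumed by getLogWriter.
def timeToLogFile(startTime: Long, stopTime: Long): String =
  s"log-$startTime-$stopTime"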
// In FileBasedWriteAheadLogWriter: append one record to the open log file as
// <length: Int><bytes>, and return a segment (path, offset, length) that a
// reader can later use to fetch exactly this record.
def write(data: ByteBuffer): FileBasedWriteAheadLogSegment = synchronized {
  assertOpen()
  data.rewind() // rewind to ensure all data in the buffer is retrieved
  val lengthToWrite = data.remaining()
  val segment = new FileBasedWriteAheadLogSegment(path, nextOffset, lengthToWrite)
  stream.writeInt(lengthToWrite)
  if (data.hasArray) {
    // Heap buffer: write only the live region of the backing array.
    // (Writing data.array() alone could spill bytes outside position..limit.)
    stream.write(data.array(), data.arrayOffset() + data.position(), data.remaining())
  } else {
    // Direct buffer: copy out through a temporary array.
    while (data.hasRemaining) {
      val array = new Array[Byte](data.remaining)
      data.get(array)
      stream.write(array)
    }
  }
  flush()
  nextOffset = stream.getPos()
  segment
}
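The returned segment is all a reader needs: seek to the offset, read the length prefix, then read that many bytes. A minimal read-back sketch, assuming instream is a seekable Hadoop FSDataInputStream already opened on segment.path (the companion random reader in Spark works along these lines):

import java.nio.ByteBuffer

// Fetch the single record described by `segment`.
def read(segment: FileBasedWriteAheadLogSegment): ByteBuffer = {
  instream.seek(segment.offset)    // jump to the record
  val length = instream.readInt()  // length prefix written by write()
  val buffer = new Array[Byte](length)
  instream.readFully(buffer)
  ByteBuffer.wrap(buffer)
}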
// Delete log files whose interval ended before threshTime. Deletion runs on
// the single-threaded executor above; pass waitForCompletion = true to block
// briefly instead of cleaning purely in the background.
def clean(threshTime: Long, waitForCompletion: Boolean): Unit = {
  val oldLogFiles = synchronized { pastLogs.filter { _.endTime < threshTime } }

  def deleteFiles(): Unit = {
    oldLogFiles.foreach { logInfo =>
      try {
        val path = new Path(logInfo.path)
        val fs = HdfsUtils.getFileSystemForPath(path, hadoopConf)
        fs.delete(path, true)
        // Only forget a file once it has actually been deleted.
        synchronized { pastLogs -= logInfo }
        logDebug(s"Cleared log file $logInfo")
      } catch {
        case ex: Exception =>
          logWarning(s"Error clearing write ahead log file $logInfo", ex)
      }
    }
    logInfo(s"Cleared log files in $logDirectory older than $threshTime")
  }

  if (!executionContext.isShutdown) {
    val f = Future { deleteFiles() }
    if (waitForCompletion) {
      import scala.concurrent.duration._
      Await.ready(f, 1.second)
    }
  }
}
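Putting the pieces together: in the full class, a public write(record, time) entry point routes through getLogWriter(time) and then the writer's write shown above, and clean is called once records are durable elsewhere. A usage sketch, assuming wal is an already-constructed FileBasedWriteAheadLog with that two-argument write (signature from the enclosing class, not shown in this excerpt):

import java.nio.ByteBuffer

val now = System.currentTimeMillis()
// Append a record; keep the segment handle if random read-back is needed.
val segment = wal.write(ByteBuffer.wrap("block-1".getBytes("UTF-8")), now)

// Once the data is safely stored elsewhere, drop log files older than one
// minute without blocking the caller.
wal.clean(now - 60 * 1000, waitForCompletion = false)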