publicinterfaceDBextendsIterable<Map.Entry<byte[],byte[]>>,Closeable{byte[]get(byte[]key)throwsDBException;byte[]get(byte[]key,ReadOptionsoptions)throwsDBException;@OverrideDBIteratoriterator();DBIteratoriterator(ReadOptionsoptions);voidput(byte[]key,byte[]value)throwsDBException;voiddelete(byte[]key)throwsDBException;voidwrite(WriteBatchupdates)throwsDBException;WriteBatchcreateWriteBatch();/** * @return null if options.isSnapshot()==false otherwise returns a snapshot * of the DB after this operation. */Snapshotput(byte[]key,byte[]value,WriteOptionsoptions)throwsDBException;/** * @return null if options.isSnapshot()==false otherwise returns a snapshot * of the DB after this operation. */Snapshotdelete(byte[]key,WriteOptionsoptions)throwsDBException;/** * @return null if options.isSnapshot()==false otherwise returns a snapshot * of the DB after this operation. */Snapshotwrite(WriteBatchupdates,WriteOptionsoptions)throwsDBException;SnapshotgetSnapshot();}
/**
 * An iterator over key/value entries that can additionally be repositioned,
 * peeked without advancing, and closed.
 */
public interface DBIterator
        extends Iterator<Map.Entry<byte[], byte[]>>, Closeable {

    /**
     * Repositions the iterator so that the key of the next element returned
     * is greater than or equal to the specified {@code key}.
     *
     * @param key the target key to seek to
     */
    void seek(byte[] key);

    /**
     * Repositions the iterator so that it is at the beginning of the database.
     */
    void seekToFirst();

    /**
     * Returns the next element in the iteration, without advancing the iteration.
     *
     * @return the entry that the next call to {@code next()} would return
     */
    Map.Entry<byte[], byte[]> peekNext();
}
privatevoidmakeRoomForWrite(booleanforce){//检查当前线程是否持有独占锁来操作memotablecheckState(mutex.isHeldByCurrentThread());booleanallowDelay=!force;while(true){//case1:如果需要压缩,并且允许延迟,则释放锁并sleep1s,让出cpu使得memtable被快速压缩if(allowDelay&&versions.numberOfFilesInLevel(0)>L0_SLOWDOWN_WRITES_TRIGGER){try{mutex.unlock();Thread.sleep(1);}catch(InterruptedExceptione){Thread.currentThread().interrupt();thrownewRuntimeException(e);}finally{mutex.lock();}// Do not delay a single write more than onceallowDelay=false;}//case2:当前的memtable都没满elseif(!force&&memTable.approximateMemoryUsage()<=options.writeBufferSize()){// There is room in current memtablebreak;}//case3:immutable memtable还没有压缩释放elseif(immutableMemTable!=null){// We have filled up the current memtable, but the previous// one is still being compacted, so we wait.backgroundCondition.awaitUninterruptibly();}//case4:迫切需要压缩,则当前线程处于阻塞在条件队列中,等待其他线程将其唤醒。elseif(versions.numberOfFilesInLevel(0)>=L0_STOP_WRITES_TRIGGER){// There are too many level-0 files.// Log(options_.info_log, "waiting...\n");backgroundCondition.awaitUninterruptibly();}else{//case5:创建一个新的memtable,将旧的进行压缩// Attempt to switch to a new memtable and trigger compaction of oldcheckState(versions.getPrevLogNumber()==0);// close the existing logtry{log.close();}catch(IOExceptione){thrownewRuntimeException("Unable to close log file "+log.getFile(),e);}// open a new loglonglogNumber=versions.getNextFileNumber();try{this.log=Logs.createLogWriter(newFile(databaseDir,Filename.logFileName(logNumber)),logNumber);}catch(IOExceptione){thrownewRuntimeException("Unable to open new log file "+newFile(databaseDir,Filename.logFileName(logNumber)).getAbsoluteFile(),e);}// create a new mem tableimmutableMemTable=memTable;memTable=newMemTable(internalKeyComparator);// Do not force another compaction there is space availableforce=false;maybeScheduleCompaction();}}}
publicbyte[]get(byte[]key,ReadOptionsoptions)throwsDBException{//检查后台线程异常checkBackgroundException();LookupKeylookupKey;//加上独占锁,操作memtable,以及immutable tablemutex.lock();try{//根据options返回指定快照,或者创建最新快照SnapshotImplsnapshot=getSnapshot(options);//基于原始key,以及当前的lastSeq number,创建用于查找的keylookupKey=newLookupKey(Slices.wrappedBuffer(key),snapshot.getLastSequence());// First look in the memtable, then in the immutable memtable (if any).//一个DbImpl分别维护一个memtable,以及一个immutable table//首先在memo table中查看是否有对应key对应的valueLookupResultlookupResult=memTable.get(lookupKey);if(lookupResult!=null){Slicevalue=lookupResult.getValue();if(value==null){returnnull;}returnvalue.getBytes();}//然后在immutable table中查看是否有if(immutableMemTable!=null){lookupResult=immutableMemTable.get(lookupKey);if(lookupResult!=null){Slicevalue=lookupResult.getValue();if(value==null){returnnull;}returnvalue.getBytes();}}}finally{//释放独占锁,允许其他操作访问tablemutex.unlock();}//在SST文件(磁盘中查找),也就是调用versions.get(lookupkey)//这里会合并迭代器数据,将数据截止到lastVersionEnd的最新版本合并出来// Not in memTables; try live files in level orderLookupResultlookupResult=versions.get(lookupKey);// schedule compaction if necessary//在这里触发SST文件压缩,将压缩任务提交到DbImpl维护的线程池中执行//为什么要在读取的时候触发压缩,而不是写入的时候?//1. 避免阻塞写入//2. 减少后续查询开销mutex.lock();try{if(versions.needsCompaction()){maybeScheduleCompaction();}}finally{mutex.unlock();}//返回结果if(lookupResult!=null){Slicevalue=lookupResult.getValue();if(value!=null){returnvalue.getBytes();}}returnnull;}