SimpleMemoryPoolSimpleMemoryPool 是 Kafka 客户端库中一个基础且重要的内存管理工具,它是一个简单的内存池实现,其核心功能是限制待处理(outstanding)内存的总量,防止无限制的内存分配导致程序内存溢出(OOM)。
SimpleMemoryPool 实现了 MemoryPool 接口,提供了一套标准的内存分配和释放的 API。
|
1 2 3 4 5 6 7 |
/** * a simple pool implementation. this implementation just provides a limit on the total outstanding memory. * any buffer allocated must be release()ed always otherwise memory is not marked as reclaimed (and "leak"s) */ public class SimpleMemoryPool implements MemoryPool { // ... } |
注释里明确指出了它的核心职责:
ByteBuffer 都必须在使用完毕后调用 release() 方法归还,否则这部分内存将被视为“泄漏”,永远不会被回收计入可用内存。SimpleMemoryPool 通过几个关键的原子(atomic)变量来保证线程安全和高效的内存追踪。
|
1 2 3 4 5 6 7 8 9 10 11 12 13 |
// ... existing code ... public class SimpleMemoryPool implements MemoryPool { protected final Logger log = LoggerFactory.getLogger(getClass()); //subclass-friendly protected final long sizeBytes; protected final boolean strict; protected final AtomicLong availableMemory; protected final int maxSingleAllocationSize; protected final AtomicLong startOfNoMemPeriod = new AtomicLong(); //nanoseconds protected volatile Sensor oomTimeSensor; // ... } |
sizeBytes: long 类型,表示这个内存池的总大小(容量),单位是字节。strict: boolean 类型,一个非常重要的标志,决定了内存分配的策略(详见 tryAllocate 方法分析)。availableMemory: AtomicLong 类型,核心状态变量,用于原子地追踪当前可用的内存大小。初始化时等于 sizeBytes。maxSingleAllocationSize: int 类型,限制单次能申请的最大内存。任何超过这个大小的请求都会直接失败。startOfNoMemPeriod: AtomicLong 类型,用于记录内存开始耗尽(即无法满足分配请求)的起始时间点(纳秒),主要用于监控和度量。oomTimeSensor: Sensor 类型,Kafka Metrics 框架的一部分。当内存池从耗尽状态恢复时,用它来记录从开始耗尽到恢复所经过的时间。构造函数负责初始化内存池的配置。
|
1 2 3 4 5 6 7 8 9 10 11 12 |
// ... existing code ... public SimpleMemoryPool(long sizeInBytes, int maxSingleAllocationBytes, boolean strict, Sensor oomPeriodSensor) { if (sizeInBytes <= 0 || maxSingleAllocationBytes <= 0 || maxSingleAllocationBytes > sizeInBytes) throw new IllegalArgumentException("must provide a positive size and max single allocation size smaller than size." + "provided " + sizeInBytes + " and " + maxSingleAllocationBytes + " respectively"); this.sizeBytes = sizeInBytes; this.strict = strict; this.availableMemory = new AtomicLong(sizeInBytes); this.maxSingleAllocationSize = maxSingleAllocationBytes; this.oomTimeSensor = oomPeriodSensor; } // ... existing code ... |
它做了以下几件事:
sizeInBytes 和单次最大分配 maxSingleAllocationBytes 都为正数,并且单次最大分配不能超过总大小。availableMemory 初始化为 sizeInBytes。tryAllocate这是 SimpleMemoryPool 最核心的方法,实现了内存分配的逻辑。
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
// ... existing code ... @Override public ByteBuffer tryAllocate(int sizeBytes) { if (sizeBytes < 1) throw new IllegalArgumentException("requested size " + sizeBytes + "<=0"); if (sizeBytes > maxSingleAllocationSize) throw new IllegalArgumentException("requested size " + sizeBytes + " is larger than maxSingleAllocationSize " + maxSingleAllocationSize); long available; boolean success = false; //in strict mode we will only allocate memory if we have at least the size required. //in non-strict mode we will allocate memory if we have _any_ memory available (so available memory //can dip into the negative and max allocated memory would be sizeBytes + maxSingleAllocationSize) long threshold = strict ? sizeBytes : 1; while ((available = availableMemory.get()) >= threshold) { success = availableMemory.compareAndSet(available, available - sizeBytes); if (success) break; } if (success) { maybeRecordEndOfDrySpell(); } else { if (oomTimeSensor != null) { startOfNoMemPeriod.compareAndSet(0, System.nanoTime()); } log.trace("refused to allocate buffer of size {}", sizeBytes); return null; } ByteBuffer allocated = ByteBuffer.allocate(sizeBytes); bufferToBeReturned(allocated); return allocated; } // ... existing code ... |
其执行流程如下:
sizeBytes 是否合法(大于0且小于等于 maxSingleAllocationSize)。threshold:
strict = true): threshold 等于 sizeBytes。这意味着只有当可用内存足够满足本次请求时,才会尝试分配。strict = false): threshold 等于 1。这意味着只要还有任何一点可用内存,就会尝试分配。在这种模式下,availableMemory 可能会变成负数,总分配内存最多可能超出 sizeBytes 接近 maxSingleAllocationSize。这是一种避免饥饿的策略,允许在资源紧张时进行“超额”分配。while 循环中,首先检查当前可用内存 available 是否大于等于 threshold。compareAndSet (CAS) 原子操作尝试将 availableMemory 减去 sizeBytes。CAS操作能保证在多线程环境下只有一个线程能成功修改 availableMemory。success 置为 true 并跳出循环。如果失败,说明有其他线程抢先修改了 availableMemory,循环会继续,重新获取最新的 availableMemory 值再次尝试。maybeRecordEndOfDrySpell() 记录可能结束的OOM周期,然后通过 ByteBuffer.allocate(sizeBytes) 在JVM堆上分配一块内存,并返回。null,表示分配失败。release此方法用于将之前分配的 ByteBuffer 归还给内存池。
|
1 2 3 4 5 6 7 8 9 10 11 |
// ... existing code ... @Override public void release(ByteBuffer previouslyAllocated) { if (previouslyAllocated == null) throw new IllegalArgumentException("provided null buffer"); bufferToBeReleased(previouslyAllocated); availableMemory.addAndGet(previouslyAllocated.capacity()); maybeRecordEndOfDrySpell(); } // ... existing code ... |
逻辑很简单:
previouslyAllocated 不为 null。bufferToBeReleased 钩子方法(用于子类扩展)。addAndGet 原子地将释放的 ByteBuffer 的容量加回到 availableMemory。maybeRecordEndOfDrySpell(),因为释放内存可能使得内存池从“耗尽”状态恢复,从而结束一个OOM周期。maybeRecordEndOfDrySpell(): 这个方法是连接内存管理和监控系统的桥梁。它原子地读取并重置 startOfNoMemPeriod,如果之前的值不为0,说明一个OOM周期刚刚结束,它会计算持续时间并报告给 oomTimeSensor。bufferToBeReturned() 和 bufferToBeReleased(): 这两个 protected 方法是为子类设计的扩展点。它们在内存分配给调用者之前和内存被标记为回收之前被调用。一个很好的例子是 GarbageCollectedMemoryPool,它重写了这两个方法来追踪每个分配的 ByteBuffer 对象,以便在它们被GC回收时也能自动释放内存。SimpleMemoryPool 是一个线程安全的、基于计数器的非池化内存管理器。它本身不持有或缓存任何 ByteBuffer 对象(分配是直接 ByteBuffer.allocate,释放也只是增加计数器),它的唯一目标是确保并发环境下,所有已分配内存的总和不超过一个预设的限制。
它的设计体现了几个原则:
AtomicLong 和 CAS 操作,避免了使用重量级锁,性能很高。strict 参数,可以在“严格保证内存上限”和“尽量避免饥饿”之间做选择。Sensor 集成了 Kafka 的 Metrics 框架,可以方便地监控内存池的健康状况。在 Kafka 源码中,它常被用于需要限制内存使用但又不需要复杂缓冲池功能的场景,例如 SocketServer 中用于限制网络请求队列所占用的内存。
BatchMemoryPoolBatchMemoryPool 和 SimpleMemoryPool 虽然都实现了 MemoryPool 接口,但它们的设计哲学和应用场景有显著的不同。BatchMemoryPool 是一个真正的缓冲池,它会缓存并复用 ByteBuffer 对象,而 SimpleMemoryPool 更像一个内存分配的“记账员”,只追踪内存使用量而不持有对象。
|
1 2 3 4 5 6 7 8 9 |
/** * Simple memory pool that tries to maintain a limited number of fixed-size buffers. * * This type implements an unbounded memory pool. When releasing byte buffers they will get pooled * up to the maximum retained number of batches. */ public class BatchMemoryPool implements MemoryPool { // ... } |
从注释中可以提炼出它的核心职责:
ByteBuffer 大小都是固定的(即 batchSize)。maxRetainedBatches 个缓冲区。|
1 2 3 4 5 6 7 8 9 |
// ... existing code ... public class BatchMemoryPool implements MemoryPool { private final ReentrantLock lock; private final Deque<ByteBuffer> free; private final int maxRetainedBatches; private final int batchSize; private int numAllocatedBatches = 0; // ... existing code ... |
lock: 一个 ReentrantLock。与 SimpleMemoryPool 使用原子类(CAS)的无锁思想不同,BatchMemoryPool 采用显式的锁来保证对内部状态访问的线程安全。free: 一个 Deque<ByteBuffer> (双端队列),用作空闲缓冲区列表。这是实现缓冲区复用的核心数据结构。maxRetainedBatches: int 类型,指定了 free 队列的最大容量,即池子最多保留多少个空闲的 ByteBuffer 以备重用。batchSize: int 类型,池中每个 ByteBuffer 的固定大小。numAllocatedBatches: int 类型,一个计数器,记录了当前由该池管理的缓冲区总数,包括在 free 队列中的和已经分配出去正在使用的。|
1 2 3 4 5 6 7 8 |
// ... existing code ... public BatchMemoryPool(int maxRetainedBatches, int batchSize) { this.maxRetainedBatches = maxRetainedBatches; this.batchSize = batchSize; this.free = new ArrayDeque<>(maxRetainedBatches); this.lock = new ReentrantLock(); } // ... existing code ... |
构造函数非常直接,就是初始化上述的几个核心属性。
tryAllocate这是 BatchMemoryPool 的分配逻辑,与 SimpleMemoryPool 的行为差异巨大。
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
// ... existing code ... @Override public ByteBuffer tryAllocate(int sizeBytes) { if (sizeBytes > batchSize) { throw new IllegalArgumentException("Cannot allocate buffers larger than max " + "batch size of " + batchSize); } lock.lock(); try { ByteBuffer buffer = free.poll(); // Always allocation a new buffer if there are no free buffers if (buffer == null) { buffer = ByteBuffer.allocate(batchSize); numAllocatedBatches += 1; } return buffer; } finally { lock.unlock(); } } // ... existing code ... |
sizeBytes 是否大于 batchSize。这个池子只能分配固定大小的 ByteBuffer,所以任何大于 batchSize 的请求都是非法的。free.poll() 尝试从空闲队列的头部取出一个 ByteBuffer。poll() 返回 null,说明空闲队列是空的。此时,它会直接创建一个新的 ByteBuffer (ByteBuffer.allocate(batchSize))。这是它“无界”特性的体现。同时,numAllocatedBatches 计数器加一,表示池管理的总缓冲区数量增加了。ByteBuffer。finally 块中确保锁被释放。关键点:此方法永远不会返回 null(除非JVM本身OOM),它总能满足分配请求。
release此方法负责回收用完的 ByteBuffer。
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
// ... existing code ... @Override public void release(ByteBuffer previouslyAllocated) { lock.lock(); try { previouslyAllocated.clear(); if (previouslyAllocated.capacity() != batchSize) { throw new IllegalArgumentException("Released buffer with unexpected size " + previouslyAllocated.capacity()); } // Free the buffer if the number of pooled buffers is already the maximum number of batches. // Otherwise return the buffer to the memory pool. if (free.size() >= maxRetainedBatches) { numAllocatedBatches--; } else { free.offer(previouslyAllocated); } } finally { lock.unlock(); } } // ... existing code ... |
previouslyAllocated.clear() 重置缓冲区的 position 和 limit,为下一次使用做准备。然后校验归还的缓冲区容量是否等于 batchSize。free 的大小是否已经达到 maxRetainedBatches 的上限。ByteBuffer 放入 free 队列。它只是将 numAllocatedBatches 减一,然后这个 ByteBuffer 对象在方法结束后就没有任何引用了,会被JVM垃圾回收。free.offer() 将这个 ByteBuffer 加入到空闲队列的尾部,以备后续复用。finally 块中释放锁。MemoryPool 接口的其他实现size(): 返回 numAllocatedBatches * batchSize,即当前池管理的所有缓冲区(无论空闲还是在用)的总内存大小。availableMemory(): 恒定返回 Long.MAX_VALUE。这明确地告诉调用者,这个池子在逻辑上永远有可用内存,分配请求不会因为容量限制而被阻塞或拒绝。isOutOfMemory(): 恒定返回 false。与 availableMemory() 的行为一致。BatchMemoryPool 是一个为特定场景优化的内存池,主要用在 Kafka Raft 内部的 BatchAccumulator 中,用于缓存待发送的日志批次。
与 SimpleMemoryPool 的对比:
| 特性 | SimpleMemoryPool | BatchMemoryPool |
|---|---|---|
| 核心机制 | 基于计数的内存限制器 | 基于队列的 ByteBuffer 对象池 |
| 内存限制 | 有严格的、可配置的总内存上限 (sizeBytes) |
逻辑上无界(总能分配),但池化的空闲对象数量有上限 |
| 分配行为 | 内存不足时返回 null |
总是成功分配(池中取或新建),不返回 null |
| 对象复用 | 不复用 ByteBuffer 对象,仅增减计数 |
复用 ByteBuffer 对象以减少 GC |
| 线程安全 | CAS 原子操作 (无锁) | ReentrantLock (高并发有锁,低并发AQS相当于CAS) |
| 适用场景 | 对总内存使用有严格控制的场景,如网络请求队列 | 需要频繁分配和释放固定大小缓冲区的场景,以减少 GC 开销 |
总而言之,BatchMemoryPool 通过复用固定大小的 ByteBuffer,有效地减少了在处理 Raft 日志批次时因频繁创建和销毁对象而带来的GC压力,是典型的以空间换时间(减少GC时间)的优化策略。
GarbageCollectedMemoryPool首先,从类的声明 public class GarbageCollectedMemoryPool extends SimpleMemoryPool implements AutoCloseable 我们可以得到两个关键信息:
SimpleMemoryPool,这意味着它拥有 SimpleMemoryPool 所有的基础功能,即一个带限额的内存分配器。AutoCloseable 接口,这意味着它管理着需要显式关闭的资源,并且可以被用在 try-with-resources 语句中。正如类注释所说,这个类是 SimpleMemoryPool 的一个扩展,其主要目的是追踪已分配的缓冲区,并在它们“泄漏”(即在没有被 release() 的情况下被垃圾回收)时记录错误日志。注释也明确指出:这是一个用于开发和调试的辅助工具,不应该在生产环境中使用。
下面我们来逐一解析它的实现细节。
GarbageCollectedMemoryPool 的核心是利用了 Java 的弱引用(WeakReference)和引用队列(ReferenceQueue)机制来监控 ByteBuffer 对象是否被垃圾回收。
|
1 2 3 4 5 6 7 8 9 10 |
// ... existing code ... public class GarbageCollectedMemoryPool extends SimpleMemoryPool implements AutoCloseable { private final ReferenceQueue<ByteBuffer> garbageCollectedBuffers = new ReferenceQueue<>(); //serves 2 purposes - 1st it maintains the ref objects reachable (which is a requirement for them //to ever be enqueued), 2nd keeps some (small) metadata for every buffer allocated private final Map<BufferReference, BufferMetadata> buffersInFlight = new ConcurrentHashMap<>(); private final Thread gcListenerThread; private volatile boolean alive = true; // ... existing code ... |
garbageCollectedBuffers: 一个 ReferenceQueue。当一个被弱引用关联的对象被GC回收时,这个弱引用对象本身会被放入这个队列中。buffersInFlight: 一个 ConcurrentHashMap,用于存储所有“在途”(已分配但未释放)的缓冲区信息。
BufferReference,一个自定义的 WeakReference 子类。BufferMetadata,一个简单的内部类,只记录了缓冲区的大小。 这个 Map 有两个作用:1) 保持 BufferReference 对象本身是强可达的,这样它们才有机会在关联的 ByteBuffer 被回收时进入引用队列;2) 存储每个已分配缓冲区的元数据。gcListenerThread: 一个后台守护线程,专门负责监听 garbageCollectedBuffers 队列。alive: 一个 volatile 布尔值,用于控制后台线程的生命周期。|
1 2 3 4 5 6 7 8 9 |
// ... existing code ... public GarbageCollectedMemoryPool(long sizeBytes, int maxSingleAllocationSize, boolean strict, Sensor oomPeriodSensor) { super(sizeBytes, maxSingleAllocationSize, strict, oomPeriodSensor); GarbageCollectionListener gcListener = new GarbageCollectionListener(); this.gcListenerThread = new Thread(gcListener, "memory pool GC listener"); this.gcListenerThread.setDaemon(true); //so we dont need to worry about shutdown this.gcListenerThread.start(); } // ... existing code ... |
构造函数在调用父类 SimpleMemoryPool 的构造函数之后,创建并启动了一个名为 memory pool GC listener 的后台线程。这个线程被设置为守护线程 (setDaemon(true)),这意味着如果JVM中只剩下守护线程,JVM就会退出,我们不需要手动管理它的关闭(尽管它也提供了 close 方法)。
GarbageCollectedMemoryPool 通过重写父类的两个 protected 钩子方法 bufferToBeReturned 和 bufferToBeReleased 来注入其追踪逻辑。
bufferToBeReturned(ByteBuffer justAllocated)当一个缓冲区被分配后,在返回给调用者之前,这个方法会被调用。
|
1 2 3 4 5 6 7 8 9 10 11 12 13 |
// ... existing code ... @Override protected void bufferToBeReturned(ByteBuffer justAllocated) { BufferReference ref = new BufferReference(justAllocated, garbageCollectedBuffers); BufferMetadata metadata = new BufferMetadata(justAllocated.capacity()); if (buffersInFlight.put(ref, metadata) != null) //this is a bug. it means either 2 different co-existing buffers got //the same identity or we failed to register a released/GC'ed buffer throw new IllegalStateException("allocated buffer identity " + ref.hashCode + " already registered as in use?!"); log.trace("allocated buffer of size {} and identity {}", sizeBytes, ref.hashCode); } // ... existing code ... |
BufferReference (弱引用),将新分配的 justAllocated 缓冲区与 garbageCollectedBuffers 引用队列关联起来。BufferMetadata 来存储缓冲区的大小。(ref, metadata) 存入 buffersInFlight 这个 Map 中,进行“登记”。如果 put 方法返回了非 null 值,说明发生了逻辑错误,抛出异常。bufferToBeReleased(ByteBuffer justReleased)当调用者调用 release() 方法归还缓冲区时,在父类将内存加回 availableMemory 之前,这个方法会被调用。
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
// ... existing code ... @Override protected void bufferToBeReleased(ByteBuffer justReleased) { BufferReference ref = new BufferReference(justReleased); //used ro lookup only BufferMetadata metadata = buffersInFlight.remove(ref); if (metadata == null) //its impossible for the buffer to have already been GC'ed (because we have a hard ref to it //in the function arg) so this means either a double free or not our buffer. throw new IllegalArgumentException("returned buffer " + ref.hashCode + " was never allocated by this pool"); if (metadata.sizeBytes != justReleased.capacity()) { //this is a bug throw new IllegalStateException("buffer " + ref.hashCode + " has capacity " + justReleased.capacity() + " but recorded as " + metadata.sizeBytes); } log.trace("released buffer of size {} and identity {}", metadata.sizeBytes, ref.hashCode); } // ... existing code ... |
BufferReference,仅用于在 Map 中进行查找。buffersInFlight 中移除对应的条目,完成“注销”。remove 返回 null,说明这个缓冲区要么不是从这个池子分配的,要么被重复释放了(double free),抛出异常。GarbageCollectionListener 是实现泄漏检测的核心。
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
// ... existing code ... private class GarbageCollectionListener implements Runnable { @Override public void run() { while (alive) { try { BufferReference ref = (BufferReference) garbageCollectedBuffers.remove(); //blocks ref.clear(); // ... BufferMetadata metadata = buffersInFlight.remove(ref); if (metadata == null) { // ... continue; } availableMemory.addAndGet(metadata.sizeBytes); log.error("Reclaimed buffer of size {} and identity {} that was not properly release()ed. This is a bug.", metadata.sizeBytes, ref.hashCode); } catch (InterruptedException e) { // ... } } log.info("GC listener shutting down"); } } // ... existing code ... |
while(alive) 循环中运行。garbageCollectedBuffers.remove(),这是一个阻塞方法。它会一直等待,直到有一个与弱引用关联的 ByteBuffer 对象被GC回收,然后该弱引用对象 BufferReference 会被放入队列,remove() 方法返回这个 BufferReference。ref,它会尝试从 buffersInFlight 中移除这个 ref。metadata 不为 null,这意味着:一个缓冲区在没有被正常 release()(release 会把它从 buffersInFlight 中移除)的情况下,就被GC回收了。这就是内存泄漏!availableMemory 中,防止这部分内存永远“丢失”。ERROR 级别的日志,报告发生了内存泄漏,并指明泄漏的缓冲区大小和标识。metadata 为 null,这通常是正常情况(缓冲区被 release 后,过了一段时间才被GC),但注释中也提到了一种罕见情况:由于 ConcurrentHashMap 的懒清理机制,可能一个被正常 release 的缓冲区的引用在被移除后,仍然被GC线程捕捉到并入队。这种情况直接 continue 忽略。close() 方法被调用,alive 变为 false,gcListenerThread 被中断,循环退出,线程结束。GarbageCollectedMemoryPool 是一个非常巧妙的调试工具。它通过继承 SimpleMemoryPool 复用了内存额度控制的逻辑,然后通过重写钩子方法和结合Java的引用机制,增加了一层“内存泄漏”的监控。
tryAllocate() -> bufferToBeReturned (登记) -> 使用 -> release() -> bufferToBeReleased (注销) -> ByteBuffer 失去引用 -> 被GC。tryAllocate() -> bufferToBeReturned (登记) -> 使用 -> 忘记调用 release() -> ByteBuffer 失去引用 -> 被GC -> BufferReference 进入队列 -> 后台线程检测到,并从 buffersInFlight 中找到了登记信息 -> 报告错误。
from:https://blog.csdn.net/lifallen/article/details/149510781