

go4it • 4 years ago • 291 views





import java.util.Locale;

/**
 * Interface for an object that can be incremented, breaking after some
 * configured limit has been reached.
 */
public interface CircuitBreaker {

    /**
     * The parent breaker is a sum of all the following breakers combined. With
     * this we allow a single breaker to have a significant amount of memory
     * available while still having a "total" limit for all breakers. Note that
     * it's not a "real" breaker in that it cannot be added to or subtracted
     * from by itself.
     */
    String PARENT = "parent";
    /**
     * The fielddata breaker tracks data used for fielddata (on fields) as well
     * as the id cached used for parent/child queries.
     */
    String FIELDDATA = "fielddata";
    /**
     * The request breaker tracks memory used for particular requests. This
     * includes allocations for things like the cardinality aggregation, and
     * accounting for the number of buckets used in an aggregation request.
     * Generally the amounts added to this breaker are released after a request
     * is finished.
     */
    String REQUEST = "request";
    /**
     * The in-flight request breaker tracks bytes allocated for reading and
     * writing requests on the network layer.
     */
    String IN_FLIGHT_REQUESTS = "in_flight_requests";
    /**
     * The accounting breaker tracks things held in memory that is independent
     * of the request lifecycle. This includes memory used by Lucene for
     * segments.
     */
    String ACCOUNTING = "accounting";

    enum Type {
        // A regular or ChildMemoryCircuitBreaker
        MEMORY,
        // A special parent-type for the hierarchy breaker service
        PARENT,
        // A breaker where every action is a noop, it never breaks
        NOOP;

        public static Type parseValue(String value) {
            switch(value.toLowerCase(Locale.ROOT)) {
                case "noop":
                    return Type.NOOP;
                case "parent":
                    return Type.PARENT;
                case "memory":
                    return Type.MEMORY;
                default:
                    throw new IllegalArgumentException("No CircuitBreaker with type: " + value);
            }
        }
    }

    enum Durability {
        // The condition that tripped the circuit breaker fixes itself eventually.
        TRANSIENT,
        // The condition that tripped the circuit breaker requires manual intervention.
        PERMANENT
    }

    /**
     * Trip the circuit breaker
     * @param fieldName name of the field responsible for tripping the breaker
     * @param bytesNeeded bytes asked for but unable to be allocated
     */
    void circuitBreak(String fieldName, long bytesNeeded);

    /**
     * add bytes to the breaker and maybe trip
     * @param bytes number of bytes to add
     * @param label string label describing the bytes being added
     * @return the number of "used" bytes for the circuit breaker
     */
    double addEstimateBytesAndMaybeBreak(long bytes, String label) throws CircuitBreakingException;

    /**
     * Adjust the circuit breaker without tripping
     */
    long addWithoutBreaking(long bytes);

    /**
     * @return the currently used bytes the breaker is tracking
     */
    long getUsed();

    /**
     * @return maximum number of bytes the circuit breaker can track before tripping
     */
    long getLimit();

    /**
     * @return overhead of circuit breaker
     */
    double getOverhead();

    /**
     * @return the number of times the circuit breaker has been tripped
     */
    long getTrippedCount();

    /**
     * @return the name of the breaker
     */
    String getName();

    /**
     * @return whether a tripped circuit breaker will reset itself (transient) or requires manual intervention (permanent).
     */
    Durability getDurability();
}
  • CircuitBreaker defines the Type and Durability enums, along with methods such as circuitBreak, addEstimateBytesAndMaybeBreak, addWithoutBreaking, getUsed, getLimit, getOverhead and getTrippedCount. It has two implementations: NoopCircuitBreaker and ChildMemoryCircuitBreaker.
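To make the behaviour of Type.parseValue concrete, here is a minimal standalone sketch. It copies only the enum from the listing above into a hypothetical class named BreakerTypeDemo (not an Elasticsearch class), so it can be run without Elasticsearch on the classpath.

```java
import java.util.Locale;

// Hypothetical standalone copy of the Type enum, for illustration only.
class BreakerTypeDemo {
    enum Type {
        MEMORY, PARENT, NOOP;

        // Case-insensitive lookup; unknown names are rejected with an exception.
        static Type parseValue(String value) {
            switch (value.toLowerCase(Locale.ROOT)) {
                case "noop":
                    return NOOP;
                case "parent":
                    return PARENT;
                case "memory":
                    return MEMORY;
                default:
                    throw new IllegalArgumentException("No CircuitBreaker with type: " + value);
            }
        }
    }

    public static void main(String[] args) {
        // Mixed case is accepted because the input is lower-cased first.
        System.out.println(Type.parseValue("Memory")); // prints MEMORY
        try {
            Type.parseValue("disk");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // prints No CircuitBreaker with type: disk
        }
    }
}
```

Lower-casing with Locale.ROOT keeps the lookup independent of the JVM's default locale.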



public class NoopCircuitBreaker implements CircuitBreaker {
    public static final int LIMIT = -1;

    private final String name;

    public NoopCircuitBreaker(String name) {
        this.name = name;
    }

    @Override
    public void circuitBreak(String fieldName, long bytesNeeded) {
        // noop
    }

    @Override
    public double addEstimateBytesAndMaybeBreak(long bytes, String label) throws CircuitBreakingException {
        return 0;
    }

    @Override
    public long addWithoutBreaking(long bytes) {
        return 0;
    }

    @Override
    public long getUsed() {
        return 0;
    }

    @Override
    public long getLimit() {
        return LIMIT;
    }

    @Override
    public double getOverhead() {
        return 0;
    }

    @Override
    public long getTrippedCount() {
        return 0;
    }

    @Override
    public String getName() {
        return this.name;
    }

    @Override
    public Durability getDurability() {
        return Durability.PERMANENT;
    }
}
  • NoopCircuitBreaker implements the CircuitBreaker interface as a no-op: it tracks nothing, never trips, and reports a limit of -1.
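The no-op breaker is a null-object implementation: callers can use it like any other breaker without special-casing "accounting disabled". The sketch below illustrates the idea with a reduced, hypothetical interface (MiniBreaker) and implementation (MiniNoopBreaker). Neither is an Elasticsearch type.

```java
// Hypothetical, reduced interface: only the methods needed for this demo.
interface MiniBreaker {
    double addEstimateBytesAndMaybeBreak(long bytes, String label);
    long getUsed();
    long getLimit();
}

// Null-object implementation: accepts every request and tracks nothing.
class MiniNoopBreaker implements MiniBreaker {
    static final int LIMIT = -1;

    @Override
    public double addEstimateBytesAndMaybeBreak(long bytes, String label) {
        return 0; // nothing is recorded, so nothing can trip
    }

    @Override
    public long getUsed() {
        return 0;
    }

    @Override
    public long getLimit() {
        return LIMIT;
    }
}

class MiniNoopDemo {
    public static void main(String[] args) {
        MiniBreaker breaker = new MiniNoopBreaker();
        // Even an absurdly large request is accepted silently.
        breaker.addEstimateBytesAndMaybeBreak(Long.MAX_VALUE, "huge");
        System.out.println(breaker.getUsed() + " / " + breaker.getLimit()); // prints 0 / -1
    }
}
```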



import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.indices.breaker.BreakerSettings;
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;

import java.util.concurrent.atomic.AtomicLong;

public class ChildMemoryCircuitBreaker implements CircuitBreaker {

    private final long memoryBytesLimit;
    private final double overheadConstant;
    private final Durability durability;
    private final AtomicLong used;
    private final AtomicLong trippedCount;
    private final Logger logger;
    private final HierarchyCircuitBreakerService parent;
    private final String name;

    /**
     * Create a circuit breaker that will break if the number of estimated
     * bytes grows above the limit. All estimations will be multiplied by
     * the given overheadConstant. This breaker starts with 0 bytes used.
     * @param settings settings to configure this breaker
     * @param parent parent circuit breaker service to delegate tripped breakers to
     * @param name the name of the breaker
     */
    public ChildMemoryCircuitBreaker(BreakerSettings settings, Logger logger,
                                     HierarchyCircuitBreakerService parent, String name) {
        this(settings, null, logger, parent, name);
    }

    /**
     * Create a circuit breaker that will break if the number of estimated
     * bytes grows above the limit. All estimations will be multiplied by
     * the given overheadConstant. Uses the given oldBreaker to initialize
     * the starting offset.
     * @param settings settings to configure this breaker
     * @param parent parent circuit breaker service to delegate tripped breakers to
     * @param name the name of the breaker
     * @param oldBreaker the previous circuit breaker to inherit the used value from (starting offset)
     */
    public ChildMemoryCircuitBreaker(BreakerSettings settings, ChildMemoryCircuitBreaker oldBreaker,
                                     Logger logger, HierarchyCircuitBreakerService parent, String name) {
        this.name = name;
        this.memoryBytesLimit = settings.getLimit();
        this.overheadConstant = settings.getOverhead();
        this.durability = settings.getDurability();
        if (oldBreaker == null) {
            this.used = new AtomicLong(0);
            this.trippedCount = new AtomicLong(0);
        } else {
            this.used = oldBreaker.used;
            this.trippedCount = oldBreaker.trippedCount;
        }
        this.logger = logger;
        if (logger.isTraceEnabled()) {
            logger.trace("creating ChildCircuitBreaker with settings {}", settings);
        }
        this.parent = parent;
    }
    /**
     * Method used to trip the breaker, delegates to the parent to determine
     * whether to trip the breaker or not
     */
    @Override
    public void circuitBreak(String fieldName, long bytesNeeded) {
        this.trippedCount.incrementAndGet();
        final String message = "[" + this.name + "] Data too large, data for [" + fieldName + "]" +
                " would be [" + bytesNeeded + "/" + new ByteSizeValue(bytesNeeded) + "]" +
                ", which is larger than the limit of [" +
                memoryBytesLimit + "/" + new ByteSizeValue(memoryBytesLimit) + "]";
        logger.debug("{}", message);
        throw new CircuitBreakingException(message, bytesNeeded, memoryBytesLimit, durability);
    }
    /**
     * Add a number of bytes, tripping the circuit breaker if the aggregated
     * estimates are above the limit. Automatically trips the breaker if the
     * memory limit is set to 0. Will never trip the breaker if the limit is
     * set < 0, but can still be used to aggregate estimations.
     * @param bytes number of bytes to add to the breaker
     * @return number of "used" bytes so far
     */
    @Override
    public double addEstimateBytesAndMaybeBreak(long bytes, String label) throws CircuitBreakingException {
        // short-circuit on no data allowed, immediately throwing an exception
        if (memoryBytesLimit == 0) {
            circuitBreak(label, bytes);
        }

        long newUsed;
        // If there is no limit (-1), we can optimize a bit by using
        // .addAndGet() instead of looping (because we don't have to check a
        // limit), which makes the RamAccountingTermsEnum case faster.
        if (this.memoryBytesLimit == -1) {
            newUsed = noLimit(bytes, label);
        } else {
            newUsed = limit(bytes, label);
        }

        // Additionally, we need to check that we haven't exceeded the parent's limit
        try {
            parent.checkParentLimit((long) (bytes * overheadConstant), label);
        } catch (CircuitBreakingException e) {
            // If the parent breaker is tripped, this breaker has to be
            // adjusted back down because the allocation is "blocked" but the
            // breaker has already been incremented
            this.addWithoutBreaking(-bytes);
            throw e;
        }
        return newUsed;
    }
    private long noLimit(long bytes, String label) {
        long newUsed;
        newUsed = this.used.addAndGet(bytes);
        if (logger.isTraceEnabled()) {
            logger.trace("[{}] Adding [{}][{}] to used bytes [new used: [{}], limit: [-1b]]",
                this.name, new ByteSizeValue(bytes), label, new ByteSizeValue(newUsed));
        }
        return newUsed;
    }

    private long limit(long bytes, String label) {
        long newUsed;
        // Otherwise, check the addition and commit the addition, looping if
        // there are conflicts. May result in additional logging, but it's
        // trace logging and shouldn't be counted on for additions.
        long currentUsed;
        do {
            currentUsed = this.used.get();
            newUsed = currentUsed + bytes;
            long newUsedWithOverhead = (long) (newUsed * overheadConstant);
            if (logger.isTraceEnabled()) {
                logger.trace("[{}] Adding [{}][{}] to used bytes [new used: [{}], limit: {} [{}], estimate: {} [{}]]",
                        this.name,
                        new ByteSizeValue(bytes), label, new ByteSizeValue(newUsed),
                        memoryBytesLimit, new ByteSizeValue(memoryBytesLimit),
                        newUsedWithOverhead, new ByteSizeValue(newUsedWithOverhead));
            }
            if (memoryBytesLimit > 0 && newUsedWithOverhead > memoryBytesLimit) {
                logger.warn("[{}] New used memory {} [{}] for data of [{}] would be larger than configured breaker: {} [{}], breaking",
                        this.name,
                        newUsedWithOverhead, new ByteSizeValue(newUsedWithOverhead), label,
                        memoryBytesLimit, new ByteSizeValue(memoryBytesLimit));
                circuitBreak(label, newUsedWithOverhead);
            }
            // Attempt to set the new used value, but make sure it hasn't changed
            // underneath us, if it has, keep trying until we are able to set it
        } while (!this.used.compareAndSet(currentUsed, newUsed));
        return newUsed;
    }
    /**
     * Add an <b>exact</b> number of bytes, not checking for tripping the
     * circuit breaker. This bypasses the overheadConstant multiplication.
     * Also does not check with the parent breaker to see if the parent limit
     * has been exceeded.
     * @param bytes number of bytes to add to the breaker
     * @return number of "used" bytes so far
     */
    @Override
    public long addWithoutBreaking(long bytes) {
        long u = used.addAndGet(bytes);
        if (logger.isTraceEnabled()) {
            logger.trace("[{}] Adjusted breaker by [{}] bytes, now [{}]", this.name, bytes, u);
        }
        assert u >= 0 : "Used bytes: [" + u + "] must be >= 0";
        return u;
    }

    /**
     * @return the number of aggregated "used" bytes so far
     */
    @Override
    public long getUsed() {
        return this.used.get();
    }

    /**
     * @return the number of bytes that can be added before the breaker trips
     */
    @Override
    public long getLimit() {
        return this.memoryBytesLimit;
    }

    /**
     * @return the constant multiplier the breaker uses for aggregations
     */
    @Override
    public double getOverhead() {
        return this.overheadConstant;
    }

    /**
     * @return the number of times the breaker has been tripped
     */
    @Override
    public long getTrippedCount() {
        return this.trippedCount.get();
    }

    /**
     * @return the name of the breaker
     */
    @Override
    public String getName() {
        return this.name;
    }

    /**
     * @return whether a tripped circuit breaker will reset itself (transient) or requires manual intervention (permanent).
     */
    @Override
    public Durability getDurability() {
        return this.durability;
    }
}
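  • ChildMemoryCircuitBreaker implements the CircuitBreaker interface; its circuitBreak method throws a CircuitBreakingException.
  • addEstimateBytesAndMaybeBreak first checks memoryBytesLimit: if it is 0, it calls circuitBreak immediately; if it is -1, it calls noLimit; otherwise it calls limit to compute newUsed. If no exception has been thrown, it finally calls parent.checkParentLimit.
  • noLimit simply calls this.used.addAndGet(bytes). limit first computes newUsed, then multiplies it by overheadConstant to get newUsedWithOverhead. If memoryBytesLimit is positive and newUsedWithOverhead exceeds it, limit calls circuitBreak; otherwise it writes newUsed into this.used with compareAndSet, retrying if another thread changed the value in the meantime.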
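The limit-checking logic described above can be sketched without any Elasticsearch dependency. SketchBreaker below is a hypothetical, simplified stand-in, not the real class: it has no parent and no logging, and it throws IllegalStateException where the real code throws CircuitBreakingException. It keeps the three branches (limit 0, limit -1, positive limit) and the compare-and-set retry loop.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified stand-in for ChildMemoryCircuitBreaker (illustration only).
class SketchBreaker {
    private final long limitBytes;   // 0 = always trip, -1 = unlimited
    private final double overhead;   // multiplier applied before comparing to the limit
    private final AtomicLong used = new AtomicLong(0);
    private final AtomicLong tripped = new AtomicLong(0);

    SketchBreaker(long limitBytes, double overhead) {
        this.limitBytes = limitBytes;
        this.overhead = overhead;
    }

    long add(long bytes, String label) {
        if (limitBytes == 0) {
            trip(label, bytes);               // no data allowed at all
        }
        if (limitBytes == -1) {
            return used.addAndGet(bytes);     // unlimited: a single atomic add is enough
        }
        long current;
        long next;
        do {
            current = used.get();
            next = current + bytes;
            long withOverhead = (long) (next * overhead);
            if (limitBytes > 0 && withOverhead > limitBytes) {
                trip(label, withOverhead);    // throws, so 'used' is never updated
            }
            // Retry if another thread changed 'used' between get() and compareAndSet().
        } while (!used.compareAndSet(current, next));
        return next;
    }

    private void trip(String label, long bytesNeeded) {
        tripped.incrementAndGet();
        throw new IllegalStateException(
            "Data too large for [" + label + "]: " + bytesNeeded + " > " + limitBytes);
    }

    long getUsed() { return used.get(); }
    long getTrippedCount() { return tripped.get(); }

    public static void main(String[] args) {
        SketchBreaker breaker = new SketchBreaker(100, 2.0);
        System.out.println(breaker.add(40, "first"));  // 40 * 2.0 = 80 <= 100, prints 40
        try {
            breaker.add(20, "second");                 // 60 * 2.0 = 120 > 100, trips
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
        System.out.println(breaker.getUsed());         // still 40: the failed add was not committed
    }
}
```

Note that the overhead is applied to the running total when comparing against the limit, while the stored counter holds the raw, unmultiplied byte count.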


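  • CircuitBreaker defines the Type and Durability enums, along with methods such as circuitBreak, addEstimateBytesAndMaybeBreak, addWithoutBreaking, getUsed, getLimit, getOverhead and getTrippedCount. It has two implementations: NoopCircuitBreaker and ChildMemoryCircuitBreaker.
  • NoopCircuitBreaker implements the CircuitBreaker interface as a no-op.
  • ChildMemoryCircuitBreaker implements the CircuitBreaker interface; its circuitBreak method throws a CircuitBreakingException. Its addEstimateBytesAndMaybeBreak method first checks whether the new usage, multiplied by overheadConstant, would exceed memoryBytesLimit and calls circuitBreak if so; it then calls parent.checkParentLimit.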
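The final parent check matters because a child increments its own counter before asking the parent. As the comment in the catch block of addEstimateBytesAndMaybeBreak notes, the child has to be adjusted back down if the parent refuses. The sketch below shows that increment, check, roll back sequence with two hypothetical classes, SketchParent and SketchChild. The parent here only sums its children's usage, which is an assumption made for illustration; the real HierarchyCircuitBreakerService is more involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical parent: trips when the sum over all children exceeds its own limit.
class SketchParent {
    private final long parentLimit;
    private final List<SketchChild> children = new ArrayList<>();

    SketchParent(long parentLimit) {
        this.parentLimit = parentLimit;
    }

    SketchChild newChild() {
        SketchChild child = new SketchChild(this);
        children.add(child);
        return child;
    }

    void checkParentLimit(String label) {
        long total = 0;
        for (SketchChild child : children) {
            total += child.getUsed();
        }
        if (total > parentLimit) {
            throw new IllegalStateException(
                "[parent] Data too large for [" + label + "]: " + total + " > " + parentLimit);
        }
    }
}

// Hypothetical child with no limit of its own; only the parent can reject a request.
class SketchChild {
    private final SketchParent parent;
    private final AtomicLong used = new AtomicLong(0);

    SketchChild(SketchParent parent) {
        this.parent = parent;
    }

    long addEstimateBytesAndMaybeBreak(long bytes, String label) {
        long newUsed = used.addAndGet(bytes);   // the child is incremented first
        try {
            parent.checkParentLimit(label);
        } catch (IllegalStateException e) {
            used.addAndGet(-bytes);             // roll back: the allocation was blocked
            throw e;
        }
        return newUsed;
    }

    long getUsed() { return used.get(); }

    public static void main(String[] args) {
        SketchParent parent = new SketchParent(100);
        SketchChild request = parent.newChild();
        SketchChild fielddata = parent.newChild();

        request.addEstimateBytesAndMaybeBreak(60, "agg");       // total 60, accepted
        try {
            fielddata.addEstimateBytesAndMaybeBreak(50, "field"); // total 110 > 100, rejected
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
        System.out.println(request.getUsed() + " " + fielddata.getUsed()); // prints 60 0
    }
}
```

Without the rollback, the rejected 50 bytes would stay on the fielddata counter and every later request would be measured against an inflated total.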

