社区所有版块导航
Python
python开源   Django   Python   DjangoApp   pycharm  
DATA
docker   Elasticsearch  
aigc
aigc   chatgpt  
WEB开发
linux   MongoDB   Redis   DATABASE   NGINX   其他Web框架   web工具   zookeeper   tornado   NoSql   Bootstrap   js   peewee   Git   bottle   IE   MQ   Jquery  
机器学习
机器学习算法  
Python88.com
反馈   公告   社区推广  
产品
短视频  
印度
印度  
Py学习  »  Elasticsearch

聊聊Elasticsearch的CircuitBreaker

go4it • 4 年前 • 291 次点击  
阅读 15

聊聊Elasticsearch的CircuitBreaker

本文主要研究一下Elasticsearch的CircuitBreaker

CircuitBreaker

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/breaker/CircuitBreaker.java

/**
 * Interface for an object that can be incremented, breaking after some
 * configured limit has been reached.
 */
public interface CircuitBreaker {

    /**
     * The parent breaker is a sum of all the following breakers combined. With
     * this we allow a single breaker to have a significant amount of memory
     * available while still having a "total" limit for all breakers. Note that
     * it's not a "real" breaker in that it cannot be added to or subtracted
     * from by itself.
     */
    String PARENT = "parent";
    /**
     * The fielddata breaker tracks data used for fielddata (on fields) as well
     * as the id cached used for parent/child queries.
     */
    String FIELDDATA = "fielddata";
    /**
     * The request breaker tracks memory used for particular requests. This
     * includes allocations for things like the cardinality aggregation, and
     * accounting for the number of buckets used in an aggregation request.
     * Generally the amounts added to this breaker are released after a request
     * is finished.
     */
    String REQUEST = "request";
    /**
     * The in-flight request breaker tracks bytes allocated for reading and
     * writing requests on the network layer.
     */
    String IN_FLIGHT_REQUESTS = "in_flight_requests";
    /**
     * The accounting breaker tracks things held in memory that is independent
     * of the request lifecycle. This includes memory used by Lucene for
     * segments.
     */
    String ACCOUNTING = "accounting";

    enum Type {
        // A regular or ChildMemoryCircuitBreaker
        MEMORY,
        // A special parent-type for the hierarchy breaker service
        PARENT,
        // A breaker where every action is a noop, it never breaks
        NOOP;

        public static Type parseValue(String value) {
            switch(value.toLowerCase(Locale.ROOT)) {
                case "noop":
                    return Type.NOOP;
                case "parent":
                    return Type.PARENT;
                case "memory":
                    return Type.MEMORY;
                default:
                    throw new IllegalArgumentException("No CircuitBreaker with type: " + value);
            }
        }
    }

    enum Durability {
        // The condition that tripped the circuit breaker fixes itself eventually.
        TRANSIENT,
        // The condition that tripped the circuit breaker requires manual intervention.
        PERMANENT
    }

    /**
     * Trip the circuit breaker
     * @param fieldName name of the field responsible for tripping the breaker
     * @param bytesNeeded bytes asked for but unable to be allocated
     */
    void circuitBreak(String fieldName, long bytesNeeded);

    /**
     * add bytes to the breaker and maybe trip
     * @param bytes number of bytes to add
     * @param label string label describing the bytes being added
     * @return the number of "used" bytes for the circuit breaker
     */
    double addEstimateBytesAndMaybeBreak(long bytes, String label) throws CircuitBreakingException;

    /**
     * Adjust the circuit breaker without tripping
     */
    long addWithoutBreaking(long bytes);

    /**
     * @return the currently used bytes the breaker is tracking
     */
    long getUsed();

    /**
     * @return maximum number of bytes the circuit breaker can track before tripping
     */
    long getLimit();

    /**
     * @return overhead of circuit breaker
     */
    double getOverhead();

    /**
     * @return the number of times the circuit breaker has been tripped
     */
    long getTrippedCount();

    /**
     * @return the name of the breaker
     */
    String getName();

    /**
     * @return whether a tripped circuit breaker will reset itself (transient) or requires manual intervention (permanent).
     */
    Durability getDurability();
}
复制代码
  • CircuitBreaker定义了Type、Durability枚举;它还定义了circuitBreak、addEstimateBytesAndMaybeBreak、addWithoutBreaking、getUsed、getLimit、getOverhead、getTrippedCount等方法;它有两个实现类分别是NoopCircuitBreaker、ChildMemoryCircuitBreaker

NoopCircuitBreaker

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/breaker/NoopCircuitBreaker.java

public class NoopCircuitBreaker implements CircuitBreaker {
    public static final int LIMIT = -1;

    private final String name;

    public NoopCircuitBreaker(String name) {
        this.name = name;
    }

    @Override
    public void circuitBreak(String fieldName, long bytesNeeded) {
        // noop
    }

    @Override
    public double addEstimateBytesAndMaybeBreak(long bytes, String label) throws CircuitBreakingException {
        return 0;
    }

    @Override
    public long addWithoutBreaking(long bytes) {
        return 0;
    }

    @Override
    public long getUsed() {
        return 0;
    }

    @Override
    public long getLimit() {
        return LIMIT;
    }

    @Override
    public double getOverhead() {
        return 0;
    }

    @Override
    public long getTrippedCount() {
        return 0;
    }

    @Override
    public String getName() {
        return this.name;
    }

    @Override
    public Durability getDurability() {
        return Durability.PERMANENT;
    }
}
复制代码
  • NoopCircuitBreaker实现了CircuitBreaker接口,它不做任何操作

ChildMemoryCircuitBreaker

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/breaker/ChildMemoryCircuitBreaker.java

public class ChildMemoryCircuitBreaker implements CircuitBreaker {

    private final long memoryBytesLimit;
    private final double overheadConstant;
    private final Durability durability;
    private final AtomicLong used;
    private final AtomicLong trippedCount;
    private final Logger logger;
    private final HierarchyCircuitBreakerService parent;
    private final String name;

    /**
     * Create a circuit breaker that will break if


    
 the number of estimated
     * bytes grows above the limit. All estimations will be multiplied by
     * the given overheadConstant. This breaker starts with 0 bytes used.
     * @param settings settings to configure this breaker
     * @param parent parent circuit breaker service to delegate tripped breakers to
     * @param name the name of the breaker
     */
    public ChildMemoryCircuitBreaker(BreakerSettings settings, Logger logger,
                                     HierarchyCircuitBreakerService parent, String name) {
        this(settings, null, logger, parent, name);
    }

    /**
     * Create a circuit breaker that will break if the number of estimated
     * bytes grows above the limit. All estimations will be multiplied by
     * the given overheadConstant. Uses the given oldBreaker to initialize
     * the starting offset.
     * @param settings settings to configure this breaker
     * @param parent parent circuit breaker service to delegate tripped breakers to
     * @param name the name of the breaker
     * @param oldBreaker the previous circuit breaker to inherit the used value from (starting offset)
     */
    public ChildMemoryCircuitBreaker(BreakerSettings settings, ChildMemoryCircuitBreaker oldBreaker,
                                     Logger logger, HierarchyCircuitBreakerService parent, String name) {
        this.name = name;
        this.memoryBytesLimit = settings.getLimit();
        this.overheadConstant = settings.getOverhead();
        this.durability = settings.getDurability();
        if (oldBreaker == null) {
            this.used = new AtomicLong(0);
            this.trippedCount = new AtomicLong(0);
        } else {
            this.used = oldBreaker.used;
            this.trippedCount = oldBreaker.trippedCount;
        }
        this.logger = logger;
        if (logger.isTraceEnabled()) {
            logger.trace("creating ChildCircuitBreaker with settings {}", settings);
        }
        this.parent = parent;
    }

    /**
     * Method used to trip the breaker, delegates to the parent to determine
     * whether to trip the breaker or not
     */
    @Override
    public void circuitBreak(String fieldName, long bytesNeeded) {
        this.trippedCount.incrementAndGet();
        final String message = "[" + this.name + "] Data too large, data for [" + fieldName + "]" +
                " would be [" + bytesNeeded + "/" + new ByteSizeValue(bytesNeeded) + "]" +
                ", which is larger than the limit of [" +
                memoryBytesLimit + "/" + new ByteSizeValue(memoryBytesLimit) + "]";
        logger.debug("{}", message);
        throw new CircuitBreakingException(message, bytesNeeded, memoryBytesLimit, durability);
    }

    /**
     * Add a number of bytes, tripping the circuit breaker if the aggregated
     * estimates are above the limit. Automatically trips the breaker if the
     * memory limit is set to 0. Will never trip the breaker if the limit is
     * set < 0, but can still be used to aggregate estimations.
     * @param bytes number of bytes to add to the breaker
     * @return number of "used" bytes so far
     */
    @Override
    public double addEstimateBytesAndMaybeBreak(long bytes, String label) throws CircuitBreakingException {
        // short-circuit on no data allowed, immediately throwing an exception
        if (memoryBytesLimit == 0) {
            circuitBreak(label, bytes);
        }

        long newUsed;
        // If there is no limit (-1), we can optimize a bit by using
        // .addAndGet() instead of looping (because we don't have to check a
        // limit), which makes the RamAccountingTermsEnum case faster.
        if (this.memoryBytesLimit == -1) {
            newUsed = noLimit(bytes, label);
        } else {
            newUsed = limit(bytes, label);
        }

        // Additionally, we need to check that we haven't exceeded the parent's limit
        try {
            parent.checkParentLimit((long) (bytes * overheadConstant), label);
        } catch (CircuitBreakingException e) {
            // If the parent breaker is tripped, this breaker has to be
            // adjusted back down because the allocation is "blocked" but the
            // breaker has already been incremented
            this.addWithoutBreaking(-bytes);
            throw e;
        }
        return newUsed;
    }

    private long noLimit(long bytes, String label) {
        long newUsed;
        newUsed = this.used.addAndGet(bytes);
        if (logger.isTraceEnabled()) {
            logger.trace("[{}] Adding [{}][{}] to used bytes [new used: [{}], limit: [-1b]]",
                this.name, new ByteSizeValue(bytes), label, new ByteSizeValue(newUsed));
        }
        return newUsed;
    }

    private long limit(long bytes, String label) {
        long newUsed;// Otherwise, check the addition and commit the addition, looping if
        // there are conflicts. May result in additional logging, but it's
        // trace logging and shouldn't be counted on for additions.
        long currentUsed;
        do {
            currentUsed = this.used.get();
            newUsed = currentUsed + bytes;
            long newUsedWithOverhead = (long) (newUsed * overheadConstant);
            if (logger.isTraceEnabled()) {
                logger.trace("[{}] Adding [{}][{}] to used bytes [new used: [{}], limit: {} [{}], estimate: {} [{}]]",
                        this.name,
                        new ByteSizeValue(bytes), label, new ByteSizeValue(newUsed),
                        memoryBytesLimit, new ByteSizeValue(memoryBytesLimit),
                        newUsedWithOverhead, new ByteSizeValue(newUsedWithOverhead));
            }
            if (memoryBytesLimit > 0 && newUsedWithOverhead > memoryBytesLimit) {
                logger.warn("[{}] New used memory {} [{}] for data of [{}] would be larger than configured breaker: {} [{}], breaking",
                        this.name,
                        newUsedWithOverhead, new ByteSizeValue(newUsedWithOverhead), label,
                        memoryBytesLimit, new ByteSizeValue(memoryBytesLimit));
                circuitBreak(label, newUsedWithOverhead);
            }
            // Attempt to set the new used value, but make sure it hasn't changed
            // underneath us, if it has, keep trying until we are able to set it
        } while (!this.used.compareAndSet(currentUsed, newUsed));
        return newUsed;
    }

    /**
     * Add an <b>exact</b> number of bytes, not checking for tripping the
     * circuit breaker. This bypasses the overheadConstant multiplication.
     *
     * Also does not check with the parent breaker to see if the parent limit
     * has been exceeded.
     *
     * @param bytes number of bytes to add to the breaker
     * @return number of "used" bytes so far
     */
    @Override
    public long addWithoutBreaking(long bytes) {
        long u = used.addAndGet(bytes);
        if (logger.isTraceEnabled()) {
            logger.trace("[{}] Adjusted breaker by [{}] bytes, now [{}]", this.name, bytes, u);
        }
        assert u >= 0 : "Used bytes: [" + u + "] must be >= 0";
        return u;
    }

    /**
     * @return the number of aggregated "used" bytes so far
     */
    @Override
    public long getUsed() {
        return this.used.get();
    }

    /**
     * @return the number of bytes that can be added before the breaker trips
     */
    @Override
    public long getLimit() {
        return this.memoryBytesLimit;
    }

    /**
     * @return the constant multiplier the breaker uses for aggregations
     */
    @Override
    public double getOverhead() {
        return this.overheadConstant;
    }

    /**
     * @return the number of times the breaker has been tripped
     */
    @Override
    public long getTrippedCount() {
        return this.trippedCount.get();
    }

    /**
     * @return the name of the breaker
     */
    @Override
    public String getName() {
        return this.name;
    }

    /**
     * @return whether a tripped circuit breaker will reset itself (transient) or requires manual intervention (permanent).
     */
    @Override
    public Durability getDurability


    
() {
        return this.durability;
    }
}
复制代码
  • ChildMemoryCircuitBreaker实现了CircuitBreaker接口;其circuitBreak方法会抛出CircuitBreakingException
  • addEstimateBytesAndMaybeBreak方法首先判断memoryBytesLimit,如果为0,则执行circuitBreak方法;如果为-1则调用noLimit,否则调用limit计算newUsed,没有抛出异常的话,则最后执行 parent.checkParentLimit方法
  • noLimit方法直接执行this.used.addAndGet(bytes);limit方法首先计算newUsed,然后根据overheadConstant得出newUsedWithOverhead,如果newUsedWithOverhead大于memoryBytesLimit则执行circuitBreak方法,否则将newUsed更新到this.used中

小结

  • CircuitBreaker定义了Type、Durability枚举;它还定义了circuitBreak、addEstimateBytesAndMaybeBreak、addWithoutBreaking、getUsed、getLimit、getOverhead、getTrippedCount等方法;它有两个实现类分别是NoopCircuitBreaker、ChildMemoryCircuitBreaker
  • NoopCircuitBreaker实现了CircuitBreaker接口,它不做任何操作
  • ChildMemoryCircuitBreaker实现了CircuitBreaker接口;其circuitBreak方法会抛出CircuitBreakingException;addEstimateBytesAndMaybeBreak方法则先判断newUsed是否超出memoryBytesLimit,超出则执行circuitBreak方法,最后执行parent.checkParentLimit方法

doc

Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/33349
 
291 次点击