社区所有版块导航
Python
python开源   Django   Python   DjangoApp   pycharm  
DATA
docker   Elasticsearch  
aigc
aigc   chatgpt  
WEB开发
linux   MongoDB   Redis   DATABASE   NGINX   其他Web框架   web工具   zookeeper   tornado   NoSql   Bootstrap   js   peewee   Git   bottle   IE   MQ   Jquery  
机器学习
机器学习算法  
Python88.com
反馈   公告   社区推广  
产品
短视频  
印度
印度  
Py学习  »  Elasticsearch

聊聊Elasticsearch的ReleasableLock

go4it • 6 年前 • 378 次点击  
阅读 5

聊聊Elasticsearch的ReleasableLock

本文主要研究一下Elasticsearch的ReleasableLock

ReleasableLock

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/ReleasableLock.java

public class ReleasableLock implements Releasable {
    private final Lock lock;


    // a per-thread count indicating how many times the thread has entered the lock; only works if assertions are enabled
    private final ThreadLocal<Integer> holdingThreads;

    public ReleasableLock(Lock lock) {
        this.lock = lock;
        if (Assertions.ENABLED) {
            holdingThreads = new ThreadLocal<>();
        } else {
            holdingThreads = null;
        }
    }

    @Override
    public void close() {
        lock.unlock();
        assert removeCurrentThread();
    }


    public ReleasableLock acquire() throws EngineException {
        lock.lock();
        assert addCurrentThread();
        return this;
    }

    private boolean addCurrentThread() {
        final Integer current = holdingThreads.get();
        holdingThreads.set(current == null ? 1 : current + 1);
        return true;
    }

    private boolean removeCurrentThread() {
        final Integer count = holdingThreads.get();
        assert count != null && count > 0;
        if (count == 1) {
            holdingThreads.remove();
        } else {
            holdingThreads.set(count - 1);
        }
        return true;
    }

    public boolean isHeldByCurrentThread() {
        if (holdingThreads == null) {
            throw new UnsupportedOperationException("asserts must be enabled");
        }
        final Integer count = holdingThreads.get();
        return count != null && count > 0;
    }
}
复制代码
  • ReleasableLock实现了Releasable接口(close方法);它的构造器要求输入Lock参数,只有在开启了assertions的条件下才会初始化holdingThreads;isHeldByCurrentThread方法判断调用线程是否正在使用lock
  • acquire方法首先调用lock的lock方法,然后利用assert来断言addCurrentThread方法,该方法会增加调用线程正在使用lock的次数
  • close方法首先调用lock的unlock方法,然后利用assert来断言removeCurrentThread方法,该方法会减少调用线程正在使用lock的次数

ReleasableLockTests

elasticsearch-7.0.1/server/src/test/java/org/elasticsearch/common/util/concurrent/ReleasableLockTests.java

public class ReleasableLockTests extends ESTestCase {

    /**
     * Test that accounting on whether or not a thread holds a releasable lock is correct. Previously we had a bug where on a re-entrant
     * lock that if a thread entered the lock twice we would declare that it does not hold the lock after it exits its first entrance but
     * not its second entrance.
     *
     * @throws BrokenBarrierException if awaiting on the synchronization barrier breaks
     * @throws InterruptedException   if awaiting on the synchronization barrier is interrupted
     */
    public void testIsHeldByCurrentThread() throws BrokenBarrierException, InterruptedException {
        final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
        final ReleasableLock readLock = new ReleasableLock(readWriteLock.readLock());
        final ReleasableLock writeLock = new ReleasableLock(readWriteLock.writeLock());

        final int numberOfThreads = scaledRandomIntBetween(1, 32);
        final int iterations = scaledRandomIntBetween(1, 32);
        final CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads);
        final List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < numberOfThreads; i++) {
            final Thread thread = new Thread(() -> {
                try {
                    barrier.await();
                } catch (final BrokenBarrierException | InterruptedException e) {
                    throw new RuntimeException(e);
                }
                for (int j = 0; j < iterations; j++) {
                    if (randomBoolean()) {
                        acquire(readLock, writeLock);
                    } else {
                        acquire(writeLock, readLock);
                    }
                }
                try {
                    barrier.await();
                } catch (final BrokenBarrierException | InterruptedException e) {
                    throw new RuntimeException(e);
                }
            });
            threads.add(thread);
            thread.start();
        }

        barrier.await();
        barrier.await();
        for (final Thread thread : threads) {
            thread.join();
        }
    }

    private void acquire(final ReleasableLock lockToAcquire, final ReleasableLock otherLock) {
        try (@SuppressWarnings("unused") Releasable outer = lockToAcquire.acquire()) {
            assertTrue(lockToAcquire.isHeldByCurrentThread());
            assertFalse(otherLock.isHeldByCurrentThread());
            try (@SuppressWarnings("unused") Releasable inner = lockToAcquire.acquire()) {
                assertTrue(lockToAcquire.isHeldByCurrentThread());
                assertFalse(otherLock.isHeldByCurrentThread());
            }
            // previously there was a bug here and this would return false
            assertTrue(lockToAcquire.isHeldByCurrentThread());
            assertFalse(otherLock.isHeldByCurrentThread());
        }
        assertFalse(lockToAcquire.isHeldByCurrentThread());
        assertFalse(otherLock.isHeldByCurrentThread());
    }

}
复制代码
  • ReleasableLockTests使用多线程随机执行acquire,该方法断言lockToAcquire被当前线程持有,而otherLock不被当前线程持有

Cache.CacheSegment

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/cache/Cache.java

    private static class CacheSegment<K, V> {
        // read/write lock protecting mutations to the segment
        ReadWriteLock segmentLock = new ReentrantReadWriteLock();

        ReleasableLock read


    
Lock = new ReleasableLock(segmentLock.readLock());
        ReleasableLock writeLock = new ReleasableLock(segmentLock.writeLock());

        Map<K, CompletableFuture<Entry<K, V>>> map = new HashMap<>();

        SegmentStats segmentStats = new SegmentStats();

        /**
         * get an entry from the segment; expired entries will be returned as null but not removed from the cache until the LRU list is
         * pruned or a manual {@link Cache#refresh()} is performed however a caller can take action using the provided callback
         *
         * @param key       the key of the entry to get from the cache
         * @param now       the access time of this entry
         * @param isExpired test if the entry is expired
         * @param onExpiration a callback if the entry associated to the key is expired
         * @return the entry if there was one, otherwise null
         */
        Entry<K, V> get(K key, long now, Predicate<Entry<K, V>> isExpired, Consumer<Entry<K, V>> onExpiration) {
            CompletableFuture<Entry<K, V>> future;
            try (ReleasableLock ignored = readLock.acquire()) {
                future = map.get(key);
            }
            if (future != null) {
                Entry<K, V> entry;
                try {
                    entry = future.get();
                } catch (ExecutionException e) {
                    assert future.isCompletedExceptionally();
                    segmentStats.miss();
                    return null;
                } catch (InterruptedException e) {
                    throw new IllegalStateException(e);
                }
                if (isExpired.test(entry)) {
                    segmentStats.miss();
                    onExpiration.accept(entry);
                    return null;
                } else {
                    segmentStats.hit();
                    entry.accessTime = now;
                    return entry;
                }
            } else {
                segmentStats.miss();
                return null;
            }
        }

        /**
         * put an entry into the segment
         *
         * @param key   the key of the entry to add to the cache
         * @param value the value of the entry to add to the cache
         * @param now   the access time of this entry
         * @return a tuple of the new entry and the existing entry, if there was one otherwise null
         */
        Tuple<Entry<K, V>, Entry<K, V>> put(K key, V value, long now) {
            Entry<K, V> entry = new Entry<>(key, value, now);
            Entry<K, V> existing = null;
            try (ReleasableLock ignored = writeLock.acquire()) {
                try {
                    CompletableFuture<Entry<K, V>> future = map.put(key, CompletableFuture.completedFuture(entry));
                    if (future != null) {
                        existing = future.handle((ok, ex) -> {
                            if (ok != null) {
                                return ok;
                            } else {
                                return null;
                            }
                        }).get();
                    }
                } catch (ExecutionException | InterruptedException e) {
                    throw new IllegalStateException(e);
                }
            }
            return Tuple.tuple(entry, existing);
        }

        /**
         * remove an entry from the segment
         *
         * @param key       the key of the entry to remove from the cache
         * @param onRemoval a callback for the removed entry
         */
        void remove(K key, Consumer<CompletableFuture<Entry<K, V>>> onRemoval) {
            CompletableFuture<Entry<K, V>> future;
            try (ReleasableLock ignored = writeLock.acquire()) {
                future = map.remove(key);
            }
            if (future != null) {
                segmentStats.eviction();
                onRemoval.accept(future);
            }
        }

        /**
         * remove an entry from the segment iff the future is done and the value is equal to the
         * expected value
         *
         * @param key the key of the entry to remove from the cache
         * @param value the value expected to be associated with the key
         * @param onRemoval a callback for the removed entry
         */
        void remove(K key, V value, Consumer<CompletableFuture<Entry<K, V>>> onRemoval) {
            CompletableFuture<Entry<K, V>> future;
            boolean removed = false;
            try (ReleasableLock ignored = writeLock.acquire()) {
                future = map.get(key);
                try {
                    if (future != null) {
                        if (future.isDone()) {
                            Entry<K, V> entry = future.get();
                            if (Objects.equals(value, entry.value)) {
                                removed = map.remove(key, future);
                            }
                        }
                    }
                } catch (ExecutionException | InterruptedException e) {
                    throw new IllegalStateException(e);
                }
            }

            if (future != null && removed) {
                segmentStats.eviction();
                onRemoval.accept(future);
            }
        }

        private static class SegmentStats {
            private final LongAdder hits = new LongAdder();
            private final LongAdder misses = new LongAdder();
            private final LongAdder evictions = new LongAdder();

            void hit() {
                hits.increment();
            }

            void miss() {
                misses.increment();
            }

            void eviction() {
                evictions.increment();
            }
        }
    }
复制代码
  • CacheSegment使用ReentrantReadWriteLock的readLock及writeLock创建了两个ReleasableLock,一个为readLock,一个为writeLock;由于ReleasableLock实现了Releasable接口(close方法),而该接口继承了java.lang.AutoCloseable接口,因而可以直接利用try with resources语法来自动close,从而释放锁

小结

  • ReleasableLock实现了Releasable接口(close方法);它的构造器要求输入Lock参数,只有在开启了assertions的条件下才会初始化holdingThreads;isHeldByCurrentThread方法判断调用线程是否正在使用lock
  • acquire方法首先调用lock的lock方法,然后利用assert来断言addCurrentThread方法,该方法会增加调用线程正在使用lock的次数
  • close方法首先调用lock的unlock方法,然后利用assert来断言removeCurrentThread方法,该方法会减少调用线程正在使用lock的次数

ReleasableLock实现了Releasable接口(close方法),而该接口继承了java.lang.AutoCloseable接口,因而可以直接利用try with resources语法来自动close,从而释放锁

doc

Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/34613
 
378 次点击