社区所有版块导航
Python
python开源   Django   Python   DjangoApp   pycharm  
DATA
docker   Elasticsearch  
aigc
aigc   chatgpt  
WEB开发
linux   MongoDB   Redis   DATABASE   NGINX   其他Web框架   web工具   zookeeper   tornado   NoSql   Bootstrap   js   peewee   Git   bottle   IE   MQ   Jquery  
机器学习
机器学习算法  
Python88.com
反馈   公告   社区推广  
产品
短视频  
印度
印度  
Py学习  »  Redis

redis使用事务获取和设置密钥到期

Abhay • 5 年前 • 858 次点击  

我正在使用etaty rediscala( https://github.com/etaty/rediscala )客户。这是我的功能

private def getVersionTime(db: RedisClient, interval: Long)(implicit ec: ExecutionContext): Future[Long] = {

import akka.util.ByteString
import redis.ByteStringFormatter

implicit val byteStringLongFormatter = new ByteStringFormatter[Long] {
  def serialize(data: Long): ByteString = ByteString(data.toString.getBytes)
  def deserialize(bs: ByteString): Long = bs.utf8String.toLong
}

db.get[Long]("versionTime").map {
  case Some(v) => loggerF.info(s"Retrieved version time ${v}")
    v
  case None => val current = System.currentTimeMillis()
    db.setex[Long]("versionTime", (current / 1000) + interval, current)
    loggerF.info(s"set version time ${current}")
    current
}

}

这是我的测验。此测试调用上述方法

it("check with multiple tasks"){
  val target = 10
  val latch = new java.util.concurrent.CountDownLatch(target)
  (1 to target).map{t =>
    getVersionTime(prodDb, 10).map{r => print("\n" + r); latch.countDown()}
  }
  assert(latch.await(10, TimeUnit.SECONDS))
}

试验输出

14:52:46.692[池-1-线程-12]信息结束指示-设置版本时间1548062566687 14:52:46.693[池-1-线程-6]信息结束指示-设置版本时间1548062566687 14:52:46.693[池-1-线程-20]信息结束指示-设置版本时间1548062566687 14:52:46.692[池-1-线程-2]信息结束指示-设置版本时间1548062566668 14:52:46.692[池-1-线程-10]信息结束指示-设置版本时间1548062566687 14:52:46.693[池-1-线程-8]信息结束指示-设置版本时间1548062566687 14:52:46.692[池-1-线程-4]信息结束指示-设置版本时间1548062566668 14:52:46.692[池-1-线程-11]信息结束指示-设置版本时间1548062566687 14:52:46.692[池-1-线程-9]信息结束指示-设置版本时间1548062566687 14:52:46.692[池-1-线程-7]信息结束指示-设置版本时间1548062566687

预期的行为是-设置版本时间应该出现一次,对于其余线程,应该打印检索到的版本时间。我想我需要在这里使用事务,以便get和setex包装在watch和exec中

  private def getVersionTimeTrans(db: RedisClient, interval: Long): Long = {
    import akka.util.ByteString
    import redis.ByteStringFormatter

    implicit val byteStringLongFormatter = new ByteStringFormatter[Long] {
      def serialize(data: Long): ByteString = ByteString(data.toString.getBytes)
      def deserialize(bs: ByteString): Long = bs.utf8String.toLong
    }

    val redisTransaction = db.transaction()
    redisTransaction.watch("versionTime")
    val result: Future[Long] = redisTransaction.get[Long]("versionTime").map {
      case Some(v) => loggerF.info(s"Retrieved version time ${v}")
        v
      case None => val current = System.currentTimeMillis()
        redisTransaction.setex[Long]("versionTime", (current / 1000) + interval, current)
        loggerF.info(s"set version time ${current}")
        current
    }
    redisTransaction.exec()
    val r = for {
      i <- result
    } yield {
      i
    }
    Await.result(r, 10 seconds)
  }

测试

it("check with multiple threads "){
  val target = 10
  val latch = new java.util.concurrent.CountDownLatch(target)
  (1 to target).map{t =>
    Future(getVersionTimeTrans(prodDb, 10)).map{r => latch.countDown()}
  }
  assert(latch.await(10, TimeUnit.SECONDS))
}

对于这个测试,输出也是一样的。我想不出如何在事务中正确地包装它。请帮忙。

Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/47881
 
858 次点击  
文章 [ 2 ]  |  最新文章 5 年前