
A Look at maxwell's MysqlPositionStore

go4it • 5 years ago • 366 views

This article takes a look at maxwell's MysqlPositionStore.

MysqlPositionStore

maxwell-1.25.1/src/main/java/com/zendesk/maxwell/schema/MysqlPositionStore.java

public class MysqlPositionStore {
	static final Logger LOGGER = LoggerFactory.getLogger(MysqlPositionStore.class);
	private static final Long DEFAULT_GTID_SERVER_ID = new Long(0);
	private final Long serverID;
	private String clientID;
	private final boolean gtidMode;
	private final ConnectionPool connectionPool;

	public MysqlPositionStore(ConnectionPool pool, Long serverID, String clientID, boolean gtidMode) {
		this.connectionPool = pool;
		this.clientID = clientID;
		this.gtidMode = gtidMode;
		if (gtidMode) {
			// we don't use server id for position store in gtid mode
			this.serverID = DEFAULT_GTID_SERVER_ID;
		} else {
			this.serverID = serverID;
		}
	}

	public void set(Position newPosition) throws SQLException, DuplicateProcessException {
		if ( newPosition == null )
			return;

		Long heartbeat = newPosition.getLastHeartbeatRead();

		String sql = "INSERT INTO `positions` set "
				+ "server_id = ?, "
				+ "gtid_set = ?, "
				+ "binlog_file = ?, "
				+ "binlog_position = ?, "
				+ "last_heartbeat_read = ?, "
				+ "client_id = ? "
				+ "ON DUPLICATE KEY UPDATE "
				+ "last_heartbeat_read = ?, "
				+ "gtid_set = ?, binlog_file = ?, binlog_position=?";

		BinlogPosition binlogPosition = newPosition.getBinlogPosition();
		connectionPool.withSQLRetry(1, (c) -> {
			PreparedStatement s = c.prepareStatement(sql);

			LOGGER.debug("Writing binlog position to " + c.getCatalog() + ".positions: " + newPosition + ", last heartbeat read: " + heartbeat);
			s.setLong(1, serverID);
			s.setString(2, binlogPosition.getGtidSetStr());
			s.setString(3, binlogPosition.getFile());
			s.setLong(4, binlogPosition.getOffset());
			s.setLong(5, heartbeat);
			s.setString(6, clientID);
			s.setLong(7, heartbeat);
			s.setString(8, binlogPosition.getGtidSetStr());
			s.setString(9, binlogPosition.getFile());
			s.setLong(10, binlogPosition.getOffset());

			s.execute();
		});
	}

	public Position get() throws SQLException {
		try ( Connection c = connectionPool.getConnection() ) {
			PreparedStatement s = c.prepareStatement("SELECT * from `positions` where server_id = ? and client_id = ?");
			s.setLong(1, serverID);
			s.setString(2, clientID);

			return positionFromResultSet(s.executeQuery());
		}
	}

	//......

}	
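  • MysqlPositionStore provides set and get methods. set writes a row to the positions table with INSERT ... ON DUPLICATE KEY UPDATE, so an existing row is updated in place rather than duplicated; get reads the position row for the given server_id and client_id from the positions table. set executes its SQL through connectionPool.withSQLRetry, passing 1 as nTries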
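The upsert behaviour of set can be modelled without a database. The sketch below is not maxwell code: InMemoryPositionStore, Row, and the string key are hypothetical names, and it assumes the positions table is uniquely keyed on (server_id, client_id), which is consistent with the WHERE clause in get but is not shown in the listing above. It also mirrors the constructor's rule that GTID mode replaces the real server id with 0.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for the `positions` table; not maxwell code.
class InMemoryPositionStore {
    // One table row, reduced to the non-key columns that set() writes.
    static final class Row {
        final String gtidSet;
        final String binlogFile;
        final long binlogPosition;
        final long lastHeartbeatRead;

        Row(String gtidSet, String binlogFile, long binlogPosition, long lastHeartbeatRead) {
            this.gtidSet = gtidSet;
            this.binlogFile = binlogFile;
            this.binlogPosition = binlogPosition;
            this.lastHeartbeatRead = lastHeartbeatRead;
        }
    }

    private final Map<String, Row> table; // shared "table", keyed by server_id/client_id
    private final long serverID;
    private final String clientID;

    InMemoryPositionStore(Map<String, Row> table, long serverID, String clientID, boolean gtidMode) {
        this.table = table;
        // Same rule as the real constructor: GTID mode ignores the server id.
        this.serverID = gtidMode ? 0L : serverID;
        this.clientID = clientID;
    }

    // Assumption: (server_id, client_id) is the unique key of the table.
    private String key() {
        return serverID + "/" + clientID;
    }

    // Upsert: insert the row if the key is new, otherwise overwrite it.
    void set(Row row) {
        if (row == null) {
            return; // the real set() also ignores a null position
        }
        table.put(key(), row);
    }

    // Returns the stored row for this server_id/client_id, or null if none exists.
    Row get() {
        return table.get(key());
    }

    public static void main(String[] args) {
        Map<String, Row> table = new HashMap<>();
        InMemoryPositionStore store = new InMemoryPositionStore(table, 7L, "maxwell", false);
        store.set(new Row(null, "mysql-bin.000001", 4L, 0L));
        store.set(new Row(null, "mysql-bin.000001", 120L, 5L)); // updates, no second row
        System.out.println(table.size() + " row(s), offset " + store.get().binlogPosition);
    }
}
```

Because the second set call hits the same key, the table still holds a single row afterwards. In GTID mode two stores created with different server ids but the same client id would also share one row, since both are keyed on server id 0.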

ConnectionPool

maxwell-1.25.1/src/main/java/com/zendesk/maxwell/util/ConnectionPool.java

public interface ConnectionPool {
	@FunctionalInterface
	public interface RetryableSQLFunction<T> {
		void apply(T t) throws SQLException, NoSuchElementException, DuplicateProcessException;
	}

	Connection getConnection() throws SQLException;
	void release();

	void withSQLRetry(int nTries, RetryableSQLFunction<Connection> inner)
		throws SQLException, NoSuchElementException, DuplicateProcessException;
}
  • ConnectionPool declares the RetryableSQLFunction functional interface along with the getConnection, release, and withSQLRetry methods

C3P0ConnectionPool

maxwell-1.25.1/src/main/java/com/zendesk/maxwell/util/C3P0ConnectionPool.java

public class C3P0ConnectionPool implements ConnectionPool {
	private final ComboPooledDataSource cpds;
	static final Logger LOGGER = LoggerFactory.getLogger(C3P0ConnectionPool.class);

	@Override
	public Connection getConnection() throws SQLException {
		return cpds.getConnection();
	}

	@Override
	public void release() {
		cpds.close();
	}

	public C3P0ConnectionPool(String url, String user, String password) {
		cpds = new ComboPooledDataSource();
		cpds.setJdbcUrl(url);
		cpds.setUser(user);
		cpds.setPassword(password);


		// the settings below are optional -- c3p0 can work with defaults
		cpds.setMinPoolSize(1);
		cpds.setMaxPoolSize(5);
	}

	@Override
	public void withSQLRetry(int nTries, RetryableSQLFunction<Connection> inner)
		throws SQLException, DuplicateProcessException, NoSuchElementException {
		try ( final Connection c = getConnection() ){
			inner.apply(c);
			return;
		} catch (SQLException e) {
			if ( nTries > 0 ) {
				LOGGER.error("got SQL Exception: {}, {}, retrying...",
					e.getLocalizedMessage(),
					e.getCause().getLocalizedMessage()
				);
				withSQLRetry(nTries - 1, inner);
			} else {
				throw(e);
			}
		}
	}
}
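  • C3P0ConnectionPool implements the ConnectionPool interface. Its withSQLRetry method runs the supplied RetryableSQLFunction on a pooled connection and catches SQLException; when nTries is greater than 0 it logs the error and recursively calls withSQLRetry(nTries - 1, inner), otherwise it rethrows the exception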
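The retry logic can also be written as a loop. The following is a minimal sketch of that idea, not the library's implementation: RetrySketch, SqlAction, and withRetry are hypothetical names, and the action takes no Connection so that the example runs without a database (the real method acquires a fresh connection for each attempt). One deliberate difference is the null check on getCause(): a SQLException created without a cause returns null there, so the logging call in the listing above would throw a NullPointerException in that case.

```java
import java.sql.SQLException;

// Hypothetical sketch of the retry pattern; not part of maxwell.
class RetrySketch {
    @FunctionalInterface
    interface SqlAction {
        void run() throws SQLException;
    }

    // Runs the action once, then retries up to nTries more times on SQLException.
    static void withRetry(int nTries, SqlAction action) throws SQLException {
        for (int remaining = nTries; ; remaining--) {
            try {
                action.run();
                return; // success, stop retrying
            } catch (SQLException e) {
                if (remaining <= 0) {
                    throw e; // retries exhausted, surface the last failure
                }
                Throwable cause = e.getCause(); // may be null
                System.err.println("got SQL Exception: " + e.getLocalizedMessage()
                    + ", " + (cause == null ? "no cause" : cause.getLocalizedMessage())
                    + ", retrying...");
            }
        }
    }

    public static void main(String[] args) throws SQLException {
        int[] attempts = {0};
        // Fails on the first attempt only, so one retry is enough.
        withRetry(1, () -> {
            if (++attempts[0] == 1) {
                throw new SQLException("transient failure");
            }
        });
        System.out.println("attempts: " + attempts[0]); // prints "attempts: 2"
    }
}
```

With nTries set to 1, as in MysqlPositionStore.set, the action runs at most twice: the initial attempt plus one retry.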

Summary

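MysqlPositionStore provides set and get methods. set writes a row to the positions table with INSERT ... ON DUPLICATE KEY UPDATE, and get reads the position row for the given server_id and client_id from the positions table. set executes its SQL through connectionPool.withSQLRetry.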

Original article: http://www.python88.com/topic/62506
 