博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
使用Spark读写外部存储介质(Mysql、Hbase、Redis)
阅读量:2160 次
发布时间:2019-05-01

本文共 16177 字,大约阅读时间需要 53 分钟。

使用Spark读写外部存储介质(Mysql、Hbase、Redis)

1、使用Spark与Mysql交互

1.1 需要加载的依赖

druid提供数据库连接池创建、维护和监控的功能。

mysql
mysql-connector-java
5.1.38
commons-dbcp
commons-dbcp
1.4
com.alibaba
druid
1.0.9

1.2 需要设置的配置项

需要指定url,url由连接串、端口号、SSL认证组成。

driverClassName=org.gjt.mm.mysql.Driverurl=连接串:端口号/库名?useSSL=trueusername=账号password=密码initialSize=2maxActive=10maxWait=10000validationQuery=SELECT 1testWhileIdle=truetestOnBorrow=falsetestOnReturn=false

1.3 代码

连接池ds在一个实例中只需要初始化一次,便可多次复用从中getConnection。

import java.sql.{
Connection, ResultSet, Statement}import java.text.SimpleDateFormatimport java.util.{
Date, Properties}import com.alibaba.druid.pool.{
DruidDataSource, DruidDataSourceFactory}object demoMysql {
def main(args: Array[String]): Unit = {
/** 读取mysql连接配置 **/ val properties = new Properties() val propertiesPath = getClass.getClassLoader.getResourceAsStream("druid_config_test.properties") properties.load(propertiesPath) /** 创建连接 **/ val ds = DruidDataSourceFactory.createDataSource(properties) //基于配置创建连接池 val conn: Connection = ds.getConnection() //获取连接对象 val now = new Date().getTime //时间戳 val dateFormatDay: SimpleDateFormat = new SimpleDateFormat("yyyyMMdd") val dayBefore = dateFormatDay.format(now) val incDay = "20200418" val sql = s"select * from dceport.ll_khsc_dept_dist_city_map partition (p$incDay) where dept_code = 'TC771Y' order by inc_day desc limit 1" println(sql) val stmt: Statement = conn.createStatement val rs: ResultSet = stmt.executeQuery(sql) while (rs.next()) {
val res = rs.getString("dept_code") println(res) } }}

2、使用Spark与Hbase交互

2.1 需要加载的依赖

org.apache.hbase
hbase-client
1.3.1

2.2 代码

quorum一般由运维提供,测试环境zk域名需要在本地hosts文件中添加。

import org.apache.hadoop.hbase.{
HBaseConfiguration, TableName}import org.apache.hadoop.hbase.client.{
ConnectionFactory, Delete, Get, HTable, Put}import org.apache.hadoop.hbase.util.Bytesobject demoHbase {
def main(args: Array[String]): Unit = {
val quorum = "CNSZ22PL0138,CNSZ22PL0139,CNSZ22PL0140,CNSZ22PL0141,CNSZ22PL0142" val hbaseConf = HBaseConfiguration.create() hbaseConf.set("hbase.zookeeper.quorum", quorum) hbaseConf.set("hbase.zookeeper.property.clientPort", "2181") hbaseConf.set("zookeeper.znode.parent", "/hbase") val connection = ConnectionFactory.createConnection(hbaseConf) val table = connection.getTable(TableName.valueOf("upai_tempData")) println(1) val rowKey = "4S" val rkGet = new Get(Bytes.toBytes(rowKey)) val res = table.get(rkGet) println(table.exists(rkGet)) table.close() connection.close() }}
package spark.util;import redis.clients.jedis.HostAndPort;import redis.clients.jedis.Jedis;import redis.clients.jedis.JedisPoolConfig;import redis.clients.jedis.JedisSentinelPool;import redis.clients.util.Hashing;import java.util.*;/** * 用一致性哈希算法 */public class ShardedJedisSentinelPool {
private static Map
poolMap = new HashMap<>(); private static Hashing algo = Hashing.MURMUR_HASH; private static TreeMap
nodes = new TreeMap<>(); static {
initPool(); } private ShardedJedisSentinelPool(){
} private static class SingletonInstance {
private static final ShardedJedisSentinelPool INSTANCE = new ShardedJedisSentinelPool(); } public static ShardedJedisSentinelPool getInstance() {
return SingletonInstance.INSTANCE; } public static void initPool() {
String[] masterNames = PropertiesLoader.getInstance().getProperty("redis.master.names").split(","); initialize(masterNames); JedisPoolConfig poolConfig = new JedisPoolConfig(); poolConfig.setMaxTotal(Integer.parseInt(PropertiesLoader.getInstance().getProperty("redis.maxTotal")));// 最大连接数 poolConfig.setMaxIdle(Integer.parseInt(PropertiesLoader.getInstance().getProperty("redis.maxIdle")));// 最大空闲数 poolConfig.setMaxWaitMillis(Integer.parseInt(PropertiesLoader.getInstance().getProperty("redis.maxWait")));// 最大允许等待时间,如果超过这个时间还未获取到连接,则会报JedisException异常:Could not get a resource from the pool poolConfig.setTestOnBorrow(false); poolConfig.setTestOnReturn(false); String[] hostAndPortArray = PropertiesLoader.getInstance().getProperty("redis.cluster").split(","); Set
sentinels = new HashSet<>(); for (int i = 0; i < hostAndPortArray.length; i++) {
String host = hostAndPortArray[i].split(":")[0]; int port = Integer.parseInt(hostAndPortArray[i].split(":")[1]); sentinels.add(new HostAndPort(host, port).toString()); } String password = PropertiesLoader.getInstance().getProperty("redis.auth"); for (String masterName : masterNames) {
poolMap.put(masterName, new JedisSentinelPool(masterName, sentinels, poolConfig, password)); } System.out.println("the redis shard cluster pool init success"); } public static JedisSentinelPool getShardPool(String key) {
String masterName = getShardInfo(key.getBytes()); if (System.currentTimeMillis() % 500 == 1) {
// System.out.println(key + "=" + masterName); } return poolMap.get(masterName); } public static Jedis getResource(String key) {
return getShardPool(key).getResource(); } private static void initialize(String[] masterNames) {
for (int i = 0; i != masterNames.length; ++i) {
for (int n = 0; n < 160; n++) {
nodes.put(algo.hash("SHARD-" + i + "-NODE-" + n), masterNames[i]); } } } public static String getShardInfo(byte[] key) {
SortedMap
tail = nodes.tailMap(algo.hash(key)); if (tail.isEmpty()) {
return nodes.get(nodes.firstKey()); } return tail.get(tail.firstKey()); } public static void close() {
for (Map.Entry
iter : poolMap.entrySet()) {
iter.getValue().destroy(); } }}
package spark.util;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import redis.clients.jedis.*;import java.util.Map;import java.util.Set;/** * @Description:redis分片连接池自动关闭连接工具类 * @Author x * @CreateDate: 2019/3/18 10:31 */public class ShardRedisProxyUtils {
private static final Logger logger = LoggerFactory.getLogger(ShardRedisProxyUtils.class); private static ShardedJedisSentinelPool shardPool = ShardedJedisSentinelPool.getInstance(); private ShardRedisProxyUtils() {
} public static Boolean exists(String key) {
Jedis jedis = null; try {
jedis = shardPool.getResource(key); return jedis.exists(key); } catch (Exception e) {
throw e; } finally {
if (jedis != null) {
jedis.close(); } } } public static Long del(String key) {
Jedis jedis = null; try {
jedis = shardPool.getResource(key); return jedis.del(key); } catch (Exception e) {
throw e; } finally {
if (jedis != null) {
jedis.close(); } } } public static void set(String key, String value, int seconds) {
Jedis jedis = null; try {
jedis = shardPool.getResource(key); Pipeline pipeline = jedis.pipelined(); pipeline.set(key, value); pipeline.expire(key, seconds); pipeline.sync(); } catch (Exception e) {
throw e; } finally {
if (jedis != null) {
jedis.close(); } } } public static Long setnx(String key, String value) {
Jedis jedis = null; try {
jedis = shardPool.getResource(key); return jedis.setnx(key, value); } catch (Exception e) {
throw e; } finally {
if (jedis != null) {
jedis.close(); } } } public static Long expire(String key, int seconds) {
Jedis jedis = null; try {
jedis = shardPool.getResource(key); return jedis.expire(key, seconds); } catch (Exception e) {
throw e; } finally {
if (jedis != null) {
jedis.close(); } } } public static String get(String key) {
Jedis jedis = null; try {
jedis = shardPool.getResource(key); return jedis.get(key); } catch (Exception e) {
throw e; } finally {
if (jedis != null) {
jedis.close(); } } } public static String hget(String key, String field) {
Jedis jedis = null; try {
jedis = shardPool.getResource(key); return jedis.hget(key, field); } catch (Exception e) {
throw e; } finally {
if (jedis != null) {
jedis.close(); } } } public static Long hset(String key, String field, String value) {
Jedis jedis = null; try {
jedis = shardPool.getResource(key); return jedis.hset(key, field, value); } catch (Exception e) {
throw e; } finally {
if (jedis != null) {
jedis.close(); } } } public static Map
hgetAll(String key) {
Jedis jedis = null; try {
jedis = shardPool.getResource(key); return jedis.hgetAll(key); } catch (Exception e) {
throw e; } finally {
if (jedis != null) {
jedis.close(); } } } public static Long zadd(String key, double score, String menber) {
Jedis jedis = null; try {
jedis = shardPool.getResource(key); return jedis.zadd(key, score, menber); } catch (Exception e) {
throw e; } finally {
if (jedis != null) {
jedis.close(); } } } public static Set
zrevrangeWithScores(String key, double min, double max) {
Jedis jedis = null; try {
jedis = shardPool.getResource(key); return jedis.zrangeByScoreWithScores(key, min, max); } catch (Exception e) {
throw e; } finally {
if (jedis != null) {
jedis.close(); } } } public static Long hdel(String key, String field) {
Jedis jedis = null; try {
jedis = shardPool.getResource(key); return jedis.hdel(key, field); } catch (Exception e) {
throw e; } finally {
if (jedis != null) {
jedis.close(); } } } public static ScanResult
> hscan(String key, String cursor, ScanParams params) {
Jedis jedis = null; try {
jedis = shardPool.getResource(key); return jedis.hscan(key, cursor, params); } catch (Exception e) {
throw e; } finally {
if (jedis != null) {
jedis.close(); } } } /** * 关闭redis连接池 */ public static void close() {
try {
if (shardPool != null) {
shardPool.close(); System.out.println("close the redis shard pool success!"); } } catch (Exception e) {
logger.error("close the redis shard pool error:", e); } }}
package spark.util;import java.util.Map;import com.sf.framework.cacheproxy.redis.RedisCache;import com.sf.framework.cacheproxy.redis.RedisConfig;import com.sf.framework.cacheproxy.redis.RedisType;/** * @Description:redis分片连接池自动关闭连接工具类 * @Author 01381119 * @CreateDate: 2019/3/18 10:31 */public class RedisUtils {
private RedisUtils() {
} private static RedisCache redisCache = null; public static RedisCache getRedisCache() {
if (redisCache != null) {
return redisCache; } synchronized (RedisUtils.class) {
if (redisCache != null) {
return redisCache; } String nodesStr = PropertiesLoader.getInstance().getProperty("redis.cluster"); String password = PropertiesLoader.getInstance().getProperty("redis.auth");// String nodesStr = "Up89A52y-1.cachesit.sfcloud.local:8080,Up89A52y-2.cachesit.sfcloud.local:8080,Up89A52y-3.cachesit.sfcloud.local:8080,Up89A52y-4.cachesit.sfcloud.local:8080,Up89A52y-5.cachesit.sfcloud.local:8080,Up89A52y-6.cachesit.sfcloud.local:8080";// String password = "RQtri9RwVb1oTc7km2QXjUgv"; RedisConfig config = new RedisConfig(); config.setServers(nodesStr); config.setPassword(password); config.setType(RedisType.CLUSTER); redisCache = new RedisCache(config); } return redisCache; } public static Boolean exists(String key) {
RedisCache cache = getRedisCache(); return cache.exists(key); } public static void set(String key, String value, int seconds) {
RedisCache cache = getRedisCache(); cache.set(key, value, seconds); } public static boolean setnx(String key, String value, int seconds) {
RedisCache cache = getRedisCache(); return cache.setnx(key, value, seconds); } public static void expire(String key, int seconds) {
RedisCache cache = getRedisCache(); cache.setTtl(key, seconds); } public static String get(String key) {
RedisCache cache = getRedisCache(); return cache.get(key); } public static String hget(String key, String field) {
RedisCache cache = getRedisCache(); return cache.hget(key, field); } public static void hset(String key, String field, String value, int seconds) {
RedisCache cache = getRedisCache(); cache.hset(key, field, value, seconds); } public static Map
hgetAll(String key) {
RedisCache cache = getRedisCache(); return cache.hgetAll(key); } public static void del(String key) {
RedisCache cache = getRedisCache(); cache.remove(key); } public static void shutdown() {
RedisCache cache = getRedisCache(); cache.shutdown(); } public static void main(String[] args) {
int threadCount = 1; for (int i = 0; i < threadCount; i++) {
new Thread(new Runnable() {
@Override public void run() {
testWrite(); } }).start(); } sleep(10000000); } private static void testWrite() {
for (int i = 0; i < Integer.MAX_VALUE; i++) {
String uuid = "";//UuidUtils.get(); String key = "testkey" + uuid; String value = "testValue" + uuid; set(key, value, 6); sleep(5000); System.out.println(value + "\t" + get(key)); sleep(1000); System.out.println(value + "\t" + get(key)); if (i % 1000 == 0) {
System.out.println(Thread.currentThread().getName() + "\t" + i); } } } private static void sleep(long t) {
try {
Thread.sleep(t); } catch (InterruptedException e) {
e.printStackTrace(); } }}

转载地址:http://xwwzb.baihongyu.com/

你可能感兴趣的文章
Go语言学习Part2:流程控制语句:for、if、else、switch 和 defer
查看>>
Go语言学习Part3:struct、slice和映射
查看>>
Go语言学习Part4-1:方法和接口
查看>>
Leetcode Go 《精选TOP面试题》20200628 69.x的平方根
查看>>
leetcode 130. Surrounded Regions
查看>>
【托业】【全真题库】TEST2-语法题
查看>>
博客文格式优化
查看>>
【托业】【新托业全真模拟】疑难语法题知识点总结(01~05)
查看>>
【SQL】group by 和order by 的区别。
查看>>
【Python】详解Python多线程Selenium跨浏览器测试
查看>>
Jmeter之参数化
查看>>
Shell 和Python的区别。
查看>>
Python 列表(list)、字典(dict)、字符串(string)常用基本操作小结
查看>>
Loadrunner之https协议录制回放报错如何解决?(九)
查看>>
python中xrange和range的异同
查看>>
列表、元组、集合、字典
查看>>
【Python】easygui小甲鱼
查看>>
【Python】关于Python多线程的一篇文章转载
查看>>
【Pyton】【小甲鱼】文件
查看>>
【Pyton】【小甲鱼】永久存储:腌制一缸美味的泡菜
查看>>