Upgrading the JDBC Version in Java: JDBC Version Compatibility
This post shares what I know about upgrading the JDBC version in Java, and along the way explains JDBC version compatibility. Hopefully it happens to solve the problem you are facing.
Contents:
- 1. How to upgrade the JDBC driver to 11.2.0.2
- 2. Spark: an upgraded JDBC data source
- 3. SEVERE: This driver does not support Java Runtime Environment (JRE) version 1.8. Please use the sqljdbc4.jar class library, which supports JDBC 4.0
## How to upgrade the JDBC driver to 11.2.0.2
Upgrading the Oracle JDBC driver version in OC4J

After the database is upgraded to 11g while the application server stays on 10.1.3.3, the OJDBC driver version must be upgraded.

1. Back up system-application.xml and server.xml under $OC4J_HOME/j2ee/home/config.
2. Create the directory oracle.jdbc/11.1.0.7.0 under $OC4J_HOME/j2ee/home/shared-lib. On an application-server install you can also create a shared-lib directory in the instance's root directory; that shared-lib then applies only to that instance.
3. Copy ojdbc5.jar, ojdbc5dms.jar and orai18n.jar into the directory created in step 2.
4. Edit server.xml and add:
<shared-library name="oracle.jdbc" version="11.1.0.7.0">
    <code-source path="ojdbc5.jar"/>
    <code-source path="ojdbc5dms.jar"/>
    <code-source path="orai18n.jar"/>
</shared-library>
5. Edit system-application.xml and change

<import-shared-library name="oracle.jdbc" />

to

<import-shared-library name="oracle.jdbc" min-version="11.1.0.7.0"/>
6. Verify the changes; a quick runtime check is sketched below.
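To confirm that the upgraded driver is actually the one being loaded, a minimal sketch along these lines can be run against the database. The JDBC URL and credentials below are placeholders, not values from this article:

```
import java.sql.DriverManager

object DriverVersionCheck {
  def main(args: Array[String]): Unit = {
    // Placeholder connection details; substitute your own URL, user and password.
    val connection = DriverManager.getConnection(
      "jdbc:oracle:thin:@//dbhost:1521/ORCL", "scott", "tiger")
    try {
      val meta = connection.getMetaData
      // Should now report the upgraded Oracle JDBC driver version.
      println(s"Driver: ${meta.getDriverName} ${meta.getDriverVersion}")
    } finally {
      connection.close()
    }
  }
}
```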
## Spark: an upgraded JDBC data source
Spark's built-in JDBC data source only supports the Append, Overwrite, ErrorIfExists and Ignore save modes, yet almost all of our production jobs need upsert semantics: rows that already exist must not simply be overwritten. In MySQL this is done with `ON DUPLICATE KEY UPDATE`. Is there such an implementation? Officially: sorry, not provided. dounine: I have one, come and use it. To make it easy to pick up, I have published the project to Maven Central.
### The scrappy, hand-rolled approach
MysqlClient.scala
```
import java.sql._
import java.time.{LocalDate, LocalDateTime}

import scala.collection.mutable.ListBuffer

class MysqlClient(jdbcUrl: String) {
  // jdbcUrl is expected in the form "url|user|password"
  private var connection: Connection = null
  val driver = "com.mysql.jdbc.Driver"

  init()

  def init(): Unit = {
    if (connection == null || connection.isClosed) {
      val split = jdbcUrl.split("\\|")
      Class.forName(driver)
      connection = DriverManager.getConnection(split(0), split(1), split(2))
    }
  }

  def close(): Unit = {
    connection.close()
  }

  def execute(sql: String, params: Any*): Unit = {
    try {
      val statement = connection.prepareStatement(sql)
      this.fillStatement(statement, params: _*)
      statement.executeUpdate
      statement.close()
    } catch {
      case e: SQLException =>
        e.printStackTrace()
    }
  }

  // Bind each parameter to its 1-based placeholder position.
  @throws[SQLException]
  def fillStatement(statement: PreparedStatement, params: Any*): Unit = {
    for (index <- 1 until params.length + 1) {
      params(index - 1) match {
        case s: String => statement.setString(index, s)
        case n: Integer => statement.setInt(index, n.intValue())
        case b: Boolean => statement.setBoolean(index, b)
        case ld: LocalDate => statement.setString(index, ld.toString)
        case ldt: LocalDateTime => statement.setString(index, ldt.toString)
        case l: Long => statement.setLong(index, l)
        case d: Double => statement.setDouble(index, d)
        case f: Float => statement.setFloat(index, f)
        case other => statement.setString(index, other.toString)
      }
    }
  }

  // Builds and runs:
  // INSERT INTO `table` (...) VALUES (...) ON DUPLICATE KEY UPDATE ...
  def upsert(query: Query, update: Update, tableName: String): Unit = {
    val names = ListBuffer[String]()
    val values = ListBuffer[String]()
    val params = ListBuffer[AnyRef]()
    val updates = ListBuffer[AnyRef]()
    val keysArr = scala.Array(query.values.keys, update.sets.keys, update.incs.keys)
    val valuesArr = scala.Array(update.sets.values, update.incs.values)
    for (i <- 0 until keysArr.length) {
      val item = keysArr(i)
      item.foreach {
        key => {
          names += s"`${key}`"
          values += "?"
        }
      }
      i match {
        case 0 => params ++= query.values.values
        case 1 | 2 => params ++= valuesArr(i - 1).toList
      }
    }
    // Columns in `sets` are overwritten on duplicate key, columns in `incs` are incremented.
    update.sets.foreach {
      item => {
        updates += s" `${item._1}` = ? "
        params += item._2
      }
    }
    update.incs.foreach {
      item => {
        updates += s" `${item._1}` = `${item._1}` + ? "
        params += item._2
      }
    }
    val sql = s"INSERT INTO `$tableName` (${names.mkString(",")}) VALUES(${values.mkString(",")}) ON DUPLICATE KEY UPDATE ${updates.mkString(",")}"
    this.execute(sql, params.toArray[AnyRef]: _*)
  }
}

case class Update(sets: Map[String, AnyRef] = Map(), incs: Map[String, AnyRef] = Map())

case class Query(values: Map[String, AnyRef] = Map())
```
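For reference, a minimal standalone use of this client could look like the sketch below. The connection string format "url|user|password" follows the split in init() above; the URL, credentials and table name are just example values matching the test table used later in this post.

```
// Hypothetical values; the constructor expects "url|user|password", as split in init() above.
val client = new MysqlClient("jdbc:mysql://localhost:3306/ttable|root|root")

// Inserts a row for 2019-02-01, or increments pv/uv if a row with that date already exists.
client.upsert(
  Query(values = Map("time" -> "2019-02-01")),
  Update(incs = Map("pv" -> Long.box(10L), "uv" -> Long.box(2L))),
  "test"
)
client.close()
```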
The scrappy driver program
```
import java.util.Properties

import org.apache.spark.sql.Row

// `sc`, `PropertiesUtils` and `overrideIndicator` are defined elsewhere in the surrounding job.
val fieldMaps = (row: Row, fields: Array[String]) => fields.map {
  field => (field, Option(row.getAs[String](field)).getOrElse(""))
}.toMap

sc.sql(
  s"""select time,count(userid) as pv,count(distinct(userid)) as uv from log group by time""")
  .foreachPartition(item => {
    val props: Properties = PropertiesUtils.properties("mysql")
    val mysqlClient: MysqlClient = new MysqlClient(props.getProperty("jdbcUrl"))
    while (item.hasNext) {
      val row: Row = item.next()
      val pv: Long = row.getAs("pv")
      val uv: Long = row.getAs("uv")
      val indicatorMap = Map(
        "pv" -> pv.toString,
        "uv" -> uv.toString
      )
      val update = if (overrideIndicator) { // overwrite existing indicator values
        Update(sets = indicatorMap)
      } else { // upsert: increment existing indicator values
        Update(incs = indicatorMap)
      }
      val queryMap = fieldMaps(row, "time".split(","))
      mysqlClient.upsert(
        Query(queryMap),
        update,
        "test"
      )
    }
    mysqlClient.close()
  })
```
It is really ugly; I don't even want to look at it anymore.
### After upgrading to jdbc2
Add the dependency [spark-sql-datasource]():
```
<dependency>
    <groupId>com.dounine</groupId>
    <artifactId>spark-sql-datasource</artifactId>
    <version>1.0.1</version>
</dependency>
```
Create a test table:
```
CREATE TABLE `test` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`time` date NOT NULL,
`pv` int(255) DEFAULT '0',
`uv` int(255) DEFAULT '0',
PRIMARY KEY (`id`),
UNIQUE KEY `uniq` (`time`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=22 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
```
The program:
```
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
// JDBCSaveMode ships with the spark-sql-datasource dependency; import path assumed from the format name below.
import org.apache.spark.sql.execution.datasources.jdbc2.JDBCSaveMode

val spark = SparkSession
  .builder()
  .appName("jdbc2")
  .master("local[*]")
  .getOrCreate()

val readSchema = StructType(
  Array(
    StructField("userid", StringType, nullable = false),
    StructField("time", StringType, nullable = false),
    StructField("indicator", LongType, nullable = false)
  )
)

val rdd = spark.sparkContext.parallelize(
  Array(
    Row.fromSeq(Seq("lake", "2019-02-01", 10L)),
    Row.fromSeq(Seq("admin", "2019-02-01", 10L)),
    Row.fromSeq(Seq("admin", "2019-02-01", 11L))
  )
)

spark.createDataFrame(rdd, readSchema).createTempView("log")

spark.sql("select time,count(userid) as pv,count(distinct(userid)) as uv from log group by time")
  .write
  .format("org.apache.spark.sql.execution.datasources.jdbc2")
  .options(
    Map(
      "savemode" -> JDBCSaveMode.Update.toString,
      "driver" -> "com.mysql.jdbc.Driver",
      "url" -> "jdbc:mysql://localhost:3306/ttable",
      "user" -> "root",
      "password" -> "root",
      "dbtable" -> "test",
      "useSSL" -> "false",
      "duplicateIncs" -> "pv,uv",
      "showSql" -> "true"
    )
  ).save()
```
When the program actually runs, it generates the following SQL statement:
```
INSERT INTO test (`time`,`pv`,`uv`)
VALUES (?,?,?)
ON DUPLICATE KEY UPDATE `time`=?,`pv`=`pv`+?,`uv`=`uv`+?
```
The result:
![spark-sql datasource jdbc2 upsert]()
### New jdbc2 configuration options
| format | duplicateIncs | showSql |
|--|--|--|
| org.apache.spark.sql.execution.datasources.jdbc2 | columns to upsert (incremented on duplicate key) | whether to print the generated SQL |
All other options are the same as the built-in `jdbc` data source.
## SEVERE: This driver does not support Java Runtime Environment (JRE) version 1.8. Please use the sqljdbc4.jar class library, which supports JDBC 4.0
Either lower your JDK version or upgrade the JDBC driver; both work. Just pay attention to which SQL Server versions the JDBC driver supports. You can also use a third-party JDBC driver that supports Java 8.
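For example, with a Java 8 capable Microsoft driver on the classpath (such as mssql-jdbc) instead of sqljdbc4.jar, a plain connection looks roughly like this; the host, database and credentials are placeholders:

```
import java.sql.DriverManager

// Placeholder connection details for a SQL Server instance.
// Assumes a Java 8 capable driver (e.g. mssql-jdbc) is on the classpath instead of sqljdbc4.jar.
Class.forName("com.microsoft.sqlserver.jdbc.SQLServerDriver")
val connection = DriverManager.getConnection(
  "jdbc:sqlserver://localhost:1433;databaseName=test", "sa", "yourStrongPassword")
println(connection.getMetaData.getDriverVersion)
connection.close()
```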
That concludes this introduction to upgrading the JDBC version in Java and to JDBC version compatibility; hopefully you found the information you needed.