「java读取avro」java读取xml文件内容

博主:adminadmin 2022-12-12 10:36:09 65

今天给各位分享java读取avro的知识,其中也会对java读取xml文件内容进行解释,如果能碰巧解决你现在面临的问题,别忘了关注本站,现在开始吧!

本文目录一览:

string类型的Avro模式

纯编程模式。在JAVA中的string类型中Avro是最受欢迎的模式,为纯编程模式,Avro模式只支持二进制编码和JSON编码两种序列化方式,全部采用编程语言,序列化方法为深度优先,从左到右遍历,对于编程高手来说非常方便。

如何将java对象转成parquet文件

把文本文件 直接转 parquet

可以跳过不符合条件的数据,只读取需要的数据,降低IO数据量

压缩编码可以降低磁盘存储空间。由于同一列的数据类型是一样的,可以使用更高效的压缩编码(例如Run Length Encoding和Delta Encoding)进一步节约存储空间

只读取需要的列,支持向量运算,能够获取更好的扫描性能

Parquet就是基于Google的Dremel系统的数据模型和算法实现的。核心思想是使用“record shredding and assembly algorithm”来表示复杂的嵌套数据类型,同时辅以按列的高效压缩和编码技术,实现降低存

与Avro之前新统计系统的日志都是用Avro做序列化和存储,鉴于Parquet的优势和对Avro的兼容,将HDFS上的存储格式改为Paruqet,并且只需做很小的改动就用原读取Avro的API读取Parquet,以提高近一个数量级。

Parquet文件尾部存储了文件的元数据信息和统计信息,自描述的,方便解析

如何将Avro数据加载到Spark

如下:

{

"type" : "record",

"name" : "twitter_schema",

"namespace" : "com.miguno.avro",

"fields" : [

{ "name" : "username",

"type" : "string",

"doc" : "Name of the user account on Twitter.com" },

{

"name" : "tweet",

"type" : "string",

"doc" : "The content of the user's Twitter message" },

{

"name" : "timestamp",

"type" : "long",

"doc" : "Unix epoch time in seconds" }

],

"doc:" : "A basic schema for storing Twitter messages"

}

twitter.json 中有一些数据:

{"username":"miguno","tweet":"Rock: Nerf paper, scissors is fine.","timestamp": 1366150681 }

{"username":"BlizzardCS","tweet":"Works as intended. Terran is IMBA.","timestamp": 1366154481 }

我们将这些数据转换成二进制的 Avro 格式:

$ java -jar ~/avro-tools-1.7.7.jar fromjson --schema-file twitter.avsc twitter.json twitter.avro

然后,我们将 Avro 数据转换为 Java:

$ java -jar /app/avro/avro-tools-1.7.7.jar compile schema /app/avro/data/twitter.avsc /app/avro/data/

现在,我们编译这些类并将其打包:

$ CLASSPATH=/app/avro/avro-1.7.7-javadoc.jar:/app/avro/avro-mapred-1.7.7-hadoop1.jar:/app/avro/avro-tools-1.7.7.jar

$ javac -classpath $CLASSPATH /app/avro/data/com/miguno/avro/twitter_schema.java

$ jar cvf Twitter.jar com/miguno/avro/*.class

我们启动 Spark,并将上面创建的 Jar 和一些需要的库(Hadoop 和 Avro)传递给 Spark 程序:

$ ./bin/spark-shell --jars /app/avro/avro-mapred-1.7.7-hadoop1.jar,/avro/avro-1.7.7.jar,/app/avro/data/Twitter.jar

在 REPL 中,我们获取数据并创建一个 RDD:

scala

import com.miguno.avro.twitter_schema

import org.apache.avro.file.DataFileReader;

import org.apache.avro.file.DataFileWriter;

import org.apache.avro.io.DatumReader;

import org.apache.avro.io.DatumWriter;

import org.apache.avro.specific.SpecificDatumReader;

import org.apache.avro.mapreduce.AvroKeyInputFormat

import org.apache.avro.mapred.AvroKey

import org.apache.hadoop.io.NullWritable

import org.apache.avro.mapred.AvroInputFormat

import org.apache.avro.mapred.AvroWrapper

import org.apache.avro.generic.GenericRecord

import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper}

import org.apache.hadoop.io.NullWritable

val path = "/app/avro/data/twitter.avro"

val avroRDD = sc.hadoopFile[AvroWrapper[GenericRecord], NullWritable, AvroInputFormat[GenericRecord]](path)

avroRDD.map(l = new String(l._1.datum.get("username").toString() ) ).first

返回结果:

res2: String = miguno

httpclient怎么调用avro

例如如下XML,其中 requestSource...之类的字符串是经过加工处理之后的,用于POJO类对象的参数替换。

soapenv:Envelope xmlns:soapenv="" xmlns:mom=""

soapenv:Header/

soapenv:Body

mom:MT_MOMServiceRequestCount

MOMServiceRequestCountRequest

MOMServiceRequestCount

RequestSourcerequestSource/RequestSource

RequestTargetrequestTarget/RequestTarget

RequestUserrequestUser/RequestUser

RequestTimerequestTime/RequestTime

RequestTyperequestType/RequestType

UserCodeuserCode/UserCode

UserNumberuserNumber/UserNumber

/MOMServiceRequestCount

/MOMServiceRequestCountRequest

/mom:MT_MOMServiceRequestCount

/soapenv:Body

/soapenv:Envelope

处理通过SOAP-UI工具能获得POST-DATA外,还需要知道 SOAP-PATH,因为这才是要发送数据的WEB-SERVER的处理端点地址。例如:

;senderService=BC_MOMreceiverParty=receiverService=interface=SI_MOMServiceRequestCount_OutinterfaceNamespace=http%3A%2F%2Fyutong.com%2Fesb%2Fmom%2Fcrm%2FMOMServiceRequestCount

有了这两个数据,基本上能调用WEB-SERVICE 了。

下面是具体的CODE:

package com.yutong.mom.util;

import java.io.ByteArrayInputStream;

import java.io.IOException;

import java.io.InputStream;

import java.io.InputStreamReader;

import java.io.OutputStream;

import java.io.Reader;

import java.net.HttpURLConnection;

import java.net.URL;

import java.util.Date;

import java.util.Iterator;

import java.util.Map;

import java.util.Set;

import org.apache.commons.httpclient.HttpClient;

import org.apache.commons.httpclient.methods.PostMethod;

import org.apache.commons.httpclient.methods.StringRequestEntity;

import org.dom4j.DocumentException;

import org.dom4j.io.SAXReader;

import org.jdom.Document;

import org.jdom.Element;

import org.jdom.JDOMException;

import org.jdom.Namespace;

import org.jdom.input.SAXBuilder;

import bsh.EvalError;

import bsh.Interpreter;

import com.yutong.mom.soapbean.model.SoapBeanScriptInfo;

import com.yutong.mom.vo.SoapBeanScriptMap;

import com.yutong.mom.vo.WebServiceConfigInfo;

public class CallWebServiceUtil {

private SoapBeanScriptMap soapBeanScriptMap;

private WebServiceConfigInfo webServiceConfigInfo;

private HttpClient httpClient = new HttpClient();

private Interpreter interpreter = new Interpreter();

private PostMethod postMethod;

private String faultError = "{operationFlag:'F',errorMsg:'";

public String callWebService(String serviceId,String strJson,MapString,String soapParameter)

{

String rtVal = "";

try

{

SoapBeanScriptInfo scriptInfo = soapBeanScriptMap.getSoapBeanScript(serviceId);

String soapPath = scriptInfo.getSoapPath();

soapPath = appendPIUserInfo(soapPath);

String soapRequestData = scriptInfo.getSoapData();

soapRequestData = injectSoapData(soapRequestData,soapParameter);

if(false)

{

URL wsUrl = new URL(soapPath);

HttpURLConnection conn = (HttpURLConnection) wsUrl.openConnection();

conn.setDoInput(true);// 有输入

conn.setDoOutput(true);// 有输出

conn.setRequestMethod("POST");

conn.setRequestProperty("Content-Type", "text/xml;charset=utf-8");

conn.setRequestProperty("Content-Length", Integer.toString(soapRequestData.length()));

//conn.setRequestProperty("SOAPAction","");

OutputStream os = conn.getOutputStream();

os.write(soapRequestData.getBytes("UTF-8"));

int statusCode = conn.getResponseCode();

if (statusCode == 200) {

InputStream is = conn.getInputStream();

byte[] b = new byte[8192];

int len = 0;

String soapResponseData = "";

while ((len = is.read(b)) != -1) {

soapResponseData += new String(b, 0, len, "UTF-8");

}

soapResponseData = "?xml version='1.0' encoding='GBK'?"+soapResponseData;

rtVal = parseResponse(soapResponseData,scriptInfo);

os.close();

is.close();

conn.disconnect();

}

else

{

rtVal = faultError+"调用失败!错误码:" + statusCode+"'}";

String soapResponseData = postMethod.getResponseBodyAsString();

System.out.println(soapResponseData);

}

}

else

{

postMethod = new PostMethod(soapPath);

StringRequestEntity requestEntity = new StringRequestEntity(soapRequestData,"application/soap+xml; charset=GBK; type=\"text/xml\"","GBK");

postMethod.setRequestEntity(requestEntity);

//int statusCode = httpClient.executeMethod(postMethod);

Date invokeDate = new Date();

int statusCode = httpClient.executeMethod(postMethod);

Date afterVoke = new Date();

Long startTime = invokeDate.getTime();

Long endTime = afterVoke.getTime();

Long diff = endTime - startTime;

System.out.println("---------spend-----"+diff);

if (statusCode == 200) {

String soapResponseData = postMethod.getResponseBodyAsString();

soapResponseData = "?xml version='1.0' encoding='GBK'?"+soapResponseData;

//soapResponseData = new String(soapResponseData.getBytes("UTF-8"));

//System.out.println(soapResponseData);

rtVal = parseResponseUserDom4j(soapResponseData,scriptInfo);

//rtVal = parseResponse(soapResponseData,scriptInfo);

} else {

rtVal = faultError+"调用失败!错误码:" + statusCode+"'}";

strJson+="operationFlag:'"+nodeValue+"',";

}

else if(nodeName.equals("Return_Value") !nodeValue.trim().equals(""))

{

strJson+="errorMsg:"+nodeValue+",";

}

else if(nodeName.equals("PageTotal"))

{

strJson+="pageTotal:"+nodeValue+",";

}

else if(nodeName.equals("PageNumber"))

{

strJson+="pageNumber:"+nodeValue+",";

}

else if(nodeName.equals("PageItem"))

{

strJson+="pageItem:"+nodeValue+",";

}

else if(nodeName.equals("BusDetails"))

{

List busDetails = ele.elements();

datas = "datas:[";

for(Iterator itInner= busDetails.iterator(); itInner.hasNext();)

{

org.dom4j.Element busDetailEle = (org.dom4j.Element) itInner.next();

List detailNodes = busDetailEle.elements();

String innerObjJson = "{";

for(Iterator itInner2= detailNodes.iterator(); itInner2.hasNext();)

{

org.dom4j.Element busDetailNode = (org.dom4j.Element) itInner2.next();

String innerNodeName = busDetailNode.getName();

String innerNodeValue= busDetailNode.getText();

if(innerNodeName.equals("BusCode"))

{

innerObjJson+="busCode:'"+innerNodeValue+"',";

}

else if(innerNodeName.equals("BusPlate"))

{

innerObjJson+="busPlate:'"+innerNodeValue+"',";

}

else if(innerNodeName.equals("PurchaseDate"))

{

innerObjJson+="purchaseDate:'"+innerNodeValue+"',";

}

else if(innerNodeName.equals("BusType"))

{

innerObjJson+="busType:'"+innerNodeValue+"',";

}

}

innerObjJson = innerObjJson.substring(0,innerObjJson.length()-1);

innerObjJson+= "},";

datas += innerObjJson;

}

datas = datas.substring(0,datas.length()-1);

datas += "]";

}

}

strJson += datas;

strJson += "}";

rtVal = strJson;

*

public void setSoapBeanScriptMap(SoapBeanScriptMap soapBeanScriptMap) {

this.soapBeanScriptMap = soapBeanScriptMap;

}

public WebServiceConfigInfo getWebServiceConfigInfo() {

return webServiceConfigInfo;

}

public void setWebServiceConfigInfo(WebServiceConfigInfo webServiceConfigInfo) {

this.webServiceConfigInfo = webServiceConfigInfo;

}

}

其实,后来发现使用 BEAN-SHELL 解析的方式,也不是很完美,因为BEAN-SHELL进行XML解析的逻辑处理代码我存放到了数据库中,每当业务逻辑发生变化,该处理代码页必须跟着调整。

后来想到了更好的解决思路:把 解析XML数据的代码应该通过Interface来抽象出来,然后程序可以通过ClassLoader 的相关类来动态加载 处理解析XML响应数据的类信息。这样做的好处:比使用BEAN-SHELL的代码效率高,另外该CLASS-LOADER加载 .class 文件的方式也多种多样,提供了部署灵活性。

关于java读取avro和java读取xml文件内容的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。

The End

发布于:2022-12-12,除非注明,否则均为首码项目网原创文章,转载请注明出处。