[Reading Avro in Java] Reading XML file contents in Java
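Today I'll share what I know about reading Avro in Java, and also explain how to read XML file contents in Java. If this happens to solve the problem you are facing, don't forget to follow this site. Let's get started!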
Avro schemas with the string type
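Avro is a popular schema-based serialization system in Java, and string is one of its primitive types. Everything is handled in code: Avro specifies exactly two serialization encodings, binary and JSON, and data is serialized by a depth-first, left-to-right traversal of the schema, which makes the format straightforward for experienced programmers to work with.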
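To make the depth-first, left-to-right rule concrete, here is a hand-written sketch of the binary encoding rules from the Avro specification, using only the JDK rather than the Avro library (real code would normally go through org.apache.avro.io.EncoderFactory). A long is zig-zag encoded and written as a variable-length base-128 integer; a string is its UTF-8 byte length (written as a long) followed by the bytes; a record is just its fields in declaration order, with no names or tags. The two-field user record is a hypothetical example.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

/** Hand-written sketch of Avro's binary encoding for long and string values. */
class AvroBinarySketch {

    /** Avro long: zig-zag encode, then emit 7 bits per byte, low bits first; high bit = "more follows". */
    static void writeLong(ByteArrayOutputStream out, long n) {
        long z = (n << 1) ^ (n >> 63); // zig-zag: 0->0, -1->1, 1->2, -2->3, ...
        while ((z & ~0x7FL) != 0) {
            out.write((int) ((z & 0x7F) | 0x80)); // 7 payload bits plus continuation bit
            z >>>= 7;
        }
        out.write((int) z);
    }

    /** Avro string: UTF-8 byte length written as a long, followed by the UTF-8 bytes. */
    static void writeString(ByteArrayOutputStream out, String s) {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        writeLong(out, utf8.length);
        out.write(utf8, 0, utf8.length);
    }

    /**
     * Hypothetical record {username: string, timestamp: long}: fields are written
     * in declaration order (depth-first, left to right) with no field names.
     */
    static byte[] encodeUser(String username, long timestamp) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeString(out, username);
        writeLong(out, timestamp);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        StringBuilder hex = new StringBuilder();
        for (byte b : encodeUser("foo", 1)) {
            hex.append(String.format("%02x ", b));
        }
        System.out.println(hex.toString().trim()); // 06 66 6f 6f 02
    }
}
```

The output matches the specification's own example, where "foo" encodes as 06 66 6f 6f; the trailing 02 is the long value 1 after zig-zag encoding.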
How to convert Java objects into Parquet files
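One option is to convert the text files directly to Parquet. Parquet's columnar storage offers several advantages:
Data that does not match the query conditions can be skipped, so only the needed data is read, reducing the amount of I/O.
Compression encodings reduce disk usage. Because all values in a column share the same type, more efficient encodings (for example Run Length Encoding and Delta Encoding) can be used to save even more space.
Only the required columns are read, and vectorized operations are supported, which gives better scan performance.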
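The following toy sketch shows why storing one column's same-typed values together helps: runs of repeated values collapse under run-length encoding, and slowly increasing values such as timestamps become small numbers under delta encoding. It only illustrates the idea; Parquet's actual encodings (the RLE/bit-packing hybrid and DELTA_BINARY_PACKED) add bit packing and block headers on top of it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy column encodings: not Parquet's on-disk format, just the underlying ideas. */
class ColumnEncodingSketch {

    /** Run-length encoding: each run of equal values becomes a {value, count} pair. */
    static List<int[]> runLengthEncode(int[] column) {
        List<int[]> runs = new ArrayList<>();
        int i = 0;
        while (i < column.length) {
            int j = i;
            while (j < column.length && column[j] == column[i]) {
                j++;
            }
            runs.add(new int[] {column[i], j - i});
            i = j;
        }
        return runs;
    }

    /** Delta encoding: keep the first value, then store each value minus its predecessor. */
    static long[] deltaEncode(long[] column) {
        long[] deltas = new long[column.length];
        for (int i = 0; i < column.length; i++) {
            deltas[i] = (i == 0) ? column[0] : column[i] - column[i - 1];
        }
        return deltas;
    }

    /** Inverse of deltaEncode: a running sum restores the original values. */
    static long[] deltaDecode(long[] deltas) {
        long[] values = new long[deltas.length];
        long running = 0;
        for (int i = 0; i < deltas.length; i++) {
            running += deltas[i];
            values[i] = running;
        }
        return values;
    }

    public static void main(String[] args) {
        int[] status = {1, 1, 1, 1, 2, 2, 3};
        for (int[] run : runLengthEncode(status)) {
            System.out.println("value=" + run[0] + " count=" + run[1]);
        }
        // The two timestamps from the twitter.json sample later in this article
        long[] timestamps = {1366150681L, 1366154481L};
        System.out.println(Arrays.toString(deltaEncode(timestamps))); // [1366150681, 3800]
    }
}
```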
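Parquet implements the data model and algorithms of Google's Dremel system. Its core idea is to represent complex nested data types with the "record shredding and assembly algorithm", combined with efficient column-wise compression and encoding, in order to reduce storage space.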
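Compared with Avro: previously, the logs of the new statistics system were all serialized and stored with Avro. Given Parquet's advantages and its compatibility with Avro, the storage format on HDFS was changed to Parquet. With only minor changes, the existing Avro-reading API could read the Parquet files, and performance improved by nearly an order of magnitude.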
The footer of a Parquet file stores the file's metadata and statistics, so the file is self-describing and easy to parse.
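Skipping data that cannot match relies on those footer statistics. As a minimal sketch, assuming each row group records the min and max of a long column, a reader evaluating "lo <= column <= hi" can rule out whole row groups before reading any of their pages. ColumnStats is a hypothetical stand-in, not the parquet-mr API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Sketch of predicate pushdown using per-row-group min/max statistics. */
class RowGroupSkippingSketch {

    /** Hypothetical stand-in for one row group's statistics of a single long column. */
    static final class ColumnStats {
        final int rowGroup;
        final long min;
        final long max;

        ColumnStats(int rowGroup, long min, long max) {
            this.rowGroup = rowGroup;
            this.min = min;
            this.max = max;
        }
    }

    /**
     * Returns the row groups that may contain values in [lo, hi]. A group whose
     * max is below lo, or whose min is above hi, cannot match and is skipped.
     */
    static List<Integer> rowGroupsToRead(List<ColumnStats> footerStats, long lo, long hi) {
        List<Integer> toRead = new ArrayList<>();
        for (ColumnStats s : footerStats) {
            boolean disjoint = s.max < lo || s.min > hi;
            if (!disjoint) {
                toRead.add(s.rowGroup);
            }
        }
        return toRead;
    }

    public static void main(String[] args) {
        List<ColumnStats> footer = Arrays.asList(
                new ColumnStats(0, 0, 100),
                new ColumnStats(1, 50, 200),
                new ColumnStats(2, 300, 400));
        System.out.println(rowGroupsToRead(footer, 150, 250)); // [1]
    }
}
```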
How to load Avro data into Spark
Suppose we have the following Avro schema, saved as twitter.avsc:
{
"type" : "record",
"name" : "twitter_schema",
"namespace" : "com.miguno.avro",
"fields" : [
{ "name" : "username",
"type" : "string",
"doc" : "Name of the user account on Twitter.com" },
{
"name" : "tweet",
"type" : "string",
"doc" : "The content of the user's Twitter message" },
{
"name" : "timestamp",
"type" : "long",
"doc" : "Unix epoch time in seconds" }
  ],
  "doc" : "A basic schema for storing Twitter messages"
}
twitter.json contains some sample data:
{"username":"miguno","tweet":"Rock: Nerf paper, scissors is fine.","timestamp": 1366150681 }
{"username":"BlizzardCS","tweet":"Works as intended. Terran is IMBA.","timestamp": 1366154481 }
We convert this data into the binary Avro format:
$ java -jar ~/avro-tools-1.7.7.jar fromjson --schema-file twitter.avsc twitter.json twitter.avro
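To confirm that avro-tools produced an Avro object container file rather than plain JSON, one can check the header: per the Avro specification, container files begin with the ASCII bytes O, b, j followed by the format version byte 1. A minimal JDK-only sketch (the class name and the default file name are assumptions):

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;

class AvroMagicCheck {

    /** Avro object container files start with 'O', 'b', 'j' and the version byte 1. */
    private static final byte[] MAGIC = {'O', 'b', 'j', 1};

    static boolean looksLikeAvroContainer(Path file) throws IOException {
        try (InputStream in = Files.newInputStream(file)) {
            byte[] header = in.readNBytes(MAGIC.length); // shorter than 4 bytes -> not Avro
            return Arrays.equals(header, MAGIC);
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Path.of(args.length > 0 ? args[0] : "twitter.avro");
        System.out.println(file + " is an Avro container file: " + looksLikeAvroContainer(file));
    }
}
```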
Next, we generate Java classes from the Avro schema:
$ java -jar /app/avro/avro-tools-1.7.7.jar compile schema /app/avro/data/twitter.avsc /app/avro/data/
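Now we compile the generated class and package it into a jar (run the jar command from /app/avro/data so that the archive keeps the com/miguno/avro package path):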
$ CLASSPATH=/app/avro/avro-1.7.7.jar:/app/avro/avro-mapred-1.7.7-hadoop1.jar:/app/avro/avro-tools-1.7.7.jar
$ javac -classpath $CLASSPATH /app/avro/data/com/miguno/avro/twitter_schema.java
$ jar cvf Twitter.jar com/miguno/avro/*.class
We start Spark and pass it the jar created above plus the required libraries (Hadoop and Avro):
$ ./bin/spark-shell --jars /app/avro/avro-mapred-1.7.7-hadoop1.jar,/app/avro/avro-1.7.7.jar,/app/avro/data/Twitter.jar
In the Scala REPL, we load the data and create an RDD:
import com.miguno.avro.twitter_schema
import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper}
import org.apache.hadoop.io.NullWritable
val path = "/app/avro/data/twitter.avro"
val avroRDD = sc.hadoopFile[AvroWrapper[GenericRecord], NullWritable, AvroInputFormat[GenericRecord]](path)
avroRDD.map(l => l._1.datum.get("username").toString).first
Result:
res2: String = miguno
How to call Avro with HttpClient
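Take the following XML as an example. Strings such as requestSource are placeholders that have been pre-processed so they can be replaced with parameters from a POJO.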
<soapenv:Envelope xmlns:soapenv="" xmlns:mom="">
   <soapenv:Header/>
   <soapenv:Body>
      <mom:MT_MOMServiceRequestCount>
         <MOMServiceRequestCountRequest>
            <MOMServiceRequestCount>
               <RequestSource>requestSource</RequestSource>
               <RequestTarget>requestTarget</RequestTarget>
               <RequestUser>requestUser</RequestUser>
               <RequestTime>requestTime</RequestTime>
               <RequestType>requestType</RequestType>
               <UserCode>userCode</UserCode>
               <UserNumber>userNumber</UserNumber>
            </MOMServiceRequestCount>
         </MOMServiceRequestCountRequest>
      </mom:MT_MOMServiceRequestCount>
   </soapenv:Body>
</soapenv:Envelope>
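The post does not show how the placeholders are replaced (the injectSoapData method called in the code further down is omitted). A hypothetical version, assuming each placeholder is the entire text of its element, could look like this. It works in a single regex pass so a substituted value is never re-scanned, and it escapes XML special characters so parameter values cannot break the envelope.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical placeholder filler for SOAP request templates. */
class SoapTemplateFiller {

    /** Element text consisting of a single word, e.g. ">requestUser<". */
    private static final Pattern PLACEHOLDER = Pattern.compile(">(\\w+)<");

    /** Escapes the five XML special characters. */
    static String escapeXml(String s) {
        StringBuilder sb = new StringBuilder(s.length());
        for (char c : s.toCharArray()) {
            switch (c) {
                case '&': sb.append("&amp;"); break;
                case '<': sb.append("&lt;"); break;
                case '>': sb.append("&gt;"); break;
                case '"': sb.append("&quot;"); break;
                case '\'': sb.append("&apos;"); break;
                default: sb.append(c);
            }
        }
        return sb.toString();
    }

    /** Replaces known placeholders with escaped values; unknown words are left untouched. */
    static String fill(String template, Map<String, String> params) {
        Matcher m = PLACEHOLDER.matcher(template);
        StringBuffer sb = new StringBuffer();
        while (m.find()) {
            String key = m.group(1);
            String value = params.get(key);
            String text = (value != null) ? escapeXml(value) : key;
            m.appendReplacement(sb, Matcher.quoteReplacement(">" + text + "<"));
        }
        m.appendTail(sb);
        return sb.toString();
    }

    public static void main(String[] args) {
        String template = "<RequestUser>requestUser</RequestUser><UserCode>userCode</UserCode>";
        Map<String, String> params = new LinkedHashMap<>();
        params.put("requestUser", "crm");
        params.put("userCode", "A&B");
        System.out.println(fill(template, params));
        // <RequestUser>crm</RequestUser><UserCode>A&amp;B</UserCode>
    }
}
```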
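Besides the POST data, which you can obtain with the SoapUI tool, you also need the SOAP path, because that is the endpoint address on the web server that actually processes the request. For example: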
;senderService=BC_MOM&receiverParty=&receiverService=&interface=SI_MOMServiceRequestCount_Out&interfaceNamespace=http%3A%2F%2Fyutong.com%2Fesb%2Fmom%2Fcrm%2FMOMServiceRequestCount
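The interfaceNamespace value in this path is simply the URL-encoded namespace URI (':' becomes %3A and '/' becomes %2F). The sketch below rebuilds the query string from the parameters shown above with the JDK's URLEncoder; the host and servlet part of the path is cut off in the original and is left out here as well.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

class SoapPathBuilder {

    /** Joins key=value pairs with '&', URL-encoding keys and values as UTF-8. */
    static String buildQuery(Map<String, String> params) {
        StringJoiner joiner = new StringJoiner("&");
        for (Map.Entry<String, String> e : params.entrySet()) {
            joiner.add(URLEncoder.encode(e.getKey(), StandardCharsets.UTF_8) + "="
                    + URLEncoder.encode(e.getValue(), StandardCharsets.UTF_8));
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        Map<String, String> params = new LinkedHashMap<>(); // keeps insertion order
        params.put("senderService", "BC_MOM");
        params.put("receiverParty", "");
        params.put("receiverService", "");
        params.put("interface", "SI_MOMServiceRequestCount_Out");
        params.put("interfaceNamespace", "http://yutong.com/esb/mom/crm/MOMServiceRequestCount");
        System.out.println(buildQuery(params));
    }
}
```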
With these two pieces of information, you can basically call the web service.
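For comparison, on Java 11 or later the same POST can be sketched with the JDK's built-in java.net.http.HttpClient instead of Commons HttpClient 3.x, which the author's code uses. The content type and GBK charset mirror the original; the 30-second timeout and the class name are assumptions.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.Charset;
import java.time.Duration;

class SoapPostSketch {

    static final Charset GBK = Charset.forName("GBK");

    /** Builds a POST carrying the SOAP envelope, encoded as GBK like the original code. */
    static HttpRequest buildRequest(String soapPath, String soapRequestData) {
        return HttpRequest.newBuilder(URI.create(soapPath))
                .timeout(Duration.ofSeconds(30)) // assumed timeout
                .header("Content-Type", "application/soap+xml; charset=GBK; type=\"text/xml\"")
                .POST(HttpRequest.BodyPublishers.ofString(soapRequestData, GBK))
                .build();
    }

    /** Sends the request and returns the response body; non-200 statuses become exceptions. */
    static String call(HttpClient client, String soapPath, String soapRequestData) throws Exception {
        HttpResponse<String> response = client.send(
                buildRequest(soapPath, soapRequestData), HttpResponse.BodyHandlers.ofString(GBK));
        if (response.statusCode() != 200) {
            throw new IllegalStateException("SOAP call failed, HTTP " + response.statusCode());
        }
        return response.body();
    }
}
```

Usage would be along the lines of call(HttpClient.newHttpClient(), soapPath, soapRequestData), where soapPath is the full endpoint URL discussed above.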
Here is the concrete code:
package com.yutong.mom.util;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.Reader;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.methods.PostMethod;
import org.apache.commons.httpclient.methods.StringRequestEntity;
import org.dom4j.DocumentException;
import org.dom4j.io.SAXReader;
import org.jdom.Document;
import org.jdom.Element;
import org.jdom.JDOMException;
import org.jdom.Namespace;
import org.jdom.input.SAXBuilder;
import bsh.EvalError;
import bsh.Interpreter;
import com.yutong.mom.soapbean.model.SoapBeanScriptInfo;
import com.yutong.mom.vo.SoapBeanScriptMap;
import com.yutong.mom.vo.WebServiceConfigInfo;

// Calls a SOAP web service and converts the XML response into a JSON-like string.
// appendPIUserInfo, injectSoapData and the BeanShell-based parseResponse are not shown in the original post.
public class CallWebServiceUtil {
    private SoapBeanScriptMap soapBeanScriptMap;
    private WebServiceConfigInfo webServiceConfigInfo;
    private HttpClient httpClient = new HttpClient();
    private Interpreter interpreter = new Interpreter(); // BeanShell interpreter used by parseResponse
    private String faultError = "{operationFlag:'F',errorMsg:'";

    public String callWebService(String serviceId, String strJson, Map<String, String> soapParameter)
    {
        String rtVal = "";
        try
        {
            // Endpoint (SOAP path) and request template configured for this service
            SoapBeanScriptInfo scriptInfo = soapBeanScriptMap.getSoapBeanScript(serviceId);
            String soapPath = appendPIUserInfo(scriptInfo.getSoapPath());
            String soapRequestData = injectSoapData(scriptInfo.getSoapData(), soapParameter);
            if (false) // disabled alternative based on plain HttpURLConnection
            {
                byte[] body = soapRequestData.getBytes("UTF-8");
                HttpURLConnection conn = (HttpURLConnection) new URL(soapPath).openConnection();
                conn.setDoInput(true);  // we read a response
                conn.setDoOutput(true); // we send a request body
                conn.setRequestMethod("POST");
                conn.setRequestProperty("Content-Type", "text/xml;charset=utf-8");
                // Content-Length is the byte count, not the number of characters
                conn.setRequestProperty("Content-Length", Integer.toString(body.length));
                //conn.setRequestProperty("SOAPAction","");
                OutputStream os = conn.getOutputStream();
                os.write(body);
                os.close();
                int statusCode = conn.getResponseCode();
                if (statusCode == 200) {
                    String soapResponseData = readAll(conn.getInputStream());
                    soapResponseData = "<?xml version='1.0' encoding='GBK'?>" + soapResponseData;
                    rtVal = parseResponse(soapResponseData, scriptInfo);
                }
                else
                {
                    rtVal = faultError + "Call failed! Error code: " + statusCode + "'}";
                    // The original read postMethod here, which is never set on this path
                    InputStream es = conn.getErrorStream();
                    if (es != null) {
                        System.out.println(readAll(es));
                    }
                }
                conn.disconnect();
            }
            else
            {
                // Local variable, so concurrent calls do not share one PostMethod
                PostMethod postMethod = new PostMethod(soapPath);
                try
                {
                    postMethod.setRequestEntity(new StringRequestEntity(soapRequestData,
                            "application/soap+xml; charset=GBK; type=\"text/xml\"", "GBK"));
                    long startTime = System.currentTimeMillis();
                    int statusCode = httpClient.executeMethod(postMethod);
                    System.out.println("---------spend-----" + (System.currentTimeMillis() - startTime));
                    if (statusCode == 200) {
                        String soapResponseData = postMethod.getResponseBodyAsString();
                        // Prepend a declaration (assumes the response carries none of its own)
                        soapResponseData = "<?xml version='1.0' encoding='GBK'?>" + soapResponseData;
                        rtVal = parseResponseUserDom4j(soapResponseData, scriptInfo);
                        //rtVal = parseResponse(soapResponseData, scriptInfo);
                    } else {
                        rtVal = faultError + "Call failed! Error code: " + statusCode + "'}";
                    }
                }
                finally
                {
                    postMethod.releaseConnection(); // return the connection to HttpClient
                }
            }
        }
        catch (Exception e)
        {
            // Exception handling is not shown in the original; this keeps the same fault format
            rtVal = faultError + e.getMessage() + "'}";
        }
        return rtVal;
    }

    // Reads a stream fully and decodes it as UTF-8. Collecting all bytes first avoids
    // splitting multi-byte characters across 8 KB chunks, which the original loop could do.
    private static String readAll(InputStream in) throws IOException
    {
        try
        {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            byte[] b = new byte[8192];
            int len;
            while ((len = in.read(b)) != -1) {
                buffer.write(b, 0, len);
            }
            return buffer.toString("UTF-8");
        }
        finally
        {
            in.close();
        }
    }

    // Builds a JSON-like string from the SOAP response using dom4j.
    // The start of this method is missing from the original post: the parsing setup,
    // the hypothetical helper findResponseElement(...) and the "Return_Flag" element
    // name are placeholders, not the author's code.
    private String parseResponseUserDom4j(String soapResponseData, SoapBeanScriptInfo scriptInfo)
            throws DocumentException, UnsupportedEncodingException
    {
        org.dom4j.Document doc = new SAXReader().read(
                new ByteArrayInputStream(soapResponseData.getBytes("GBK")));
        org.dom4j.Element responseEle = findResponseElement(doc, scriptInfo); // hypothetical
        String strJson = "{";
        String datas = "";
        for (Iterator<?> it = responseEle.elementIterator(); it.hasNext();)
        {
            org.dom4j.Element ele = (org.dom4j.Element) it.next();
            String nodeName = ele.getName();
            String nodeValue = ele.getText();
            if (nodeName.equals("Return_Flag")) // hypothetical: the real element name was lost
            {
                strJson += "operationFlag:'" + nodeValue + "',";
            }
            else if (nodeName.equals("Return_Value") && !nodeValue.trim().equals(""))
            {
                strJson += "errorMsg:'" + nodeValue + "',"; // quoted, matching faultError
            }
            else if (nodeName.equals("PageTotal"))
            {
                strJson += "pageTotal:" + nodeValue + ",";
            }
            else if (nodeName.equals("PageNumber"))
            {
                strJson += "pageNumber:" + nodeValue + ",";
            }
            else if (nodeName.equals("PageItem"))
            {
                strJson += "pageItem:" + nodeValue + ",";
            }
            else if (nodeName.equals("BusDetails"))
            {
                // One JSON object per child element of BusDetails
                List<?> busDetails = ele.elements();
                datas = "datas:[";
                for (Iterator<?> itInner = busDetails.iterator(); itInner.hasNext();)
                {
                    org.dom4j.Element busDetailEle = (org.dom4j.Element) itInner.next();
                    List<?> detailNodes = busDetailEle.elements();
                    String innerObjJson = "{";
                    for (Iterator<?> itInner2 = detailNodes.iterator(); itInner2.hasNext();)
                    {
                        org.dom4j.Element busDetailNode = (org.dom4j.Element) itInner2.next();
                        String innerNodeName = busDetailNode.getName();
                        String innerNodeValue = busDetailNode.getText();
                        if (innerNodeName.equals("BusCode"))
                        {
                            innerObjJson += "busCode:'" + innerNodeValue + "',";
                        }
                        else if (innerNodeName.equals("BusPlate"))
                        {
                            innerObjJson += "busPlate:'" + innerNodeValue + "',";
                        }
                        else if (innerNodeName.equals("PurchaseDate"))
                        {
                            innerObjJson += "purchaseDate:'" + innerNodeValue + "',";
                        }
                        else if (innerNodeName.equals("BusType"))
                        {
                            innerObjJson += "busType:'" + innerNodeValue + "',";
                        }
                    }
                    datas += stripTrailingComma(innerObjJson) + "},";
                }
                datas = stripTrailingComma(datas) + "]";
            }
        }
        return stripTrailingComma(strJson + datas) + "}";
    }

    // Removes one trailing comma if present (the original always cut the last character,
    // which deleted the opening "{" or "[" when nothing had been appended).
    private static String stripTrailingComma(String s)
    {
        return s.endsWith(",") ? s.substring(0, s.length() - 1) : s;
    }

    public void setSoapBeanScriptMap(SoapBeanScriptMap soapBeanScriptMap) {
        this.soapBeanScriptMap = soapBeanScriptMap;
    }

    public WebServiceConfigInfo getWebServiceConfigInfo() {
        return webServiceConfigInfo;
    }

    public void setWebServiceConfigInfo(WebServiceConfigInfo webServiceConfigInfo) {
        this.webServiceConfigInfo = webServiceConfigInfo;
    }
}
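The dom4j loop above builds its JSON by hand. As an alternative sketch using only the JDK's DOM API, the BusDetails data can be collected into maps, matching elements by local name so the SOAP namespaces do not get in the way. Parsing from a StringReader means the decoded Java string is used as-is, so the prepended encoding declaration no longer matters. The BusDetail wrapper element in the test data is hypothetical, since the original does not show the response XML.

```java
import java.io.StringReader;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

class BusDetailsParser {

    /** Returns one map per child of every BusDetails element, keyed by field local name. */
    static List<Map<String, String>> parseBusDetails(String soapResponseXml) throws Exception {
        DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
        factory.setNamespaceAware(true); // SOAP envelopes are namespaced; we match local names
        // The response comes from a remote server, so refuse DOCTYPEs (blocks XXE)
        factory.setFeature("http://apache.org/xml/features/disallow-doctype-decl", true);
        Document doc = factory.newDocumentBuilder()
                .parse(new InputSource(new StringReader(soapResponseXml)));
        List<Map<String, String>> rows = new ArrayList<>();
        NodeList containers = doc.getElementsByTagNameNS("*", "BusDetails");
        for (int i = 0; i < containers.getLength(); i++) {
            for (Element detail : childElements(containers.item(i))) {
                Map<String, String> row = new LinkedHashMap<>();
                for (Element field : childElements(detail)) {
                    row.put(field.getLocalName(), field.getTextContent().trim());
                }
                rows.add(row);
            }
        }
        return rows;
    }

    /** Direct element children only; whitespace text nodes are skipped. */
    private static List<Element> childElements(Node parent) {
        List<Element> out = new ArrayList<>();
        for (Node n = parent.getFirstChild(); n != null; n = n.getNextSibling()) {
            if (n.getNodeType() == Node.ELEMENT_NODE) {
                out.add((Element) n);
            }
        }
        return out;
    }
}
```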
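Later, however, I found that parsing with BeanShell was not ideal either: I had stored the BeanShell code that processes the XML in the database, so whenever the business logic changed, that code had to be adjusted as well.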
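Then I came up with a better approach: abstract the XML-parsing code behind an interface, and have the program load the classes that parse the XML response dynamically through ClassLoader-related classes. The benefits are that it runs more efficiently than the BeanShell code, and because a ClassLoader can load .class files in many different ways, it gives more deployment flexibility.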
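A minimal sketch of that idea, assuming a hypothetical SoapResponseParser interface: an implementation is loaded by class name through a ClassLoader (optionally a URLClassLoader pointing at an external directory or jar), checked against the interface with asSubclass, and instantiated through its no-argument constructor. All names here are illustrative, not the author's code.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Path;

class ParserLoaderDemo {

    /** Hypothetical contract: turn a SOAP response into the JSON-like string the caller expects. */
    public interface SoapResponseParser {
        String parse(String soapResponseXml);
    }

    /** Example implementation; in a real deployment it would be compiled and shipped separately. */
    public static class LengthParser implements SoapResponseParser {
        @Override
        public String parse(String soapResponseXml) {
            return "{operationFlag:'S',length:" + soapResponseXml.length() + "}";
        }
    }

    /** Loads className via the given loader, rejects non-parsers (ClassCastException), and instantiates it. */
    static SoapResponseParser load(String className, ClassLoader loader) throws ReflectiveOperationException {
        Class<? extends SoapResponseParser> type =
                Class.forName(className, true, loader).asSubclass(SoapResponseParser.class);
        return type.getDeclaredConstructor().newInstance();
    }

    /**
     * Loads a parser from an external directory or jar. The loader is deliberately left open,
     * because the parser may still need it to load further classes while in use.
     */
    static SoapResponseParser loadFrom(Path classpathEntry, String className) throws Exception {
        URLClassLoader loader = new URLClassLoader(
                new URL[] {classpathEntry.toUri().toURL()}, ParserLoaderDemo.class.getClassLoader());
        return load(className, loader);
    }

    public static void main(String[] args) throws Exception {
        SoapResponseParser parser =
                load(LengthParser.class.getName(), ParserLoaderDemo.class.getClassLoader());
        System.out.println(parser.parse("<a/>")); // {operationFlag:'S',length:4}
    }
}
```

The class name to load would typically come from per-service configuration, playing the role the database-stored BeanShell script plays in the original design.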
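That concludes this introduction to reading Avro and reading XML file contents in Java. Did you find the information you needed? If you want to learn more about these topics, remember to bookmark and follow this site.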
Published: 2022-12-12. Unless otherwise noted, all articles are original; please credit the source when reposting.