关于java通信技术方式的信息
本篇文章给大家谈谈java通信技术方式,以及对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。
本文目录一览:
Java Socket通信技术收发线程互斥的解决方法
Java Socket通信技术在很长的时间里都在使用 在不少的程序员眼中都有很多高的评价 那么下面我们就看看如何才能掌握这门复杂的编程语言 希望大家在今后的Java Socket通信技术使用中有所收获
下面就是Java Socket通信技术在解决收发线程互斥的代码介绍
package bill svr;
import java io IOException;
import java io InputStream;
import java io OutputStream;
import InetSocketAddress;
import Socket;
import SocketException;
import SocketTimeoutException;
import java text SimpleDateFormat;
import java util Date;
import java util Properties;
import java util Timer;
import java util TimerTask;
import ncurrent ConcurrentHashMap;
import ncurrent TimeUnit;
import ncurrent locks Condition;
import ncurrent locks ReentrantLock;
import apache log j Logger;
/**
*ptitle: socket通信包装类/p
*pDescription: /p
*pCopyRight: CopyRight (c) /p
*pCompany: /p
*pCreate date: /P
*author sunnylocusA mailto:
/A * v 初类
* v 对命令收发逻辑及收发线程互斥机制进行了优化
处理命令速度由原来 ~ 个/秒提高到 ~ 个/秒
*/ public class SocketConnection {
private volatile Socket socket;
private int timeout = * ; //超时时间 初始值 秒
private boolean isLaunchHeartcheck = false;//是否已启动心跳检测
private boolean isNeorkConnect = false; //网络是否已连接
private static String host = ;
private static int port;
static InputStream inStream = null;
static OutputStream outStream = null;
private static Logger log =Logger getLogger
(SocketConnection class);
private static SocketConnection socketConnection = null;
private static java util Timer heartTimer=null;
//private final MapString Object recMsgMap= Collections
synchronizedMap(new HashMapString Object());
private final ConcurrentHashMapString Object recMsgMap
= new ConcurrentHashMapString Object();
private static Thread receiveThread = null;
private final ReentrantLock lock = new ReentrantLock();
private SocketConnection(){
Properties conf = new Properties();
try {
nf load(SocketConnection class getResourceAsStream
( nf ));
this timeout = Integer valueOf(conf getProperty( timeout ));
init(conf getProperty( ip ) Integer valueOf
(conf getProperty( port )));
} catch(IOException e) {
log fatal( socket初始化异常! e);
throw new RuntimeException( socket初始化异常 请检查配置参数 );
}
}
/**
* 单态模式
*/
public static SocketConnection getInstance() {
if(socketConnection==null) {
synchronized(SocketConnection class) {
if(socketConnection==null) {
socketConnection = new SocketConnection();
return socketConnection;
}
}
}
return socketConnection;
}
private void init(String host int port) throws IOException {
InetSocketAddress addr = new InetSocketAddress(host port);
socket = new Socket();
synchronized (this) {
( 【准备与 +addr+ 建立连接】 );
nnect(addr timeout);
( 【与 +addr+ 连接已建立】 );
inStream = socket getInputStream();
outStream = socket getOutputStream();
socket setTcpNoDelay(true);//数据不作缓冲 立即发送
socket setSoLinger(true );//socket关闭时 立即释放资源
socket setKeepAlive(true);
socket setTrafficClass( x | x );//高可靠性和最小延迟传输
isNeorkConnect=true;
receiveThread = new Thread(new ReceiveWorker());
receiveThread start();
SocketConnection host=host;
SocketConnection port=port;
if(!isLaunchHeartcheck)
launchHeartcheck();
}
}
/**
* 心跳包检测
*/
private void launchHeartcheck() {
if(socket == null)
throw new IllegalStateException( socket is not
established! );
heartTimer = new Timer();
isLaunchHeartcheck = true;
heartTimer schedule(new TimerTask() {
public void run() {
String msgStreamNo = StreamNoGenerator getStreamNo( kq );
int mstType = ;// 心跳包请求
SimpleDateFormat dateformate = new SimpleDateFormat
( yyyyMMddHHmmss );
String msgDateTime = dateformate format(new Date());
int msgLength = ;//消息头长度
String mandstr = +msgLength + mstType + msgStreamNo;
( 心跳检测包 IVR +mandstr);
int reconnCounter = ;
while(true) {
String responseMsg =null;
try {
responseMsg = readReqMsg(mandstr);
} catch (IOException e) {
log error( IO流异常 e);
reconnCounter ++;
}
if(responseMsg!=null) {
( 心跳响应包 IVR +responseMsg);
reconnCounter = ;
break;
} else {
reconnCounter ++;
}
if(reconnCounter ) {//重连次数已达三次 判定网络连接中断
重新建立连接 连接未被建立时不释放锁
reConnectToCTCC(); break;
}
}
}
} * * * * );
}
/**
* 重连与目标IP建立重连
*/
private void reConnectToCTCC() {
new Thread(new Runnable(){
public void run(){
( 重新建立与 +host+ : +port+ 的连接 );
//清理工作 中断计时器 中断接收线程 恢复初始变量
heartTimer cancel();
isLaunchHeartcheck=false;
isNeorkConnect = false;
receiveThread interrupt();
try {
socket close();
} catch (IOException e ) {log error( 重连时 关闭socket连
接发生IO流异常 e );}
//
synchronized(this){
for(; ;){
try {
Thread currentThread();
Thread sleep( * );
init(host port);
this notifyAll();
break ;
} catch (IOException e) {
log error( 重新建立连接未成功 e);
} catch (InterruptedException e){
log error( 重连线程中断 e);
}
}
}
}
}) start();
}
/**
* 发送命令并接受响应
* @param requestMsg
* @return
* @throws SocketTimeoutException
* @throws IOException
*/
public String readReqMsg(String requestMsg) throws IOException {
if(requestMsg ==null) {
return null;
}
if(!isNeorkConnect) {
synchronized(this){
try {
this wait( * ); //等待 秒 如果网络还没有恢复 抛出IO流异常
if(!isNeorkConnect) {
throw new IOException( 网络连接中断! );
}
} catch (InterruptedException e) {
log error( 发送线程中断 e);
}
}
}
String msgNo = requestMsg substring( + );//读取流水号
outStream = socket getOutputStream();
outStream write(requestMsg getBytes());
outStream flush();
Condition msglock = lock newCondition(); //消息锁
//注册等待接收消息
recMsgMap put(msgNo msglock);
try {
lock lock();
msglock await(timeout TimeUnit MILLISECONDS);
} catch (InterruptedException e) {
log error( 发送线程中断 e);
} finally {
lock unlock();
}
Object respMsg = recMsgMap remove(msgNo); //响应信息
if(respMsg!=null (respMsg != msglock)) {
//已经接收到消息 注销等待 成功返回消息
return (String) respMsg;
} else {
log error(msgNo+ 超时 未收到响应消息 );
throw new SocketTimeoutException(msgNo+ 超时 未收到响应消息 );
}
}
public void finalize() {
if (socket != null) {
try {
socket close();
} catch (IOException e) {
e printStackTrace();
}
}
}
//消息接收线程
private class ReceiveWorker implements Runnable {
String intStr= null;
public void run() {
while(!Thread interrupted()){
try {
byte[] headBytes = new byte[ ];
if(inStream read(headBytes)== ){
log warn( 读到流未尾 对方已关闭流! );
reConnectToCTCC();//读到流未尾 对方已关闭流
return;
}
byte[] tmp =new byte[ ];
tmp = headBytes;
String tempStr = new String(tmp) trim();
if(tempStr==null || tempStr equals( )) {
log error( received message is null );
ntinue;
}
intStr = new String(tmp);
int totalLength =Integer parseInt(intStr);
//
byte[] msgBytes = new byte[totalLength ];
inStream read(msgBytes);
String resultMsg = new String(headBytes)+ new
String(msgBytes);
//抽出消息ID
String msgNo = resultMsg substring( + );
Condition msglock =(Condition) recMsgMap get(msgNo);
if(msglock ==null) {
log warn(msgNo+ 序号可能已被注销!响应消息丢弃 );
recMsgMap remove(msgNo);
ntinue;
}
recMsgMap put(msgNo resultMsg);
try{
lock lock();
msglock signalAll();
}finally {
lock unlock();
}
}catch(SocketException e){
log error( 服务端关闭socket e);
reConnectToCTCC();
} catch(IOException e) {
log error( 接收线程读取响应数据时发生IO流异常 e);
} catch(NumberFormatException e){
log error( 收到没良心包 String转int异常 异常字符: +intStr);
}
}
}
}
}
lishixinzhi/Article/program/Java/hx/201311/25550
JAVA线程间通信的几种方式
Java多线程间的通信
Java还提供了一种线程间通信的机制,这种通信通什么实现?
wait,notify等机制
或使用pipeInputStream和pipeOutputStream
1. 线程的几种状态
线程有四种状态,任何一个线程肯定处于这四种状态中的一种:
1) 产生(New):线程对象已经产生,但尚未被启动,所以无法执行。如通过new产生了一个线程对象后没对它调用start()函数之前。
2) 可执行(Runnable):每个支持多线程的系统都有一个排程器,排程器会从线程池中选择一个线程并启动它。当一个线程处于可执行状态时,表示它可能正处于线程池中等待排排程器启动它;也可能它已正在执行。如执行了一个线程对象的start()方法后,线程就处于可执行状态,但显而易见的是此时线程不一定正在执行中。
3) 死亡(Dead):当一个线程正常结束,它便处于死亡状态。如一个线程的run()函数执行完毕后线程就进入死亡状态。
4) 停滞(Blocked):当一个线程处于停滞状态时,系统排程器就会忽略它,不对它进行排程。
如何在学习Java过程中实现线程之间的通信
在java中,每个对象都有两个池,锁池(monitor)和等待池(waitset),每个对象又都有wait、notify、notifyAll方法,使用它们可以实现线程之间的通信,只是平时用的较少.
wait(): 使当前线程处于等待状态,直到另外的线程调用notify或notifyAll将它唤醒
notify(): 唤醒该对象监听的其中一个线程(规则取决于JVM厂商,FILO,FIFO,随机…)
notifyAll(): 唤醒该对象监听的所有线程
锁池: 假设T1线程已经拥有了某个对象(注意:不是类)的锁,而其它的线程想要调用该对象的synchronized方法(或者synchronized块),由于这些线程在进入对象的synchronized方法之前都需要先获得该对象的锁的拥有权,但是该对象的锁目前正被T1线程拥有,所以这些线程就进入了该对象的锁池中.
等待池: 假设T1线程调用了某个对象的wait()方法,T1线程就会释放该对象的锁(因为wait()方法必须出现在synchronized中,这样自然在执行wait()方法之前T1线程就已经拥有了该对象的锁),同时T1线程进入到了该对象的等待池中.如果有其它线程调用了相同对象的notifyAll()方法,那么处于该对象的等待池中的线程就会全部进入该对象的锁池中,从新争夺锁的拥有权.如果另外的一个线程调用了相同对象的notify()方法,那么仅仅有一个处于该对象的等待池中的线程(随机)会进入该对象的锁池.
java实现线程间通信的四种方式
1、synchronized同步:这种方式,本质上就是“共享内存”式的通信。多个线程需要访问同一个共享变量,谁拿到了锁(获得了访问权限),谁就可以执行。
2、while轮询:其实就是多线程同时执行,会牺牲部分CPU性能。
3、wait/notify机制
4、管道通信:管道流主要用来实现两个线程之间的二进制数据的传播
java通信技术方式的介绍就聊到这里吧,感谢你花时间阅读本站内容,更多关于、java通信技术方式的信息别忘了在本站进行查找喔。