关于java通信技术方式的信息

博主:adminadmin 2023-01-04 03:09:06 693

本篇文章给大家谈谈java通信技术方式,以及对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。

本文目录一览:

Java Socket通信技术收发线程互斥的解决方法

Java Socket通信技术在很长的时间里都在使用 在不少的程序员眼中都有很多高的评价 那么下面我们就看看如何才能掌握这门复杂的编程语言 希望大家在今后的Java Socket通信技术使用中有所收获

下面就是Java Socket通信技术在解决收发线程互斥的代码介绍

package bill svr;

import java io IOException;

import java io InputStream;

import java io OutputStream;

import InetSocketAddress;

import Socket;

import SocketException;

import SocketTimeoutException;

import java text SimpleDateFormat;

import java util Date;

import java util Properties;

import java util Timer;

import java util TimerTask;

import ncurrent ConcurrentHashMap;

import ncurrent TimeUnit;

import ncurrent locks Condition;

import ncurrent locks ReentrantLock;

import apache log j Logger;

/**

*ptitle: socket通信包装类/p

*pDescription: /p

*pCopyRight: CopyRight (c) /p

*pCompany: /p

*pCreate date: /P

*author sunnylocusA mailto:

/A * v 初类

* v 对命令收发逻辑及收发线程互斥机制进行了优化

处理命令速度由原来 ~ 个/秒提高到 ~ 个/秒

*/ public class SocketConnection {

private volatile Socket socket;

private int timeout = * ; //超时时间 初始值 秒

private boolean isLaunchHeartcheck = false;//是否已启动心跳检测

private boolean isNeorkConnect = false; //网络是否已连接

private static String host = ;

private static int port;

static InputStream inStream = null;

static OutputStream outStream = null;

private static Logger log =Logger getLogger

(SocketConnection class);

private static SocketConnection socketConnection = null;

private static java util Timer heartTimer=null;

//private final MapString Object recMsgMap= Collections

synchronizedMap(new HashMapString Object());

private final ConcurrentHashMapString Object recMsgMap

= new ConcurrentHashMapString Object();

private static Thread receiveThread = null;

private final ReentrantLock lock = new ReentrantLock();

private SocketConnection(){

Properties conf = new Properties();

try {

nf load(SocketConnection class getResourceAsStream

( nf ));

this timeout = Integer valueOf(conf getProperty( timeout ));

init(conf getProperty( ip ) Integer valueOf

(conf getProperty( port )));

} catch(IOException e) {

log fatal( socket初始化异常! e);

throw new RuntimeException( socket初始化异常 请检查配置参数 );

}

}

/**

* 单态模式

*/

public static SocketConnection getInstance() {

if(socketConnection==null) {

synchronized(SocketConnection class) {

if(socketConnection==null) {

socketConnection = new SocketConnection();

return socketConnection;

}

}

}

return socketConnection;

}

private void init(String host int port) throws IOException {

InetSocketAddress addr = new InetSocketAddress(host port);

socket = new Socket();

synchronized (this) {

( 【准备与 +addr+ 建立连接】 );

nnect(addr timeout);

( 【与 +addr+ 连接已建立】 );

inStream = socket getInputStream();

outStream = socket getOutputStream();

socket setTcpNoDelay(true);//数据不作缓冲 立即发送

socket setSoLinger(true );//socket关闭时 立即释放资源

socket setKeepAlive(true);

socket setTrafficClass( x | x );//高可靠性和最小延迟传输

isNeorkConnect=true;

receiveThread = new Thread(new ReceiveWorker());

receiveThread start();

SocketConnection host=host;

SocketConnection port=port;

if(!isLaunchHeartcheck)

launchHeartcheck();

}

}

/**

* 心跳包检测

*/

private void launchHeartcheck() {

if(socket == null)

throw new IllegalStateException( socket is not

established! );

heartTimer = new Timer();

isLaunchHeartcheck = true;

heartTimer schedule(new TimerTask() {

public void run() {

String msgStreamNo = StreamNoGenerator getStreamNo( kq );

int mstType = ;// 心跳包请求

SimpleDateFormat dateformate = new SimpleDateFormat

( yyyyMMddHHmmss );

String msgDateTime = dateformate format(new Date());

int msgLength = ;//消息头长度

String mandstr = +msgLength + mstType + msgStreamNo;

( 心跳检测包 IVR +mandstr);

int reconnCounter = ;

while(true) {

String responseMsg =null;

try {

responseMsg = readReqMsg(mandstr);

} catch (IOException e) {

log error( IO流异常 e);

reconnCounter ++;

}

if(responseMsg!=null) {

( 心跳响应包 IVR +responseMsg);

reconnCounter = ;

break;

} else {

reconnCounter ++;

}

if(reconnCounter ) {//重连次数已达三次 判定网络连接中断

重新建立连接 连接未被建立时不释放锁

reConnectToCTCC(); break;

}

}

}

} * * * * );

}

/**

* 重连与目标IP建立重连

*/

private void reConnectToCTCC() {

new Thread(new Runnable(){

public void run(){

( 重新建立与 +host+ : +port+ 的连接 );

//清理工作 中断计时器 中断接收线程 恢复初始变量

heartTimer cancel();

isLaunchHeartcheck=false;

isNeorkConnect = false;

receiveThread interrupt();

try {

socket close();

} catch (IOException e ) {log error( 重连时 关闭socket连

接发生IO流异常 e );}

//

synchronized(this){

for(; ;){

try {

Thread currentThread();

Thread sleep( * );

init(host port);

this notifyAll();

break ;

} catch (IOException e) {

log error( 重新建立连接未成功 e);

} catch (InterruptedException e){

log error( 重连线程中断 e);

}

}

}

}

}) start();

}

/**

* 发送命令并接受响应

* @param requestMsg

* @return

* @throws SocketTimeoutException

* @throws IOException

*/

public String readReqMsg(String requestMsg) throws IOException {

if(requestMsg ==null) {

return null;

}

if(!isNeorkConnect) {

synchronized(this){

try {

this wait( * ); //等待 秒 如果网络还没有恢复 抛出IO流异常

if(!isNeorkConnect) {

throw new IOException( 网络连接中断! );

}

} catch (InterruptedException e) {

log error( 发送线程中断 e);

}

}

}

String msgNo = requestMsg substring( + );//读取流水号

outStream = socket getOutputStream();

outStream write(requestMsg getBytes());

outStream flush();

Condition msglock = lock newCondition(); //消息锁

//注册等待接收消息

recMsgMap put(msgNo msglock);

try {

lock lock();

msglock await(timeout TimeUnit MILLISECONDS);

} catch (InterruptedException e) {

log error( 发送线程中断 e);

} finally {

lock unlock();

}

Object respMsg = recMsgMap remove(msgNo); //响应信息

if(respMsg!=null (respMsg != msglock)) {

//已经接收到消息 注销等待 成功返回消息

return (String) respMsg;

} else {

log error(msgNo+ 超时 未收到响应消息 );

throw new SocketTimeoutException(msgNo+ 超时 未收到响应消息 );

}

}

public void finalize() {

if (socket != null) {

try {

socket close();

} catch (IOException e) {

e printStackTrace();

}

}

}

//消息接收线程

private class ReceiveWorker implements Runnable {

String intStr= null;

public void run() {

while(!Thread interrupted()){

try {

byte[] headBytes = new byte[ ];

if(inStream read(headBytes)== ){

log warn( 读到流未尾 对方已关闭流! );

reConnectToCTCC();//读到流未尾 对方已关闭流

return;

}

byte[] tmp =new byte[ ];

tmp = headBytes;

String tempStr = new String(tmp) trim();

if(tempStr==null || tempStr equals( )) {

log error( received message is null );

ntinue;

}

intStr = new String(tmp);

int totalLength =Integer parseInt(intStr);

//

byte[] msgBytes = new byte[totalLength ];

inStream read(msgBytes);

String resultMsg = new String(headBytes)+ new

String(msgBytes);

//抽出消息ID

String msgNo = resultMsg substring( + );

Condition msglock =(Condition) recMsgMap get(msgNo);

if(msglock ==null) {

log warn(msgNo+ 序号可能已被注销!响应消息丢弃 );

recMsgMap remove(msgNo);

ntinue;

}

recMsgMap put(msgNo resultMsg);

try{

lock lock();

msglock signalAll();

}finally {

lock unlock();

}

}catch(SocketException e){

log error( 服务端关闭socket e);

reConnectToCTCC();

} catch(IOException e) {

log error( 接收线程读取响应数据时发生IO流异常 e);

} catch(NumberFormatException e){

log error( 收到没良心包 String转int异常 异常字符: +intStr);

}

}

}

}

}

lishixinzhi/Article/program/Java/hx/201311/25550

JAVA线程间通信的几种方式

Java多线程间的通信

Java还提供了一种线程间通信的机制,这种通信通什么实现?

wait,notify等机制

或使用pipeInputStream和pipeOutputStream

1. 线程的几种状态

线程有四种状态,任何一个线程肯定处于这四种状态中的一种:

1) 产生(New):线程对象已经产生,但尚未被启动,所以无法执行。如通过new产生了一个线程对象后没对它调用start()函数之前。

2) 可执行(Runnable):每个支持多线程的系统都有一个排程器,排程器会从线程池中选择一个线程并启动它。当一个线程处于可执行状态时,表示它可能正处于线程池中等待排排程器启动它;也可能它已正在执行。如执行了一个线程对象的start()方法后,线程就处于可执行状态,但显而易见的是此时线程不一定正在执行中。

3) 死亡(Dead):当一个线程正常结束,它便处于死亡状态。如一个线程的run()函数执行完毕后线程就进入死亡状态。

4) 停滞(Blocked):当一个线程处于停滞状态时,系统排程器就会忽略它,不对它进行排程。

如何在学习Java过程中实现线程之间的通信

在java中,每个对象都有两个池,锁池(monitor)和等待池(waitset),每个对象又都有wait、notify、notifyAll方法,使用它们可以实现线程之间的通信,只是平时用的较少.

wait(): 使当前线程处于等待状态,直到另外的线程调用notify或notifyAll将它唤醒

notify(): 唤醒该对象监听的其中一个线程(规则取决于JVM厂商,FILO,FIFO,随机…)

notifyAll(): 唤醒该对象监听的所有线程

锁池: 假设T1线程已经拥有了某个对象(注意:不是类)的锁,而其它的线程想要调用该对象的synchronized方法(或者synchronized块),由于这些线程在进入对象的synchronized方法之前都需要先获得该对象的锁的拥有权,但是该对象的锁目前正被T1线程拥有,所以这些线程就进入了该对象的锁池中.

等待池: 假设T1线程调用了某个对象的wait()方法,T1线程就会释放该对象的锁(因为wait()方法必须出现在synchronized中,这样自然在执行wait()方法之前T1线程就已经拥有了该对象的锁),同时T1线程进入到了该对象的等待池中.如果有其它线程调用了相同对象的notifyAll()方法,那么处于该对象的等待池中的线程就会全部进入该对象的锁池中,从新争夺锁的拥有权.如果另外的一个线程调用了相同对象的notify()方法,那么仅仅有一个处于该对象的等待池中的线程(随机)会进入该对象的锁池.

java实现线程间通信的四种方式

1、synchronized同步:这种方式,本质上就是“共享内存”式的通信。多个线程需要访问同一个共享变量,谁拿到了锁(获得了访问权限),谁就可以执行。

2、while轮询:其实就是多线程同时执行,会牺牲部分CPU性能。

3、wait/notify机制

4、管道通信:管道流主要用来实现两个线程之间的二进制数据的传播

java通信技术方式的介绍就聊到这里吧,感谢你花时间阅读本站内容,更多关于、java通信技术方式的信息别忘了在本站进行查找喔。