「互斥JAVA」互斥事件的概率计算
本篇文章给大家谈谈互斥JAVA,以及互斥事件的概率计算对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。
本文目录一览:
如何在Java多线程编程中实现程序同与互斥
作为一个完全面向对象的语言,Java提供了类 Java.lang.Thread 来方便多线程编程,这个类提供了大量的方法来方便我们控制自己的各个线程,我们以后的讨论都将围绕这个类进行。
Thread 类最重要的方法是 run() ,它为Thread 类的方法 start() 所调用,提供我们的线程所要执行的代码。为了指定我们自己的代码,只需要覆盖它!
方法一:继承 Thread 类,覆盖方法 run()
我们在创建的 Thread 类的子类中重写 run() ,加入线程所要执行的代码即可。
下面是一个例子:
public class MyThread extends Thread {
int count= 1, number;
public MyThread(int num) {
number = num;
System.out.println("创建线程 " + number);
}
public void run() {
while(true) {
System.out.println("线程 " + number + ":计数 " + count);
if(++count== 6) return;
}
}
public static void main(String args[]) {
for(int i = 0; i 5; i++) new MyThread(i+1).start();
}
}
这种方法简单明了,符合大家的习惯,但是,它也有一个很大的缺点,那就是如果我们的类已经从一个类继承(如小程序必须继承自 Applet 类),则无法再继承 Thread 类,这时如果我们又不想建立一个新的类.
一种新的方法:不创建 Thread 类的子类,而是直接使用它,那么我们只能将我们的方法作为参数传递给 Thread 类的实例,有点类似回调函数。但是 Java 没有指针,我们只能传递一个包含这个方法的类的实例。那么如何限制这个类必须包含这一方法呢?当然是使用接口!(虽然抽象类也可满足,但是需要继承,而我们之所以要采用这种新方法,不就是为了避免继承带来的限制吗?)
Java 提供了接口 Java.lang.Runnable 来支持这种方法。
方法二:实现 Runnable 接口
Runnable 接口只有一个方法 run(),我们声明自己的类实现 Runnable 接口并提供这一方法,将我们的线程代码写入其中,就完成了这一部分的任务。
但是 Runnable 接口并没有任何对线程的支持,我们还必须创建 Thread 类的实例,这一点通过 Thread 类的构造函数
public Thread(Runnable target);
来实现。
下面是一个例子:
public class MyThread implements Runnable {
int count= 1, number;
public MyThread(int num) {
number = num;
System.out.println("创建线程 " + number);
}
public void run() {
while(true) {
System.out.println("线程 " + number + ":计数 " + count);
if(++count== 6) return;
}
}
public static void main(String args[])
{
for(int i = 0; i 5; i++) new Thread(new MyThread(i+1)).start();
}
}
Java如何实现线程之间的互斥
临界区(Critical Section):适合一个进程内的多线程访问公共区域或代码段时使用
Java如何实现线程之间的互斥
互斥量 (Mutex):适合不同进程内多线程访问公共区域或代码段时使用,与临界区相似。
事件(Event):通过线程间触发事件实现同步互斥
信号量(Semaphore):与临界区和互斥量不同,可以实现多个线程同时访问公共区域数据,原理与操作系统中PV操作类似,先设置一个访问公共区域的线程最大连接数,每有一个线程访问共享区资源数就减一,直到资源数小于等于零。
Java Socket通信技术收发线程互斥的解决方法
Java Socket通信技术在很长的时间里都在使用 在不少的程序员眼中都有很多高的评价 那么下面我们就看看如何才能掌握这门复杂的编程语言 希望大家在今后的Java Socket通信技术使用中有所收获
下面就是Java Socket通信技术在解决收发线程互斥的代码介绍
package bill svr;
import java io IOException;
import java io InputStream;
import java io OutputStream;
import InetSocketAddress;
import Socket;
import SocketException;
import SocketTimeoutException;
import java text SimpleDateFormat;
import java util Date;
import java util Properties;
import java util Timer;
import java util TimerTask;
import ncurrent ConcurrentHashMap;
import ncurrent TimeUnit;
import ncurrent locks Condition;
import ncurrent locks ReentrantLock;
import apache log j Logger;
/**
*ptitle: socket通信包装类/p
*pDescription: /p
*pCopyRight: CopyRight (c) /p
*pCompany: /p
*pCreate date: /P
*author sunnylocusA mailto:
/A * v 初类
* v 对命令收发逻辑及收发线程互斥机制进行了优化
处理命令速度由原来 ~ 个/秒提高到 ~ 个/秒
*/ public class SocketConnection {
private volatile Socket socket;
private int timeout = * ; //超时时间 初始值 秒
private boolean isLaunchHeartcheck = false;//是否已启动心跳检测
private boolean isNeorkConnect = false; //网络是否已连接
private static String host = ;
private static int port;
static InputStream inStream = null;
static OutputStream outStream = null;
private static Logger log =Logger getLogger
(SocketConnection class);
private static SocketConnection socketConnection = null;
private static java util Timer heartTimer=null;
//private final MapString Object recMsgMap= Collections
synchronizedMap(new HashMapString Object());
private final ConcurrentHashMapString Object recMsgMap
= new ConcurrentHashMapString Object();
private static Thread receiveThread = null;
private final ReentrantLock lock = new ReentrantLock();
private SocketConnection(){
Properties conf = new Properties();
try {
nf load(SocketConnection class getResourceAsStream
( nf ));
this timeout = Integer valueOf(conf getProperty( timeout ));
init(conf getProperty( ip ) Integer valueOf
(conf getProperty( port )));
} catch(IOException e) {
log fatal( socket初始化异常! e);
throw new RuntimeException( socket初始化异常 请检查配置参数 );
}
}
/**
* 单态模式
*/
public static SocketConnection getInstance() {
if(socketConnection==null) {
synchronized(SocketConnection class) {
if(socketConnection==null) {
socketConnection = new SocketConnection();
return socketConnection;
}
}
}
return socketConnection;
}
private void init(String host int port) throws IOException {
InetSocketAddress addr = new InetSocketAddress(host port);
socket = new Socket();
synchronized (this) {
( 【准备与 +addr+ 建立连接】 );
nnect(addr timeout);
( 【与 +addr+ 连接已建立】 );
inStream = socket getInputStream();
outStream = socket getOutputStream();
socket setTcpNoDelay(true);//数据不作缓冲 立即发送
socket setSoLinger(true );//socket关闭时 立即释放资源
socket setKeepAlive(true);
socket setTrafficClass( x | x );//高可靠性和最小延迟传输
isNeorkConnect=true;
receiveThread = new Thread(new ReceiveWorker());
receiveThread start();
SocketConnection host=host;
SocketConnection port=port;
if(!isLaunchHeartcheck)
launchHeartcheck();
}
}
/**
* 心跳包检测
*/
private void launchHeartcheck() {
if(socket == null)
throw new IllegalStateException( socket is not
established! );
heartTimer = new Timer();
isLaunchHeartcheck = true;
heartTimer schedule(new TimerTask() {
public void run() {
String msgStreamNo = StreamNoGenerator getStreamNo( kq );
int mstType = ;// 心跳包请求
SimpleDateFormat dateformate = new SimpleDateFormat
( yyyyMMddHHmmss );
String msgDateTime = dateformate format(new Date());
int msgLength = ;//消息头长度
String mandstr = +msgLength + mstType + msgStreamNo;
( 心跳检测包 IVR +mandstr);
int reconnCounter = ;
while(true) {
String responseMsg =null;
try {
responseMsg = readReqMsg(mandstr);
} catch (IOException e) {
log error( IO流异常 e);
reconnCounter ++;
}
if(responseMsg!=null) {
( 心跳响应包 IVR +responseMsg);
reconnCounter = ;
break;
} else {
reconnCounter ++;
}
if(reconnCounter ) {//重连次数已达三次 判定网络连接中断
重新建立连接 连接未被建立时不释放锁
reConnectToCTCC(); break;
}
}
}
} * * * * );
}
/**
* 重连与目标IP建立重连
*/
private void reConnectToCTCC() {
new Thread(new Runnable(){
public void run(){
( 重新建立与 +host+ : +port+ 的连接 );
//清理工作 中断计时器 中断接收线程 恢复初始变量
heartTimer cancel();
isLaunchHeartcheck=false;
isNeorkConnect = false;
receiveThread interrupt();
try {
socket close();
} catch (IOException e ) {log error( 重连时 关闭socket连
接发生IO流异常 e );}
//
synchronized(this){
for(; ;){
try {
Thread currentThread();
Thread sleep( * );
init(host port);
this notifyAll();
break ;
} catch (IOException e) {
log error( 重新建立连接未成功 e);
} catch (InterruptedException e){
log error( 重连线程中断 e);
}
}
}
}
}) start();
}
/**
* 发送命令并接受响应
* @param requestMsg
* @return
* @throws SocketTimeoutException
* @throws IOException
*/
public String readReqMsg(String requestMsg) throws IOException {
if(requestMsg ==null) {
return null;
}
if(!isNeorkConnect) {
synchronized(this){
try {
this wait( * ); //等待 秒 如果网络还没有恢复 抛出IO流异常
if(!isNeorkConnect) {
throw new IOException( 网络连接中断! );
}
} catch (InterruptedException e) {
log error( 发送线程中断 e);
}
}
}
String msgNo = requestMsg substring( + );//读取流水号
outStream = socket getOutputStream();
outStream write(requestMsg getBytes());
outStream flush();
Condition msglock = lock newCondition(); //消息锁
//注册等待接收消息
recMsgMap put(msgNo msglock);
try {
lock lock();
msglock await(timeout TimeUnit MILLISECONDS);
} catch (InterruptedException e) {
log error( 发送线程中断 e);
} finally {
lock unlock();
}
Object respMsg = recMsgMap remove(msgNo); //响应信息
if(respMsg!=null (respMsg != msglock)) {
//已经接收到消息 注销等待 成功返回消息
return (String) respMsg;
} else {
log error(msgNo+ 超时 未收到响应消息 );
throw new SocketTimeoutException(msgNo+ 超时 未收到响应消息 );
}
}
public void finalize() {
if (socket != null) {
try {
socket close();
} catch (IOException e) {
e printStackTrace();
}
}
}
//消息接收线程
private class ReceiveWorker implements Runnable {
String intStr= null;
public void run() {
while(!Thread interrupted()){
try {
byte[] headBytes = new byte[ ];
if(inStream read(headBytes)== ){
log warn( 读到流未尾 对方已关闭流! );
reConnectToCTCC();//读到流未尾 对方已关闭流
return;
}
byte[] tmp =new byte[ ];
tmp = headBytes;
String tempStr = new String(tmp) trim();
if(tempStr==null || tempStr equals( )) {
log error( received message is null );
ntinue;
}
intStr = new String(tmp);
int totalLength =Integer parseInt(intStr);
//
byte[] msgBytes = new byte[totalLength ];
inStream read(msgBytes);
String resultMsg = new String(headBytes)+ new
String(msgBytes);
//抽出消息ID
String msgNo = resultMsg substring( + );
Condition msglock =(Condition) recMsgMap get(msgNo);
if(msglock ==null) {
log warn(msgNo+ 序号可能已被注销!响应消息丢弃 );
recMsgMap remove(msgNo);
ntinue;
}
recMsgMap put(msgNo resultMsg);
try{
lock lock();
msglock signalAll();
}finally {
lock unlock();
}
}catch(SocketException e){
log error( 服务端关闭socket e);
reConnectToCTCC();
} catch(IOException e) {
log error( 接收线程读取响应数据时发生IO流异常 e);
} catch(NumberFormatException e){
log error( 收到没良心包 String转int异常 异常字符: +intStr);
}
}
}
}
}
lishixinzhi/Article/program/Java/hx/201311/25550
互斥JAVA的介绍就聊到这里吧,感谢你花时间阅读本站内容,更多关于互斥事件的概率计算、互斥JAVA的信息别忘了在本站进行查找喔。