Commit 65064349 by zhiwei

斗鱼弹幕级b站消息队列添加

parent c472b5c4
...@@ -65,8 +65,8 @@ public class BilibiliMessageHandler extends ChannelInboundHandlerAdapter{ ...@@ -65,8 +65,8 @@ public class BilibiliMessageHandler extends ChannelInboundHandlerAdapter{
Matcher matcher = pattern.matcher(source); Matcher matcher = pattern.matcher(source);
while(matcher.find()) { while(matcher.find()) {
JSONObject dataJson = JSONObject.parseObject(matcher.group()); JSONObject dataJson = JSONObject.parseObject(matcher.group());
BilibiliEntity bilibiliEntity = new BilibiliEntity(dataJson); BilibiliMessage bilibiliMessage = new BilibiliMessage(dataJson);
System.out.println(bilibiliEntity.toString()); System.out.println(bilibiliMessage);
} }
} }
ReferenceCountUtil.release(msg); ReferenceCountUtil.release(msg);
......
package com.zhiwei.live.danmu.bilibili;
import java.util.List;
import java.util.Vector;
public class BilibiliMessageListener {
public static List<BilibiliMessage> messages = new Vector<>();
public static void addMessages(BilibiliMessage bilibiliMessage) {
messages.add(bilibiliMessage);
}
}
package com.zhiwei.live.danmu.douyu; package com.zhiwei.live.danmu.douyu;
import java.io.IOException; import java.io.IOException;
import java.net.Socket; import java.net.InetSocketAddress;
import java.util.ArrayList; import java.net.Socket;
import java.util.Arrays; import java.net.SocketAddress;
import java.util.List; import java.util.ArrayList;
import java.util.Arrays;
import org.slf4j.Logger; import java.util.List;
import org.slf4j.LoggerFactory;
import org.slf4j.Logger;
import com.zhiwei.live.danmu.douyu.constants.DouYuConstants; import org.slf4j.LoggerFactory;
import com.zhiwei.live.danmu.douyu.constants.MsgType;
import com.zhiwei.live.danmu.douyu.entity.BaseMsg; import com.zhiwei.crawler.proxy.ProxyHolder;
import com.zhiwei.live.danmu.douyu.entity.ChatMsg; import com.zhiwei.live.danmu.douyu.constants.DouYuConstants;
import com.zhiwei.live.danmu.douyu.entity.DgbMsg; import com.zhiwei.live.danmu.douyu.constants.MsgType;
import com.zhiwei.live.danmu.douyu.entity.ErrorMsg; import com.zhiwei.live.danmu.douyu.entity.BaseMsg;
import com.zhiwei.live.danmu.douyu.entity.GgbbMsg; import com.zhiwei.live.danmu.douyu.entity.ChatMsg;
import com.zhiwei.live.danmu.douyu.entity.MsgEntity; import com.zhiwei.live.danmu.douyu.entity.DgbMsg;
import com.zhiwei.live.danmu.douyu.entity.SpbcMsg; import com.zhiwei.live.danmu.douyu.entity.ErrorMsg;
import com.zhiwei.live.danmu.douyu.entity.SsdMsg; import com.zhiwei.live.danmu.douyu.entity.GgbbMsg;
import com.zhiwei.live.danmu.douyu.entity.UenterMsg; import com.zhiwei.live.danmu.douyu.entity.MsgEntity;
import com.zhiwei.live.danmu.douyu.exceptions.DouYuSDKException; import com.zhiwei.live.danmu.douyu.entity.SpbcMsg;
import com.zhiwei.live.danmu.douyu.util.DataUtil; import com.zhiwei.live.danmu.douyu.entity.SsdMsg;
import com.zhiwei.live.danmu.douyu.util.STTUtil; import com.zhiwei.live.danmu.douyu.entity.UenterMsg;
import com.zhiwei.live.danmu.douyu.util.UUIDUtil; import com.zhiwei.live.danmu.douyu.exceptions.DouYuSDKException;
import com.zhiwei.live.danmu.douyu.util.DataUtil;
/** import com.zhiwei.live.danmu.douyu.util.STTUtil;
* 功能描述:斗鱼SDK import com.zhiwei.live.danmu.douyu.util.UUIDUtil;
*
* @auther: coffee /**
* @date: 2018-07-04 15:19:51 * 功能描述:斗鱼SDK
* 修改日志: *
*/ * @auther: coffee
public class DouYuClient { * @date: 2018-07-04 15:19:51
private final static Logger logger = LoggerFactory.getLogger(DouYuClient.class); * 修改日志:
private String host; //API Host */
private int port; //API 端口 public class DouYuClient {
private String roomId; //房间ID(房间号) private final static Logger logger = LoggerFactory.getLogger(DouYuClient.class);
private String groupId = DouYuConstants.MASSIVE_GID; //分组ID private String host; //API Host
private int port; //API 端口
private Socket socket; //通讯socket对象 private String roomId; //房间ID(房间号)
private Thread dataSyncThread; //异步读取消息线程 private String groupId = DouYuConstants.MASSIVE_GID; //分组ID
private Boolean isExitMark = false; //退出标记
private MessageHandler messageHandler; private Socket socket; //通讯socket对象
private List<MessageListener> messageListenerList = new ArrayList<>(); private Thread dataSyncThread; //异步读取消息线程
private Boolean isExitMark = false; //退出标记
private MessageHandler messageHandler;
public DouYuClient(String host, int port, String roomId) { private List<MessageListener> messageListenerList = new ArrayList<>();
this.host = host;
this.port = port; public DouYuClient(String host, int port, String roomId) {
this.roomId = roomId; this.host = host;
//初始化 this.port = port;
this.init(); this.roomId = roomId;
} //初始化
this.init();
/** }
* 初始化Client
*/ public DouYuClient(String roomId) {
private void init() { this.host = "openbarrage.douyutv.com";
logger.info("初始化斗鱼SDK_Client..."); this.port = 8601;
this.connect(); this.roomId = roomId;
} //初始化
this.init();
/** }
* 打开socket连接
*/ /**
private void connect() { * 初始化Client
try { */
logger.info("从服务器({}:{})获取弹幕服务器数据", host, port); private void init() {
socket = new Socket(host, port); logger.info("初始化斗鱼SDK_Client...");
messageHandler = new MessageHandler(socket); this.connect();
} catch (IOException e) { }
throw new RuntimeException(e);
} /**
} * 打开socket连接
*/
/** private void connect() {
* 登入斗鱼弹幕服务器 try {
*/ logger.info("从服务器({}:{})获取弹幕服务器数据", host, port);
public DouYuClient login() { socket = new Socket(ProxyHolder.NAT_PROXY.getProxy());
logger.info("登录房间 {}", roomId); socket.connect(new InetSocketAddress(host, port));
String content = String.format(DouYuApi.LOGIN_REQ, roomId); messageHandler = new MessageHandler(socket);
messageHandler.send(content); } catch (IOException e) {
return this; throw new RuntimeException(e);
} }
}
/**
* 加入房间分组并接收房间消息 /**
* * 登入斗鱼弹幕服务器
* @return */
*/ public DouYuClient login() {
private DouYuClient joinGroup(String groupId) { logger.info("登录房间 {}", roomId);
if (DouYuConstants.MASSIVE_GID.equals(groupId)) { String content = String.format(DouYuApi.LOGIN_REQ, roomId);
logger.info("开启海量弹幕接收模式"); messageHandler.send(content);
} else { return this;
logger.info("关闭海量弹幕接收模式"); }
}
logger.info("加入分组({})并开始接收消息", groupId); /**
String content = String.format(DouYuApi.JOIN_GROUP, roomId, groupId); * 加入房间分组并接收房间消息
messageHandler.send(content); *
return this; * @return
} */
private DouYuClient joinGroup(String groupId) {
/** if (DouYuConstants.MASSIVE_GID.equals(groupId)) {
* 注册消息监听器 logger.info("开启海量弹幕接收模式");
* } else {
* @param messageListener logger.info("关闭海量弹幕接收模式");
* @return }
*/ logger.info("加入分组({})并开始接收消息", groupId);
public DouYuClient registerMessageListener(MessageListener messageListener) { String content = String.format(DouYuApi.JOIN_GROUP, roomId, groupId);
logger.info("注册消息监听器:{}", messageListener.getClass()); messageHandler.send(content);
this.messageListenerList.add(messageListener); return this;
return this; }
}
/**
/** * 注册消息监听器
* 开始同步并接收房间消息 *
*/ * @param messageListener
public DouYuClient sync() { * @return
//加入海量弹幕分组,接收所有消息 */
this.joinGroup(groupId); public DouYuClient registerMessageListener(MessageListener messageListener) {
logger.info("注册消息监听器:{}", messageListener.getClass());
//开启异步线程接收房间消息 this.messageListenerList.add(messageListener);
dataSyncThread = new Thread(new Runnable() { return this;
@Override }
public void run() {
long start = System.currentTimeMillis(); /**
while (true) { * 开始同步并接收房间消息
//判断是否退出线程 */
if (isExitMark == true) { public DouYuClient sync() {
break; //加入海量弹幕分组,接收所有消息
} this.joinGroup(groupId);
//读取弹幕消息 //开启异步线程接收房间消息
byte[] bytes = messageHandler.read(); dataSyncThread = new Thread(new Runnable() {
String msg = new String(Arrays.copyOfRange(bytes, 8, bytes.length)); @Override
public void run() {
//获取消息类型 long start = System.currentTimeMillis();
String msgType = DataUtil.getMsgType(msg); while (true) {
if (msgType == null) { //判断是否退出线程
logger.error("获取消息类型失败,消息:{}", msg); if (isExitMark == true) {
continue; break;
} }
//封装基础消息对象 //读取弹幕消息
BaseMsg msgBase = new BaseMsg(); byte[] bytes = messageHandler.read();
msgBase.setUuid(UUIDUtil.simpleUUID()); String msg = new String(Arrays.copyOfRange(bytes, 8, bytes.length));
msgBase.setType(msgType);
msgBase.setMessage(msg); //获取消息类型
String msgType = DataUtil.getMsgType(msg);
//根据不同的消息类型 序列化不同的 实体对象 if (msgType == null) {
MsgEntity entity = null; logger.error("获取消息类型失败,消息:{}", msg);
if (MsgType.CHAT_MSG.equals(msgType)) { continue;
entity = STTUtil.toBean(msg, ChatMsg.class); }
} else if (MsgType.DGB.equals(msgType)) {
entity = STTUtil.toBean(msg, DgbMsg.class); //封装基础消息对象
} else if (MsgType.GGBB.equals(msgType)) { BaseMsg msgBase = new BaseMsg();
entity = STTUtil.toBean(msg, GgbbMsg.class); msgBase.setUuid(UUIDUtil.simpleUUID());
} else if (MsgType.SPBC.equals(msgType)) { msgBase.setType(msgType);
entity = STTUtil.toBean(msg, SpbcMsg.class); msgBase.setMessage(msg);
} else if (MsgType.SSD.equals(msgType)) {
entity = STTUtil.toBean(msg, SsdMsg.class); //根据不同的消息类型 序列化不同的 实体对象
} else if (MsgType.UENTER.equals(msgType)) { MsgEntity entity = null;
entity = STTUtil.toBean(msg, UenterMsg.class); if (MsgType.CHAT_MSG.equals(msgType)) {
} else if (MsgType.ERROR.equals(msgType)) { entity = STTUtil.toBean(msg, ChatMsg.class);
entity = STTUtil.toBean(msg, ErrorMsg.class); } else if (MsgType.DGB.equals(msgType)) {
} entity = STTUtil.toBean(msg, DgbMsg.class);
if (entity != null) { } else if (MsgType.GGBB.equals(msgType)) {
entity.setMessage(msg); entity = STTUtil.toBean(msg, GgbbMsg.class);
entity.setUuid(msgBase.getUuid()); } else if (MsgType.SPBC.equals(msgType)) {
} entity = STTUtil.toBean(msg, SpbcMsg.class);
} else if (MsgType.SSD.equals(msgType)) {
//消息监听器处理 entity = STTUtil.toBean(msg, SsdMsg.class);
for (MessageListener messageListener : messageListenerList) { } else if (MsgType.UENTER.equals(msgType)) {
try { entity = STTUtil.toBean(msg, UenterMsg.class);
//基础消息监听器处理 } else if (MsgType.ERROR.equals(msgType)) {
if (messageListener.getMsgClazz() == BaseMsg.class) { entity = STTUtil.toBean(msg, ErrorMsg.class);
messageListener.read(msgBase); }
} if (entity != null) {
//指定类型消息监听器处理 entity.setMessage(msg);
else if (entity != null && messageListener.getMsgClazz() == entity.getClass()) { entity.setUuid(msgBase.getUuid());
messageListener.read(entity); }
}
//String消息监听器处理 //消息监听器处理
else if (messageListener.getMsgClazz() == String.class) { for (MessageListener messageListener : messageListenerList) {
messageListener.read(msg); try {
} //基础消息监听器处理
} catch (Exception e) { if (messageListener.getMsgClazz() == BaseMsg.class) {
logger.error("消息处理出现异常:", e); messageListener.read(msgBase);
} }
} //指定类型消息监听器处理
else if (entity != null && messageListener.getMsgClazz() == entity.getClass()) {
//发送心跳消息保持通道 messageListener.read(entity);
long end = System.currentTimeMillis(); }
if (end - start > 30000) { //String消息监听器处理
doKeepLive(); else if (messageListener.getMsgClazz() == String.class) {
start = System.currentTimeMillis(); messageListener.read(msg);
} }
//休眠1毫秒 } catch (Exception e) {
try { logger.error("消息处理出现异常:", e);
Thread.sleep(1); }
} catch (InterruptedException e) { }
throw new DouYuSDKException(e);
} //发送心跳消息保持通道
} long end = System.currentTimeMillis();
if (end - start > 30000) {
//客户端关闭,断开socket通道 doKeepLive();
messageHandler.close(); start = System.currentTimeMillis();
}
logger.info("斗鱼弹幕SDK客户端已成功退出"); //休眠1毫秒
} try {
}); Thread.sleep(1);
dataSyncThread.start(); } catch (InterruptedException e) {
return this; throw new DouYuSDKException(e);
} }
}
/**
* 发送心跳消息,保持通道 //客户端关闭,断开socket通道
*/ messageHandler.close();
public void doKeepLive() {
String content = String.format(DouYuApi.KEEP_LIVE); logger.info("斗鱼弹幕SDK客户端已成功退出");
logger.info("发送心跳信息,保持通道中..."); }
messageHandler.send(content); });
} dataSyncThread.start();
return this;
/** }
* 发送登出消息,用于退出
*/ /**
public void logout(){ * 发送心跳消息,保持通道
String content = String.format(DouYuApi.LOGOUT); */
logger.info("发送登出消息中..."); public void doKeepLive() {
messageHandler.send(content); String content = String.format(DouYuApi.KEEP_LIVE);
} logger.info("发送心跳信息,保持通道中...");
messageHandler.send(content);
public void exit() { }
logout();
isExitMark = true; /**
logger.info("斗鱼弹幕SDK客户端正在退出中..."); * 发送登出消息,用于退出
} */
} public void logout(){
String content = String.format(DouYuApi.LOGOUT);
logger.info("发送登出消息中...");
messageHandler.send(content);
}
public void exit() {
logout();
isExitMark = true;
logger.info("斗鱼弹幕SDK客户端正在退出中...");
}
}
package com.zhiwei.live.danmu.douyu;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.zhiwei.common.config.GroupType;
import com.zhiwei.crawler.proxy.ProxyFactory;
import com.zhiwei.live.danmu.douyu.entity.ChatMsg;
/**
* 功能描述:
*
* @auther: coffee
* @date: 2018-07-04 19:37:41
* 修改日志:
*/
public class Main {
private final static Logger logger = LoggerFactory.getLogger(Main.class);
private static final String registry = "zookeeper://192.168.0.36:2181";
private static final String group = "local";
static {
ProxyFactory.init(registry, group, GroupType.PROVIDER);
}
public static void main(String[] args) throws InterruptedException {
DouYuClient client = new DouYuClient("3084150");
client.registerMessageListener(new MessageListener<ChatMsg>() {
@Override
public void read(ChatMsg message) {
//logger.info(message.toChatStr());
// System.out.println(message.toChatStr());
}
});
client.registerMessageListener(new MessageListener<String>() {
@Override
public void read(String message) {
System.out.println(message);
// logger.info(message);
}
});
client.login();
client.sync();
Thread.sleep(30000);
client.exit();
}
}
package com.zhiwei.live.danmu.douyu; package com.zhiwei.live.danmu.douyu;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.net.Socket; import java.net.Socket;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.zhiwei.live.danmu.douyu.exceptions.DouYuSDKException; import com.zhiwei.live.danmu.douyu.exceptions.DouYuSDKException;
/** /**
* 功能描述:消息处理助手 * 功能描述:消息处理助手
* *
* @auther: coffee * @auther: coffee
* @date: 2018-07-04 15:11:13 * @date: 2018-07-04 15:11:13
* 修改日志: * 修改日志:
*/ */
public class MessageHandler { public class MessageHandler {
private final static Logger logger = LoggerFactory.getLogger(MessageHandler.class); private final static Logger logger = LoggerFactory.getLogger(MessageHandler.class);
private Socket socket; private Socket socket;
public MessageHandler(Socket socket) { public MessageHandler(Socket socket) {
this.socket = socket; this.socket = socket;
} }
/** /**
* 发送消息 * 发送消息
* *
* @param content * @param content
*/ */
public void send(String content) { public void send(String content) {
try { try {
Message message = new Message(content); Message message = new Message(content);
OutputStream out = socket.getOutputStream(); OutputStream out = socket.getOutputStream();
out.write(message.getBytes()); out.write(message.getBytes());
out.flush(); out.flush();
} catch (IOException e) { } catch (IOException e) {
throw new DouYuSDKException(e); throw new DouYuSDKException(e);
} }
} }
/** /**
* 读取消息 * 读取消息
* *
* @return * @return
*/ */
public byte[] read(){ public byte[] read(){
try { try {
InputStream inputStream = socket.getInputStream(); InputStream inputStream = socket.getInputStream();
//下条信息的长度 //下条信息的长度
int contentLen = 0; int contentLen = 0;
//读取前4个字节,得到数据长度 //读取前4个字节,得到数据长度
for (int i = 0; i < 4; i++) { for (int i = 0; i < 4; i++) {
int tmp = inputStream.read(); int tmp = inputStream.read();
contentLen += tmp * Math.pow(16, 2 * i); contentLen += tmp * Math.pow(16, 2 * i);
} }
int len = 0; int len = 0;
int readLen = 0; int readLen = 0;
byte[] bytes = new byte[contentLen]; byte[] bytes = new byte[contentLen];
ByteArrayOutputStream byteArray = new ByteArrayOutputStream(); ByteArrayOutputStream byteArray = new ByteArrayOutputStream();
while ((len = inputStream.read(bytes, 0, contentLen - readLen)) != -1) { while ((len = inputStream.read(bytes, 0, contentLen - readLen)) != -1) {
byteArray.write(bytes, 0, len); byteArray.write(bytes, 0, len);
readLen += len; readLen += len;
if (readLen == contentLen) { if (readLen == contentLen) {
break; break;
} }
} }
return byteArray.toByteArray(); return byteArray.toByteArray();
} catch (IOException e) { } catch (IOException e) {
throw new DouYuSDKException(e); throw new DouYuSDKException(e);
} }
} }
/** /**
* 关闭socket通道 * 关闭socket通道
* *
* @throws IOException * @throws IOException
*/ */
public void close(){ public void close(){
try { try {
socket.close(); socket.close();
} catch (IOException e) { } catch (IOException e) {
logger.warn("socket通道关闭异常",e); logger.warn("socket通道关闭异常",e);
} }
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment