Commit 63a8907e by chenweiyang

弹幕采集添加数据库存储提交

parent 87226592
...@@ -15,6 +15,12 @@ ...@@ -15,6 +15,12 @@
<version>0.6.2.1-RELEASE</version> <version>0.6.2.1-RELEASE</version>
</dependency> </dependency>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongo-java-driver</artifactId>
<version>3.11.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 --> <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
<dependency> <dependency>
<groupId>org.apache.commons</groupId> <groupId>org.apache.commons</groupId>
......
...@@ -89,8 +89,6 @@ public class BilibiliMessageHandler extends ChannelInboundHandlerAdapter{ ...@@ -89,8 +89,6 @@ public class BilibiliMessageHandler extends ChannelInboundHandlerAdapter{
* @param data * @param data
* @throws Exception * @throws Exception
*/ */
private void analyzeData(byte[] data) throws IOException, DataFormatException { private void analyzeData(byte[] data) throws IOException, DataFormatException {
int dataLength = data.length; int dataLength = data.length;
if (dataLength < 16) { if (dataLength < 16) {
...@@ -110,7 +108,9 @@ public class BilibiliMessageHandler extends ChannelInboundHandlerAdapter{ ...@@ -110,7 +108,9 @@ public class BilibiliMessageHandler extends ChannelInboundHandlerAdapter{
if (action == 2) { if (action == 2) {
inputStream.readInt(); inputStream.readInt();
int userCount = inputStream.readInt(); int userCount = inputStream.readInt();
System.out.println(userCount); JSONObject json = new JSONObject();
json.put("userCount", userCount);
dataCallBack.onData(json);
} else if (action == 4) { } else if (action == 4) {
int param = inputStream.readInt(); int param = inputStream.readInt();
int msgBodyLength = dataLength - 16; int msgBodyLength = dataLength - 16;
...@@ -119,7 +119,7 @@ public class BilibiliMessageHandler extends ChannelInboundHandlerAdapter{ ...@@ -119,7 +119,7 @@ public class BilibiliMessageHandler extends ChannelInboundHandlerAdapter{
if (version != 2) { if (version != 2) {
String jsonStr = new String(msgBody, StandardCharsets.UTF_8); String jsonStr = new String(msgBody, StandardCharsets.UTF_8);
JSONObject dataJson = JSONObject.parseObject(jsonStr); JSONObject dataJson = JSONObject.parseObject(jsonStr);
System.out.println(dataJson); dataCallBack.onData(dataJson);
} else { } else {
Inflater inflater = new Inflater(); Inflater inflater = new Inflater();
inflater.setInput(msgBody); inflater.setInput(msgBody);
...@@ -137,7 +137,7 @@ public class BilibiliMessageHandler extends ChannelInboundHandlerAdapter{ ...@@ -137,7 +137,7 @@ public class BilibiliMessageHandler extends ChannelInboundHandlerAdapter{
if (innerAction == 4) { if (innerAction == 4) {
String jsonStr = new String(innerData, StandardCharsets.UTF_8); String jsonStr = new String(innerData, StandardCharsets.UTF_8);
JSONObject dataJson = JSONObject.parseObject(jsonStr); JSONObject dataJson = JSONObject.parseObject(jsonStr);
System.out.println(dataJson); dataCallBack.onData(dataJson);
} else if (innerAction == 2) { } else if (innerAction == 2) {
// pass // pass
} }
......
package com.zhiwei.live.danmu.util; package com.zhiwei.live.danmu.util;
import com.zhiwei.live.danmu.bilibili.BilibiliMessage;
import com.zhiwei.live.danmu.douyu.DouYuMessage;
import com.zhiwei.live.danmu.pandam.PandamMessage;
public interface DataCallBack { public interface DataCallBack {
/** /**
...@@ -15,8 +11,4 @@ public interface DataCallBack { ...@@ -15,8 +11,4 @@ public interface DataCallBack {
*/ */
void onData(Object message); void onData(Object message);
} }
package com.zhiwei.live.test.danmu; package com.zhiwei.live.test.danmu;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.bson.Document;
import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.Mongo;
import com.zhiwei.live.danmu.bilibili.BilibiliClient; import com.zhiwei.live.danmu.bilibili.BilibiliClient;
import com.zhiwei.live.danmu.bilibili.BilibiliMessage;
import com.zhiwei.live.danmu.util.DataCallBack; import com.zhiwei.live.danmu.util.DataCallBack;
import com.zhiwei.tools.tools.ZhiWeiTools;
public class BilibiliDanMuTest { public class BilibiliDanMuTest {
private static BlockingQueue<Object> dataQuery = new ArrayBlockingQueue<>(100000);
public static void main(String[] args) throws InterruptedException { public static void main(String[] args) throws InterruptedException {
String roomUrl = "https://live.bilibili.com/387"; Mongo mongo = new Mongo("127.0.0.1", 27017);
DB mdb = mongo.getDB("bilibili");
DBCollection dbc = mdb.getCollection("bilibili_danmu");
String roomUrl = "https://live.bilibili.com/650";
try { try {
BilibiliClient.getDanmu(new DataCallBack() { BilibiliClient.getDanmu(new DataCallBack() {
@Override @Override
public void onData(Object message) { public void onData(Object message) {
if(message instanceof BilibiliMessage) { System.out.println(message);
BilibiliMessage bilibiliMessage = (BilibiliMessage)message; dataQuery.add(message);
System.out.println("-------------" + bilibiliMessage.toString());
}
} }
}, roomUrl); }, roomUrl);
Long time = System.currentTimeMillis();
while(true) {
try {
if(dataQuery.size() > 20 || System.currentTimeMillis() - time > 5000) {
List<DBObject> docList = new ArrayList<>();
while(true) {
Object message = dataQuery.poll();
if(Objects.nonNull(message)) {
docList.add(new BasicDBObject("data", String.valueOf(message)));
}else {
break;
}
}
System.out.println("此轮存储量 " + docList.size());
if(!docList.isEmpty()) {
dbc.insert(docList);
}
time = System.currentTimeMillis();
}
} catch (Exception e) {
}
}
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment