Added ConcurrentOrderedMap to avoid synchronization on GetQueues by klockla · Pull Request #117 · crawler-commons/url-frontier
Merged · 3 commits · Mar 11, 2025
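This PR swaps the `Collections.synchronizedMap(new LinkedHashMap<>())` that backed the in-memory queue map for a lock-free `ConcurrentOrderedMap`, so callers of `getQueues()` no longer have to hold a lock on the whole map while iterating it. The sketch below is a hypothetical reconstruction of the `ConcurrentInsertionOrderMap` contract, inferred only from the calls visible in this diff (`firstEntry()`, `pollFirstEntry()` and the standard `Map` operations); the actual interface ships with the PR and may differ.

    import java.util.Map.Entry;
    import java.util.concurrent.ConcurrentMap;

    // Hypothetical sketch, not the PR's source: a concurrent map that preserves
    // insertion order and gives deque-like access to its oldest entry.
    public interface ConcurrentInsertionOrderMap<K, V> extends ConcurrentMap<K, V> {

        // Returns, without removing, the first-inserted entry, or null if empty.
        Entry<K, V> firstEntry();

        // Atomically removes and returns the first-inserted entry, or null if empty.
        Entry<K, V> pollFirstEntry();
    }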
AbstractFrontierService.java

@@ -37,7 +37,6 @@
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -119,8 +118,8 @@ public abstract class AbstractFrontierService
     private List<String> nodes;
 
     // in memory map of metadata for each queue
-    private final Map<QueueWithinCrawl, QueueInterface> queues =
-            Collections.synchronizedMap(new LinkedHashMap<>());
+    private final ConcurrentInsertionOrderMap<QueueWithinCrawl, QueueInterface> queues =
+            new ConcurrentOrderedMap<>();
 
     protected final ExecutorService readExecutorService;
     protected final ExecutorService writeExecutorService;
@@ -147,7 +146,7 @@ protected AbstractFrontierService(
         LOG.info("Using {} threads for writing to queues", wthreadNum);
     }
 
-    public Map<QueueWithinCrawl, QueueInterface> getQueues() {
+    public ConcurrentInsertionOrderMap<QueueWithinCrawl, QueueInterface> getQueues() {
         return queues;
     }
@@ -209,14 +208,14 @@ public void listCrawls(
 
         Set<String> crawlIDs = new HashSet<>();
 
-        synchronized (getQueues()) {
-            Iterator<Entry<QueueWithinCrawl, QueueInterface>> iterator =
-                    getQueues().entrySet().iterator();
-            while (iterator.hasNext()) {
-                Entry<QueueWithinCrawl, QueueInterface> e = iterator.next();
-                crawlIDs.add(e.getKey().getCrawlid());
-            }
-        }
+        Iterator<Entry<QueueWithinCrawl, QueueInterface>> iterator =
+                getQueues().entrySet().iterator();
+
+        while (iterator.hasNext()) {
+            Entry<QueueWithinCrawl, QueueInterface> e = iterator.next();
+            crawlIDs.add(e.getKey().getCrawlid());
+        }
 
         responseObserver.onNext(StringList.newBuilder().addAllValues(crawlIDs).build());
         responseObserver.onCompleted();
     }
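The pattern above repeats through the rest of the diff: the `synchronized (getQueues())` wrapper goes away and the entry set is iterated directly. The old wrapper was needed because `LinkedHashMap` iterators fail fast if another thread mutates the map; the rewrite assumes, as with the maps in `java.util.concurrent`, that the new map hands out weakly consistent iterators. A minimal sketch under that assumption:

    // Sketch only: safe iteration while other threads insert or remove queues,
    // assuming weakly consistent iterators (java.util.concurrent semantics).
    Set<String> crawlIDs = new HashSet<>();
    for (Entry<QueueWithinCrawl, QueueInterface> e : getQueues().entrySet()) {
        crawlIDs.add(e.getKey().getCrawlid()); // concurrent updates may or may not be seen
    }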
@@ -234,21 +233,19 @@ public void deleteCrawl(
 
         final Set<QueueWithinCrawl> toDelete = new HashSet<>();
 
-        synchronized (getQueues()) {
-            Iterator<Entry<QueueWithinCrawl, QueueInterface>> iterator =
-                    getQueues().entrySet().iterator();
-            while (iterator.hasNext()) {
-                Entry<QueueWithinCrawl, QueueInterface> e = iterator.next();
-                QueueWithinCrawl qwc = e.getKey();
-                if (qwc.getCrawlid().equals(normalisedCrawlID)) {
-                    toDelete.add(qwc);
-                }
-            }
-
-            for (QueueWithinCrawl quid : toDelete) {
-                QueueInterface q = getQueues().remove(quid);
-                total += q.countActive();
-            }
-        }
+        Iterator<Entry<QueueWithinCrawl, QueueInterface>> iterator =
+                getQueues().entrySet().iterator();
+        while (iterator.hasNext()) {
+            Entry<QueueWithinCrawl, QueueInterface> e = iterator.next();
+            QueueWithinCrawl qwc = e.getKey();
+            if (qwc.getCrawlid().equals(normalisedCrawlID)) {
+                toDelete.add(qwc);
+            }
+        }
+
+        for (QueueWithinCrawl quid : toDelete) {
+            QueueInterface q = getQueues().remove(quid);
+            total += q.countActive();
+        }
 
         responseObserver.onNext(
@@ -334,33 +331,32 @@ public void listQueues(
 
         crawlercommons.urlfrontier.Urlfrontier.QueueList.Builder list = QueueList.newBuilder();
 
-        synchronized (getQueues()) {
-            Iterator<Entry<QueueWithinCrawl, QueueInterface>> iterator =
-                    getQueues().entrySet().iterator();
-
-            while (iterator.hasNext() && sent <= maxQueues) {
-                Entry<QueueWithinCrawl, QueueInterface> e = iterator.next();
-                pos++;
-
-                // check that it is within the right crawlID
-                if (!e.getKey().getCrawlid().equals(normalisedCrawlID)) {
-                    continue;
-                }
-
-                // check that it isn't blocked
-                if (!include_inactive && e.getValue().getBlockedUntil() >= now) {
-                    continue;
-                }
-
-                // ignore the nextfetchdate
-                if (include_inactive || e.getValue().countActive() > 0) {
-                    if (pos >= start) {
-                        list.addValues(e.getKey().getQueue());
-                        sent++;
-                    }
-                }
-            }
-        }
+        Iterator<Entry<QueueWithinCrawl, QueueInterface>> iterator =
+                getQueues().entrySet().iterator();
+
+        while (iterator.hasNext() && sent <= maxQueues) {
+            Entry<QueueWithinCrawl, QueueInterface> e = iterator.next();
+            pos++;
+
+            // check that it is within the right crawlID
+            if (!e.getKey().getCrawlid().equals(normalisedCrawlID)) {
+                continue;
+            }
+
+            // check that it isn't blocked
+            if (!include_inactive && e.getValue().getBlockedUntil() >= now) {
+                continue;
+            }
+
+            // ignore the nextfetchdate
+            if (include_inactive || e.getValue().countActive() > 0) {
+                if (pos >= start) {
+                    list.addValues(e.getKey().getQueue());
+                    sent++;
+                }
+            }
+        }
 
         responseObserver.onNext(list.build());
         responseObserver.onCompleted();
     }
@@ -451,16 +447,14 @@ public void getStats(
         // all the queues within the crawlID
         else {
 
-            synchronized (getQueues()) {
-                // check that the queues belong to the crawlid specified
-                Iterator<Entry<QueueWithinCrawl, QueueInterface>> iterator =
-                        getQueues().entrySet().iterator();
-                while (iterator.hasNext()) {
-                    Entry<QueueWithinCrawl, QueueInterface> e = iterator.next();
-                    QueueWithinCrawl qwc = e.getKey();
-                    if (qwc.getCrawlid().equals(normalisedCrawlID)) {
-                        _queues.add(e.getValue());
-                    }
-                }
-            }
+            // check that the queues belong to the crawlid specified
+            Iterator<Entry<QueueWithinCrawl, QueueInterface>> iterator =
+                    getQueues().entrySet().iterator();
+            while (iterator.hasNext()) {
+                Entry<QueueWithinCrawl, QueueInterface> e = iterator.next();
+                QueueWithinCrawl qwc = e.getKey();
+                if (qwc.getCrawlid().equals(normalisedCrawlID)) {
+                    _queues.add(e.getValue());
+                }
+            }
         }
@@ ... @@
 
         long now = Instant.now().getEpochSecond();
 
-        synchronized (getQueues()) {
-            for (QueueInterface q : _queues) {
-                final int inProcForQ = q.getInProcess(now);
-                final int activeForQ = q.countActive();
-                if (inProcForQ > 0 || activeForQ > 0) {
-                    activeQueues++;
-                }
-                inProc += inProcForQ;
-                numQueues++;
-                size += activeForQ;
-                completed += q.getCountCompleted();
-            }
-        }
+        for (QueueInterface q : _queues) {
+            final int inProcForQ = q.getInProcess(now);
+            final int activeForQ = q.countActive();
+            if (inProcForQ > 0 || activeForQ > 0) {
+                activeQueues++;
+            }
+            inProc += inProcForQ;
+            numQueues++;
+            size += activeForQ;
+            completed += q.getCountCompleted();
+        }
 
         // put count completed as custom stats for now
@@ -646,24 +638,21 @@ public void getURLs(GetParams request, StreamObserver<URLInfo> responseObserver)
             final QueueInterface currentQueue;
             final QueueWithinCrawl currentCrawlQueue;
 
-            synchronized (getQueues()) {
-                Iterator<Entry<QueueWithinCrawl, QueueInterface>> iterator =
-                        getQueues().entrySet().iterator();
-                Entry<QueueWithinCrawl, QueueInterface> e = iterator.next();
-                currentQueue = e.getValue();
-                currentCrawlQueue = e.getKey();
-
-                // to make sure we don't loop over the ones we already processed
-                if (firstCrawlQueue == null) {
-                    firstCrawlQueue = currentCrawlQueue;
-                } else if (firstCrawlQueue.equals(currentCrawlQueue)) {
-                    break;
-                }
-                // We remove the entry and put it at the end of the map
-                iterator.remove();
-                getQueues().put(currentCrawlQueue, currentQueue);
-            }
+            Entry<QueueWithinCrawl, QueueInterface> e = getQueues().firstEntry();
+            currentQueue = e.getValue();
+            currentCrawlQueue = e.getKey();
+
+            // to make sure we don't loop over the ones we already processed
+            if (firstCrawlQueue == null) {
+                firstCrawlQueue = currentCrawlQueue;
+            } else if (firstCrawlQueue.equals(currentCrawlQueue)) {
+                break;
+            }
+
+            // We remove the entry and put it at the end of the map
+            Entry<QueueWithinCrawl, QueueInterface> first = getQueues().pollFirstEntry();
+            getQueues().put(first.getKey(), first.getValue());
 
             // if a crawlID has been specified make sure it matches
             if (crawlID != null && !currentCrawlQueue.getCrawlid().equals(crawlID)) {
                 continue;
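The `getURLs` loop above rotates queues round-robin: it reads the head entry, then detaches it with `pollFirstEntry()` and re-inserts it so it moves to the tail of the insertion order. A toy illustration of the rotation, using hypothetical `String`/`Integer` entries in place of real queues:

    ConcurrentInsertionOrderMap<String, Integer> m = new ConcurrentOrderedMap<>();
    m.put("a", 1);
    m.put("b", 2);

    Entry<String, Integer> first = m.pollFirstEntry(); // atomically removes "a"
    m.put(first.getKey(), first.getValue());           // "a" re-enters at the tail

    // firstEntry() now reports "b": each queue gets a turn at the head.
    assert m.firstEntry().getKey().equals("b");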
@@ -875,19 +864,18 @@ public void close() throws IOException {
 
     public void setCrawlLimit(CrawlLimitParams params, StreamObserver<Empty> responseObserver) {
         QueueWithinCrawl searchKey = new QueueWithinCrawl(params.getKey(), params.getCrawlID());
-        synchronized (getQueues()) {
-            QueueInterface qi = getQueues().get(searchKey);
-            if (qi != null) {
-                qi.setCrawlLimit(params.getLimit());
-            } else {
-                LOG.error(
-                        "Queue with key: {} and CrawlId: {} was not found.",
-                        searchKey.getQueue(),
-                        searchKey.getCrawlid());
-                responseObserver.onError(
-                        new RuntimeException("CrawlId and Queue combination is not found."));
-                return;
-            }
-        }
+
+        QueueInterface qi = getQueues().get(searchKey);
+        if (qi != null) {
+            qi.setCrawlLimit(params.getLimit());
+        } else {
+            LOG.error(
+                    "Queue with key: {} and CrawlId: {} was not found.",
+                    searchKey.getQueue(),
+                    searchKey.getCrawlid());
+            responseObserver.onError(
+                    new RuntimeException("CrawlId and Queue combination is not found."));
+            return;
+        }
 
         responseObserver.onCompleted();
@@ -927,43 +915,41 @@ public void listURLs(
         long pos = -1; // Current position in the list of (filtered) URLs
         long sentCount = 0;
 
-        synchronized (getQueues()) {
-            Iterator<Entry<QueueWithinCrawl, QueueInterface>> qiterator =
-                    getQueues().entrySet().iterator();
-
-            while (qiterator.hasNext() && sentCount < maxURLs) {
-                Entry<QueueWithinCrawl, QueueInterface> e = qiterator.next();
-
-                // check that it is within the right crawlID
-                if (!e.getKey().getCrawlid().equals(normalisedCrawlID)) {
-                    continue;
-                }
-
-                // check that it is within the right key/queue
-                if (key != null && !key.isEmpty() && !e.getKey().getQueue().equals(key)) {
-                    continue;
-                }
-
-                CloseableIterator<URLItem> urliter = urlIterator(e);
-
-                while (urliter.hasNext() && sentCount < maxURLs) {
-                    URLItem cur = urliter.next();
-
-                    if (!doFilter || filterURL(cur, filter, ignoreCase)) {
-                        pos++;
-
-                        if (pos >= start && sentCount < maxURLs) {
-                            sentCount++;
-                            responseObserver.onNext(cur);
-                        }
-                    }
-                }
-
-                try {
-                    urliter.close();
-                } catch (IOException e1) {
-                    LOG.warn("Error closing URLIterator", e1);
-                }
-            }
-        }
+        Iterator<Entry<QueueWithinCrawl, QueueInterface>> qiterator =
+                getQueues().entrySet().iterator();
+
+        while (qiterator.hasNext() && sentCount < maxURLs) {
+            Entry<QueueWithinCrawl, QueueInterface> e = qiterator.next();
+
+            // check that it is within the right crawlID
+            if (!e.getKey().getCrawlid().equals(normalisedCrawlID)) {
+                continue;
+            }
+
+            // check that it is within the right key/queue
+            if (key != null && !key.isEmpty() && !e.getKey().getQueue().equals(key)) {
+                continue;
+            }
+
+            CloseableIterator<URLItem> urliter = urlIterator(e);
+
+            while (urliter.hasNext() && sentCount < maxURLs) {
+                URLItem cur = urliter.next();
+
+                if (!doFilter || filterURL(cur, filter, ignoreCase)) {
+                    pos++;
+
+                    if (pos >= start && sentCount < maxURLs) {
+                        sentCount++;
+                        responseObserver.onNext(cur);
+                    }
+                }
+            }
+
+            try {
+                urliter.close();
+            } catch (IOException e1) {
+                LOG.warn("Error closing URLIterator", e1);
+            }
+        }

@@ -1022,38 +1008,36 @@ public void countURLs(
 
         long totalCount = 0;
 
-        synchronized (getQueues()) {
-            Iterator<Entry<QueueWithinCrawl, QueueInterface>> qiterator =
-                    getQueues().entrySet().iterator();
-
-            while (qiterator.hasNext()) {
-                Entry<QueueWithinCrawl, QueueInterface> e = qiterator.next();
-
-                // check that it is within the right crawlID
-                if (!e.getKey().getCrawlid().equals(normalisedCrawlID)) {
-                    continue;
-                }
-
-                // check that it is within the right key/queue
-                if (key != null && !key.isEmpty() && !e.getKey().getQueue().equals(key)) {
-                    continue;
-                }
-
-                CloseableIterator<URLItem> urliter = urlIterator(e);
-
-                while (urliter.hasNext()) {
-                    URLItem cur = urliter.next();
-
-                    if (!doFilter || filterURL(cur, filter, ignoreCase)) {
-                        totalCount++;
-                    }
-                }
-
-                try {
-                    urliter.close();
-                } catch (Exception e1) {
-                    LOG.warn("Error closing URLIterator", e1);
-                }
-            }
-        }
+        Iterator<Entry<QueueWithinCrawl, QueueInterface>> qiterator =
+                getQueues().entrySet().iterator();
+
+        while (qiterator.hasNext()) {
+            Entry<QueueWithinCrawl, QueueInterface> e = qiterator.next();
+
+            // check that it is within the right crawlID
+            if (!e.getKey().getCrawlid().equals(normalisedCrawlID)) {
+                continue;
+            }
+
+            // check that it is within the right key/queue
+            if (key != null && !key.isEmpty() && !e.getKey().getQueue().equals(key)) {
+                continue;
+            }
+
+            CloseableIterator<URLItem> urliter = urlIterator(e);
+
+            while (urliter.hasNext()) {
+                URLItem cur = urliter.next();
+
+                if (!doFilter || filterURL(cur, filter, ignoreCase)) {
+                    totalCount++;
+                }
+            }
+
+            try {
+                urliter.close();
+            } catch (Exception e1) {
+                LOG.warn("Error closing URLIterator", e1);
+            }
+        }