1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */18package org.apache.giraph.block_app.reducers;
1920import org.apache.giraph.reducers.impl.KryoWrappedReduceOperation;
2122import java.io.DataInput;
23import java.io.DataOutput;
24import java.io.IOException;
25import java.util.PriorityQueue;
2627/**28 * Extracts top N largest elements29 *30 * @param <S> Single value type, objects passed on workers31 */32publicclass TopNReduce<S extends Comparable<S>>
33extends KryoWrappedReduceOperation<S, PriorityQueue<S>> {
34privateint capacity;
3536publicTopNReduce(int capacity) {
37this.capacity = capacity;
38 }
3940publicTopNReduce() { }
4142 @Override
43public PriorityQueue<S> createValue() {
44returnnew PriorityQueue<S>();
45 }
4647 @Override
48publicvoid reduce(PriorityQueue<S> heap, S value) {
49if (capacity == 0) {
50return;
51 }
5253if (heap.size() < capacity) {
54 heap.add(value);
55 } else {
56 S head = heap.peek();
57if (head.compareTo(value) < 0) {
58 heap.poll();
59 heap.add(value);
60 }
61 }
62 }
6364 @Override
65publicvoid reduceMerge(
66 PriorityQueue<S> reduceInto,
67 PriorityQueue<S> toReduce
68 ) {
69for (S element : toReduce) {
70 reduce(reduceInto, element);
71 }
72 }
7374 @Override
75publicvoid write(DataOutput out) throws IOException {
76super.write(out);
77 out.writeInt(capacity);
78 }
7980 @Override
81publicvoid readFields(DataInput in) throws IOException {
82super.readFields(in);
83 capacity = in.readInt();
84 }
85 }