使用Redisson操作分布式队列的注意事项
先说结论: 使用Redisson提供的RedissonPriorityQueue时, 比较操作不使用对象的equals, 而是使用compare比较.
最近做一个定时任务的服务, 需要用到分布式优先级队列, 选择了Redisson库. Redisson的队列有很多, 也都继承了java包的Queue接口, 看起来非常方便. 修改代码后, 发现一个很奇怪的问题, 将一个元素加进去之后, 移除不了, remove 返回了false. 元素是一个自定义的类, 重写了equals和hashcode, 所以应该没有什么问题, 使用java con包中的队列一切正常. 这就很奇怪了, Redisson虽然实现了接口, 但预期行为不对, 没有通过equals函数来比较元素是否相同, 通过代码分析RedissonPriorityQueue来查找原因(只显示关键代码):
@Override public boolean remove(Object value) { BinarySearchResult<V> res = binarySearch((V) value, codec); remove((int) res.getIndex()); }
上面的remove函数显示使用binarySearch找到元素, 然后删除它
public BinarySearchResult<V> binarySearch(V value, Codec codec) { //调用比较器 int cmp = comparator.compare(value, res); if (cmp == 0) { //... } else if (cmp < 0) { // ... } else { // ... } } }
上面的代码表示通过 comparator.compare比较两个元素, 返回元素在队列中的位置
//比较器代码 private Comparator<? super V> comparator = NaturalComparator.NATURAL_ORDER; private static class NaturalComparator<V> implements Comparator<V>, Serializable { private static final long serialVersionUID = 7207038068494060240L; static final NaturalComparator NATURAL_ORDER = new NaturalComparator(); public int compare(V c1, V c2) { Comparable<Object> c1co = (Comparable<Object>) c1; Comparable<Object> c2co = (Comparable<Object>) c2; return c1co.compareTo(c2co); } }
可以看到比较器最终调用了c1co.compareTo(c2co)来比较两个元素. 通常来说, compare函数是比较两个元素的优先级, 而不是比较两个元素内容是否相同的, 但RedissonPriorityQueue打破常规, 不但使用compare来比较优先级, 也同样用来比较元素是否相同. 所以要非常注意compare函数的实现, 比如一个任务(任务名唯一标识一个任务, 时间作为优先级), 可以这样写:
@Override public int compareTo(TaskNode o) { if (getName().equals(o.getName())) { //名称相同, 说明是同一个任务 return 0; } //比较延时 long diffMs = getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS); if (diffMs == 0) { //延时相同,比较名称 return getName().compareTo(o.getName()); } return diffMs > 0 ? 1 : -1; }